From 27ab88fa4220a050c23317a40a6c9b82eeafb7cf Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 4 Dec 2023 13:54:26 -0500 Subject: [PATCH] Change channel events so that created_on is db time and is included in queued task payload --- backends/rapidpro/backend_test.go | 12 ++++-- backends/rapidpro/channel_event.go | 16 ++++---- backends/rapidpro/task.go | 65 +++++------------------------- 3 files changed, 27 insertions(+), 66 deletions(-) diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 97ee68d50..2ddf5c777 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/buger/jsonparser" "github.com/gomodule/redigo/redis" "github.com/lib/pq" "github.com/nyaruka/courier" @@ -22,6 +23,7 @@ import ( "github.com/nyaruka/courier/test" "github.com/nyaruka/gocommon/dbutil/assertdb" "github.com/nyaruka/gocommon/httpx" + "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/gocommon/uuids" "github.com/nyaruka/null/v3" @@ -1313,7 +1315,8 @@ func (ts *BackendTestSuite) TestMailroomEvents() { clog := courier.NewChannelLog(courier.ChannelLogTypeUnknown, channel, nil) urn, _ := urns.NewTelURNForCountry("12065551616", channel.Country()) - event := ts.b.NewChannelEvent(channel, courier.EventTypeReferral, urn, clog).WithExtra(map[string]string{"ref_id": "12345"}). + event := ts.b.NewChannelEvent(channel, courier.EventTypeReferral, urn, clog). + WithExtra(map[string]string{"ref_id": "12345"}). WithContactName("kermit frog"). WithOccurredOn(time.Date(2020, 8, 5, 13, 30, 0, 123456789, time.UTC)) err := ts.b.WriteChannelEvent(ctx, event, clog) @@ -1322,6 +1325,7 @@ func (ts *BackendTestSuite) TestMailroomEvents() { contact, err := contactForURN(ctx, ts.b, channel.OrgID_, channel, urn, nil, "", clog) ts.NoError(err) ts.Equal(null.String("kermit frog"), contact.Name_) + ts.False(contact.IsNew_) dbE := event.(*ChannelEvent) dbE = readChannelEventFromDB(ts.b, dbE.ID_) @@ -1449,9 +1453,11 @@ func (ts *BackendTestSuite) assertQueuedContactTask(contactID ContactID, expecte data, err := redis.Bytes(rc.Do("LPOP", fmt.Sprintf("c:1:%d", contactID))) ts.NoError(err) + // created_on is usually DB time so exclude it from task body comparison + data = jsonparser.Delete(data, "task", "created_on") + var body map[string]any - err = json.Unmarshal(data, &body) - ts.NoError(err) + jsonx.MustUnmarshal(data, &body) ts.Equal(expectedType, body["type"]) ts.Equal(expectedBody, body["task"]) } diff --git a/backends/rapidpro/channel_event.go b/backends/rapidpro/channel_event.go index e9b58c022..3e86a8b58 100644 --- a/backends/rapidpro/channel_event.go +++ b/backends/rapidpro/channel_event.go @@ -62,7 +62,6 @@ type ChannelEvent struct { // newChannelEvent creates a new channel event func newChannelEvent(channel courier.Channel, eventType courier.ChannelEventType, urn urns.URN, clog *courier.ChannelLog) *ChannelEvent { dbChannel := channel.(*Channel) - now := time.Now().In(time.UTC) return &ChannelEvent{ ChannelUUID_: dbChannel.UUID_, @@ -70,8 +69,7 @@ func newChannelEvent(channel courier.Channel, eventType courier.ChannelEventType ChannelID_: dbChannel.ID_, URN_: urn, EventType_: eventType, - OccurredOn_: now, - CreatedOn_: now, + OccurredOn_: time.Now().In(time.UTC), LogUUIDs: []string{string(clog.UUID())}, channel: dbChannel, @@ -136,11 +134,11 @@ func writeChannelEvent(ctx context.Context, b *backend, event courier.ChannelEve const sqlInsertChannelEvent = ` INSERT INTO - channels_channelevent( org_id, channel_id, contact_id, contact_urn_id, event_type, optin_id, extra, occurred_on, created_on, log_uuids) - VALUES(:org_id, :channel_id, :contact_id, :contact_urn_id, :event_type, :optin_id, :extra, :occurred_on, :created_on, :log_uuids) -RETURNING id` + channels_channelevent( org_id, channel_id, contact_id, contact_urn_id, event_type, optin_id, extra, occurred_on, created_on, log_uuids) + VALUES(:org_id, :channel_id, :contact_id, :contact_urn_id, :event_type, :optin_id, :extra, :occurred_on, NOW(), :log_uuids) +RETURNING id, created_on` -// writeChannelEventToDB writes the passed in msg status to our db +// writeChannelEventToDB writes the passed in channel event to our db func writeChannelEventToDB(ctx context.Context, b *backend, e *ChannelEvent, clog *courier.ChannelLog) error { // grab the contact for this event contact, err := contactForURN(ctx, b, e.OrgID_, e.channel, e.URN_, e.URNAuthTokens_, e.ContactName_, clog) @@ -159,8 +157,8 @@ func writeChannelEventToDB(ctx context.Context, b *backend, e *ChannelEvent, clo defer rows.Close() rows.Next() - err = rows.Scan(&e.ID_) - if err != nil { + + if err = rows.Scan(&e.ID_, &e.CreatedOn_); err != nil { return err } diff --git a/backends/rapidpro/task.go b/backends/rapidpro/task.go index 76dc5ad11..f9967a355 100644 --- a/backends/rapidpro/task.go +++ b/backends/rapidpro/task.go @@ -31,73 +31,30 @@ func queueMsgHandling(rc redis.Conn, c *Contact, m *Msg) error { } func queueChannelEvent(rc redis.Conn, c *Contact, e *ChannelEvent) error { - // queue to mailroom + body := map[string]any{ + "org_id": e.OrgID_, + "contact_id": e.ContactID_, + "urn_id": e.ContactURNID_, + "channel_id": e.ChannelID_, + "extra": e.Extra(), + "new_contact": c.IsNew_, + "occurred_on": e.OccurredOn_, + "created_on": e.CreatedOn_, + } + switch e.EventType() { case courier.EventTypeStopContact: - body := map[string]any{ - "org_id": e.OrgID_, - "contact_id": e.ContactID_, - "occurred_on": e.OccurredOn_, - } return queueMailroomTask(rc, "stop_contact", e.OrgID_, e.ContactID_, body) - case courier.EventTypeWelcomeMessage: - body := map[string]any{ - "org_id": e.OrgID_, - "contact_id": e.ContactID_, - "urn_id": e.ContactURNID_, - "channel_id": e.ChannelID_, - "new_contact": c.IsNew_, - "occurred_on": e.OccurredOn_, - } return queueMailroomTask(rc, "welcome_message", e.OrgID_, e.ContactID_, body) - case courier.EventTypeReferral: - body := map[string]any{ - "org_id": e.OrgID_, - "contact_id": e.ContactID_, - "urn_id": e.ContactURNID_, - "channel_id": e.ChannelID_, - "extra": e.Extra(), - "new_contact": c.IsNew_, - "occurred_on": e.OccurredOn_, - } return queueMailroomTask(rc, "referral", e.OrgID_, e.ContactID_, body) - case courier.EventTypeNewConversation: - body := map[string]any{ - "org_id": e.OrgID_, - "contact_id": e.ContactID_, - "urn_id": e.ContactURNID_, - "channel_id": e.ChannelID_, - "extra": e.Extra(), - "new_contact": c.IsNew_, - "occurred_on": e.OccurredOn_, - } return queueMailroomTask(rc, "new_conversation", e.OrgID_, e.ContactID_, body) - case courier.EventTypeOptIn: - body := map[string]any{ - "org_id": e.OrgID_, - "contact_id": e.ContactID_, - "urn_id": e.ContactURNID_, - "channel_id": e.ChannelID_, - "extra": e.Extra(), - "occurred_on": e.OccurredOn_, - } return queueMailroomTask(rc, "optin", e.OrgID_, e.ContactID_, body) - case courier.EventTypeOptOut: - body := map[string]any{ - "org_id": e.OrgID_, - "contact_id": e.ContactID_, - "urn_id": e.ContactURNID_, - "channel_id": e.ChannelID_, - "extra": e.Extra(), - "occurred_on": e.OccurredOn_, - } return queueMailroomTask(rc, "optout", e.OrgID_, e.ContactID_, body) - default: return fmt.Errorf("unknown event type: %s", e.EventType()) }