Skip to content

Commit

Permalink
Merge pull request #667 from nyaruka/channel_event_fix
Browse files Browse the repository at this point in the history
Change channel events so that created_on is db time and is included in queued task payload
  • Loading branch information
rowanseymour authored Dec 4, 2023
2 parents 98ea509 + 27ab88f commit 1445bf8
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 66 deletions.
12 changes: 9 additions & 3 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"testing"
"time"

"github.com/buger/jsonparser"
"github.com/gomodule/redigo/redis"
"github.com/lib/pq"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/test"
"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
Expand Down Expand Up @@ -1313,7 +1315,8 @@ func (ts *BackendTestSuite) TestMailroomEvents() {
clog := courier.NewChannelLog(courier.ChannelLogTypeUnknown, channel, nil)
urn, _ := urns.NewTelURNForCountry("12065551616", channel.Country())

event := ts.b.NewChannelEvent(channel, courier.EventTypeReferral, urn, clog).WithExtra(map[string]string{"ref_id": "12345"}).
event := ts.b.NewChannelEvent(channel, courier.EventTypeReferral, urn, clog).
WithExtra(map[string]string{"ref_id": "12345"}).
WithContactName("kermit frog").
WithOccurredOn(time.Date(2020, 8, 5, 13, 30, 0, 123456789, time.UTC))
err := ts.b.WriteChannelEvent(ctx, event, clog)
Expand All @@ -1322,6 +1325,7 @@ func (ts *BackendTestSuite) TestMailroomEvents() {
contact, err := contactForURN(ctx, ts.b, channel.OrgID_, channel, urn, nil, "", clog)
ts.NoError(err)
ts.Equal(null.String("kermit frog"), contact.Name_)
ts.False(contact.IsNew_)

dbE := event.(*ChannelEvent)
dbE = readChannelEventFromDB(ts.b, dbE.ID_)
Expand Down Expand Up @@ -1449,9 +1453,11 @@ func (ts *BackendTestSuite) assertQueuedContactTask(contactID ContactID, expecte
data, err := redis.Bytes(rc.Do("LPOP", fmt.Sprintf("c:1:%d", contactID)))
ts.NoError(err)

// created_on is usually DB time so exclude it from task body comparison
data = jsonparser.Delete(data, "task", "created_on")

var body map[string]any
err = json.Unmarshal(data, &body)
ts.NoError(err)
jsonx.MustUnmarshal(data, &body)
ts.Equal(expectedType, body["type"])
ts.Equal(expectedBody, body["task"])
}
Expand Down
16 changes: 7 additions & 9 deletions backends/rapidpro/channel_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ type ChannelEvent struct {
// newChannelEvent creates a new channel event
func newChannelEvent(channel courier.Channel, eventType courier.ChannelEventType, urn urns.URN, clog *courier.ChannelLog) *ChannelEvent {
dbChannel := channel.(*Channel)
now := time.Now().In(time.UTC)

return &ChannelEvent{
ChannelUUID_: dbChannel.UUID_,
OrgID_: dbChannel.OrgID_,
ChannelID_: dbChannel.ID_,
URN_: urn,
EventType_: eventType,
OccurredOn_: now,
CreatedOn_: now,
OccurredOn_: time.Now().In(time.UTC),
LogUUIDs: []string{string(clog.UUID())},

channel: dbChannel,
Expand Down Expand Up @@ -136,11 +134,11 @@ func writeChannelEvent(ctx context.Context, b *backend, event courier.ChannelEve

const sqlInsertChannelEvent = `
INSERT INTO
channels_channelevent( org_id, channel_id, contact_id, contact_urn_id, event_type, optin_id, extra, occurred_on, created_on, log_uuids)
VALUES(:org_id, :channel_id, :contact_id, :contact_urn_id, :event_type, :optin_id, :extra, :occurred_on, :created_on, :log_uuids)
RETURNING id`
channels_channelevent( org_id, channel_id, contact_id, contact_urn_id, event_type, optin_id, extra, occurred_on, created_on, log_uuids)
VALUES(:org_id, :channel_id, :contact_id, :contact_urn_id, :event_type, :optin_id, :extra, :occurred_on, NOW(), :log_uuids)
RETURNING id, created_on`

// writeChannelEventToDB writes the passed in msg status to our db
// writeChannelEventToDB writes the passed in channel event to our db
func writeChannelEventToDB(ctx context.Context, b *backend, e *ChannelEvent, clog *courier.ChannelLog) error {
// grab the contact for this event
contact, err := contactForURN(ctx, b, e.OrgID_, e.channel, e.URN_, e.URNAuthTokens_, e.ContactName_, clog)
Expand All @@ -159,8 +157,8 @@ func writeChannelEventToDB(ctx context.Context, b *backend, e *ChannelEvent, clo
defer rows.Close()

rows.Next()
err = rows.Scan(&e.ID_)
if err != nil {

if err = rows.Scan(&e.ID_, &e.CreatedOn_); err != nil {
return err
}

Expand Down
65 changes: 11 additions & 54 deletions backends/rapidpro/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,73 +31,30 @@ func queueMsgHandling(rc redis.Conn, c *Contact, m *Msg) error {
}

func queueChannelEvent(rc redis.Conn, c *Contact, e *ChannelEvent) error {
// queue to mailroom
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"extra": e.Extra(),
"new_contact": c.IsNew_,
"occurred_on": e.OccurredOn_,
"created_on": e.CreatedOn_,
}

switch e.EventType() {
case courier.EventTypeStopContact:
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"occurred_on": e.OccurredOn_,
}
return queueMailroomTask(rc, "stop_contact", e.OrgID_, e.ContactID_, body)

case courier.EventTypeWelcomeMessage:
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"new_contact": c.IsNew_,
"occurred_on": e.OccurredOn_,
}
return queueMailroomTask(rc, "welcome_message", e.OrgID_, e.ContactID_, body)

case courier.EventTypeReferral:
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"extra": e.Extra(),
"new_contact": c.IsNew_,
"occurred_on": e.OccurredOn_,
}
return queueMailroomTask(rc, "referral", e.OrgID_, e.ContactID_, body)

case courier.EventTypeNewConversation:
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"extra": e.Extra(),
"new_contact": c.IsNew_,
"occurred_on": e.OccurredOn_,
}
return queueMailroomTask(rc, "new_conversation", e.OrgID_, e.ContactID_, body)

case courier.EventTypeOptIn:
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"extra": e.Extra(),
"occurred_on": e.OccurredOn_,
}
return queueMailroomTask(rc, "optin", e.OrgID_, e.ContactID_, body)

case courier.EventTypeOptOut:
body := map[string]any{
"org_id": e.OrgID_,
"contact_id": e.ContactID_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"extra": e.Extra(),
"occurred_on": e.OccurredOn_,
}
return queueMailroomTask(rc, "optout", e.OrgID_, e.ContactID_, body)

default:
return fmt.Errorf("unknown event type: %s", e.EventType())
}
Expand Down

0 comments on commit 1445bf8

Please sign in to comment.