Skip to content

Commit

Permalink
Chekcpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 1, 2024
1 parent 8b035cb commit a750c0a
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 46 deletions.
101 changes: 101 additions & 0 deletions lib/mongo/change_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package mongo

import (
"fmt"
"go.mongodb.org/mongo-driver/bson"
)

type ChangeEvent struct {
operationType string
documentKey bson.M
collection string
objectID any

fullDocument *bson.M
}

func NewChangeEvent(rawChangeEvent bson.M) (*ChangeEvent, error) {
operationType, isOk := rawChangeEvent["operationType"]
if !isOk {
return nil, fmt.Errorf("failed to get operationType from change event: %v", rawChangeEvent)
}

castedOperationType, isOk := operationType.(string)
if !isOk {
return nil, fmt.Errorf("expected operationType to be string, got: %T", operationType)
}

documentKey, isOk := rawChangeEvent["documentKey"]
if !isOk {
return nil, fmt.Errorf("failed to get documentKey from change event: %v", rawChangeEvent)
}

castedDocumentKey, isOk := documentKey.(bson.M)
if !isOk {
return nil, fmt.Errorf("expected documentKey to be bson.M, got: %T", documentKey)
}

ns, isOk := rawChangeEvent["ns"]
if !isOk {
return nil, fmt.Errorf("failed to get namespace from change event: %v", rawChangeEvent)
}

nsBsonM, isOk := ns.(bson.M)
if !isOk {
return nil, fmt.Errorf("expected ns to be bson.M, got: %T", ns)
}

coll, isOk := nsBsonM["coll"]
if !isOk {
return nil, fmt.Errorf("failed to get collection from change event: %v", rawChangeEvent)
}

collString, isOk := coll.(string)
if !isOk {
return nil, fmt.Errorf("expected collection to be string, got: %T", coll)
}

objectID, isOk := castedDocumentKey["_id"]
if !isOk {
return nil, fmt.Errorf("failed to get _id from documentKey: %v", castedDocumentKey)
}

changeEvent := &ChangeEvent{
operationType: castedOperationType,
documentKey: castedDocumentKey,
collection: collString,
objectID: objectID,
}

fullDoc, isOk := rawChangeEvent["fullDocument"]
if isOk {
castedFullDocument, isOk := fullDoc.(bson.M)
if !isOk {
return nil, fmt.Errorf("expected fullDocument to be bson.M, got: %T", fullDoc)
}

changeEvent.fullDocument = &castedFullDocument
}

return changeEvent, nil
}

func (c ChangeEvent) ObjectID() any {
return c.objectID
}

func (c ChangeEvent) Operation() string {
return c.operationType
}

func (c ChangeEvent) Collection() string {
return c.collection
}

func (c ChangeEvent) FullDocument() (bson.M, error) {
if c.fullDocument == nil {
return nil, fmt.Errorf("fullDocument is not present")
}

return *c.fullDocument, nil
}
72 changes: 26 additions & 46 deletions sources/mongo/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/constants"
"github.com/artie-labs/reader/lib"
mongolib "github.com/artie-labs/reader/lib/mongo"
"github.com/artie-labs/reader/lib/persistedmap"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand Down Expand Up @@ -41,7 +42,10 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo
}}},
}

opts := options.ChangeStream()
// Setting `updateLookup` will emit the whole document for updates
// Ref: https://www.mongodb.com/docs/manual/reference/change-events/update/#description
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

storage := persistedmap.NewPersistedMap(filePath)
if resumeToken, exists := storage.Get(offsetKey); exists {
castedResumeToken, isOk := resumeToken.(bson.Raw)
Expand Down Expand Up @@ -78,64 +82,40 @@ func (s *streaming) HasNext() bool {
func (s *streaming) Next() ([]lib.RawMessage, error) {
var rawMsgs []lib.RawMessage
if s.batchSize > int32(len(rawMsgs)) && s.changeStream.Next(s.ctx) {
var changeEvent bson.M
if err := s.changeStream.Decode(&changeEvent); err != nil {
var rawChangeEvent bson.M
if err := s.changeStream.Decode(&rawChangeEvent); err != nil {
return nil, fmt.Errorf("failed to decode change event: %v", err)
}

s.offsets.Set(offsetKey, s.changeStream.ResumeToken())
ns, isOk := changeEvent["ns"]
if !isOk {
return nil, fmt.Errorf("failed to get namespace from change event: %v", changeEvent)
}

nsBsonM, isOk := ns.(bson.M)
if !isOk {
return nil, fmt.Errorf("expected ns to be bson.M, got: %T", ns)
}

coll, isOk := nsBsonM["coll"]
if !isOk {
return nil, fmt.Errorf("failed to get collection from change event: %v", changeEvent)
}

collString, isOk := coll.(string)
if !isOk {
return nil, fmt.Errorf("expected collection to be string, got: %T", coll)
changeEvent, err := mongolib.NewChangeEvent(rawChangeEvent)
if err != nil {
return nil, fmt.Errorf("failed to parse change event: %w", err)
}

if collection, watching := s.collectionsToWatchMap[collString]; watching {
documentKey, isOk := changeEvent["documentKey"]
if !isOk {
return nil, fmt.Errorf("failed to get documentKey from change event: %v", changeEvent)
}

documentKeyBsonM, isOk := documentKey.(bson.M)
if !isOk {
return nil, fmt.Errorf("expected documentKey to be bson.M, got: %T", documentKey)
}

operationType, isOk := changeEvent["operationType"]
if !isOk {
return nil, fmt.Errorf("failed to get operationType from change event: %v", changeEvent)
}

operationTypeString, isOk := operationType.(string)
if !isOk {
return nil, fmt.Errorf("expected operationType to be string, got: %T", operationType)
}

if collection, watching := s.collectionsToWatchMap[changeEvent.Collection()]; watching {
var err error
var msg *Message
switch operationTypeString {
switch changeEvent.Operation() {
case "delete":
msg, err = ParseMessage(bson.M{"_id": documentKeyBsonM}, "d")
msg, err = ParseMessage(bson.M{"_id": changeEvent.ObjectID()}, "d")
case "insert":
msg, err = ParseMessage(changeEvent["fullDocument"].(bson.M), "c")
fullDocument, err := changeEvent.FullDocument()
if err != nil {
return nil, fmt.Errorf("failed to get fullDocument from change event: %v", changeEvent)
}

msg, err = ParseMessage(fullDocument, "c")

Check failure on line 109 in sources/mongo/streaming.go

View workflow job for this annotation

GitHub Actions / test

this value of err is never used (SA4006)
case "update":
msg, err = ParseMessage(changeEvent["fullDocument"].(bson.M), "u")
fullDocument, err := changeEvent.FullDocument()
if err != nil {
return nil, fmt.Errorf("failed to get fullDocument from change event: %v", changeEvent)
}

msg, err = ParseMessage(fullDocument, "u")

Check failure on line 116 in sources/mongo/streaming.go

View workflow job for this annotation

GitHub Actions / test

this value of err is never used (SA4006)
default:
return nil, fmt.Errorf("unsupported operation type: %s", operationTypeString)
return nil, fmt.Errorf("unsupported operation type: %s", changeEvent.Operation())
}

if err != nil {
Expand Down

0 comments on commit a750c0a

Please sign in to comment.