Skip to content

Commit

Permalink
Clean up more.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 4, 2024
1 parent e8a0665 commit 5885afe
Showing 1 changed file with 32 additions and 31 deletions.
63 changes: 32 additions & 31 deletions sources/mongo/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func newStreamingIterator(ctx context.Context, db *mongo.Database, cfg config.Mo
opts.SetResumeAfter(decodedBytes)
}

// TODO: Think about full document delete
cs, err := db.Watch(ctx, pipeline, opts)
if err != nil {
return nil, fmt.Errorf("failed to start change stream: %w", err)
Expand Down Expand Up @@ -100,47 +99,49 @@ func (s *streaming) Next() ([]lib.RawMessage, error) {
}

fmt.Println("?changeEvent", rawChangeEvent)
collection, watching := s.collectionsToWatchMap[changeEvent.Collection()]
if !watching {
continue
}

if collection, watching := s.collectionsToWatchMap[changeEvent.Collection()]; watching {
var err error
var msg *Message
switch changeEvent.Operation() {
case "delete":
msg, err = ParseMessage(bson.M{"_id": changeEvent.ObjectID()}, "d")
case "insert":
fullDocument, err := changeEvent.FullDocument()
if err != nil {
return nil, fmt.Errorf("failed to get fullDocument from change event: %v", changeEvent)
}

msg, err = ParseMessage(fullDocument, "c")
case "update":
fullDocument, err := changeEvent.FullDocument()
if err != nil {
return nil, fmt.Errorf("failed to get fullDocument from change event: %v", changeEvent)
}

msg, err = ParseMessage(fullDocument, "u")
default:
return nil, fmt.Errorf("unsupported operation type: %s", changeEvent.Operation())
}

var msg *Message
switch changeEvent.Operation() {
case "delete":
// TODO: Think about providing the `before` row for a deleted event.
msg, err = ParseMessage(bson.M{"_id": changeEvent.ObjectID()}, "d")
case "insert":
fullDocument, err := changeEvent.FullDocument()
if err != nil {
return nil, fmt.Errorf("failed to parse message: %w", err)
return nil, fmt.Errorf("failed to get fullDocument from change event: %v", changeEvent)
}

rawMessage, err := msg.ToRawMessage(collection, s.cfg.Database)
msg, err = ParseMessage(fullDocument, "c")

Check failure on line 118 in sources/mongo/streaming.go

View workflow job for this annotation

GitHub Actions / test

this value of err is never used (SA4006)
case "update":
fullDocument, err := changeEvent.FullDocument()
if err != nil {
return nil, fmt.Errorf("failed to convert message to raw message: %w", err)
return nil, fmt.Errorf("failed to get fullDocument from change event: %v", changeEvent)
}

rawMsgs = append(rawMsgs, rawMessage)
msg, err = ParseMessage(fullDocument, "u")

Check failure on line 125 in sources/mongo/streaming.go

View workflow job for this annotation

GitHub Actions / test

this value of err is never used (SA4006)
default:
return nil, fmt.Errorf("unsupported operation type: %s", changeEvent.Operation())
}

if err != nil {
return nil, fmt.Errorf("failed to parse message: %w", err)
}

rawMessage, err := msg.ToRawMessage(collection, s.cfg.Database)
if err != nil {
return nil, fmt.Errorf("failed to convert message to raw message: %w", err)
}

rawMsgs = append(rawMsgs, rawMessage)
}

if len(rawMsgs) == 0 {
// If no messages, let's sleep for a while before checking again
time.Sleep(1 * time.Second)
// If there are no messages, let's sleep a bit before we try again
time.Sleep(2 * time.Second)
}

return rawMsgs, nil
Expand Down

0 comments on commit 5885afe

Please sign in to comment.