Skip to content

Commit

Permalink
Panic if failed to process message (#822)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 29, 2024
1 parent 9fc1e8a commit cad919a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 11 deletions.
7 changes: 4 additions & 3 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab
}

tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient)
msg.EmitIngestionLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)
msg.EmitRowLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)
if processErr != nil {
slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Skipping message...", slog.Any("err", processErr))
logger.Fatal("Failed to process message", slog.Any("err", processErr))
}

msg.EmitIngestionLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)
msg.EmitRowLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)
}
}(topic)
}
Expand Down
9 changes: 1 addition & 8 deletions processes/consumer/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ func StartSubscriber(ctx context.Context, cfg config.Config, inMemDB *models.Dat
for {
err = sub.Receive(ctx, func(_ context.Context, pubsubMsg *gcp_pubsub.Message) {
msg := artie.NewMessage(nil, pubsubMsg, topic)
logFields := []any{
slog.String("topic", msg.Topic()),
slog.String("msgID", msg.PubSub.ID),
slog.String("key", string(msg.Key())),
slog.String("value", string(msg.Value())),
}

args := processArgs{
Msg: msg,
GroupID: subName,
Expand All @@ -103,7 +96,7 @@ func StartSubscriber(ctx context.Context, cfg config.Config, inMemDB *models.Dat
tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient)
msg.EmitIngestionLag(metricsClient, cfg.Mode, subName, tableName)
if processErr != nil {
slog.With(logFields...).Warn("Skipping message...", slog.Any("err", processErr))
logger.Fatal("Failed to process message", slog.Any("err", processErr))
}
})

Expand Down

0 comments on commit cad919a

Please sign in to comment.