diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index 2b3a810e1..60302b714 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -104,11 +104,12 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab } tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient) - msg.EmitIngestionLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName) - msg.EmitRowLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName) if processErr != nil { - slog.With(artie.KafkaMsgLogFields(kafkaMsg)...).Warn("Skipping message...", slog.Any("err", processErr)) + logger.Fatal("Failed to process message", slog.Any("err", processErr)) } + + msg.EmitIngestionLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName) + msg.EmitRowLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName) } }(topic) } diff --git a/processes/consumer/pubsub.go b/processes/consumer/pubsub.go index 2b11d111f..0720ea31b 100644 --- a/processes/consumer/pubsub.go +++ b/processes/consumer/pubsub.go @@ -87,13 +87,6 @@ func StartSubscriber(ctx context.Context, cfg config.Config, inMemDB *models.Dat for { err = sub.Receive(ctx, func(_ context.Context, pubsubMsg *gcp_pubsub.Message) { msg := artie.NewMessage(nil, pubsubMsg, topic) - logFields := []any{ - slog.String("topic", msg.Topic()), - slog.String("msgID", msg.PubSub.ID), - slog.String("key", string(msg.Key())), - slog.String("value", string(msg.Value())), - } - args := processArgs{ Msg: msg, GroupID: subName, @@ -103,7 +96,7 @@ func StartSubscriber(ctx context.Context, cfg config.Config, inMemDB *models.Dat tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient) msg.EmitIngestionLag(metricsClient, cfg.Mode, subName, tableName) if processErr != nil { - slog.With(logFields...).Warn("Skipping message...", slog.Any("err", processErr)) + logger.Fatal("Failed to process message", slog.Any("err", processErr)) } })