diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index df20cb8e7..55cee2dc3 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -105,7 +105,7 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient) if processErr != nil { - logger.Fatal("Failed to process message", slog.Any("err", processErr)) + logger.Fatal("Failed to process message", slog.Any("err", processErr), slog.String("topic", kafkaMsg.Topic)) } msg.EmitIngestionLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)