feat(metadata-jobs): improve consumer logging (datahub-project#10173)
darnaut authored Mar 31, 2024
1 parent 7f7d713 commit c3257d8
Showing 10 changed files with 92 additions and 32 deletions.
@@ -628,12 +628,20 @@ public List<UpdateAspectResult> ingestAspects(
public List<UpdateAspectResult> ingestAspects(
@Nonnull final AspectsBatch aspectsBatch, boolean emitMCL, boolean overwrite) {

+// Skip DB timer for empty batch
+if (aspectsBatch.getItems().size() == 0) {
+  return Collections.emptyList();
+}
+
+log.info("Ingesting aspects batch to database, items: {}", aspectsBatch.getItems());
Timer.Context ingestToLocalDBTimer =
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
List<UpdateAspectResult> ingestResults = ingestAspectsToLocalDB(aspectsBatch, overwrite);
-List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
-ingestToLocalDBTimer.stop();
+long took = ingestToLocalDBTimer.stop();
+log.info(
+    "Ingestion of aspects batch to database took {} ms", TimeUnit.NANOSECONDS.toMillis(took));
+
+List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
return mclResults;
}
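Note: Dropwizard Metrics' Timer.Context.stop() returns the elapsed time in nanoseconds, which is why the new code converts with TimeUnit.NANOSECONDS.toMillis(took). A minimal, self-contained sketch of the pattern (class and metric names are illustrative, not from this PR):

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.util.concurrent.TimeUnit;

class IngestTimingSketch {
  private static final MetricRegistry registry = new MetricRegistry();

  static void timed(Runnable work) {
    Timer.Context ctx = registry.timer("ingestAspectsToLocalDB").time();
    work.run();
    // stop() records the sample and returns the elapsed time in nanoseconds.
    long tookNanos = ctx.stop();
    System.out.printf("took %d ms%n", TimeUnit.NANOSECONDS.toMillis(tookNanos));
  }
}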

@@ -1505,10 +1513,7 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
AspectSpec aspectSpec) {
boolean isNoOp = oldAspect == newAspect;
if (!isNoOp || alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
-log.debug(
-    "Producing MetadataChangeLog for ingested aspect {}, urn {}",
-    aspectSpec.getName(),
-    entityUrn);
+log.info("Producing MCL for ingested aspect {}, urn {}", aspectSpec.getName(), entityUrn);

final MetadataChangeLog metadataChangeLog =
constructMCL(
@@ -1528,8 +1533,8 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
alwaysProduceMCLAsync(entityUrn, aspectSpec, metadataChangeLog);
return emissionStatus.getFirst() != null ? Optional.of(emissionStatus) : Optional.empty();
} else {
-log.debug(
-    "Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.",
+log.info(
+    "Skipped producing MCL for ingested aspect {}, urn {}. Aspect has not changed.",
aspectSpec.getName(),
entityUrn);
return Optional.empty();
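Note: the common thread in this commit is promoting consumer-path logs from debug to info. With a typical production root logger level of INFO, debug lines are dropped entirely, and SLF4J's {} placeholders defer argument formatting until the level check passes; pre-formatting with String.format (as the old appendRunId line further down does) pays that cost even when the message is filtered. A small sketch of the difference:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LogLevelSketch {
  private static final Logger log = LoggerFactory.getLogger(LogLevelSketch.class);

  void emit(String aspectName, String urn) {
    // Dropped when the root level is INFO; the arguments are never formatted.
    log.debug("Producing MCL for ingested aspect {}, urn {}", aspectName, urn);
    // Visible at INFO, which is the point of these changes.
    log.info("Producing MCL for ingested aspect {}, urn {}", aspectName, urn);
    // Anti-pattern: String.format runs eagerly even if the line is filtered.
    log.debug(String.format("Producing MCL for ingested aspect %s, urn %s", aspectName, urn));
  }
}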
@@ -1636,7 +1641,7 @@ private void ingestSnapshotUnion(
final List<Pair<String, RecordTemplate>> aspectRecordsToIngest =
NewModelUtils.getAspectsFromSnapshot(snapshotRecord);

log.info("INGEST urn {} with system metadata {}", urn, systemMetadata.toString());
log.info("Ingesting entity urn {} with system metadata {}", urn, systemMetadata.toString());

AspectsBatchImpl aspectsBatch =
AspectsBatchImpl.builder()
@@ -122,7 +122,8 @@ public void appendRunId(@Nonnull String entityName, @Nonnull Urn urn, @Nullable
return;
}
final String docId = maybeDocId.get();
log.debug(String.format("Appending run id for entityName: %s, docId: %s", entityName, docId));
log.info(
"Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId);
esWriteDAO.applyScriptUpdate(
entityName,
docId,
@@ -47,7 +47,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon

if (response.hasFailures()) {
log.error(
"Failed to feed bulk request. Number of events: "
"Failed to feed bulk request "
+ executionId
+ "."
+ " Number of events: "
+ response.getItems().length
+ " Took time ms: "
+ response.getTook().getMillis()
@@ -56,7 +59,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
+ response.buildFailureMessage());
} else {
log.info(
"Successfully fed bulk request. Number of events: "
"Successfully fed bulk request "
+ executionId
+ "."
+ " Number of events: "
+ response.getItems().length
+ " Took time ms: "
+ response.getTook().getMillis()
@@ -69,7 +75,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// Exception raised outside this method
log.error(
"Error feeding bulk request. No retries left. Request: {}",
"Error feeding bulk request {}. No retries left. Request: {}",
executionId,
buildBulkRequestSummary(request),
failure);
incrementMetrics(request, failure);
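Note: these callbacks implement Elasticsearch's BulkProcessor.Listener, which passes the same executionId to beforeBulk and the matching afterBulk, so logging it in each message lets the request and response lines be correlated. A stripped-down sketch of that contract (logging via stdout/stderr for brevity):

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

class CorrelatedListener implements BulkProcessor.Listener {
  @Override
  public void beforeBulk(long executionId, BulkRequest request) {
    System.out.printf("bulk %d: sending %d actions%n", executionId, request.numberOfActions());
  }

  @Override
  public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    System.out.printf("bulk %d: done in %d ms, failures=%b%n",
        executionId, response.getTook().getMillis(), response.hasFailures());
  }

  @Override
  public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    System.err.printf("bulk %d: failed: %s%n", executionId, failure.getMessage());
  }
}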
@@ -79,6 +79,11 @@ private ESBulkProcessor(
public ESBulkProcessor add(DocWriteRequest<?> request) {
MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc();
bulkProcessor.add(request);
+log.info(
+    "Added request id: {}, operation type: {}, index: {}",
+    request.id(),
+    request.opType(),
+    request.index());
return this;
}
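Note: id(), opType(), and index() are standard accessors on Elasticsearch's DocWriteRequest, so this log line covers index, update, and delete requests alike. A minimal sketch (helper name is illustrative):

import org.elasticsearch.action.DocWriteRequest;

class WriteLogSketch {
  // Summarizes any bulk-able write request using only DocWriteRequest accessors.
  static String describe(DocWriteRequest<?> request) {
    return String.format(
        "id=%s, opType=%s, index=%s", request.id(), request.opType(), request.index());
  }
}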

@@ -54,7 +54,15 @@ public void consume(final ConsumerRecord<String, String> consumerRecord) {
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final String record = consumerRecord.value();
log.debug("Got DHUE");

log.info(
"Got DHUE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.serializedValueSize(),
consumerRecord.timestamp());

Optional<DataHubUsageEventTransformer.TransformedDocument> eventDocument =
dataHubUsageEventTransformer.transformDataHubUsageEvent(record);
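Note: the same record-metadata log line recurs in the MCL, MCE, MCP, and PE consumers below; all six values come straight from Kafka's ConsumerRecord. A compact sketch of the accessors involved (helper name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;

class RecordLogSketch {
  // Mirrors the fields logged above: key, topic, partition, offset, size, timestamp.
  static String describe(ConsumerRecord<String, String> rec) {
    return String.format(
        "key=%s, topic=%s, partition=%d, offset=%d, valueSize=%d, timestamp=%d",
        rec.key(), rec.topic(), rec.partition(), rec.offset(),
        rec.serializedValueSize(), rec.timestamp());
  }
}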
@@ -75,11 +75,14 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();
-log.debug(
-    "Got Generic MCL on topic: {}, partition: {}, offset: {}",
+log.info(
+    "Got MCL event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+    consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
-    consumerRecord.offset());
+    consumerRecord.offset(),
+    consumerRecord.serializedValueSize(),
+    consumerRecord.timestamp());
MetricUtils.counter(this.getClass(), "received_mcl_count").inc();

MetadataChangeLog event;
@@ -96,17 +99,23 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
return;
}

-log.debug(
-    "Invoking MCL hooks for urn: {}, key: {}",
+log.info(
+    "Invoking MCL hooks for urn: {}, aspect name: {}, entity type: {}, change type: {}",
event.getEntityUrn(),
-    event.getEntityKeyAspect());
+    event.hasAspectName() ? event.getAspectName() : null,
+    event.hasEntityType() ? event.getEntityType() : null,
+    event.hasChangeType() ? event.getChangeType() : null);

// Here - plug in additional "custom processor hooks"
for (MetadataChangeLogHook hook : this.hooks) {
if (!hook.isEnabled()) {
log.debug(String.format("Skipping disabled hook %s", hook.getClass()));
log.info(String.format("Skipping disabled hook %s", hook.getClass()));
continue;
}
+log.info(
+    "Invoking MCL hook {} for urn: {}",
+    hook.getClass().getSimpleName(),
+    event.getEntityUrn());
try (Timer.Context ignored =
MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
.time()) {
@@ -121,10 +130,7 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
}
// TODO: Manually commit kafka offsets after full processing.
MetricUtils.counter(this.getClass(), "consumed_mcl_count").inc();
-log.debug(
-    "Successfully completed MCL hooks for urn: {}, key: {}",
-    event.getEntityUrn(),
-    event.getEntityKeyAspect());
+log.info("Successfully completed MCL hooks for urn: {}", event.getEntityUrn());
}
}
}
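Note: the hook loop above follows a pattern worth naming: each enabled hook runs under its own Dropwizard timer, and a failure in one hook does not stop the rest. A self-contained sketch of that pattern; the Hook interface here is a hypothetical stand-in for MetadataChangeLogHook, whose catch block is elided from the diff:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.util.List;

class HookDispatcher<E> {
  interface Hook<E> {
    boolean isEnabled();
    void invoke(E event) throws Exception;
  }

  private static final MetricRegistry registry = new MetricRegistry();

  void dispatch(List<Hook<E>> hooks, E event) {
    for (Hook<E> hook : hooks) {
      if (!hook.isEnabled()) {
        continue; // mirrors the "Skipping disabled hook" branch
      }
      // Timer.Context implements Closeable, so try-with-resources stops the timer.
      try (Timer.Context ignored =
          registry.timer(hook.getClass().getSimpleName() + "_latency").time()) {
        hook.invoke(event);
      } catch (Exception e) {
        // Log and continue so later hooks still run.
        System.err.println("Hook " + hook.getClass().getSimpleName() + " failed: " + e);
      }
    }
  }
}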
@@ -72,6 +72,16 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();

+log.info(
+    "Got MCE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+    consumerRecord.key(),
+    consumerRecord.topic(),
+    consumerRecord.partition(),
+    consumerRecord.offset(),
+    consumerRecord.serializedValueSize(),
+    consumerRecord.timestamp());

log.debug("Record {}", record);

MetadataChangeEvent event = new MetadataChangeEvent();
@@ -62,14 +62,25 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();

+log.info(
+    "Got MCP event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+    consumerRecord.key(),
+    consumerRecord.topic(),
+    consumerRecord.partition(),
+    consumerRecord.offset(),
+    consumerRecord.serializedValueSize(),
+    consumerRecord.timestamp());

log.debug("Record {}", record);

MetadataChangeProposal event = new MetadataChangeProposal();
try {
event = EventUtils.avroToPegasusMCP(record);
log.debug("MetadataChangeProposal {}", event);
// TODO: Get this from the event itself.
-entityClient.ingestProposal(event, false);
+String urn = entityClient.ingestProposal(event, false);
+log.info("Successfully processed MCP event urn: {}", urn);
} catch (Throwable throwable) {
log.error("MCP Processor Error", throwable);
log.error("Message: {}", record);
@@ -50,11 +50,14 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)

kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();
-log.debug(
-    "Got Generic PE on topic: {}, partition: {}, offset: {}",
+log.info(
+    "Got PE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+    consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
-    consumerRecord.offset());
+    consumerRecord.offset(),
+    consumerRecord.serializedValueSize(),
+    consumerRecord.timestamp());
MetricUtils.counter(this.getClass(), "received_pe_count").inc();

PlatformEvent event;
@@ -68,9 +71,13 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
return;
}

log.debug("Invoking PE hooks for event name {}", event.getName());
log.info("Invoking PE hooks for event name {}", event.getName());

for (PlatformEventHook hook : this.hooks) {
+log.info(
+    "Invoking PE hook {} for event name {}",
+    hook.getClass().getSimpleName(),
+    event.getName());
try (Timer.Context ignored =
MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
.time()) {
@@ -83,7 +90,7 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
}
}
MetricUtils.counter(this.getClass(), "consumed_pe_count").inc();
log.debug("Successfully completed PE hooks for event with name {}", event.getName());
log.info("Successfully completed PE hooks for event with name {}", event.getName());
}
}
}
@@ -123,7 +123,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException {
.request(req)
.build())));
aspectResource.ingestProposal(mcp, "false");
-verify(producer, times(10))
+verify(producer, times(5))
.produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class));
verifyNoMoreInteractions(producer);
}