From c3257d81c80af69b0b9772f30436bf32113d92f2 Mon Sep 17 00:00:00 2001
From: Davi Arnaut
Date: Sun, 31 Mar 2024 08:07:58 -0700
Subject: [PATCH] feat(metadata-jobs): improve consumer logging (#10173)

---
 .../metadata/entity/EntityServiceImpl.java    | 23 +++++++++------
 .../elasticsearch/ElasticSearchService.java   |  3 +-
 .../elasticsearch/update/BulkListener.java    | 13 +++++++--
 .../elasticsearch/update/ESBulkProcessor.java |  5 ++++
 .../kafka/DataHubUsageEventsProcessor.java    | 10 ++++++-
 .../kafka/MetadataChangeLogProcessor.java     | 28 +++++++++++--------
 .../kafka/MetadataChangeEventsProcessor.java  | 10 +++++++
 .../MetadataChangeProposalsProcessor.java     | 13 ++++++++-
 .../datahub/event/PlatformEventProcessor.java | 17 +++++++----
 .../resources/entity/AspectResourceTest.java  |  2 +-
 10 files changed, 92 insertions(+), 32 deletions(-)

diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index 61d48c72f4341d..7f11170d12e726 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -628,12 +628,20 @@ public List<UpdateAspectResult> ingestAspects(
   public List<UpdateAspectResult> ingestAspects(
       @Nonnull final AspectsBatch aspectsBatch, boolean emitMCL, boolean overwrite) {
 
+    // Skip DB timer for empty batch
+    if (aspectsBatch.getItems().size() == 0) {
+      return Collections.emptyList();
+    }
+
+    log.info("Ingesting aspects batch to database, items: {}", aspectsBatch.getItems());
     Timer.Context ingestToLocalDBTimer =
         MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
     List<UpdateAspectResult> ingestResults = ingestAspectsToLocalDB(aspectsBatch, overwrite);
-    List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
-    ingestToLocalDBTimer.stop();
+    long took = ingestToLocalDBTimer.stop();
+    log.info(
+        "Ingestion of aspects batch to database took {} ms", TimeUnit.NANOSECONDS.toMillis(took));
 
+    List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
     return mclResults;
   }
 
@@ -1505,10 +1513,7 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
       AspectSpec aspectSpec) {
     boolean isNoOp = oldAspect == newAspect;
     if (!isNoOp || alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
-      log.debug(
-          "Producing MetadataChangeLog for ingested aspect {}, urn {}",
-          aspectSpec.getName(),
-          entityUrn);
+      log.info("Producing MCL for ingested aspect {}, urn {}", aspectSpec.getName(), entityUrn);
 
       final MetadataChangeLog metadataChangeLog =
           constructMCL(
@@ -1528,8 +1533,8 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
           alwaysProduceMCLAsync(entityUrn, aspectSpec, metadataChangeLog);
       return emissionStatus.getFirst() != null ? Optional.of(emissionStatus) : Optional.empty();
     } else {
-      log.debug(
-          "Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.",
+      log.info(
+          "Skipped producing MCL for ingested aspect {}, urn {}. Aspect has not changed.",
           aspectSpec.getName(),
           entityUrn);
       return Optional.empty();
@@ -1636,7 +1641,7 @@ private void ingestSnapshotUnion(
     final List<Pair<String, RecordTemplate>> aspectRecordsToIngest =
         NewModelUtils.getAspectsFromSnapshot(snapshotRecord);
 
-    log.info("INGEST urn {} with system metadata {}", urn, systemMetadata.toString());
+    log.info("Ingesting entity urn {} with system metadata {}", urn, systemMetadata.toString());
 
     AspectsBatchImpl aspectsBatch =
         AspectsBatchImpl.builder()
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java
index 0effed1d9a578c..017daab925911f 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java
@@ -122,7 +122,8 @@ public void appendRunId(@Nonnull String entityName, @Nonnull Urn urn, @Nullable
       return;
     }
     final String docId = maybeDocId.get();
-    log.debug(String.format("Appending run id for entityName: %s, docId: %s", entityName, docId));
+    log.info(
+        "Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId);
     esWriteDAO.applyScriptUpdate(
         entityName,
         docId,
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java
index 274829df53ba8c..485b95192389ee 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java
@@ -47,7 +47,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
     if (response.hasFailures()) {
       log.error(
-          "Failed to feed bulk request. Number of events: "
+          "Failed to feed bulk request "
+              + executionId
+              + "."
+              + " Number of events: "
               + response.getItems().length
               + " Took time ms: "
               + response.getTook().getMillis()
               + " Message: "
@@ -54,7 +57,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
               + response.buildFailureMessage());
     } else {
       log.info(
-          "Successfully fed bulk request. Number of events: "
+          "Successfully fed bulk request "
+              + executionId
+              + "."
+              + " Number of events: "
               + response.getItems().length
               + " Took time ms: "
               + response.getTook().getMillis()
@@ -69,7 +75,8 @@
   public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
     // Exception raised outside this method
     log.error(
-        "Error feeding bulk request. No retries left. Request: {}",
+        "Error feeding bulk request {}. No retries left. Request: {}",
Request: {}", + executionId, buildBulkRequestSummary(request), failure); incrementMetrics(request, failure); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java index a2b9292eac6e4a..fc29aca4117845 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java @@ -79,6 +79,11 @@ private ESBulkProcessor( public ESBulkProcessor add(DocWriteRequest request) { MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc(); bulkProcessor.add(request); + log.info( + "Added request id: {}, operation type: {}, index: {}", + request.id(), + request.opType(), + request.index()); return this; } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java index a5fd44b0a4c602..ce7376f1f8d662 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java @@ -54,7 +54,15 @@ public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); final String record = consumerRecord.value(); - log.debug("Got DHUE"); + + log.info( + "Got DHUE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", + consumerRecord.key(), + consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.serializedValueSize(), + consumerRecord.timestamp()); Optional eventDocument = dataHubUsageEventTransformer.transformDataHubUsageEvent(record); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java index a416e6f2e79086..2c3e1da1fa4d04 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java @@ -75,11 +75,14 @@ public void consume(final ConsumerRecord consumerRecord) try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); final GenericRecord record = consumerRecord.value(); - log.debug( - "Got Generic MCL on topic: {}, partition: {}, offset: {}", + log.info( + "Got MCL event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", + consumerRecord.key(), consumerRecord.topic(), consumerRecord.partition(), - consumerRecord.offset()); + consumerRecord.offset(), + consumerRecord.serializedValueSize(), + consumerRecord.timestamp()); MetricUtils.counter(this.getClass(), "received_mcl_count").inc(); MetadataChangeLog event; @@ -96,17 +99,23 @@ public void consume(final ConsumerRecord consumerRecord) return; } - log.debug( - "Invoking MCL hooks for urn: {}, key: {}", + log.info( + "Invoking MCL hooks for urn: {}, aspect name: {}, entity type: {}, change type: {}", 
           event.getEntityUrn(),
-          event.getEntityKeyAspect());
+          event.hasAspectName() ? event.getAspectName() : null,
+          event.hasEntityType() ? event.getEntityType() : null,
+          event.hasChangeType() ? event.getChangeType() : null);
 
       // Here - plug in additional "custom processor hooks"
       for (MetadataChangeLogHook hook : this.hooks) {
         if (!hook.isEnabled()) {
-          log.debug(String.format("Skipping disabled hook %s", hook.getClass()));
+          log.info(String.format("Skipping disabled hook %s", hook.getClass()));
           continue;
         }
+        log.info(
+            "Invoking MCL hook {} for urn: {}",
+            hook.getClass().getSimpleName(),
+            event.getEntityUrn());
         try (Timer.Context ignored =
             MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
                 .time()) {
@@ -121,10 +130,7 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
       }
       // TODO: Manually commit kafka offsets after full processing.
       MetricUtils.counter(this.getClass(), "consumed_mcl_count").inc();
-      log.debug(
-          "Successfully completed MCL hooks for urn: {}, key: {}",
-          event.getEntityUrn(),
-          event.getEntityKeyAspect());
+      log.info("Successfully completed MCL hooks for urn: {}", event.getEntityUrn());
     }
   }
 }
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
index 7bb0f93756d7ab..bd79313adf036c 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
@@ -72,6 +72,16 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
+
+      log.info(
+          "Got MCE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
+          consumerRecord.topic(),
+          consumerRecord.partition(),
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
+
       log.debug("Record {}", record);
 
       MetadataChangeEvent event = new MetadataChangeEvent();
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
index a4f5a287bc8fd0..ddf1fb0a726f12 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
@@ -62,6 +62,16 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
     try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
+
+      log.info(
+          "Got MCP event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
+          consumerRecord.topic(),
+          consumerRecord.partition(),
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
+
       log.debug("Record {}", record);
 
       MetadataChangeProposal event = new MetadataChangeProposal();
@@ -69,7 +79,8 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         event = EventUtils.avroToPegasusMCP(record);
         log.debug("MetadataChangeProposal {}", event);
         // TODO: Get this from the event itself.
-        entityClient.ingestProposal(event, false);
+        String urn = entityClient.ingestProposal(event, false);
+        log.info("Successfully processed MCP event urn: {}", urn);
       } catch (Throwable throwable) {
         log.error("MCP Processor Error", throwable);
         log.error("Message: {}", record);
diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java
index 955d5c67c09a78..46793aaaaf4a55 100644
--- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java
+++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java
@@ -50,11 +50,14 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
 
-      log.debug(
-          "Got Generic PE on topic: {}, partition: {}, offset: {}",
+      log.info(
+          "Got PE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
           consumerRecord.topic(),
           consumerRecord.partition(),
-          consumerRecord.offset());
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
 
       MetricUtils.counter(this.getClass(), "received_pe_count").inc();
       PlatformEvent event;
@@ -68,9 +71,13 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         return;
       }
 
-      log.debug("Invoking PE hooks for event name {}", event.getName());
+      log.info("Invoking PE hooks for event name {}", event.getName());
 
       for (PlatformEventHook hook : this.hooks) {
+        log.info(
+            "Invoking PE hook {} for event name {}",
+            hook.getClass().getSimpleName(),
+            event.getName());
         try (Timer.Context ignored =
             MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
                 .time()) {
@@ -83,7 +90,7 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         }
       }
       MetricUtils.counter(this.getClass(), "consumed_pe_count").inc();
-      log.debug("Successfully completed PE hooks for event with name {}", event.getName());
+      log.info("Successfully completed PE hooks for event with name {}", event.getName());
     }
   }
 }
diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
index 62edb9fdfa6281..a940242067edec 100644
--- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
+++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
@@ -123,7 +123,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException {
                             .request(req)
                             .build())));
     aspectResource.ingestProposal(mcp, "false");
-    verify(producer, times(10))
+    verify(producer, times(5))
        .produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class));
     verifyNoMoreInteractions(producer);
   }
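
Note on the EntityServiceImpl timing change above: Dropwizard's Timer.Context.stop() returns the elapsed time in nanoseconds, which is what lets the patch both record the metric and log the batch duration in milliseconds via TimeUnit.NANOSECONDS.toMillis(...). A minimal standalone sketch of that pattern follows; it uses a plain MetricRegistry rather than DataHub's MetricUtils wrapper, and the timer name and sleep are illustrative only:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.util.concurrent.TimeUnit;

public class TimerLoggingSketch {
  public static void main(String[] args) throws InterruptedException {
    MetricRegistry registry = new MetricRegistry();
    Timer timer = registry.timer("ingestAspectsToLocalDB");

    // time() opens a timing context; stop() records the sample in the
    // timer's histogram and returns the elapsed duration in nanoseconds.
    Timer.Context ctx = timer.time();
    Thread.sleep(25); // stand-in for the batched database write
    long tookNanos = ctx.stop();

    // Same conversion the patch uses for its "took {} ms" log line.
    System.out.printf(
        "Ingestion of aspects batch to database took %d ms%n",
        TimeUnit.NANOSECONDS.toMillis(tookNanos));
  }
}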