Skip to content

Commit

Permalink
Add update info to MDC context (#2259)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Oct 8, 2024
1 parent 089bbea commit d1dc2e1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ private LoggerTag() {}
public static final String SIDE_EFFECT_ID = "SideEffectId";
public static final String CHILD_WORKFLOW_ID = "ChildWorkflowId";
public static final String ATTEMPT = "Attempt";
public static final String UPDATE_ID = "UpdateId";
public static final String UPDATE_NAME = "UpdateName";
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.replay.ReplayWorkflow;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.internal.replay.WorkflowContext;
Expand All @@ -47,6 +48,7 @@
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
* SyncWorkflow supports workflows that use synchronous blocking code. An instance is created per
Expand Down Expand Up @@ -166,6 +168,8 @@ public void handleUpdate(
() -> {
try {
workflowContext.setCurrentUpdateInfo(updateInfo);
MDC.put(LoggerTag.UPDATE_ID, updateInfo.getUpdateId());
MDC.put(LoggerTag.UPDATE_NAME, updateInfo.getUpdateName());
// Skip validator on replay
if (!callbacks.isReplaying()) {
try {
Expand Down
32 changes: 25 additions & 7 deletions temporal-sdk/src/test/java/io/temporal/workflow/LoggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -71,13 +73,21 @@ public void tearDown() throws Exception {

@WorkflowInterface
public interface TestWorkflow {
@UpdateMethod
void update(String id);

@WorkflowMethod
void execute(String id);
}

public static class TestLoggingInWorkflow implements LoggerTest.TestWorkflow {
private final Logger workflowLogger = Workflow.getLogger(TestLoggingInWorkflow.class);

@Override
public void update(String id) {
workflowLogger.info("Updating workflow {}.", id);
}

@Override
public void execute(String id) {
workflowLogger.info("Start executing workflow {}.", id);
Expand Down Expand Up @@ -107,7 +117,7 @@ public void executeChild(String id) {
}

@Test
public void testWorkflowLogger() {
public void testWorkflowLogger() throws ExecutionException, InterruptedException {
Worker worker = env.newWorker(taskQueue);
worker.registerWorkflowImplementationTypes(
TestLoggingInWorkflow.class, TestLoggerInChildWorkflow.class);
Expand All @@ -122,14 +132,18 @@ public void testWorkflowLogger() {
LoggerTest.TestWorkflow workflow =
workflowClient.newWorkflowStub(LoggerTest.TestWorkflow.class, options);
String wfId = UUID.randomUUID().toString();
workflow.execute(wfId);

assertEquals(1, matchingLines(String.format("Start executing workflow %s.", wfId)));
assertEquals(1, matchingLines(String.format("Executing child workflow %s.", wfId)));
assertEquals(1, matchingLines(String.format("Done executing workflow %s.", wfId)));
CompletableFuture<Void> result = WorkflowClient.execute(workflow::execute, wfId);
workflow.update(wfId);
result.get();

assertEquals(1, matchingLines(String.format("Start executing workflow %s.", wfId), false));
assertEquals(1, matchingLines(String.format("Executing child workflow %s.", wfId), false));
assertEquals(1, matchingLines(String.format("Done executing workflow %s.", wfId), false));
// Assert the update log is present
assertEquals(1, matchingLines(String.format("Updating workflow %s.", wfId), true));
}

private int matchingLines(String message) {
private int matchingLines(String message, boolean isUpdateMethod) {
int i = 0;
// Make copy to avoid ConcurrentModificationException
List<ILoggingEvent> list = new ArrayList<>(listAppender.list);
Expand All @@ -139,6 +153,10 @@ private int matchingLines(String message) {
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.WORKFLOW_TYPE));
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.RUN_ID));
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.TASK_QUEUE));
if (isUpdateMethod) {
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.UPDATE_ID));
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.UPDATE_NAME));
}
i++;
}
}
Expand Down

0 comments on commit d1dc2e1

Please sign in to comment.