Skip to content

Commit

Permalink
new feature: auto-extend job timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanlukas committed Mar 5, 2024
1 parent 0990b56 commit af81e58
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public CommandExceptionHandlingStrategy commandExceptionHandlingStrategy(
public JobWorkerManager jobWorkerManager(
final CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
final JsonMapper jsonMapper,
final MetricsRecorder metricsRecorder, final ZeebeClientExecutorService zeebeClientExecutorService) {
return new JobWorkerManager(commandExceptionHandlingStrategy, jsonMapper, metricsRecorder,zeebeClientExecutorService);
final MetricsRecorder metricsRecorder,
final ZeebeClientExecutorService zeebeClientExecutorService) {
return new JobWorkerManager(
commandExceptionHandlingStrategy, jsonMapper, metricsRecorder, zeebeClientExecutorService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void customize(ZeebeWorkerValue zeebeWorker) {
applyDefaultWorkerName(zeebeWorker);
applyDefaultJobWorkerType(zeebeWorker);
applyFetchVariables(zeebeWorker);
applyAutoExtendTimeout(zeebeWorker);
applyOverrides(zeebeWorker);
}

Expand Down Expand Up @@ -102,9 +103,6 @@ private List<String> readVariablesAsTypeParameters(MethodInfo methodInfo) {
}

private void applyOverrides(ZeebeWorkerValue zeebeWorker) {
final Map<String, ZeebeWorkerValue> workerConfigurationMap =
zeebeClientConfigurationProperties.getWorker().getOverride();
setAutoExtendTimeout(zeebeWorker);
final Map<String, ZeebeWorkerValue> workerConfigurationMap =
zeebeClientConfigurationProperties.getWorker().getOverride();
final String workerType = zeebeWorker.getType();
Expand Down Expand Up @@ -159,7 +157,7 @@ private void applyDefaultJobWorkerType(ZeebeWorkerValue zeebeWorker) {
}
}

private void setAutoExtendTimeout(ZeebeWorkerValue workerValue) {
private void applyAutoExtendTimeout(ZeebeWorkerValue workerValue) {
if (!workerValue.isAutoExtendTimeout()) {
workerValue.setAutoExtendTimeout(
zeebeClientConfigurationProperties.getJob().isAutoExtendTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,6 @@ public int hashCode() {

public static class Job {

@Override
public String toString() {
return "Job{" + "timeout=" + timeout + ", pollInterval=" + pollInterval + '}';
}

private Duration timeout = DEFAULT.getDefaultJobTimeout();
private Duration pollInterval = DEFAULT.getDefaultJobPollInterval();
private boolean autoExtendTimeout;
Expand Down Expand Up @@ -592,17 +587,34 @@ public void setPollInterval(Duration pollInterval) {
this.pollInterval = pollInterval;
}

@Override
public String toString() {
return "Job{"
+ "timeout="
+ timeout
+ ", pollInterval="
+ pollInterval
+ ", autoExtendTimeout="
+ autoExtendTimeout
+ ", extendTimeoutPeriod="
+ extendTimeoutPeriod
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Job job = (Job) o;
return Objects.equals(timeout, job.timeout) && Objects.equals(pollInterval, job.pollInterval);
return autoExtendTimeout == job.autoExtendTimeout
&& Objects.equals(timeout, job.timeout)
&& Objects.equals(pollInterval, job.pollInterval)
&& Objects.equals(extendTimeoutPeriod, job.extendTimeoutPeriod);
}

@Override
public int hashCode() {
return Objects.hash(timeout, pollInterval);
return Objects.hash(timeout, pollInterval, autoExtendTimeout, extendTimeoutPeriod);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.camunda.zeebe.spring.client.annotation.value.ZeebeWorkerValue;
import io.camunda.zeebe.spring.client.bean.ClassInfo;
import io.camunda.zeebe.spring.client.bean.MethodInfo;
import java.time.Duration;
import java.util.Arrays;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -175,6 +176,23 @@ void shouldApplyOverrides() {
assertThat(zeebeWorkerValue.getEnabled()).isFalse();
}

@Test
void shouldApplyAutoExtendTimeout() {
// given
ZeebeClientConfigurationProperties properties = properties();
properties.getJob().setAutoExtendTimeout(true);
properties.getJob().setExtendTimeoutPeriod(Duration.ofSeconds(25));
PropertyBasedZeebeWorkerValueCustomizer customizer =
new PropertyBasedZeebeWorkerValueCustomizer(properties);
ZeebeWorkerValue zeebeWorkerValue = new ZeebeWorkerValue();
zeebeWorkerValue.setMethodInfo(methodInfo(this, "testBean", "sampleWorker"));
// when
customizer.customize(zeebeWorkerValue);
// then
assertThat(zeebeWorkerValue.getExtendTimeoutPeriod()).isEqualTo(Duration.ofSeconds(25));
assertThat(zeebeWorkerValue.isAutoExtendTimeout()).isEqualTo(true);
}

private static class ComplexProcessVariable {
private String var3;
private String var4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
@Documented
public @interface JobWorker {

String type() default
""; // set to empty string which leads to method name being used (if not
// ${zeebe.client.worker.default-type}" is configured) Implemented in
// ZeebeWorkerAnnotationProcessor
String type() default ""; // set to empty string which leads to method name being used (if not

// ${zeebe.client.worker.default-type}" is configured) Implemented in
// ZeebeWorkerAnnotationProcessor

String name() default
""; // set to empty string which leads to default from ZeebeClientBuilderImpl being used in
// ZeebeWorkerAnnotationProcessor

// ZeebeWorkerAnnotationProcessor

/**
* Set the time (in milliseconds) for how long a job is exclusively assigned for this worker. In
Expand Down Expand Up @@ -82,7 +83,7 @@ String name() default

boolean autoExtendTimeout() default false;

long extendTimeoutPeriodMillis() default -1;
long extendTimeoutPeriodSeconds() default -1;

String[] tenantIds() default {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ String name() default
boolean enabled() default true;

String[] tenantIds() default {};
boolean autoExtendTimeout() default false;

long extendTimeoutPeriodMillis() default -1;
boolean autoExtendTimeout() default false;

long extendTimeoutPeriodSeconds() default -1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public Optional<ZeebeWorkerValue> readJobWorkerAnnotationForMethod(final MethodI
annotation.enabled(),
methodInfo,
Arrays.asList(annotation.tenantIds()),
annotation.fetchAllVariables()));
annotation.fetchAllVariables(),
annotation.autoExtendTimeout(),
Duration.ofSeconds(annotation.extendTimeoutPeriodSeconds())));
} else {
Optional<ZeebeWorker> legacyAnnotation = methodInfo.getAnnotation(ZeebeWorker.class);
if (legacyAnnotation.isPresent()) {
Expand All @@ -104,7 +106,9 @@ public Optional<ZeebeWorkerValue> readJobWorkerAnnotationForMethod(final MethodI
annotation.enabled(),
methodInfo,
Arrays.asList(annotation.tenantIds()),
annotation.forceFetchAllVariables()));
annotation.forceFetchAllVariables(),
annotation.autoExtendTimeout(),
Duration.ofSeconds(annotation.extendTimeoutPeriodSeconds())));
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.camunda.zeebe.spring.client.annotation.value;

import io.camunda.zeebe.spring.client.bean.MethodInfo;
import io.camunda.zeebe.spring.client.bean.ParameterInfo;

import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -48,7 +46,9 @@ public ZeebeWorkerValue(
Boolean enabled,
MethodInfo methodInfo,
List<String> tenantIds,
boolean forceFetchAllVariables) {
boolean forceFetchAllVariables,
boolean autoExtendTimeout,
Duration extendTimeoutPeriod) {
this.type = type;
this.name = name;
this.timeout = timeout;
Expand All @@ -61,6 +61,8 @@ public ZeebeWorkerValue(
this.methodInfo = methodInfo;
this.tenantIds = tenantIds;
this.forceFetchAllVariables = forceFetchAllVariables;
this.autoExtendTimeout = autoExtendTimeout;
this.extendTimeoutPeriod = extendTimeoutPeriod;
}

public Duration getExtendTimeoutPeriod() {
Expand Down Expand Up @@ -209,6 +211,10 @@ public String toString() {
+ tenantIds
+ ", forceFetchAllVariables="
+ forceFetchAllVariables
+ ", autoExtendTimeout="
+ autoExtendTimeout
+ ", extendTimeoutPeriod="
+ extendTimeoutPeriod
+ '}';
}

Expand All @@ -218,6 +224,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ZeebeWorkerValue that = (ZeebeWorkerValue) o;
return forceFetchAllVariables == that.forceFetchAllVariables
&& autoExtendTimeout == that.autoExtendTimeout
&& Objects.equals(type, that.type)
&& Objects.equals(name, that.name)
&& Objects.equals(timeout, that.timeout)
Expand All @@ -228,7 +235,8 @@ public boolean equals(Object o) {
&& Arrays.equals(fetchVariables, that.fetchVariables)
&& Objects.equals(enabled, that.enabled)
&& Objects.equals(methodInfo, that.methodInfo)
&& Objects.equals(tenantIds, that.tenantIds);
&& Objects.equals(tenantIds, that.tenantIds)
&& Objects.equals(extendTimeoutPeriod, that.extendTimeoutPeriod);
}

@Override
Expand All @@ -245,7 +253,9 @@ public int hashCode() {
enabled,
methodInfo,
tenantIds,
forceFetchAllVariables);
forceFetchAllVariables,
autoExtendTimeout,
extendTimeoutPeriod);
result = 31 * result + Arrays.hashCode(fetchVariables);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ScheduledFuture<?> startAutoExtendTimeout(long deadline, long jobKey, Dur
}

private void extendTimeout(long jobKey, Duration timeout) {
LOG.trace("Updating job timeout of job {} by {}",jobKey,timeout.toString());
LOG.trace("Updating job timeout of job {} by {}", jobKey, timeout.toString());
zeebeClient.newUpdateTimeoutCommand(jobKey).timeout(timeout).send().join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;

/** Zeebe JobHandler that invokes a Spring bean */
public class JobHandlerInvokingSpringBeans implements JobHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class JobWorkerManager {
public JobWorkerManager(
CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
JsonMapper jsonMapper,
MetricsRecorder metricsRecorder,ZeebeClientExecutorService zeebeClientExecutorService) {
MetricsRecorder metricsRecorder,
ZeebeClientExecutorService zeebeClientExecutorService) {
this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy;
this.jsonMapper = jsonMapper;
this.metricsRecorder = metricsRecorder;
Expand All @@ -48,7 +49,7 @@ public JobWorker openWorker(ZeebeClient client, ZeebeWorkerValue zeebeWorkerValu
commandExceptionHandlingStrategy,
jsonMapper,
metricsRecorder,
new AutoExtendTimeoutManager(zeebeClientExecutorService,client)));
new AutoExtendTimeoutManager(zeebeClientExecutorService, client)));
}

public JobWorker openWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public static class TenantBound {
@JobWorker(tenantIds = "tenant-1")
public void handle() {}
}

public static class AutoExtend {
@JobWorker(autoExtendTimeout = true, extendTimeoutPeriodSeconds = 30)
public void handle() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.spring.client.bean.ClassInfoTest;
import io.camunda.zeebe.spring.client.bean.MethodInfo;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
Expand Down Expand Up @@ -59,6 +60,22 @@ void shouldReadTenantIds() {
assertThat(zeebeWorkerValue.get().getTenantIds()).containsOnly("tenant-1");
}

@Test
void shouldReadAutoExtendTimeout() {
// given
final ZeebeWorkerAnnotationProcessor annotationProcessor = createDefaultAnnotationProcessor();
final MethodInfo methodInfo = extract(ClassInfoTest.AutoExtend.class);

// when
final Optional<ZeebeWorkerValue> zeebeWorkerValue =
annotationProcessor.readJobWorkerAnnotationForMethod(methodInfo);

// then
assertTrue(zeebeWorkerValue.isPresent());
assertThat(zeebeWorkerValue.get().getAutoComplete()).isEqualTo(true);
assertThat(zeebeWorkerValue.get().getExtendTimeoutPeriod()).isEqualTo(Duration.ofSeconds(30));
}

@Test
public void applyOnWithZeebeWorkerAllValues() {
// given
Expand Down

0 comments on commit af81e58

Please sign in to comment.