Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Auto extend job timeout #625

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public CommandExceptionHandlingStrategy commandExceptionHandlingStrategy(
public JobWorkerManager jobWorkerManager(
final CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
final JsonMapper jsonMapper,
final MetricsRecorder metricsRecorder) {
return new JobWorkerManager(commandExceptionHandlingStrategy, jsonMapper, metricsRecorder);
final MetricsRecorder metricsRecorder,
final ZeebeClientExecutorService zeebeClientExecutorService) {
return new JobWorkerManager(
commandExceptionHandlingStrategy, jsonMapper, metricsRecorder, zeebeClientExecutorService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void customize(ZeebeWorkerValue zeebeWorker) {
applyDefaultWorkerName(zeebeWorker);
applyDefaultJobWorkerType(zeebeWorker);
applyFetchVariables(zeebeWorker);
applyAutoExtendTimeout(zeebeWorker);
applyOverrides(zeebeWorker);
}

Expand Down Expand Up @@ -155,4 +156,15 @@ private void applyDefaultJobWorkerType(ZeebeWorkerValue zeebeWorker) {
}
}
}

private void applyAutoExtendTimeout(ZeebeWorkerValue workerValue) {
if (!workerValue.isAutoExtendTimeout()) {
workerValue.setAutoExtendTimeout(
zeebeClientConfigurationProperties.getJob().isAutoExtendTimeout());
}
if (workerValue.getExtendTimeoutPeriod() == null) {
workerValue.setExtendTimeoutPeriod(
zeebeClientConfigurationProperties.getJob().getExtendTimeoutPeriod());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,26 @@ public int hashCode() {

public static class Job {

@Override
public String toString() {
return "Job{" + "timeout=" + timeout + ", pollInterval=" + pollInterval + '}';
}

private Duration timeout = DEFAULT.getDefaultJobTimeout();
private Duration pollInterval = DEFAULT.getDefaultJobPollInterval();
private boolean autoExtendTimeout;
private Duration extendTimeoutPeriod = Duration.ofSeconds(30);

public Duration getExtendTimeoutPeriod() {
return extendTimeoutPeriod;
}

public void setExtendTimeoutPeriod(Duration extendTimeoutPeriod) {
this.extendTimeoutPeriod = extendTimeoutPeriod;
}

public boolean isAutoExtendTimeout() {
return autoExtendTimeout;
}

public void setAutoExtendTimeout(boolean autoExtendTimeout) {
this.autoExtendTimeout = autoExtendTimeout;
}

public Duration getTimeout() {
return timeout;
Expand All @@ -574,17 +587,34 @@ public void setPollInterval(Duration pollInterval) {
this.pollInterval = pollInterval;
}

@Override
public String toString() {
return "Job{"
+ "timeout="
+ timeout
+ ", pollInterval="
+ pollInterval
+ ", autoExtendTimeout="
+ autoExtendTimeout
+ ", extendTimeoutPeriod="
+ extendTimeoutPeriod
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Job job = (Job) o;
return Objects.equals(timeout, job.timeout) && Objects.equals(pollInterval, job.pollInterval);
return autoExtendTimeout == job.autoExtendTimeout
&& Objects.equals(timeout, job.timeout)
&& Objects.equals(pollInterval, job.pollInterval)
&& Objects.equals(extendTimeoutPeriod, job.extendTimeoutPeriod);
}

@Override
public int hashCode() {
return Objects.hash(timeout, pollInterval);
return Objects.hash(timeout, pollInterval, autoExtendTimeout, extendTimeoutPeriod);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.camunda.zeebe.spring.client.annotation.value.ZeebeWorkerValue;
import io.camunda.zeebe.spring.client.bean.ClassInfo;
import io.camunda.zeebe.spring.client.bean.MethodInfo;
import java.time.Duration;
import java.util.Arrays;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -175,6 +176,23 @@ void shouldApplyOverrides() {
assertThat(zeebeWorkerValue.getEnabled()).isFalse();
}

@Test
void shouldApplyAutoExtendTimeout() {
// given
ZeebeClientConfigurationProperties properties = properties();
properties.getJob().setAutoExtendTimeout(true);
properties.getJob().setExtendTimeoutPeriod(Duration.ofSeconds(25));
PropertyBasedZeebeWorkerValueCustomizer customizer =
new PropertyBasedZeebeWorkerValueCustomizer(properties);
ZeebeWorkerValue zeebeWorkerValue = new ZeebeWorkerValue();
zeebeWorkerValue.setMethodInfo(methodInfo(this, "testBean", "sampleWorker"));
// when
customizer.customize(zeebeWorkerValue);
// then
assertThat(zeebeWorkerValue.getExtendTimeoutPeriod()).isEqualTo(Duration.ofSeconds(25));
assertThat(zeebeWorkerValue.isAutoExtendTimeout()).isEqualTo(true);
}

private static class ComplexProcessVariable {
private String var3;
private String var4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,9 @@ String name() default

boolean enabled() default true;

boolean autoExtendTimeout() default false;

long extendTimeoutPeriodSeconds() default -1;

String[] tenantIds() default {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ String name() default
boolean enabled() default true;

String[] tenantIds() default {};

boolean autoExtendTimeout() default false;

long extendTimeoutPeriodSeconds() default -1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.spring.client.bean.MethodInfo;
import io.camunda.zeebe.spring.client.jobhandling.JobWorkerManager;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -85,7 +86,9 @@ public Optional<ZeebeWorkerValue> readJobWorkerAnnotationForMethod(final MethodI
annotation.enabled(),
methodInfo,
Arrays.asList(annotation.tenantIds()),
annotation.fetchAllVariables()));
annotation.fetchAllVariables(),
annotation.autoExtendTimeout(),
Duration.ofSeconds(annotation.extendTimeoutPeriodSeconds())));
} else {
Optional<ZeebeWorker> legacyAnnotation = methodInfo.getAnnotation(ZeebeWorker.class);
if (legacyAnnotation.isPresent()) {
Expand All @@ -103,7 +106,9 @@ public Optional<ZeebeWorkerValue> readJobWorkerAnnotationForMethod(final MethodI
annotation.enabled(),
methodInfo,
Arrays.asList(annotation.tenantIds()),
annotation.forceFetchAllVariables()));
annotation.forceFetchAllVariables(),
annotation.autoExtendTimeout(),
Duration.ofSeconds(annotation.extendTimeoutPeriodSeconds())));
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.camunda.zeebe.spring.client.annotation.value;

import io.camunda.zeebe.spring.client.bean.MethodInfo;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -28,6 +29,8 @@ public class ZeebeWorkerValue implements ZeebeAnnotationValue<MethodInfo> {
private MethodInfo methodInfo;
private List<String> tenantIds;
private boolean forceFetchAllVariables;
private boolean autoExtendTimeout;
private Duration extendTimeoutPeriod;

public ZeebeWorkerValue() {}

Expand All @@ -43,7 +46,9 @@ public ZeebeWorkerValue(
Boolean enabled,
MethodInfo methodInfo,
List<String> tenantIds,
boolean forceFetchAllVariables) {
boolean forceFetchAllVariables,
boolean autoExtendTimeout,
Duration extendTimeoutPeriod) {
this.type = type;
this.name = name;
this.timeout = timeout;
Expand All @@ -56,6 +61,24 @@ public ZeebeWorkerValue(
this.methodInfo = methodInfo;
this.tenantIds = tenantIds;
this.forceFetchAllVariables = forceFetchAllVariables;
this.autoExtendTimeout = autoExtendTimeout;
this.extendTimeoutPeriod = extendTimeoutPeriod;
}

public Duration getExtendTimeoutPeriod() {
return extendTimeoutPeriod;
}

public void setExtendTimeoutPeriod(Duration extendTimeoutPeriod) {
this.extendTimeoutPeriod = extendTimeoutPeriod;
}

public boolean isAutoExtendTimeout() {
return autoExtendTimeout;
}

public void setAutoExtendTimeout(boolean autoExtendTimeout) {
this.autoExtendTimeout = autoExtendTimeout;
}

public String getType() {
Expand Down Expand Up @@ -188,6 +211,10 @@ public String toString() {
+ tenantIds
+ ", forceFetchAllVariables="
+ forceFetchAllVariables
+ ", autoExtendTimeout="
+ autoExtendTimeout
+ ", extendTimeoutPeriod="
+ extendTimeoutPeriod
+ '}';
}

Expand All @@ -197,6 +224,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ZeebeWorkerValue that = (ZeebeWorkerValue) o;
return forceFetchAllVariables == that.forceFetchAllVariables
&& autoExtendTimeout == that.autoExtendTimeout
&& Objects.equals(type, that.type)
&& Objects.equals(name, that.name)
&& Objects.equals(timeout, that.timeout)
Expand All @@ -207,7 +235,8 @@ public boolean equals(Object o) {
&& Arrays.equals(fetchVariables, that.fetchVariables)
&& Objects.equals(enabled, that.enabled)
&& Objects.equals(methodInfo, that.methodInfo)
&& Objects.equals(tenantIds, that.tenantIds);
&& Objects.equals(tenantIds, that.tenantIds)
&& Objects.equals(extendTimeoutPeriod, that.extendTimeoutPeriod);
}

@Override
Expand All @@ -224,7 +253,9 @@ public int hashCode() {
enabled,
methodInfo,
tenantIds,
forceFetchAllVariables);
forceFetchAllVariables,
autoExtendTimeout,
extendTimeoutPeriod);
result = 31 * result + Arrays.hashCode(fetchVariables);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.camunda.zeebe.spring.client.jobhandling;

import io.camunda.zeebe.client.ZeebeClient;
import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AutoExtendTimeoutManager {
private static final Logger LOG = LoggerFactory.getLogger(AutoExtendTimeoutManager.class);
private final ZeebeClientExecutorService zeebeClientExecutorService;
private final ZeebeClient zeebeClient;

public AutoExtendTimeoutManager(
ZeebeClientExecutorService zeebeClientExecutorService, ZeebeClient zeebeClient) {
this.zeebeClientExecutorService = zeebeClientExecutorService;
this.zeebeClient = zeebeClient;
}

public ScheduledFuture<?> startAutoExtendTimeout(long deadline, long jobKey, Duration period) {
return zeebeClientExecutorService
.get()
.scheduleAtFixedRate(
() -> extendTimeout(jobKey, period),
Integer.max(0, (int) (deadline - System.currentTimeMillis() - 5000)),
period.minusSeconds(5).getSeconds(),
TimeUnit.SECONDS);
}

private void extendTimeout(long jobKey, Duration timeout) {
LOG.trace("Updating job timeout of job {} by {}", jobKey, timeout.toString());
zeebeClient.newUpdateTimeoutCommand(jobKey).timeout(timeout).send().join();
}
}
Loading
Loading