diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index bc1ee7affd7..866002f15e6 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -54,6 +54,9 @@ dependencies { api(project(":nessie-server-store-proto")) api(project(":nessie-content-generator")) api(project(":nessie-protobuf-relocated")) + api(project(":nessie-tasks-api")) + api(project(":nessie-tasks-service-async")) + api(project(":nessie-tasks-service-impl")) api(project(":nessie-versioned-spi")) api(project(":nessie-versioned-storage-batching")) api(project(":nessie-versioned-storage-bigtable")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 52acefaebbc..15688a2dc5e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -127,6 +127,7 @@ maven-resolver-provider = { module = "org.apache.maven:maven-resolver-provider", maven-resolver-transport-file = { module = "org.apache.maven.resolver:maven-resolver-transport-file", version.ref = "mavenResolver" } maven-resolver-transport-http = { module = "org.apache.maven.resolver:maven-resolver-transport-http", version.ref = "mavenResolver" } micrometer-core = { module = "io.micrometer:micrometer-core", version = "1.12.2" } +microprofile-contextpropagation-api = { module = "org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api", version = "1.3" } microprofile-openapi = { module = "org.eclipse.microprofile.openapi:microprofile-openapi-api", version = "3.1.1" } mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" } mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" } @@ -169,6 +170,7 @@ testcontainers-keycloak = { module = "com.github.dasniko:testcontainers-keycloak threeten-extra = { module = "org.threeten:threeten-extra", version = "1.7.2" } undertow-core = { module = "io.undertow:undertow-core", version.ref = "undertow" } undertow-servlet = { module = "io.undertow:undertow-servlet", version.ref = "undertow" } +vertx-core = { module = "io.vertx:vertx-core", version = "4.5.1" } weld-se-core = { module = "org.jboss.weld.se:weld-se-core", version = "5.1.2.Final" } wiremock = { module = "com.github.tomakehurst:wiremock-jre8-standalone", version = "2.35.1" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index ecb850c8e3e..c4586af94e4 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -38,6 +38,9 @@ nessie-server-store=servers/store nessie-server-store-proto=servers/store-proto nessie-content-generator=tools/content-generator nessie-protobuf-relocated=tools/protobuf-relocated +nessie-tasks-api=tasks/api +nessie-tasks-service-async=tasks/service/async +nessie-tasks-service-impl=tasks/service/impl nessie-versioned-spi=versioned/spi nessie-versioned-storage-batching=versioned/storage/batching nessie-versioned-storage-bigtable=versioned/storage/bigtable diff --git a/tasks/api/build.gradle.kts b/tasks/api/build.gradle.kts new file mode 100644 index 00000000000..2193bdf4bd5 --- /dev/null +++ b/tasks/api/build.gradle.kts @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("nessie-conventions-server") + id("nessie-jacoco") + alias(libs.plugins.jmh) +} + +extra["maven.name"] = "Nessie - Tasks - API" + +dependencies { + implementation(project(":nessie-versioned-storage-common")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + + compileOnly(libs.errorprone.annotations) + implementation(libs.guava) + implementation(libs.slf4j.api) + + compileOnly(libs.immutables.builder) + compileOnly(libs.immutables.value.annotations) + annotationProcessor(libs.immutables.value.processor) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-databind") + implementation("com.fasterxml.jackson.core:jackson-annotations") + + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + + testImplementation(project(":nessie-versioned-storage-testextension")) + testImplementation(project(":nessie-versioned-storage-inmemory")) + testRuntimeOnly(libs.logback.classic) + + jmhImplementation(libs.jmh.core) + jmhImplementation(project(":nessie-versioned-storage-common-tests")) + jmhAnnotationProcessor(libs.jmh.generator.annprocess) +} + +tasks.named("processJmhJandexIndex").configure { enabled = false } + +tasks.named("processTestJandexIndex").configure { enabled = false } + +jmh { jmhVersion.set(libs.versions.jmh.get()) } diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/JacksonSerializers.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/JacksonSerializers.java new file mode 100644 index 00000000000..a76a45b26d0 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/JacksonSerializers.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import java.io.IOException; +import java.time.Instant; + +public final class JacksonSerializers { + private JacksonSerializers() {} + + public static final class InstantAsLongDeserializer extends StdDeserializer { + public InstantAsLongDeserializer() { + super(Instant.class); + } + + @Override + public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + if (p.currentToken().isNumeric()) { + long millis = p.getValueAsLong(); + return Instant.ofEpochMilli(millis); + } + return null; + } + } + + public static final class InstantAsLongSerializer extends StdSerializer { + public InstantAsLongSerializer() { + super(Instant.class); + } + + @Override + public void serialize(Instant value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + if (value == null) { + gen.writeNull(); + } else { + gen.writeNumber(value.toEpochMilli()); + } + } + } +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java new file mode 100644 index 00000000000..076d7318b41 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import static org.projectnessie.nessie.tasks.api.TaskState.failureState; + +import java.time.Clock; +import java.time.Instant; +import org.projectnessie.versioned.storage.common.persist.ObjType; + +/** + * Defines the behavior of tasks. + * + *

This included the calculation or retry timestamps and lost-task timeouts and mapping between + * exceptions and task state. + */ +public interface TaskBehavior { + + /** + * Convert the task state as an exception, if the state represents an error or failure state. This + * is used to transform a persisted {@linkplain TaskStatus#FAILURE failure status} to a Java + * exception for local callers. + */ + Throwable stateAsException(T obj); + + /** + * Retrieve the timestamp when the next running-state update that refreshes the {@link + * TaskState#retryNotBefore() retry-not-before} and {@link TaskState#lostNotBefore()} + * lost-not-before} timestamps in the database, shall happen. The returned timestamp must be + * earlier than any of these "not-before" timestamps and defines when the update will be + * scheduled. + */ + Instant performRunningStateUpdateAt(Clock clock, T running); + + /** + * Build a new {@linkplain TaskStatus#RUNNING running} task-state with "fresh" {@linkplain + * TaskState#retryNotBefore() retry-not-before} and {@linkplain TaskState#lostNotBefore() + * lost-not-before} timestamps. + */ + TaskState runningTaskState(Clock clock, T running); + + /** + * Called when the task execution resulted in an exception, Build a new {@linkplain + * TaskStatus#ERROR_RETRY error-retry}, with "fresh" {@linkplain TaskState#retryNotBefore() + * retry-not-before} timestamp, or {@linkplain TaskStatus#FAILURE failure} task-state. + */ + default TaskState asErrorTaskState(Clock clock, T base, Throwable t) { + return failureState(t.toString()); + } + + /** Create a new, empty task-object builder. */ + B newObjBuilder(); + + ObjType objType(); +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskObj.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskObj.java new file mode 100644 index 00000000000..6bc3b8b1276 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskObj.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import static org.projectnessie.nessie.tasks.api.TaskObjUtil.TASK_DEFAULT_CACHE_EXPIRE; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import org.projectnessie.versioned.storage.common.objtypes.CustomObjType.CacheExpireCalculation; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.ObjType; +import org.projectnessie.versioned.storage.common.persist.UpdateableObj; + +/** Base interface for objects that represent task state and value. */ +public interface TaskObj extends UpdateableObj { + /** Retrieve the task-state of this object. */ + TaskState taskState(); + + interface Builder { + + @CanIgnoreReturnValue + Builder from(TaskObj base); + + @CanIgnoreReturnValue + Builder versionToken(String versionToken); + + @CanIgnoreReturnValue + Builder id(ObjId id); + + @CanIgnoreReturnValue + Builder type(ObjType type); + + @CanIgnoreReturnValue + Builder taskState(TaskState taskState); + + TaskObj build(); + } + + @SuppressWarnings("unchecked") + static CacheExpireCalculation taskDefaultCacheExpire() { + return (CacheExpireCalculation) TASK_DEFAULT_CACHE_EXPIRE; + } +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskObjUtil.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskObjUtil.java new file mode 100644 index 00000000000..b6c2cacee06 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskObjUtil.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.projectnessie.versioned.storage.common.persist.ObjType.CACHE_UNLIMITED; + +import java.time.Instant; +import org.projectnessie.versioned.storage.common.objtypes.CustomObjType; + +final class TaskObjUtil { + private TaskObjUtil() {} + + static final CustomObjType.CacheExpireCalculation TASK_DEFAULT_CACHE_EXPIRE = + (obj, currentTimeMicros) -> { + TaskState state = obj.taskState(); + if (state.status().isFinal()) { + return CACHE_UNLIMITED; + } + + Instant retryNotBefore = state.retryNotBefore(); + if (retryNotBefore != null) { + return MILLISECONDS.toMicros(retryNotBefore.toEpochMilli()); + } + + return CACHE_UNLIMITED; + }; +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskRequest.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskRequest.java new file mode 100644 index 00000000000..09756229dc3 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskRequest.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import java.util.concurrent.CompletionStage; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.ObjType; + +/** Base interface for value objects that identify a task's input parameters. */ +public interface TaskRequest { + /** Declares the {@linkplain ObjType object type} for this request. */ + ObjType objType(); + + /** + * Globally identifies the task request across all {@linkplain ObjType object types}). + * + *

Implementations must derive the ID from the task type and the task parameters. + */ + ObjId objId(); + + TaskBehavior behavior(); + + /** + * Start execution of the task, this function must not block and/or wait for the task execution to + * finish. + * + *

The implementation is responsible to choose the right scheduling implementation. For + * example: tasks that are supposed to run very long, like 5 seconds or more, must not use a Vert.X's + * {@code executeBlocking()}. + */ + CompletionStage submitExecution(); + + /** Applies parameters from this request to the object builder. */ + default B applyRequestToObjBuilder(B builder) { + return builder; + } +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java new file mode 100644 index 00000000000..e97fc122659 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import static com.google.common.base.Preconditions.checkState; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.time.Instant; +import org.immutables.value.Value; +import org.projectnessie.nessie.tasks.api.JacksonSerializers.InstantAsLongDeserializer; +import org.projectnessie.nessie.tasks.api.JacksonSerializers.InstantAsLongSerializer; + +/** Task state as a value object. */ +@Value.Immutable +@JsonSerialize(as = ImmutableTaskState.class) +@JsonDeserialize(as = ImmutableTaskState.class) +public interface TaskState { + /** The current task status. */ + @Value.Parameter(order = 1) + TaskStatus status(); + + /** + * Represents the earliest timestamp when a retryable error can be retried or the value/state of a + * running task can be refreshed. Only valid for {@link TaskStatus#RUNNING RUNNING} and {@link + * TaskStatus#ERROR_RETRY ERROR_RETRY}. + */ + @Value.Parameter(order = 2) + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonDeserialize(using = InstantAsLongDeserializer.class) + @JsonSerialize(using = InstantAsLongSerializer.class) + Instant retryNotBefore(); + + /** + * Represents the earliest timestamp when a task service can assume that a {@link + * TaskStatus#RUNNING RUNNING} task is lost and should be re-started. Only valid for {@link + * TaskStatus#RUNNING RUNNING}. + */ + @Value.Parameter(order = 3) + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonDeserialize(using = InstantAsLongDeserializer.class) + @JsonSerialize(using = InstantAsLongSerializer.class) + Instant lostNotBefore(); + + /** + * Represents an error message. Only valid for {@link TaskStatus#FAILURE FAILURE} and {@link + * TaskStatus#ERROR_RETRY ERROR_RETRY}. + */ + @Value.Parameter(order = 4) + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + String message(); + + TaskState SUCCESS = ImmutableTaskState.of(TaskStatus.SUCCESS, null, null, null); + + static TaskState successState() { + return SUCCESS; + } + + static TaskState runningState(@Nonnull Instant retryNotBefore, @Nonnull Instant lostNotBefore) { + return ImmutableTaskState.of(TaskStatus.RUNNING, retryNotBefore, lostNotBefore, null); + } + + static TaskState retryableErrorState(@Nonnull Instant retryNotBefore, @Nonnull String message) { + return ImmutableTaskState.of(TaskStatus.ERROR_RETRY, retryNotBefore, null, message); + } + + static TaskState failureState(@Nonnull String message) { + return ImmutableTaskState.of(TaskStatus.FAILURE, null, null, message); + } + + static TaskState taskState( + TaskStatus taskStatus, Instant retryNotBefore, Instant lostNotBefore, String message) { + return ImmutableTaskState.of(taskStatus, retryNotBefore, lostNotBefore, message); + } + + @Value.Check + default void check() { + switch (status()) { + case SUCCESS: + checkState(retryNotBefore() == null, "retryNotBefore must be null for SUCCESS"); + checkState(lostNotBefore() == null, "retryNotBefore must be null for SUCCESS"); + break; + case FAILURE: + checkState(retryNotBefore() == null, "retryNotBefore must be null for FAILURE"); + checkState(lostNotBefore() == null, "lostNotBefore must be null for FAILURE"); + checkState(message() != null, "message must not be null for FAILURE"); + break; + case RUNNING: + checkState(retryNotBefore() != null, "retryNotBefore must not be null for RUNNING"); + checkState(lostNotBefore() != null, "lostNotBefore must not be null for RUNNING"); + break; + case ERROR_RETRY: + checkState(retryNotBefore() != null, "retryNotBefore must not be null for ERROR_RETRY"); + checkState(lostNotBefore() == null, "lostNotBefore must be null for ERROR_RETRY"); + checkState(message() != null, "message must not be null for ERROR_RETRY"); + break; + default: + throw new IllegalStateException("Unknown task status " + status()); + } + } +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskStatus.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskStatus.java new file mode 100644 index 00000000000..d3423672081 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskStatus.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +public enum TaskStatus { + /** The task is currently running. */ + RUNNING(false, false, true), + /** The task has finished successfully. */ + SUCCESS(true, false, false), + /** The task has failed and can be retried. */ + ERROR_RETRY(false, true, true), + /** The task has failed without a way to recover. */ + FAILURE(true, true, false), + ; + + private final boolean finalState; + private final boolean error; + private final boolean retryable; + + TaskStatus(boolean finalState, boolean error, boolean retryable) { + this.finalState = finalState; + this.error = error; + this.retryable = retryable; + } + + /** + * Whether the status represents a final state, which will never be updated. + * + *

Final states are never retried and can be unconditionally cached. + */ + public boolean isFinal() { + return finalState; + } + + /** Whether the status represents an error. */ + public boolean isError() { + return error; + } + + public boolean isRetryable() { + return retryable; + } +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/Tasks.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/Tasks.java new file mode 100644 index 00000000000..c7c1cf63137 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/Tasks.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import java.util.concurrent.CompletionStage; + +/** + * Central API interface to execute long-running I/O intensive computations, ensuring that no more + * than one computation for the same input happens. + */ +public interface Tasks { + /** + * Issue a request for a task described by {@link TaskRequest}. Tasks for the same request are + * executed only once. The returned {@link CompletionStage} finishes when the task enters a + * {@linkplain TaskStatus#isFinal() final status} ({@linkplain TaskStatus#SUCCESS SUCCESS} or + * {@linkplain TaskStatus#FAILURE FAILURE}). + */ + CompletionStage submit( + TaskRequest taskRequest); +} diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TasksService.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TasksService.java new file mode 100644 index 00000000000..40d4e0cf151 --- /dev/null +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TasksService.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.api; + +import java.util.concurrent.CompletionStage; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Interface for task service implementation separating the stateful tasks service instance from the + * stateless per-Nessie-repository {@link Tasks} instance. + */ +public interface TasksService { + /** Retrieve the {@link Tasks} instance for a given {@link Persist}. */ + Tasks forPersist(Persist persist); + + CompletionStage shutdown(); +} diff --git a/tasks/service/async/build.gradle.kts b/tasks/service/async/build.gradle.kts new file mode 100644 index 00000000000..475e7cb01af --- /dev/null +++ b/tasks/service/async/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("nessie-conventions-server") + id("nessie-jacoco") +} + +extra["maven.name"] = "Nessie - Tasks - Async" + +dependencies { + compileOnly(libs.vertx.core) + compileOnly(libs.microprofile.contextpropagation.api) + + compileOnly(libs.immutables.builder) + compileOnly(libs.immutables.value.annotations) + annotationProcessor(libs.immutables.value.processor) + + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + + testImplementation(libs.vertx.core) + testImplementation(libs.microprofile.contextpropagation.api) +} diff --git a/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/TasksAsync.java b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/TasksAsync.java new file mode 100644 index 00000000000..f715c85bb77 --- /dev/null +++ b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/TasksAsync.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async; + +import java.time.Clock; +import java.time.Instant; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +public interface TasksAsync { + Clock clock(); + + default CompletionStage call(Runnable runnable) { + return supply( + () -> { + runnable.run(); + return null; + }); + } + + CompletionStage supply(Supplier runnable); + + /** + * Schedule a {@link Runnable} to run at the given timestamp. + * + *

The scheduled task can always be cancelled as long as it is not running via + * {@link CompletionStage#toCompletableFuture() CompletionStage.toCompletableFuture()}.{@link java.util.concurrent.CompletableFuture#cancel(boolean) cancel(false)}; + * . + * + *

Whether an already running task can be interrupted is not guaranteed and depends on the + * implementation. It is highly recommended to not assume that a running task can be interrupted. + */ + CompletionStage schedule(Runnable runnable, Instant scheduleNotBefore); + + default long calculateDelay(Clock clock, long minimumDelayMillis, Instant scheduleNotBefore) { + long retryEarliestEpochMillis = scheduleNotBefore.toEpochMilli(); + long delayMillis = retryEarliestEpochMillis - clock.millis(); + + return Math.max(delayMillis, minimumDelayMillis); + } +} diff --git a/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/pool/JavaPoolTasksAsync.java b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/pool/JavaPoolTasksAsync.java new file mode 100644 index 00000000000..b96c3e7012c --- /dev/null +++ b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/pool/JavaPoolTasksAsync.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.pool; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.time.Clock; +import java.time.Instant; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +public class JavaPoolTasksAsync implements TasksAsync { + private final ScheduledExecutorService executorService; + private final Clock clock; + private final long minimumDelayMillis; + + public JavaPoolTasksAsync( + ScheduledExecutorService executorService, Clock clock, long minimumDelayMillis) { + this.executorService = executorService; + this.clock = clock; + this.minimumDelayMillis = minimumDelayMillis; + } + + @Override + public CompletionStage supply(Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, executorService); + } + + @Override + public CompletionStage schedule(Runnable runnable, Instant scheduleNotBefore) { + long realDelay = calculateDelay(clock, minimumDelayMillis, scheduleNotBefore); + + CompletableFuture completable = new CompletableFuture<>(); + + ScheduledFuture future = + executorService.schedule( + () -> { + try { + runnable.run(); + completable.complete(null); + } catch (Throwable t) { + completable.completeExceptionally(new CompletionException(t)); + } + }, + realDelay, + MILLISECONDS); + + completable.whenComplete( + (v, t) -> { + if (t instanceof CancellationException) { + future.cancel(true); // allow interruption of blocking tasks + } + }); + + return completable; + } + + @Override + public Clock clock() { + return clock; + } +} diff --git a/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/vertx/VertxTasksAsync.java b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/vertx/VertxTasksAsync.java new file mode 100644 index 00000000000..4fbda34ee8c --- /dev/null +++ b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/vertx/VertxTasksAsync.java @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.vertx; + +import io.vertx.core.Vertx; +import java.time.Clock; +import java.time.Instant; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +public class VertxTasksAsync implements TasksAsync { + private final Vertx vertx; + private final Clock clock; + private final long minimumDelayMillis; + + public VertxTasksAsync(Vertx vertx, Clock clock, long minimumDelayMillis) { + this.vertx = vertx; + this.clock = clock; + this.minimumDelayMillis = minimumDelayMillis; + } + + @Override + public CompletionStage supply(Supplier supplier) { + return vertx + .executeBlocking( + () -> { + try { + return supplier.get(); + } catch (Throwable t) { + throw new CompletionException(t); + } + }, + false) + .toCompletionStage(); + } + + @Override + public CompletionStage schedule(Runnable runnable, Instant scheduleNotBefore) { + long realDelay = calculateDelay(clock, minimumDelayMillis, scheduleNotBefore); + + // Cannot use Vertx.timer(), because current Quarkus 3.6.6 has a Vertx version that does not + // have Vertx.timer(). We can use this once Quarkus uses a Vertx version >= 4.5.1: + // Timer timer = vertx.timer(realDelay, TimeUnit.MILLISECONDS); + // CompletionStage stage = timer.toCompletionStage().thenRun(runnable); + // return new VertxScheduledHandle(timer, stage); + + CompletableFuture completableFuture = new CompletableFuture<>(); + long timerId = + vertx.setTimer( + realDelay, + id -> + // scheduling the runnable as a blocking task, a timer handlers must not block + vertx.executeBlocking( + () -> { + try { + runnable.run(); + completableFuture.complete(null); + return null; + } catch (Throwable t) { + completableFuture.completeExceptionally(new CompletionException(t)); + return null; + } + })); + + completableFuture.whenComplete( + (v, t) -> { + if (t instanceof CancellationException) { + vertx.cancelTimer(timerId); // won't interrupt blocking tasks! + } + }); + + return completableFuture; + } + + @Override + public Clock clock() { + return clock; + } +} diff --git a/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/wrapping/ThreadContextTasksAsync.java b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/wrapping/ThreadContextTasksAsync.java new file mode 100644 index 00000000000..4004c06ccad --- /dev/null +++ b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/wrapping/ThreadContextTasksAsync.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.wrapping; + +import java.util.function.Supplier; +import org.eclipse.microprofile.context.ThreadContext; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +/** Allows using the Microprofile {@link ThreadContext} with a {@link TasksAsync}. */ +public class ThreadContextTasksAsync extends WrappingTasksAsync { + private final ThreadContext threadContext; + + public ThreadContextTasksAsync(TasksAsync tasksAsync, ThreadContext threadContext) { + super(tasksAsync); + this.threadContext = threadContext; + } + + @Override + protected Runnable wrapRunnable(Runnable runnable) { + return threadContext.contextualRunnable(runnable); + } + + @Override + protected Supplier wrapSupplier(Supplier supplier) { + return threadContext.contextualSupplier(supplier); + } +} diff --git a/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/wrapping/WrappingTasksAsync.java b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/wrapping/WrappingTasksAsync.java new file mode 100644 index 00000000000..1c27b4a57e9 --- /dev/null +++ b/tasks/service/async/src/main/java/org/projectnessie/nessie/tasks/async/wrapping/WrappingTasksAsync.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.wrapping; + +import java.time.Clock; +import java.time.Instant; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +/** + * Used to wrap {@link Runnable}s and {@link Supplier}s passed to {@link TasksAsync} + * implementations. + */ +public abstract class WrappingTasksAsync implements TasksAsync { + private final TasksAsync delegate; + + protected WrappingTasksAsync(TasksAsync delegate) { + this.delegate = delegate; + } + + protected abstract Runnable wrapRunnable(Runnable runnable); + + protected abstract Supplier wrapSupplier(Supplier supplier); + + @Override + public Clock clock() { + return delegate.clock(); + } + + @Override + public CompletionStage call(Runnable runnable) { + return delegate.call(wrapRunnable(runnable)); + } + + @Override + public CompletionStage supply(Supplier supplier) { + return delegate.supply(wrapSupplier(supplier)); + } + + @Override + public CompletionStage schedule(Runnable runnable, Instant scheduleNotBefore) { + return delegate.schedule(wrapRunnable(runnable), scheduleNotBefore); + } + + @Override + public long calculateDelay(Clock clock, long minimumDelayMillis, Instant scheduleNotBefore) { + return delegate.calculateDelay(clock, minimumDelayMillis, scheduleNotBefore); + } +} diff --git a/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/BaseTasksAsync.java b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/BaseTasksAsync.java new file mode 100644 index 00000000000..64901c1c0a8 --- /dev/null +++ b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/BaseTasksAsync.java @@ -0,0 +1,263 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(SoftAssertionsExtension.class) +public abstract class BaseTasksAsync { + @InjectSoftAssertions protected SoftAssertions soft; + + protected abstract TasksAsync tasksAsync(); + + @ParameterizedTest + @ValueSource(ints = {1, 10, 1000}) + public void callWorks(int num) throws InterruptedException { + TasksAsync async = tasksAsync(); + + Semaphore semImmediate = new Semaphore(0); + Semaphore semStage = new Semaphore(0); + List> stages = new ArrayList<>(num); + + for (int i = 0; i < num; i++) { + stages.add( + async + .call(semImmediate::release) + // on CompletionStage + .thenAccept(x -> semStage.release())); + } + + soft.assertThat(semImmediate.tryAcquire(num, 10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(semStage.tryAcquire(num, 10, TimeUnit.SECONDS)).isTrue(); + + stages.stream() + .map(CompletionStage::toCompletableFuture) + .forEach( + cf -> { + try { + cf.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + soft.assertThat(stages) + .hasSize(num) + .extracting(CompletionStage::toCompletableFuture) + .allMatch(CompletableFuture::isDone); + } + + @ParameterizedTest + @ValueSource(ints = {1, 10, 1000}) + public void supplyWorks(int num) throws InterruptedException { + TasksAsync async = tasksAsync(); + + Semaphore semImmediate = new Semaphore(0); + Semaphore semStage = new Semaphore(0); + List> stages = new ArrayList<>(num); + + for (int i = 0; i < num; i++) { + int i2 = i; + stages.add( + async + .supply( + () -> { + semImmediate.release(); + return i2; + }) + // on CompletionStage + .thenApply( + v -> { + semStage.release(); + return v; + })); + } + + soft.assertThat(semImmediate.tryAcquire(num, 10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(semStage.tryAcquire(num, 10, TimeUnit.SECONDS)).isTrue(); + + Set nums = + stages.stream() + .map(CompletionStage::toCompletableFuture) + .map( + cf -> { + try { + return cf.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + + soft.assertThat(stages) + .hasSize(num) + .extracting(CompletionStage::toCompletableFuture) + .allMatch(CompletableFuture::isDone); + + soft.assertThat(nums) + .containsExactlyInAnyOrderElementsOf( + IntStream.range(0, num).boxed().collect(Collectors.toSet())); + } + + @Test + public void callExceptionallyWorks() throws Exception { + TasksAsync async = tasksAsync(); + + CompletionStage stage = + async.call( + () -> { + throw new RuntimeException("hello"); + }); + + Throwable mappedFailure = + stage.handle((result, failure) -> failure).toCompletableFuture().get(); + + soft.assertThat(mappedFailure) + .isInstanceOf(CompletionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(RuntimeException.class) + .extracting(Throwable::getMessage) + .isEqualTo("hello"); + } + + @Test + public void scheduledExceptionallyWorks() throws Exception { + TasksAsync async = tasksAsync(); + + CompletionStage stage = + async.schedule( + () -> { + throw new RuntimeException("hello"); + }, + Instant.now()); + + Throwable mappedFailure = + stage.handle((result, failure) -> failure).toCompletableFuture().get(); + + soft.assertThat(mappedFailure) + .isInstanceOf(CompletionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(RuntimeException.class) + .extracting(Throwable::getMessage) + .isEqualTo("hello"); + } + + @ParameterizedTest + @ValueSource(ints = {1, 10, 1000}) + public void scheduleWorks(int num) throws InterruptedException { + TasksAsync async = tasksAsync(); + + Semaphore sem = new Semaphore(0); + + for (int i = 0; i < num; i++) { + async.schedule(sem::release, async.clock().instant()); + } + + soft.assertThat(sem.tryAcquire(num, 10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + public void cancelDoesNotError() { + // See below + TasksAsync async = tasksAsync(); + AtomicBoolean mark = new AtomicBoolean(); + CompletionStage handle = + async.schedule(() -> mark.set(true), async.clock().instant().plus(1500, ChronoUnit.MILLIS)); + handle.toCompletableFuture().cancel(true); + soft.assertThat(mark).isFalse(); + } + + @Test + @Disabled( + "Disabled because this test would run for a long time and the value of this test is questionable.") + public void cancelReallyWorks() throws InterruptedException { + TasksAsync async = tasksAsync(); + + CountDownLatch started = new CountDownLatch(1); + CountDownLatch cancelled = new CountDownLatch(1); + + CompletionStage handle = + async.schedule( + () -> { + try { + started.countDown(); + new Semaphore(0).acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + async.clock().instant().plus(5, ChronoUnit.SECONDS)); + soft.assertThat(handle).isNotNull(); + + AtomicReference result = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + handle.whenComplete( + (r, t) -> { + cancelled.countDown(); + result.set(r); + error.set(t); + }); + + handle.toCompletableFuture().cancel(true); + soft.assertThat(cancelled.await(10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(result.get()).isNull(); + soft.assertThat(error.get()).isInstanceOf(CancellationException.class); + + soft.assertThat(started.await(10, TimeUnit.SECONDS)).isFalse(); + } + + @Test + public void realDelayCalculation() { + TasksAsync async = tasksAsync(); + + Clock clock = Clock.fixed(Instant.EPOCH, ZoneId.of("UTC")); + Instant now = clock.instant(); + + soft.assertThat(async.calculateDelay(clock, 1L, now)).isEqualTo(1L); + soft.assertThat(async.calculateDelay(clock, 42L, now)).isEqualTo(42L); + + soft.assertThat(async.calculateDelay(clock, 1L, now.plus(10, ChronoUnit.MILLIS))) + .isEqualTo(10L); + + soft.assertThat(async.calculateDelay(clock, 1L, now.minus(10, ChronoUnit.MILLIS))) + .isEqualTo(1L); + } +} diff --git a/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/pool/TestJavaPoolTasksAsync.java b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/pool/TestJavaPoolTasksAsync.java new file mode 100644 index 00000000000..fb8d5f92b20 --- /dev/null +++ b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/pool/TestJavaPoolTasksAsync.java @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.pool; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Clock; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.projectnessie.nessie.tasks.async.BaseTasksAsync; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +public class TestJavaPoolTasksAsync extends BaseTasksAsync { + protected static ScheduledExecutorService executorService; + + @BeforeAll + public static void setup() { + executorService = Executors.newScheduledThreadPool(3); + } + + @AfterAll + public static void tearDown() throws InterruptedException { + executorService.shutdownNow(); + assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS)); + } + + @Override + protected TasksAsync tasksAsync() { + return new JavaPoolTasksAsync(executorService, Clock.systemUTC(), 1L); + } + + @Test + public void cancelScheduledWithInterrupt() throws InterruptedException { + TasksAsync async = tasksAsync(); + + CountDownLatch started = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + + CompletionStage handle = + async.schedule( + () -> { + try { + started.countDown(); + new Semaphore(0).acquire(); + } catch (InterruptedException ignored) { + finished.countDown(); + } + }, + async.clock().instant()); + soft.assertThat(handle).isNotNull(); + + soft.assertThat(started.await(10, TimeUnit.SECONDS)).isTrue(); + handle.toCompletableFuture().cancel(true); + soft.assertThat(finished.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + @Disabled( + "Interrupt does not work, although the CompletableFuture is properly cancelled and yields a CompletionException as its result") + public void cancelSubmittedWithInterrupt() throws InterruptedException { + TasksAsync async = tasksAsync(); + + CountDownLatch started = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + + CompletionStage handle = + async.supply( + () -> { + try { + started.countDown(); + new Semaphore(0).acquire(); + return "foo"; + } catch (InterruptedException ignored) { + finished.countDown(); + throw new RuntimeException(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + soft.assertThat(handle).isNotNull(); + + soft.assertThat(started.await(10, TimeUnit.SECONDS)).isTrue(); + handle.toCompletableFuture().cancel(true); + soft.assertThat(finished.await(10, TimeUnit.SECONDS)).isTrue(); + } +} diff --git a/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/vertx/TestVertxTasksAsync.java b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/vertx/TestVertxTasksAsync.java new file mode 100644 index 00000000000..022ef9083db --- /dev/null +++ b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/vertx/TestVertxTasksAsync.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.vertx; + +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import java.time.Clock; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.projectnessie.nessie.tasks.async.BaseTasksAsync; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +public class TestVertxTasksAsync extends BaseTasksAsync { + private static Vertx vertx; + + @BeforeAll + public static void setup() { + vertx = + Vertx.builder() + .with(new VertxOptions().setWorkerPoolSize(3).setEventLoopPoolSize(2)) + .build(); + } + + @AfterAll + public static void stop() { + vertx.close().result(); + } + + @Override + protected TasksAsync tasksAsync() { + return new VertxTasksAsync(vertx, Clock.systemUTC(), 1L); + } +} diff --git a/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/wrapping/TestThreadContextTasksAsync.java b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/wrapping/TestThreadContextTasksAsync.java new file mode 100644 index 00000000000..67885faf752 --- /dev/null +++ b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/wrapping/TestThreadContextTasksAsync.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.wrapping; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.time.Clock; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.eclipse.microprofile.context.ThreadContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.nessie.tasks.async.TasksAsync; +import org.projectnessie.nessie.tasks.async.pool.JavaPoolTasksAsync; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestThreadContextTasksAsync { + @InjectSoftAssertions protected SoftAssertions soft; + + protected static ScheduledExecutorService executorService; + + @BeforeAll + public static void setup() { + executorService = Executors.newScheduledThreadPool(3); + } + + @AfterAll + public static void tearDown() throws InterruptedException { + executorService.shutdownNow(); + assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS)); + } + + @Test + public void call() throws Exception { + TasksAsync base = new JavaPoolTasksAsync(executorService, Clock.systemUTC(), 1L); + + ThreadContext threadContext = mock(ThreadContext.class); + + TasksAsync async = new ThreadContextTasksAsync(base, threadContext); + + CountDownLatch latchCall = new CountDownLatch(1); + Runnable runnable = latchCall::countDown; + when(threadContext.contextualRunnable(any())).thenReturn(runnable); + + async.call(runnable); + soft.assertThat(latchCall.await(10, TimeUnit.SECONDS)).isTrue(); + verify(threadContext).contextualRunnable(any()); + verifyNoMoreInteractions(threadContext); + } + + @Test + public void supply() throws Exception { + TasksAsync base = new JavaPoolTasksAsync(executorService, Clock.systemUTC(), 1L); + + ThreadContext threadContext = mock(ThreadContext.class); + + TasksAsync async = new ThreadContextTasksAsync(base, threadContext); + + CountDownLatch latchSupply = new CountDownLatch(1); + Supplier supplier = + () -> { + latchSupply.countDown(); + return "supply"; + }; + when(threadContext.contextualSupplier(any())).thenReturn(supplier); + + async.supply(supplier); + soft.assertThat(latchSupply.await(10, TimeUnit.SECONDS)).isTrue(); + verify(threadContext).contextualSupplier(any()); + verifyNoMoreInteractions(threadContext); + } + + @Test + public void schedule() throws Exception { + TasksAsync base = new JavaPoolTasksAsync(executorService, Clock.systemUTC(), 1L); + + ThreadContext threadContext = mock(ThreadContext.class); + + TasksAsync async = new ThreadContextTasksAsync(base, threadContext); + + CountDownLatch latchSchedule = new CountDownLatch(1); + Runnable runnable = latchSchedule::countDown; + when(threadContext.contextualRunnable(any())).thenReturn(runnable); + + async.schedule(runnable, Instant.EPOCH); + soft.assertThat(latchSchedule.await(10, TimeUnit.SECONDS)).isTrue(); + verify(threadContext).contextualRunnable(any()); + verifyNoMoreInteractions(threadContext); + } +} diff --git a/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/wrapping/TestWrappingTasksAsync.java b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/wrapping/TestWrappingTasksAsync.java new file mode 100644 index 00000000000..77c866868bb --- /dev/null +++ b/tasks/service/async/src/test/java/org/projectnessie/nessie/tasks/async/wrapping/TestWrappingTasksAsync.java @@ -0,0 +1,247 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.async.wrapping; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.nessie.tasks.async.TasksAsync; +import org.projectnessie.nessie.tasks.async.pool.JavaPoolTasksAsync; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestWrappingTasksAsync { + @InjectSoftAssertions protected SoftAssertions soft; + + protected static ScheduledExecutorService executorService; + protected static TasksAsync base; + + @BeforeAll + public static void setup() { + executorService = Executors.newScheduledThreadPool(3); + base = new JavaPoolTasksAsync(executorService, Clock.systemUTC(), 1L); + } + + @AfterAll + public static void tearDown() throws InterruptedException { + executorService.shutdownNow(); + assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS)); + } + + @Test + public void wrapClock() { + TasksAsync mock = mock(TasksAsync.class); + + TasksAsync async = + new WrappingTasksAsync(mock) { + @Override + protected Runnable wrapRunnable(Runnable runnable) { + return runnable; + } + + @Override + protected Supplier wrapSupplier(Supplier supplier) { + return supplier; + } + }; + + Clock c = Clock.fixed(Instant.EPOCH, ZoneId.of("UTC")); + when(mock.clock()).thenReturn(c); + + soft.assertThat(async.clock()).isSameAs(c); + verify(mock).clock(); + verifyNoMoreInteractions(mock); + } + + @Test + public void wrapCalculateDelay() { + TasksAsync mock = mock(TasksAsync.class); + + TasksAsync async = + new WrappingTasksAsync(mock) { + @Override + protected Runnable wrapRunnable(Runnable runnable) { + return runnable; + } + + @Override + protected Supplier wrapSupplier(Supplier supplier) { + return supplier; + } + }; + + Long magic = 4242424242L; + when(mock.calculateDelay(any(), anyLong(), any())).thenReturn(magic); + + soft.assertThat(async.calculateDelay(Clock.systemUTC(), 1L, Instant.EPOCH)).isEqualTo(magic); + verify(mock).calculateDelay(any(), anyLong(), any()); + verifyNoMoreInteractions(mock); + } + + @Test + public void wrapCall() throws Exception { + AtomicReference wrapped = new AtomicReference<>(); + AtomicReference called = new AtomicReference<>(); + + TasksAsync async = + new WrappingTasksAsync(base) { + @Override + protected Runnable wrapRunnable(Runnable runnable) { + wrapped.set(runnable); + return () -> { + called.set(runnable); + runnable.run(); + }; + } + + @Override + protected Supplier wrapSupplier(Supplier supplier) { + wrapped.set(supplier); + return () -> { + called.set(supplier); + return supplier.get(); + }; + } + }; + + CountDownLatch latchCall = new CountDownLatch(1); + Runnable runnable = latchCall::countDown; + async.call(runnable); + soft.assertThat(latchCall.await(10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(wrapped.get()).isSameAs(runnable); + soft.assertThat(called.get()).isSameAs(runnable); + wrapped.set(null); + called.set(null); + + CountDownLatch latchSupply = new CountDownLatch(1); + Supplier supplier = + () -> { + latchSupply.countDown(); + return "supply"; + }; + async.supply(supplier); + soft.assertThat(latchSupply.await(10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(wrapped.get()).isSameAs(supplier); + soft.assertThat(called.get()).isSameAs(supplier); + wrapped.set(null); + called.set(null); + + CountDownLatch latchSchedule = new CountDownLatch(1); + runnable = latchSchedule::countDown; + async.schedule(runnable, Instant.EPOCH); + soft.assertThat(latchSchedule.await(10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(wrapped.get()).isSameAs(runnable); + soft.assertThat(called.get()).isSameAs(runnable); + wrapped.set(null); + called.set(null); + } + + @Test + public void wrapSupply() throws Exception { + AtomicReference wrapped = new AtomicReference<>(); + AtomicReference called = new AtomicReference<>(); + + TasksAsync async = + new WrappingTasksAsync(base) { + @Override + protected Runnable wrapRunnable(Runnable runnable) { + wrapped.set(runnable); + return () -> { + called.set(runnable); + runnable.run(); + }; + } + + @Override + protected Supplier wrapSupplier(Supplier supplier) { + wrapped.set(supplier); + return () -> { + called.set(supplier); + return supplier.get(); + }; + } + }; + + CountDownLatch latchSupply = new CountDownLatch(1); + Supplier supplier = + () -> { + latchSupply.countDown(); + return "supply"; + }; + async.supply(supplier); + soft.assertThat(latchSupply.await(10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(wrapped.get()).isSameAs(supplier); + soft.assertThat(called.get()).isSameAs(supplier); + wrapped.set(null); + called.set(null); + } + + @Test + public void wrapSchedule() throws Exception { + AtomicReference wrapped = new AtomicReference<>(); + AtomicReference called = new AtomicReference<>(); + + TasksAsync async = + new WrappingTasksAsync(base) { + @Override + protected Runnable wrapRunnable(Runnable runnable) { + wrapped.set(runnable); + return () -> { + called.set(runnable); + runnable.run(); + }; + } + + @Override + protected Supplier wrapSupplier(Supplier supplier) { + wrapped.set(supplier); + return () -> { + called.set(supplier); + return supplier.get(); + }; + } + }; + + CountDownLatch latchSchedule = new CountDownLatch(1); + Runnable runnable = latchSchedule::countDown; + async.schedule(runnable, Instant.EPOCH); + soft.assertThat(latchSchedule.await(10, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(wrapped.get()).isSameAs(runnable); + soft.assertThat(called.get()).isSameAs(runnable); + wrapped.set(null); + called.set(null); + } +} diff --git a/tasks/service/async/src/test/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.ObjTypeBundle b/tasks/service/async/src/test/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.ObjTypeBundle new file mode 100644 index 00000000000..59c064923cc --- /dev/null +++ b/tasks/service/async/src/test/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.ObjTypeBundle @@ -0,0 +1,17 @@ +# +# Copyright (C) 2024 Dremio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskObjTypeBundle diff --git a/tasks/service/impl/build.gradle.kts b/tasks/service/impl/build.gradle.kts new file mode 100644 index 00000000000..7775f891e5c --- /dev/null +++ b/tasks/service/impl/build.gradle.kts @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("nessie-conventions-server") + id("nessie-jacoco") +} + +extra["maven.name"] = "Nessie - Tasks - Service" + +dependencies { + implementation(project(":nessie-tasks-api")) + implementation(project(":nessie-tasks-service-async")) + implementation(project(":nessie-versioned-storage-common")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + + compileOnly(libs.immutables.builder) + compileOnly(libs.immutables.value.annotations) + annotationProcessor(libs.immutables.value.processor) + + compileOnly(libs.errorprone.annotations) + implementation(libs.guava) + implementation(libs.slf4j.api) + + implementation(platform(libs.opentelemetry.bom)) + implementation("io.opentelemetry:opentelemetry-api") + + compileOnly(libs.vertx.core) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + testImplementation(libs.threeten.extra) + + testImplementation(libs.vertx.core) + + testCompileOnly(libs.jakarta.validation.api) + testCompileOnly(libs.jakarta.annotation.api) + + testCompileOnly(libs.immutables.builder) + testCompileOnly(libs.immutables.value.annotations) + testAnnotationProcessor(libs.immutables.value.processor) + + testImplementation(platform(libs.jackson.bom)) + testImplementation("com.fasterxml.jackson.core:jackson-databind") + testImplementation("com.fasterxml.jackson.core:jackson-annotations") + + testImplementation(project(":nessie-versioned-storage-testextension")) + testImplementation(project(":nessie-versioned-storage-inmemory")) + testRuntimeOnly(libs.logback.classic) +} diff --git a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/TasksServiceConfig.java b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/TasksServiceConfig.java new file mode 100644 index 00000000000..af360babbf0 --- /dev/null +++ b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/TasksServiceConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service; + +import static com.google.common.base.Preconditions.checkState; + +import org.immutables.value.Value; + +@Value.Immutable +public interface TasksServiceConfig { + + long DEFAULT_RACE_WAIT_MILLIS_MIN = 50L; + long DEFAULT_RACE_WAIT_MILLIS_MAX = 200L; + + @Value.Parameter(order = 1) + String name(); + + @Value.Parameter(order = 2) + @Value.Default + default long raceWaitMillisMin() { + return DEFAULT_RACE_WAIT_MILLIS_MIN; + } + + @Value.Parameter(order = 3) + @Value.Default + default long raceWaitMillisMax() { + return DEFAULT_RACE_WAIT_MILLIS_MAX; + } + + static TasksServiceConfig tasksServiceConfig( + String name, long raceWaitMillisMin, long raceWaitMillisMax) { + return ImmutableTasksServiceConfig.of(name, raceWaitMillisMin, raceWaitMillisMax); + } + + @Value.Check + default void check() { + checkState(raceWaitMillisMin() < raceWaitMillisMax()); + checkState(raceWaitMillisMin() > 0L); + } +} diff --git a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java new file mode 100644 index 00000000000..2803fdb12eb --- /dev/null +++ b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.impl; + +import org.projectnessie.nessie.tasks.api.TaskRequest; +import org.projectnessie.nessie.tasks.api.TaskStatus; +import org.projectnessie.nessie.tasks.api.Tasks; + +/** Used to implement metrics (counters) and to verify interactions in tests. */ +public interface TaskServiceMetrics { + + /** A new per-task singleton has been created. */ + void startNewTaskController(); + + /** Task attempt iteration. */ + void taskAttempt(); + + /** Final, successful task result returned. */ + void taskAttemptFinalSuccess(); + + /** Final, failure task result returned. */ + void taskAttemptFinalFailure(); + + /** Task attempt detected that task is running in another process. */ + void taskAttemptRunning(); + + /** Task attempt detected that task ran into a retryable error. */ + void taskAttemptErrorRetry(); + + /** New task object is being created in the database. */ + void taskCreation(); + + /** New task object not created due to conditional-update race. */ + void taskCreationRace(); + + /** Unhandled error while storing new task object. */ + void taskCreationUnhandled(); + + /** Unhandled error during task attempt. */ + void taskAttemptUnhandled(); + + /** Task successfully updated with retry information. */ + void taskRetryStateChangeSucceeded(); + + /** Task updated with retry information failed due to conditional-update race. */ + void taskRetryStateChangeRace(); + + /** Starting task execution locally. */ + void taskExecution(); + + /** Local task execution finished. */ + void taskExecutionFinished(); + + /** + * Local task execution finished with successful result, state successfully updated in database. + */ + void taskExecutionResult(); + + /** + * Local task execution finished with successful result, failed to update state in database due to + * conditional-update race. + */ + void taskExecutionResultRace(); + + /** Local task execution finished with retryable error, state successfully updated in database. */ + void taskExecutionRetryableError(); + + /** Local task execution finished with failure result, state successfully updated in database. */ + void taskExecutionFailure(); + + /** + * Local task execution finished with failure result, failed to update state in database due to + * conditional-update race. + */ + void taskExecutionFailureRace(); + + /** Unhandled error during task execution. */ + void taskExecutionUnhandled(); + + /** A lost-task has been detected. */ + void taskLossDetected(); + + /** Lost task successfully reassigned. */ + void taskLostReassigned(); + + /** Lost task not reassigned (race with other instance). */ + void taskLostReassignRace(); + + /** Starting running-state update with fresh values. */ + void taskUpdateRunningState(); + + /** Running-state update with fresh values succeeded. */ + void taskRunningStateUpdated(); + + /** Running-state update with fresh values ran into race with another instance. */ + void taskRunningStateUpdateRace(); + + /** Running-state update cancelled, because task is no longer in running-state. */ + void taskRunningStateUpdateNoLongerRunning(); + + /** + * Task object requested for {@link Tasks#submit(TaskRequest)} has the final {@link + * TaskStatus#FAILURE FAILURE} state and is returned immediately. + */ + void taskHasFinalFailure(); + + /** + * Task object requested for {@link Tasks#submit(TaskRequest)} has the final {@link + * TaskStatus#SUCCESS SUCCESS} state and is returned immediately. + */ + void taskHasFinalSuccess(); +} diff --git a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java new file mode 100644 index 00000000000..c7418bdd078 --- /dev/null +++ b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java @@ -0,0 +1,602 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.impl; + +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedStage; +import static java.util.concurrent.CompletableFuture.failedStage; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ConcurrentModificationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.immutables.value.Value; +import org.projectnessie.nessie.tasks.api.TaskBehavior; +import org.projectnessie.nessie.tasks.api.TaskObj; +import org.projectnessie.nessie.tasks.api.TaskRequest; +import org.projectnessie.nessie.tasks.api.TaskState; +import org.projectnessie.nessie.tasks.api.TaskStatus; +import org.projectnessie.nessie.tasks.api.Tasks; +import org.projectnessie.nessie.tasks.api.TasksService; +import org.projectnessie.nessie.tasks.async.TasksAsync; +import org.projectnessie.nessie.tasks.service.TasksServiceConfig; +import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException; +import org.projectnessie.versioned.storage.common.exceptions.ObjTooLargeException; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TasksServiceImpl implements TasksService { + private static final Logger LOGGER = LoggerFactory.getLogger(TasksServiceImpl.class); + + private final String name; + private final TasksAsync async; + private final TaskServiceMetrics metrics; + + private final long raceWaitMillisMin; + private final long raceWaitMillisMax; + + private final ConcurrentMap> currentTasks = + new ConcurrentHashMap<>(); + + private volatile boolean shutdown; + + public TasksServiceImpl(TasksAsync async, TaskServiceMetrics metrics, TasksServiceConfig config) { + this.async = async; + this.metrics = metrics; + this.name = config.name(); + this.raceWaitMillisMin = config.raceWaitMillisMin(); + this.raceWaitMillisMax = config.raceWaitMillisMax(); + } + + @Override + public CompletionStage shutdown() { + shutdown = true; + return CompletableFuture.allOf( + currentTasks.values().stream() + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)) + .thenApply(x -> null); + } + + @Override + public Tasks forPersist(Persist persist) { + return new TasksImpl(persist); + } + + CompletionStage submit( + Persist persist, TaskRequest taskRequest) { + ObjId objId = taskRequest.objId(); + + // Try to get the object and immediately return if it has a final state. We expect to hit final + // states way more often, so preventing the concurrent-hash-map interactions and especially the + // asynchronous task handling improves the implementation. + // TODO using `fetchObj()` would be wrong here, because it is *synchronous* and can block. + // Options: + // a) remove this "optimization" + // b) add a `getObjIfCached(ObjId)` --> chosen for now + // c) add a `fetchObjAsync()`, but adding async variants to all database implementations will + // be tricky + Obj obj = persist.getImmediate(taskRequest.objId()); + if (obj != null) { + T taskObj = castObj(taskRequest, obj); + TaskStatus status = taskObj.taskState().status(); + switch (status) { + case FAILURE: + metrics.taskHasFinalFailure(); + return failedStage(taskRequest.behavior().stateAsException(taskObj)); + case SUCCESS: + metrics.taskHasFinalSuccess(); + return completedStage(taskObj); + default: + // task object exists but has a non-final state, handle it asynchronously + checkState(!status.isFinal(), "Expect non-final task status"); + break; + } + } + + // Ensure that only one "base" completable future exists for each obj-id. + TaskKey taskKey = TaskKey.taskKey(persist.config().repositoryId(), objId); + + // The shutdown-check can be racy, if shutdown() is called after the `if` but before the access + // to `currentTasks`. But since `shutdown()` is usually only relevant for tests, that trade-off + // is acceptable. + if (shutdown) { + return CompletableFuture.failedStage( + new IllegalStateException("Tasks service already shutdown")); + } + + @SuppressWarnings("unchecked") + CompletionStage r = + (CompletionStage) + currentTasks.computeIfAbsent( + taskKey, + id -> { + metrics.startNewTaskController(); + ExecParams execParams = new ExecParams(persist, taskRequest); + LOGGER.trace("{}: Starting new local task controller for {}", name, execParams); + async.call(() -> tryLocal(execParams)); + return execParams.resultFuture; + }); + return r; + } + + private void finalResult(ExecParams params, TaskObj result) { + try { + params.resultFuture.complete(result); + } finally { + removeFromCurrentTasks(params); + } + } + + private void finalFailure(ExecParams params, Throwable t) { + try { + params.resultFuture.completeExceptionally(t); + } finally { + removeFromCurrentTasks(params); + } + } + + private void removeFromCurrentTasks(ExecParams params) { + TaskKey taskKey = TaskKey.taskKey(params.persist.config().repositoryId(), params.objId()); + currentTasks.remove(taskKey); + } + + private void tryLocal(ExecParams params) { + // Called from a thread pool, need to lock. + params.lock.lock(); + try { + metrics.taskAttempt(); + LOGGER.trace("{}: Task evaluation attempt for {}", name, params); + + TaskObj obj = castObj(params.taskRequest, params.persist.fetchObj(params.objId())); + // keep in mind: `obj` might be a locally cached instance that is not in sync w/ the + // database! + + TaskState state = obj.taskState(); + LOGGER.trace("{}: Evaluating task for {} with state {}", name, params, state); + + switch (state.status()) { + case SUCCESS: + metrics.taskAttemptFinalSuccess(); + finalResult(params, obj); + break; + case FAILURE: + metrics.taskAttemptFinalFailure(); + finalFailure(params, params.taskRequest.behavior().stateAsException(obj)); + break; + case RUNNING: + metrics.taskAttemptRunning(); + checkRunningTask(params, state, obj); + break; + case ERROR_RETRY: + metrics.taskAttemptErrorRetry(); + maybeAttemptErrorRetry(params, state, obj); + break; + default: + throw new IllegalStateException("Unknown task status " + state.status()); + } + + } catch (ObjNotFoundException e) { + LOGGER.trace("{}: Task for {} does not yet exist, creating", name, params); + + try { + metrics.taskCreation(); + TaskBehavior behavior = params.taskRequest.behavior(); + TaskObj obj = + withNewVersionToken( + params + .taskRequest + .applyRequestToObjBuilder(behavior.newObjBuilder()) + .id(params.taskRequest.objId()) + .type(params.taskRequest.objType()) + .taskState(behavior.runningTaskState(async.clock(), null))); + + if (params.persist.storeObj(obj)) { + LOGGER.trace("{}: Task creation for {} succeeded", name, params); + issueLocalTaskExecution(params, obj); + } else { + LOGGER.trace("{}: Task creation for {} failed, retrying", name, params); + + // Another process stored the task-obj for the task-request, reschedule but do not loop to + // be "nice" and give other requests the ability to run. + metrics.taskCreationRace(); + reattemptAfterRace(params); + } + } catch (Throwable t) { + // Unhandled failure + LOGGER.error( + "{}: Unhandled state while storing initial task execution state for {}", + name, + params, + t); + metrics.taskCreationUnhandled(); + finalFailure(params, t); + } + } catch (Throwable t) { + // Unhandled failure + LOGGER.error("{}: Unhandled state during local task attempt for {}", name, params, t); + metrics.taskAttemptUnhandled(); + finalFailure(params, t); + } finally { + params.lock.unlock(); + } + } + + // Called while ExecParams is locked from tryLocal() + private void checkRunningTask(ExecParams params, TaskState state, TaskObj obj) + throws ObjTooLargeException { + Instant now = async.clock().instant(); + if (now.compareTo(requireNonNull(state.lostNotBefore())) >= 0) { + metrics.taskLossDetected(); + LOGGER.warn("{}: Detected lost task for {}", name, params); + TaskBehavior behavior = params.taskRequest.behavior(); + TaskObj retryState = + withNewVersionToken( + behavior + .newObjBuilder() + .from(obj) + .taskState(behavior.runningTaskState(async.clock(), obj))); + + if (params.persist.updateConditional(obj, retryState)) { + metrics.taskLostReassigned(); + issueLocalTaskExecution(params, retryState); + } else { + metrics.taskLostReassignRace(); + reattemptAfterRace(params); + } + } else { + async.schedule(() -> tryLocal(params), state.retryNotBefore()); + } + } + + // Called while ExecParams is locked from tryLocal() + private void maybeAttemptErrorRetry(ExecParams params, TaskState state, TaskObj obj) + throws ObjTooLargeException { + Instant now = async.clock().instant(); + if (now.compareTo(requireNonNull(state.retryNotBefore())) >= 0) { + TaskBehavior behavior = params.taskRequest.behavior(); + TaskObj retryState = + withNewVersionToken( + behavior + .newObjBuilder() + .from(obj) + .taskState(behavior.runningTaskState(async.clock(), obj))); + + if (params.persist.updateConditional(obj, retryState)) { + metrics.taskRetryStateChangeSucceeded(); + issueLocalTaskExecution(params, retryState); + } else { + metrics.taskRetryStateChangeRace(); + reattemptAfterRace(params); + } + } else { + async.schedule(() -> tryLocal(params), state.retryNotBefore()); + } + } + + private void reattemptAfterRace(ExecParams params) { + long raceWaitMillis = + ThreadLocalRandom.current().nextLong(raceWaitMillisMin, raceWaitMillisMax); + async.schedule( + () -> tryLocal(params), async.clock().instant().plus(raceWaitMillis, ChronoUnit.MILLIS)); + } + + // Called while ExecParams is locked from tryLocal() + private void issueLocalTaskExecution(ExecParams params, TaskObj obj) { + LOGGER.debug("{}: Starting local task execution for {}", name, params); + metrics.taskExecution(); + + params.runningObj = obj; + scheduleTaskRunningUpdate(params, obj); + + params + .taskRequest + .submitExecution() + .whenComplete( + (resultBuilder, failure) -> localTaskFinished(params, resultBuilder, failure)); + } + + private void localTaskFinished( + ExecParams params, TaskObj.Builder resultBuilder, Throwable failure) { + // Called from a thread pool, need to lock. + params.lock.lock(); + try { + TaskObj expected = params.runningObj; + + params.cancelRunningStateUpdate(); + + metrics.taskExecutionFinished(); + + if (expected == null) { + unexpectedNullExpectedState(params, resultBuilder, failure); + return; + } + + if (resultBuilder != null) { + TaskObj r = withNewVersionToken(resultBuilder); + + LOGGER.trace("{}, Task execution for {} succeeded, updating database", name, params); + + // Task execution succeeded with a final result + if (params.persist.updateConditional(expected, r)) { + metrics.taskExecutionResult(); + // Database updated with final result + LOGGER.debug( + "{}: Task execution success result for {} updated in database, returning final result", + name, + params); + finalResult(params, r); + } else { + metrics.taskExecutionResultRace(); + // Another process updated the database state in the meantime. + String msg = + format( + "Failed to update successful task execution result for %s in database (race condition), exposing as a failure", + params); + LOGGER.warn("{}: {}", name, msg); + finalFailure(params, new ConcurrentModificationException(msg, failure)); + } + } else if (failure == null) { + failure = + new NullPointerException("Local task execution return a null object, which is illegal"); + } + if (failure != null) { + LOGGER.trace("{}: Task execution for {} failed, updating database", name, params); + + TaskBehavior behavior = params.taskRequest.behavior(); + TaskState newState = behavior.asErrorTaskState(async.clock(), expected, failure); + checkState(newState.status().isError()); + TaskObj updatedObj = + withNewVersionToken(behavior.newObjBuilder().from(expected).taskState(newState)); + if (params.persist.updateConditional(expected, updatedObj)) { + // Database updated with final result + if (newState.status().isRetryable()) { + metrics.taskExecutionRetryableError(); + LOGGER.debug( + "{}: Task execution raised retryable error for {} updated in database, retrying", + name, + params); + reattemptAfterRetryableError(params, newState.retryNotBefore()); + } else { + metrics.taskExecutionFailure(); + LOGGER.debug( + "{}: Task execution ended in final failure for {} updated in database, returning final result", + name, + params); + finalFailure(params, failure); + } + } else { + metrics.taskExecutionFailureRace(); + String msg = + format( + "Failed to update failure task execution result for %s in database (race condition)", + params); + LOGGER.warn("{}: {}", name, msg); + finalFailure(params, new ConcurrentModificationException(msg, failure)); + } + } + + } catch (Throwable t2) { + // Unhandled failure + LOGGER.error( + "{}: Unhandled state while evaluating task execution result for {}", name, params, t2); + metrics.taskExecutionUnhandled(); + finalFailure(params, t2); + } finally { + params.lock.unlock(); + } + } + + private void unexpectedNullExpectedState( + ExecParams params, TaskObj.Builder resultBuilder, Throwable failure) { + // Oops ... no clue how that might have happened, but handle it just in case. + String res; + if (failure != null) { + res = "exceptionally"; + } else if (resultBuilder != null) { + res = "successfully"; + } else { + res = "with an illegal null result"; + } + String msg = + format( + "Task execution for %s finished %s, but the expected task obj state is null. Cannot persist the task execution result.", + params, res); + LOGGER.error("{}, {}", name, msg); + Exception ex = new IllegalStateException(msg); + if (failure != null) { + ex.addSuppressed(failure); + } + finalFailure(params, ex); + } + + private void scheduleTaskRunningUpdate(ExecParams params, TaskObj current) { + // Called while holding the ExecParams.lock + Instant scheduleNotBefore = + params.taskRequest.behavior().performRunningStateUpdateAt(async.clock(), current); + params.runningUpdateScheduled = + async.schedule(() -> updateRunningState(params), scheduleNotBefore); + } + + private void updateRunningState(ExecParams params) { + // Called from a thread pool, need to lock. + params.lock.lock(); + try { + + TaskObj current = params.runningObj; + if (current == null) { + // Local task execution finished, do nothing. + LOGGER.trace( + "{}: Local task execution has finished, no need to update running state for {}", + name, + params); + return; + } + + metrics.taskUpdateRunningState(); + TaskState state = current.taskState(); + if (state.status() == TaskStatus.RUNNING) { + TaskBehavior behavior = params.taskRequest.behavior(); + TaskObj updated = + withNewVersionToken( + behavior + .newObjBuilder() + .from(current) + .taskState(behavior.runningTaskState(async.clock(), null))); + if (updated.taskState().status() != TaskStatus.RUNNING) { + throw new IllegalStateException( + format( + "TaskBehavior.runningTaskState() implementation %s returned illegal status %s, must return RUNNING", + behavior.getClass().getName(), updated.taskState().status())); + } + + try { + if (params.persist.updateConditional(current, updated)) { + params.runningObj = updated; + metrics.taskRunningStateUpdated(); + // Current state successfully updated in database, reschedule running task update + LOGGER.trace( + "{}: Successfully updated state for locally running task for {}", name, params); + scheduleTaskRunningUpdate(params, updated); + } else { + metrics.taskRunningStateUpdateRace(); + // Ran into a (remote) race, retry running-update + LOGGER.warn( + "{}: Race on database update while updating running state for {}. The result of the local task " + + "execution might be lost. When the local task execution finishes, it may also run into an " + + "update-race, indicating that the task-result is lost.", + name, + params); + return; // don't re-schedule, there's no chance that another update will succeed. + } + } catch (Throwable t) { + LOGGER.error("{}: Unexpected exception updating task state for {}", name, params, t); + // re-schedule ... and pray + scheduleTaskRunningUpdate(params, current); + } + } else { + metrics.taskRunningStateUpdateNoLongerRunning(); + LOGGER.trace( + "{}: Task for {} no longer running, skipping further local running state updates", + name, + params); + } + } finally { + params.lock.unlock(); + } + } + + private void reattemptAfterRetryableError(ExecParams params, Instant retryNotBefore) { + async.schedule(() -> tryLocal(params), retryNotBefore); + } + + private static final class ExecParams { + final Persist persist; + final CompletableFuture resultFuture; + final TaskRequest taskRequest; + + final Lock lock = new ReentrantLock(); + + TaskObj runningObj; + CompletionStage runningUpdateScheduled; + + @SuppressWarnings("unchecked") + ExecParams(Persist persist, TaskRequest taskRequest) { + this.persist = persist; + this.resultFuture = new CompletableFuture<>(); + this.taskRequest = (TaskRequest) taskRequest; + } + + ObjId objId() { + return taskRequest.objId(); + } + + void cancelRunningStateUpdate() { + // Cancel scheduled running-state update + CompletionStage handle = runningUpdateScheduled; + if (handle != null) { + runningObj = null; + runningUpdateScheduled = null; + // Don't interrupt, not all implementations support that (Vert.X won't, see + // https://github.com/eclipse-vertx/vert.x/issues/3334) + handle.toCompletableFuture().cancel(false); + } + } + + @Override + public String toString() { + return taskRequest.objType().name() + ':' + taskRequest.objId(); + } + } + + private TaskObj withNewVersionToken(TaskObj.Builder builder) { + return builder.versionToken(ObjId.randomObjId().toString()).build(); + } + + private static T castObj( + TaskRequest taskRequest, Obj obj) { + Class clazz = taskRequest.behavior().objType().targetClass(); + try { + @SuppressWarnings("unchecked") + T taskObj = (T) clazz.cast(obj); + return taskObj; + } catch (ClassCastException e) { + throw new ClassCastException( + "Failed to cast obj of type " + + obj.type().name() + + " to the task request's expected type " + + clazz.getName()); + } + } + + final class TasksImpl implements Tasks { + final Persist persist; + + public TasksImpl(Persist persist) { + this.persist = persist; + } + + @Override + public CompletionStage submit( + TaskRequest taskRequest) { + return TasksServiceImpl.this.submit(persist, taskRequest); + } + } + + @Value.Immutable + interface TaskKey { + @Value.Parameter(order = 1) + String repositoryId(); + + @Value.Parameter(order = 2) + ObjId objId(); + + static TaskKey taskKey(String repositoryId, ObjId objId) { + return ImmutableTaskKey.of(repositoryId, objId); + } + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceCachingImpl.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceCachingImpl.java new file mode 100644 index 00000000000..229332b8136 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceCachingImpl.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.impl; + +import org.projectnessie.versioned.storage.testextension.NessiePersistCache; + +@NessiePersistCache +public class TestTasksServiceCachingImpl extends TestTasksServiceImpl {} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java new file mode 100644 index 00000000000..7bdf6cc6a97 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java @@ -0,0 +1,1007 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.impl; + +import static com.google.common.base.Preconditions.checkState; +import static org.assertj.core.api.InstanceOfAssertFactories.type; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.projectnessie.nessie.tasks.service.TasksServiceConfig.DEFAULT_RACE_WAIT_MILLIS_MAX; +import static org.projectnessie.nessie.tasks.service.TasksServiceConfig.DEFAULT_RACE_WAIT_MILLIS_MIN; +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.FRESH_LOST_RETRY_NOT_BEFORE; +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.FRESH_RUNNING_RETRY_NOT_BEFORE; +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.RETRYABLE_ERROR_NOT_BEFORE; +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskRequest.basicTaskRequest; +import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_REPOSITORY_ID; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.ConcurrentModificationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.projectnessie.nessie.tasks.api.TaskObj; +import org.projectnessie.nessie.tasks.api.TaskState; +import org.projectnessie.nessie.tasks.api.Tasks; +import org.projectnessie.nessie.tasks.service.TasksServiceConfig; +import org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskObj; +import org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskRequest; +import org.projectnessie.nessie.tasks.service.tasktypes.RetryableException; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.inmemory.InmemoryBackendTestFactory; +import org.projectnessie.versioned.storage.testextension.NessieBackend; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.NessieStoreConfig; +import org.projectnessie.versioned.storage.testextension.PersistExtension; +import org.threeten.extra.MutableClock; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +@NessieBackend(InmemoryBackendTestFactory.class) +public class TestTasksServiceImpl { + @NessiePersist static Persist persist; + @InjectSoftAssertions protected SoftAssertions soft; + + @Test + public void multipleRepos( + @NessieStoreConfig(name = CONFIG_REPOSITORY_ID, value = "some-other") @NessiePersist + Persist otherRepo) { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + + Tasks tasks1 = service.forPersist(persist); + Tasks tasks2 = service.forPersist(otherRepo); + + CompletableFuture taskCompletionStage1 = new CompletableFuture<>(); + CompletableFuture taskCompletionStage2 = new CompletableFuture<>(); + + BasicTaskRequest taskRequest1 = basicTaskRequest("hello", () -> taskCompletionStage1); + BasicTaskRequest taskRequest2 = basicTaskRequest("hello", () -> taskCompletionStage2); + + // Want the same ObjId to verify that multiple repositories work fine + soft.assertThat(taskRequest1.objId()).isEqualTo(taskRequest2.objId()); + + // Submit task request for repo 1 + CompletableFuture taskFuture1 = tasks1.submit(taskRequest1).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // Submit task request for repo 2 + CompletableFuture taskFuture2 = tasks2.submit(taskRequest2).toCompletableFuture(); + soft.assertThat(taskFuture2).isNotSameAs(taskFuture1); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // 2nd request for the same task 1 + CompletableFuture taskFuture1b = + tasks1.submit(basicTaskRequest("hello", () -> null)).toCompletableFuture(); + soft.assertThat(taskFuture1b).isSameAs(taskFuture1).isNotSameAs(taskFuture2); + verifyNoMoreInteractions(metrics); + + // 2nd request for the same task 2 + CompletableFuture taskFuture2b = + tasks2.submit(basicTaskRequest("hello", () -> null)).toCompletableFuture(); + soft.assertThat(taskFuture2b).isSameAs(taskFuture2).isNotSameAs(taskFuture1); + verifyNoMoreInteractions(metrics); + + soft.assertThat(async.doWork()).isEqualTo(2); // task 1 + 2 + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics, times(2)).taskAttempt(); // task 1 + 2 + verify(metrics, times(2)).taskCreation(); // task 1 + 2 + verify(metrics, times(2)).taskExecution(); // task 1 + 2 + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage1.complete( + BasicTaskObj.builder() + .id(taskRequest1.objId()) + .taskParameter(taskRequest1.taskParameter()) + .taskResult(taskRequest1.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture1).isCompleted(); + soft.assertThat(async.doWork()).isEqualTo(1); // task 2 + verify(metrics).taskExecutionFinished(); // task 1 + verify(metrics).taskExecutionResult(); // task 1 + verify(metrics).taskUpdateRunningState(); // task 2 + verify(metrics).taskRunningStateUpdated(); // task 2 + verifyNoMoreInteractions(metrics); + reset(metrics); + + taskCompletionStage2.complete( + BasicTaskObj.builder() + .id(taskRequest2.objId()) + .taskParameter(taskRequest2.taskParameter()) + .taskResult(taskRequest2.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture2).isCompleted(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); // task 2 + verify(metrics).taskExecutionResult(); // task 2 + verifyNoMoreInteractions(metrics); + reset(metrics); + } + + @Test + public void singleServiceSingleConsumer() throws Exception { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", () -> taskCompletionStage); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // 2nd request for the same task (would fail w/ the `null` completion-stage) + CompletableFuture taskFuture2 = + tasks.submit(basicTaskRequest("hello", () -> null)).toCompletableFuture(); + soft.assertThat(taskFuture2).isSameAs(taskFuture); + verifyNoMoreInteractions(metrics); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.complete( + BasicTaskObj.builder() + .id(taskRequest.objId()) + .taskParameter(taskRequest.taskParameter()) + .taskResult(taskRequest.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture).isCompleted(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionResult(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + soft.assertThat(taskFuture.get()) + .asInstanceOf(type(BasicTaskObj.class)) + .extracting(BasicTaskObj::taskResult) + .isEqualTo("hello finished"); + verifyNoMoreInteractions(metrics); + + // Validate the optimization for the cached-object in TaskServiceImpl.execute() + BasicTaskRequest req = basicTaskRequest("hello", () -> null); + if (persist.getImmediate(req.objId()) != null) { + CompletableFuture followUp = tasks.submit(req).toCompletableFuture(); + soft.assertThat(async.doWork()).isEqualTo(0); + soft.assertThat(followUp).isCompleted(); + verify(metrics).taskHasFinalSuccess(); + verifyNoMoreInteractions(metrics); + reset(metrics); + } + } + + @Test + public void singleServiceSingleConsumerFailure() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", () -> taskCompletionStage); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // 2nd request for the same task (would fail w/ the `null` completion-stage) + CompletableFuture taskFuture2 = + tasks.submit(basicTaskRequest("hello", () -> null)).toCompletableFuture(); + soft.assertThat(taskFuture2).isSameAs(taskFuture); + verifyNoMoreInteractions(metrics); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.completeExceptionally(new RuntimeException("failed task")); + soft.assertThat(taskFuture).isCompletedExceptionally(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionFailure(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + // Validate the optimization for the cached-object in TaskServiceImpl.execute() + BasicTaskRequest req = basicTaskRequest("hello", () -> null); + if (persist.getImmediate(req.objId()) != null) { + CompletableFuture followUp = tasks.submit(req).toCompletableFuture(); + soft.assertThat(async.doWork()).isEqualTo(0); + soft.assertThat(followUp).isCompletedExceptionally(); + verify(metrics).taskHasFinalFailure(); + verifyNoMoreInteractions(metrics); + reset(metrics); + } + } + + @Test + public void taskReturnsIllegalNullResult() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", () -> taskCompletionStage); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.complete(null); + soft.assertThat(taskFuture).isCompletedExceptionally(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionFailure(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThatThrownBy(taskFuture::get) + .isInstanceOf(ExecutionException.class) + .cause() + .isInstanceOf(NullPointerException.class) + .hasMessage("Local task execution return a null object, which is illegal"); + } + + @Test + public void runningTaskUpdateRace() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", () -> taskCompletionStage); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + Mockito.doAnswer( + invocation -> { + BasicTaskObj obj = + persist.fetchTypedObj(taskRequest.objId(), BasicTaskObj.TYPE, BasicTaskObj.class); + TaskObj updated = + BasicTaskObj.builder() + .from(obj) + .versionToken(obj.versionToken() + "_concurrent_update") + .build(); + checkState(persist.updateConditional(obj, updated)); + return null; + }) + .when(metrics) + .taskUpdateRunningState(); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdateRace(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.complete( + BasicTaskObj.builder() + .id(taskRequest.objId()) + .taskParameter(taskRequest.taskParameter()) + .taskResult(taskRequest.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture).isCompletedExceptionally(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionResultRace(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThatThrownBy(taskFuture::get) + .cause() + .isInstanceOf(ConcurrentModificationException.class) + .hasMessageMatching( + "Failed to update successful task execution result for basic:[0-9a-f]+ in database \\(race condition\\), exposing as a failure"); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + } + + @Test + public void successfulTaskUpdateRace() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", () -> taskCompletionStage); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + Mockito.doAnswer( + invocation -> { + BasicTaskObj obj = + persist.fetchTypedObj(taskRequest.objId(), BasicTaskObj.TYPE, BasicTaskObj.class); + TaskObj updated = + BasicTaskObj.builder() + .from(obj) + .versionToken(obj.versionToken() + "_concurrent_update") + .build(); + checkState(persist.updateConditional(obj, updated)); + return null; + }) + .when(metrics) + .taskExecutionFinished(); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.complete( + BasicTaskObj.builder() + .id(taskRequest.objId()) + .taskParameter(taskRequest.taskParameter()) + .taskResult(taskRequest.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture).isCompletedExceptionally(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionResultRace(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThatThrownBy(taskFuture::get) + .cause() + .isInstanceOf(ConcurrentModificationException.class) + .hasMessageMatching( + "Failed to update successful task execution result for basic:[0-9a-f]+ in database \\(race condition\\), exposing as a failure"); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + } + + @Test + public void failedTaskUpdateRace() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", () -> taskCompletionStage); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + Mockito.doAnswer( + invocation -> { + BasicTaskObj obj = + persist.fetchTypedObj(taskRequest.objId(), BasicTaskObj.TYPE, BasicTaskObj.class); + TaskObj updated = + BasicTaskObj.builder() + .from(obj) + .versionToken(obj.versionToken() + "_concurrent_update") + .build(); + checkState(persist.updateConditional(obj, updated)); + return null; + }) + .when(metrics) + .taskExecutionFinished(); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.completeExceptionally(new RuntimeException("foo bar")); + soft.assertThat(taskFuture).isCompletedExceptionally(); + soft.assertThat(async.doWork()).isEqualTo(0); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionFailureRace(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThatThrownBy(taskFuture::get) + .cause() + .isInstanceOf(ConcurrentModificationException.class) + .hasMessageMatching( + "Failed to update failure task execution result for basic:[0-9a-f]+ in database \\(race condition\\)"); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + } + + @Test + public void twoServicesDistributed() throws Exception { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics1 = mock(TaskServiceMetrics.class); + TaskServiceMetrics metrics2 = mock(TaskServiceMetrics.class); + + TasksServiceImpl service1 = new TasksServiceImpl(async, metrics1, tasksServiceConfig(1)); + Tasks tasks1 = service1.forPersist(persist); + TasksServiceImpl service2 = new TasksServiceImpl(async, metrics2, tasksServiceConfig(2)); + Tasks tasks2 = service2.forPersist(persist); + + BasicTaskRequest taskRequest1 = basicTaskRequest("hello", () -> taskCompletionStage); + CompletableFuture taskFuture1 = tasks1.submit(taskRequest1).toCompletableFuture(); + verify(metrics1).startNewTaskController(); + verifyNoMoreInteractions(metrics1); + reset(metrics1); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture1).isNotDone(); + verify(metrics1).taskAttempt(); + verify(metrics1).taskCreation(); + verify(metrics1).taskExecution(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + + clock.add(250, ChronoUnit.MILLIS); + BasicTaskRequest taskRequest2 = basicTaskRequest("hello", () -> taskCompletionStage); + CompletableFuture taskFuture2 = tasks2.submit(taskRequest2).toCompletableFuture(); + verify(metrics2).startNewTaskController(); + verify(metrics2).startNewTaskController(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + soft.assertThat(async.doWork()).isEqualTo(2); + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskUpdateRunningState(); + verify(metrics1).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics1); + verify(metrics2).taskAttempt(); + verify(metrics2).taskAttemptRunning(); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskUpdateRunningState(); + verify(metrics1).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.complete( + BasicTaskObj.builder() + .id(taskRequest1.objId()) + .taskParameter(taskRequest1.taskParameter()) + .taskResult(taskRequest1.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture1).isCompleted(); + // Clock did not advance to when the state for the task object is refreshed, so the 2nd future + // is still not done. + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskExecutionFinished(); + verify(metrics1).taskExecutionResult(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isCompleted(); + // Clock did not advance to when the state for the task object is refreshed, so the 2nd future + // is still not done. + soft.assertThat(taskFuture2).isNotDone(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(FRESH_RUNNING_RETRY_NOT_BEFORE); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture1).isCompleted(); + // Clock advance enough to refresh, so the 2nd future is done now. + soft.assertThat(taskFuture2).isCompleted(); + verifyNoMoreInteractions(metrics1); + verify(metrics2).taskAttempt(); + verify(metrics2).taskAttemptFinalSuccess(); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + + soft.assertThat(taskFuture1.get()) + .asInstanceOf(type(BasicTaskObj.class)) + .isEqualTo(taskFuture2.get()) + .extracting(BasicTaskObj::taskResult) + .isEqualTo("hello finished"); + } + + @Test + public void singleServiceSingleConsumerRetryableError() throws Exception { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + AtomicReference> taskCompletionStage = + new AtomicReference<>(new CompletableFuture<>()); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", taskCompletionStage::get); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // 2nd request for the same task (would fail w/ the `null` completion-stage) + CompletableFuture taskFuture2 = + tasks.submit(basicTaskRequest("hello", () -> null)).toCompletableFuture(); + soft.assertThat(taskFuture2).isSameAs(taskFuture); + + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.get().completeExceptionally(new RetryableException("retryable")); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionRetryableError(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + soft.assertThat(taskFuture).isNotDone(); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + clock.add(RETRYABLE_ERROR_NOT_BEFORE); + + // need a new CompletableFuture that can be used when the task's execution is re-triggered + taskCompletionStage.set(new CompletableFuture<>()); + + soft.assertThat(taskFuture).isNotDone(); + soft.assertThat(async.doWork()).isEqualTo(1); + verify(metrics).taskAttempt(); + verify(metrics).taskAttemptErrorRetry(); + verify(metrics).taskRetryStateChangeSucceeded(); + verify(metrics).taskExecution(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + taskCompletionStage + .get() + .complete( + BasicTaskObj.builder() + .id(taskRequest.objId()) + .taskParameter(taskRequest.taskParameter()) + .taskResult(taskRequest.taskParameter() + " finished") + .taskState(TaskState.successState())); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionResult(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + soft.assertThat(taskFuture.get()) + .asInstanceOf(type(BasicTaskObj.class)) + .extracting(BasicTaskObj::taskResult) + .isEqualTo("hello finished"); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + } + + @Test + public void twoServicesDistributedTaskFailure() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async1 = new TestingTasksAsync(clock); + TestingTasksAsync async2 = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage = new CompletableFuture<>(); + + TaskServiceMetrics metrics1 = mock(TaskServiceMetrics.class); + TaskServiceMetrics metrics2 = mock(TaskServiceMetrics.class); + + TasksServiceImpl service1 = new TasksServiceImpl(async1, metrics1, tasksServiceConfig(1)); + Tasks tasks1 = service1.forPersist(persist); + TasksServiceImpl service2 = new TasksServiceImpl(async2, metrics2, tasksServiceConfig(2)); + Tasks tasks2 = service2.forPersist(persist); + + BasicTaskRequest taskRequest1 = basicTaskRequest("hello", () -> taskCompletionStage); + CompletableFuture taskFuture1 = tasks1.submit(taskRequest1).toCompletableFuture(); + verify(metrics1).startNewTaskController(); + verifyNoMoreInteractions(metrics1); + reset(metrics1); + + soft.assertThat(async1.doWork()).isEqualTo(1); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isNotDone(); + verify(metrics1).taskAttempt(); + verify(metrics1).taskCreation(); + verify(metrics1).taskExecution(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + BasicTaskRequest taskRequest2 = basicTaskRequest("hello", () -> taskCompletionStage); + CompletableFuture taskFuture2 = tasks2.submit(taskRequest2).toCompletableFuture(); + verify(metrics2).startNewTaskController(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + soft.assertThat(async1.doWork()).isEqualTo(1); + soft.assertThat(async2.doWork()).isEqualTo(1); + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskUpdateRunningState(); + verify(metrics1).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics1); + verify(metrics2).taskAttempt(); + verify(metrics2).taskAttemptRunning(); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async1.doWork()).isEqualTo(1); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskUpdateRunningState(); + verify(metrics1).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + + clock.add(250, ChronoUnit.MILLIS); + taskCompletionStage.completeExceptionally(new RuntimeException("failed task")); + soft.assertThat(taskFuture1).isCompletedExceptionally(); + // Clock did not advance to when the state for the task object is refreshed, so the 2nd future + // is still not done. + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskExecutionFinished(); + verify(metrics1).taskExecutionFailure(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async1.doWork()).isEqualTo(0); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isCompletedExceptionally(); + // Clock did not advance to when the state for the task object is refreshed, so the 2nd future + // is still not done. + soft.assertThat(taskFuture2).isNotDone(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + + clock.add(1000, ChronoUnit.MILLIS); + soft.assertThat(async1.doWork()).isEqualTo(0); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isCompletedExceptionally(); + // Clock did not advance to when the state for the task object is refreshed, so the 2nd future + // is still not done. + soft.assertThat(taskFuture2).isNotDone(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async1.doWork()).isEqualTo(0); + soft.assertThat(async2.doWork()).isEqualTo(1); + // Clock advance enough to refresh, so the 2nd future is done now. + soft.assertThat(taskFuture2).isCompletedExceptionally(); + verifyNoMoreInteractions(metrics1); + verify(metrics2).taskAttempt(); + verify(metrics2).taskAttemptFinalFailure(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async1.doWork()).isEqualTo(0); + soft.assertThat(async2.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + + soft.assertThatThrownBy(taskFuture2::get) + .isInstanceOf(ExecutionException.class) + .cause() + // This exception is generated via TaskBehavior.stateAsException() + .isInstanceOf(Exception.class) + .hasMessage("java.lang.RuntimeException: failed task"); + } + + @Test + public void twoServicesDistributedLostTask() { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async1 = new TestingTasksAsync(clock); + TestingTasksAsync async2 = new TestingTasksAsync(clock); + + CompletableFuture taskCompletionStage1 = new CompletableFuture<>(); + CompletableFuture taskCompletionStage2 = new CompletableFuture<>(); + + TaskServiceMetrics metrics1 = mock(TaskServiceMetrics.class); + TaskServiceMetrics metrics2 = mock(TaskServiceMetrics.class); + + TasksServiceImpl service1 = new TasksServiceImpl(async1, metrics1, tasksServiceConfig(1)); + Tasks tasks1 = service1.forPersist(persist); + TasksServiceImpl service2 = new TasksServiceImpl(async2, metrics2, tasksServiceConfig(2)); + Tasks tasks2 = service2.forPersist(persist); + + BasicTaskRequest taskRequest1 = basicTaskRequest("hello", () -> taskCompletionStage1); + CompletableFuture taskFuture1 = tasks1.submit(taskRequest1).toCompletableFuture(); + verify(metrics1).startNewTaskController(); + verifyNoMoreInteractions(metrics1); + reset(metrics1); + + soft.assertThat(async1.doWork()).isEqualTo(1); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isNotDone(); + verify(metrics1).taskAttempt(); + verify(metrics1).taskCreation(); + verify(metrics1).taskExecution(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + + clock.add(250, ChronoUnit.MILLIS); + BasicTaskRequest taskRequest2 = basicTaskRequest("hello", () -> taskCompletionStage2); + CompletableFuture taskFuture2 = tasks2.submit(taskRequest2).toCompletableFuture(); + verify(metrics2).startNewTaskController(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + soft.assertThat(async1.doWork()).isEqualTo(1); + soft.assertThat(async2.doWork()).isEqualTo(1); + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskUpdateRunningState(); + verify(metrics1).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics1); + verify(metrics2).taskAttempt(); + verify(metrics2).taskAttemptRunning(); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async1.doWork()).isEqualTo(1); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture1).isNotDone(); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics1).taskUpdateRunningState(); + verify(metrics1).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + reset(metrics1); + + // Simulate that service #1 died --> do not run any more tasks there + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(taskFuture2).isNotDone(); + + clock.add(FRESH_LOST_RETRY_NOT_BEFORE); + soft.assertThat(async2.doWork()).isEqualTo(1); + verify(metrics2).taskAttempt(); + verify(metrics2).taskAttemptRunning(); + verify(metrics2).taskLossDetected(); + verify(metrics2).taskLostReassigned(); + verify(metrics2).taskExecution(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async2.doWork()).isEqualTo(1); + soft.assertThat(taskFuture2).isNotDone(); + verify(metrics2).taskUpdateRunningState(); + verify(metrics2).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + taskCompletionStage2.complete( + BasicTaskObj.builder() + .id(taskRequest2.objId()) + .taskParameter(taskRequest2.taskParameter()) + .taskResult(taskRequest2.taskParameter() + " finished") + .taskState(TaskState.successState())); + verify(metrics2).taskExecutionFinished(); + verify(metrics2).taskExecutionResult(); + verifyNoMoreInteractions(metrics2); + reset(metrics2); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async2.doWork()).isEqualTo(0); + soft.assertThat(taskFuture2).isDone(); + verifyNoMoreInteractions(metrics1); + verifyNoMoreInteractions(metrics2); + } + + static TasksServiceConfig tasksServiceConfig(int inst) { + return TasksServiceConfig.tasksServiceConfig( + "instance#" + inst, DEFAULT_RACE_WAIT_MILLIS_MIN, DEFAULT_RACE_WAIT_MILLIS_MAX); + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestingTasksAsync.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestingTasksAsync.java new file mode 100644 index 00000000000..5d8d28d28e7 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestingTasksAsync.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.impl; + +import java.time.Clock; +import java.time.Instant; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; +import org.projectnessie.nessie.tasks.async.TasksAsync; + +/** + * {@link TasksAsync} implementation for testing purposes. Collects tasks to run in an ordered + * collection, tasks are run by calling {@link #doWork()}. There are no threads nor is there any + * other concurrent code involved. Maybe (not) surprisingly, this class is not thread safe. + */ +public class TestingTasksAsync implements TasksAsync { + + private final Queue scheduledTasks = new PriorityQueue<>(); + + private final Clock clock; + + public TestingTasksAsync(Clock clock) { + this.clock = clock; + } + + public int doWork() { + int tasks = 0; + Instant now = clock.instant(); + while (true) { + ScheduledTask t; + t = scheduledTasks.peek(); + if (t == null || t.runAt.compareTo(now) > 0) { + break; + } + scheduledTasks.remove(t); + if (t.cancelled) { + continue; + } + tasks++; + try { + t.completable.complete(t.runnable.get()); + } catch (Throwable ex) { + t.completable.completeExceptionally(ex); + } + } + return tasks; + } + + @Override + public Clock clock() { + return clock; + } + + @SuppressWarnings("unchecked") + @Override + public CompletionStage supply(Supplier supplier) { + ScheduledTask task = new ScheduledTask(Instant.EPOCH, (Supplier) supplier); + scheduledTasks.add(task); + return (CompletionStage) task.completable; + } + + @Override + public CompletionStage schedule(Runnable runnable, Instant scheduleNotBefore) { + ScheduledTask task = + new ScheduledTask( + scheduleNotBefore, + () -> { + runnable.run(); + return null; + }); + scheduledTasks.add(task); + task.completable.whenComplete( + (v, t) -> { + if (t instanceof CancellationException) { + task.cancelled = true; + } + }); + return task.completionStage(); + } + + static final class ScheduledTask implements Comparable { + private final Instant runAt; + private final Supplier runnable; + private final long random; + volatile boolean cancelled; + private final CompletableFuture completable; + + ScheduledTask(Instant runAt, Supplier runnable) { + this.runAt = runAt; + this.runnable = runnable; + this.random = ThreadLocalRandom.current().nextLong(); + this.completable = new CompletableFuture<>(); + } + + @Override + public int compareTo(ScheduledTask o) { + int c = runAt.compareTo(o.runAt); + if (c != 0) { + return c; + } + return Long.compare(random, o.random); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletionStage completionStage() { + return (CompletionStage) completable; + } + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java new file mode 100644 index 00000000000..5723bc64f3a --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.tasktypes; + +import static org.projectnessie.nessie.tasks.api.TaskState.failureState; +import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; +import static org.projectnessie.nessie.tasks.api.TaskState.runningState; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAmount; +import org.projectnessie.nessie.tasks.api.TaskBehavior; +import org.projectnessie.nessie.tasks.api.TaskState; +import org.projectnessie.versioned.storage.common.persist.ObjType; + +public class BasicTaskBehavior implements TaskBehavior { + + public static final TemporalAmount FRESH_RUNNING_RETRY_NOT_BEFORE = + Duration.of(2, ChronoUnit.SECONDS); + public static final TemporalAmount FRESH_LOST_RETRY_NOT_BEFORE = + Duration.of(1, ChronoUnit.MINUTES); + public static final TemporalAmount RETRYABLE_ERROR_NOT_BEFORE = + Duration.of(5, ChronoUnit.SECONDS); + public static final TemporalAmount RUNNING_UPDATE_INTERVAL = Duration.of(250, ChronoUnit.MILLIS); + + public static final BasicTaskBehavior INSTANCE = new BasicTaskBehavior(); + + public BasicTaskBehavior() {} + + @Override + public Throwable stateAsException(BasicTaskObj obj) { + return new Exception(obj.taskState().message()); + } + + @Override + public Instant performRunningStateUpdateAt(Clock clock, BasicTaskObj running) { + return clock.instant().plus(RUNNING_UPDATE_INTERVAL); + } + + @Override + public TaskState asErrorTaskState(Clock clock, BasicTaskObj base, Throwable t) { + if (t instanceof RetryableException) { + return retryableErrorState(retryableErrorNotBefore(clock), t.getMessage()); + } + return failureState(t.toString()); + } + + @Override + public BasicTaskObj.Builder newObjBuilder() { + return ImmutableBasicTaskObj.builder(); + } + + @Override + public TaskState runningTaskState(Clock clock, BasicTaskObj running) { + return runningState(freshRunningRetryNotBefore(clock), freshLostRetryNotBefore(clock)); + } + + @Override + public ObjType objType() { + return BasicTaskObj.TYPE; + } + + private Instant freshRunningRetryNotBefore(Clock clock) { + return clock.instant().plus(FRESH_RUNNING_RETRY_NOT_BEFORE); + } + + private Instant freshLostRetryNotBefore(Clock clock) { + return clock.instant().plus(FRESH_LOST_RETRY_NOT_BEFORE); + } + + private Instant retryableErrorNotBefore(Clock clock) { + return clock.instant().plus(RETRYABLE_ERROR_NOT_BEFORE); + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskObj.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskObj.java new file mode 100644 index 00000000000..5d0ba82e021 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskObj.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.tasktypes; + +import static org.projectnessie.versioned.storage.common.objtypes.CustomObjType.dynamicCaching; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import jakarta.annotation.Nullable; +import org.immutables.value.Value; +import org.projectnessie.nessie.tasks.api.TaskObj; +import org.projectnessie.nessie.tasks.api.TaskState; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.ObjType; + +@Value.Immutable +@JsonSerialize(as = ImmutableBasicTaskObj.class) +@JsonDeserialize(as = ImmutableBasicTaskObj.class) +public interface BasicTaskObj extends TaskObj { + ObjType TYPE = + dynamicCaching("basic", "basic", BasicTaskObj.class, TaskObj.taskDefaultCacheExpire()); + + @Override + @Value.Default + default ObjType type() { + return TYPE; + } + + String taskParameter(); + + @Nullable + String taskResult(); + + interface Builder extends TaskObj.Builder { + + @Override + Builder id(ObjId id); + + @CanIgnoreReturnValue + Builder taskParameter(String taskParameter); + + @CanIgnoreReturnValue + Builder taskResult(String taskResult); + + @Override + @CanIgnoreReturnValue + Builder taskState(TaskState taskState); + + BasicTaskObj build(); + } + + static Builder builder() { + return ImmutableBasicTaskObj.builder(); + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskObjTypeBundle.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskObjTypeBundle.java new file mode 100644 index 00000000000..9137c314c13 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskObjTypeBundle.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.tasktypes; + +import java.util.function.Consumer; +import org.projectnessie.versioned.storage.common.persist.ObjType; +import org.projectnessie.versioned.storage.common.persist.ObjTypeBundle; + +public class BasicTaskObjTypeBundle implements ObjTypeBundle { + @Override + public void register(Consumer registrar) { + registrar.accept(BasicTaskObj.TYPE); + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskRequest.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskRequest.java new file mode 100644 index 00000000000..5d7784d9802 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskRequest.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.tasktypes; + +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskObj.TYPE; + +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import org.immutables.value.Value; +import org.projectnessie.nessie.tasks.api.TaskBehavior; +import org.projectnessie.nessie.tasks.api.TaskRequest; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.ObjIdHasher; +import org.projectnessie.versioned.storage.common.persist.ObjType; + +@Value.Immutable +public interface BasicTaskRequest extends TaskRequest { + @Override + @Value.NonAttribute + default ObjType objType() { + return TYPE; + } + + @Value.Parameter(order = 1) + String taskParameter(); + + @Value.Parameter(order = 2) + ObjId objId(); + + @Value.Parameter(order = 3) + Supplier> taskCompletionStageSupplier(); + + @Override + @Value.Parameter(order = 4) + TaskBehavior behavior(); + + @Override + @Value.NonAttribute + default CompletionStage submitExecution() { + return taskCompletionStageSupplier().get(); + } + + @Override + default BasicTaskObj.Builder applyRequestToObjBuilder(BasicTaskObj.Builder builder) { + return builder.taskParameter(taskParameter()); + } + + static BasicTaskRequest basicTaskRequest( + String taskParameter, + Supplier> taskCompletionStageSupplier) { + ObjId objId = ObjIdHasher.objIdHasher(TYPE.name()).hash(taskParameter).generate(); + return ImmutableBasicTaskRequest.of( + taskParameter, objId, taskCompletionStageSupplier, BasicTaskBehavior.INSTANCE); + } +} diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/RetryableException.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/RetryableException.java new file mode 100644 index 00000000000..39c93db60c8 --- /dev/null +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/RetryableException.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.nessie.tasks.service.tasktypes; + +public class RetryableException extends Exception { + public RetryableException(String message) { + super(message); + } +} diff --git a/tasks/service/impl/src/test/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.ObjTypeBundle b/tasks/service/impl/src/test/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.ObjTypeBundle new file mode 100644 index 00000000000..59c064923cc --- /dev/null +++ b/tasks/service/impl/src/test/resources/META-INF/services/org.projectnessie.versioned.storage.common.persist.ObjTypeBundle @@ -0,0 +1,17 @@ +# +# Copyright (C) 2024 Dremio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskObjTypeBundle diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java index 090837d5ce2..49c588151b8 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java @@ -62,6 +62,11 @@ public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException { } } + @Override + public Obj getImmediate(@Nonnull ObjId id) { + return cache.get(id); + } + @Override @Nonnull public T fetchTypedObj(@Nonnull ObjId id, ObjType type, Class typeClass) diff --git a/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCachingInmemoryPersist.java b/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCachingInmemoryPersist.java index 149c51f983b..a14eb433a7b 100644 --- a/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCachingInmemoryPersist.java +++ b/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCachingInmemoryPersist.java @@ -15,8 +15,45 @@ */ package org.projectnessie.versioned.storage.cache; +import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue; +import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; +import static org.projectnessie.versioned.storage.commontests.AbstractBasePersistTests.randomContentId; + +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.nessie.relocated.protobuf.ByteString; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.Persist; import org.projectnessie.versioned.storage.commontests.AbstractPersistTests; +import org.projectnessie.versioned.storage.testextension.NessiePersist; import org.projectnessie.versioned.storage.testextension.NessiePersistCache; +import org.projectnessie.versioned.storage.testextension.PersistExtension; @NessiePersistCache -public class TestCachingInmemoryPersist extends AbstractPersistTests {} +public class TestCachingInmemoryPersist extends AbstractPersistTests { + + @Nested + @ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) + public class CacheSpecific { + @InjectSoftAssertions protected SoftAssertions soft; + + @NessiePersist protected Persist persist; + + @Test + public void getImmediate() throws Exception { + Obj obj = contentValue(randomObjId(), randomContentId(), 1, ByteString.copyFromUtf8("hello")); + soft.assertThat(persist.getImmediate(obj.id())).isNull(); + persist.storeObj(obj); + soft.assertThat(persist.getImmediate(obj.id())).isEqualTo(obj); + persist.deleteObj(obj.id()); + soft.assertThat(persist.getImmediate(obj.id())).isNull(); + persist.storeObj(obj); + persist.fetchObj(obj.id()); + soft.assertThat(persist.getImmediate(obj.id())).isEqualTo(obj); + } + } +} diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java index cbdc51e0fc2..14b6d32da19 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java @@ -129,6 +129,14 @@ public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException { return delegate.fetchObj(id); } + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + public Obj getImmediate(@Nonnull ObjId id) { + return delegate.getImmediate(id); + } + @WithSpan @Override @Counted(PREFIX) diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java index 091fd920b3e..b48ba6f672b 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java @@ -155,6 +155,10 @@ Reference updateReferencePointer(@Nonnull Reference reference, @Nonnull ObjId ne @Nonnull Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException; + default Obj getImmediate(@Nonnull ObjId id) { + return null; + } + /** * Retrieves the object with ID {@code id}, having the same {@link ObjType type}. * diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java index 82a32ff42b3..b053a001d70 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java @@ -108,6 +108,11 @@ public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException { return delegate.fetchObj(id); } + @Override + public Obj getImmediate(@Nonnull ObjId id) { + return delegate.getImmediate(id); + } + @Override @Nonnull public T fetchTypedObj(@Nonnull ObjId id, ObjType type, Class typeClass)