Skip to content

Commit

Permalink
Coordinated Tasks (#7947)
Browse files Browse the repository at this point in the history
Adds an API and SPI to coordinate possibly long-running work to build and persist an `Obj`.

The API is pretty much just this:
```java
Persist persist; // provided
TasksService service; // provided
Tasks tasks = service.forPersist(persist);

SomeTaskRequest taskRequest; // specific for each TaskType/ObjType

CompletionStage<SomeTaskObj> finalResult = tasks.submit(taskRequest);
```

The returned `CompletionStage` can be chained in other `CompletionStage`s/`CompletableFuture`s and other implementations like Mutiny's `Uni` or Vert.X's `Future`.

The implementation ensures that only one "worker job" runs for each task, even across multiple Nessie instances.

Each task is identified using an implementation of a `TaskRequest` instance. Each persisted `TaskObj` has a state, which can be a final state (no more changes happening) like SUCCESS and FAILURE, or a transient state like RUNNING or ERROR_RETRY. WRT final states have no cache expiration, but tranient states have a task-specific cache expiration.

Recent changes for conditional-updates and "dynamic cache expiration" were prerequisites for this change.
  • Loading branch information
snazy authored Jan 22, 2024
1 parent 861e538 commit 61bb1f0
Show file tree
Hide file tree
Showing 43 changed files with 4,104 additions and 1 deletion.
3 changes: 3 additions & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ dependencies {
api(project(":nessie-server-store-proto"))
api(project(":nessie-content-generator"))
api(project(":nessie-protobuf-relocated"))
api(project(":nessie-tasks-api"))
api(project(":nessie-tasks-service-async"))
api(project(":nessie-tasks-service-impl"))
api(project(":nessie-versioned-spi"))
api(project(":nessie-versioned-storage-batching"))
api(project(":nessie-versioned-storage-bigtable"))
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ maven-resolver-provider = { module = "org.apache.maven:maven-resolver-provider",
maven-resolver-transport-file = { module = "org.apache.maven.resolver:maven-resolver-transport-file", version.ref = "mavenResolver" }
maven-resolver-transport-http = { module = "org.apache.maven.resolver:maven-resolver-transport-http", version.ref = "mavenResolver" }
micrometer-core = { module = "io.micrometer:micrometer-core", version = "1.12.2" }
microprofile-contextpropagation-api = { module = "org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api", version = "1.3" }
microprofile-openapi = { module = "org.eclipse.microprofile.openapi:microprofile-openapi-api", version = "3.1.1" }
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" }
Expand Down Expand Up @@ -169,6 +170,7 @@ testcontainers-keycloak = { module = "com.github.dasniko:testcontainers-keycloak
threeten-extra = { module = "org.threeten:threeten-extra", version = "1.7.2" }
undertow-core = { module = "io.undertow:undertow-core", version.ref = "undertow" }
undertow-servlet = { module = "io.undertow:undertow-servlet", version.ref = "undertow" }
vertx-core = { module = "io.vertx:vertx-core", version = "4.5.1" }
weld-se-core = { module = "org.jboss.weld.se:weld-se-core", version = "5.1.2.Final" }
wiremock = { module = "com.github.tomakehurst:wiremock-jre8-standalone", version = "2.35.1" }

Expand Down
3 changes: 3 additions & 0 deletions gradle/projects.main.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ nessie-server-store=servers/store
nessie-server-store-proto=servers/store-proto
nessie-content-generator=tools/content-generator
nessie-protobuf-relocated=tools/protobuf-relocated
nessie-tasks-api=tasks/api
nessie-tasks-service-async=tasks/service/async
nessie-tasks-service-impl=tasks/service/impl
nessie-versioned-spi=versioned/spi
nessie-versioned-storage-batching=versioned/storage/batching
nessie-versioned-storage-bigtable=versioned/storage/bigtable
Expand Down
59 changes: 59 additions & 0 deletions tasks/api/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id("nessie-conventions-server")
id("nessie-jacoco")
alias(libs.plugins.jmh)
}

extra["maven.name"] = "Nessie - Tasks - API"

dependencies {
implementation(project(":nessie-versioned-storage-common"))

compileOnly(libs.jakarta.validation.api)
compileOnly(libs.jakarta.annotation.api)

compileOnly(libs.errorprone.annotations)
implementation(libs.guava)
implementation(libs.slf4j.api)

compileOnly(libs.immutables.builder)
compileOnly(libs.immutables.value.annotations)
annotationProcessor(libs.immutables.value.processor)

implementation(platform(libs.jackson.bom))
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("com.fasterxml.jackson.core:jackson-annotations")

testImplementation(platform(libs.junit.bom))
testImplementation(libs.bundles.junit.testing)

testImplementation(project(":nessie-versioned-storage-testextension"))
testImplementation(project(":nessie-versioned-storage-inmemory"))
testRuntimeOnly(libs.logback.classic)

jmhImplementation(libs.jmh.core)
jmhImplementation(project(":nessie-versioned-storage-common-tests"))
jmhAnnotationProcessor(libs.jmh.generator.annprocess)
}

tasks.named("processJmhJandexIndex").configure { enabled = false }

tasks.named("processTestJandexIndex").configure { enabled = false }

jmh { jmhVersion.set(libs.versions.jmh.get()) }
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.nessie.tasks.api;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
import java.time.Instant;

public final class JacksonSerializers {
private JacksonSerializers() {}

public static final class InstantAsLongDeserializer extends StdDeserializer<Instant> {
public InstantAsLongDeserializer() {
super(Instant.class);
}

@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.currentToken().isNumeric()) {
long millis = p.getValueAsLong();
return Instant.ofEpochMilli(millis);
}
return null;
}
}

public static final class InstantAsLongSerializer extends StdSerializer<Instant> {
public InstantAsLongSerializer() {
super(Instant.class);
}

@Override
public void serialize(Instant value, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
if (value == null) {
gen.writeNull();
} else {
gen.writeNumber(value.toEpochMilli());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.nessie.tasks.api;

import static org.projectnessie.nessie.tasks.api.TaskState.failureState;

import java.time.Clock;
import java.time.Instant;
import org.projectnessie.versioned.storage.common.persist.ObjType;

/**
* Defines the behavior of tasks.
*
* <p>This included the calculation or retry timestamps and lost-task timeouts and mapping between
* exceptions and task state.
*/
public interface TaskBehavior<T extends TaskObj, B extends TaskObj.Builder> {

/**
* Convert the task state as an exception, if the state represents an error or failure state. This
* is used to transform a persisted {@linkplain TaskStatus#FAILURE failure status} to a Java
* exception for local callers.
*/
Throwable stateAsException(T obj);

/**
* Retrieve the timestamp when the next running-state update that refreshes the {@link
* TaskState#retryNotBefore() retry-not-before} and {@link TaskState#lostNotBefore()}
* lost-not-before} timestamps in the database, shall happen. The returned timestamp must be
* earlier than any of these "not-before" timestamps and defines when the update will be
* scheduled.
*/
Instant performRunningStateUpdateAt(Clock clock, T running);

/**
* Build a new {@linkplain TaskStatus#RUNNING running} task-state with "fresh" {@linkplain
* TaskState#retryNotBefore() retry-not-before} and {@linkplain TaskState#lostNotBefore()
* lost-not-before} timestamps.
*/
TaskState runningTaskState(Clock clock, T running);

/**
* Called when the task execution resulted in an exception, Build a new {@linkplain
* TaskStatus#ERROR_RETRY error-retry}, with "fresh" {@linkplain TaskState#retryNotBefore()
* retry-not-before} timestamp, or {@linkplain TaskStatus#FAILURE failure} task-state.
*/
default TaskState asErrorTaskState(Clock clock, T base, Throwable t) {
return failureState(t.toString());
}

/** Create a new, empty task-object builder. */
B newObjBuilder();

ObjType objType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.nessie.tasks.api;

import static org.projectnessie.nessie.tasks.api.TaskObjUtil.TASK_DEFAULT_CACHE_EXPIRE;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.projectnessie.versioned.storage.common.objtypes.CustomObjType.CacheExpireCalculation;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.ObjType;
import org.projectnessie.versioned.storage.common.persist.UpdateableObj;

/** Base interface for objects that represent task state and value. */
public interface TaskObj extends UpdateableObj {
/** Retrieve the task-state of this object. */
TaskState taskState();

interface Builder {

@CanIgnoreReturnValue
Builder from(TaskObj base);

@CanIgnoreReturnValue
Builder versionToken(String versionToken);

@CanIgnoreReturnValue
Builder id(ObjId id);

@CanIgnoreReturnValue
Builder type(ObjType type);

@CanIgnoreReturnValue
Builder taskState(TaskState taskState);

TaskObj build();
}

@SuppressWarnings("unchecked")
static <T extends TaskObj> CacheExpireCalculation<T> taskDefaultCacheExpire() {
return (CacheExpireCalculation<T>) TASK_DEFAULT_CACHE_EXPIRE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.nessie.tasks.api;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.projectnessie.versioned.storage.common.persist.ObjType.CACHE_UNLIMITED;

import java.time.Instant;
import org.projectnessie.versioned.storage.common.objtypes.CustomObjType;

final class TaskObjUtil {
private TaskObjUtil() {}

static final CustomObjType.CacheExpireCalculation<? extends TaskObj> TASK_DEFAULT_CACHE_EXPIRE =
(obj, currentTimeMicros) -> {
TaskState state = obj.taskState();
if (state.status().isFinal()) {
return CACHE_UNLIMITED;
}

Instant retryNotBefore = state.retryNotBefore();
if (retryNotBefore != null) {
return MILLISECONDS.toMicros(retryNotBefore.toEpochMilli());
}

return CACHE_UNLIMITED;
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.nessie.tasks.api;

import java.util.concurrent.CompletionStage;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.ObjType;

/** Base interface for value objects that identify a task's input parameters. */
public interface TaskRequest<T extends TaskObj, B extends TaskObj.Builder> {
/** Declares the {@linkplain ObjType object type} for this request. */
ObjType objType();

/**
* Globally identifies the task request across all {@linkplain ObjType object types}).
*
* <p>Implementations must derive the ID from the task type and the task parameters.
*/
ObjId objId();

TaskBehavior<T, B> behavior();

/**
* Start execution of the task, this function must not block and/or wait for the task execution to
* finish.
*
* <p>The implementation is responsible to choose the right scheduling implementation. For
* example: tasks that are supposed to run very long, like 5 seconds or more, must not use a <a
* href="https://javadoc.io/static/io.vertx/vertx-core/4.5.1/io/vertx/core/Vertx.html#executeBlocking-java.util.concurrent.Callable-boolean-">Vert.X's
* {@code executeBlocking()}</a>.
*/
CompletionStage<B> submitExecution();

/** Applies parameters from this request to the object builder. */
default B applyRequestToObjBuilder(B builder) {
return builder;
}
}
Loading

0 comments on commit 61bb1f0

Please sign in to comment.