Skip to content

Commit

Permalink
[FLINK-xxx3][Runtime/State] Declaring local variables in async proces…
Browse files Browse the repository at this point in the history
…sing
  • Loading branch information
Zakelly committed May 13, 2024
1 parent db1283c commit 2209a64
Show file tree
Hide file tree
Showing 23 changed files with 230 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
Expand Down Expand Up @@ -88,6 +89,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
/** The state executor where the {@link StateRequest} is actually executed. */
StateExecutor stateExecutor;

/** A manager that allows for declaring processing and variables. */
final DeclarationManager declarationManager;

/** The corresponding context that currently runs in task thread. */
RecordContext<K> currentContext;

Expand All @@ -106,6 +110,7 @@ public AsyncExecutionController(
MailboxExecutor mailboxExecutor,
AsyncFrameworkExceptionHandler exceptionHandler,
StateExecutor stateExecutor,
DeclarationManager declarationManager,
int maxParallelism,
int batchSize,
long bufferTimeout,
Expand All @@ -114,6 +119,7 @@ public AsyncExecutionController(
this.mailboxExecutor = mailboxExecutor;
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler);
this.stateExecutor = stateExecutor;
this.declarationManager = declarationManager;
this.batchSize = batchSize;
this.bufferTimeout = bufferTimeout;
this.maxInFlightRecordNum = maxInFlightRecords;
Expand Down Expand Up @@ -152,13 +158,15 @@ public RecordContext<K> buildContext(Object record, K key) {
RecordContext.EMPTY_RECORD,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
declarationManager.variableCount());
}
return new RecordContext<>(
record,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
declarationManager.variableCount());
}

/**
Expand All @@ -169,6 +177,7 @@ public RecordContext<K> buildContext(Object record, K key) {
*/
public void setCurrentContext(RecordContext<K> switchingContext) {
    currentContext = switchingContext;
    // The controller can be constructed without a declaration manager (a null argument is
    // accepted by the constructor). In that case there are no declared variables to bind,
    // so skip the assignment instead of failing with a NullPointerException.
    if (declarationManager != null) {
        // Re-point every declared variable at the storage owned by the context that is
        // becoming current.
        declarationManager.assignVariables(switchingContext);
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -54,20 +56,31 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
/** The keyGroup to which key belongs. */
private final int keyGroup;

private final ArrayList<AtomicReference<?>> declaredVariables;

/**
* The extra context info which is used to hold customized data defined by state backend. The
* state backend can use this field to cache some data that can be used multiple times in
* different stages of asynchronous state execution.
*/
private @Nullable volatile Object extra;

public RecordContext(Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup) {
public RecordContext(
Object record,
K key,
Consumer<RecordContext<K>> disposer,
int keyGroup,
int variableCount) {
super(0);
this.record = record;
this.key = key;
this.keyOccupied = false;
this.disposer = disposer;
this.keyGroup = keyGroup;
this.declaredVariables = new ArrayList<>(variableCount);
for (int i = 0; i < variableCount; i++) {
declaredVariables.add(null);
}
}

public Object getRecord() {
Expand Down Expand Up @@ -104,6 +117,14 @@ public int getKeyGroup() {
return keyGroup;
}

/**
 * Looks up the reference stored in the given variable slot.
 *
 * @param i the index of the slot.
 * @return the reference stored in slot {@code i}, or {@code null} when {@code i} is at or
 *     beyond the number of slots or when nothing has been stored in that slot yet.
 */
public AtomicReference<?> getVariableReference(int i) {
    if (i < declaredVariables.size()) {
        return declaredVariables.get(i);
    }
    // Index past the allocated slots: report "no reference" rather than throwing.
    return null;
}

/**
 * Stores the given reference in the given variable slot, replacing whatever was there.
 *
 * @param i the index of the slot; it must address an existing slot because the value is
 *     written with {@code ArrayList.set}, which does not grow the list.
 * @param reference the reference to store in slot {@code i}.
 */
public void setVariableReference(int i, AtomicReference<?> reference) {
declaredVariables.set(i, reference);
}

public void setExtra(Object extra) {
this.extra = extra;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.util.function.FunctionWithException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.function.Supplier;

/** A context for declaring parts of the processing in a user-defined function/operator. */
public class DeclarationContext {

Expand Down Expand Up @@ -76,6 +79,7 @@ public <T, U, V> NamedBiFunction<T, U, V> declare(

/**
* Declaring a processing chain.
*
* @param first the first code block
* @return the chain itself.
* @param <IN> the in type of the first block
Expand All @@ -87,6 +91,21 @@ public <IN, T> DeclarationChain<IN, T>.DeclarationStage<T> declareChain(
return new DeclarationChain<>(this, first).firstStage();
}

/**
 * Declares a variable that is used across the callbacks.
 *
 * @param <T> the variable type.
 * @param type the type information of the variable.
 * @param name the unique name of this variable.
 * @param initialValue the supplier of the initial value, used when the variable is created.
 * @return the variable itself, which can be used by lambdas.
 * @throws DeclarationException if the manager rejects the registration.
 */
public <T> DeclaredVariable<T> declareVariable(
        TypeInformation<T> type, String name, Supplier<T> initialValue)
        throws DeclarationException {
    // Registration is delegated entirely to the manager backing this context.
    final DeclaredVariable<T> declared = manager.register(type, name, initialValue);
    return declared;
}

DeclarationManager getManager() {
return manager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

/** Exception thrown when something goes wrong during declaration. */
public class DeclarationException extends Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** The manager holds all the declaration information and manages the building procedure. */
public class DeclarationManager {

private final Map<String, NamedCallback> knownCallbacks;

private final Map<String, DeclaredVariable> knownVariables;

private int nextValidNameSequence = 0;

public DeclarationManager() {
this.knownCallbacks = new HashMap<>();
this.knownVariables = new HashMap<>();
}

<T extends NamedCallback> T register(T knownCallback) throws DeclarationException {
Expand All @@ -44,6 +50,16 @@ <T extends NamedCallback> T register(T knownCallback) throws DeclarationExceptio
return knownCallback;
}

/**
 * Creates a {@link DeclaredVariable} and records it in the known variables under its name.
 *
 * @param <T> the variable type.
 * @param typeInformation the type information handed to the new variable.
 * @param name the name the variable is recorded under.
 * @param initializer the supplier handed to the new variable.
 * @return the newly created variable.
 * @throws DeclarationException if another variable was already recorded under {@code name}.
 *     The new variable has already replaced the previous map entry when this is thrown.
 */
<T> DeclaredVariable<T> register(
        TypeInformation<T> typeInformation, String name, Supplier<T> initializer)
        throws DeclarationException {
    final DeclaredVariable<T> created =
            new DeclaredVariable<>(typeInformation, name, initializer);
    // put() returns the previous mapping, so a non-null result means the name was taken.
    final boolean nameTaken = knownVariables.put(name, created) != null;
    if (nameTaken) {
        throw new DeclarationException("Duplicated key " + name);
    }
    return created;
}

String nextAssignedName() {
String name;
do {
Expand All @@ -52,10 +68,26 @@ String nextAssignedName() {
return name;
}

public <T> ThrowingConsumer<StreamRecord<T>, Exception> buildProcess(
/**
 * Binds every known variable to its storage inside the given record context.
 *
 * <p>Variables are visited in the iteration order of the known-variables map, and the n-th
 * visited variable uses slot n of the context. A slot that has no reference yet is lazily
 * filled with a new {@link AtomicReference} holding the variable's initial value.
 *
 * <p>NOTE(review): the slot index is derived from map iteration order, so it is only stable
 * while no further variables are registered. Confirm that all declarations happen before
 * the first context is built.
 *
 * @param context the record context whose slots back the variables from now on.
 */
public void assignVariables(RecordContext<?> context) {
    int slot = 0;
    for (DeclaredVariable<?> variable : knownVariables.values()) {
        // Each variable must get its own slot; advancing the index here is what keeps
        // different variables from sharing the reference stored in slot 0.
        bindVariable(context, slot, variable);
        slot++;
    }
}

/** Points one variable at the reference in the given slot, creating the reference if absent. */
@SuppressWarnings("unchecked")
private static <T> void bindVariable(
        RecordContext<?> context, int slot, DeclaredVariable<T> variable) {
    // Safe as long as a slot is only ever populated by the variable that owns it, which is
    // the case when slots are assigned exclusively through this method.
    AtomicReference<T> reference = (AtomicReference<T>) context.getVariableReference(slot);
    if (reference == null) {
        reference = new AtomicReference<>(variable.initializer.get());
        context.setVariableReference(slot, reference);
    }
    variable.setReference(reference);
}

/** Returns the number of variables currently held in the known-variables map. */
public int variableCount() {
return knownVariables.size();
}

public <T> ThrowingConsumer<T, Exception> buildProcess(
FunctionWithException<
DeclarationContext,
ThrowingConsumer<StreamRecord<T>, Exception>,
ThrowingConsumer<T, Exception>,
DeclarationException>
declaringMethod) {
final DeclarationContext context = new DeclarationContext(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** A variable declared in async state processing. The value could be persisted in checkpoint. */
public class DeclaredVariable<T> {

    /** The type information given at declaration. */
    final TypeInformation<T> typeInformation;

    /** The name given at declaration. */
    final String name;

    /** Supplies the initial value of the variable. */
    final Supplier<T> initializer;

    /**
     * The holder that {@link #get()} and {@link #set(Object)} currently operate on. It is
     * {@code null} until {@link #setReference(AtomicReference)} has been called.
     */
    AtomicReference<T> reference;

    /**
     * Creates a variable that is not yet bound to any value holder.
     *
     * @param typeInformation the type information of the variable.
     * @param name the name of the variable.
     * @param initializer the supplier of the variable's initial value.
     */
    DeclaredVariable(TypeInformation<T> typeInformation, String name, Supplier<T> initializer) {
        this.initializer = initializer;
        this.name = name;
        this.typeInformation = typeInformation;
    }

    /**
     * Switches the holder that backs this variable.
     *
     * @param reference the holder that subsequent reads and writes go to.
     */
    void setReference(AtomicReference<T> reference) {
        this.reference = reference;
    }

    /**
     * Reads the value from the current holder.
     *
     * @return the value stored in the current holder.
     */
    public T get() {
        final AtomicReference<T> holder = this.reference;
        return holder.get();
    }

    /**
     * Writes a value into the current holder.
     *
     * @param newValue the value to store.
     */
    public void set(T newValue) {
        final AtomicReference<T> holder = this.reference;
        holder.set(newValue);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.util.function.BiFunctionWithException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

/** A named callback that can be identified and checkpointed. */
public abstract class NamedCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.util.function.ThrowingConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.util.function.FunctionWithException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void setup(
mailboxExecutor,
exceptionHandler,
stateExecutor,
null,
128,
batchSize,
timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void testComplex() {

private <K> RecordContext<K> buildRecordContext(Object record, K key) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128);
return new RecordContext<>(record, key, (e) -> {}, keyGroup);
return new RecordContext<>(record, key, (e) -> {}, keyGroup, 0);
}

/** A runner that performs single-step debugging. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public <IN, OUT> InternalStateFuture<OUT> handleRequest(

protected ContextKey<Integer> buildContextKey(int i) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128);
RecordContext<Integer> recordContext = new RecordContext<>(i, i, t -> {}, keyGroup);
RecordContext<Integer> recordContext = new RecordContext<>(i, i, t -> {}, keyGroup, 0);
return new ContextKey<>(recordContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testExecuteValueStateRequest() throws Exception {
V value,
R record) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128);
RecordContext<K> recordContext = new RecordContext<>(record, key, t -> {}, keyGroup);
RecordContext<K> recordContext = new RecordContext<>(record, key, t -> {}, keyGroup, 0);
TestStateFuture stateFuture = new TestStateFuture<>();
return new StateRequest<>(innerTable, requestType, value, stateFuture, recordContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public final ThrowingConsumer<StreamRecord<IN>, Exception> getRecordProcessor(in
(KeySelector) stateKeySelector,
((AsyncStateProcessingOperator) owner)
.getDeclarationManager()
.<IN>buildProcess(
.<StreamRecord<IN>>buildProcess(
((DeclarativeProcessingInput<IN>) this)::declareProcess));
} else {
return AsyncStateProcessing.<IN>makeRecordProcessor(
Expand Down
Loading

0 comments on commit 2209a64

Please sign in to comment.