forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-34986][Runtime/State] Basic implementation of AEC, RecordConte…
…xt and reference counting
- Loading branch information
Showing
19 changed files
with
831 additions
and
556 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
144 changes: 144 additions & 0 deletions
144
...time/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.asyncprocessing; | ||
|
||
import org.apache.flink.api.common.operators.MailboxExecutor; | ||
import org.apache.flink.api.common.state.v2.State; | ||
import org.apache.flink.core.state.InternalStateFuture; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* The Async Execution Controller (AEC) receives processing requests from operators, and put them | ||
* into execution according to some strategies. | ||
* | ||
* <p>It is responsible for: | ||
* <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests | ||
* until the processing of preceding ones is finalized. | ||
* <li>Tracking the in-flight data(records) and blocking the input if too much data in flight | ||
* (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, | ||
* allowing for the execution of callbacks (mails in Mailbox). | ||
* | ||
* @param <R> the type of the record | ||
* @param <K> the type of the key | ||
*/ | ||
public class AsyncExecutionController<R, K> { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); | ||
|
||
public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; | ||
|
||
/** The max allow number of in-flight records. */ | ||
private final int maxInFlightRecordNum; | ||
|
||
/** The key accounting unit which is used to detect the key conflict. */ | ||
final KeyAccountingUnit<R, K> keyAccountingUnit; | ||
|
||
/** | ||
* A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto | ||
* wire the created future with mailbox executor. Also conducting the context switch. | ||
*/ | ||
private final StateFutureFactory<R, K> stateFutureFactory; | ||
|
||
/** The state executor where the {@link StateRequest} is actually executed. */ | ||
final StateExecutor stateExecutor; | ||
|
||
/** The corresponding context that currently runs in task thread. */ | ||
RecordContext<R, K> currentContext; | ||
|
||
public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { | ||
this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); | ||
} | ||
|
||
public AsyncExecutionController( | ||
MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { | ||
this.keyAccountingUnit = new KeyAccountingUnit<>(); | ||
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); | ||
this.stateExecutor = stateExecutor; | ||
this.maxInFlightRecordNum = maxInFlightRecords; | ||
LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); | ||
} | ||
|
||
/** | ||
* Build a new context based on record and key. Also wired with internal {@link | ||
* KeyAccountingUnit}. | ||
* | ||
* @param record the given record. | ||
* @param key the given key. | ||
* @return the built record context. | ||
*/ | ||
public RecordContext<R, K> buildContext(R record, K key) { | ||
return new RecordContext<>(keyAccountingUnit, record, key); | ||
} | ||
|
||
/** | ||
* Each time before a code segment (callback) is about to run in mailbox (task thread), this | ||
* method should be called to switch a context in AEC. | ||
* | ||
* @param switchingContext the context to switch. | ||
*/ | ||
public void setCurrentContext(RecordContext<R, K> switchingContext) { | ||
currentContext = switchingContext; | ||
} | ||
|
||
/** | ||
* Submit a {@link StateRequest} to this AEC and trigger if needed. | ||
* | ||
* @param state the state to request. | ||
* @param type the type of this request. | ||
* @param payload the payload input for this request. | ||
* @return the state future. | ||
*/ | ||
public <IN, OUT> InternalStateFuture<OUT> handleRequest( | ||
@Nullable State state, StateRequest.RequestType type, @Nullable IN payload) { | ||
// Step 1: build state future & assign context. | ||
InternalStateFuture<OUT> stateFuture = stateFutureFactory.build(currentContext); | ||
StateRequest<K, IN, OUT> request = | ||
new StateRequest<>(state, type, payload, stateFuture, currentContext); | ||
// Step 2: try to occupy the key and place it into right buffer. | ||
if (currentContext.tryOccupyKey()) { | ||
insertActiveBuffer(request); | ||
} else { | ||
insertBlockingBuffer(request); | ||
} | ||
// Step 3: trigger the (active) buffer if needed. | ||
triggerIfNeeded(false); | ||
return stateFuture; | ||
} | ||
|
||
<IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) { | ||
// TODO: implement the active buffer. | ||
} | ||
|
||
<IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) { | ||
// TODO: implement the blocking buffer. | ||
} | ||
|
||
/** | ||
* Trigger a batch of requests. | ||
* | ||
* @param force whether to trigger requests in force. | ||
*/ | ||
void triggerIfNeeded(boolean force) { | ||
// TODO: implement the trigger logic. | ||
} | ||
} |
75 changes: 75 additions & 0 deletions
75
...untime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.asyncprocessing; | ||
|
||
import org.apache.flink.core.state.StateFutureImpl; | ||
|
||
/** | ||
* A state future that holds the {@link RecordContext} and maintains the reference count of it. The | ||
* reason why we maintain the reference here is that the ContextStateFutureImpl can be created | ||
* multiple times since user may chain their code wildly, some of which are only for internal usage | ||
* (See {@link StateFutureImpl}). So maintaining reference counting by the lifecycle of state future | ||
* is relatively simple and less error-prone. | ||
* | ||
* <p>Reference counting added on {@link RecordContext} follows: | ||
* <li>1. +1 when this future created. | ||
* <li>2. -1 when future completed. | ||
* <li>3. +1 when callback registered. | ||
* <li>4. -1 when callback finished. | ||
*/ | ||
public class ContextStateFutureImpl<T> extends StateFutureImpl<T> { | ||
|
||
private final RecordContext<?, ?> recordContext; | ||
|
||
ContextStateFutureImpl(CallbackRunner callbackRunner, RecordContext<?, ?> recordContext) { | ||
super(callbackRunner); | ||
this.recordContext = recordContext; | ||
// When state request submitted, ref count +1, as described in FLIP-425: | ||
// To cover the statements without a callback, in addition to the reference count marked | ||
// in Fig.5, each state request itself is also protected by a paired reference count. | ||
recordContext.retain(); | ||
} | ||
|
||
@Override | ||
public <A> StateFutureImpl<A> makeNewStateFuture() { | ||
return new ContextStateFutureImpl<>(callbackRunner, recordContext); | ||
} | ||
|
||
@Override | ||
public void callbackRegistered() { | ||
// When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the | ||
// ref count +1. | ||
recordContext.retain(); | ||
} | ||
|
||
@Override | ||
public void postComplete() { | ||
// When a state request completes, ref count -1, as described in FLIP-425: | ||
// To cover the statements without a callback, in addition to the reference count marked | ||
// in Fig.5, each state request itself is also protected by a paired reference count. | ||
recordContext.release(); | ||
} | ||
|
||
@Override | ||
public void callbackFinished() { | ||
// When a callback ends, as shown in Fig.5 of FLIP-425, at the | ||
// point of 2,4 and 6, the ref count -1. | ||
recordContext.release(); | ||
} | ||
} |
Oops, something went wrong.