Skip to content

Commit

Permalink
[FLINK-36928] Split the action of Epoch into two parts (trigger & final)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Dec 18, 2024
1 parent 12ee299 commit d7459da
Show file tree
Hide file tree
Showing 15 changed files with 397 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,38 @@ public AsyncExecutionController(
* @return the built record context.
*/
public RecordContext<K> buildContext(Object record, K key) {
return buildContext(record, key, false);
}

/**
* Build a new context based on record and key. Also wired with internal {@link
* KeyAccountingUnit}.
*
* @param record the given record.
* @param key the given key.
* @param inheritEpoch whether to inherit epoch from the current context. Or otherwise create a
* new one.
* @return the built record context.
*/
public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch) {
if (record == null) {
return new RecordContext<>(
RecordContext.EMPTY_RECORD,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
epochManager.onRecord());
inheritEpoch
? epochManager.onEpoch(currentContext.getEpoch())
: epochManager.onRecord());
}
return new RecordContext<>(
record,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
epochManager.onRecord());
inheritEpoch
? epochManager.onEpoch(currentContext.getEpoch())
: epochManager.onRecord());
}

/**
Expand Down Expand Up @@ -430,16 +448,31 @@ private void notifyNewMail() {
}
}

public void processNonRecord(ThrowingRunnable<? extends Exception> action) {
Runnable wrappedAction =
() -> {
try {
action.run();
} catch (Exception e) {
exceptionHandler.handleException("Failed to process non-record.", e);
}
};
epochManager.onNonRecord(wrappedAction, epochParallelMode);
public void processNonRecord(
@Nullable ThrowingRunnable<? extends Exception> triggerAction,
@Nullable ThrowingRunnable<? extends Exception> finalAction) {
epochManager.onNonRecord(
triggerAction == null
? null
: () -> {
try {
triggerAction.run();
} catch (Exception e) {
exceptionHandler.handleException(
"Failed to process non-record.", e);
}
},
finalAction == null
? null
: () -> {
try {
finalAction.run();
} catch (Exception e) {
exceptionHandler.handleException(
"Failed to process non-record.", e);
}
},
epochParallelMode);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,20 @@ public enum ParallelMode {
/** Current active epoch, only one active epoch at the same time. */
Epoch activeEpoch;

/** The epoch that will be provided to new records. */
@Nullable Epoch overrideEpoch;

/** The flag that prevent the recursive call of {@link #tryFinishInQueue()}. */
boolean recursiveFlag;

public EpochManager(AsyncExecutionController<?> aec) {
this.epochNum = 0;
this.outputQueue = new LinkedList<>();
this.asyncExecutionController = aec;
// init an empty epoch, the epoch action will be updated when non-record is received.
this.activeEpoch = new Epoch(epochNum++);
this.overrideEpoch = null;
this.recursiveFlag = false;
}

/**
Expand All @@ -83,23 +91,42 @@ public EpochManager(AsyncExecutionController<?> aec) {
* @return the current open epoch.
*/
public Epoch onRecord() {
activeEpoch.ongoingRecordCount++;
return activeEpoch;
if (overrideEpoch != null) {
overrideEpoch.ongoingRecordCount++;
return overrideEpoch;
} else {
activeEpoch.ongoingRecordCount++;
return activeEpoch;
}
}

/**
* Add a record to a specified epoch.
*
* @param epoch the specified epoch.
* @return the specified epoch itself.
*/
public Epoch onEpoch(Epoch epoch) {
epoch.ongoingRecordCount++;
return epoch;
}

/**
* Add a non-record to the current epoch, close current epoch and open a new epoch. Must be
* invoked within task thread.
*
* @param action the action associated with this non-record.
* @param triggerAction the action associated with this non-record.
* @param parallelMode the parallel mode for this epoch.
*/
public void onNonRecord(Runnable action, ParallelMode parallelMode) {
public void onNonRecord(
@Nullable Runnable triggerAction,
@Nullable Runnable finalAction,
ParallelMode parallelMode) {
LOG.trace(
"on NonRecord, old epoch: {}, outputQueue size: {}",
activeEpoch,
outputQueue.size());
switchActiveEpoch(action);
switchActiveEpoch(triggerAction, finalAction);
if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
asyncExecutionController.drainInflightRecords(0);
}
Expand All @@ -118,19 +145,34 @@ public void completeOneRecord(Epoch epoch) {
}

private void tryFinishInQueue() {
// We don't permit recursive call of this method.
if (recursiveFlag) {
return;
}
recursiveFlag = true;
// If one epoch has been closed before and all records in
// this epoch have finished, the epoch will be removed from the output queue.
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
LOG.trace(
"Finish epoch: {}, outputQueue size: {}",
outputQueue.peek(),
outputQueue.size());
outputQueue.pop();
while (!outputQueue.isEmpty()) {
Epoch epoch = outputQueue.peek();
// The epoch is override for inheritance during possible trigger action.
overrideEpoch = epoch;
try {
if (epoch.tryFinish()) {
outputQueue.pop();
} else {
break;
}
} finally {
// Clear the override
overrideEpoch = null;
}
}
recursiveFlag = false;
}

private void switchActiveEpoch(Runnable action) {
activeEpoch.close(action);
private void switchActiveEpoch(
@Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
activeEpoch.close(triggerAction, finalAction);
outputQueue.offer(activeEpoch);
this.activeEpoch = new Epoch(epochNum++);
tryFinishInQueue();
Expand All @@ -151,7 +193,15 @@ enum EpochStatus {
/**
* One epoch can only be finished when it meets the following three conditions. 1. The
* records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in
* the front of outputQueue.
* the front of outputQueue. When the status transit from {@link #CLOSED} to {@link
* #FINISHING}, a trigger action will go.
*/
FINISHING,

/**
* After the action is triggered, there might be more async process bind to this epoch.
* After all these process finished, a final action will go and the epoch will fall into
* {@link #FINISHED} status.
*/
FINISHED
}
Expand All @@ -167,43 +217,78 @@ public static class Epoch {
/** The number of records that are still ongoing in this epoch. */
int ongoingRecordCount;

/** The action associated with non-record of this epoch(e.g. advance watermark). */
@Nullable Runnable action;
/** The action associated with non-record of this epoch(e.g. triggering timer). */
@Nullable Runnable triggerAction;

/**
* The action when we finish this epoch and the triggerAction as well as any async
* processing.
*/
@Nullable Runnable finalAction;

EpochStatus status;

public Epoch(long id) {
this.id = id;
this.ongoingRecordCount = 0;
this.status = EpochStatus.OPEN;
this.action = null;
this.triggerAction = null;
this.finalAction = null;
}

/**
* Try to finish this epoch.
* Try to finish this epoch. This is the core logic of triggering actions. The state machine
* and timeline are as follows:
*
* <pre>
* Action: close() triggerAction wait finalAction
* Statue: OPEN ----- CLOSED ----------FINISHING -------- FINISHED -----------
* </pre>
*
* @return whether this epoch has been normally finished.
*/
boolean tryFinish() {
if (this.status == EpochStatus.FINISHED) {
if (status == EpochStatus.FINISHED) {
// This epoch has been finished for some reason, but it is not finished here.
// Preventing recursive call of #tryFinishInQueue().
return false;
}
if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) {
this.status = EpochStatus.FINISHED;
if (action != null) {
action.run();
if (ongoingRecordCount == 0) {
if (status == EpochStatus.CLOSED) {
// CLOSED -> FINISHING
transition(EpochStatus.FINISHING);
if (triggerAction != null) {
// trigger action will use {@link overrideEpoch}.
triggerAction.run();
}
}
// After the triggerAction run, if there is no new async process, the
// ongoingRecordCount remains 0, then the status should transit to FINISHED.
// Otherwise, we will reach here when ongoingRecordCount reaches 0 again.
if (ongoingRecordCount == 0 && status == EpochStatus.FINISHING) {
// FINISHING -> FINISHED
transition(EpochStatus.FINISHED);
if (finalAction != null) {
finalAction.run();
}
}
return true;
return status == EpochStatus.FINISHED;
}
return false;
}

/** Close this epoch. */
void close(Runnable action) {
this.action = action;
this.status = EpochStatus.CLOSED;
void transition(EpochStatus newStatus) {
if (status != newStatus) {
LOG.trace("Epoch {} transit from {} to {}", this, status, newStatus);
status = newStatus;
}
}

/** Close this epoch with defined triggerAction and finalAction. */
void close(@Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
this.triggerAction = triggerAction;
this.finalAction = finalAction;
transition(EpochStatus.CLOSED);
}

public String toString() {
Expand Down
Loading

0 comments on commit d7459da

Please sign in to comment.