-
Notifications
You must be signed in to change notification settings - Fork 200
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* created batch-sliding-window * renamed package * Added activities and workflows * Ported activities and workflows from Java * sliding window progress * simplistic test passes * Failure to drain reproduction * Added error handling to record_processor_workflow * added sort * Fixed batch partitions * Added description to README * Fixed nondeterminism errors
- Loading branch information
Showing
7 changed files
with
534 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
## Sliding Window Batch Sample | ||
|
||
A sample implementation of a batch processing Workflow that maintains a sliding window of record processing Workflows. | ||
|
||
A SlidingWindowWorkflow starts a configured number (sliding window size) of RecordProcessorWorkflow children in parallel. | ||
Each child processes a single record. When a child completes a new child is started. | ||
|
||
A SlidingWindowWorkflow calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. | ||
A RecordProcessorWorkflow reports its completion through a Signal to its parent. | ||
This allows to notify a parent that called continue-as-new. | ||
|
||
A single instance of SlidingWindowWorkflow has limited window size and throughput. | ||
To support larger window size and overall throughput multiple instances of SlidingWindowWorkflow run in parallel. | ||
|
||
#### Running the Sliding Window Batch Sample | ||
|
||
Make sure the [Temporal Server is running locally](https://docs.temporal.io/application-development/foundations#run-a-development-cluster). | ||
|
||
From the root of the project, start a Worker: | ||
|
||
```bash | ||
go run batch-sliding-window/worker/main.go | ||
``` | ||
|
||
Start the Workflow Execution: | ||
|
||
```bash | ||
go run batch-sliding-window/starter/main.go | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package batch_sliding_window | ||
|
||
import ( | ||
"fmt" | ||
"go.temporal.io/sdk/temporal" | ||
"go.temporal.io/sdk/workflow" | ||
"time" | ||
) | ||
|
||
// ProcessBatchWorkflowInput input of the ProcessBatchWorkflow. | ||
// A single input structure is preferred to multiple workflow arguments to simplify backward compatible API changes. | ||
type ProcessBatchWorkflowInput struct { | ||
PageSize int // Number of children started by a single sliding window workflow run | ||
SlidingWindowSize int // Maximum number of children to run in parallel. | ||
Partitions int // How many sliding windows to run in parallel. | ||
} | ||
|
||
// ProcessBatchWorkflow sample Partitions the data set into continuous ranges. | ||
// A real application can choose any other way to divide the records into multiple collections. | ||
func ProcessBatchWorkflow(ctx workflow.Context, input ProcessBatchWorkflowInput) (processed int, err error) { | ||
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ | ||
StartToCloseTimeout: 5 * time.Second, | ||
}) | ||
|
||
var recordLoader *RecordLoader // RecordLoader activity reference | ||
var recordCount int | ||
err = workflow.ExecuteActivity(ctx, recordLoader.GetRecordCount).Get(ctx, &recordCount) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
if input.SlidingWindowSize < input.Partitions { | ||
return 0, temporal.NewApplicationError( | ||
"SlidingWindowSize cannot be less than number of partitions", "invalidInput") | ||
} | ||
partitions := divideIntoPartitions(recordCount, input.Partitions) | ||
windowSizes := divideIntoPartitions(input.SlidingWindowSize, input.Partitions) | ||
|
||
workflow.GetLogger(ctx).Info("ProcessBatchWorkflow", | ||
"input", input, | ||
"recordCount", recordCount, | ||
"partitions", partitions, | ||
"windowSizes", windowSizes) | ||
|
||
var results []workflow.ChildWorkflowFuture | ||
offset := 0 | ||
for i := 0; i < input.Partitions; i++ { | ||
// Makes child id more user-friendly | ||
childId := fmt.Sprintf("%s/%d", workflow.GetInfo(ctx).WorkflowExecution.ID, i) | ||
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{WorkflowID: childId}) | ||
// Define partition boundaries. | ||
maximumPartitionOffset := offset + partitions[i] | ||
if maximumPartitionOffset > recordCount { | ||
maximumPartitionOffset = recordCount | ||
} | ||
input := SlidingWindowWorkflowInput{ | ||
PageSize: input.PageSize, | ||
SlidingWindowSize: windowSizes[i], | ||
Offset: offset, // inclusive | ||
MaximumOffset: maximumPartitionOffset, // exclusive | ||
} | ||
child := workflow.ExecuteChildWorkflow(childCtx, SlidingWindowWorkflow, input) | ||
results = append(results, child) | ||
offset += partitions[i] | ||
} | ||
// Waits for all child workflows to complete | ||
result := 0 | ||
for _, partitionResult := range results { | ||
var r int | ||
err := partitionResult.Get(ctx, &r) // blocks until the child completion | ||
if err != nil { | ||
return 0, err | ||
} | ||
result += r | ||
} | ||
return result, nil | ||
} | ||
|
||
func divideIntoPartitions(number int, n int) []int { | ||
base := number / n | ||
remainder := number % n | ||
partitions := make([]int, n) | ||
|
||
for i := 0; i < n; i++ { | ||
partitions[i] = base | ||
} | ||
|
||
for i := 0; i < remainder; i++ { | ||
partitions[i] += 1 | ||
} | ||
|
||
return partitions | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package batch_sliding_window | ||
|
||
import "fmt" | ||
|
||
type ( | ||
// RecordLoader activities structure. | ||
RecordLoader struct { | ||
RecordCount int | ||
} | ||
|
||
GetRecordsInput struct { | ||
PageSize int | ||
Offset int | ||
MaxOffset int | ||
} | ||
|
||
SingleRecord struct { | ||
Id int | ||
} | ||
|
||
GetRecordsOutput struct { | ||
Records []SingleRecord | ||
} | ||
) | ||
|
||
// GetRecordCount activity returns the total record count. | ||
// Used to partition processing across parallel sliding windows. | ||
// The sample implementation just returns a fake value passed during worker initialization. | ||
func (p *RecordLoader) GetRecordCount() (int, error) { | ||
return p.RecordCount, nil | ||
} | ||
|
||
// GetRecords activity returns records loaded from an external data source. The sample returns fake records. | ||
func (p *RecordLoader) GetRecords(input GetRecordsInput) (output GetRecordsOutput, err error) { | ||
if input.MaxOffset > p.RecordCount { | ||
panic(fmt.Sprintf("maxOffset(%d)>recordCount(%d", input.MaxOffset, p.RecordCount)) | ||
} | ||
var records []SingleRecord | ||
limit := input.Offset + input.PageSize | ||
if limit > input.MaxOffset { | ||
limit = input.MaxOffset | ||
} | ||
for i := input.Offset; i < limit; i++ { | ||
records = append(records, SingleRecord{Id: i}) | ||
} | ||
return GetRecordsOutput{Records: records}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package batch_sliding_window | ||
|
||
import ( | ||
"go.temporal.io/sdk/workflow" | ||
"math/rand" | ||
"time" | ||
) | ||
|
||
// RecordProcessorWorkflow workflow that implements processing of a single record. | ||
func RecordProcessorWorkflow(ctx workflow.Context, r SingleRecord) error { | ||
err := ProcessRecord(ctx, r) | ||
// Notify parent about completion via signal | ||
parent := workflow.GetInfo(ctx).ParentWorkflowExecution | ||
// This workflow is always expected to have a parent. | ||
// But for unit testing it might be useful to skip the notification if there is none. | ||
if parent != nil { | ||
// Doesn't specify runId as parent calls continue-as-new. | ||
signaled := workflow.SignalExternalWorkflow(ctx, parent.ID, "", "ReportCompletion", r.Id) | ||
// Ensure that signal is delivered. | ||
// Completing workflow before this Future is ready might lead to the signal loss. | ||
signalErr := signaled.Get(ctx, nil) | ||
if signalErr != nil { | ||
return signalErr | ||
} | ||
} | ||
return err | ||
} | ||
|
||
// ProcessRecord simulates application specific record processing. | ||
func ProcessRecord(ctx workflow.Context, r SingleRecord) error { | ||
// Simulate some processing | ||
|
||
// Use SideEffect to get a random number to ensure workflow determinism. | ||
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { | ||
//workflowcheck:ignore | ||
return rand.Intn(10) | ||
}) | ||
var random int | ||
err := encodedRandom.Get(&random) | ||
if err != nil { | ||
return err | ||
} | ||
err = workflow.Sleep(ctx, time.Duration(random)*time.Second) | ||
if err != nil { | ||
return err | ||
} | ||
workflow.GetLogger(ctx).Info("Processed ", r) | ||
return nil | ||
} |
Oops, something went wrong.