Skip to content

Commit

Permalink
Add BatchedForkedArray and other changes (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xLeif authored Jun 23, 2023
1 parent faa0f4c commit 58c39d9
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 27 deletions.
72 changes: 53 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ When using Fork, functions will be ran in parallel and higher order forks will a

## Objects
- `Fork`: Using a single input create two separate async functions that return `LeftOutput` and `RightOutput`.
- `ForkedArray`: Using a single array and a single async function, parallelize the work for each value of the array.
- `BatchedForkedArray`: Using a single array and a single async function, batch the parallelized work for each value of the array
- `ForkedActor`: Using a single actor create two separate async functions that are passed the actor.
- `KeyPathActor`: A generic Actor that uses KeyPaths to update and set values.
- `ForkedArray`: Using a single array and a single async function, parallelize the work for each value of the array.

## Basic usage

Expand All @@ -72,21 +73,64 @@ let rightOutput = try await fork.right()
XCTAssertEqual(leftOutput, true)
XCTAssertEqual(rightOutput, "10")

let mergedFork: () async throws -> String = fork.merge(
using: { bool, string in
if bool {
return string + string
}

return string
let output: String = try await fork.merged { bool, string in
if bool {
return string + string
}
)

return string
}

let output = await mergedFork()

XCTAssertEqual(output, "1010")
```

## ForkedArray Example

A ForkedArray makes it easy to perform an asynchronous function on all of the elements in an Array. ForkedArray helps with the [example](#why-use-fork) above.

```swift
let forkedArray = ForkedArray(photoNames, map: downloadPhoto(named:))
let photos = try await forkedArray.output()
```

## BatchedForkedArray

The `BatchedForkedArray` allows you to efficiently parallelize and batch process an array of values using an async function. It provides methods for both resolving the parallelized array in a single output as well as streaming the batches of the resolved array.


```swift
let batchedForkedArray = BatchedForkedArray(photoNames, batch: 3, map: downloadPhoto(named:))
let photos = try await forkedArray.output()
```

In the above example, we create an instance of `BatchedForkedArray` with a batch size of 3 and the downloadPhoto function as the map closure.

To resolve the batched array, we use the `output()` method, which executes the downloadPhoto function on each batch of photo names in parallel. After the resolution is complete, the `photos` array will contain the downloaded photos in the order they were processed.


```swift
let photoNames = [Int](0 ..< 100)

let batchedForkedArray = BatchedForkedArray(
photoNames,
batch: 5,
map: downloadPhoto(named:)
)

for try await batch in batchedForkedArray.stream() {
for photo in batch {
// Perform operations on each photo in the batch
print(photo)
}
}
```

In this example, we create an instance of `BatchedForkedArray` with a batch size of 5 and the `downloadPhoto(named:)` function as the map closure. By using the `stream()` method, we can iterate over batches of photo names asynchronously.

Within the for-await loop, each batch of photo names is processed asynchronously. We then iterate over each photo in the batch and perform operations accordingly. This allows for efficient processing of large datasets in batches while controlling the number of parallel processes running at once.

## ForkedActor Example

```swift
Expand Down Expand Up @@ -143,16 +187,6 @@ let actorValue = try await forkedActor.act().value
XCTAssertEqual(actorValue, 3)
```

## ForkedArray Example

A ForkedArray makes it easy to perform an asynchronous function on all of the elements in an Array. ForkedArray helps with the [example](#why-use-fork) above.

```swift
let forkedArray = ForkedArray(photoNames, output: downloadPhoto(named:))
let photos = try await forkedArray.output()
```


## Extra Examples

- [Vapor ForkedActor Example](https://github.com/0xLeif/VaporForkDemo)
Expand Down
88 changes: 88 additions & 0 deletions Sources/Fork/BatchedForkedArray.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/// Using a single array and a single async function, batch the parallelized work for each value of the array
public struct BatchedForkedArray<Value, Output> {
private let batchedArray: [[Value]]
private let filter: (Value) async throws -> Bool
private let map: (Value) async throws -> Output

/// Create a ``BatchedForkedArray`` using a single `Array`
/// - Parameters:
/// - array: The `Array` to be used in creating the output
/// - batch: The number of elements to batch together. (The minimum value is 1)
/// - filter: An `async` closure that determines if the value should be used or not
/// - map: An `async` closure that uses the `Array.Element` as its input
public init(
_ array: [Value],
batch: UInt,
filter: @escaping (Value) async throws -> Bool,
map: @escaping (Value) async throws -> Output
) {
var index: Int = 0
let batchLimit: UInt = max(batch, 1)
let batchedArray: [[Value]]

batchedArray = array.reduce(into: []) { partialResult, value in
guard partialResult.isEmpty == false else {
partialResult.append([value])
return
}

guard partialResult[index].count < batchLimit else {
partialResult.append([value])
return index += 1
}

partialResult[index].append(value)
}

self.batchedArray = batchedArray
self.filter = filter
self.map = map
}

/// Asynchronously resolve the forked array
///
/// - Returns: The resolved array after performing the batched operations
public func output() async throws -> [Output] {
var batchedOutput: [[Output]] = []

for batch in batchedArray {
let batchedValues = try await batch.asyncFilter(filter).asyncMap(map)

batchedOutput.append(batchedValues)
}

return batchedOutput.flatMap(identity)
}

/// Stream the forked array asynchronously
///
/// - Returns: An AsyncThrowingStream object that yields batches of the resolved array
public func stream() -> AsyncThrowingStream<[Output], Error> {
AsyncThrowingStream { continuation in
Task {
for batch in batchedArray {
let batchedValues = try await batch.asyncFilter(filter).asyncMap(map)

continuation.yield(batchedValues)
}

continuation.finish()
}
}
}
}

extension BatchedForkedArray {
/// Create a ``BatchedForkedArray`` using a single `Array`
/// - Parameters:
/// - array: The `Array` to be used in creating the output
/// - batch: The number of elements to batch together. (The minimum value is 1)
/// - map: An `async` closure that uses the `Array.Element` as its input
public init(
_ array: [Value],
batch: UInt,
map: @escaping (Value) async throws -> Output
) {
self.init(array, batch: batch, filter: { _ in true }, map: map)
}
}
72 changes: 72 additions & 0 deletions Sources/Fork/Extensions/Sequence+BatchedForkedArray.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
extension Sequence {
/// Create a ``BatchedForkedArray`` from the current `Sequence`
public func batchedFork<Output>(
batch: UInt,
filter: @escaping (Element) async throws -> Bool,
map: @escaping (Element) async throws -> Output
) -> BatchedForkedArray<Element, Output> {
BatchedForkedArray(
Array(self),
batch: batch,
filter: filter,
map: map
)
}

/// Create a ``BatchedForkedArray`` from the current `Sequence`
public func batchedFork<Output>(
batch: UInt,
map: @escaping (Element) async throws -> Output
) -> BatchedForkedArray<Element, Output> {
batchedFork(batch: batch, filter: { _ in true }, map: map)
}

/// Create a ``BatchedForkedArray`` from the current `Sequence` and get the Output Array
public func batchedForked<Output>(
batch: UInt,
filter: @escaping (Element) async throws -> Bool,
map: @escaping (Element) async throws -> Output
) async throws -> [Output] {
try await fork(filter: filter, map: map).output()
}

/// Create a ``BatchedForkedArray`` from the current `Sequence` and get the Output Array
public func batchedForked<Output>(
batch: UInt,
map: @escaping (Element) async throws -> Output
) async throws -> [Output] {
try await batchedForked(batch: batch, filter: { _ in true }, map: map)
}

/// Returns an array containing the results of mapping the given closure over the sequence’s elements.
public func asyncBatchedMap<Output>(
batch: UInt,
_ transform: @escaping (Element) async throws -> Output
) async throws -> [Output] {
try await batchedFork(batch: batch, map: transform).output()
}

/// Returns an array containing the results, that aren't nil, of mapping the given closure over the sequence’s elements.
public func asyncBatchedCompactMap<Output>(
batch: UInt,
_ transform: @escaping (Element) async throws -> Output?
) async throws -> [Output] {
try await batchedFork(batch: batch, map: transform).output().compactMap { $0 }
}

/// Returns an array containing only the true results from the given closure over the sequence’s elements.
public func asyncBatchedFilter(
batch: UInt,
_ isIncluded: @escaping (Element) async throws -> Bool
) async throws -> [Element] {
try await batchedFork(batch: batch, filter: isIncluded, map: identity).output()
}

/// Calls the given closure for each of the elements in the Sequence. This function uses ``BatchedForkedArray`` and will be parallelized when possible.
public func asyncBatchedForEach(
batch: UInt,
_ transform: @escaping (Element) async throws -> Void
) async throws {
_ = try await asyncBatchedMap(batch: batch, transform)
}
}
4 changes: 2 additions & 2 deletions Sources/Fork/Fork.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ extension Fork {
public func merged() async throws where LeftOutput == Void, RightOutput == Void {
try await merged(using: { _, _ in () })
}

/// Merge the ``Fork`` and return the `RightOutput` when `LeftOutput` is Void
public func merged() async throws -> RightOutput where LeftOutput == Void {
try await merged(using: { _, output in output })
}

/// Merge the ``Fork`` and return the `LeftOutput` when `RightOutput` is Void
public func merged() async throws -> LeftOutput where RightOutput == Void {
try await merged(using: { output, _ in output })
Expand Down
1 change: 0 additions & 1 deletion Sources/Fork/ForkedActor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,4 @@ extension ForkedActor {
rightOutput: rightOutput
)
}

}
4 changes: 2 additions & 2 deletions Sources/Fork/ForkedArray.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public struct ForkedArray<Value, Output> {
/// - Parameters:
/// - array: The `Array` to be used in creating the output
/// - filter: An `async` closure that determines if the value should be used or not
/// - output: An `async` closure that uses the `Array.Element` as its input
/// - map: An `async` closure that uses the `Array.Element` as its input
public init(
_ array: [Value],
filter: @escaping (Value) async throws -> Bool,
Expand Down Expand Up @@ -60,7 +60,7 @@ extension ForkedArray {
/// Create a ``ForkedArray`` using a single `Array`
/// - Parameters:
/// - array: The `Array` to be used in creating the output
/// - output: An `async` closure that uses the `Array.Element` as its input
/// - map: An `async` closure that uses the `Array.Element` as its input
public init(
_ array: [Value],
map: @escaping (Value) async throws -> Output
Expand Down
Loading

0 comments on commit 58c39d9

Please sign in to comment.