From 58c39d96cbf69cbbe9a5ea881936476fb8e8384c Mon Sep 17 00:00:00 2001 From: Zach Date: Fri, 23 Jun 2023 13:43:05 -0600 Subject: [PATCH] Add BatchedForkedArray and other changes (#17) --- README.md | 72 ++++++--- Sources/Fork/BatchedForkedArray.swift | 88 +++++++++++ .../Sequence+BatchedForkedArray.swift | 72 +++++++++ Sources/Fork/Fork.swift | 4 +- Sources/Fork/ForkedActor.swift | 1 - Sources/Fork/ForkedArray.swift | 4 +- Tests/ForkTests/BatchedForkedArrayTests.swift | 149 ++++++++++++++++++ Tests/ForkTests/ForkTests.swift | 31 +++- Tests/ForkTests/ForkedArrayTests.swift | 1 - 9 files changed, 395 insertions(+), 27 deletions(-) create mode 100644 Sources/Fork/BatchedForkedArray.swift create mode 100644 Sources/Fork/Extensions/Sequence+BatchedForkedArray.swift create mode 100644 Tests/ForkTests/BatchedForkedArrayTests.swift diff --git a/README.md b/README.md index 7c08126..bf1d04a 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,10 @@ When using Fork, functions will be ran in parallel and higher order forks will a ## Objects - `Fork`: Using a single input create two separate async functions that return `LeftOutput` and `RightOutput`. +- `ForkedArray`: Using a single array and a single async function, parallelize the work for each value of the array. +- `BatchedForkedArray`: Using a single array and a single async function, batch the parallelized work for each value of the array - `ForkedActor`: Using a single actor create two separate async functions that are passed the actor. - `KeyPathActor`: A generic Actor that uses KeyPaths to update and set values. -- `ForkedArray`: Using a single array and a single async function, parallelize the work for each value of the array. ## Basic usage @@ -72,21 +73,64 @@ let rightOutput = try await fork.right() XCTAssertEqual(leftOutput, true) XCTAssertEqual(rightOutput, "10") -let mergedFork: () async throws -> String = fork.merge( - using: { bool, string in - if bool { - return string + string - } - - return string +let output: String = try await fork.merged { bool, string in + if bool { + return string + string } -) + + return string +} let output = await mergedFork() XCTAssertEqual(output, "1010") ``` +## ForkedArray Example + +A ForkedArray makes it easy to perform an asynchronous function on all of the elements in an Array. ForkedArray helps with the [example](#why-use-fork) above. + +```swift +let forkedArray = ForkedArray(photoNames, map: downloadPhoto(named:)) +let photos = try await forkedArray.output() +``` + +## BatchedForkedArray + +The `BatchedForkedArray` allows you to efficiently parallelize and batch process an array of values using an async function. It provides methods for both resolving the parallelized array in a single output as well as streaming the batches of the resolved array. + + +```swift +let batchedForkedArray = BatchedForkedArray(photoNames, batch: 3, map: downloadPhoto(named:)) +let photos = try await forkedArray.output() +``` + +In the above example, we create an instance of `BatchedForkedArray` with a batch size of 3 and the downloadPhoto function as the map closure. + +To resolve the batched array, we use the `output()` method, which executes the downloadPhoto function on each batch of photo names in parallel. After the resolution is complete, the `photos` array will contain the downloaded photos in the order they were processed. + + +```swift +let photoNames = [Int](0 ..< 100) + +let batchedForkedArray = BatchedForkedArray( + photoNames, + batch: 5, + map: downloadPhoto(named:) +) + +for try await batch in batchedForkedArray.stream() { + for photo in batch { + // Perform operations on each photo in the batch + print(photo) + } +} +``` + +In this example, we create an instance of `BatchedForkedArray` with a batch size of 5 and the `downloadPhoto(named:)` function as the map closure. By using the `stream()` method, we can iterate over batches of photo names asynchronously. + +Within the for-await loop, each batch of photo names is processed asynchronously. We then iterate over each photo in the batch and perform operations accordingly. This allows for efficient processing of large datasets in batches while controlling the number of parallel processes running at once. + ## ForkedActor Example ```swift @@ -143,16 +187,6 @@ let actorValue = try await forkedActor.act().value XCTAssertEqual(actorValue, 3) ``` -## ForkedArray Example - -A ForkedArray makes it easy to perform an asynchronous function on all of the elements in an Array. ForkedArray helps with the [example](#why-use-fork) above. - -```swift -let forkedArray = ForkedArray(photoNames, output: downloadPhoto(named:)) -let photos = try await forkedArray.output() -``` - - ## Extra Examples - [Vapor ForkedActor Example](https://github.com/0xLeif/VaporForkDemo) diff --git a/Sources/Fork/BatchedForkedArray.swift b/Sources/Fork/BatchedForkedArray.swift new file mode 100644 index 0000000..40a37ad --- /dev/null +++ b/Sources/Fork/BatchedForkedArray.swift @@ -0,0 +1,88 @@ +/// Using a single array and a single async function, batch the parallelized work for each value of the array +public struct BatchedForkedArray { + private let batchedArray: [[Value]] + private let filter: (Value) async throws -> Bool + private let map: (Value) async throws -> Output + + /// Create a ``BatchedForkedArray`` using a single `Array` + /// - Parameters: + /// - array: The `Array` to be used in creating the output + /// - batch: The number of elements to batch together. (The minimum value is 1) + /// - filter: An `async` closure that determines if the value should be used or not + /// - map: An `async` closure that uses the `Array.Element` as its input + public init( + _ array: [Value], + batch: UInt, + filter: @escaping (Value) async throws -> Bool, + map: @escaping (Value) async throws -> Output + ) { + var index: Int = 0 + let batchLimit: UInt = max(batch, 1) + let batchedArray: [[Value]] + + batchedArray = array.reduce(into: []) { partialResult, value in + guard partialResult.isEmpty == false else { + partialResult.append([value]) + return + } + + guard partialResult[index].count < batchLimit else { + partialResult.append([value]) + return index += 1 + } + + partialResult[index].append(value) + } + + self.batchedArray = batchedArray + self.filter = filter + self.map = map + } + + /// Asynchronously resolve the forked array + /// + /// - Returns: The resolved array after performing the batched operations + public func output() async throws -> [Output] { + var batchedOutput: [[Output]] = [] + + for batch in batchedArray { + let batchedValues = try await batch.asyncFilter(filter).asyncMap(map) + + batchedOutput.append(batchedValues) + } + + return batchedOutput.flatMap(identity) + } + + /// Stream the forked array asynchronously + /// + /// - Returns: An AsyncThrowingStream object that yields batches of the resolved array + public func stream() -> AsyncThrowingStream<[Output], Error> { + AsyncThrowingStream { continuation in + Task { + for batch in batchedArray { + let batchedValues = try await batch.asyncFilter(filter).asyncMap(map) + + continuation.yield(batchedValues) + } + + continuation.finish() + } + } + } +} + +extension BatchedForkedArray { + /// Create a ``BatchedForkedArray`` using a single `Array` + /// - Parameters: + /// - array: The `Array` to be used in creating the output + /// - batch: The number of elements to batch together. (The minimum value is 1) + /// - map: An `async` closure that uses the `Array.Element` as its input + public init( + _ array: [Value], + batch: UInt, + map: @escaping (Value) async throws -> Output + ) { + self.init(array, batch: batch, filter: { _ in true }, map: map) + } +} diff --git a/Sources/Fork/Extensions/Sequence+BatchedForkedArray.swift b/Sources/Fork/Extensions/Sequence+BatchedForkedArray.swift new file mode 100644 index 0000000..5473387 --- /dev/null +++ b/Sources/Fork/Extensions/Sequence+BatchedForkedArray.swift @@ -0,0 +1,72 @@ +extension Sequence { + /// Create a ``BatchedForkedArray`` from the current `Sequence` + public func batchedFork( + batch: UInt, + filter: @escaping (Element) async throws -> Bool, + map: @escaping (Element) async throws -> Output + ) -> BatchedForkedArray { + BatchedForkedArray( + Array(self), + batch: batch, + filter: filter, + map: map + ) + } + + /// Create a ``BatchedForkedArray`` from the current `Sequence` + public func batchedFork( + batch: UInt, + map: @escaping (Element) async throws -> Output + ) -> BatchedForkedArray { + batchedFork(batch: batch, filter: { _ in true }, map: map) + } + + /// Create a ``BatchedForkedArray`` from the current `Sequence` and get the Output Array + public func batchedForked( + batch: UInt, + filter: @escaping (Element) async throws -> Bool, + map: @escaping (Element) async throws -> Output + ) async throws -> [Output] { + try await fork(filter: filter, map: map).output() + } + + /// Create a ``BatchedForkedArray`` from the current `Sequence` and get the Output Array + public func batchedForked( + batch: UInt, + map: @escaping (Element) async throws -> Output + ) async throws -> [Output] { + try await batchedForked(batch: batch, filter: { _ in true }, map: map) + } + + /// Returns an array containing the results of mapping the given closure over the sequence’s elements. + public func asyncBatchedMap( + batch: UInt, + _ transform: @escaping (Element) async throws -> Output + ) async throws -> [Output] { + try await batchedFork(batch: batch, map: transform).output() + } + + /// Returns an array containing the results, that aren't nil, of mapping the given closure over the sequence’s elements. + public func asyncBatchedCompactMap( + batch: UInt, + _ transform: @escaping (Element) async throws -> Output? + ) async throws -> [Output] { + try await batchedFork(batch: batch, map: transform).output().compactMap { $0 } + } + + /// Returns an array containing only the true results from the given closure over the sequence’s elements. + public func asyncBatchedFilter( + batch: UInt, + _ isIncluded: @escaping (Element) async throws -> Bool + ) async throws -> [Element] { + try await batchedFork(batch: batch, filter: isIncluded, map: identity).output() + } + + /// Calls the given closure for each of the elements in the Sequence. This function uses ``BatchedForkedArray`` and will be parallelized when possible. + public func asyncBatchedForEach( + batch: UInt, + _ transform: @escaping (Element) async throws -> Void + ) async throws { + _ = try await asyncBatchedMap(batch: batch, transform) + } +} diff --git a/Sources/Fork/Fork.swift b/Sources/Fork/Fork.swift index 15bd21d..15b8766 100644 --- a/Sources/Fork/Fork.swift +++ b/Sources/Fork/Fork.swift @@ -140,12 +140,12 @@ extension Fork { public func merged() async throws where LeftOutput == Void, RightOutput == Void { try await merged(using: { _, _ in () }) } - + /// Merge the ``Fork`` and return the `RightOutput` when `LeftOutput` is Void public func merged() async throws -> RightOutput where LeftOutput == Void { try await merged(using: { _, output in output }) } - + /// Merge the ``Fork`` and return the `LeftOutput` when `RightOutput` is Void public func merged() async throws -> LeftOutput where RightOutput == Void { try await merged(using: { output, _ in output }) diff --git a/Sources/Fork/ForkedActor.swift b/Sources/Fork/ForkedActor.swift index 7132498..b84adc5 100644 --- a/Sources/Fork/ForkedActor.swift +++ b/Sources/Fork/ForkedActor.swift @@ -59,5 +59,4 @@ extension ForkedActor { rightOutput: rightOutput ) } - } diff --git a/Sources/Fork/ForkedArray.swift b/Sources/Fork/ForkedArray.swift index 9b1646f..10fb21d 100644 --- a/Sources/Fork/ForkedArray.swift +++ b/Sources/Fork/ForkedArray.swift @@ -17,7 +17,7 @@ public struct ForkedArray { /// - Parameters: /// - array: The `Array` to be used in creating the output /// - filter: An `async` closure that determines if the value should be used or not - /// - output: An `async` closure that uses the `Array.Element` as its input + /// - map: An `async` closure that uses the `Array.Element` as its input public init( _ array: [Value], filter: @escaping (Value) async throws -> Bool, @@ -60,7 +60,7 @@ extension ForkedArray { /// Create a ``ForkedArray`` using a single `Array` /// - Parameters: /// - array: The `Array` to be used in creating the output - /// - output: An `async` closure that uses the `Array.Element` as its input + /// - map: An `async` closure that uses the `Array.Element` as its input public init( _ array: [Value], map: @escaping (Value) async throws -> Output diff --git a/Tests/ForkTests/BatchedForkedArrayTests.swift b/Tests/ForkTests/BatchedForkedArrayTests.swift new file mode 100644 index 0000000..18ddc3d --- /dev/null +++ b/Tests/ForkTests/BatchedForkedArrayTests.swift @@ -0,0 +1,149 @@ +import XCTest +@testable import Fork + +final class BatchedForkedArrayTests: XCTestCase { + func testBatchedForkedArrayOutput_x() async throws { + let photoNames = [Int](0 ..< 100) + + let batchedForkedArray = BatchedForkedArray( + photoNames, + batch: 5, + map: { "\($0)" } + ) + + let batchedArray = try await batchedForkedArray.output() + + XCTAssertEqual(batchedArray.count, photoNames.count) + } + + func testBatchedForkedArrayStream_x() async throws { + let photoNames = [Int](0 ..< 100) + + let batchedForkedArray = photoNames.batchedFork( + batch: 5, + map: { "\($0)" } + ) + + for try await batch in batchedForkedArray.stream() { + XCTAssertEqual(batch.count, 5) + } + } + + func testBatchedForkedArray() async throws { + let photoNames: [String] = (0 ... Int.random(in: 1 ..< 10)).map(\.description) + @Sendable func downloadPhoto(named: String) async -> String { named } + func show(_ photos: [String]) { } + + let forkedArray = BatchedForkedArray( + photoNames, + batch: 5, + map: downloadPhoto(named:) + ) + let photos = try await forkedArray.output() + + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArray_ForEach() async throws { + try await [ + "Hello", " ", // First batch + "World", "!" // Second batch + ] + .asyncBatchedForEach(batch: 2) { print($0) } + } + + func testBatchedForkedArray_none() async throws { + let photoNames: [String] = [] + @Sendable func downloadPhoto(named: String) async -> String { named } + func show(_ photos: [String]) { } + + let forkedArray = BatchedForkedArray(photoNames, batch: 5, map: downloadPhoto(named:)) + let photos = try await forkedArray.output() + + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArray_one() async throws { + let photoNames = ["one"] + @Sendable func isValidPhoto(named: String) async -> Bool { true } + + let photos = try await photoNames.asyncBatchedFilter(batch: 0, isValidPhoto(named:)) + + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArray_two() async throws { + let photoNames = ["one", "two"] + @Sendable func downloadPhoto(named: String) async -> String { named } + + let photos = try await photoNames.batchedForked( + batch: 2, + map: downloadPhoto(named:) + ) + + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArray_three() async throws { + let photoNames = ["one", "two", "three"] + @Sendable func downloadPhoto(named: String) async -> String { named } + + let photos = try await photoNames.asyncBatchedMap(batch: 2, downloadPhoto(named:)) + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArray_x() async throws { + let photoNames = (0 ... Int.random(in: 3 ..< 100)).map(\.description) + @Sendable func downloadPhoto(named: String) async -> String { named } + + let forkedArray = photoNames.batchedFork(batch: 10, map: downloadPhoto(named:)) + let photos = try await forkedArray.output() + + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArrayCompactMap_x() async throws { + let photoNames = [Int](0 ..< 100) + @Sendable func asyncFilter(number: Int) async -> String? { + guard number.isMultiple(of: 2) else { return nil } + + return number.description + } + + let compactedArray = try await photoNames.asyncBatchedCompactMap(batch: 10, asyncFilter(number:)) + + XCTAssertEqual(compactedArray.count, photoNames.count / 2) + } + + func testBatchedForkedArray_order() async throws { + let photoNames = ["Hello", " ", "World", "!"] + @Sendable func downloadPhoto(named: String) async -> String { named } + + let forkedArray = photoNames.batchedFork(batch: 2, map: downloadPhoto(named:)) + let photos = try await forkedArray.output() + + XCTAssertEqual(photos, photoNames) + } + + func testBatchedForkedArraySet() async throws { + let set = Set(0 ..< 9) + + let outputArray = try await set.asyncBatchedMap(batch: 3, identity) + + XCTAssertEqual(outputArray, Array(set)) + } + + func testBatchedForkedArrayDictionary() async throws { + let dictionary: [String: String] = [:] + + let outputArray = try await dictionary.batchedForked( + batch: 1, + filter: { (key: String, value: String) in + return true + }, + map: identity + ) + + XCTAssert(type(of: outputArray) == [Dictionary.Element].self) + } +} diff --git a/Tests/ForkTests/ForkTests.swift b/Tests/ForkTests/ForkTests.swift index 426ed7c..a545b33 100644 --- a/Tests/ForkTests/ForkTests.swift +++ b/Tests/ForkTests/ForkTests.swift @@ -79,7 +79,7 @@ final class ForkTests: XCTestCase { leftOutput: { () }, rightOutput: { expectedValue } ) - .merged() + .merged() XCTAssertEqual(value, expectedValue) } @@ -90,8 +90,35 @@ final class ForkTests: XCTestCase { leftOutput: { expectedValue }, rightOutput: { () } ) - .merged() + .merged() XCTAssertEqual(value, expectedValue) } + + func testForkCancel() async throws { + let fork = Fork( + leftOutput: { + try await Task.sleep(nanoseconds: 100_000_000) + }, + rightOutput: { + try await Task.sleep(nanoseconds: 100_000_000) + } + ) + + let forkedTask = Task { + do { + try await fork.merged { _, _ in + XCTFail() + } + } catch is CancellationError { + XCTAssert(true) + } catch { + XCTFail() + } + } + + forkedTask.cancel() + + await forkedTask.value + } } diff --git a/Tests/ForkTests/ForkedArrayTests.swift b/Tests/ForkTests/ForkedArrayTests.swift index 2063ebb..f7f05e5 100644 --- a/Tests/ForkTests/ForkedArrayTests.swift +++ b/Tests/ForkTests/ForkedArrayTests.swift @@ -56,7 +56,6 @@ class ForkedArrayTests: XCTestCase { @Sendable func downloadPhoto(named: String) async -> String { named } let photos = try await photoNames.asyncMap(downloadPhoto(named:)) - XCTAssertEqual(photos, photoNames) }