Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle refactor #157

Merged
merged 10 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 134 additions & 98 deletions Sources/Afluent/SequenceOperators/ThrottleSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,139 +18,175 @@ extension AsyncSequences {
let latest: Bool

public struct AsyncIterator: AsyncIteratorProtocol {
typealias Instant = C.Instant

var upstream: Upstream
let interval: C.Duration
let clock: C
let latest: Bool
init(upstream: Upstream, interval: C.Duration, clock: C, latest: Bool) {
self.upstream = upstream
self.interval = interval
self.clock = clock
self.latest = latest
self.state = State()
}

var iterator: AsyncThrowingStream<(Element?, Element?), Error>.Iterator?
let state = State()
private let upstream: Upstream
private let interval: C.Duration
private let clock: C
private let latest: Bool
private let state: State
private var iterationTask: Task<Void, Never>?

public mutating func next() async throws -> Element? {
try Task.checkCancellation()

if iterator == nil {
iterator = AsyncThrowingStream<(Element?, Element?), Error> {
[clock, interval, upstream, state] continuation in

let intervalTask = DeferredTask {
guard let intervalStartInstant = state.startInstant else { return }
state.hasStartedInterval = true
if await state.finished {
return nil
}

let intervalEndInstant = intervalStartInstant.advanced(by: interval)
try await clock.sleep(until: intervalEndInstant, tolerance: nil)
return try await nextUpstreamElement()
}

let firstElement = state.firstElement
let latestElement = state.latestElement
private mutating func nextUpstreamElement() async throws -> Element? {
await startIterationIfNecessary()
try await waitForNextInterval()
while true {
await Task.yield()
try Task.checkCancellation()
if let nextElement = await state.consumeNextElement(at: clock.now) {
switch nextElement {
case .emitted(let element):
return element
case .error(let error):
cancelTask()
throw error
case .finished:
cancelTask()
return nil
}
}
}
}

continuation.yield((firstElement, latestElement))
private mutating func cancelTask() {
self.iterationTask?.cancel()
}

state.firstElement = nil
state.hasStartedInterval = false
}
private mutating func startIterationIfNecessary() async {
guard iterationTask == nil else { return }

let iterationTask = Task {
do {
try Task.checkCancellation()
for try await el in upstream {
try Task.checkCancellation()
if !state.hasSeenFirstElement {
continuation.yield((el, el))
state.hasSeenFirstElement = true
continue
}
if state.firstElement == nil {
state.startInstant = clock.now
state.firstElement = el
intervalTask.run()
}
state.latestElement = el
}
if state.hasStartedInterval {
continuation.yield((state.firstElement, state.latestElement))
}
continuation.finish()
} catch {
if state.hasStartedInterval {
continuation.yield((state.firstElement, state.latestElement))
}
continuation.finish(throwing: error)
}
}
let upstream = self.upstream
let latest = self.latest
let state = self.state

continuation.onTermination = { _ in
// Clean up any running tasks if the upstream is terminated.
// We are unable to write a test to specifically target this behavior but it should be kept in place to ensure there are no breaking edge cases.
intervalTask.cancel()
iterationTask.cancel()
self.iterationTask = Task {
do {
for try await element in upstream {
async let _ = state.setNext(element: element, useLatest: latest)
}

}.makeAsyncIterator()
await state.setFinish()
} catch {
await state.setError(error)
}
}
await Task.yield()
}

while let (firstElement, latestElement) = try await iterator?.next() {
try Task.checkCancellation()
return latest ? latestElement : firstElement
private func waitForNextInterval() async throws {
guard let lastElementEmittedInstant = await state.lastElementEmittedInstant else {
return
}

return nil
let nextInstant = lastElementEmittedInstant.advanced(by: interval)
try await clock.sleep(until: nextInstant, tolerance: nil)
await Task.yield()
}
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(upstream: upstream, interval: interval, clock: clock, latest: latest)
AsyncIterator(
upstream: upstream,
interval: interval,
clock: clock,
latest: latest)
}
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
extension AsyncSequence where Self: Sendable, Element: Sendable {
/// Emits either the first or latest element received during a specified amount of time.
/// - Parameter interval: The interval of time in which to observe and emit either the first or latest element.
/// - Parameter latest: If `true`, emits the latest element in the time interval. If `false`, emits the first element in the time interval.
/// - Note: The first element in upstream will always be returned immediately. Once a second element is received, then the clock will begin for the given time interval and return the first or latest element once completed.
public func throttle<C: Clock>(for interval: C.Duration, clock: C, latest: Bool)
-> AsyncSequences.Throttle<Self, C>
{
AsyncSequences.Throttle(upstream: self, interval: interval, clock: clock, latest: latest)
extension AsyncSequences.Throttle.AsyncIterator {
/// An event from the upstream sequence.
private enum ElementEvent {
case emitted(Element)
case error(Error)
case finished

/// Returns `true` if either `finished` or `error`.
var isFinished: Bool {
switch self {
case .emitted: return false
case .error: return true
case .finished: return true
}
}
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
extension AsyncSequences.Throttle {
class State: @unchecked Sendable {
private var _hasSeenFirstElement: Bool = false
var hasSeenFirstElement: Bool {
get { lock.protect { _hasSeenFirstElement } }
set { lock.protect { _hasSeenFirstElement = newValue } }
private actor State: Sendable {
/// Sets the next element as "finished", overwriting any currently set element.
func setFinish() {
self._nextElement = .finished
}

private var _hasStartedInterval: Bool = false
var hasStartedInterval: Bool {
get { lock.protect { _hasStartedInterval } }
set { lock.protect { _hasStartedInterval = newValue } }
/// When an error occurs, sets the next element as an error, overwriting any currently set element.
func setError(_ error: Error) {
self._nextElement = .error(error)
}

private var _firstElement: Element?
var firstElement: Element? {
get { lock.protect { _firstElement } }
set { lock.protect { _firstElement = newValue } }
/// Sets the next element.
/// If using latest, this element will be set for staging.
/// If _not_ using latest, the element will be set for staging if no other element is already set.
func setNext(element: Element, useLatest: Bool) {
if useLatest {
self._nextElement = .emitted(element)
} else if self._nextElement == nil {
self._nextElement = .emitted(element)
}
}

/// Consumes the element that's next, if present.
/// If no element is present, then `nil` is returned, meaning we're still waiting on an event from the upstream.
/// Calling this function also sets the `lastElementEmittedInstant` to the passed instant.
func consumeNextElement(at instant: C.Instant) -> ElementEvent? {
guard let nextElement = self._nextElement else {
return nil
}
self._nextElement = nil
self._finished = nextElement.isFinished
self._lastElementEmittedInstant = instant
return nextElement
}

private var _latestElement: Element?
var latestElement: Element? {
get { lock.protect { _latestElement } }
set { lock.protect { _latestElement = newValue } }
/// The last instant an element was emitted, if an element has already been emitted.
/// This value is `nil` if no element has been emitted yet.
var lastElementEmittedInstant: C.Instant? {
_lastElementEmittedInstant
}

private var _startInstant: C.Instant?
var startInstant: C.Instant? {
get { lock.protect { _startInstant } }
set { lock.protect { _startInstant = newValue } }
/// Indicates whether the upstream has finished with `nil` or an error.
var finished: Bool {
_finished
}

private let lock = NSRecursiveLock()
private var _nextElement: ElementEvent?
private var _lastElementEmittedInstant: C.Instant?
private var _finished = false
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
extension AsyncSequence where Self: Sendable, Element: Sendable {
/// Emits either the first or latest element received during a specified amount of time.
/// - Parameter interval: The interval of time in which to observe and emit either the first or latest element.
/// - Parameter latest: If `true`, emits the latest element in the time interval. If `false`, emits the first element in the time interval.
/// - Note: The first element in upstream will always be returned immediately. Once a second element is received, then the clock will begin for the given time interval and return the first or latest element once completed.
public func throttle<C: Clock>(for interval: C.Duration, clock: C, latest: Bool)
-> AsyncSequences.Throttle<Self, C>
{
AsyncSequences.Throttle(upstream: self, interval: interval, clock: clock, latest: latest)
}
}
Loading