diff --git a/Sources/ExecutionContext.swift b/Sources/ExecutionContext.swift index 635fc59..cb7f731 100644 --- a/Sources/ExecutionContext.swift +++ b/Sources/ExecutionContext.swift @@ -106,12 +106,8 @@ extension DispatchQueue { /// Schedule given block for execution after given interval passes. /// Scheduled execution can be cancelled by disposing the returned disposable. public func disposableAfter(when interval: Double, block: @escaping () -> Void) -> Disposable { - let disposable = SimpleDisposable() - asyncAfter(deadline: .now() + interval) { - if !disposable.isDisposed { - block() - } - } - return disposable + let workItem = DispatchWorkItem(block: block) + asyncAfter(deadline: .now() + interval, execute: workItem) + return BlockDisposable(workItem.cancel) } } diff --git a/Sources/SignalProtocol+Filtering.swift b/Sources/SignalProtocol+Filtering.swift index a070729..f8a0d3b 100644 --- a/Sources/SignalProtocol+Filtering.swift +++ b/Sources/SignalProtocol+Filtering.swift @@ -374,24 +374,45 @@ extension SignalProtocol { } } - /// Throttle the signal to emit at most one element per given `seconds` interval. + /// Throttle the signal to emit at most one element per given `seconds` interval. Signal will emit latest element from each interval. /// /// Check out interactive example at [https://rxmarbles.com/#throttle](https://rxmarbles.com/#throttle) - public func throttle(for seconds: Double) -> Signal { + public func throttle(for seconds: Double, queue: DispatchQueue = DispatchQueue(label: "com.reactive_kit.signal.throttle")) -> Signal { return Signal { observer in - let lock = NSRecursiveLock(name: "com.reactive_kit.signal.throttle") - var _lastEventTime: DispatchTime? + var isInitialElement = true + var throttledDisposable: Disposable? = nil + var lastElement: Element? = nil + var isFinished: Bool = false return self.observe { event in - switch event { - case .next(let element): - lock.lock(); defer { lock.unlock() } - let now = DispatchTime.now() - if _lastEventTime == nil || now.rawValue > (_lastEventTime! + seconds).rawValue { - _lastEventTime = now - observer.receive(element) + queue.async { + switch event { + case .next(let element): + if isInitialElement { + isInitialElement = false + observer.receive(element) + } else { + lastElement = element + } + guard throttledDisposable == nil else { return } + throttledDisposable = queue.disposableAfter(when: seconds) { + if let element = lastElement { + observer.receive(element) + lastElement = nil + } + if isFinished { + observer.receive(completion: .finished) + } + throttledDisposable = nil + } + case .failed(let error): + observer.receive(completion: .failure(error)) + case .completed: + guard throttledDisposable == nil else { + isFinished = true + return + } + observer.receive(completion: .finished) } - default: - observer.on(event) } } } diff --git a/Tests/ReactiveKitTests/SignalTests.swift b/Tests/ReactiveKitTests/SignalTests.swift index 5a862db..2618b1c 100644 --- a/Tests/ReactiveKitTests/SignalTests.swift +++ b/Tests/ReactiveKitTests/SignalTests.swift @@ -309,6 +309,42 @@ class SignalTests: XCTestCase { XCTAssertTrue(subscriber.isFinished) } + func testDebounce() { + // event 0 @ 0.0s - debounced + // event 1 @ 0.4s - debounced + // event 2 @ 0.8s - debounced + // event 3 @ 1.2s - debounced + // event 4 @ 1.6s - debounced + // timesup @ 2.6s - return 4 + // event 5 @ 3.6s - debounced + // timesup @ 4.6s - return 5 + let values = Signal(sequence: 0..<5, interval: 0.4) + .append(Signal(just: 5, after: 2)) + .debounce(for: 1) + .waitAndCollectElements() + XCTAssertEqual(values, [4, 5]) + } + + func testThrottle() { + // event 0 @ 0.0s - return 0 + // event 1 @ 0.4s - throttled + // event 2 @ 0.8s - throttled + // event 3 @ 1.2s - throttled + // throttle timesup @ 1.5s - return 3 + // event 4 @ 1.6s - throttled + // event 5 @ 2.0s - throttled + // event 6 @ 2.4s - throttled + // event 7 @ 2.8s - throttled + // throttle timesup @ 3.0s - return 7 + // event 8 @ 3.2s - throttled + // event 9 @ 3.6s - throttled + // completed @ 3.6s - return 9 + let values = Signal(sequence: 0..<10, interval: 0.4) + .throttle(for: 1.5) + .waitAndCollectElements() + XCTAssertEqual(values, [0, 3, 7, 9]) + } + func testIgnoreNils() { let subscriber = Subscribers.Accumulator() let publisher = Signal(sequence: Array([1, nil, 3])).ignoreNils()