Skip to content

Commit

Permalink
fix(datastore): observequery local only collect (#3214)
Browse files Browse the repository at this point in the history
  • Loading branch information
atierian authored Dec 11, 2023
1 parent d3af5f9 commit 1994e14
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ extension StorageEngine {
}
}

/// Expresses whether the `StorageEngine` syncs from a remote source
/// based on whether the `AWSAPIPlugin` is present.
var syncsFromRemote: Bool {
tryGetAPIPlugin() != nil
}

private func tryGetAPIPlugin() -> APICategoryPlugin? {
do {
return try Amplify.API.getPlugin(for: validAPIPluginKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ protocol StorageEngineBehavior: AnyObject, ModelStorageBehavior {
func startSync() -> Result<SyncEngineInitResult, DataStoreError>
func stopSync(completion: @escaping DataStoreCallback<Void>)
func clear(completion: @escaping DataStoreCallback<Void>)

/// expresses whether the conforming type is syncing from a remote source.
var syncsFromRemote: Bool { get }
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,26 @@ class ObserveQueryTaskRunner<M: Model>: InternalTaskRunner, InternalTaskAsyncThr
func subscribeToItemChanges() {
serialQueue.async { [weak self] in
guard let self = self else { return }

self.batchItemsChangedSink = self.dataStorePublisher.publisher
.filter { _ in !self.dispatchedModelSyncedEvent.get() }
.filter(self.filterByModelName(mutationEvent:))
.filter(self.filterByPredicateMatch(mutationEvent:))
.handleEvents(receiveOutput: self.onItemChangeDuringSync(mutationEvent:) )
.collect(.byTimeOrCount(self.serialQueue, self.itemsChangedPeriodicPublishTimeInSeconds, self.itemsChangedMaxSize))
.collect(
.byTimeOrCount(
// on queue
self.serialQueue,
// collect over this timeframe
self.itemsChangedPeriodicPublishTimeInSeconds,
// If the `storageEngine` does sync from remote, the initial batch should
// collect snapshots based on time / snapshots received.
// If it doesn't, it should publish each snapshot without waiting.
self.storageEngine.syncsFromRemote
? self.itemsChangedMaxSize
: 1
)
)
.sink(receiveCompletion: self.onReceiveCompletion(completed:),
receiveValue: self.onItemsChangeDuringSync(mutationEvents:))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ class MockStorageEngineBehavior: StorageEngineBehavior {

}

var syncsFromRemote: Bool { true }

var mockSyncEnginePublisher: PassthroughSubject<RemoteSyncEngineEvent, DataStoreError>!
var mockSyncEngineSubscription: AnyCancellable! {
willSet {
Expand Down

0 comments on commit 1994e14

Please sign in to comment.