From c72184ed5fe1bbac98ef396d179e3bde4e8bec06 Mon Sep 17 00:00:00 2001 From: Michael Law <1365977+lawmicha@users.noreply.github.com> Date: Mon, 20 Nov 2023 14:44:38 -0500 Subject: [PATCH] fix(DataStore): Reconcile mutation responses from conflict handler path (#3370) --- .../OutgoingMutationQueue.swift | 3 +- ...ocessMutationErrorFromCloudOperation.swift | 30 +++++++++++++++---- ...MutationErrorFromCloudOperationTests.swift | 7 ++++- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 2fa24be34a..2acb52f892 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -282,7 +282,8 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { api: api, storageAdapter: storageAdapter, graphQLResponseError: graphQLResponseError, - apiError: apiError + apiError: apiError, + reconciliationQueue: reconciliationQueue ) { [weak self] result in guard let self = self else { return diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift index 6b7894226d..2e4528a2e6 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift @@ -27,13 +27,15 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation { private let completion: (Result) -> Void private var mutationOperation: AtomicValue>?> private weak var api: APICategoryGraphQLBehaviorExtended? - + private weak var reconciliationQueue: IncomingEventReconciliationQueue? + init(dataStoreConfiguration: DataStoreConfiguration, mutationEvent: MutationEvent, api: APICategoryGraphQLBehaviorExtended, storageAdapter: StorageEngineAdapter, graphQLResponseError: GraphQLResponseError>? = nil, apiError: APIError? = nil, + reconciliationQueue: IncomingEventReconciliationQueue? = nil, completion: @escaping (Result) -> Void) { self.dataStoreConfiguration = dataStoreConfiguration self.mutationEvent = mutationEvent @@ -41,6 +43,7 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation { self.storageAdapter = storageAdapter self.graphQLResponseError = graphQLResponseError self.apiError = apiError + self.reconciliationQueue = reconciliationQueue self.completion = completion self.mutationOperation = AtomicValue(initialValue: nil) @@ -311,12 +314,27 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation { if case .failure(let error) = cloudResult { dataStoreConfiguration.errorHandler(error) } - - if case .success(let response) = cloudResult, - case .failure(let error) = response { - dataStoreConfiguration.errorHandler(error) + + if case let .success(graphQLResponse) = cloudResult { + if case .failure(let error) = graphQLResponse { + dataStoreConfiguration.errorHandler(error) + } else if case let .success(graphQLResult) = graphQLResponse { + guard let reconciliationQueue = reconciliationQueue else { + let dataStoreError = DataStoreError.configuration( + "reconciliationQueue is unexpectedly nil", + """ + The reference to reconciliationQueue has been released while an ongoing mutation was being processed. + \(AmplifyErrorMessages.reportBugToAWS()) + """ + ) + finish(result: .failure(dataStoreError)) + return + } + + reconciliationQueue.offer([graphQLResult], modelName: mutationEvent.modelName) + } } - + finish(result: .success(nil)) } diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/ProcessMutationErrorFromCloudOperationTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/ProcessMutationErrorFromCloudOperationTests.swift index 12afd41348..2fe9e2fe98 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/ProcessMutationErrorFromCloudOperationTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/ProcessMutationErrorFromCloudOperationTests.swift @@ -24,6 +24,7 @@ class ProcessMutationErrorFromCloudOperationTests: XCTestCase { var storageAdapter: StorageEngineAdapter! var localPost = Post(title: "localTitle", content: "localContent", createdAt: .now()) let queue = OperationQueue() + let reconciliationQueue = MockReconciliationQueue() override func setUp() async throws { await tryOrFail { @@ -573,12 +574,13 @@ class ProcessMutationErrorFromCloudOperationTests: XCTestCase { expectConflicthandlerCalled.fulfill() resolve(.retryLocal) }) - + let operation = ProcessMutationErrorFromCloudOperation(dataStoreConfiguration: configuration, mutationEvent: mutationEvent, api: mockAPIPlugin, storageAdapter: storageAdapter, graphQLResponseError: graphQLResponseError, + reconciliationQueue: reconciliationQueue, completion: completion) queue.addOperation(operation) @@ -656,6 +658,7 @@ class ProcessMutationErrorFromCloudOperationTests: XCTestCase { api: mockAPIPlugin, storageAdapter: storageAdapter, graphQLResponseError: graphQLResponseError, + reconciliationQueue: reconciliationQueue, completion: completion) queue.addOperation(operation) @@ -950,6 +953,7 @@ class ProcessMutationErrorFromCloudOperationTests: XCTestCase { api: mockAPIPlugin, storageAdapter: storageAdapter, graphQLResponseError: graphQLResponseError, + reconciliationQueue: reconciliationQueue, completion: completion) queue.addOperation(operation) @@ -1029,6 +1033,7 @@ class ProcessMutationErrorFromCloudOperationTests: XCTestCase { api: mockAPIPlugin, storageAdapter: storageAdapter, graphQLResponseError: graphQLResponseError, + reconciliationQueue: reconciliationQueue, completion: completion) queue.addOperation(operation)