Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): update pending mutation events version from mutation response #3452

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,22 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
guard let reconciliationQueue = reconciliationQueue else {
let dataStoreError = DataStoreError.configuration(
"reconciliationQueue is unexpectedly nil",
"""
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
\(AmplifyErrorMessages.reportBugToAWS())
"""
"""
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
\(AmplifyErrorMessages.reportBugToAWS())
"""
)
stateMachine.notify(action: .errored(dataStoreError))
return
}
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
MutationEvent.reconcilePendingMutationEventsVersion(
sent: mutationEvent,
received: mutationSync,
storageAdapter: storageAdapter
) { _ in
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
}
} else {
completeProcessingEvent(mutationEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,33 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
}

/// Always retrieve and use the largest version when available. The source of the version comes
/// from either the MutationEvent itself, which represents the queue request, or the persisted version
/// from the metadata table.
///
/// **Version in the Mutation Event**. If there are mulitple mutation events pending, each outgoing
/// mutation processing will result in synchronously updating the pending mutation's version
/// before enqueuing the mutation response for reconciliation.
///
/// **Version persisted in the metadata table**: Reconciliation will persist the latest version in the
/// metadata table. In cases of quick consecutive updates, the MutationEvent's version could
/// be greater than the persisted since the MutationEvent is updated from the original thread that
/// processed the outgoing mutation.
private func getLatestVersion(_ mutationEvent: MutationEvent) -> Int? {
5d marked this conversation as resolved.
Show resolved Hide resolved
let latestSyncedMetadataVersion = getLatestSyncMetadata()?.version
let mutationEventVersion = mutationEvent.version
switch (latestSyncedMetadataVersion, mutationEventVersion) {
case let (.some(syncedVersion), .some(version)):
return max(syncedVersion, version)
case let (.some(syncedVersion), .none):
return syncedVersion
case let (.none, .some(version)):
return version
case (.none, .none):
return nil
}
}

/// Creates a GraphQLRequest based on given `mutationType`
/// - Parameters:
/// - mutationType: mutation type
Expand All @@ -112,7 +139,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
mutationType: GraphQLMutationType,
authType: AWSAuthorizationType? = nil
) -> GraphQLRequest<MutationSync<AnyModel>>? {
let latestSyncMetadata = getLatestSyncMetadata()
let version = getLatestVersion(mutationEvent)
var request: GraphQLRequest<MutationSync<AnyModel>>

do {
Expand All @@ -133,7 +160,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: latestSyncMetadata?.version)
version: version)
case .update:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -145,7 +172,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: latestSyncMetadata?.version)
version: version)
case .create:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -156,7 +183,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
modelSchema: modelSchema,
version: latestSyncMetadata?.version)
version: version)
}
} catch {
let apiError = APIError.unknown("Couldn't decode model", "", error)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import Dispatch
import AWSPluginsCore

extension MutationEvent {
// Consecutive operations that modify a model results in a sequence of pending mutation events that
// have the current version of the model. The first mutation event has the correct version of the model,
// while the subsequent events will have lower versions if the first mutation event is successfully synced
// to the cloud. By reconciling the pending mutation events after syncing the first mutation event,
// we attempt to update the pending version to the latest version from the response.
// The before and after conditions for consecutive update scenarios are as below:
// - Save, then immediately update
// Queue Before - [(version: nil, inprocess: true, type: .create),
// (version: nil, inprocess: false, type: .update)]
// Response - [version: 1, type: .create]
// Queue After - [(version: 1, inprocess: false, type: .update)]
// - Save, then immediately delete
// Queue Before - [(version: nil, inprocess: true, type: .create),
// (version: nil, inprocess: false, type: .delete)]
// Response - [version: 1, type: .create]
// Queue After - [(version: 1, inprocess: false, type: .delete)]
// - Save, sync, then immediately update and delete
// Queue Before (After save, sync)
// - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)]
// Response - [version: 2, type: .update]
// Queue After - [(version: 2, inprocess: false, type: .delete)]
//
// For a given model `id`, checks the version of the head of pending mutation event queue
// against the API response version in `mutationSync` and saves it in the mutation event table if
// the response version is a newer one
static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent,
received mutationSync: MutationSync<AnyModel>,
storageAdapter: StorageEngineAdapter,
completion: @escaping DataStoreCallback<Void>) {
MutationEvent.pendingMutationEvents(
forMutationEvent: mutationEvent,
storageAdapter: storageAdapter
) { queryResult in
switch queryResult {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let localMutationEvents):
guard let existingEvent = localMutationEvents.first else {
completion(.success(()))
return
}

guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent,
with: mutationEvent,
responseMutationSync: mutationSync) else {
completion(.success(()))
return
}

storageAdapter.save(reconciledEvent, condition: nil, eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success:
completion(.success(()))
}
}
}
}
}

static func reconcile(pendingMutationEvent: MutationEvent,
with requestMutationEvent: MutationEvent,
responseMutationSync: MutationSync<AnyModel>) -> MutationEvent? {
// return if version of the pending mutation event is not nil and
// is >= version contained in the response
if pendingMutationEvent.version != nil &&
pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version {
return nil
}

do {
let responseModel = responseMutationSync.model.instance
let requestModel = try requestMutationEvent.decodeModel()

// check if the data sent in the request is the same as the response
// if it is, update the pending mutation event version to the response version
guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName),
modelSchema.compare(responseModel, requestModel) else {
return nil
}

var pendingMutationEvent = pendingMutationEvent
pendingMutationEvent.version = responseMutationSync.syncMetadata.version
return pendingMutationEvent
} catch {
Amplify.log.verbose("Error decoding models: \(error)")
return nil
}
}

}
Loading
Loading