From b3d20af6787ca5172dae36337d987ddabfdacf1d Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Tue, 11 Jun 2024 15:15:40 -0400 Subject: [PATCH] DataStore Model Sync Parallelization (#2808) --- aws-datastore/api/aws-datastore.api | 2 + .../datastore/DataStoreConfiguration.java | 67 +++- .../datastore/syncengine/SyncProcessor.java | 29 +- .../datastore/DataStoreConfigurationTest.java | 7 +- .../syncengine/ConcurrentSyncProcessorTest.kt | 287 ++++++++++++++++++ .../testmodels/flat/AmplifyModelProvider.java | 53 ++++ .../testmodels/flat/Model1.java | 185 +++++++++++ .../testmodels/flat/Model2.java | 179 +++++++++++ 8 files changed, 801 insertions(+), 8 deletions(-) create mode 100644 aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt create mode 100644 testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java create mode 100644 testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java create mode 100644 testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java diff --git a/aws-datastore/api/aws-datastore.api b/aws-datastore/api/aws-datastore.api index 2232919d6d..b319bee8db 100644 --- a/aws-datastore/api/aws-datastore.api +++ b/aws-datastore/api/aws-datastore.api @@ -64,6 +64,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration { public fun getSyncExpressions ()Ljava/util/Map; public fun getSyncIntervalInMinutes ()Ljava/lang/Long; public fun getSyncIntervalMs ()Ljava/lang/Long; + public fun getSyncMaxConcurrentModels ()Ljava/lang/Integer; public fun getSyncMaxRecords ()Ljava/lang/Integer; public fun getSyncPageSize ()Ljava/lang/Integer; public fun hashCode ()I @@ -80,6 +81,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration$Builder public fun syncExpression (Ljava/lang/Class;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncExpression (Ljava/lang/String;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncInterval (JLjava/util/concurrent/TimeUnit;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; + public fun syncMaxConcurrentModels (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncMaxRecords (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncPageSize (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; } diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index f24444ad08..97ce666186 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -48,6 +48,8 @@ public final class DataStoreConfiguration { static final int DEFAULT_SYNC_PAGE_SIZE = 1_000; @VisibleForTesting static final boolean DEFAULT_DO_SYNC_RETRY = false; + @VisibleForTesting + static final int DEFAULT_SYNC_MAX_CONCURRENT_MODELS = 1; static final int MAX_RECORDS = 1000; static final long MAX_TIME_SEC = 2; @@ -58,6 +60,7 @@ public final class DataStoreConfiguration { private final Integer syncMaxRecords; private final Integer syncPageSize; private final boolean doSyncRetry; + private final Integer syncMaxConcurrentModels; private final Map syncExpressions; private final Long syncIntervalInMinutes; private final Long maxTimeLapseForObserveQuery; @@ -71,6 +74,8 @@ private DataStoreConfiguration(Builder builder) { this.syncIntervalInMinutes = builder.syncIntervalInMinutes; this.syncExpressions = builder.syncExpressions; this.doSyncRetry = builder.doSyncRetry; + this.syncMaxConcurrentModels = builder.syncMaxConcurrentModels != null ? + builder.syncMaxConcurrentModels : DEFAULT_SYNC_MAX_CONCURRENT_MODELS; this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery; this.observeQueryMaxRecords = builder.observeQueryMaxRecords; } @@ -126,9 +131,10 @@ public static DataStoreConfiguration defaults() throws DataStoreException { .syncInterval(DEFAULT_SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES) .syncPageSize(DEFAULT_SYNC_PAGE_SIZE) .syncMaxRecords(DEFAULT_SYNC_MAX_RECORDS) - .doSyncRetry(DEFAULT_DO_SYNC_RETRY) - .observeQueryMaxTime(MAX_TIME_SEC) - .observeQueryMaxRecords(MAX_RECORDS) + .doSyncRetry(DEFAULT_DO_SYNC_RETRY) + .observeQueryMaxTime(MAX_TIME_SEC) + .observeQueryMaxRecords(MAX_RECORDS) + .syncMaxConcurrentModels(DEFAULT_SYNC_MAX_CONCURRENT_MODELS) .build(); } @@ -201,6 +207,23 @@ public Boolean getDoSyncRetry() { return this.doSyncRetry; } + /** + * Gets the number of models that are allowed to concurrently sync. + * NOTE: This value will not be used if any models have associations, instead, the default (1) + * will be used. + * Setting this number to a high value requires that the developer ensure app memory is not a + * concern. If the expected sync data contains a large number of models, with a large number + * of records per model, the concurrency limit should be set to a conservative value. However, + * if the expected sync data contains a large number of models, with a small amount of data in + * each model, setting this limit to a high value will greatly improve sync speeds. + * @return Limit to the number of models that can sync concurrently + */ + @IntRange(from = 1) + @NonNull + public Integer getSyncMaxConcurrentModels() { + return syncMaxConcurrentModels; + } + /** * Returns the Map of all {@link DataStoreSyncExpression}s used to filter data received from AppSync, either during * a sync or over the real-time subscription. @@ -247,6 +270,9 @@ public boolean equals(@Nullable Object thatObject) { if (!ObjectsCompat.equals(getObserveQueryMaxRecords(), that.getObserveQueryMaxRecords())) { return false; } + if (!ObjectsCompat.equals(getSyncMaxConcurrentModels(), that.getSyncMaxConcurrentModels())) { + return false; + } return true; } @@ -261,6 +287,7 @@ public int hashCode() { result = 31 * result + getDoSyncRetry().hashCode(); result = 31 * result + (getObserveQueryMaxRecords() != null ? getObserveQueryMaxRecords().hashCode() : 0); result = 31 * result + getMaxTimeLapseForObserveQuery().hashCode(); + result = 31 * result + getSyncMaxConcurrentModels().hashCode(); return result; } @@ -273,9 +300,10 @@ public String toString() { ", syncPageSize=" + syncPageSize + ", syncIntervalInMinutes=" + syncIntervalInMinutes + ", syncExpressions=" + syncExpressions + - ", doSyncRetry=" + doSyncRetry + - ", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery + - ", observeQueryMaxRecords=" + observeQueryMaxRecords + + ", doSyncRetry=" + doSyncRetry + + ", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery + + ", observeQueryMaxRecords=" + observeQueryMaxRecords + + ", syncMaxConcurrentModels=" + syncMaxConcurrentModels + '}'; } @@ -309,6 +337,7 @@ public static final class Builder { private Integer syncMaxRecords; private Integer syncPageSize; private boolean doSyncRetry; + private Integer syncMaxConcurrentModels; private Map syncExpressions; private boolean ensureDefaults; private JSONObject pluginJson; @@ -429,6 +458,24 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) { return Builder.this; } + /** + * Sets the max concurrency limit for model syncing. Default is 1 + * NOTE: If any sync models have associations, this value will be unused and the default (1) + * will be used. + * Setting this number to a high value requires that the developer ensure app memory is not a + * concern. If the expected sync data contains a large number of models, with a large number + * of records per model, the concurrency limit should be set to a conservative value. However, + * if the expected sync data contains a large number of models, with a small amount of data in + * each model, setting this limit to a high value will greatly improve sync speeds. + * @param syncMaxConcurrentModels Number of models that can sync concurrently + * @return Current builder + */ + @NonNull + public Builder syncMaxConcurrentModels(@IntRange(from = 1) Integer syncMaxConcurrentModels) { + this.syncMaxConcurrentModels = syncMaxConcurrentModels; + return Builder.this; + } + /** * Sets a sync expression for a particular model to filter which data is synced locally. * The expression is evaluated each time DataStore is started. @@ -518,6 +565,10 @@ private void applyUserProvidedConfiguration() { syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize); syncExpressions = userProvidedConfiguration.getSyncExpressions(); doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry); + syncMaxConcurrentModels = getValueOrDefault( + userProvidedConfiguration.getSyncMaxConcurrentModels(), + syncMaxConcurrentModels + ); observeQueryMaxRecords = getValueOrDefault(userProvidedConfiguration.getObserveQueryMaxRecords(), observeQueryMaxRecords); maxTimeLapseForObserveQuery = userProvidedConfiguration.getMaxTimeLapseForObserveQuery() @@ -548,6 +599,10 @@ public DataStoreConfiguration build() throws DataStoreException { syncIntervalInMinutes = getValueOrDefault(syncIntervalInMinutes, DEFAULT_SYNC_INTERVAL_MINUTES); syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS); syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE); + syncMaxConcurrentModels = getValueOrDefault( + syncMaxConcurrentModels, + DEFAULT_SYNC_MAX_CONCURRENT_MODELS + ); observeQueryMaxRecords = getValueOrDefault(observeQueryMaxRecords, MAX_RECORDS); maxTimeLapseForObserveQuery = maxTimeLapseForObserveQuery == 0 ? MAX_TIME_SEC : maxTimeLapseForObserveQuery; diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java index 2acf4acd75..43bd39e086 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java @@ -126,16 +126,43 @@ Completable hydrate() { TopologicalOrdering.forRegisteredModels(schemaRegistry, modelProvider); Collections.sort(modelSchemas, ordering::compare); ArrayList toBeSyncedModelArray = new ArrayList<>(); + boolean canSyncConcurrently = true; for (ModelSchema schema : modelSchemas) { //Check to see if query predicate for this schema is not equal to none. This means customer does // not want to sync the data for this model. if (!QueryPredicates.none().equals(queryPredicateProvider.getPredicate(schema.getName()))) { hydrationTasks.add(createHydrationTask(schema)); toBeSyncedModelArray.add(schema.getName()); + if (!schema.getAssociations().isEmpty()) { + canSyncConcurrently = false; + } } } - return Completable.concat(hydrationTasks) + int syncMaxConcurrentModels; + try { + syncMaxConcurrentModels = dataStoreConfigurationProvider + .getConfiguration() + .getSyncMaxConcurrentModels(); + } catch (DataStoreException exception) { + syncMaxConcurrentModels = 1; + } + + Completable syncCompletable; + if (canSyncConcurrently && syncMaxConcurrentModels > 1) { + syncCompletable = Completable.mergeDelayError( + Flowable.fromIterable(hydrationTasks), + syncMaxConcurrentModels + ); + } else { + // The reason we don't do mergeDelayError here with maxConcurrency = 1 is because it would create a + // behavioral difference. If a failure is encountered in concat, sync immediately stops. This would be + // the wrong behavior when concurrency is enabled, but in the single concurrency use case, this matches + // previous behavior + syncCompletable = Completable.concat(hydrationTasks); + } + + return syncCompletable .doOnSubscribe(ignore -> { // This is where we trigger the syncQueriesStarted event since // doOnSubscribe means that all upstream hydration tasks diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java b/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java index 4eaf9bdf2d..c4a546a463 100644 --- a/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java @@ -62,6 +62,8 @@ public void testDefaultConfiguration() throws DataStoreException { dataStoreConfiguration.getSyncMaxRecords().intValue()); assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE, dataStoreConfiguration.getSyncPageSize().intValue()); + assertEquals(DataStoreConfiguration.DEFAULT_SYNC_MAX_CONCURRENT_MODELS, + dataStoreConfiguration.getSyncMaxConcurrentModels().intValue()); assertTrue(dataStoreConfiguration.getConflictHandler() instanceof AlwaysApplyRemoteHandler); assertTrue(dataStoreConfiguration.getErrorHandler() instanceof DefaultDataStoreErrorHandler); @@ -107,6 +109,7 @@ public void testDefaultOverriddenFromConfigurationAndObject() long expectedSyncIntervalMinutes = 6L; Long expectedSyncIntervalMs = TimeUnit.MINUTES.toMillis(expectedSyncIntervalMinutes); Integer expectedSyncMaxRecords = 3; + Integer expectedSyncMaxConcurrentModels = 5; DummyConflictHandler dummyConflictHandler = new DummyConflictHandler(); DataStoreErrorHandler errorHandler = DefaultDataStoreErrorHandler.instance(); @@ -121,7 +124,8 @@ public void testDefaultOverriddenFromConfigurationAndObject() .errorHandler(errorHandler) .syncExpression(BlogOwner.class, ownerSyncExpression) .syncExpression("Post", postSyncExpression) - .doSyncRetry(true) + .doSyncRetry(true) + .syncMaxConcurrentModels(expectedSyncMaxConcurrentModels) .build(); JSONObject jsonConfigFromFile = new JSONObject() @@ -132,6 +136,7 @@ public void testDefaultOverriddenFromConfigurationAndObject() assertEquals(expectedSyncIntervalMs, dataStoreConfiguration.getSyncIntervalMs()); assertEquals(expectedSyncMaxRecords, dataStoreConfiguration.getSyncMaxRecords()); + assertEquals(expectedSyncMaxConcurrentModels, dataStoreConfiguration.getSyncMaxConcurrentModels()); assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE, dataStoreConfiguration.getSyncPageSize().longValue()); assertTrue(dataStoreConfiguration.getDoSyncRetry()); diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt new file mode 100644 index 0000000000..9b1562ee95 --- /dev/null +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt @@ -0,0 +1,287 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.datastore.syncengine + +import androidx.test.core.app.ApplicationProvider +import com.amplifyframework.core.model.Model +import com.amplifyframework.core.model.ModelProvider +import com.amplifyframework.core.model.SchemaRegistry +import com.amplifyframework.core.model.temporal.Temporal +import com.amplifyframework.datastore.DataStoreConfiguration +import com.amplifyframework.datastore.DataStoreConfigurationProvider +import com.amplifyframework.datastore.DataStoreErrorHandler +import com.amplifyframework.datastore.DataStoreException +import com.amplifyframework.datastore.appsync.AppSync +import com.amplifyframework.datastore.appsync.AppSyncMocking +import com.amplifyframework.datastore.appsync.ModelMetadata +import com.amplifyframework.datastore.appsync.ModelWithMetadata +import com.amplifyframework.datastore.model.SystemModelsProviderFactory +import com.amplifyframework.datastore.storage.SynchronousStorageAdapter +import com.amplifyframework.datastore.storage.sqlite.SQLiteStorageAdapter +import com.amplifyframework.testmodels.flat.AmplifyModelProvider +import com.amplifyframework.testmodels.flat.Model1 +import com.amplifyframework.testmodels.flat.Model2 +import io.mockk.mockk +import io.mockk.verify +import io.reactivex.rxjava3.observers.TestObserver +import java.util.concurrent.TimeUnit +import junit.framework.TestCase.assertEquals +import junit.framework.TestCase.assertTrue +import kotlin.random.Random +import org.junit.After +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.mockito.Mockito.mock +import org.robolectric.RobolectricTestRunner + +/** + * Tests the [SyncProcessor] with concurrency enabled. + */ +@RunWith(RobolectricTestRunner::class) +class ConcurrentSyncProcessorTest { + private val appSync = mock(AppSync::class.java) // using Mockito due to existing AppSyncMocking setup + private val errorHandler = mockk(relaxed = true) + + private lateinit var modelProvider: ModelProvider + private lateinit var storageAdapter: SynchronousStorageAdapter + + private lateinit var syncProcessor: SyncProcessor + + @Before + fun setup() { + modelProvider = AmplifyModelProvider.getInstance() + val schemaRegistry = SchemaRegistry.instance().apply { + clear() + register(modelProvider.models()) + } + + val dataStoreConfiguration = DataStoreConfiguration.builder() + .errorHandler(errorHandler) + .syncInterval(1, TimeUnit.MINUTES) + .syncPageSize(100) + .syncMaxRecords(1000) + .syncMaxConcurrentModels(3) + .build() + + val sqliteStorageAdapter = SQLiteStorageAdapter.forModels( + schemaRegistry, + modelProvider + ) + + storageAdapter = SynchronousStorageAdapter.delegatingTo(sqliteStorageAdapter).apply { + initialize(ApplicationProvider.getApplicationContext(), dataStoreConfiguration) + } + + val syncTimeRegistry = SyncTimeRegistry(sqliteStorageAdapter) + val mutationOutbox: MutationOutbox = PersistentMutationOutbox(sqliteStorageAdapter) + val versionRepository = VersionRepository(sqliteStorageAdapter) + val merger = Merger(mutationOutbox, versionRepository, sqliteStorageAdapter) + + val dataStoreConfigurationProvider = DataStoreConfigurationProvider { dataStoreConfiguration } + + this.syncProcessor = SyncProcessor.builder() + .modelProvider(modelProvider) + .schemaRegistry(schemaRegistry) + .syncTimeRegistry(syncTimeRegistry) + .appSync(appSync) + .merger(merger) + .dataStoreConfigurationProvider(dataStoreConfigurationProvider) + .queryPredicateProvider( + QueryPredicateProvider(dataStoreConfigurationProvider).apply { + resolvePredicates() + } + ) + .retryHandler(RetryHandler()) + .isSyncRetryEnabled(false) + .build() + } + + /** + * Test Cleanup. + * @throws DataStoreException On storage adapter terminate failure + */ + @After + fun tearDown() { + storageAdapter.terminate() + } + + @Test + fun `sync with concurrency`() { + // Arrange a subscription to the storage adapter. We're going to watch for changes. + // We expect to see content here as a result of the SyncProcessor applying updates. + val adapterObserver = storageAdapter.observe().test() + + // Arrange: return some responses for the sync() call on the RemoteModelState + val configurator = AppSyncMocking.sync(appSync) + + val model1 = Model1.builder().name("M1_1").build() + val model1Metadata = ModelMetadata(model1.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model1WithMetadata = ModelWithMetadata(model1, model1Metadata) + val expectedModel1Response = mutableListOf(model1WithMetadata) + + val model2 = Model2.builder().name("M2_1").build() + val model2Metadata = ModelMetadata(model2.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model2WithMetadata = ModelWithMetadata(model2, model2Metadata) + val expectedModel2Response = mutableListOf(model2WithMetadata) + + val allExpectedModels = expectedModel1Response + expectedModel2Response + + configurator.mockSuccessResponse(Model1::class.java, model1WithMetadata) + configurator.mockSuccessResponse(Model2::class.java, model2WithMetadata) + + // Act: Call hydrate, and await its completion - assert it completed without error + val hydrationObserver = TestObserver.create>() + syncProcessor.hydrate().subscribe(hydrationObserver) + + assertTrue(hydrationObserver.await(5, TimeUnit.SECONDS)) + hydrationObserver.assertNoErrors() + hydrationObserver.assertComplete() + + // Since hydrate() completed, the storage adapter observer should see some values. + // The number should match expectedResponseItems * 2 (1 for model, 1 for metadata) + // Additionally, there should be 1 LastSyncMetadata record for each model in the provider + adapterObserver.awaitCount(allExpectedModels.size * 2 + AmplifyModelProvider.getInstance().models().size) + + // Validate the changes emitted from the storage adapter's observe(). Sorted to compare lists + assertEquals( + allExpectedModels.flatMap { + listOf(it.model, it.syncMetadata) + }.sortedBy { it.primaryKeyString }, + adapterObserver.values() + .map { it.item() } + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + ) + + // Lastly: validate the current contents of the storage adapter. + val itemsInStorage = storageAdapter.query(modelProvider) + assertEquals(allExpectedModels.size, itemsInStorage.size) + + val expectedModels = allExpectedModels.map { it.model }.sortedBy { it.primaryKeyString } + val expectedMetadata = allExpectedModels.map { it.syncMetadata }.sortedBy { it.primaryKeyString } + // system model size excluding metadata should = number of models and + // + 1 (LastSyncMetadata for each + PersistentModelVersion) + val expectedSystemModelsSize = modelProvider.models().size + 1 + + val actualModels = itemsInStorage + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + val actualMetadata = + storageAdapter.query(SystemModelsProviderFactory.create()) + .filter { ModelMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + + val actualSystemModels = storageAdapter.query(SystemModelsProviderFactory.create()).filter { + !ModelMetadata::class.java.isAssignableFrom(it.javaClass) + } + + assertEquals(expectedModels, actualModels) + assertEquals(expectedMetadata, actualMetadata) + assertEquals(expectedSystemModelsSize, actualSystemModels.size) + + adapterObserver.dispose() + hydrationObserver.dispose() + } + + @Test + fun `sync with concurrency continues when single model fails`() { + // Arrange a subscription to the storage adapter. We're going to watch for changes. + // We expect to see content here as a result of the SyncProcessor applying updates. + val adapterObserver = storageAdapter.observe().test() + + // Arrange: return some responses for the sync() call on the RemoteModelState + val configurator = AppSyncMocking.sync(appSync) + + val expectedModel1Exception = DataStoreException("Failed to sync Model1", "Failed to sync Model1") + + val model2Item1 = Model2.builder().name("M2_1").build() + val model2Item1Metadata = ModelMetadata(model2Item1.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model2Item1WithMetadata = ModelWithMetadata(model2Item1, model2Item1Metadata) + val expectedModel2Response1 = mutableListOf(model2Item1WithMetadata) + + val model2Item2 = Model2.builder().name("M2_2").build() + val model2Item2Metadata = ModelMetadata(model2Item2.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model2Item2WithMetadata = ModelWithMetadata(model2Item2, model2Item2Metadata) + val expectedModel2Response2 = mutableListOf(model2Item2WithMetadata) + + val allExpectedModels = expectedModel2Response1 + expectedModel2Response2 + + configurator.mockFailure(expectedModel1Exception) + configurator.mockSuccessResponse(Model2::class.java, null, "page2", model2Item1WithMetadata) + configurator.mockSuccessResponse(Model2::class.java, "page2", null, model2Item2WithMetadata) + + // Act: Call hydrate, and await its completion - assert it completed without error + val hydrationObserver = TestObserver.create>() + syncProcessor.hydrate().subscribe(hydrationObserver) + + assertTrue(hydrationObserver.await(5, TimeUnit.SECONDS)) + hydrationObserver.assertError { it == expectedModel1Exception } + verify { + errorHandler.accept( + DataStoreException( + "Initial cloud sync failed for Model1.", + expectedModel1Exception, + "Check your internet connection." + ) + ) + } + + // Since hydrate() completed, the storage adapter observer should see some values. + // The number should match expectedResponseItems * 2 (1 for model, 1 for metadata) + // Additionally, there should be 1 LastSyncMetadata for model2 sync success. + // Model1 will not have a last sync record due to failure + adapterObserver.awaitCount(allExpectedModels.size * 2 + 1) + + // Validate the changes emitted from the storage adapter's observe(). Sorted to compare lists + assertEquals( + allExpectedModels.flatMap { + listOf(it.model, it.syncMetadata) + }.sortedBy { it.primaryKeyString }, + adapterObserver.values() + .map { it.item() } + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + ) + + // Lastly: validate the current contents of the storage adapter. + val itemsInStorage = storageAdapter.query(modelProvider) + assertEquals(allExpectedModels.size, itemsInStorage.size) + + val expectedModels = allExpectedModels.map { it.model }.sortedBy { it.primaryKeyString } + val expectedMetadata = allExpectedModels.map { it.syncMetadata }.sortedBy { it.primaryKeyString } + // LastSyncMetadata and PersistentModel version for Model2 success + val expectedSystemModelsSize = 2 + + val actualModels = itemsInStorage + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + val actualMetadata = + storageAdapter.query(SystemModelsProviderFactory.create()) + .filter { ModelMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + + val actualSystemModels = storageAdapter.query(SystemModelsProviderFactory.create()).filter { + !ModelMetadata::class.java.isAssignableFrom(it.javaClass) + } + + assertEquals(expectedModels, actualModels) + assertEquals(expectedMetadata, actualMetadata) + assertEquals(expectedSystemModelsSize, actualSystemModels.size) + + adapterObserver.dispose() + hydrationObserver.dispose() + } +} diff --git a/testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java new file mode 100644 index 0000000000..5c177bd2dc --- /dev/null +++ b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java @@ -0,0 +1,53 @@ +package com.amplifyframework.testmodels.flat; + +import com.amplifyframework.core.model.Model; +import com.amplifyframework.core.model.ModelProvider; +import com.amplifyframework.util.Immutable; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +/** + * Contains the set of model classes that implement {@link Model} + * interface. + */ + +public final class AmplifyModelProvider implements ModelProvider { + private static final String AMPLIFY_MODEL_VERSION = "a5218086c100c39df9b1bc3dd3e87c93"; + private static AmplifyModelProvider amplifyGeneratedModelInstance; + private AmplifyModelProvider() { + + } + + public static synchronized AmplifyModelProvider getInstance() { + if (amplifyGeneratedModelInstance == null) { + amplifyGeneratedModelInstance = new AmplifyModelProvider(); + } + return amplifyGeneratedModelInstance; + } + + /** + * Get a set of the model classes. + * + * @return a set of the model classes. + */ + @Override + public Set> models() { + final Set> modifiableSet = new HashSet<>( + Arrays.>asList(Model1.class, Model2.class) + ); + + return Immutable.of(modifiableSet); + + } + + /** + * Get the version of the models. + * + * @return the version string of the models. + */ + @Override + public String version() { + return AMPLIFY_MODEL_VERSION; + } +} diff --git a/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java new file mode 100644 index 0000000000..72a5425bc0 --- /dev/null +++ b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java @@ -0,0 +1,185 @@ +package com.amplifyframework.testmodels.flat; + +import androidx.core.util.ObjectsCompat; + +import com.amplifyframework.core.model.Model; +import com.amplifyframework.core.model.ModelIdentifier; +import com.amplifyframework.core.model.annotations.ModelConfig; +import com.amplifyframework.core.model.annotations.ModelField; +import com.amplifyframework.core.model.query.predicate.QueryField; +import com.amplifyframework.core.model.temporal.Temporal; + +import java.util.Objects; +import java.util.UUID; + +import static com.amplifyframework.core.model.query.predicate.QueryField.field; + +/** This is an auto generated class representing the Model1 type in your schema. */ +@SuppressWarnings("all") +@ModelConfig(pluralName = "Model1s", type = Model.Type.USER, version = 1) +public final class Model1 implements Model { + public static final QueryField ID = field("Model1", "id"); + public static final QueryField NAME = field("Model1", "name"); + private final @ModelField(targetType="ID", isRequired = true) String id; + private final @ModelField(targetType="String", isRequired = true) String name; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime createdAt; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime updatedAt; + /** @deprecated This API is internal to Amplify and should not be used. */ + @Deprecated + public String resolveIdentifier() { + return id; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public Temporal.DateTime getCreatedAt() { + return createdAt; + } + + public Temporal.DateTime getUpdatedAt() { + return updatedAt; + } + + private Model1(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if(obj == null || getClass() != obj.getClass()) { + return false; + } else { + Model1 model1 = (Model1) obj; + return ObjectsCompat.equals(getId(), model1.getId()) && + ObjectsCompat.equals(getName(), model1.getName()) && + ObjectsCompat.equals(getCreatedAt(), model1.getCreatedAt()) && + ObjectsCompat.equals(getUpdatedAt(), model1.getUpdatedAt()); + } + } + + @Override + public int hashCode() { + return new StringBuilder() + .append(getId()) + .append(getName()) + .append(getCreatedAt()) + .append(getUpdatedAt()) + .toString() + .hashCode(); + } + + @Override + public String toString() { + return new StringBuilder() + .append("Model1 {") + .append("id=" + String.valueOf(getId()) + ", ") + .append("name=" + String.valueOf(getName()) + ", ") + .append("createdAt=" + String.valueOf(getCreatedAt()) + ", ") + .append("updatedAt=" + String.valueOf(getUpdatedAt())) + .append("}") + .toString(); + } + + public static NameStep builder() { + return new Builder(); + } + + /** + * WARNING: This method should not be used to build an instance of this object for a CREATE mutation. + * This is a convenience method to return an instance of the object with only its ID populated + * to be used in the context of a parameter in a delete mutation or referencing a foreign key + * in a relationship. + * @param id the id of the existing item this instance will represent + * @return an instance of this model with only ID populated + */ + public static Model1 justId(String id) { + return new Model1( + id, + null + ); + } + + public CopyOfBuilder copyOfBuilder() { + return new CopyOfBuilder(id, + name); + } + public interface NameStep { + BuildStep name(String name); + } + + + public interface BuildStep { + Model1 build(); + BuildStep id(String id); + } + + + public static class Builder implements NameStep, BuildStep { + private String id; + private String name; + public Builder() { + + } + + private Builder(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public Model1 build() { + String id = this.id != null ? this.id : UUID.randomUUID().toString(); + + return new Model1( + id, + name); + } + + @Override + public BuildStep name(String name) { + Objects.requireNonNull(name); + this.name = name; + return this; + } + + /** + * @param id id + * @return Current Builder instance, for fluent method chaining + */ + public BuildStep id(String id) { + this.id = id; + return this; + } + } + + + public final class CopyOfBuilder extends Builder { + private CopyOfBuilder(String id, String name) { + super(id, name); + Objects.requireNonNull(name); + } + + @Override + public CopyOfBuilder name(String name) { + return (CopyOfBuilder) super.name(name); + } + } + + + public static class Model1Identifier extends ModelIdentifier { + private static final long serialVersionUID = 1L; + public Model1Identifier(String id) { + super(id); + } + } + +} diff --git a/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java new file mode 100644 index 0000000000..329d12c1fe --- /dev/null +++ b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java @@ -0,0 +1,179 @@ +package com.amplifyframework.testmodels.flat; + +import androidx.core.util.ObjectsCompat; + +import com.amplifyframework.core.model.Model; +import com.amplifyframework.core.model.ModelIdentifier; +import com.amplifyframework.core.model.annotations.ModelConfig; +import com.amplifyframework.core.model.annotations.ModelField; +import com.amplifyframework.core.model.query.predicate.QueryField; +import com.amplifyframework.core.model.temporal.Temporal; + +import java.util.UUID; + +import static com.amplifyframework.core.model.query.predicate.QueryField.field; + +/** This is an auto generated class representing the Model2 type in your schema. */ +@SuppressWarnings("all") +@ModelConfig(pluralName = "Model2s", type = Model.Type.USER, version = 1) +public final class Model2 implements Model { + public static final QueryField ID = field("Model2", "id"); + public static final QueryField NAME = field("Model2", "name"); + private final @ModelField(targetType="ID", isRequired = true) String id; + private final @ModelField(targetType="String") String name; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime createdAt; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime updatedAt; + /** @deprecated This API is internal to Amplify and should not be used. */ + @Deprecated + public String resolveIdentifier() { + return id; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public Temporal.DateTime getCreatedAt() { + return createdAt; + } + + public Temporal.DateTime getUpdatedAt() { + return updatedAt; + } + + private Model2(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if(obj == null || getClass() != obj.getClass()) { + return false; + } else { + Model2 model2 = (Model2) obj; + return ObjectsCompat.equals(getId(), model2.getId()) && + ObjectsCompat.equals(getName(), model2.getName()) && + ObjectsCompat.equals(getCreatedAt(), model2.getCreatedAt()) && + ObjectsCompat.equals(getUpdatedAt(), model2.getUpdatedAt()); + } + } + + @Override + public int hashCode() { + return new StringBuilder() + .append(getId()) + .append(getName()) + .append(getCreatedAt()) + .append(getUpdatedAt()) + .toString() + .hashCode(); + } + + @Override + public String toString() { + return new StringBuilder() + .append("Model2 {") + .append("id=" + String.valueOf(getId()) + ", ") + .append("name=" + String.valueOf(getName()) + ", ") + .append("createdAt=" + String.valueOf(getCreatedAt()) + ", ") + .append("updatedAt=" + String.valueOf(getUpdatedAt())) + .append("}") + .toString(); + } + + public static BuildStep builder() { + return new Builder(); + } + + /** + * WARNING: This method should not be used to build an instance of this object for a CREATE mutation. + * This is a convenience method to return an instance of the object with only its ID populated + * to be used in the context of a parameter in a delete mutation or referencing a foreign key + * in a relationship. + * @param id the id of the existing item this instance will represent + * @return an instance of this model with only ID populated + */ + public static Model2 justId(String id) { + return new Model2( + id, + null + ); + } + + public CopyOfBuilder copyOfBuilder() { + return new CopyOfBuilder(id, + name); + } + public interface BuildStep { + Model2 build(); + BuildStep id(String id); + BuildStep name(String name); + } + + + public static class Builder implements BuildStep { + private String id; + private String name; + public Builder() { + + } + + private Builder(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public Model2 build() { + String id = this.id != null ? this.id : UUID.randomUUID().toString(); + + return new Model2( + id, + name); + } + + @Override + public BuildStep name(String name) { + this.name = name; + return this; + } + + /** + * @param id id + * @return Current Builder instance, for fluent method chaining + */ + public BuildStep id(String id) { + this.id = id; + return this; + } + } + + + public final class CopyOfBuilder extends Builder { + private CopyOfBuilder(String id, String name) { + super(id, name); + + } + + @Override + public CopyOfBuilder name(String name) { + return (CopyOfBuilder) super.name(name); + } + } + + + public static class Model2Identifier extends ModelIdentifier { + private static final long serialVersionUID = 1L; + public Model2Identifier(String id) { + super(id); + } + } + +}