From f198d3ab3be38405dcb5e18d71696e639f7d00d6 Mon Sep 17 00:00:00 2001 From: JaffaKetchup Date: Sun, 31 Mar 2024 23:36:33 +0100 Subject: [PATCH] Implemented (worker side) tile importing (for `skip` and `rename` `ImportConflictStrategy` only) Fixed race condition in exporter Fixed bugs in example application --- .../screens/export_import/export_import.dart | 3 +- .../objectbox/backend/internal_worker.dart | 265 +++++++++++------- 2 files changed, 161 insertions(+), 107 deletions(-) diff --git a/example/lib/screens/export_import/export_import.dart b/example/lib/screens/export_import/export_import.dart index 72475ed5..4f1398b6 100644 --- a/example/lib/screens/export_import/export_import.dart +++ b/example/lib/screens/export_import/export_import.dart @@ -156,8 +156,7 @@ class _ExportImportPopupState extends State { late final bool isExporting; late final Icon icon; - if (snapshot.data! == FileSystemEntityType.notFound && - !forceOverrideExisting) { + if (snapshot.data! == FileSystemEntityType.notFound) { icon = const Icon(Icons.save); isExporting = true; } else if (snapshot.data! == FileSystemEntityType.file && diff --git a/lib/src/backend/impls/objectbox/backend/internal_worker.dart b/lib/src/backend/impls/objectbox/backend/internal_worker.dart index 019a7ffd..ad2bdf79 100644 --- a/lib/src/backend/impls/objectbox/backend/internal_worker.dart +++ b/lib/src/backend/impls/objectbox/backend/internal_worker.dart @@ -923,7 +923,7 @@ Future _worker( exportingRoot .runInTransaction( TxMode.write, - () => exportingStores.listen( + () => exportingStores.map( (exportingStore) { exportingRoot.box().put( storesObjectsForRelations[exportingStore.name] = @@ -940,7 +940,7 @@ Future _worker( }, ), ) - .asFuture() + .last .then( (_) { final exportingTiles = root.runInTransaction( @@ -951,7 +951,7 @@ Future _worker( exportingRoot .runInTransaction( TxMode.write, - () => exportingTiles.listen( + () => exportingTiles.map( (exportingTile) { exportingRoot.box().put( ObjectBoxTile( @@ -968,7 +968,7 @@ Future _worker( }, ), ) - .asFuture() + .last .then( (_) { storesQuery.close(); @@ -1039,31 +1039,35 @@ Future _worker( final nameToState = {}; // ignore: unnecessary_parenthesis (switch (strategy) { - ImportConflictStrategy.skip => importingStoresQuery.stream().where( - (importingStore) { - final name = importingStore.name; - final hasConflict = (specificStoresQuery - ..param(ObjectBoxStore_.name).value = name) - .count() == - 1; - nameToState[name] = hasConflict ? null : name; + ImportConflictStrategy.skip => importingStoresQuery + .stream() + .where( + (importingStore) { + final name = importingStore.name; + final hasConflict = (specificStoresQuery + ..param(ObjectBoxStore_.name).value = name) + .count() == + 1; + nameToState[name] = hasConflict ? null : name; + + if (hasConflict) return false; - if (hasConflict) return false; - - root.box().put( - ObjectBoxStore( - name: name, - length: importingStore.length, - size: importingStore.size, - hits: importingStore.hits, - misses: importingStore.misses, - metadataJson: importingStore.metadataJson, - ), - mode: PutMode.insert, - ); - return true; - }, - ).toList(), + root.box().put( + ObjectBoxStore( + name: name, + length: importingStore.length, + size: importingStore.size, + hits: importingStore.hits, + misses: importingStore.misses, + metadataJson: importingStore.metadataJson, + ), + mode: PutMode.insert, + ); + return true; + }, + ) + .map((s) => s.name) + .toList(), ImportConflictStrategy.rename => importingStoresQuery.stream().map((importingStore) { final name = importingStore.name; @@ -1084,7 +1088,7 @@ Future _worker( ), mode: PutMode.insert, ); - return importingStore; + return name; } else { final newName = nameToState[name] = '$name [Imported ${DateTime.now()}]'; @@ -1103,7 +1107,7 @@ Future _worker( ), mode: PutMode.insert, ); - return newStore; + return newName; } }).toList(), ImportConflictStrategy.replace || @@ -1128,7 +1132,7 @@ Future _worker( mode: PutMode.insert, ); } - return importingStore; + return name; }, ).toList() /*.then( @@ -1169,7 +1173,7 @@ Future _worker( sendRes( id: cmd.id, data: { - 'expectStream': true, + 'expectStream': true, // TODO: Needs subscription? 'nameToState': nameToState, }, ); @@ -1183,7 +1187,129 @@ Future _worker( if (strategy == ImportConflictStrategy.skip || strategy == ImportConflictStrategy.rename) { - throw UnimplementedError(); + final importingTilesQuery = + importingRoot.box().query().build(); + final importingTiles = importingTilesQuery.stream(); + + final existingStoresQuery = root + .box() + .query(ObjectBoxStore_.name.equals('')) + .build(); + final existingTilesQuery = root + .box() + .query(ObjectBoxTile_.url.equals('')) + .build(); + + final storesToUpdate = {}; + + int rootDeltaLength = 0; + int rootDeltaSize = 0; + + Iterable convertToExistingStores( + Iterable importingStores, + ) => + importingStores + .where( + (s) => storesToImport.contains(s.name), + ) + .map( + (s) => storesToUpdate[s.name] ??= (existingStoresQuery + ..param(ObjectBoxStore_.name).value = s.name) + .findUnique()!, + ); + + root + .runInTransaction( + TxMode.write, + () => importingTiles.map( + (importingTile) { + try { + root.box().put( + ObjectBoxTile( + url: importingTile.url, + bytes: importingTile.bytes, + lastModified: importingTile.lastModified, + )..stores.addAll( + convertToExistingStores( + importingTile.stores, + ), + ), + mode: PutMode.insert, + ); + + rootDeltaLength++; + rootDeltaSize += importingTile.bytes.lengthInBytes; + } on UniqueViolationException { + final existingTile = (existingTilesQuery + ..param(ObjectBoxTile_.url).value = + importingTile.url) + .findUnique()!; + + final newRelatedStores = + convertToExistingStores(importingTile.stores); + + if (existingTile.lastModified + .isAfter(importingTile.lastModified)) { + /*for (final newRelatedStore in newRelatedStores) { + storesToUpdate[newRelatedStore.name] = + (storesToUpdate[newRelatedStore.name] ?? + newRelatedStore) + ..size += + -importingTile.bytes.lengthInBytes + + existingTile.bytes.lengthInBytes; + }*/ + return; + } + + root.box().put( + ObjectBoxTile( + url: importingTile.url, + bytes: importingTile.bytes, + lastModified: importingTile.lastModified, + )..stores.addAll( + { + ...existingTile.stores, + ...newRelatedStores, + }, + ), + mode: PutMode.update, + ); + + for (final existingTileStore in existingTile.stores) { + storesToUpdate[existingTileStore.name] = + (storesToUpdate[existingTileStore.name] ?? + existingTileStore) + ..size += -existingTile.bytes.lengthInBytes + + importingTile.bytes.lengthInBytes; + } + + rootDeltaSize += -existingTile.bytes.lengthInBytes + + importingTile.bytes.lengthInBytes; + } + }, + ), + ) + .last + .then((_) { + updateRootStatistics( + deltaLength: rootDeltaLength, + deltaSize: rootDeltaSize, + ); + + importingTilesQuery.close(); + existingStoresQuery.close(); + existingTilesQuery.close(); + importingRoot.close(); + + importFile.deleteSync(); + File(path.join(importDir, 'lock.mdb')).deleteSync(); + importDirIO.deleteSync(); + + sendRes( + id: cmd.id, + data: {'expectStream': true, 'finished': null}, + ); + }); } else { throw UnimplementedError(); } @@ -1196,78 +1322,7 @@ Future _worker( TxMode.write, () => importingTiles.listen( (importingTile) { - if (strategy == ImportConflictStrategy.skip && - importingTile.stores.length == 1 && - storesToSkipTiles.contains( - importingTile.stores[0].name, - )) return; - - importingTile.stores.removeWhere( - (s) => storesToSkipTiles.contains(s.name), - ); - - try { - root.box().put( - ObjectBoxTile( - url: importingTile.url, - bytes: importingTile.bytes, - lastModified: importingTile.lastModified, - )..stores.addAll( - importingTile.stores.map( - (e) => storesObjectsForRelations[e.name]!, - ), - ), - mode: PutMode.insert, - ); - - rootDeltaLength++; - rootDeltaSize += importingTile.bytes.lengthInBytes; - } on UniqueViolationException { - final existingTile = (specificTilesQuery - ..param(ObjectBoxTile_.url).value = - importingTile.url) - .findUnique()!; - - final newRelatedStores = importingTile.stores.map( - (e) => storesObjectsForRelations[e.name]!, - ); - - if (existingTile.lastModified - .isAfter(importingTile.lastModified)) { - for (final newRelatedStore in newRelatedStores) { - storesToUpdate[newRelatedStore.name] = - (storesToUpdate[newRelatedStore.name] ?? - newRelatedStore) - ..size += -importingTile.bytes.lengthInBytes + - existingTile.bytes.lengthInBytes; - } - return; - } - - root.box().put( - ObjectBoxTile( - url: importingTile.url, - bytes: importingTile.bytes, - lastModified: importingTile.lastModified, - )..stores.addAll( - { - ...existingTile.stores, - ...newRelatedStores, - }, - ), - mode: PutMode.update, - ); - - for (final existingTileStore in existingTile.stores) { - storesToUpdate[existingTileStore.name] = - (storesToUpdate[existingTileStore.name] ?? - existingTileStore) - ..size += -existingTile.bytes.lengthInBytes + - importingTile.bytes.lengthInBytes; - } - - rootDeltaSize += -existingTile.bytes.lengthInBytes + - importingTile.bytes.lengthInBytes; + } }, ),