Skip to content

Commit

Permalink
Implemented (worker side) tile importing (for skip and rename `Im…
Browse files Browse the repository at this point in the history
…portConflictStrategy` only)

Fixed race condition in exporter
Fixed bugs in example application
  • Loading branch information
JaffaKetchup committed Mar 31, 2024
1 parent c48ba68 commit f198d3a
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 107 deletions.
3 changes: 1 addition & 2 deletions example/lib/screens/export_import/export_import.dart
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ class _ExportImportPopupState extends State<ExportImportPopup> {

late final bool isExporting;
late final Icon icon;
if (snapshot.data! == FileSystemEntityType.notFound &&
!forceOverrideExisting) {
if (snapshot.data! == FileSystemEntityType.notFound) {
icon = const Icon(Icons.save);
isExporting = true;
} else if (snapshot.data! == FileSystemEntityType.file &&
Expand Down
265 changes: 160 additions & 105 deletions lib/src/backend/impls/objectbox/backend/internal_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ Future<void> _worker(
exportingRoot
.runInTransaction(
TxMode.write,
() => exportingStores.listen(
() => exportingStores.map(
(exportingStore) {
exportingRoot.box<ObjectBoxStore>().put(
storesObjectsForRelations[exportingStore.name] =
Expand All @@ -940,7 +940,7 @@ Future<void> _worker(
},
),
)
.asFuture<void>()
.last
.then(
(_) {
final exportingTiles = root.runInTransaction(
Expand All @@ -951,7 +951,7 @@ Future<void> _worker(
exportingRoot
.runInTransaction(
TxMode.write,
() => exportingTiles.listen(
() => exportingTiles.map(
(exportingTile) {
exportingRoot.box<ObjectBoxTile>().put(
ObjectBoxTile(
Expand All @@ -968,7 +968,7 @@ Future<void> _worker(
},
),
)
.asFuture<void>()
.last
.then(
(_) {
storesQuery.close();
Expand Down Expand Up @@ -1039,31 +1039,35 @@ Future<void> _worker(
final nameToState = <String, String?>{};
// ignore: unnecessary_parenthesis
(switch (strategy) {
ImportConflictStrategy.skip => importingStoresQuery.stream().where(
(importingStore) {
final name = importingStore.name;
final hasConflict = (specificStoresQuery
..param(ObjectBoxStore_.name).value = name)
.count() ==
1;
nameToState[name] = hasConflict ? null : name;
ImportConflictStrategy.skip => importingStoresQuery
.stream()
.where(
(importingStore) {
final name = importingStore.name;
final hasConflict = (specificStoresQuery
..param(ObjectBoxStore_.name).value = name)
.count() ==
1;
nameToState[name] = hasConflict ? null : name;

if (hasConflict) return false;

if (hasConflict) return false;

root.box<ObjectBoxStore>().put(
ObjectBoxStore(
name: name,
length: importingStore.length,
size: importingStore.size,
hits: importingStore.hits,
misses: importingStore.misses,
metadataJson: importingStore.metadataJson,
),
mode: PutMode.insert,
);
return true;
},
).toList(),
root.box<ObjectBoxStore>().put(
ObjectBoxStore(
name: name,
length: importingStore.length,
size: importingStore.size,
hits: importingStore.hits,
misses: importingStore.misses,
metadataJson: importingStore.metadataJson,
),
mode: PutMode.insert,
);
return true;
},
)
.map((s) => s.name)
.toList(),
ImportConflictStrategy.rename =>
importingStoresQuery.stream().map((importingStore) {
final name = importingStore.name;
Expand All @@ -1084,7 +1088,7 @@ Future<void> _worker(
),
mode: PutMode.insert,
);
return importingStore;
return name;
} else {
final newName =
nameToState[name] = '$name [Imported ${DateTime.now()}]';
Expand All @@ -1103,7 +1107,7 @@ Future<void> _worker(
),
mode: PutMode.insert,
);
return newStore;
return newName;
}
}).toList(),
ImportConflictStrategy.replace ||
Expand All @@ -1128,7 +1132,7 @@ Future<void> _worker(
mode: PutMode.insert,
);
}
return importingStore;
return name;
},
).toList()
/*.then(
Expand Down Expand Up @@ -1169,7 +1173,7 @@ Future<void> _worker(
sendRes(
id: cmd.id,
data: {
'expectStream': true,
'expectStream': true, // TODO: Needs subscription?
'nameToState': nameToState,
},
);
Expand All @@ -1183,7 +1187,129 @@ Future<void> _worker(

if (strategy == ImportConflictStrategy.skip ||
strategy == ImportConflictStrategy.rename) {
throw UnimplementedError();
final importingTilesQuery =
importingRoot.box<ObjectBoxTile>().query().build();
final importingTiles = importingTilesQuery.stream();

final existingStoresQuery = root
.box<ObjectBoxStore>()
.query(ObjectBoxStore_.name.equals(''))
.build();
final existingTilesQuery = root
.box<ObjectBoxTile>()
.query(ObjectBoxTile_.url.equals(''))
.build();

final storesToUpdate = <String, ObjectBoxStore>{};

int rootDeltaLength = 0;
int rootDeltaSize = 0;

Iterable<ObjectBoxStore> convertToExistingStores(
Iterable<ObjectBoxStore> importingStores,
) =>
importingStores
.where(
(s) => storesToImport.contains(s.name),
)
.map(
(s) => storesToUpdate[s.name] ??= (existingStoresQuery
..param(ObjectBoxStore_.name).value = s.name)
.findUnique()!,
);

root
.runInTransaction(
TxMode.write,
() => importingTiles.map(
(importingTile) {
try {
root.box<ObjectBoxTile>().put(
ObjectBoxTile(
url: importingTile.url,
bytes: importingTile.bytes,
lastModified: importingTile.lastModified,
)..stores.addAll(
convertToExistingStores(
importingTile.stores,
),
),
mode: PutMode.insert,
);

rootDeltaLength++;
rootDeltaSize += importingTile.bytes.lengthInBytes;
} on UniqueViolationException {
final existingTile = (existingTilesQuery
..param(ObjectBoxTile_.url).value =
importingTile.url)
.findUnique()!;

final newRelatedStores =
convertToExistingStores(importingTile.stores);

if (existingTile.lastModified
.isAfter(importingTile.lastModified)) {
/*for (final newRelatedStore in newRelatedStores) {
storesToUpdate[newRelatedStore.name] =
(storesToUpdate[newRelatedStore.name] ??
newRelatedStore)
..size +=
-importingTile.bytes.lengthInBytes +
existingTile.bytes.lengthInBytes;
}*/
return;
}

root.box<ObjectBoxTile>().put(
ObjectBoxTile(
url: importingTile.url,
bytes: importingTile.bytes,
lastModified: importingTile.lastModified,
)..stores.addAll(
{
...existingTile.stores,
...newRelatedStores,
},
),
mode: PutMode.update,
);

for (final existingTileStore in existingTile.stores) {
storesToUpdate[existingTileStore.name] =
(storesToUpdate[existingTileStore.name] ??
existingTileStore)
..size += -existingTile.bytes.lengthInBytes +
importingTile.bytes.lengthInBytes;
}

rootDeltaSize += -existingTile.bytes.lengthInBytes +
importingTile.bytes.lengthInBytes;
}
},
),
)
.last
.then((_) {
updateRootStatistics(
deltaLength: rootDeltaLength,
deltaSize: rootDeltaSize,
);

importingTilesQuery.close();
existingStoresQuery.close();
existingTilesQuery.close();
importingRoot.close();

importFile.deleteSync();
File(path.join(importDir, 'lock.mdb')).deleteSync();
importDirIO.deleteSync();

sendRes(
id: cmd.id,
data: {'expectStream': true, 'finished': null},
);
});
} else {
throw UnimplementedError();
}
Expand All @@ -1196,78 +1322,7 @@ Future<void> _worker(
TxMode.write,
() => importingTiles.listen(
(importingTile) {
if (strategy == ImportConflictStrategy.skip &&
importingTile.stores.length == 1 &&
storesToSkipTiles.contains(
importingTile.stores[0].name,
)) return;
importingTile.stores.removeWhere(
(s) => storesToSkipTiles.contains(s.name),
);
try {
root.box<ObjectBoxTile>().put(
ObjectBoxTile(
url: importingTile.url,
bytes: importingTile.bytes,
lastModified: importingTile.lastModified,
)..stores.addAll(
importingTile.stores.map(
(e) => storesObjectsForRelations[e.name]!,
),
),
mode: PutMode.insert,
);
rootDeltaLength++;
rootDeltaSize += importingTile.bytes.lengthInBytes;
} on UniqueViolationException {
final existingTile = (specificTilesQuery
..param(ObjectBoxTile_.url).value =
importingTile.url)
.findUnique()!;
final newRelatedStores = importingTile.stores.map(
(e) => storesObjectsForRelations[e.name]!,
);
if (existingTile.lastModified
.isAfter(importingTile.lastModified)) {
for (final newRelatedStore in newRelatedStores) {
storesToUpdate[newRelatedStore.name] =
(storesToUpdate[newRelatedStore.name] ??
newRelatedStore)
..size += -importingTile.bytes.lengthInBytes +
existingTile.bytes.lengthInBytes;
}
return;
}
root.box<ObjectBoxTile>().put(
ObjectBoxTile(
url: importingTile.url,
bytes: importingTile.bytes,
lastModified: importingTile.lastModified,
)..stores.addAll(
{
...existingTile.stores,
...newRelatedStores,
},
),
mode: PutMode.update,
);
for (final existingTileStore in existingTile.stores) {
storesToUpdate[existingTileStore.name] =
(storesToUpdate[existingTileStore.name] ??
existingTileStore)
..size += -existingTile.bytes.lengthInBytes +
importingTile.bytes.lengthInBytes;
}
rootDeltaSize += -existingTile.bytes.lengthInBytes +
importingTile.bytes.lengthInBytes;
}
},
),
Expand Down

0 comments on commit f198d3a

Please sign in to comment.