From 1b40bace153f2c6a685eb81a564a479f805a8948 Mon Sep 17 00:00:00 2001 From: halgari Date: Mon, 10 Jun 2024 11:14:06 -0600 Subject: [PATCH] Update how we get DB updates to also include the changed datoms --- .../IConnection.cs | 25 +++++++++++++++- .../IndexSegments/IndexSegment.cs | 11 +++++++ .../Models/ModelExtensions.cs | 29 ------------------- .../{MutableSlice.cs => ObservableDatoms.cs} | 22 +++++++++----- src/NexusMods.MnemonicDB/Connection.cs | 26 ++++++++++++----- tests/NexusMods.MnemonicDB.Tests/DbTests.cs | 12 +++++--- 6 files changed, 76 insertions(+), 49 deletions(-) delete mode 100644 src/NexusMods.MnemonicDB.Abstractions/Models/ModelExtensions.cs rename src/NexusMods.MnemonicDB.Abstractions/Query/{MutableSlice.cs => ObservableDatoms.cs} (83%) diff --git a/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs b/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs index 9e3e404f..12abef7c 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs @@ -1,7 +1,25 @@ using System; +using NexusMods.MnemonicDB.Abstractions.IndexSegments; +using NexusMods.MnemonicDB.Abstractions.Internals; namespace NexusMods.MnemonicDB.Abstractions; +/// +/// A database revision, which includes a datom and the datoms added to it. +/// +public struct Revision +{ + /// + /// The database for the most recent transaction + /// + public IDb Database; + + /// + /// The datoms that were added in the most recent transaction + /// + public IndexSegment AddedDatoms; +} + /// /// Represents a connection to a database. /// @@ -12,6 +30,11 @@ public interface IConnection /// public IDb Db { get; } + /// + /// The attribute registry for this connection + /// + public IAttributeRegistry Registry { get; } + /// /// Gets the most recent transaction id. /// @@ -20,7 +43,7 @@ public interface IConnection /// /// A sequential stream of database revisions. /// - public IObservable Revisions { get; } + public IObservable Revisions { get; } /// /// A service provider that entities can use to resolve their values diff --git a/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs b/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs index 522c84a9..e4064b98 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/IndexSegments/IndexSegment.cs @@ -1,6 +1,7 @@ using System; using System.Collections; using System.Collections.Generic; +using DynamicData; using NexusMods.MnemonicDB.Abstractions.DatomIterators; using NexusMods.MnemonicDB.Abstractions.Internals; using NexusMods.Paths; @@ -87,4 +88,14 @@ IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } + + /// + /// Create a new index segment from the given datoms + /// + public static IndexSegment From(IAttributeRegistry registry, IReadOnlyCollection datoms) + { + using var builder = new IndexSegmentBuilder(registry, datoms.Count); + builder.Add(datoms); + return builder.Build(); + } } diff --git a/src/NexusMods.MnemonicDB.Abstractions/Models/ModelExtensions.cs b/src/NexusMods.MnemonicDB.Abstractions/Models/ModelExtensions.cs deleted file mode 100644 index 1bdcd1bf..00000000 --- a/src/NexusMods.MnemonicDB.Abstractions/Models/ModelExtensions.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Reactive.Linq; - -namespace NexusMods.MnemonicDB.Abstractions.Models; - -/// -/// Extensions for models -/// -public static class ModelExtensions -{ - /// - /// Returns an observable of the revisions of the model, starting with the current version and terminating - /// when the model is no longer valid. - /// - public static IObservable Revisions(this TModel model) - where TModel : IReadOnlyModel - { - // Extract the id so that we don't hold onto the original model (and keep its cache live) - var id = model.Id; - var revisions = model.Db.Connection.Revisions - .Where(db => db.Analytics.LatestTxIds.Contains(id)) - // Start with here, so we don't filter the first Db - .StartWith(model.Db.Connection.Db) - .Select(db => TModel.Create(db, id)) - .TakeWhile(m => m.IsValid()); - return revisions; - } - -} diff --git a/src/NexusMods.MnemonicDB.Abstractions/Query/MutableSlice.cs b/src/NexusMods.MnemonicDB.Abstractions/Query/ObservableDatoms.cs similarity index 83% rename from src/NexusMods.MnemonicDB.Abstractions/Query/MutableSlice.cs rename to src/NexusMods.MnemonicDB.Abstractions/Query/ObservableDatoms.cs index bfeae4c1..732160dc 100644 --- a/src/NexusMods.MnemonicDB.Abstractions/Query/MutableSlice.cs +++ b/src/NexusMods.MnemonicDB.Abstractions/Query/ObservableDatoms.cs @@ -2,37 +2,43 @@ using System.Collections.Generic; using System.Reactive.Linq; using DynamicData; -using Microsoft.Extensions.DependencyInjection; using NexusMods.MnemonicDB.Abstractions.DatomComparators; using NexusMods.MnemonicDB.Abstractions.DatomIterators; using NexusMods.MnemonicDB.Abstractions.IndexSegments; namespace NexusMods.MnemonicDB.Abstractions.Query; -public static class MutableSlice +public static class ObservableDatoms { /// /// Observe a slice of the database, as datoms are added or removed from the database, the observer will be updated /// with the changeset of datoms that have been added or removed. /// - public static IObservable> Observe(IConnection conn, SliceDescriptor descriptor) + public static IObservable> ObserveDatoms(this IConnection conn, SliceDescriptor descriptor) { var comparator = PartialComparator(descriptor.Index); var equality = (IEqualityComparer)comparator; var set = new SortedSet(comparator); - return conn.Revisions.Select((db, idx) => + return conn.Revisions.Select((rev, idx) => { if (idx == 0) - return Setup(set, db, descriptor); - return Diff(set, db, descriptor, equality); + return Setup(set, rev.Database, descriptor); + return Diff(set, rev.AddedDatoms, descriptor, equality); }); } - private static IChangeSet Diff(SortedSet set, IDb db, SliceDescriptor descriptor, IEqualityComparer comparer) + /// + /// Observe all datoms for a given entity id + /// + public static IObservable> ObserveDatoms(this IConnection conn, EntityId id) + { + return conn.ObserveDatoms(SliceDescriptor.Create(id, conn.Registry)); + } + + private static IChangeSet Diff(SortedSet set, IndexSegment updates, SliceDescriptor descriptor, IEqualityComparer comparer) { - var updates = db.Datoms(SliceDescriptor.Create(db.BasisTxId, db.Registry)); List>? changes = null; foreach (var datom in updates) diff --git a/src/NexusMods.MnemonicDB/Connection.cs b/src/NexusMods.MnemonicDB/Connection.cs index 4d92a49e..cde61349 100644 --- a/src/NexusMods.MnemonicDB/Connection.cs +++ b/src/NexusMods.MnemonicDB/Connection.cs @@ -27,7 +27,7 @@ public class Connection : IConnection, IHostedService private readonly ILogger _logger; private Task? _bootstrapTask; - private BehaviorSubject _dbStream; + private BehaviorSubject _dbStream; private IDisposable? _dbStreamDisposable; /// @@ -39,7 +39,7 @@ public Connection(ILogger logger, IDatomStore store, IServiceProvide _logger = logger; _declaredAttributes = declaredAttributes; _store = store; - _dbStream = new BehaviorSubject(null!); + _dbStream = new BehaviorSubject(default!); } /// @@ -50,14 +50,17 @@ public IDb Db { get { - var val = _dbStream.Value; + var val = _dbStream; // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract if (val == null) ThrowNullDb(); - return val!; + return val!.Value.Database; } } + /// + public IAttributeRegistry Registry => _store.Registry; + private static void ThrowNullDb() { throw new InvalidOperationException("Connection not started, did you forget to start the hosted service?"); @@ -81,11 +84,11 @@ public ITransaction BeginTransaction() } /// - public IObservable Revisions + public IObservable Revisions { get { - if (_dbStream == null) + if (_dbStream == default!) ThrowNullDb(); return _dbStream!; } @@ -183,7 +186,16 @@ private async Task Bootstrap() var storeResult = await AddMissingAttributes(_declaredAttributes); _dbStreamDisposable = _store.TxLog - .Select(log => new Db(log.Snapshot, this, log.TxId, (AttributeRegistry)_store.Registry)) + .Select(log => + { + var db = new Db(log.Snapshot, this, log.TxId, (AttributeRegistry)_store.Registry); + var addedItems = db.Datoms(SliceDescriptor.Create(db.BasisTxId, _store.Registry)); + return new Revision + { + Database = db, + AddedDatoms = addedItems + }; + }) .Subscribe(_dbStream); } catch (Exception ex) diff --git a/tests/NexusMods.MnemonicDB.Tests/DbTests.cs b/tests/NexusMods.MnemonicDB.Tests/DbTests.cs index 95e8fe67..812670de 100644 --- a/tests/NexusMods.MnemonicDB.Tests/DbTests.cs +++ b/tests/NexusMods.MnemonicDB.Tests/DbTests.cs @@ -203,10 +203,9 @@ public async Task CanGetCommitUpdates() Connection.Revisions.Subscribe(update => { - var datoms = update.Datoms(update.BasisTxId).ToArray(); // Only Txes we care about - if (datoms.Any(d => d.E == realId)) - updates.Add(datoms); + if (update.AddedDatoms.Any(d => d.E == realId)) + updates.Add(update.AddedDatoms.Select(d => d.Resolved).ToArray()); }); for (var idx = 0; idx < 4; idx++) @@ -554,6 +553,8 @@ public async Task CanGetModelRevisions() var loadoutNames = new List(); + // TODO: re-enable this once we decide on how to handle revisions + /* using var subscription = loadout.Revisions() .Select(l => l.Name) .Finally(() => loadoutNames.Add("DONE")) @@ -577,6 +578,9 @@ public async Task CanGetModelRevisions() loadoutNames.Count.Should().Be(4, "All revisions should be loaded"); loadoutNames.Should().BeEquivalentTo(["Test Loadout", "Update 1", "Update 2", "DONE"]); + */ + + } [Fact] @@ -590,7 +594,7 @@ public async Task CanObserveIndexChanges() var slice = SliceDescriptor.Create(Mod.Name, Connection.Db.Registry); // Setup the subscription - using var _ = MutableSlice.Observe(Connection, slice) + using var _ = ObservableDatoms.ObserveDatoms(Connection, slice) // Snapshot the values each time .QueryWhenChanged(datoms => datoms.Select(d => d.Resolved.ObjectValue.ToString()!).ToArray()) // Add the changes to the list