From dfa7ffe60df09c251f4ff7995632b87fee95a0bc Mon Sep 17 00:00:00 2001 From: Sebastian Walter Date: Mon, 30 Oct 2023 13:32:37 +0100 Subject: [PATCH] Adds API to IEventStore to load events up to a given sequence number --- .../EventStore/MongoDbEventPersistence.cs | 11 +++++++ .../EventStores/MsSqlEventPersistence.cs | 31 +++++++++++++++++ .../PostgresSqlEventPersistence.cs | 33 +++++++++++++++++++ .../Suites/TestSuiteForEventStore.cs | 16 +++++++++ .../IntegrationTests/CancellationTests.cs | 10 ++++++ .../Snapshots/SnapshotAggregateRootTests.cs | 25 ++++++++++++++ .../EventFlow/EventStores/EventStoreBase.cs | 32 ++++++++++++++++-- .../Files/FilesEventPersistence.cs | 25 ++++++++++++++ .../EventStores/IEventPersistence.cs | 6 ++++ Source/EventFlow/EventStores/IEventStore.cs | 8 +++++ .../InMemory/InMemoryEventPersistence.cs | 24 ++++++++++++-- 11 files changed, 216 insertions(+), 5 deletions(-) diff --git a/Source/EventFlow.MongoDB/EventStore/MongoDbEventPersistence.cs b/Source/EventFlow.MongoDB/EventStore/MongoDbEventPersistence.cs index 172a8c876..d022c5a82 100644 --- a/Source/EventFlow.MongoDB/EventStore/MongoDbEventPersistence.cs +++ b/Source/EventFlow.MongoDB/EventStore/MongoDbEventPersistence.cs @@ -113,6 +113,17 @@ public async Task> LoadCommittedEvent .ConfigureAwait(continueOnCapturedContext: false); } + public async Task> LoadCommittedEventsAsync(IIdentity id, int fromEventSequenceNumber, int toEventSequenceNumber, + CancellationToken cancellationToken) + { + return await MongoDbEventStoreCollection + .Find(model => model.AggregateId == id.Value && + model.AggregateSequenceNumber >= fromEventSequenceNumber && + model.AggregateSequenceNumber <= toEventSequenceNumber) + .ToListAsync(cancellationToken) + .ConfigureAwait(continueOnCapturedContext: false); + } + public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) { DeleteResult affectedRows = await MongoDbEventStoreCollection diff --git a/Source/EventFlow.MsSql/EventStores/MsSqlEventPersistence.cs b/Source/EventFlow.MsSql/EventStores/MsSqlEventPersistence.cs index d6bbb470b..676138ae7 100644 --- a/Source/EventFlow.MsSql/EventStores/MsSqlEventPersistence.cs +++ b/Source/EventFlow.MsSql/EventStores/MsSqlEventPersistence.cs @@ -195,6 +195,37 @@ ORDER BY return eventDataModels; } + public async Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) + { + const string sql = @" + SELECT + GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber + FROM EventFlow + WHERE + AggregateId = @AggregateId AND + AggregateSequenceNumber >= @FromEventSequenceNumber AND + AggregateSequenceNumber <= @ToEventSequenceNumber + ORDER BY + AggregateSequenceNumber ASC"; + var eventDataModels = await _connection.QueryAsync( + Label.Named("mssql-fetch-events"), + null, + cancellationToken, + sql, + new + { + AggregateId = id.Value, + FromEventSequenceNumber = fromEventSequenceNumber, + ToEventSequenceNumber = toEventSequenceNumber + }) + .ConfigureAwait(false); + return eventDataModels; + } + public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) { const string sql = @"DELETE FROM EventFlow WHERE AggregateId = @AggregateId"; diff --git a/Source/EventFlow.PostgreSql/EventStores/PostgresSqlEventPersistence.cs b/Source/EventFlow.PostgreSql/EventStores/PostgresSqlEventPersistence.cs index 3a1980ffa..184fd676f 100644 --- a/Source/EventFlow.PostgreSql/EventStores/PostgresSqlEventPersistence.cs +++ b/Source/EventFlow.PostgreSql/EventStores/PostgresSqlEventPersistence.cs @@ -170,6 +170,38 @@ INSERT INTO return eventDataModels; } + public async Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) + { + const string sql = @" + SELECT + GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber + FROM EventFlow + WHERE + AggregateId = @AggregateId AND + AggregateSequenceNumber >= @FromEventSequenceNumber AND + AggregateSequenceNumber <= @ToSequenceNumber + ORDER BY + AggregateSequenceNumber ASC;"; + + var eventDataModels = await _connection.QueryAsync( + Label.Named("postgresql-fetch-events"), + null, + cancellationToken, + sql, + new + { + AggregateId = id.Value, + FromEventSequenceNumber = fromEventSequenceNumber, + ToSequenceNumber = toEventSequenceNumber + }) + .ConfigureAwait(false); + return eventDataModels; + } + public async Task> LoadCommittedEventsAsync( IIdentity id, int fromEventSequenceNumber, @@ -184,6 +216,7 @@ FROM EventFlow AggregateSequenceNumber >= @FromEventSequenceNumber ORDER BY AggregateSequenceNumber ASC;"; + var eventDataModels = await _connection.QueryAsync( Label.Named("postgresql-fetch-events"), null, diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs index 5ef9770f6..ad938f3cb 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs @@ -187,6 +187,22 @@ public async Task LoadingOfEventsCanStartLater() domainEvents.ElementAt(1).AggregateSequenceNumber.Should().Be(4); domainEvents.ElementAt(2).AggregateSequenceNumber.Should().Be(5); } + + [Test] + public async Task LoadingOfEventsCanStartLaterAndStopEarlier() + { + // Arrange + var id = ThingyId.New; + await PublishPingCommandsAsync(id, 5); + + // Act + var domainEvents = await EventStore.LoadEventsAsync(id, 3, 4, CancellationToken.None); + + // Assert + domainEvents.Should().HaveCount(2); + domainEvents.ElementAt(0).AggregateSequenceNumber.Should().Be(3); + domainEvents.ElementAt(1).AggregateSequenceNumber.Should().Be(4); + } [Test] public async Task AggregateCanHaveMultipleCommits() diff --git a/Source/EventFlow.Tests/IntegrationTests/CancellationTests.cs b/Source/EventFlow.Tests/IntegrationTests/CancellationTests.cs index ca224818f..a6798ef15 100644 --- a/Source/EventFlow.Tests/IntegrationTests/CancellationTests.cs +++ b/Source/EventFlow.Tests/IntegrationTests/CancellationTests.cs @@ -379,6 +379,16 @@ public async Task> LoadCommittedEvent return result; } + public async Task> LoadCommittedEventsAsync(IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) + { + var result = await _inner.LoadCommittedEventsAsync(id, fromEventSequenceNumber, toEventSequenceNumber, cancellationToken); + await LoadCompletionSource.Task; + return result; + } + public Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) { return _inner.DeleteEventsAsync(id, cancellationToken); diff --git a/Source/EventFlow.Tests/UnitTests/Snapshots/SnapshotAggregateRootTests.cs b/Source/EventFlow.Tests/UnitTests/Snapshots/SnapshotAggregateRootTests.cs index 9c5bf3318..d99c0d7cc 100644 --- a/Source/EventFlow.Tests/UnitTests/Snapshots/SnapshotAggregateRootTests.cs +++ b/Source/EventFlow.Tests/UnitTests/Snapshots/SnapshotAggregateRootTests.cs @@ -115,6 +115,27 @@ public async Task Test_Arrange_EventStore(int eventInStore, int fromEventSequenc // Assert domainEvents.Should().HaveCount(expectedNumberOfEvents); } + + [Description("Mock test")] + [TestCase(5, 3, 5, 3)] + [TestCase(5, 0, 5, 5)] + [TestCase(5, 1, 2, 2)] + [TestCase(0, 1, 2, 0)] + public async Task Test_Arrange_EventStore_SequenceRange(int eventInStore, int fromEventSequenceNumber, int toEventSequenceNumber, int expectedNumberOfEvents) + { + // Arrange + Arrange_EventStore(ManyDomainEvents(eventInStore)); + + // Act + var domainEvents = await _eventStoreMock.Object.LoadEventsAsync( + A(), + fromEventSequenceNumber, + toEventSequenceNumber, + CancellationToken.None); + + // Assert + domainEvents.Should().HaveCount(expectedNumberOfEvents); + } private void Arrange_EventStore(IEnumerable> domainEvents) { @@ -123,6 +144,10 @@ private void Arrange_EventStore(IEnumerable e.LoadEventsAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns((id, seq, c) => Task.FromResult>>(domainEventList.Skip(Math.Max(seq - 1, 0)).ToList())); + + _eventStoreMock + .Setup(e => e.LoadEventsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((id, from, to, c) => Task.FromResult>>(domainEventList.Take(to).Skip(Math.Max(from - 1, 0)).ToList())); } private void Arrange_Snapshot(ThingySnapshot thingySnapshot) diff --git a/Source/EventFlow/EventStores/EventStoreBase.cs b/Source/EventFlow/EventStores/EventStoreBase.cs index 286904c53..0b17dd9f0 100644 --- a/Source/EventFlow/EventStores/EventStoreBase.cs +++ b/Source/EventFlow/EventStores/EventStoreBase.cs @@ -154,6 +154,25 @@ public Task>> LoadEvents cancellationToken); } + public async Task>> LoadEventsAsync( + TIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) where TAggregate : IAggregateRoot where TIdentity : IIdentity + { + if (fromEventSequenceNumber < 1) throw new ArgumentOutOfRangeException(nameof(fromEventSequenceNumber), "Event sequence numbers start at 1"); + if (toEventSequenceNumber <= fromEventSequenceNumber) throw new ArgumentOutOfRangeException(nameof(toEventSequenceNumber), "Event sequence numbers end at start"); + + var committedDomainEvents = await _eventPersistence.LoadCommittedEventsAsync( + id, + fromEventSequenceNumber, + toEventSequenceNumber, + cancellationToken) + .ConfigureAwait(false); + + return await MapToDomainEvents(id, cancellationToken, committedDomainEvents); + } + public virtual async Task>> LoadEventsAsync( TIdentity id, int fromEventSequenceNumber, @@ -164,10 +183,17 @@ public virtual async Task(id, cancellationToken, committedDomainEvents); + } + + private async Task>> MapToDomainEvents(TIdentity id, CancellationToken cancellationToken, + IReadOnlyCollection committedDomainEvents) where TAggregate : IAggregateRoot where TIdentity : IIdentity + { var domainEvents = (IReadOnlyCollection>)committedDomainEvents .Select(e => _eventJsonSerializer.Deserialize(id, e)) .ToList(); diff --git a/Source/EventFlow/EventStores/Files/FilesEventPersistence.cs b/Source/EventFlow/EventStores/Files/FilesEventPersistence.cs index 27b6dfc02..8dbf163d7 100644 --- a/Source/EventFlow/EventStores/Files/FilesEventPersistence.cs +++ b/Source/EventFlow/EventStores/Files/FilesEventPersistence.cs @@ -236,6 +236,31 @@ public async Task> LoadCommittedEvent } } + public async Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + var committedDomainEvents = new List(); + for (var i = fromEventSequenceNumber; i <= toEventSequenceNumber ; i++) + { + var eventPath = _filesEventLocator.GetEventPath(id, i); + if (!File.Exists(eventPath)) + { + return committedDomainEvents; + } + + var committedDomainEvent = await LoadFileEventDataFile(eventPath).ConfigureAwait(false); + committedDomainEvents.Add(committedDomainEvent); + } + + return committedDomainEvents; + } + } + public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) { _logger.LogTrace("Deleting entity with ID {EventFilePath}", id); diff --git a/Source/EventFlow/EventStores/IEventPersistence.cs b/Source/EventFlow/EventStores/IEventPersistence.cs index 4d0c2fd3e..63a62a118 100644 --- a/Source/EventFlow/EventStores/IEventPersistence.cs +++ b/Source/EventFlow/EventStores/IEventPersistence.cs @@ -44,6 +44,12 @@ Task> LoadCommittedEventsAsync( IIdentity id, int fromEventSequenceNumber, CancellationToken cancellationToken); + + Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken); Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken); } diff --git a/Source/EventFlow/EventStores/IEventStore.cs b/Source/EventFlow/EventStores/IEventStore.cs index ee51027b7..5a3b51180 100644 --- a/Source/EventFlow/EventStores/IEventStore.cs +++ b/Source/EventFlow/EventStores/IEventStore.cs @@ -51,6 +51,14 @@ Task>> LoadEventsAsync where TIdentity : IIdentity; + + Task>> LoadEventsAsync( + TIdentity id, + int fromSequenceNumber, + int toSequenceNumber, + CancellationToken cancellationToken) + where TAggregate : IAggregateRoot + where TIdentity : IIdentity; Task>> LoadEventsAsync( TIdentity id, diff --git a/Source/EventFlow/EventStores/InMemory/InMemoryEventPersistence.cs b/Source/EventFlow/EventStores/InMemory/InMemoryEventPersistence.cs index bb1a0c466..e326b7c2a 100644 --- a/Source/EventFlow/EventStores/InMemory/InMemoryEventPersistence.cs +++ b/Source/EventFlow/EventStores/InMemory/InMemoryEventPersistence.cs @@ -193,13 +193,33 @@ public Task> LoadCommittedEventsAsync IIdentity id, int fromEventSequenceNumber, CancellationToken cancellationToken) + { + return LoadCommittedEventsAsync( + id, + fromEventSequenceNumber, + e => e.AggregateSequenceNumber >= fromEventSequenceNumber); + } + + public Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) + { + return LoadCommittedEventsAsync( + id, + fromEventSequenceNumber, + e => e.AggregateSequenceNumber >= fromEventSequenceNumber && e.AggregateSequenceNumber <= toEventSequenceNumber); + } + + private Task> LoadCommittedEventsAsync(IIdentity id,int fromEventSequenceNumber, Func filter) { IReadOnlyCollection result; if (_eventStore.TryGetValue(id.Value, out var committedDomainEvent)) result = fromEventSequenceNumber <= 1 - ? (IReadOnlyCollection) committedDomainEvent - : committedDomainEvent.Where(e => e.AggregateSequenceNumber >= fromEventSequenceNumber).ToList(); + ? (IReadOnlyCollection)committedDomainEvent + : committedDomainEvent.Where(filter).ToList(); else result = new List();