diff --git a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs old mode 100644 new mode 100755 index dd030fc..638a23a --- a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs +++ b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs @@ -17,7 +17,8 @@ using System.Data; using System.Data.Common; using System.Linq; -using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace Akka.Persistence.PostgreSql.Journal { @@ -32,7 +33,16 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka. { _configuration = configuration; var storedAs = configuration.StoredAs.ToString().ToUpperInvariant(); - + + var allEventColumnNames = $@" + e.{Configuration.PersistenceIdColumnName} as PersistenceId, + e.{Configuration.SequenceNrColumnName} as SequenceNr, + e.{Configuration.TimestampColumnName} as Timestamp, + e.{Configuration.IsDeletedColumnName} as IsDeleted, + e.{Configuration.ManifestColumnName} as Manifest, + e.{Configuration.PayloadColumnName} as Payload, + e.{Configuration.SerializerIdColumnName} as SerializerId"; + CreateEventsJournalSql = $@" CREATE TABLE IF NOT EXISTS {Configuration.FullJournalTableName} ( {Configuration.OrderingColumnName} BIGSERIAL NOT NULL PRIMARY KEY, @@ -42,10 +52,13 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka. {Configuration.TimestampColumnName} BIGINT NOT NULL, {Configuration.ManifestColumnName} VARCHAR(500) NOT NULL, {Configuration.PayloadColumnName} {storedAs} NOT NULL, - {Configuration.TagsColumnName} VARCHAR(100) NULL, + {Configuration.TagsColumnName} VARCHAR(100)[] NULL, {Configuration.SerializerIdColumnName} INTEGER NULL, CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName}) ); + + CREATE INDEX IF NOT EXISTS idx_{Configuration.FullJournalTableName.Replace('.', '_')}_{Configuration.TagsColumnName}_gin + ON {Configuration.FullJournalTableName} USING gin ({Configuration.TagsColumnName}); "; CreateMetaTableSql = $@" @@ -55,6 +68,19 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka. CONSTRAINT {Configuration.MetaTableName}_pk PRIMARY KEY ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName}) );"; + HighestTagOrderingSql = + $@" + SELECT MAX(e.{Configuration.OrderingColumnName}) as Ordering + FROM {Configuration.FullJournalTableName} e + WHERE e.{Configuration.OrderingColumnName} > @Ordering AND e.{Configuration.TagsColumnName} @> @Tag"; + + ByTagSql = + $@" + SELECT {allEventColumnNames}, e.{Configuration.OrderingColumnName} as Ordering + FROM {Configuration.FullJournalTableName} e + WHERE e.{Configuration.OrderingColumnName} > @Ordering AND e.{Configuration.TagsColumnName} @> @Tag + ORDER BY {Configuration.OrderingColumnName} ASC"; + switch (_configuration.StoredAs) { case StoredAsType.ByteA: @@ -75,7 +101,7 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka. var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer); return deserializer.FromBinary((byte[])serialized, type); } - }; + }; break; case StoredAsType.JsonB: _serialize = e => new SerializationResult(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e.Payload, _configuration.JsonSerializerSettings), null); @@ -93,6 +119,8 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka. protected override DbCommand CreateCommand(DbConnection connection) => ((NpgsqlConnection)connection).CreateCommand(); protected override string CreateEventsJournalSql { get; } protected override string CreateMetaTableSql { get; } + protected override string HighestTagOrderingSql { get; } + protected override string ByTagSql { get; } protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet tags) { @@ -125,17 +153,61 @@ protected override void WriteEvent(DbCommand command, IPersistentRepresentation command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload }); - if (tags.Count != 0) + command.Parameters.Add(tags.Count != 0 + ? new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) {Value = tags.ToArray()} + : new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) {Value = DBNull.Value}); + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public override async Task SelectByTagAsync(DbConnection connection, CancellationToken cancellationToken, string tag, long fromOffset, long toOffset, long max, + Action callback) + { + using (var command = GetCommand(connection, ByTagSql)) { - var tagBuilder = new StringBuilder(";", tags.Sum(x => x.Length) + tags.Count + 1); - foreach (var tag in tags) + var take = Math.Min(toOffset - fromOffset, max); + command.Parameters.Add(new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) { Value = new[] { tag }}); + AddParameter(command, "@Ordering", DbType.Int64, fromOffset); + AddParameter(command, "@Take", DbType.Int64, take); + + CommandBehavior commandBehavior; + + if (Configuration.UseSequentialAccess) + { + commandBehavior = CommandBehavior.SequentialAccess; + } + else + { + commandBehavior = CommandBehavior.Default; + } + + using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken)) { - tagBuilder.Append(tag).Append(';'); + while (await reader.ReadAsync(cancellationToken)) + { + var persistent = ReadEvent(reader); + var ordering = reader.GetInt64(OrderingIndex); + callback(new ReplayedTaggedMessage(persistent, tag, ordering)); + } } + } - AddParameter(command, "@Tag", DbType.String, tagBuilder.ToString()); + using (var command = GetCommand(connection, HighestTagOrderingSql)) + { + command.Parameters.Add(new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) { Value = new[] { tag } }); + AddParameter(command, "@Ordering", DbType.Int64, fromOffset); + var maxOrdering = (await command.ExecuteScalarAsync(cancellationToken)) as long? ?? 0L; + return maxOrdering; } - else AddParameter(command, "@Tag", DbType.String, DBNull.Value); } private static string QualifiedName(IPersistentRepresentation e) @@ -169,7 +241,7 @@ protected override IPersistentRepresentation ReadEvent(DbDataReader reader) return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null); } } - + public class PostgreSqlQueryConfiguration : QueryConfiguration { public readonly StoredAsType StoredAs; @@ -191,10 +263,10 @@ public PostgreSqlQueryConfiguration( TimeSpan timeout, StoredAsType storedAs, string defaultSerializer, - JsonSerializerSettings jsonSerializerSettings = null, + JsonSerializerSettings jsonSerializerSettings = null, bool useSequentialAccess = true) : base(schemaName, journalEventsTableName, metaTableName, persistenceIdColumnName, sequenceNrColumnName, - payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn, + payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn, serializerIdColumnName, timeout, defaultSerializer, useSequentialAccess) { StoredAs = storedAs;