Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use array datatype for tags storage and searching #70

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 85 additions & 13 deletions src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Persistence.PostgreSql.Journal
{
Expand All @@ -32,7 +33,16 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
{
_configuration = configuration;
var storedAs = configuration.StoredAs.ToString().ToUpperInvariant();


var allEventColumnNames = $@"
e.{Configuration.PersistenceIdColumnName} as PersistenceId,
e.{Configuration.SequenceNrColumnName} as SequenceNr,
e.{Configuration.TimestampColumnName} as Timestamp,
e.{Configuration.IsDeletedColumnName} as IsDeleted,
e.{Configuration.ManifestColumnName} as Manifest,
e.{Configuration.PayloadColumnName} as Payload,
e.{Configuration.SerializerIdColumnName} as SerializerId";

CreateEventsJournalSql = $@"
CREATE TABLE IF NOT EXISTS {Configuration.FullJournalTableName} (
{Configuration.OrderingColumnName} BIGSERIAL NOT NULL PRIMARY KEY,
Expand All @@ -42,10 +52,13 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
{Configuration.TimestampColumnName} BIGINT NOT NULL,
{Configuration.ManifestColumnName} VARCHAR(500) NOT NULL,
{Configuration.PayloadColumnName} {storedAs} NOT NULL,
{Configuration.TagsColumnName} VARCHAR(100) NULL,
{Configuration.TagsColumnName} VARCHAR(100)[] NULL,
{Configuration.SerializerIdColumnName} INTEGER NULL,
CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
);

CREATE INDEX IF NOT EXISTS idx_{Configuration.FullJournalTableName.Replace('.', '_')}_{Configuration.TagsColumnName}_gin
ON {Configuration.FullJournalTableName} USING gin ({Configuration.TagsColumnName});
";

CreateMetaTableSql = $@"
Expand All @@ -55,6 +68,19 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
CONSTRAINT {Configuration.MetaTableName}_pk PRIMARY KEY ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
);";

HighestTagOrderingSql =
$@"
SELECT MAX(e.{Configuration.OrderingColumnName}) as Ordering
FROM {Configuration.FullJournalTableName} e
WHERE e.{Configuration.OrderingColumnName} > @Ordering AND e.{Configuration.TagsColumnName} @> @Tag";

ByTagSql =
$@"
SELECT {allEventColumnNames}, e.{Configuration.OrderingColumnName} as Ordering
FROM {Configuration.FullJournalTableName} e
WHERE e.{Configuration.OrderingColumnName} > @Ordering AND e.{Configuration.TagsColumnName} @> @Tag
ORDER BY {Configuration.OrderingColumnName} ASC";

switch (_configuration.StoredAs)
{
case StoredAsType.ByteA:
Expand All @@ -75,7 +101,7 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
return deserializer.FromBinary((byte[])serialized, type);
}
};
};
break;
case StoredAsType.JsonB:
_serialize = e => new SerializationResult(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e.Payload, _configuration.JsonSerializerSettings), null);
Expand All @@ -93,6 +119,8 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
protected override DbCommand CreateCommand(DbConnection connection) => ((NpgsqlConnection)connection).CreateCommand();
protected override string CreateEventsJournalSql { get; }
protected override string CreateMetaTableSql { get; }
protected override string HighestTagOrderingSql { get; }
protected override string ByTagSql { get; }

protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{
Expand Down Expand Up @@ -125,17 +153,61 @@ protected override void WriteEvent(DbCommand command, IPersistentRepresentation

command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload });

if (tags.Count != 0)
command.Parameters.Add(tags.Count != 0
? new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) {Value = tags.ToArray()}
: new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) {Value = DBNull.Value});
}

/// <summary>
/// TBD
/// </summary>
/// <param name="connection">TBD</param>
/// <param name="cancellationToken">TBD</param>
/// <param name="tag">TBD</param>
/// <param name="fromOffset">TBD</param>
/// <param name="toOffset">TBD</param>
/// <param name="max">TBD</param>
/// <param name="callback">TBD</param>
/// <returns>TBD</returns>
public override async Task<long> SelectByTagAsync(DbConnection connection, CancellationToken cancellationToken, string tag, long fromOffset, long toOffset, long max,
Action<ReplayedTaggedMessage> callback)
{
using (var command = GetCommand(connection, ByTagSql))
{
var tagBuilder = new StringBuilder(";", tags.Sum(x => x.Length) + tags.Count + 1);
foreach (var tag in tags)
var take = Math.Min(toOffset - fromOffset, max);
command.Parameters.Add(new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) { Value = new[] { tag }});
AddParameter(command, "@Ordering", DbType.Int64, fromOffset);
AddParameter(command, "@Take", DbType.Int64, take);

CommandBehavior commandBehavior;

if (Configuration.UseSequentialAccess)
{
commandBehavior = CommandBehavior.SequentialAccess;
}
else
{
commandBehavior = CommandBehavior.Default;
}

using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken))
{
tagBuilder.Append(tag).Append(';');
while (await reader.ReadAsync(cancellationToken))
{
var persistent = ReadEvent(reader);
var ordering = reader.GetInt64(OrderingIndex);
callback(new ReplayedTaggedMessage(persistent, tag, ordering));
}
}
}

AddParameter(command, "@Tag", DbType.String, tagBuilder.ToString());
using (var command = GetCommand(connection, HighestTagOrderingSql))
{
command.Parameters.Add(new NpgsqlParameter("@Tag", NpgsqlDbType.Array | NpgsqlDbType.Varchar) { Value = new[] { tag } });
AddParameter(command, "@Ordering", DbType.Int64, fromOffset);
var maxOrdering = (await command.ExecuteScalarAsync(cancellationToken)) as long? ?? 0L;
return maxOrdering;
}
else AddParameter(command, "@Tag", DbType.String, DBNull.Value);
}

private static string QualifiedName(IPersistentRepresentation e)
Expand Down Expand Up @@ -169,7 +241,7 @@ protected override IPersistentRepresentation ReadEvent(DbDataReader reader)
return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null);
}
}

public class PostgreSqlQueryConfiguration : QueryConfiguration
{
public readonly StoredAsType StoredAs;
Expand All @@ -191,10 +263,10 @@ public PostgreSqlQueryConfiguration(
TimeSpan timeout,
StoredAsType storedAs,
string defaultSerializer,
JsonSerializerSettings jsonSerializerSettings = null,
JsonSerializerSettings jsonSerializerSettings = null,
bool useSequentialAccess = true)
: base(schemaName, journalEventsTableName, metaTableName, persistenceIdColumnName, sequenceNrColumnName,
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn,
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn,
serializerIdColumnName, timeout, defaultSerializer, useSequentialAccess)
{
StoredAs = storedAs;
Expand Down