diff --git a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs index d025f741..182559bd 100644 --- a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs @@ -31,6 +31,7 @@ public abstract class BaseByteReadArrayJournalDao : BaseJournalDaoWithReadMessag private readonly Flow, long)>, NotUsed> _deserializeFlow; private readonly ReadJournalConfig _readJournalConfig; + private readonly TagMode _tagMode; protected BaseByteReadArrayJournalDao( IAdvancedScheduler scheduler, @@ -42,6 +43,7 @@ protected BaseByteReadArrayJournalDao( : base(scheduler, materializer, connectionFactory, readJournalConfig, shutdownToken) { _readJournalConfig = readJournalConfig; + _tagMode = _readJournalConfig.PluginConfig.TagMode; _deserializeFlow = serializer.DeserializeFlow(); } @@ -82,12 +84,12 @@ public Source AllPersistenceIdsSource(long max) { TagMode.Csv => AsyncSource .FromEnumerable( - new { separator, tag=$"{separator}{tag}{separator}", offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory }, + new { args= new QueryArgs(offset,maxOffset,maxTake,tag,TagMode.Csv), _connectionFactory = ConnectionFactory }, async input => { //var tagValue = input.tag; return await input._connectionFactory.ExecuteWithTransactionAsync( - input, + input.args, ReadIsolationLevel, ShutdownToken, static async (connection, token, inVals) => @@ -96,12 +98,12 @@ static async (connection, token, inVals) => .GetTable() .Where( r => - r.Tags.Contains(inVals.tag) && + r.Tags.Contains(inVals.Tag) && !r.Deleted && - r.Ordering > inVals.offset && - r.Ordering <= inVals.maxOffset) + r.Ordering > inVals.Offset && + r.Ordering <= inVals.MaxOffset) .OrderBy(r => r.Ordering) - .Take(inVals.maxTake) + .Take(inVals.Max) .ToListAsync(token); }); }) @@ -109,42 +111,30 @@ static async (connection, token, inVals) => TagMode.TagTable => AsyncSource .FromEnumerable( - new { inst=this, separator, tag, offset, maxOffset, maxTake }, + new { inst=this, args= new QueryArgs(offset,maxOffset,maxTake,tag,TagMode.TagTable)}, static async input => { var inst = input.inst; return await inst.ConnectionFactory.ExecuteWithTransactionAsync( - input, + input.args, inst.ReadIsolationLevel, inst.ShutdownToken, static async (connection, token,txInput) => { - /* - var containedTags = connection.GetTable() - .Where( - r => - r.OrderingId > txInput.offset && - r.OrderingId <= txInput.maxOffset && - r.TagValue == txInput.tag - ).Select(r => r.OrderingId); - var query = connection.GetTable() - .Where(r => r.Ordering.In(containedTags) && r.Deleted == false) - .OrderBy(r => r.Ordering); - */ var query = connection.GetTable() .Where(r => r.Deleted == false) .Join( connection.GetTable() .Where( jtr => - jtr.OrderingId > txInput.offset - && jtr.OrderingId <= txInput.maxOffset - && jtr.TagValue == txInput.tag), + jtr.OrderingId > txInput.Offset + && jtr.OrderingId <= txInput.MaxOffset + && jtr.TagValue == txInput.Tag), SqlJoinType.Left, (jr, jtr) => (jr.Ordering == jtr.OrderingId), (jr, jtr) => jr) .OrderBy(r => r.Ordering) - .Take(txInput.maxTake); + .Take(txInput.Max); return await AddTagDataFromTagTableAsync(query, connection, token); }); }) @@ -169,7 +159,7 @@ public override Task, NotUsed>> Messages( state, ReadIsolationLevel, ShutdownToken, - async (connection, token,txState) => + async (connection, token, txState) => { var query = connection .GetTable() @@ -202,24 +192,23 @@ public override Task, NotUsed>> Messages( public Source JournalSequence(long offset, long limit) { - var maxTake = MaxTake(limit); - return AsyncSource.FromEnumerable( - new { maxTake, offset, _connectionFactory = ConnectionFactory }, + new { maxTake = MaxTake(limit), offset, _connectionFactory = ConnectionFactory }, async input => { return await input._connectionFactory.ExecuteWithTransactionAsync( + new QueryArgs(input.offset,default,input.maxTake, default), ReadIsolationLevel, ShutdownToken, - async (connection, token) => + async (connection, token, args) => { // persistence-jdbc does not filter deleted here. return await connection .GetTable() - .Where(r => r.Ordering > input.offset) + .Where(r => r.Ordering > args.Offset) .Select(r => r.Ordering) .OrderBy(r => r) - .Take(input.maxTake) + .Take(args.Max) .ToListAsync(token); } ); @@ -248,19 +237,7 @@ private static int MaxTake(long max) ? int.MaxValue : (int)max; - public readonly struct QueryArgs - { - public readonly long Offset; - public readonly long MaxOffset; - public readonly int Max; - public QueryArgs(long offset, long maxOffset, int max) - { - Offset = offset; - MaxOffset = maxOffset; - Max = max; - } - } public Source, long)>, NotUsed> Events( long offset, long maxOffset, @@ -269,7 +246,7 @@ public QueryArgs(long offset, long maxOffset, int max) var maxTake = MaxTake(max); return AsyncSource.FromEnumerable( - new { _connectionFactory = ConnectionFactory, args=new QueryArgs(offset,maxOffset,maxTake) }, + new { _connectionFactory = ConnectionFactory, args=new QueryArgs(offset,maxOffset,maxTake,_tagMode) }, async input => { var cf = input._connectionFactory; @@ -278,30 +255,15 @@ public QueryArgs(long offset, long maxOffset, int max) } ).Via(_deserializeFlow); } - - internal async Task, long)>>> QueryEvents(long offset, long maxOffset, long take) - { - var e = await ExecuteEventQuery(offset,maxOffset, take); - var retSet = new List, long)>>(e.Count); - foreach (var item in e) - { - retSet.Add(_deserializer.Deserialize(item)); - } - - return retSet; - } - - internal async Task> ExecuteEventQuery(long offset, long maxOffset, long take) - { - return await ExecuteEventQuery(new QueryArgs(offset, maxOffset, MaxTake(take))); - } + + internal async Task> ExecuteEventQuery(QueryArgs queryArgs) { return await ConnectionFactory.ExecuteWithTransactionAsync( queryArgs, ReadIsolationLevel, ShutdownToken, - async (connection, token,a) => + static async (connection, token,a) => { var query = connection .GetTable() @@ -313,7 +275,15 @@ internal async Task> ExecuteEventQuery(QueryArgs queryArgs) .OrderBy(r => r.Ordering) .Take(a.Max); - return await AddTagDataIfNeededAsync(query, connection, token); + if (a.TagMode != TagMode.TagTable) + { + return await query.ToListAsync(token); + } + else + { + return await AddTagDataFromTagTableAsync(query, connection, token); + } + //return await AddTagDataIfNeededAsync(query, connection, token); }); } @@ -321,7 +291,6 @@ private async Task> AddTagDataIfNeededAsync(IQueryable +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Persistence.Sql.Config; + +namespace Akka.Persistence.Sql.Query.Dao +{ + internal readonly struct QueryArgs + { + public readonly long Offset; + public readonly long MaxOffset; + public readonly int Max; + public readonly string Tag; + public readonly TagMode TagMode; + + public QueryArgs(long offset, long maxOffset, int max, string tag, TagMode tagMode) + { + Offset = offset; + MaxOffset = maxOffset; + Max = max; + Tag = tag; + TagMode= tagMode; + } + + public QueryArgs(long offset, long maxOffset, int max, TagMode tagMode) : this(offset, maxOffset, max, null!,tagMode) + { + } + } +}