Skip to content

Commit

Permalink
Further minimize capture sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
to11mtm committed Feb 10, 2024
1 parent c57f7c7 commit e94254b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 64 deletions.
97 changes: 33 additions & 64 deletions src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public abstract class BaseByteReadArrayJournalDao : BaseJournalDaoWithReadMessag
private readonly Flow<JournalRow, Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> _deserializeFlow;

private readonly ReadJournalConfig _readJournalConfig;
private readonly TagMode _tagMode;

protected BaseByteReadArrayJournalDao(
IAdvancedScheduler scheduler,
Expand All @@ -42,6 +43,7 @@ protected BaseByteReadArrayJournalDao(
: base(scheduler, materializer, connectionFactory, readJournalConfig, shutdownToken)
{
_readJournalConfig = readJournalConfig;
_tagMode = _readJournalConfig.PluginConfig.TagMode;
_deserializeFlow = serializer.DeserializeFlow();
}

Expand Down Expand Up @@ -82,12 +84,12 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
{
TagMode.Csv => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag=$"{separator}{tag}{separator}", offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
new { args= new QueryArgs(offset,maxOffset,maxTake,tag,TagMode.Csv), _connectionFactory = ConnectionFactory },
async input =>
{
//var tagValue = input.tag;
return await input._connectionFactory.ExecuteWithTransactionAsync(
input,
input.args,
ReadIsolationLevel,
ShutdownToken,
static async (connection, token, inVals) =>
Expand All @@ -96,55 +98,43 @@ static async (connection, token, inVals) =>
.GetTable<JournalRow>()
.Where(
r =>
r.Tags.Contains(inVals.tag) &&
r.Tags.Contains(inVals.Tag) &&
!r.Deleted &&
r.Ordering > inVals.offset &&
r.Ordering <= inVals.maxOffset)
r.Ordering > inVals.Offset &&
r.Ordering <= inVals.MaxOffset)
.OrderBy(r => r.Ordering)
.Take(inVals.maxTake)
.Take(inVals.Max)
.ToListAsync(token);
});
})
.Via(_deserializeFlow),

TagMode.TagTable => AsyncSource<JournalRow>
.FromEnumerable(
new { inst=this, separator, tag, offset, maxOffset, maxTake },
new { inst=this, args= new QueryArgs(offset,maxOffset,maxTake,tag,TagMode.TagTable)},
static async input =>
{
var inst = input.inst;
return await inst.ConnectionFactory.ExecuteWithTransactionAsync(
input,
input.args,
inst.ReadIsolationLevel,
inst.ShutdownToken,
static async (connection, token,txInput) =>
{
/*
var containedTags = connection.GetTable<JournalTagRow>()
.Where(
r =>
r.OrderingId > txInput.offset &&
r.OrderingId <= txInput.maxOffset &&
r.TagValue == txInput.tag
).Select(r => r.OrderingId);
var query = connection.GetTable<JournalRow>()
.Where(r => r.Ordering.In(containedTags) && r.Deleted == false)
.OrderBy(r => r.Ordering);
*/
var query = connection.GetTable<JournalRow>()
.Where(r => r.Deleted == false)
.Join(
connection.GetTable<JournalTagRow>()
.Where(
jtr =>
jtr.OrderingId > txInput.offset
&& jtr.OrderingId <= txInput.maxOffset
&& jtr.TagValue == txInput.tag),
jtr.OrderingId > txInput.Offset
&& jtr.OrderingId <= txInput.MaxOffset
&& jtr.TagValue == txInput.Tag),
SqlJoinType.Left,
(jr, jtr) => (jr.Ordering == jtr.OrderingId),
(jr, jtr) => jr)
.OrderBy(r => r.Ordering)
.Take(txInput.maxTake);
.Take(txInput.Max);
return await AddTagDataFromTagTableAsync(query, connection, token);
});
})
Expand All @@ -169,7 +159,7 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(
state,
ReadIsolationLevel,
ShutdownToken,
async (connection, token,txState) =>
async (connection, token, txState) =>
{
var query = connection
.GetTable<JournalRow>()
Expand Down Expand Up @@ -202,24 +192,23 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(

public Source<long, NotUsed> JournalSequence(long offset, long limit)
{
var maxTake = MaxTake(limit);

return AsyncSource<long>.FromEnumerable(
new { maxTake, offset, _connectionFactory = ConnectionFactory },
new { maxTake = MaxTake(limit), offset, _connectionFactory = ConnectionFactory },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
new QueryArgs(input.offset,default,input.maxTake, default),
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
async (connection, token, args) =>
{
// persistence-jdbc does not filter deleted here.
return await connection
.GetTable<JournalRow>()
.Where(r => r.Ordering > input.offset)
.Where(r => r.Ordering > args.Offset)
.Select(r => r.Ordering)
.OrderBy(r => r)
.Take(input.maxTake)
.Take(args.Max)
.ToListAsync(token);
}
);
Expand Down Expand Up @@ -248,19 +237,7 @@ private static int MaxTake(long max)
? int.MaxValue
: (int)max;

public readonly struct QueryArgs
{
public readonly long Offset;
public readonly long MaxOffset;
public readonly int Max;

public QueryArgs(long offset, long maxOffset, int max)
{
Offset = offset;
MaxOffset = maxOffset;
Max = max;
}
}
public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> Events(
long offset,
long maxOffset,
Expand All @@ -269,7 +246,7 @@ public QueryArgs(long offset, long maxOffset, int max)
var maxTake = MaxTake(max);

return AsyncSource<JournalRow>.FromEnumerable(
new { _connectionFactory = ConnectionFactory, args=new QueryArgs(offset,maxOffset,maxTake) },
new { _connectionFactory = ConnectionFactory, args=new QueryArgs(offset,maxOffset,maxTake,_tagMode) },
async input =>
{
var cf = input._connectionFactory;
Expand All @@ -278,30 +255,15 @@ public QueryArgs(long offset, long maxOffset, int max)
}
).Via(_deserializeFlow);
}

internal async Task<List<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>>> QueryEvents(long offset, long maxOffset, long take)
{
var e = await ExecuteEventQuery(offset,maxOffset, take);
var retSet = new List<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>>(e.Count);
foreach (var item in e)
{
retSet.Add(_deserializer.Deserialize(item));
}

return retSet;
}

internal async Task<List<JournalRow>> ExecuteEventQuery(long offset, long maxOffset, long take)
{
return await ExecuteEventQuery(new QueryArgs(offset, maxOffset, MaxTake(take)));
}


internal async Task<List<JournalRow>> ExecuteEventQuery(QueryArgs queryArgs)
{
return await ConnectionFactory.ExecuteWithTransactionAsync(
queryArgs,
ReadIsolationLevel,
ShutdownToken,
async (connection, token,a) =>
static async (connection, token,a) =>
{
var query = connection
.GetTable<JournalRow>()
Expand All @@ -313,15 +275,22 @@ internal async Task<List<JournalRow>> ExecuteEventQuery(QueryArgs queryArgs)
.OrderBy(r => r.Ordering)
.Take(a.Max);

return await AddTagDataIfNeededAsync(query, connection, token);
if (a.TagMode != TagMode.TagTable)
{
return await query.ToListAsync(token);
}
else
{
return await AddTagDataFromTagTableAsync(query, connection, token);
}
//return await AddTagDataIfNeededAsync(query, connection, token);
});
}

private async Task<List<JournalRow>> AddTagDataIfNeededAsync(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection, CancellationToken token)
{
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable)
return await rowQuery.ToListAsync(token);

return await AddTagDataFromTagTableAsync(rowQuery, connection, token);
}

Expand Down
32 changes: 32 additions & 0 deletions src/Akka.Persistence.Sql/Query/Dao/QueryArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// -----------------------------------------------------------------------
// <copyright file="QueryArgs.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Config;

namespace Akka.Persistence.Sql.Query.Dao
{
internal readonly struct QueryArgs
{
public readonly long Offset;
public readonly long MaxOffset;
public readonly int Max;
public readonly string Tag;
public readonly TagMode TagMode;

public QueryArgs(long offset, long maxOffset, int max, string tag, TagMode tagMode)
{
Offset = offset;
MaxOffset = maxOffset;
Max = max;
Tag = tag;
TagMode= tagMode;
}

public QueryArgs(long offset, long maxOffset, int max, TagMode tagMode) : this(offset, maxOffset, max, null!,tagMode)
{
}
}
}

0 comments on commit e94254b

Please sign in to comment.