Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Missing take and optimize some captures #347

Merged
merged 9 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/Akka.Persistence.Sql/Extensions/ConnectionFactoryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,36 @@ public static async Task<T> ExecuteWithTransactionAsync<T>(
throw;
}
}

public static async Task<T> ExecuteWithTransactionAsync<TState,T>(
this AkkaPersistenceDataConnectionFactory factory,
TState state,
IsolationLevel level,
CancellationToken token,
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler)
{
await using var connection = factory.GetConnection();
await using var tx = await connection.BeginTransactionAsync(level, token);

try
{
var result = await handler(connection, token, state);
await tx.CommitAsync(token);
return result;
}
catch (Exception ex1)
{
try
{
await tx.RollbackAsync(token);
}
catch (Exception ex2)
{
throw new AggregateException("Exception thrown when rolling back database transaction", ex2, ex1);
}

throw;
}
}
}
}
142 changes: 81 additions & 61 deletions src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public abstract class BaseByteReadArrayJournalDao : BaseJournalDaoWithReadMessag
private readonly Flow<JournalRow, Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> _deserializeFlow;

private readonly ReadJournalConfig _readJournalConfig;
private readonly TagMode _tagMode;

protected BaseByteReadArrayJournalDao(
IAdvancedScheduler scheduler,
Expand All @@ -42,6 +43,7 @@ protected BaseByteReadArrayJournalDao(
: base(scheduler, materializer, connectionFactory, readJournalConfig, shutdownToken)
{
_readJournalConfig = readJournalConfig;
_tagMode = _readJournalConfig.PluginConfig.TagMode;
to11mtm marked this conversation as resolved.
Show resolved Hide resolved
_deserializeFlow = serializer.DeserializeFlow();
}

Expand All @@ -53,17 +55,17 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
new { _connectionFactory = ConnectionFactory, maxTake },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
return await input._connectionFactory.ExecuteWithTransactionAsync(input.maxTake,
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
async (connection, token,take) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just return a Task here instead of async / await ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm honestly not sure, concerns aside from potential stack trace confusion on errors would be whether the fact this func is actually invoked inside two using contexts... Better safe than sorry when it comes to that stuff IMO.

Copy link
Contributor

@Arkatufus Arkatufus Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some discussions on this are here:

Would the performance boost for each query execution be worth it, does it outweight the possible exception stack trace confusion?

From my experience, the "proper" stack trace from an exception thrown from within an awaited Task is confusing in itself, since it is thrown from inside the async...await FSM and most often it will be a detached from the actual code that invoked it (when the thrown exception is caught by the debugger), so I can't really say that it is an actual improvement in debugging experience. But then again, I could be doing it wrong.

On the other hand, the article does point out that using proper async puts the exception where it belongs, in the future when it is supposed to happen, if it indeed needs to be thrown.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with @to11mtm on this one, returning a Task from inside a using block is quite dangerous because we don't know just how robust the IDispose is being implemented by the 3rd party API developers.

{
return await connection
.GetTable<JournalRow>()
.Where(r => r.Deleted == false)
.Select(r => r.PersistenceId)
.Distinct()
.Take(input.maxTake)
.Take(take)
.ToListAsync(token);
});
});
Expand All @@ -82,53 +84,57 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
{
TagMode.Csv => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
new { args= new QueryArgs(offset,maxOffset,maxTake,tag,TagMode.Csv), _connectionFactory = ConnectionFactory },
to11mtm marked this conversation as resolved.
Show resolved Hide resolved
async input =>
{
var tagValue = $"{separator}{input.tag}{separator}";
//var tagValue = input.tag;
return await input._connectionFactory.ExecuteWithTransactionAsync(
input.args,
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
static async (connection, token, inVals) =>
{
return await connection
.GetTable<JournalRow>()
.Where(
r =>
r.Tags.Contains(tagValue) &&
r.Tags.Contains(inVals.Tag) &&
!r.Deleted &&
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset)
r.Ordering > inVals.Offset &&
r.Ordering <= inVals.MaxOffset)
.OrderBy(r => r.Ordering)
.Take(input.maxTake)
.Take(inVals.Max)
.ToListAsync(token);
});
})
.Via(_deserializeFlow),

TagMode.TagTable => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
async input =>
new { inst=this, args= new QueryArgs(offset,maxOffset,maxTake,tag,TagMode.TagTable)},
static async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
var inst = input.inst;
return await inst.ConnectionFactory.ExecuteWithTransactionAsync(
input.args,
inst.ReadIsolationLevel,
inst.ShutdownToken,
static async (connection, token,txInput) =>
{
var journalTable = connection.GetTable<JournalRow>();
var tagTable = connection.GetTable<JournalTagRow>();

var query =
from r in journalTable
from lp in tagTable.Where(jtr => jtr.OrderingId == r.Ordering).DefaultIfEmpty()
where lp.OrderingId > input.offset &&
lp.OrderingId <= input.maxOffset &&
!r.Deleted &&
lp.TagValue == input.tag
orderby r.Ordering
select r;

var query = connection.GetTable<JournalRow>()
.Where(r => r.Deleted == false)
.Join(
connection.GetTable<JournalTagRow>()
.Where(
jtr =>
jtr.OrderingId > txInput.Offset
&& jtr.OrderingId <= txInput.MaxOffset
&& jtr.TagValue == txInput.Tag),
SqlJoinType.Left,
(jr, jtr) => (jr.Ordering == jtr.OrderingId),
(jr, jtr) => jr)
.OrderBy(r => r.Ordering)
.Take(txInput.Max);
Copy link
Member Author

@to11mtm to11mtm Feb 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, We didn't have a take here.

However, AFAIR, the LINQ query syntax tends to be more finicky with captures/closures than (carefully) plotted out L2Db Method syntax. However, if the lambda syntax somehow is producing something needed for a specific DB, we can go back to it. NVM It's not playing nice and we can deal with that later.

return await AddTagDataFromTagTableAsync(query, connection, token);
});
})
Expand All @@ -146,24 +152,25 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(
=> Task.FromResult(
AsyncSource<JournalRow>
.FromEnumerable(
new { _connectionFactory = ConnectionFactory, persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) },
new { persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) },
to11mtm marked this conversation as resolved.
Show resolved Hide resolved
async state =>
{
return await state._connectionFactory.ExecuteWithTransactionAsync(
return await ConnectionFactory.ExecuteWithTransactionAsync(
state,
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
async (connection, token, txState) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.PersistenceId == state.persistenceId &&
r.SequenceNumber >= state.fromSequenceNr &&
r.SequenceNumber <= state.toSequenceNr &&
r.PersistenceId == txState.persistenceId &&
r.SequenceNumber >= txState.fromSequenceNr &&
r.SequenceNumber <= txState.toSequenceNr &&
r.Deleted == false)
.OrderBy(r => r.SequenceNumber)
.Take(state.toTake);
.Take(txState.toTake);

return await AddTagDataIfNeededAsync(query, connection, token);
});
Expand All @@ -185,24 +192,23 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(

public Source<long, NotUsed> JournalSequence(long offset, long limit)
{
var maxTake = MaxTake(limit);

return AsyncSource<long>.FromEnumerable(
new { maxTake, offset, _connectionFactory = ConnectionFactory },
new { maxTake = MaxTake(limit), offset, _connectionFactory = ConnectionFactory },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
new QueryArgs(input.offset,default,input.maxTake, default),
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
async (connection, token, args) =>
{
// persistence-jdbc does not filter deleted here.
return await connection
.GetTable<JournalRow>()
.Where(r => r.Ordering > input.offset)
.Where(r => r.Ordering > args.Offset)
.Select(r => r.Ordering)
.OrderBy(r => r)
.Take(input.maxTake)
.Take(args.Max)
.ToListAsync(token);
}
);
Expand Down Expand Up @@ -231,6 +237,7 @@ private static int MaxTake(long max)
? int.MaxValue
: (int)max;


public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> Events(
long offset,
long maxOffset,
Expand All @@ -239,35 +246,48 @@ private static int MaxTake(long max)
var maxTake = MaxTake(max);

return AsyncSource<JournalRow>.FromEnumerable(
new { _connectionFactory = ConnectionFactory, maxTake, maxOffset, offset },
new { args=new QueryArgs(offset,maxOffset,maxTake,_tagMode) },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(input.maxTake);

return await AddTagDataIfNeededAsync(query, connection, token);
});
return await ExecuteEventQuery(input.args);
}
).Via(_deserializeFlow);
}


internal async Task<List<JournalRow>> ExecuteEventQuery(QueryArgs queryArgs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentionally moved into it's own method,

Among other things, Another memory optimization for us to consider is -not- using Akka streams in the Unfold internal read calls; Where this gets tricky, is that marshalling via Akka streams pipeline, neatly solves problems where a lot of DB Providers aren't really all that async 🙃.

SQLite (Fair enough,) I think some oracle providers, IDK about MySQL...

I can fold back in for now if needed, but it does make the pipeline at least a little bit neater.

{
return await ConnectionFactory.ExecuteWithTransactionAsync(
queryArgs,
ReadIsolationLevel,
ShutdownToken,
static async (connection, token,a) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > a.Offset &&
r.Ordering <= a.MaxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(a.Max);

if (a.Mode != TagMode.TagTable)
{
return await query.ToListAsync(token);
}
else
{
return await AddTagDataFromTagTableAsync(query, connection, token);
}
});
}

private async Task<List<JournalRow>> AddTagDataIfNeededAsync(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection, CancellationToken token)
{
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable)
return await rowQuery.ToListAsync(token);

return await AddTagDataFromTagTableAsync(rowQuery, connection, token);
}

Expand Down
32 changes: 32 additions & 0 deletions src/Akka.Persistence.Sql/Query/Dao/QueryArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// -----------------------------------------------------------------------
// <copyright file="QueryArgs.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Config;

namespace Akka.Persistence.Sql.Query.Dao
{
internal readonly struct QueryArgs
{
public readonly long Offset;
public readonly long MaxOffset;
public readonly int Max;
public readonly TagMode Mode;
to11mtm marked this conversation as resolved.
Show resolved Hide resolved
public readonly string Tag;

public QueryArgs(long offset, long maxOffset, int max, string tag, TagMode tagMode)
{
Offset = offset;
MaxOffset = maxOffset;
Max = max;
Tag = tag;
Mode= tagMode;
}

public QueryArgs(long offset, long maxOffset, int max, TagMode tagMode) : this(offset, maxOffset, max, null!,tagMode)
{
}
}
}
Loading