-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Missing take and optimize some captures #347
Changes from all commits
c57f7c7
e94254b
27dc9bf
71d45fb
5b4be3c
224f15c
26acabb
ee0c83b
db7a7df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Persistence.Sql.Db; | ||
using Akka.Persistence.Sql.Query.Dao; | ||
|
||
namespace Akka.Persistence.Sql.Extensions | ||
{ | ||
|
@@ -72,5 +73,44 @@ public static async Task<T> ExecuteWithTransactionAsync<T>( | |
throw; | ||
} | ||
} | ||
|
||
internal static Task<T> ExecuteWithTransactionAsync<TState,T>( | ||
this DbStateHolder factory, | ||
TState state, | ||
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler) | ||
{ | ||
return factory.ConnectionFactory.ExecuteWithTransactionAsync(state, factory.IsolationLevel, factory.ShutdownToken, handler); | ||
} | ||
Comment on lines
+77
to
+83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convenience method; since we are using the stateholder to avoid other captures, may as well add some sugar to go with it and make the main DAO cleaner. |
||
|
||
public static async Task<T> ExecuteWithTransactionAsync<TState,T>( | ||
this AkkaPersistenceDataConnectionFactory factory, | ||
TState state, | ||
IsolationLevel level, | ||
CancellationToken token, | ||
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler) | ||
{ | ||
await using var connection = factory.GetConnection(); | ||
await using var tx = await connection.BeginTransactionAsync(level, token); | ||
|
||
try | ||
{ | ||
var result = await handler(connection, token, state); | ||
await tx.CommitAsync(token); | ||
return result; | ||
} | ||
catch (Exception ex1) | ||
{ | ||
try | ||
{ | ||
await tx.RollbackAsync(token); | ||
} | ||
catch (Exception ex2) | ||
{ | ||
throw new AggregateException("Exception thrown when rolling back database transaction", ex2, ex1); | ||
} | ||
|
||
throw; | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ public abstract class BaseByteReadArrayJournalDao : BaseJournalDaoWithReadMessag | |
private readonly Flow<JournalRow, Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> _deserializeFlow; | ||
|
||
private readonly ReadJournalConfig _readJournalConfig; | ||
private readonly DbStateHolder _dbStateHolder; | ||
|
||
protected BaseByteReadArrayJournalDao( | ||
IAdvancedScheduler scheduler, | ||
|
@@ -42,6 +43,7 @@ protected BaseByteReadArrayJournalDao( | |
: base(scheduler, materializer, connectionFactory, readJournalConfig, shutdownToken) | ||
{ | ||
_readJournalConfig = readJournalConfig; | ||
_dbStateHolder = new DbStateHolder(connectionFactory, ReadIsolationLevel, ShutdownToken, _readJournalConfig.PluginConfig.TagMode); | ||
_deserializeFlow = serializer.DeserializeFlow(); | ||
} | ||
|
||
|
@@ -50,20 +52,19 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max) | |
var maxTake = MaxTake(max); | ||
|
||
return AsyncSource<string>.FromEnumerable( | ||
new { _connectionFactory = ConnectionFactory, maxTake }, | ||
async input => | ||
new { _dbStateHolder, maxTake }, | ||
static async input => | ||
{ | ||
return await input._connectionFactory.ExecuteWithTransactionAsync( | ||
ReadIsolationLevel, | ||
ShutdownToken, | ||
async (connection, token) => | ||
return await input._dbStateHolder.ExecuteWithTransactionAsync( | ||
input.maxTake, | ||
async (connection, token,take) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just return a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm honestly not sure, concerns aside from potential stack trace confusion on errors would be whether the fact this func is actually invoked inside two There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some discussions on this are here:
Would the performance boost for each query execution be worth it, does it outweight the possible exception stack trace confusion? From my experience, the "proper" stack trace from an exception thrown from within an awaited Task is confusing in itself, since it is thrown from inside the async...await FSM and most often it will be a detached from the actual code that invoked it (when the thrown exception is caught by the debugger), so I can't really say that it is an actual improvement in debugging experience. But then again, I could be doing it wrong. On the other hand, the article does point out that using proper async puts the exception where it belongs, in the future when it is supposed to happen, if it indeed needs to be thrown. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tend to agree with @to11mtm on this one, returning a |
||
{ | ||
return await connection | ||
.GetTable<JournalRow>() | ||
.Where(r => r.Deleted == false) | ||
.Select(r => r.PersistenceId) | ||
.Distinct() | ||
.Take(input.maxTake) | ||
.Take(take) | ||
.ToListAsync(token); | ||
}); | ||
}); | ||
|
@@ -82,54 +83,50 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max) | |
{ | ||
TagMode.Csv => AsyncSource<JournalRow> | ||
.FromEnumerable( | ||
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory }, | ||
async input => | ||
new { args= new QueryArgs(offset,maxOffset,maxTake, | ||
$"{separator}{tag}{separator}"), _dbStateHolder }, | ||
static async input => | ||
{ | ||
var tagValue = $"{separator}{input.tag}{separator}"; | ||
return await input._connectionFactory.ExecuteWithTransactionAsync( | ||
ReadIsolationLevel, | ||
ShutdownToken, | ||
async (connection, token) => | ||
//var tagValue = input.tag; | ||
return await input._dbStateHolder.ExecuteWithTransactionAsync( | ||
input.args, | ||
static async (connection, token, inVals) => | ||
{ | ||
return await connection | ||
.GetTable<JournalRow>() | ||
.Where( | ||
r => | ||
r.Tags.Contains(tagValue) && | ||
r.Tags.Contains(inVals.Tag) && | ||
!r.Deleted && | ||
r.Ordering > input.offset && | ||
r.Ordering <= input.maxOffset) | ||
r.Ordering > inVals.Offset && | ||
r.Ordering <= inVals.MaxOffset) | ||
.OrderBy(r => r.Ordering) | ||
.Take(input.maxTake) | ||
.Take(inVals.Max) | ||
.ToListAsync(token); | ||
}); | ||
}) | ||
.Via(_deserializeFlow), | ||
|
||
TagMode.TagTable => AsyncSource<JournalRow> | ||
.FromEnumerable( | ||
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory }, | ||
async input => | ||
new { _dbStateHolder, args= new QueryArgs(offset,maxOffset,maxTake,tag)}, | ||
static async input => | ||
{ | ||
return await input._connectionFactory.ExecuteWithTransactionAsync( | ||
ReadIsolationLevel, | ||
ShutdownToken, | ||
async (connection, token) => | ||
return await input._dbStateHolder.ExecuteWithTransactionAsync( | ||
input.args, | ||
static async (connection, token,txInput) => | ||
{ | ||
var journalTable = connection.GetTable<JournalRow>(); | ||
var tagTable = connection.GetTable<JournalTagRow>(); | ||
|
||
var query = | ||
from r in journalTable | ||
from lp in tagTable.Where(jtr => jtr.OrderingId == r.Ordering).DefaultIfEmpty() | ||
where lp.OrderingId > input.offset && | ||
lp.OrderingId <= input.maxOffset && | ||
from r in connection.GetTable<JournalRow>() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would still be nice to convert this to method syntax later, if only because it's something like 300 lines of IL as it stands 😅 |
||
from lp in connection.GetTable<JournalTagRow>() | ||
.Where(jtr => jtr.OrderingId == r.Ordering).DefaultIfEmpty() | ||
where lp.OrderingId > txInput.Offset && | ||
lp.OrderingId <= txInput.MaxOffset && | ||
!r.Deleted && | ||
lp.TagValue == input.tag | ||
lp.TagValue == txInput.Tag | ||
orderby r.Ordering | ||
select r; | ||
|
||
return await AddTagDataFromTagTableAsync(query, connection, token); | ||
return await AddTagDataFromTagTableAsync(query.Take(txInput.Max), connection, token); | ||
}); | ||
}) | ||
.Via(_deserializeFlow), | ||
|
@@ -146,26 +143,25 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages( | |
=> Task.FromResult( | ||
AsyncSource<JournalRow> | ||
.FromEnumerable( | ||
new { _connectionFactory = ConnectionFactory, persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) }, | ||
async state => | ||
new { persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max), _dbStateHolder }, | ||
static async state => | ||
{ | ||
return await state._connectionFactory.ExecuteWithTransactionAsync( | ||
ReadIsolationLevel, | ||
ShutdownToken, | ||
async (connection, token) => | ||
return await state._dbStateHolder.ExecuteWithTransactionAsync( | ||
state, | ||
async (connection, token, txState) => | ||
{ | ||
var query = connection | ||
.GetTable<JournalRow>() | ||
.Where( | ||
r => | ||
r.PersistenceId == state.persistenceId && | ||
r.SequenceNumber >= state.fromSequenceNr && | ||
r.SequenceNumber <= state.toSequenceNr && | ||
r.PersistenceId == txState.persistenceId && | ||
r.SequenceNumber >= txState.fromSequenceNr && | ||
r.SequenceNumber <= txState.toSequenceNr && | ||
r.Deleted == false) | ||
.OrderBy(r => r.SequenceNumber) | ||
.Take(state.toTake); | ||
.Take(txState.toTake); | ||
|
||
return await AddTagDataIfNeededAsync(query, connection, token); | ||
return await AddTagDataIfNeededAsync(txState._dbStateHolder.Mode, query, connection, token); | ||
}); | ||
}) | ||
.Via(_deserializeFlow) | ||
|
@@ -185,24 +181,21 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages( | |
|
||
public Source<long, NotUsed> JournalSequence(long offset, long limit) | ||
{ | ||
var maxTake = MaxTake(limit); | ||
|
||
return AsyncSource<long>.FromEnumerable( | ||
new { maxTake, offset, _connectionFactory = ConnectionFactory }, | ||
new { maxTake = MaxTake(limit), offset, _dbStateHolder }, | ||
async input => | ||
{ | ||
return await input._connectionFactory.ExecuteWithTransactionAsync( | ||
ReadIsolationLevel, | ||
ShutdownToken, | ||
async (connection, token) => | ||
return await input._dbStateHolder.ExecuteWithTransactionAsync( | ||
new QueryArgs(input.offset,default,input.maxTake, default), | ||
static async (connection, token, args) => | ||
{ | ||
// persistence-jdbc does not filter deleted here. | ||
return await connection | ||
.GetTable<JournalRow>() | ||
.Where(r => r.Ordering > input.offset) | ||
.Where(r => r.Ordering > args.Offset) | ||
.Select(r => r.Ordering) | ||
.OrderBy(r => r) | ||
.Take(input.maxTake) | ||
.Take(args.Max) | ||
.ToListAsync(token); | ||
} | ||
); | ||
|
@@ -231,6 +224,7 @@ private static int MaxTake(long max) | |
? int.MaxValue | ||
: (int)max; | ||
|
||
|
||
public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> Events( | ||
long offset, | ||
long maxOffset, | ||
|
@@ -239,35 +233,69 @@ private static int MaxTake(long max) | |
var maxTake = MaxTake(max); | ||
|
||
return AsyncSource<JournalRow>.FromEnumerable( | ||
new { _connectionFactory = ConnectionFactory, maxTake, maxOffset, offset }, | ||
async input => | ||
new {_dbStateHolder , args=new QueryArgs(offset,maxOffset,maxTake) }, | ||
static async input => | ||
{ | ||
return await input._connectionFactory.ExecuteWithTransactionAsync( | ||
ReadIsolationLevel, | ||
ShutdownToken, | ||
async (connection, token) => | ||
{ | ||
var query = connection | ||
.GetTable<JournalRow>() | ||
.Where( | ||
r => | ||
r.Ordering > input.offset && | ||
r.Ordering <= input.maxOffset && | ||
r.Deleted == false) | ||
.OrderBy(r => r.Ordering) | ||
.Take(input.maxTake); | ||
|
||
return await AddTagDataIfNeededAsync(query, connection, token); | ||
}); | ||
return await ExecuteEventQuery(input._dbStateHolder, input._dbStateHolder.Mode, input.args); | ||
} | ||
).Via(_deserializeFlow); | ||
} | ||
|
||
|
||
internal static async Task<List<JournalRow>> ExecuteEventQuery(DbStateHolder stateHolder, TagMode tagMode, QueryArgs queryArgs) | ||
{ | ||
return tagMode != TagMode.TagTable | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is ugly and IDK how I feel but it's the easiest way to get more captures out of the way, short of some more significant refactoring where we had some different ways to compose without going back to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Use an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment that was somehow left unposted, I wound up moving these back into separate methods. |
||
? await ExecuteEventQueryNonTagTable(stateHolder, queryArgs) | ||
: await ExecuteEventQueryTagTable(stateHolder, queryArgs); | ||
} | ||
|
||
private async Task<List<JournalRow>> AddTagDataIfNeededAsync(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection, CancellationToken token) | ||
private static async Task<List<JournalRow>> ExecuteEventQueryTagTable(DbStateHolder stateHolder, QueryArgs queryArgs) | ||
{ | ||
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable) | ||
return await rowQuery.ToListAsync(token); | ||
return await stateHolder.ExecuteWithTransactionAsync( | ||
queryArgs, | ||
static async (connection, token, a) => | ||
{ | ||
var query = connection | ||
.GetTable<JournalRow>() | ||
.Where( | ||
r => | ||
r.Ordering > a.Offset && | ||
r.Ordering <= a.MaxOffset && | ||
r.Deleted == false) | ||
.OrderBy(r => r.Ordering) | ||
.Take(a.Max); | ||
return await AddTagDataFromTagTableAsync(query, connection, token); | ||
}); | ||
} | ||
|
||
private static async Task<List<JournalRow>> ExecuteEventQueryNonTagTable(DbStateHolder stateHolder, QueryArgs queryArgs) | ||
{ | ||
return await stateHolder.ExecuteWithTransactionAsync( | ||
queryArgs, | ||
static async (connection, token, a) => | ||
{ | ||
return await connection | ||
.GetTable<JournalRow>() | ||
.Where( | ||
r => | ||
r.Ordering > a.Offset && | ||
r.Ordering <= a.MaxOffset && | ||
r.Deleted == false) | ||
.OrderBy(r => r.Ordering) | ||
.Take(a.Max) | ||
.ToListAsync(token); | ||
}); | ||
} | ||
|
||
private static async Task<List<JournalRow>> AddTagDataIfNeededAsync( | ||
TagMode mode, | ||
IQueryable<JournalRow> rowQuery, | ||
AkkaDataConnection connection, | ||
CancellationToken token | ||
) | ||
{ | ||
if (mode != TagMode.TagTable) | ||
return await rowQuery.ToListAsync(token); | ||
return await AddTagDataFromTagTableAsync(rowQuery, connection, token); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subtle thing; now we are capturing just the
DataOptions
reference rather than the factory itself...IDK how much the GC really cares but it sounds like the sort of thing that can annoy a cycle detector,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM