Fix Missing take and optimize some captures #347

Merged
merged 9 commits into from
Feb 15, 2024
@@ -42,17 +42,19 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig<JournalTableConfig>
fmb.Build();

_useCloneDataConnection = config.UseCloneConnection;
_opts = new DataOptions()
var opts =new DataOptions()
.UseConnectionString(config.ProviderName, config.ConnectionString)
.UseMappingSchema(mappingSchema);
_opts = opts;

if (config.ProviderName.ToLower().StartsWith("sqlserver"))
_policy = new SqlServerRetryPolicy();



_cloneConnection = new Lazy<AkkaDataConnection>(
() => new AkkaDataConnection(
_opts.ConnectionOptions.ProviderName,
new DataConnection(_opts)));
opts.ConnectionOptions.ProviderName,
new DataConnection(opts)));
Comment on lines 54 to +57
Member Author

Subtle thing: we are now capturing just the DataOptions reference rather than the factory itself.

I don't know how much the GC really cares, but it sounds like the sort of thing that can annoy a cycle detector.

Member

LGTM
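
To illustrate the capture difference discussed in this thread, here is a minimal sketch. `Options`, `CapturesThis`, and `CapturesLocal` are hypothetical stand-ins, not types from this PR, and the `Delegate.Target` behavior it relies on reflects how current C# compilers lower lambdas rather than a language guarantee.

```csharp
using System;

// Hypothetical stand-in for an options object; not a type from this PR.
public sealed class Options
{
    public string ProviderName { get; set; } = "sqlite";
}

public sealed class CapturesThis
{
    private readonly Options _opts;

    public Func<string> Read { get; }

    public CapturesThis(Options opts)
    {
        _opts = opts;
        // Reading the field requires `this`, so the delegate keeps the
        // whole owning object reachable.
        Read = () => _opts.ProviderName;
    }
}

public sealed class CapturesLocal
{
    public Options Opts { get; }

    public Func<string> Read { get; }

    public CapturesLocal(Options opts)
    {
        Opts = opts;
        // Reading the parameter captures only that variable; the delegate
        // points at a small closure object instead of the owner.
        Read = () => opts.ProviderName;
    }
}
```

Under that assumption, `new CapturesThis(o).Read.Target` is the `CapturesThis` instance itself, while `new CapturesLocal(o).Read.Target` is a compiler-generated closure that holds only `opts`.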

}

public AkkaPersistenceDataConnectionFactory(IProviderConfig<SnapshotTableConfiguration> config)
@@ -81,17 +83,17 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig<SnapshotTableConfigu
fmb.Build();

_useCloneDataConnection = config.UseCloneConnection;
_opts = new DataOptions()
var opts = new DataOptions()
.UseConnectionString(config.ProviderName, config.ConnectionString)
.UseMappingSchema(mappingSchema);

_opts = opts;
if (config.ProviderName.ToLower().StartsWith("sqlserver"))
_policy = new SqlServerRetryPolicy();

_cloneConnection = new Lazy<AkkaDataConnection>(
() => new AkkaDataConnection(
_opts.ConnectionOptions.ProviderName,
new DataConnection(_opts)));
opts.ConnectionOptions.ProviderName,
new DataConnection(opts)));
}

private static void MapJournalRow(
40 changes: 40 additions & 0 deletions src/Akka.Persistence.Sql/Extensions/ConnectionFactoryExtensions.cs
@@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence.Sql.Db;
using Akka.Persistence.Sql.Query.Dao;

namespace Akka.Persistence.Sql.Extensions
{
@@ -72,5 +73,44 @@ public static async Task<T> ExecuteWithTransactionAsync<T>(
throw;
}
}

internal static Task<T> ExecuteWithTransactionAsync<TState,T>(
this DbStateHolder factory,
TState state,
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler)
{
return factory.ConnectionFactory.ExecuteWithTransactionAsync(state, factory.IsolationLevel, factory.ShutdownToken, handler);
}
Comment on lines +77 to +83
Member Author

Convenience method: since we are using the state holder to avoid other captures, we may as well add some sugar to go with it and make the main DAO cleaner.
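
The state-passing pattern described in this comment can be sketched as follows. `StateHolder`, `ExecuteAsync`, and `Demo` are hypothetical names used only for illustration (the PR's own types are `DbStateHolder` and `ExecuteWithTransactionAsync`), and the "connection" is reduced to a string so the example stays self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in: bundles what every query needs in one object.
public sealed class StateHolder
{
    public StateHolder(string connectionName, CancellationToken shutdownToken)
    {
        ConnectionName = connectionName;
        ShutdownToken = shutdownToken;
    }

    public string ConnectionName { get; }

    public CancellationToken ShutdownToken { get; }
}

public static class StateHolderExtensions
{
    // The caller's values travel through `state`, so `handler` needs no closure.
    public static Task<T> ExecuteAsync<TState, T>(
        this StateHolder holder,
        TState state,
        Func<string, CancellationToken, TState, Task<T>> handler)
        => handler(holder.ConnectionName, holder.ShutdownToken, state);
}

public static class Demo
{
    public static Task<string> DescribeAsync(StateHolder holder, int take)
        => holder.ExecuteAsync(
            take,
            // `static` makes the compiler reject any accidental capture
            // of locals or `this` inside the lambda body.
            static (connection, token, t) => Task.FromResult($"{connection}: take {t}"));
}
```

Because the lambda is `static`, referencing `take` or `holder` directly inside it would be a compile error; everything it uses has to arrive through its parameters.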


public static async Task<T> ExecuteWithTransactionAsync<TState,T>(
this AkkaPersistenceDataConnectionFactory factory,
TState state,
IsolationLevel level,
CancellationToken token,
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler)
{
await using var connection = factory.GetConnection();
await using var tx = await connection.BeginTransactionAsync(level, token);

try
{
var result = await handler(connection, token, state);
await tx.CommitAsync(token);
return result;
}
catch (Exception ex1)
{
try
{
await tx.RollbackAsync(token);
}
catch (Exception ex2)
{
throw new AggregateException("Exception thrown when rolling back database transaction", ex2, ex1);
}

throw;
}
}
}
}
180 changes: 104 additions & 76 deletions src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
@@ -31,6 +31,7 @@ public abstract class BaseByteReadArrayJournalDao : BaseJournalDaoWithReadMessag
private readonly Flow<JournalRow, Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> _deserializeFlow;

private readonly ReadJournalConfig _readJournalConfig;
private readonly DbStateHolder _dbStateHolder;

protected BaseByteReadArrayJournalDao(
IAdvancedScheduler scheduler,
@@ -42,6 +43,7 @@ protected BaseByteReadArrayJournalDao(
: base(scheduler, materializer, connectionFactory, readJournalConfig, shutdownToken)
{
_readJournalConfig = readJournalConfig;
_dbStateHolder = new DbStateHolder(connectionFactory, ReadIsolationLevel, ShutdownToken, _readJournalConfig.PluginConfig.TagMode);
_deserializeFlow = serializer.DeserializeFlow();
}

@@ -50,20 +52,19 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
var maxTake = MaxTake(max);

return AsyncSource<string>.FromEnumerable(
new { _connectionFactory = ConnectionFactory, maxTake },
async input =>
new { _dbStateHolder, maxTake },
static async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
return await input._dbStateHolder.ExecuteWithTransactionAsync(
input.maxTake,
async (connection, token,take) =>
Member

Can we just return a Task here instead of using async/await?

Member Author

I'm honestly not sure. Aside from potential stack trace confusion on errors, my concern would be that this func is actually invoked inside two using contexts. Better safe than sorry when it comes to that stuff, IMO.

Contributor

@Arkatufus Feb 15, 2024

Some discussions on this are here:

Would the performance boost for each query execution be worth it? Does it outweigh the possible exception stack trace confusion?

From my experience, the "proper" stack trace from an exception thrown within an awaited Task is confusing in itself, since it is thrown from inside the async/await state machine and is most often detached from the actual code that invoked it (when the thrown exception is caught by the debugger), so I can't really say that it is an actual improvement in debugging experience. But then again, I could be doing it wrong.

On the other hand, the article does point out that using proper async puts the exception where it belongs: in the future, when it is supposed to happen, if it indeed needs to be thrown.

Contributor

I tend to agree with @to11mtm on this one: returning a Task from inside a using block is quite dangerous, because we don't know how robustly IDisposable is implemented by third-party API developers.
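
The general hazard named in this thread can be shown with a minimal sketch. `Resource` and `UsingDemo` are hypothetical types, not part of this PR, and the example makes no claim about whether the handler lambda under review would actually hit the problem; it only shows what happens when a Task is returned directly from inside a using scope.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical disposable that records whether Dispose has run.
public sealed class Resource : IDisposable
{
    public bool Disposed { get; private set; }

    public void Dispose() => Disposed = true;

    // Waits for `gate`, then reports whether the resource was already disposed.
    public async Task<bool> ReadAfterAsync(Task gate)
    {
        await gate;
        return Disposed;
    }
}

public static class UsingDemo
{
    // Elided await: the using scope ends, and Dispose runs, as soon as the
    // incomplete Task is returned to the caller.
    public static Task<bool> Elided(Task gate)
    {
        using var resource = new Resource();
        return resource.ReadAfterAsync(gate);
    }

    // Awaited: Dispose is deferred until the work has completed.
    public static async Task<bool> Awaited(Task gate)
    {
        using var resource = new Resource();
        return await resource.ReadAfterAsync(gate);
    }
}
```

If `gate` completes only after both methods have returned their tasks, `Elided` yields `true` (the work observed a disposed resource) and `Awaited` yields `false`.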

{
return await connection
.GetTable<JournalRow>()
.Where(r => r.Deleted == false)
.Select(r => r.PersistenceId)
.Distinct()
.Take(input.maxTake)
.Take(take)
.ToListAsync(token);
});
});
@@ -82,54 +83,50 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
{
TagMode.Csv => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
async input =>
new { args= new QueryArgs(offset,maxOffset,maxTake,
$"{separator}{tag}{separator}"), _dbStateHolder },
static async input =>
{
var tagValue = $"{separator}{input.tag}{separator}";
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
//var tagValue = input.tag;
return await input._dbStateHolder.ExecuteWithTransactionAsync(
input.args,
static async (connection, token, inVals) =>
{
return await connection
.GetTable<JournalRow>()
.Where(
r =>
r.Tags.Contains(tagValue) &&
r.Tags.Contains(inVals.Tag) &&
!r.Deleted &&
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset)
r.Ordering > inVals.Offset &&
r.Ordering <= inVals.MaxOffset)
.OrderBy(r => r.Ordering)
.Take(input.maxTake)
.Take(inVals.Max)
.ToListAsync(token);
});
})
.Via(_deserializeFlow),

TagMode.TagTable => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
async input =>
new { _dbStateHolder, args= new QueryArgs(offset,maxOffset,maxTake,tag)},
static async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
return await input._dbStateHolder.ExecuteWithTransactionAsync(
input.args,
static async (connection, token,txInput) =>
{
var journalTable = connection.GetTable<JournalRow>();
var tagTable = connection.GetTable<JournalTagRow>();

var query =
from r in journalTable
from lp in tagTable.Where(jtr => jtr.OrderingId == r.Ordering).DefaultIfEmpty()
where lp.OrderingId > input.offset &&
lp.OrderingId <= input.maxOffset &&
from r in connection.GetTable<JournalRow>()
Member Author

It would still be nice to convert this to method syntax later, if only because it's something like 300 lines of IL as it stands 😅
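
As a rough idea of what such a conversion looks like, here is a sketch over in-memory collections. `Row`, `TagRow`, and `TagQueries` are hypothetical simplifications, not the PR's `JournalRow` and `JournalTagRow`, and `DefaultIfEmpty()` is left out because in LINQ to Objects it would yield null tag rows; this is not the query the provider would translate to SQL.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified row shapes for illustration only.
public sealed record Row(long Ordering, bool Deleted);
public sealed record TagRow(long OrderingId, string TagValue);

public static class TagQueries
{
    // Query syntax: two `from` clauses, which the compiler lowers to SelectMany.
    public static List<long> QuerySyntax(
        IEnumerable<Row> rows, IEnumerable<TagRow> tags, string tag) =>
        (from r in rows
         from lp in tags.Where(jtr => jtr.OrderingId == r.Ordering)
         where !r.Deleted && lp.TagValue == tag
         orderby r.Ordering
         select r.Ordering).ToList();

    // Method syntax: the same pipeline with the SelectMany written out.
    public static List<long> MethodSyntax(
        IEnumerable<Row> rows, IEnumerable<TagRow> tags, string tag) =>
        rows
            .SelectMany(
                r => tags.Where(jtr => jtr.OrderingId == r.Ordering),
                (r, lp) => new { r, lp })
            .Where(x => !x.r.Deleted && x.lp.TagValue == tag)
            .OrderBy(x => x.r.Ordering)
            .Select(x => x.r.Ordering)
            .ToList();
}
```

Both methods return the same orderings for the same inputs; whether the method form actually produces less IL in the real DAO would need to be measured.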

from lp in connection.GetTable<JournalTagRow>()
.Where(jtr => jtr.OrderingId == r.Ordering).DefaultIfEmpty()
where lp.OrderingId > txInput.Offset &&
lp.OrderingId <= txInput.MaxOffset &&
!r.Deleted &&
lp.TagValue == input.tag
lp.TagValue == txInput.Tag
orderby r.Ordering
select r;

return await AddTagDataFromTagTableAsync(query, connection, token);
return await AddTagDataFromTagTableAsync(query.Take(txInput.Max), connection, token);
});
})
.Via(_deserializeFlow),
@@ -146,26 +143,25 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(
=> Task.FromResult(
AsyncSource<JournalRow>
.FromEnumerable(
new { _connectionFactory = ConnectionFactory, persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) },
async state =>
new { persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max), _dbStateHolder },
static async state =>
{
return await state._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
return await state._dbStateHolder.ExecuteWithTransactionAsync(
state,
async (connection, token, txState) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.PersistenceId == state.persistenceId &&
r.SequenceNumber >= state.fromSequenceNr &&
r.SequenceNumber <= state.toSequenceNr &&
r.PersistenceId == txState.persistenceId &&
r.SequenceNumber >= txState.fromSequenceNr &&
r.SequenceNumber <= txState.toSequenceNr &&
r.Deleted == false)
.OrderBy(r => r.SequenceNumber)
.Take(state.toTake);
.Take(txState.toTake);

return await AddTagDataIfNeededAsync(query, connection, token);
return await AddTagDataIfNeededAsync(txState._dbStateHolder.Mode, query, connection, token);
});
})
.Via(_deserializeFlow)
@@ -185,24 +181,21 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(

public Source<long, NotUsed> JournalSequence(long offset, long limit)
{
var maxTake = MaxTake(limit);

return AsyncSource<long>.FromEnumerable(
new { maxTake, offset, _connectionFactory = ConnectionFactory },
new { maxTake = MaxTake(limit), offset, _dbStateHolder },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
return await input._dbStateHolder.ExecuteWithTransactionAsync(
new QueryArgs(input.offset,default,input.maxTake, default),
static async (connection, token, args) =>
{
// persistence-jdbc does not filter deleted here.
return await connection
.GetTable<JournalRow>()
.Where(r => r.Ordering > input.offset)
.Where(r => r.Ordering > args.Offset)
.Select(r => r.Ordering)
.OrderBy(r => r)
.Take(input.maxTake)
.Take(args.Max)
.ToListAsync(token);
}
);
@@ -231,6 +224,7 @@ private static int MaxTake(long max)
? int.MaxValue
: (int)max;


public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> Events(
long offset,
long maxOffset,
@@ -239,35 +233,69 @@ private static int MaxTake(long max)
var maxTake = MaxTake(max);

return AsyncSource<JournalRow>.FromEnumerable(
new { _connectionFactory = ConnectionFactory, maxTake, maxOffset, offset },
async input =>
new {_dbStateHolder , args=new QueryArgs(offset,maxOffset,maxTake) },
static async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(input.maxTake);

return await AddTagDataIfNeededAsync(query, connection, token);
});
return await ExecuteEventQuery(input._dbStateHolder, input._dbStateHolder.Mode, input.args);
}
).Via(_deserializeFlow);
}


internal static async Task<List<JournalRow>> ExecuteEventQuery(DbStateHolder stateHolder, TagMode tagMode, QueryArgs queryArgs)
{
return tagMode != TagMode.TagTable
Member Author

@to11mtm Feb 10, 2024

This is ugly and I don't know how I feel about it, but it's the easiest way to get more captures out of the way, short of a more significant refactoring that gives us different ways to compose without going back to Func<> madness...

Member

> This is ugly

Use an if statement instead of a ternary operator and it'll look better, I swear :p

Member Author

This comment was somehow left unposted: I wound up moving these back into separate methods.

? await ExecuteEventQueryNonTagTable(stateHolder, queryArgs)
: await ExecuteEventQueryTagTable(stateHolder, queryArgs);
}

private async Task<List<JournalRow>> AddTagDataIfNeededAsync(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection, CancellationToken token)
private static async Task<List<JournalRow>> ExecuteEventQueryTagTable(DbStateHolder stateHolder, QueryArgs queryArgs)
{
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable)
return await rowQuery.ToListAsync(token);
return await stateHolder.ExecuteWithTransactionAsync(
queryArgs,
static async (connection, token, a) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > a.Offset &&
r.Ordering <= a.MaxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(a.Max);
return await AddTagDataFromTagTableAsync(query, connection, token);
});
}

private static async Task<List<JournalRow>> ExecuteEventQueryNonTagTable(DbStateHolder stateHolder, QueryArgs queryArgs)
{
return await stateHolder.ExecuteWithTransactionAsync(
queryArgs,
static async (connection, token, a) =>
{
return await connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > a.Offset &&
r.Ordering <= a.MaxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(a.Max)
.ToListAsync(token);
});
}

private static async Task<List<JournalRow>> AddTagDataIfNeededAsync(
TagMode mode,
IQueryable<JournalRow> rowQuery,
AkkaDataConnection connection,
CancellationToken token
)
{
if (mode != TagMode.TagTable)
return await rowQuery.ToListAsync(token);
return await AddTagDataFromTagTableAsync(rowQuery, connection, token);
}
