Skip to content

Commit

Permalink
Fix Take and leaky captures
Browse files Browse the repository at this point in the history
  • Loading branch information
to11mtm committed Feb 10, 2024
1 parent 85c7e54 commit c57f7c7
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 52 deletions.
31 changes: 31 additions & 0 deletions src/Akka.Persistence.Sql/Extensions/ConnectionFactoryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,36 @@ public static async Task<T> ExecuteWithTransactionAsync<T>(
throw;
}
}

public static async Task<T> ExecuteWithTransactionAsync<TState,T>(
this AkkaPersistenceDataConnectionFactory factory,
TState state,
IsolationLevel level,
CancellationToken token,
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler)
{
await using var connection = factory.GetConnection();
await using var tx = await connection.BeginTransactionAsync(level, token);

try
{
var result = await handler(connection, token, state);
await tx.CommitAsync(token);
return result;
}
catch (Exception ex1)
{
try
{
await tx.RollbackAsync(token);
}
catch (Exception ex2)
{
throw new AggregateException("Exception thrown when rolling back database transaction", ex2, ex1);
}

throw;
}
}
}
}
158 changes: 106 additions & 52 deletions src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
new { _connectionFactory = ConnectionFactory, maxTake },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
return await input._connectionFactory.ExecuteWithTransactionAsync(input.maxTake,
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
async (connection, token,take) =>
{
return await connection
.GetTable<JournalRow>()
.Where(r => r.Deleted == false)
.Select(r => r.PersistenceId)
.Distinct()
.Take(input.maxTake)
.Take(take)
.ToListAsync(token);
});
});
Expand All @@ -82,53 +82,69 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
{
TagMode.Csv => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
new { separator, tag=$"{separator}{tag}{separator}", offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
async input =>
{
var tagValue = $"{separator}{input.tag}{separator}";
//var tagValue = input.tag;
return await input._connectionFactory.ExecuteWithTransactionAsync(
input,
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
static async (connection, token, inVals) =>
{
return await connection
.GetTable<JournalRow>()
.Where(
r =>
r.Tags.Contains(tagValue) &&
r.Tags.Contains(inVals.tag) &&
!r.Deleted &&
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset)
r.Ordering > inVals.offset &&
r.Ordering <= inVals.maxOffset)
.OrderBy(r => r.Ordering)
.Take(input.maxTake)
.Take(inVals.maxTake)
.ToListAsync(token);
});
})
.Via(_deserializeFlow),

TagMode.TagTable => AsyncSource<JournalRow>
.FromEnumerable(
new { separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory },
async input =>
new { inst=this, separator, tag, offset, maxOffset, maxTake },
static async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
var inst = input.inst;
return await inst.ConnectionFactory.ExecuteWithTransactionAsync(
input,
inst.ReadIsolationLevel,
inst.ShutdownToken,
static async (connection, token,txInput) =>
{
var journalTable = connection.GetTable<JournalRow>();
var tagTable = connection.GetTable<JournalTagRow>();

var query =
from r in journalTable
from lp in tagTable.Where(jtr => jtr.OrderingId == r.Ordering).DefaultIfEmpty()
where lp.OrderingId > input.offset &&
lp.OrderingId <= input.maxOffset &&
!r.Deleted &&
lp.TagValue == input.tag
orderby r.Ordering
select r;

/*
var containedTags = connection.GetTable<JournalTagRow>()
.Where(
r =>
r.OrderingId > txInput.offset &&
r.OrderingId <= txInput.maxOffset &&
r.TagValue == txInput.tag
).Select(r => r.OrderingId);
var query = connection.GetTable<JournalRow>()
.Where(r => r.Ordering.In(containedTags) && r.Deleted == false)
.OrderBy(r => r.Ordering);
*/
var query = connection.GetTable<JournalRow>()
.Where(r => r.Deleted == false)
.Join(
connection.GetTable<JournalTagRow>()
.Where(
jtr =>
jtr.OrderingId > txInput.offset
&& jtr.OrderingId <= txInput.maxOffset
&& jtr.TagValue == txInput.tag),
SqlJoinType.Left,
(jr, jtr) => (jr.Ordering == jtr.OrderingId),
(jr, jtr) => jr)
.OrderBy(r => r.Ordering)
.Take(txInput.maxTake);
return await AddTagDataFromTagTableAsync(query, connection, token);
});
})
Expand All @@ -150,20 +166,21 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(
async state =>
{
return await state._connectionFactory.ExecuteWithTransactionAsync(
state,
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
async (connection, token,txState) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.PersistenceId == state.persistenceId &&
r.SequenceNumber >= state.fromSequenceNr &&
r.SequenceNumber <= state.toSequenceNr &&
r.PersistenceId == txState.persistenceId &&
r.SequenceNumber >= txState.fromSequenceNr &&
r.SequenceNumber <= txState.toSequenceNr &&
r.Deleted == false)
.OrderBy(r => r.SequenceNumber)
.Take(state.toTake);
.Take(txState.toTake);

return await AddTagDataIfNeededAsync(query, connection, token);
});
Expand Down Expand Up @@ -231,6 +248,19 @@ private static int MaxTake(long max)
? int.MaxValue
: (int)max;

public readonly struct QueryArgs
{
public readonly long Offset;
public readonly long MaxOffset;
public readonly int Max;

public QueryArgs(long offset, long maxOffset, int max)
{
Offset = offset;
MaxOffset = maxOffset;
Max = max;
}
}
public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> Events(
long offset,
long maxOffset,
Expand All @@ -239,30 +269,54 @@ private static int MaxTake(long max)
var maxTake = MaxTake(max);

return AsyncSource<JournalRow>.FromEnumerable(
new { _connectionFactory = ConnectionFactory, maxTake, maxOffset, offset },
new { _connectionFactory = ConnectionFactory, args=new QueryArgs(offset,maxOffset,maxTake) },
async input =>
{
return await input._connectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
async (connection, token) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > input.offset &&
r.Ordering <= input.maxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(input.maxTake);

return await AddTagDataIfNeededAsync(query, connection, token);
});
var cf = input._connectionFactory;
var a = input.args;
return await ExecuteEventQuery(a);
}
).Via(_deserializeFlow);
}

internal async Task<List<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>>> QueryEvents(long offset, long maxOffset, long take)
{
var e = await ExecuteEventQuery(offset,maxOffset, take);
var retSet = new List<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>>(e.Count);
foreach (var item in e)
{
retSet.Add(_deserializer.Deserialize(item));
}

return retSet;
}

internal async Task<List<JournalRow>> ExecuteEventQuery(long offset, long maxOffset, long take)
{
return await ExecuteEventQuery(new QueryArgs(offset, maxOffset, MaxTake(take)));
}
internal async Task<List<JournalRow>> ExecuteEventQuery(QueryArgs queryArgs)
{
return await ConnectionFactory.ExecuteWithTransactionAsync(
queryArgs,
ReadIsolationLevel,
ShutdownToken,
async (connection, token,a) =>
{
var query = connection
.GetTable<JournalRow>()
.Where(
r =>
r.Ordering > a.Offset &&
r.Ordering <= a.MaxOffset &&
r.Deleted == false)
.OrderBy(r => r.Ordering)
.Take(a.Max);

return await AddTagDataIfNeededAsync(query, connection, token);
});
}

private async Task<List<JournalRow>> AddTagDataIfNeededAsync(IQueryable<JournalRow> rowQuery, AkkaDataConnection connection, CancellationToken token)
{
if (_readJournalConfig.PluginConfig.TagMode != TagMode.TagTable)
Expand Down

0 comments on commit c57f7c7

Please sign in to comment.