Skip to content

Commit

Permalink
fix: add objectid to idempotencekey snapshot oslo
Browse files Browse the repository at this point in the history
  • Loading branch information
emalfroy authored Jan 26, 2024
1 parent ca829b7 commit 94e3109
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ await snapshotManager.FindMatchingSnapshot(

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<StreetNameWasRemovedV2>>(async (_, message, ct) =>
{
await Produce($"{osloNamespace}/{message.Message.PersistentLocalId}", "{}", message.Position, ct);
await Produce($"{osloNamespace}/{message.Message.PersistentLocalId}", message.Message.PersistentLocalId.ToString(),"{}", message.Position, ct);
});
}

Expand All @@ -219,25 +219,26 @@ private async Task FindAndProduce(

if (result != null)
{
await Produce(result.Identificator.Id, result.JsonContent, storePosition, ct);
await Produce(result.Identificator.Id, result.Identificator.ObjectId, result.JsonContent, storePosition, ct);
}
}

private async Task Produce(
string puri,
string objectId,
string jsonContent,
long storePosition,
CancellationToken cancellationToken = default)
{
var result = await _producer.Produce(
new MessageKey(objectId),
new MessageKey(puri),
jsonContent,
new List<MessageHeader> { new MessageHeader(MessageHeader.IdempotenceKey, storePosition.ToString()) },
new List<MessageHeader> { new MessageHeader(MessageHeader.IdempotenceKey, $"{objectId}-{storePosition.ToString()}") },
cancellationToken);

if (!result.IsSuccess)
{
throw new InvalidOperationException(result.Error + Environment.NewLine + result.ErrorReason); //TODO: create custom exception
throw new InvalidOperationException(result.Error + Environment.NewLine + result.ErrorReason);
}
}
}
Expand Down

0 comments on commit 94e3109

Please sign in to comment.