Skip to content

Commit

Permalink
cleaned up projector (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored Mar 26, 2024
1 parent 9b8f5c2 commit e057907
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static Props GetProps(string persistenceId)
{
return Props.Create(() => new ProductTotalsActor(persistenceId));
}

/// <summary>
/// Used to help differentiate what type of entity this is inside Akka.Persistence's database
/// </summary>
Expand All @@ -30,7 +30,7 @@ public ProductTotalsActor(string persistenceId)
{
PersistenceId = $"{TotalsEntityNameConstant}-" + persistenceId;
State = new ProductState();

Recover<SnapshotOffer>(offer =>
{
if (offer.Snapshot is ProductState state)
Expand All @@ -39,43 +39,33 @@ public ProductTotalsActor(string persistenceId)
}
});

Recover<IProductEvent>(productEvent =>
{
State = State.ProcessEvent(productEvent);
});
Recover<IProductEvent>(productEvent => { State = State.ProcessEvent(productEvent); });

Command<IProductCommand>(cmd =>
{
var response = State.ProcessCommand(cmd);
var sentResponse = false;

if (response.ResponseEvents.Any())
if (response.ResponseEvents.Count != 0)
{
PersistAll(response.ResponseEvents, productEvent =>
{
_log.Info("Processed: {0}", productEvent);

if (productEvent is ProductInventoryWarningEvent warning)
{
_log.Warning(warning.ToString());
}

State = State.ProcessEvent(productEvent);

if (!sentResponse) // otherwise we'll generate a response-per-event
{
sentResponse = true;

async Task<ProductCommandResponse> ReplyToSender()
{
await Task.Delay(1);
return response;
}
var sender = Sender;
ReplyToSender().PipeTo(sender, failure: ex => new Status.Failure(ex));

Sender.Tell(response);
}
if(LastSequenceNr % 10 == 0)

if (LastSequenceNr % 10 == 0)
SaveSnapshot(State);
});
}
Expand All @@ -86,10 +76,7 @@ async Task<ProductCommandResponse> ReplyToSender()
});


Command<SaveSnapshotSuccess>(success =>
{

});
Command<SaveSnapshotSuccess>(success => { });

Command<FetchProduct>(fetch =>
{
Expand All @@ -102,8 +89,8 @@ async Task<ProductCommandResponse> ReplyToSender()
}
});
}

public override string PersistenceId { get; }

public ProductState State { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,4 @@
</Content>
<None Remove="sharding.conf" />
</ItemGroup>

<ItemGroup>
<Folder Include="Configuration\" />
</ItemGroup>
</Project>

0 comments on commit e057907

Please sign in to comment.