Skip to content
This repository has been archived by the owner on Sep 17, 2023. It is now read-only.

Commit

Permalink
Add PubManyAsync (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
igoree authored Mar 10, 2021
1 parent ce38d73 commit 7f5f470
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
10 changes: 9 additions & 1 deletion src/main/MyNatsClient/INatsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ public interface INatsClient
/// <param name="p"></param>
void PubMany(Action<IPublisher> p);

/// <summary>
/// Gives access to a publisher that will be running in
/// an async locked scope until your injected delegate
/// is done.
/// </summary>
/// <param name="p"></param>
Task PubManyAsync(Func<IPublisher, Task> p);

/// <summary>
/// Async request response.
/// </summary>
Expand Down Expand Up @@ -249,4 +257,4 @@ public interface INatsClient
/// <param name="subscriptionInfo"></param>
Task UnsubAsync(SubscriptionInfo subscriptionInfo);
}
}
}
14 changes: 14 additions & 0 deletions src/main/MyNatsClient/NatsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,20 @@ public void PubMany(Action<IPublisher> p)
});
}

public async Task PubManyAsync(Func<IPublisher, Task> p)
{
ThrowIfDisposed();

ThrowIfNotConnected();

await _connection.WithWriteLockAsync(async writer =>
{
await p(new Publisher(writer, _connection.ServerInfo.MaxPayload)).ConfigureAwait(false);
if (ShouldAutoFlush)
await writer.FlushAsync().ConfigureAwait(false);
}).ConfigureAwait(false);
}

public Task<MsgOp> RequestAsync(string subject, string body, CancellationToken cancellationToken = default)
=> RequestAsync(subject, NatsEncoder.GetBytes(body), cancellationToken);

Expand Down
6 changes: 5 additions & 1 deletion src/testing/IntegrationTests/PubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,14 @@ public async Task Client_Should_be_able_to_publish_and_consume_messages_When_pub

await Context.DelayAsync();

_client1.PubMany(async p =>
_client1.PubMany(p =>
{
p.Pub(subject, messages[0]);
p.Pub(subject, Encoding.UTF8.GetBytes(messages[1]));
});

await _client1.PubManyAsync(async p =>
{
await p.PubAsync(subject, messages[2]);
await p.PubAsync(subject, Encoding.UTF8.GetBytes(messages[3]));
});
Expand Down

0 comments on commit 7f5f470

Please sign in to comment.