Skip to content

Commit

Permalink
Fix dropped message detection
Browse files Browse the repository at this point in the history
Also updates to latest coroutines and toolbelt
  • Loading branch information
dallison committed Sep 8, 2023
1 parent 57da4a9 commit bde970e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
12 changes: 6 additions & 6 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ http_archive(

http_archive(
name = "toolbelt",
urls = ["https://github.com/dallison/cpp_toolbelt/archive/refs/tags/1.1.0.tar.gz"],
strip_prefix = "cpp_toolbelt-1.1.0",
sha256 = "6cc17ad74089c036866e6ef740f57ac318d416dbc6761bf85ec923af7c5c0b95"
urls = ["https://github.com/dallison/cpp_toolbelt/archive/refs/tags/1.1.1.tar.gz"],
strip_prefix = "cpp_toolbelt-1.1.1",
sha256 = "8c2a99448ea2776ffd400a4a85b5355c19f0506f4b4306cdeff67d1ba20474fb"
)

# For local debugging of toolbelt coroutine library.
Expand All @@ -81,9 +81,9 @@ http_archive(

http_archive(
name = "coroutines",
urls = ["https://github.com/dallison/co/archive/refs/tags/1.3.2.tar.gz"],
strip_prefix = "co-1.3.2",
sha256 = "2a005dc6e86e2e8ac732605bf7ec464b39cb6ee64d3bbc40e3205f8348e790d1"
urls = ["https://github.com/dallison/co/archive/refs/tags/1.3.3.tar.gz"],
strip_prefix = "co-1.3.3",
sha256 = "5c152150ee06213ae2c0af83609d2c5baacb621f857c69dd77c41f0fc453ae32"
)

# For local debugging of co coroutine library.
Expand Down
16 changes: 10 additions & 6 deletions client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ void Client::RegisterResizeCallback(
resize_callbacks_[publisher] = std::move(callback);
}

absl::Status
Client::UnregisterResizeCallback(PublisherImpl *publisher) {
absl::Status Client::UnregisterResizeCallback(PublisherImpl *publisher) {
auto it = resize_callbacks_.find(publisher);
if (it == resize_callbacks_.end()) {
return absl::InternalError(absl::StrFormat(
Expand Down Expand Up @@ -449,7 +448,8 @@ Client::ReadMessageInternal(SubscriberImpl *subscriber, ReadMode mode,
printf("read new_slot: %d: %" PRId64 "\n", new_slot->id, new_slot->ordinal);
}

if (last_ordinal != -1 && new_slot->ordinal != (last_ordinal + 1)) {
if (mode == ReadMode::kReadNext && last_ordinal != -1 &&
new_slot->ordinal != (last_ordinal + 1)) {
// We dropped a message. If we have a callback registered for this
// channel, call it with the number of dropped messages.
auto it = dropped_message_callbacks_.find(subscriber);
Expand Down Expand Up @@ -549,11 +549,13 @@ struct pollfd Client::GetPollFd(PublisherImpl *publisher) const {
return fd;
}

toolbelt::FileDescriptor Client::GetFileDescriptor(SubscriberImpl *subscriber) const {
toolbelt::FileDescriptor
Client::GetFileDescriptor(SubscriberImpl *subscriber) const {
return subscriber->GetPollFd();
}

toolbelt::FileDescriptor Client::GetFileDescriptor(PublisherImpl *publisher) const {
toolbelt::FileDescriptor
Client::GetFileDescriptor(PublisherImpl *publisher) const {
if (!publisher->IsReliable()) {
return toolbelt::FileDescriptor();
}
Expand Down Expand Up @@ -833,7 +835,9 @@ absl::Status Client::ResizeChannel(PublisherImpl *publisher,
// an error, we don't perform the resize.
auto it = resize_callbacks_.find(publisher);
if (it != resize_callbacks_.end()) {
if (absl::Status s = it->second(publisher, publisher->SlotSize(), new_slot_size); !s.ok()) {
if (absl::Status s =
it->second(publisher, publisher->SlotSize(), new_slot_size);
!s.ok()) {
return s;
}
}
Expand Down

0 comments on commit bde970e

Please sign in to comment.