diff --git a/common/channel.cc b/common/channel.cc index d72ebd7..af3b95f 100644 --- a/common/channel.cc +++ b/common/channel.cc @@ -83,6 +83,25 @@ static void UnmapMemory(void *p, size_t size, const char *purpose) { mapped_regions->erase(p); #endif +} + +template +static void UnmapBuffers(BufferSetIter first, BufferSetIter last, int num_slots) { + // Unmap any previously mapped buffers. + for (; first < last; ++first) { + int64_t buffers_size = + sizeof(BufferHeader) + + num_slots * + (Aligned<32>(first->slot_size) + sizeof(MessagePrefix)); + if (buffers_size > 0 && first->buffer != nullptr) { + UnmapMemory(first->buffer, buffers_size, "buffers"); + first->buffer = nullptr; + first->slot_size = 0; + } + } + + + } static absl::StatusOr CreateSharedMemory(int id, const char *suffix, @@ -319,15 +338,7 @@ absl::Status Channel::Map(SharedMemoryFds fds, UnmapMemory(scb_, sizeof(SystemControlBlock), "SCB"); UnmapMemory(ccb_, ccb_size, "CCB"); // Unmap any previously mapped buffers. - for (int i = 0; i < index; i++) { - int64_t buffers_size = - sizeof(BufferHeader) + - num_slots_ * - (Aligned<32>(buffers_[i].slot_size) + sizeof(MessagePrefix)); - if (buffers_size > 0 && buffers_[i].buffer != nullptr) { - UnmapMemory(buffers_[i].buffer, buffers_size, "buffers"); - } - } + UnmapBuffers(buffers_.begin(), buffers_.begin() + index, num_slots_); return absl::InternalError(absl::StrFormat( "Failed to map channel buffers: %s", strerror(errno))); } @@ -411,15 +422,7 @@ absl::Status Channel::MapNewBuffers(std::vector buffers) { if (mem == MAP_FAILED) { // Unmap any newly mapped buffers. - for (size_t i = start; i < buffers_.size(); i++) { - int64_t buffers_size = - sizeof(BufferHeader) + - num_slots_ * - (Aligned<32>(buffers_[i].slot_size) + sizeof(MessagePrefix)); - if (buffers_size > 0 && buffers_[i].buffer != nullptr) { - UnmapMemory(buffers_[i].buffer, buffers_size, "buffers"); - } - } + UnmapBuffers(buffers_.begin() + start, buffers_.end(), num_slots_); return absl::InternalError(absl::StrFormat( "Failed to map new channel buffers: %s", strerror(errno))); } diff --git a/manual_tests/perf_subspace.cc b/manual_tests/perf_subspace.cc index d824f61..75609bd 100644 --- a/manual_tests/perf_subspace.cc +++ b/manual_tests/perf_subspace.cc @@ -17,10 +17,10 @@ ABSL_FLAG(int, num_slots, 500, "Number of slots in channel"); void PubCoroutine(co::Coroutine *c) { subspace::Client client(c); - absl::Status s = client.Init(absl::GetFlag(FLAGS_socket)); - if (!s.ok()) { + absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket)); + if (!init_status.ok()) { fprintf(stderr, "Can't connect to Subspace server: %s\n", - s.ToString().c_str()); + init_status.ToString().c_str()); exit(1); } int num_slots = absl::GetFlag(FLAGS_num_slots); @@ -46,9 +46,9 @@ void PubCoroutine(co::Coroutine *c) { } if (*buffer == nullptr) { // Wait for publisher trigger. - absl::Status s = pub->Wait(); - if (!s.ok()) { - fprintf(stderr, "Can't wait for publisher: %s", s.ToString().c_str()); + absl::Status wait_status = pub->Wait(); + if (!wait_status.ok()) { + fprintf(stderr, "Can't wait for publisher: %s", wait_status.ToString().c_str()); exit(1); } continue; @@ -66,10 +66,10 @@ void PubCoroutine(co::Coroutine *c) { void SubCoroutine(co::Coroutine *c) { subspace::Client client(c); - absl::Status s = client.Init(absl::GetFlag(FLAGS_socket)); - if (!s.ok()) { + absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket)); + if (!init_status.ok()) { fprintf(stderr, "Can't connect to Subspace server: %s\n", - s.ToString().c_str()); + init_status.ToString().c_str()); exit(1); } int slot_size = absl::GetFlag(FLAGS_slot_size); @@ -97,8 +97,8 @@ void SubCoroutine(co::Coroutine *c) { if (start != 0) { wait_start = toolbelt::Now(); } - if (absl::Status s = sub->Wait(); !s.ok()) { - fprintf(stderr, "Can't wait for subscriber: %s\n", s.ToString().c_str()); + if (absl::Status wait_status = sub->Wait(); !wait_status.ok()) { + fprintf(stderr, "Can't wait for subscriber: %s\n", wait_status.ToString().c_str()); exit(1); } if (wait_start != 0) { diff --git a/manual_tests/perf_subspace_pub.cc b/manual_tests/perf_subspace_pub.cc index 67f3e08..689f5d9 100644 --- a/manual_tests/perf_subspace_pub.cc +++ b/manual_tests/perf_subspace_pub.cc @@ -20,9 +20,9 @@ int main(int argc, char **argv) { signal(SIGPIPE, SIG_IGN); subspace::Client client; - absl::Status s = client.Init(absl::GetFlag(FLAGS_socket)); - if (!s.ok()) { - fprintf(stderr, "Can't connect to Subspace server: %s\n", s.ToString().c_str()); + absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket)); + if (!init_status.ok()) { + fprintf(stderr, "Can't connect to Subspace server: %s\n", init_status.ToString().c_str()); exit(1); } bool reliable = absl::GetFlag(FLAGS_reliable); @@ -49,9 +49,9 @@ int main(int argc, char **argv) { } if (*buffer == nullptr) { // Wait for publisher trigger. - absl::Status s = pub->Wait(); - if (!s.ok()) { - fprintf(stderr, "Can't wait for publisher: %s", s.ToString().c_str()); + absl::Status wait_status = pub->Wait(); + if (!wait_status.ok()) { + fprintf(stderr, "Can't wait for publisher: %s", wait_status.ToString().c_str()); exit(1); } continue; diff --git a/manual_tests/perf_subspace_sub.cc b/manual_tests/perf_subspace_sub.cc index 837632e..392c8fc 100644 --- a/manual_tests/perf_subspace_sub.cc +++ b/manual_tests/perf_subspace_sub.cc @@ -21,10 +21,10 @@ int main(int argc, char **argv) { signal(SIGPIPE, SIG_IGN); subspace::Client client; - absl::Status s = client.Init(absl::GetFlag(FLAGS_socket)); - if (!s.ok()) { + absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket)); + if (!init_status.ok()) { fprintf(stderr, "Can't connect to Subspace server: %s\n", - s.ToString().c_str()); + init_status.ToString().c_str()); exit(1); } bool reliable = absl::GetFlag(FLAGS_reliable); @@ -54,8 +54,8 @@ int main(int argc, char **argv) { if (start != 0) { wait_start = toolbelt::Now(); } - if (absl::Status s = sub->Wait(); !s.ok()) { - fprintf(stderr, "Can't wait for subscriber: %s\n", s.ToString().c_str()); + if (absl::Status wait_status = sub->Wait(); !wait_status.ok()) { + fprintf(stderr, "Can't wait for subscriber: %s\n", wait_status.ToString().c_str()); exit(1); } if (wait_start != 0) { diff --git a/manual_tests/pub.cc b/manual_tests/pub.cc index 4715cec..e69fc22 100644 --- a/manual_tests/pub.cc +++ b/manual_tests/pub.cc @@ -18,10 +18,10 @@ int main(int argc, char **argv) { absl::ParseCommandLine(argc, argv); subspace::Client client; - absl::Status s = client.Init(absl::GetFlag(FLAGS_socket)); - if (!s.ok()) { + absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket)); + if (!init_status.ok()) { fprintf(stderr, "Can't connect to Subspace server: %s\n", - s.ToString().c_str()); + init_status.ToString().c_str()); exit(1); } bool reliable = absl::GetFlag(FLAGS_reliable); @@ -63,10 +63,10 @@ int main(int argc, char **argv) { } if (*buffer == nullptr) { // Wait for publisher trigger. - absl::Status s = pub->Wait(); - if (!s.ok()) { + absl::Status wait_status = pub->Wait(); + if (!wait_status.ok()) { fprintf(stderr, "Can't wait for publisher: %s", - s.ToString().c_str()); + wait_status.ToString().c_str()); exit(1); } continue; diff --git a/manual_tests/sub.cc b/manual_tests/sub.cc index 7b266a4..2dab9f7 100644 --- a/manual_tests/sub.cc +++ b/manual_tests/sub.cc @@ -18,9 +18,9 @@ int main(int argc, char **argv) { subspace::Client client; - absl::Status s = client.Init(absl::GetFlag(FLAGS_socket)); - if (!s.ok()) { - fprintf(stderr, "Can't connect to Subspace server: %s\n", s.ToString().c_str()); + absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket)); + if (!init_status.ok()) { + fprintf(stderr, "Can't connect to Subspace server: %s\n", init_status.ToString().c_str()); exit(1); } bool reliable = absl::GetFlag(FLAGS_reliable); @@ -39,8 +39,8 @@ int main(int argc, char **argv) { }); for (;;) { - if (absl::Status s = sub->Wait(); !s.ok()) { - fprintf(stderr, "Can't wait for subscriber: %s\n", s.ToString().c_str()); + if (absl::Status wait_status = sub->Wait(); !wait_status.ok()) { + fprintf(stderr, "Can't wait for subscriber: %s\n", wait_status.ToString().c_str()); exit(1); } for (;;) { diff --git a/server/server.cc b/server/server.cc index 5b6fa6b..0da2313 100644 --- a/server/server.cc +++ b/server/server.cc @@ -895,12 +895,12 @@ void Server::BridgeReceiverCoroutine(std::string channel_name, } prefix->flags |= kMessageBridged; - absl::StatusOr s = + absl::StatusOr pub_msg = pub->PublishMessageInternal(*n, /*omit_prefix=*/true); - if (!s.ok()) { + if (!pub_msg.ok()) { logger_.Log(toolbelt::LogLevel::kError, "Failed to publish bridge message for %s: %s", - channel_name.c_str(), s.status().ToString().c_str()); + channel_name.c_str(), pub_msg.status().ToString().c_str()); } }