Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/dallison/subspace
Browse files Browse the repository at this point in the history
  • Loading branch information
dallison committed Jan 20, 2024
2 parents 6299c9c + f76ecba commit 1089b68
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 54 deletions.
39 changes: 21 additions & 18 deletions common/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ static void UnmapMemory(void *p, size_t size, const char *purpose) {

mapped_regions->erase(p);
#endif
}

template <typename BufferSetIter>
static void UnmapBuffers(BufferSetIter first, BufferSetIter last, int num_slots) {
// Unmap any previously mapped buffers.
for (; first < last; ++first) {
int64_t buffers_size =
sizeof(BufferHeader) +
num_slots *
(Aligned<32>(first->slot_size) + sizeof(MessagePrefix));
if (buffers_size > 0 && first->buffer != nullptr) {
UnmapMemory(first->buffer, buffers_size, "buffers");
first->buffer = nullptr;
first->slot_size = 0;
}
}



}

static absl::StatusOr<void *> CreateSharedMemory(int id, const char *suffix,
Expand Down Expand Up @@ -319,15 +338,7 @@ absl::Status Channel::Map(SharedMemoryFds fds,
UnmapMemory(scb_, sizeof(SystemControlBlock), "SCB");
UnmapMemory(ccb_, ccb_size, "CCB");
// Unmap any previously mapped buffers.
for (int i = 0; i < index; i++) {
int64_t buffers_size =
sizeof(BufferHeader) +
num_slots_ *
(Aligned<32>(buffers_[i].slot_size) + sizeof(MessagePrefix));
if (buffers_size > 0 && buffers_[i].buffer != nullptr) {
UnmapMemory(buffers_[i].buffer, buffers_size, "buffers");
}
}
UnmapBuffers(buffers_.begin(), buffers_.begin() + index, num_slots_);
return absl::InternalError(absl::StrFormat(
"Failed to map channel buffers: %s", strerror(errno)));
}
Expand Down Expand Up @@ -411,15 +422,7 @@ absl::Status Channel::MapNewBuffers(std::vector<SlotBuffer> buffers) {

if (mem == MAP_FAILED) {
// Unmap any newly mapped buffers.
for (size_t i = start; i < buffers_.size(); i++) {
int64_t buffers_size =
sizeof(BufferHeader) +
num_slots_ *
(Aligned<32>(buffers_[i].slot_size) + sizeof(MessagePrefix));
if (buffers_size > 0 && buffers_[i].buffer != nullptr) {
UnmapMemory(buffers_[i].buffer, buffers_size, "buffers");
}
}
UnmapBuffers(buffers_.begin() + start, buffers_.end(), num_slots_);
return absl::InternalError(absl::StrFormat(
"Failed to map new channel buffers: %s", strerror(errno)));
}
Expand Down
22 changes: 11 additions & 11 deletions manual_tests/perf_subspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ ABSL_FLAG(int, num_slots, 500, "Number of slots in channel");

void PubCoroutine(co::Coroutine *c) {
subspace::Client client(c);
absl::Status s = client.Init(absl::GetFlag(FLAGS_socket));
if (!s.ok()) {
absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket));
if (!init_status.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n",
s.ToString().c_str());
init_status.ToString().c_str());
exit(1);
}
int num_slots = absl::GetFlag(FLAGS_num_slots);
Expand All @@ -46,9 +46,9 @@ void PubCoroutine(co::Coroutine *c) {
}
if (*buffer == nullptr) {
// Wait for publisher trigger.
absl::Status s = pub->Wait();
if (!s.ok()) {
fprintf(stderr, "Can't wait for publisher: %s", s.ToString().c_str());
absl::Status wait_status = pub->Wait();
if (!wait_status.ok()) {
fprintf(stderr, "Can't wait for publisher: %s", wait_status.ToString().c_str());
exit(1);
}
continue;
Expand All @@ -66,10 +66,10 @@ void PubCoroutine(co::Coroutine *c) {

void SubCoroutine(co::Coroutine *c) {
subspace::Client client(c);
absl::Status s = client.Init(absl::GetFlag(FLAGS_socket));
if (!s.ok()) {
absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket));
if (!init_status.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n",
s.ToString().c_str());
init_status.ToString().c_str());
exit(1);
}
int slot_size = absl::GetFlag(FLAGS_slot_size);
Expand Down Expand Up @@ -97,8 +97,8 @@ void SubCoroutine(co::Coroutine *c) {
if (start != 0) {
wait_start = toolbelt::Now();
}
if (absl::Status s = sub->Wait(); !s.ok()) {
fprintf(stderr, "Can't wait for subscriber: %s\n", s.ToString().c_str());
if (absl::Status wait_status = sub->Wait(); !wait_status.ok()) {
fprintf(stderr, "Can't wait for subscriber: %s\n", wait_status.ToString().c_str());
exit(1);
}
if (wait_start != 0) {
Expand Down
12 changes: 6 additions & 6 deletions manual_tests/perf_subspace_pub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);

subspace::Client client;
absl::Status s = client.Init(absl::GetFlag(FLAGS_socket));
if (!s.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n", s.ToString().c_str());
absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket));
if (!init_status.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n", init_status.ToString().c_str());
exit(1);
}
bool reliable = absl::GetFlag(FLAGS_reliable);
Expand All @@ -49,9 +49,9 @@ int main(int argc, char **argv) {
}
if (*buffer == nullptr) {
// Wait for publisher trigger.
absl::Status s = pub->Wait();
if (!s.ok()) {
fprintf(stderr, "Can't wait for publisher: %s", s.ToString().c_str());
absl::Status wait_status = pub->Wait();
if (!wait_status.ok()) {
fprintf(stderr, "Can't wait for publisher: %s", wait_status.ToString().c_str());
exit(1);
}
continue;
Expand Down
10 changes: 5 additions & 5 deletions manual_tests/perf_subspace_sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);

subspace::Client client;
absl::Status s = client.Init(absl::GetFlag(FLAGS_socket));
if (!s.ok()) {
absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket));
if (!init_status.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n",
s.ToString().c_str());
init_status.ToString().c_str());
exit(1);
}
bool reliable = absl::GetFlag(FLAGS_reliable);
Expand Down Expand Up @@ -54,8 +54,8 @@ int main(int argc, char **argv) {
if (start != 0) {
wait_start = toolbelt::Now();
}
if (absl::Status s = sub->Wait(); !s.ok()) {
fprintf(stderr, "Can't wait for subscriber: %s\n", s.ToString().c_str());
if (absl::Status wait_status = sub->Wait(); !wait_status.ok()) {
fprintf(stderr, "Can't wait for subscriber: %s\n", wait_status.ToString().c_str());
exit(1);
}
if (wait_start != 0) {
Expand Down
12 changes: 6 additions & 6 deletions manual_tests/pub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ int main(int argc, char **argv) {
absl::ParseCommandLine(argc, argv);

subspace::Client client;
absl::Status s = client.Init(absl::GetFlag(FLAGS_socket));
if (!s.ok()) {
absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket));
if (!init_status.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n",
s.ToString().c_str());
init_status.ToString().c_str());
exit(1);
}
bool reliable = absl::GetFlag(FLAGS_reliable);
Expand Down Expand Up @@ -63,10 +63,10 @@ int main(int argc, char **argv) {
}
if (*buffer == nullptr) {
// Wait for publisher trigger.
absl::Status s = pub->Wait();
if (!s.ok()) {
absl::Status wait_status = pub->Wait();
if (!wait_status.ok()) {
fprintf(stderr, "Can't wait for publisher: %s",
s.ToString().c_str());
wait_status.ToString().c_str());
exit(1);
}
continue;
Expand Down
10 changes: 5 additions & 5 deletions manual_tests/sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ int main(int argc, char **argv) {

subspace::Client client;

absl::Status s = client.Init(absl::GetFlag(FLAGS_socket));
if (!s.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n", s.ToString().c_str());
absl::Status init_status = client.Init(absl::GetFlag(FLAGS_socket));
if (!init_status.ok()) {
fprintf(stderr, "Can't connect to Subspace server: %s\n", init_status.ToString().c_str());
exit(1);
}
bool reliable = absl::GetFlag(FLAGS_reliable);
Expand All @@ -39,8 +39,8 @@ int main(int argc, char **argv) {
});

for (;;) {
if (absl::Status s = sub->Wait(); !s.ok()) {
fprintf(stderr, "Can't wait for subscriber: %s\n", s.ToString().c_str());
if (absl::Status wait_status = sub->Wait(); !wait_status.ok()) {
fprintf(stderr, "Can't wait for subscriber: %s\n", wait_status.ToString().c_str());
exit(1);
}
for (;;) {
Expand Down
6 changes: 3 additions & 3 deletions server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -895,12 +895,12 @@ void Server::BridgeReceiverCoroutine(std::string channel_name,
}
prefix->flags |= kMessageBridged;

absl::StatusOr<const Message> s =
absl::StatusOr<const Message> pub_msg =
pub->PublishMessageInternal(*n, /*omit_prefix=*/true);
if (!s.ok()) {
if (!pub_msg.ok()) {
logger_.Log(toolbelt::LogLevel::kError,
"Failed to publish bridge message for %s: %s",
channel_name.c_str(), s.status().ToString().c_str());
channel_name.c_str(), pub_msg.status().ToString().c_str());
}
}

Expand Down

0 comments on commit 1089b68

Please sign in to comment.