Skip to content

Commit

Permalink
Update to v0.50.14
Browse files Browse the repository at this point in the history
  • Loading branch information
Mysticial committed Dec 18, 2024
1 parent 08e6797 commit fe36ab4
Show file tree
Hide file tree
Showing 47 changed files with 1,080 additions and 313 deletions.
77 changes: 42 additions & 35 deletions ClientSource/Connection/PABotBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ PABotBase::PABotBase(
, m_last_ack(current_time())
, m_state(State::RUNNING)
, m_error(false)
, m_retransmit_thread(run_with_catch, "PABotBase::retransmit_thread()", [this]{ retransmit_thread(); })
{
set_sniffer(message_logger);

// We must initialize this last because it will trigger the lifetime
// sanitizer if it beats it to construction.
m_retransmit_thread = std::thread(
run_with_catch,
"PABotBase::retransmit_thread()",
[this]{ retransmit_thread(); }
);
}
PABotBase::~PABotBase(){
stop();
Expand All @@ -57,7 +64,7 @@ PABotBase::~PABotBase(){
}

size_t PABotBase::inflight_requests(){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// Must be called under m_state_lock.

Expand All @@ -71,15 +78,15 @@ size_t PABotBase::inflight_requests(){
}

void PABotBase::connect(){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

pabb_MsgAckRequest response;
issue_request_and_wait(
Microcontroller::DeviceRequest_seqnum_reset(), nullptr
).convert<PABB_MSG_ACK_REQUEST>(logger(), response);
}
void PABotBase::stop(){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// Make sure only one thread can get in here.
State expected = State::RUNNING;
Expand All @@ -95,7 +102,7 @@ void PABotBase::stop(){
m_retransmit_thread.join();

{
SpinLockGuard lg(m_state_lock, "PABotBase::stop()");
ReadSpinLock lg(m_state_lock, "PABotBase::stop()");

// Send a stop request, but don't wait for a response that we may never
// receive.
Expand All @@ -122,7 +129,7 @@ void PABotBase::set_queue_limit(size_t queue_limit){
}

void PABotBase::wait_for_all_requests(const Cancellable* cancelled){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

std::unique_lock<std::mutex> lg(m_sleep_lock);
m_logger.log("Waiting for all requests to finish...", COLOR_DARKGREEN);
Expand All @@ -134,7 +141,7 @@ void PABotBase::wait_for_all_requests(const Cancellable* cancelled){
throw InvalidConnectionStateException();
}
{
SpinLockGuard lg1(m_state_lock, "PABotBase::wait_for_all_requests()");
ReadSpinLock lg1(m_state_lock, "PABotBase::wait_for_all_requests()");
#if 0
m_logger.log(
"Waiting for all requests to finish... (Requests: " +
Expand Down Expand Up @@ -170,7 +177,7 @@ bool PABotBase::try_stop_all_commands(){
cout << "-----------------------------------------------------------------------------------------" << endl;
#endif

m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

uint64_t seqnum = try_issue_request(nullptr, Microcontroller::DeviceRequest_request_stop(), true);
if (seqnum != 0){
Expand All @@ -197,13 +204,13 @@ void PABotBase::stop_all_commands(){
cout << "-----------------------------------------------------------------------------------------" << endl;
#endif

m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

uint64_t seqnum = issue_request(nullptr, Microcontroller::DeviceRequest_request_stop(), true);
clear_all_active_commands(seqnum);
}
bool PABotBase::try_next_command_interrupt(){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

uint64_t seqnum = try_issue_request(nullptr, Microcontroller::DeviceRequest_next_command_interrupt(), true);
if (seqnum != 0){
Expand All @@ -214,17 +221,17 @@ bool PABotBase::try_next_command_interrupt(){
}
}
void PABotBase::next_command_interrupt(){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

uint64_t seqnum = issue_request(nullptr, Microcontroller::DeviceRequest_next_command_interrupt(), true);
clear_all_active_commands(seqnum);
}
void PABotBase::clear_all_active_commands(uint64_t seqnum){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// Remove all commands at or before the specified seqnum.
std::lock_guard<std::mutex> lg0(m_sleep_lock);
SpinLockGuard lg1(m_state_lock, "PABotBase::next_command_interrupt()");
WriteSpinLock lg1(m_state_lock, "PABotBase::next_command_interrupt()");
m_logger.log("Clearing all active commands... (Commands: " + std::to_string(m_pending_commands.size()) + ")", COLOR_DARKGREEN);

// if (m_pending_commands.size() > 2){
Expand All @@ -247,7 +254,7 @@ void PABotBase::clear_all_active_commands(uint64_t seqnum){
}
template <typename Map>
uint64_t PABotBase::infer_full_seqnum(const Map& map, seqnum_t seqnum) const{
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// The protocol uses a 32-bit seqnum that wraps around. For our purposes of
// retransmits, we use a full 64-bit seqnum to maintain sorting order
Expand All @@ -274,7 +281,7 @@ uint64_t PABotBase::infer_full_seqnum(const Map& map, seqnum_t seqnum) const{
}

uint64_t PABotBase::oldest_live_seqnum() const{
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// Must call under state lock.
uint64_t oldest = m_send_seq;
Expand All @@ -289,7 +296,7 @@ uint64_t PABotBase::oldest_live_seqnum() const{

template <typename Params>
void PABotBase::process_ack_request(BotBaseMessage message){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

if (message.body.size() != sizeof(Params)){
m_sniffer->log("Ignoring message with invalid size.");
Expand All @@ -300,7 +307,7 @@ void PABotBase::process_ack_request(BotBaseMessage message){

AckState state;
{
SpinLockGuard lg(m_state_lock, "PABotBase::process_ack_request()");
WriteSpinLock lg(m_state_lock, "PABotBase::process_ack_request()");

if (m_pending_requests.empty()){
m_sniffer->log("Unexpected request ack message: seqnum = " + std::to_string(seqnum));
Expand Down Expand Up @@ -345,7 +352,7 @@ void PABotBase::process_ack_request(BotBaseMessage message){
}
template <typename Params>
void PABotBase::process_ack_command(BotBaseMessage message){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

if (message.body.size() != sizeof(Params)){
m_sniffer->log("Ignoring message with invalid size.");
Expand All @@ -354,7 +361,7 @@ void PABotBase::process_ack_command(BotBaseMessage message){
const Params* params = (const Params*)message.body.c_str();
seqnum_t seqnum = params->seqnum;

SpinLockGuard lg(m_state_lock, "PABotBase::process_ack_command()");
WriteSpinLock lg(m_state_lock, "PABotBase::process_ack_command()");

if (m_pending_commands.empty()){
m_sniffer->log("Unexpected command ack message: seqnum = " + std::to_string(seqnum));
Expand Down Expand Up @@ -387,7 +394,7 @@ void PABotBase::process_ack_command(BotBaseMessage message){
}
template <typename Params>
void PABotBase::process_command_finished(BotBaseMessage message){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

if (message.body.size() != sizeof(Params)){
m_sniffer->log("Ignoring message with invalid size.");
Expand All @@ -403,7 +410,7 @@ void PABotBase::process_command_finished(BotBaseMessage message){
// m_send_queue.emplace_back((uint8_t)PABB_MSG_ACK, std::string((char*)&ack, sizeof(ack)));

std::lock_guard<std::mutex> lg0(m_sleep_lock);
SpinLockGuard lg1(m_state_lock, "PABotBase::process_command_finished() - 0");
WriteSpinLock lg1(m_state_lock, "PABotBase::process_command_finished() - 0");

send_message(BotBaseMessage(PABB_MSG_ACK_REQUEST, std::string((char*)&ack, sizeof(ack))), false);

Expand Down Expand Up @@ -442,7 +449,7 @@ void PABotBase::process_command_finished(BotBaseMessage message){
}
}
void PABotBase::on_recv_message(BotBaseMessage message){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

switch (message.type){
case PABB_MSG_ACK_COMMAND:
Expand Down Expand Up @@ -495,7 +502,7 @@ void PABotBase::on_recv_message(BotBaseMessage message){
}

void PABotBase::retransmit_thread(){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// cout << "retransmit_thread()" << endl;
auto last_sent = current_time();
Expand All @@ -515,7 +522,7 @@ void PABotBase::retransmit_thread(){
}

// Process retransmits.
SpinLockGuard lg(m_state_lock, "PABotBase::retransmit_thread()");
WriteSpinLock lg(m_state_lock, "PABotBase::retransmit_thread()");
// std::cout << "retransmit_thread - m_pending_messages.size(): " << m_pending_messages.size() << std::endl;
// cout << "m_pending_messages.size()" << endl;

Expand Down Expand Up @@ -550,7 +557,7 @@ uint64_t PABotBase::try_issue_request(
const Cancellable* cancelled,
const BotBaseRequest& request, bool silent_remove
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

BotBaseMessage message = request.message();
if (message.body.size() < sizeof(uint32_t)){
Expand All @@ -560,7 +567,7 @@ uint64_t PABotBase::try_issue_request(
throw InternalProgramError(&m_logger, PA_CURRENT_FUNCTION, "Message is too long.");
}

SpinLockGuard lg(m_state_lock, "PABotBase::try_issue_request()");
WriteSpinLock lg(m_state_lock, "PABotBase::try_issue_request()");
if (cancelled != nullptr && cancelled->cancelled()){
throw OperationCancelledException();
}
Expand Down Expand Up @@ -615,7 +622,7 @@ uint64_t PABotBase::try_issue_command(
const Cancellable* cancelled,
const BotBaseRequest& request, bool silent_remove
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

BotBaseMessage message = request.message();
if (message.body.size() < sizeof(uint32_t)){
Expand All @@ -625,7 +632,7 @@ uint64_t PABotBase::try_issue_command(
throw InternalProgramError(&m_logger, PA_CURRENT_FUNCTION, "Message is too long.");
}

SpinLockGuard lg(m_state_lock, "PABotBase::try_issue_command()");
WriteSpinLock lg(m_state_lock, "PABotBase::try_issue_command()");
if (cancelled != nullptr && cancelled->cancelled()){
throw OperationCancelledException();
}
Expand Down Expand Up @@ -686,7 +693,7 @@ uint64_t PABotBase::issue_request(
const Cancellable* cancelled,
const BotBaseRequest& request, bool silent_remove
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// Issue a request or a command and return.
//
Expand Down Expand Up @@ -728,7 +735,7 @@ uint64_t PABotBase::issue_command(
const Cancellable* cancelled,
const BotBaseRequest& request, bool silent_remove
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

// Issue a request or a command and return.
//
Expand Down Expand Up @@ -771,7 +778,7 @@ bool PABotBase::try_issue_request(
const BotBaseRequest& request,
const Cancellable* cancelled
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

if (!request.is_command()){
return try_issue_request(cancelled, request, true) != 0;
Expand All @@ -783,7 +790,7 @@ void PABotBase::issue_request(
const BotBaseRequest& request,
const Cancellable* cancelled
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

if (!request.is_command()){
issue_request(cancelled, request, true);
Expand All @@ -796,7 +803,7 @@ BotBaseMessage PABotBase::issue_request_and_wait(
const BotBaseRequest& request,
const Cancellable* cancelled
){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

if (request.is_command()){
throw InternalProgramError(&m_logger, PA_CURRENT_FUNCTION, "This function only supports requests.");
Expand All @@ -806,12 +813,12 @@ BotBaseMessage PABotBase::issue_request_and_wait(
return wait_for_request(seqnum);
}
BotBaseMessage PABotBase::wait_for_request(uint64_t seqnum){
m_sanitizer.check_usage();
auto scope_check = m_sanitizer.check_scope();

std::unique_lock<std::mutex> lg(m_sleep_lock);
while (true){
{
SpinLockGuard slg(m_state_lock, "PABotBase::issue_request_and_wait()");
WriteSpinLock slg(m_state_lock, "PABotBase::issue_request_and_wait()");
auto iter = m_pending_requests.find(seqnum);
if (iter == m_pending_requests.end()){
throw OperationCancelledException();
Expand Down
2 changes: 1 addition & 1 deletion ClientSource/Connection/SerialConnectionPOSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class SerialConnection : public StreamConnection{

private:
virtual void send(const void* data, size_t bytes){
SpinLockGuard lg(m_send_lock, "SerialConnection::send()");
WriteSpinLock lg(m_send_lock, "SerialConnection::send()");
bytes = write(m_fd, data, bytes);
}

Expand Down
2 changes: 1 addition & 1 deletion ClientSource/Connection/SerialConnectionWinAPI.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class SerialConnection : public StreamConnection{


virtual void send(const void* data, size_t bytes){
SpinLockGuard lg(m_send_lock, "SerialConnection::send()");
WriteSpinLock lg(m_send_lock, "SerialConnection::send()");
#if 0
for (size_t c = 0; c < bytes; c++){
std::cout << "Send: " << (int)((const char*)data)[c] << std::endl;
Expand Down
Loading

0 comments on commit fe36ab4

Please sign in to comment.