Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support waiting on condition variable until a specific time point #665

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions include/zenoh-pico/system/common/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ z_result_t _z_condvar_drop(_z_condvar_t *cv);
z_result_t _z_condvar_signal(_z_condvar_t *cv);
z_result_t _z_condvar_signal_all(_z_condvar_t *cv);
z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m);
z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime, bool *timeout);

/**
* Initializes a condition variable.
Expand Down Expand Up @@ -323,6 +324,25 @@ z_result_t z_condvar_signal(z_loaned_condvar_t *cv);
*/
z_result_t z_condvar_wait(z_loaned_condvar_t *cv, z_loaned_mutex_t *m);

/**
* Waits for a signal on the condition variable while holding a mutex until a specified time.
*
* The calling thread is blocked until the condition variable is signaled or the timeout occurs.
* The associated mutex must be locked by the calling thread, and it will be automatically unlocked while waiting.
* The `timeout` bool pointer should either be NULL or point to a valid memory, in which case the function will store a
* value indicating whether a timeout occurred. If NULL is passed in for `timeout`, it will not be set.
*
* Parameters:
* cv: Pointer to a :c:type:`z_loaned_condvar_t` on which to wait.
* m: Pointer to a :c:type:`z_loaned_mutex_t` that will be unlocked during the wait.
* abstime: Absolute end time.
* timeout: Whether a timeout occurred.
*
* Returns:
* ``0`` if the wait is successful, a negative value otherwise.
*/
z_result_t z_condvar_wait_until(z_loaned_condvar_t *cv, z_loaned_mutex_t *m, const z_clock_t *abstime, bool *timeout);

/*------------------ Sleep ------------------*/
/**
* Suspends execution for a specified amount of time in microseconds.
Expand Down Expand Up @@ -396,6 +416,33 @@ unsigned long z_clock_elapsed_ms(z_clock_t *time);
*/
unsigned long z_clock_elapsed_s(z_clock_t *time);

/**
* Offsets the clock by a specified duration in microseconds.
*
* Parameters:
* clock: Pointer to a `z_clock_t` to offset.
* duration: The duration in microseconds.
*/
void z_clock_advance_us(z_clock_t *clock, unsigned long duration);

/**
* Offsets the clock by a specified duration in milliseconds.
*
* Parameters:
* clock: Pointer to a `z_clock_t` to offset.
* duration: The duration in milliseconds.
*/
void z_clock_advance_ms(z_clock_t *clock, unsigned long duration);

/**
* Offsets the clock by a specified duration in seconds.
*
* Parameters:
* clock: Pointer to a `z_clock_t` to offset.
* duration: The duration in seconds.
*/
void z_clock_advance_s(z_clock_t *clock, unsigned long duration);

/*------------------ Time ------------------*/

/**
Expand Down
46 changes: 44 additions & 2 deletions src/system/arduino/esp32/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

#include <Arduino.h>
#include <errno.h>
#include <esp_heap_caps.h>
#include <esp_random.h>
#include <stddef.h>
Expand Down Expand Up @@ -114,15 +115,34 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try
z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); }

/*------------------ Condvar ------------------*/
z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, NULL)); }
z_result_t _z_condvar_init(_z_condvar_t *cv) {
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
_Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr));
}

z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); }

z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_signal(cv)); }

z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); }

z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); }
z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime, bool *timeout) {
int error = pthread_cond_timedwait(cv, m, abstime);

if (error == ETIMEDOUT) {
if (timeout != NULL) {
*timeout = true;
}
return 0;
}

if (timeout != NULL) {
*timeout = false;
}
_Z_CHECK_SYS_ERR(error);
}
#endif // Z_FEATURE_MULTI_THREAD == 1

/*------------------ Sleep ------------------*/
Expand Down Expand Up @@ -177,6 +197,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) {
return elapsed;
}

void z_clock_advance_us(z_clock_t *clock, unsigned long duration) {
clock->tv_sec += duration / 1000000;
clock->tv_nsec += (duration % 1000000) * 1000;

if (clock->tv_nsec >= 1000000000) {
clock->tv_sec += 1;
clock->tv_nsec -= 1000000000;
}
}

void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) {
clock->tv_sec += duration / 1000;
clock->tv_nsec += (duration % 1000) * 1000000;

if (clock->tv_nsec >= 1000000000) {
clock->tv_sec += 1;
clock->tv_nsec -= 1000000000;
}
}

void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; }

/*------------------ Time ------------------*/
z_time_t z_time_now(void) {
z_time_t now;
Expand Down
3 changes: 3 additions & 0 deletions src/system/common/platform.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@ z_result_t z_condvar_drop(z_moved_condvar_t *cv) { return _z_condvar_drop(&cv->_

z_result_t z_condvar_signal(z_loaned_condvar_t *cv) { return _z_condvar_signal(cv); }
z_result_t z_condvar_wait(z_loaned_condvar_t *cv, z_loaned_mutex_t *m) { return _z_condvar_wait(cv, m); }
z_result_t z_condvar_wait_until(z_loaned_condvar_t *cv, z_loaned_mutex_t *m, const z_clock_t *abstime, bool *timeout) {
return _z_condvar_wait_until(cv, m, abstime, timeout);
}

#endif // Z_FEATURE_MULTI_THREAD == 1
46 changes: 45 additions & 1 deletion src/system/espidf/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

#include <errno.h>
#include <esp_heap_caps.h>
#include <esp_random.h>
#include <stddef.h>
Expand Down Expand Up @@ -142,7 +143,12 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try
z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); }

/*------------------ Condvar ------------------*/
z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, NULL)); }
z_result_t _z_condvar_init(_z_condvar_t *cv) {
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
_Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr));
}

z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); }

Expand All @@ -151,6 +157,22 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_s
z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); }

z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); }

z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime, bool *timeout) {
int error = pthread_cond_timedwait(cv, m, abstime);

if (error == ETIMEDOUT) {
if (timeout != NULL) {
*timeout = true;
}
return 0;
}

if (timeout != NULL) {
*timeout = false;
}
_Z_CHECK_SYS_ERR(error);
}
#endif // Z_FEATURE_MULTI_THREAD == 1

/*------------------ Sleep ------------------*/
Expand Down Expand Up @@ -202,6 +224,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) {
return elapsed;
}

void z_clock_advance_us(z_clock_t *clock, unsigned long duration) {
clock->tv_sec += duration / 1000000;
clock->tv_nsec += (duration % 1000000) * 1000;

if (clock->tv_nsec >= 1000000000) {
clock->tv_sec += 1;
clock->tv_nsec -= 1000000000;
}
}

void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) {
clock->tv_sec += duration / 1000;
clock->tv_nsec += (duration % 1000) * 1000000;

if (clock->tv_nsec >= 1000000000) {
clock->tv_sec += 1;
clock->tv_nsec -= 1000000000;
}
}

void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; }

/*------------------ Time ------------------*/
z_time_t z_time_now(void) {
z_time_t now;
Expand Down
41 changes: 41 additions & 0 deletions src/system/freertos_plus_tcp/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,38 @@ z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) {

return _Z_RES_OK;
}

z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime, bool *timeout) {
if (!cv || !m) {
return _Z_ERR_GENERIC;
}

TickType_t now = xTaskGetTickCount();
TickType_t target_time = *abstime;
TickType_t block_duration = (target_time > now) ? (target_time - now) : 0;

xSemaphoreTake(cv->mutex, portMAX_DELAY);
cv->waiters++;
xSemaphoreGive(cv->mutex);

_z_mutex_unlock(m);

bool timed_out = xSemaphoreTake(cv->sem, block_duration) == pdFALSE;

_z_mutex_lock(m);

if (timed_out) {
xSemaphoreTake(cv->mutex, portMAX_DELAY);
cv->waiters--;
xSemaphoreGive(cv->mutex);
}

if (timeout != NULL) {
*timeout = timed_out;
}

return _Z_RES_OK;
}
#endif // Z_MULTI_THREAD == 1

/*------------------ Sleep ------------------*/
Expand Down Expand Up @@ -296,6 +328,15 @@ unsigned long z_clock_elapsed_ms(z_clock_t *instant) {

unsigned long z_clock_elapsed_s(z_clock_t *instant) { return z_clock_elapsed_ms(instant) / 1000; }

void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { z_clock_advance_ms(clock, duration / 1000); }

void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) {
unsigned long ticks = pdMS_TO_TICKS(duration);
*clock += ticks;
}

void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { z_clock_advance_ms(clock, duration * 1000); }

/*------------------ Time ------------------*/
z_time_t z_time_now(void) {
z_time_t now;
Expand Down
55 changes: 55 additions & 0 deletions src/system/mbed/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,39 @@ z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) {

return _Z_RES_OK;
}

z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime, bool *timeout) {
if (!cv || !m) {
return _Z_ERR_GENERIC;
}

auto &cond_var = *(condvar *)*cv;

auto target_time =
Kernel::Clock::time_point(Kernel::Clock::duration(abstime->tv_sec * 1000LL + abstime->tv_nsec / 1000000));

cond_var.mutex.lock();
cond_var.waiters++;
cond_var.mutex.unlock();

_z_mutex_unlock(m);

bool timed_out = cond_var.sem.try_acquire_until(target_time) == false;

_z_mutex_lock(m);

if (timed_out) {
cond_var.mutex.lock();
cond_var.waiters--;
cond_var.mutex.unlock();
}

if (timeout != NULL) {
*timeout = timed_out;
}

return _Z_RES_OK;
}
#endif // Z_FEATURE_MULTI_THREAD == 1

/*------------------ Sleep ------------------*/
Expand Down Expand Up @@ -222,6 +255,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) {
return elapsed;
}

void z_clock_advance_us(z_clock_t *clock, unsigned long duration) {
clock->tv_sec += duration / 1000000;
clock->tv_nsec += (duration % 1000000) * 1000;

if (clock->tv_nsec >= 1000000000) {
clock->tv_sec += 1;
clock->tv_nsec -= 1000000000;
}
}

void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) {
clock->tv_sec += duration / 1000;
clock->tv_nsec += (duration % 1000) * 1000000;

if (clock->tv_nsec >= 1000000000) {
clock->tv_sec += 1;
clock->tv_nsec -= 1000000000;
}
}

void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; }

/*------------------ Time ------------------*/
z_time_t z_time_now(void) {
z_time_t now;
Expand Down
Loading
Loading