From 81cbd74b5053126af360cbfd36cd70de61e25bfd Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Sun, 1 Dec 2024 13:24:56 +0000 Subject: [PATCH] Rewrite Delay module Old implementation had different issues. Just use mutexes and conditions using C stubs. Current Ocaml Condition module does not have support for timeout waiting for conditions, so use C stubs. This implementation does not require opening a pipe. This reduces the possibilities of errors calling "wait" to zero. Mostly of the times it does not require kernel calls. Signed-off-by: Frediano Ziglio --- .../lib/xapi-stdext-threads/delay_stubs.c | 169 ++++++++++++++++++ .../xapi-stdext/lib/xapi-stdext-threads/dune | 5 + .../lib/xapi-stdext-threads/threadext.ml | 76 ++------ .../lib/xapi-stdext-threads/threadext.mli | 6 +- 4 files changed, 195 insertions(+), 61 deletions(-) create mode 100644 ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/delay_stubs.c diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/delay_stubs.c b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/delay_stubs.c new file mode 100644 index 0000000000..05138c263d --- /dev/null +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/delay_stubs.c @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2024 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + */ + +#include +#include +#include +#include + +#include +#include +#include + +typedef struct delay { + pthread_mutex_t mtx; + pthread_cond_t cond; + bool signaled; +} delay; + +// Initialize delay +// Returns error number or 0 if success +static int delay_init(delay *d) +{ + int err; + pthread_condattr_t cond_attr; + + d->signaled = false; + + err = pthread_condattr_init(&cond_attr); + if (err) + goto err0; + err = pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); + if (!err) + err = pthread_cond_init(&d->cond, &cond_attr); + if (err) + goto err1; + err = pthread_mutex_init(&d->mtx, NULL); + if (err) + goto err2; + pthread_condattr_destroy(&cond_attr); + return 0; + +err2: + pthread_cond_destroy(&d->cond); +err1: + pthread_condattr_destroy(&cond_attr); +err0: + return err; +} + +static void delay_destroy(delay *d) +{ + pthread_cond_destroy(&d->cond); + pthread_mutex_destroy(&d->mtx); +} + +static void delay_signal(delay *d) +{ + // there are quite some chances lock is not held + if (pthread_mutex_trylock(&d->mtx) == 0) { + d->signaled = true; + pthread_cond_signal(&d->cond); + pthread_mutex_unlock(&d->mtx); + return; + } + + // slow way, release engine + caml_release_runtime_system(); + pthread_mutex_lock(&d->mtx); + d->signaled = true; + pthread_cond_signal(&d->cond); + pthread_mutex_unlock(&d->mtx); + caml_acquire_runtime_system(); +} + +// Wait for deadline or signal. +// Returns error number or 0 if success. +// Error can be ETIMEDOUT. +int delay_wait(delay *d, const struct timespec *deadline) +{ + int err; + + caml_release_runtime_system(); + pthread_mutex_lock(&d->mtx); + do { + if (d->signaled) { + d->signaled = false; + err = 0; + break; + } + err = pthread_cond_timedwait(&d->cond, &d->mtx, deadline); + } while (err == 0); + pthread_mutex_unlock(&d->mtx); + caml_acquire_runtime_system(); + return err; +} + +#define delay_val(v) (*((delay **)Data_custom_val(v))) + +static void delay_finalize(value v_delay) +{ + delay *d = delay_val(v_delay); + delay_destroy(d); + caml_stat_free(d); +} + +static struct custom_operations delay_ops = { + "xapi.delay", + delay_finalize, + custom_compare_default, + custom_hash_default, + custom_serialize_default, + custom_deserialize_default, + custom_compare_ext_default, + custom_fixed_length_default +}; + +CAMLprim value caml_xapi_delay_create(value v_unit) +{ + CAMLparam1(v_unit); + CAMLlocal1(res); + delay *d; + int err; + + d = caml_stat_alloc(sizeof(*d)); + err = delay_init(d); + if (err) { + caml_stat_free(d); + unix_error(err, "caml_delay_create", Nothing); + } + res = caml_alloc_custom(&delay_ops, sizeof(delay *), 0, 1); + delay_val(res) = d; + CAMLreturn(res); +} + +CAMLprim value caml_xapi_delay_signal(value v_delay) +{ + CAMLparam1(v_delay); + delay *d = delay_val(v_delay); + delay_signal(d); + CAMLreturn(Val_unit); +} + +CAMLprim value caml_xapi_delay_wait(value v_delay, value v_deadline) +{ + CAMLparam2(v_delay, v_deadline); + delay *d = delay_val(v_delay); + uint64_t deadline = (uint64_t) Int64_val(v_deadline); + struct timespec ts = { + deadline / 1000000000u, + deadline % 1000000000u + }; + + int err = delay_wait(d, &ts); + if (err != 0 && err != ETIMEDOUT) + unix_error(err, "caml_delay_wait", Nothing); + + CAMLreturn(err ? Val_true : Val_false); +} diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index e0437e881a..7fcff9e08c 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -3,10 +3,15 @@ (name xapi_stdext_threads) (modules :standard \ ipq scheduler threadext_test ipq_test) (libraries + mtime + mtime.clock.os threads.posix unix xapi-stdext-unix xapi-stdext-pervasives) + (foreign_stubs + (language c) + (names delay_stubs)) ) (library diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml index 311d985ca6..b954a159dd 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml @@ -55,69 +55,27 @@ let thread_iter f xs = match thread_iter_all_exns f xs with [] -> () | (_, e) :: _ -> raise e module Delay = struct - (* Concrete type is the ends of a pipe *) - type t = { - (* A pipe is used to wake up a thread blocked in wait: *) - mutable pipe_in: Unix.file_descr option - ; (* Indicates that a signal arrived before a wait: *) - mutable signalled: bool - ; m: M.t - } + type t - let make () = {pipe_in= None; signalled= false; m= M.create ()} + external make : unit -> t = "caml_xapi_delay_create" - exception Pre_signalled + external signal : t -> unit = "caml_xapi_delay_signal" - let wait (x : t) (seconds : float) = - let to_close = ref [] in - let close' fd = - if List.mem fd !to_close then Unix.close fd ; - to_close := List.filter (fun x -> fd <> x) !to_close - in - finally - (fun () -> - try - let pipe_out = - Mutex.execute x.m (fun () -> - if x.signalled then ( - x.signalled <- false ; - raise Pre_signalled - ) ; - let pipe_out, pipe_in = Unix.pipe () in - (* these will be unconditionally closed on exit *) - to_close := [pipe_out; pipe_in] ; - x.pipe_in <- Some pipe_in ; - x.signalled <- false ; - pipe_out - ) - in - let open Xapi_stdext_unix.Unixext in - (* flush the single byte from the pipe *) - try - let (_ : string) = - time_limited_single_read pipe_out 1 ~max_wait:seconds - in - false - with Timeout -> true - (* return true if we waited the full length of time, false if we were woken *) - with Pre_signalled -> false - ) - (fun () -> - Mutex.execute x.m (fun () -> - x.pipe_in <- None ; - List.iter close' !to_close - ) - ) + external wait : t -> int64 -> bool = "caml_xapi_delay_wait" - let signal (x : t) = - Mutex.execute x.m (fun () -> - match x.pipe_in with - | Some fd -> - ignore (Unix.write fd (Bytes.of_string "X") 0 1) - | None -> - x.signalled <- true - (* If the wait hasn't happened yet then store up the signal *) - ) + let wait d t = + if t <= 0. then + true + else + match Mtime.Span.of_float_ns (t *. 1e9) with + | Some span -> + let now = Mtime_clock.now () in + let deadline = + Mtime.add_span now span |> Option.value ~default:Mtime.max_stamp + in + wait d (Mtime.to_uint64_ns deadline) + | None -> + invalid_arg "Time specified too big" end let wait_timed_read fd timeout = diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli index b5edcff21b..5c02ff5487 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli @@ -31,8 +31,10 @@ module Delay : sig val wait : t -> float -> bool (** Blocks the calling thread for a given period of time with the option of returning early if someone calls 'signal'. Returns true if the full time - period elapsed and false if signalled. Note that multple 'signals' are - coalesced; 'signals' sent before 'wait' is called are not lost. *) + period elapsed and false if signalled. Note that multiple 'signals' are + coalesced; 'signals' sent before 'wait' is called are not lost. + Only one thread should call 'wait' for a given 'Delay', attempts + to call from multiple thread is an undefined behaviour. *) val signal : t -> unit (** Sends a signal to a waiting thread. See 'wait' *)