Skip to content

Commit

Permalink
CP-52821: use Mtime in Xapi_periodic_scheduler (#6161)
Browse files Browse the repository at this point in the history
Target: feature/perf

Note that this conflicts with
7e9310f?diff=unified&w=1,
but is a pre-requisite of
#6126
  • Loading branch information
edwintorok authored Dec 12, 2024
2 parents 0b34302 + 6b6c6c5 commit 83f4517
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 56 deletions.
2 changes: 1 addition & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
(public_name xapi-stdext-threads.scheduler)
(name xapi_stdext_threads_scheduler)
(modules ipq scheduler)
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads)
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock)
)

(tests
Expand Down
14 changes: 7 additions & 7 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*)
(* Imperative priority queue *)

type 'a event = {ev: 'a; time: Mtime.t}
type 'a event = {ev: 'a; time: Mtime.span}

type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array}

Expand All @@ -23,7 +23,7 @@ let create n default =
if n <= 0 then
invalid_arg "create"
else
let default = {ev= default; time= Mtime_clock.now ()} in
let default = {ev= default; time= Mtime_clock.elapsed ()} in
{default; size= 0; data= Array.make n default}

let is_empty h = h.size <= 0
Expand All @@ -45,7 +45,7 @@ let add h x =
(* moving [x] up in the heap *)
let rec moveup i =
let fi = (i - 1) / 2 in
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then (
d.(i) <- d.(fi) ;
moveup fi
) else
Expand All @@ -69,7 +69,7 @@ let remove h s =
(* moving [x] up in the heap *)
let rec moveup i =
let fi = (i - 1) / 2 in
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then (
d.(i) <- d.(fi) ;
moveup fi
) else
Expand All @@ -83,7 +83,7 @@ let remove h s =
let j' = j + 1 in
if j' < n && d.(j').time < d.(j).time then j' else j
in
if Mtime.is_earlier d.(j).time ~than:x.time then (
if Mtime.Span.is_shorter d.(j).time ~than:x.time then (
d.(i) <- d.(j) ;
movedown j
) else
Expand All @@ -93,7 +93,7 @@ let remove h s =
in
if s = n then
()
else if Mtime.is_later d.(s).time ~than:x.time then
else if Mtime.Span.is_longer d.(s).time ~than:x.time then
moveup s
else
movedown s ;
Expand Down Expand Up @@ -129,7 +129,7 @@ let check h =
let d = h.data in
for i = 1 to h.size - 1 do
let fi = (i - 1) / 2 in
let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in
let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in
assert ordered
done

Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* GNU Lesser General Public License for more details.
*)

type 'a event = {ev: 'a; time: Mtime.t}
type 'a event = {ev: 'a; time: Mtime.span}

type 'a t

Expand Down
14 changes: 7 additions & 7 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq
(* test we get "out of bound" exception calling Ipq.remove *)
let test_out_of_index () =
let q = Ipq.create 10 0 in
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ;
let is_oob = function
| Invalid_argument s when String.ends_with ~suffix:" out of bounds" s ->
true
Expand All @@ -43,18 +43,18 @@ let test_leak () =
let use_array () = array.(0) <- 'a' in
let allocated = Atomic.make true in
Gc.finalise (fun _ -> Atomic.set allocated false) array ;
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ;
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ;
Ipq.remove q 0 ;
Gc.full_major () ;
Gc.full_major () ;
Alcotest.(check bool) "allocated" false (Atomic.get allocated) ;
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()}
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()}

(* test Ipq.is_empty call *)
let test_empty () =
let q = Ipq.create 10 0 in
Alcotest.(check bool) "same value" true (Ipq.is_empty q) ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ;
Alcotest.(check bool) "same value" false (Ipq.is_empty q) ;
Ipq.remove q 0 ;
Alcotest.(check bool) "same value" true (Ipq.is_empty q)
Expand All @@ -75,7 +75,7 @@ let set queue =
Ipq.iter
(fun d ->
let t = d.time in
let t = Mtime.to_uint64_ns t in
let t = Mtime.Span.to_uint64_ns t in
s := Int64Set.add t !s
)
queue ;
Expand All @@ -86,7 +86,7 @@ let test_old () =
let s = ref Int64Set.empty in
let add i =
let ti = Random.int64 1000000L in
let t = Mtime.of_uint64_ns ti in
let t = Mtime.Span.of_uint64_ns ti in
let e = {Ipq.time= t; Ipq.ev= i} in
Ipq.add test e ;
s := Int64Set.add ti !s
Expand Down Expand Up @@ -123,7 +123,7 @@ let test_old () =
let prev = ref 0L in
for _ = 0 to 49 do
let e = Ipq.pop_maximum test in
let t = Mtime.to_uint64_ns e.time in
let t = Mtime.Span.to_uint64_ns e.time in
Alcotest.(check bool)
(Printf.sprintf "%Ld bigger than %Ld" t !prev)
true (t >= !prev) ;
Expand Down
49 changes: 23 additions & 26 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,23 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()

module Clock = struct
let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9))

let span_to_s span =
Mtime.Span.to_uint64_ns span |> Int64.to_float |> fun ns -> ns /. 1e9

let add_span clock secs =
(* return mix or max available value if the add overflows *)
match Mtime.add_span clock (span secs) with
| Some t ->
t
| None when secs > 0. ->
Mtime.max_stamp
| None ->
Mtime.min_stamp
end

let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let add_to_queue_span name ty start_span newfunc =
let ( ++ ) = Mtime.Span.add in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
{
Ipq.ev= {func= newfunc; ty; name}
; Ipq.time= Mtime_clock.elapsed () ++ start_span
}
in
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let add_to_queue name ty start newfunc =
let start_span =
Clock.Timer.s_to_span start |> Option.value ~default:Mtime.Span.max_span
in
add_to_queue_span name ty start_span newfunc

let remove_from_queue name =
with_lock lock @@ fun () ->
match !pending_event with
Expand All @@ -72,8 +64,11 @@ let add_periodic_pending () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ({ty= Periodic timer; _} as ev) ->
let ( ++ ) = Clock.add_span in
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
let ( ++ ) = Mtime.Span.add in
let delta =
Clock.Timer.s_to_span timer |> Option.value ~default:Mtime.Span.max_span
in
let item = {Ipq.ev; Ipq.time= Mtime_clock.elapsed () ++ delta} in
Ipq.add queue item ;
pending_event := None
| Some {ty= OneShot; _} ->
Expand All @@ -85,15 +80,15 @@ let loop () =
debug "%s started" __MODULE__ ;
try
while true do
let now = Mtime_clock.now () in
let now = Mtime_clock.elapsed () in
let deadline, item =
with_lock lock @@ fun () ->
(* empty: wait till we get something *)
if Ipq.is_empty queue then
(Clock.add_span now 10.0, None)
(Mtime.Span.add now Mtime.Span.(10 * s), None)
else
let next = Ipq.maximum queue in
if Mtime.is_later next.Ipq.time ~than:now then
if Mtime.Span.is_longer next.Ipq.time ~than:now then
(* not expired: wait till time or interrupted *)
(next.Ipq.time, None)
else (
Expand All @@ -111,7 +106,9 @@ let loop () =
| None -> (
(* Sleep until next event. *)
let sleep =
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
Mtime.(Span.abs_diff deadline now)
|> Mtime.Span.(add ms)
|> Clock.Timer.span_to_s
in
try ignore (Delay.wait delay sleep)
with e ->
Expand Down
4 changes: 4 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type func_ty =
| OneShot (** Fire just once *)
| Periodic of float (** Fire periodically with a given period in seconds *)

val add_to_queue_span :
string -> func_ty -> Mtime.span -> (unit -> unit) -> unit
(** Start a new timer. *)

val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
(** Start a new timer. *)

Expand Down
38 changes: 24 additions & 14 deletions ocaml/xapi/xapi_event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -419,20 +419,25 @@ module From = struct

let session_is_invalid call = with_lock call.m (fun () -> call.session_invalid)

let wait2 call from_id deadline =
let wait2 call from_id timer =
let timeoutname = Printf.sprintf "event_from_timeout_%Ld" call.index in
with_lock m (fun () ->
while
from_id = call.cur_id
&& (not (session_is_invalid call))
&& Unix.gettimeofday () < deadline
&& not (Clock.Timer.has_expired timer)
do
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue timeoutname
Xapi_stdext_threads_scheduler.Scheduler.OneShot
(deadline -. Unix.gettimeofday () +. 0.5)
(fun () -> Condition.broadcast c) ;
Condition.wait c m ;
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue timeoutname
match Clock.Timer.remaining timer with
| Expired _ ->
()
| Remaining delta ->
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue_span
timeoutname Xapi_stdext_threads_scheduler.Scheduler.OneShot
delta (fun () -> Condition.broadcast c
) ;
Condition.wait c m ;
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue
timeoutname
done
) ;
if session_is_invalid call then (
Expand Down Expand Up @@ -506,7 +511,7 @@ let rec next ~__context =
else
rpc_of_events relevant

let from_inner __context session subs from from_t deadline =
let from_inner __context session subs from from_t timer =
let open Xapi_database in
let open From in
(* The database tables involved in our subscription *)
Expand Down Expand Up @@ -605,14 +610,14 @@ let from_inner __context session subs from from_t deadline =
&& mods = []
&& deletes = []
&& messages = []
&& Unix.gettimeofday () < deadline
&& not (Clock.Timer.has_expired timer)
then (
last_generation := last ;
(* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
sub.cur_id <- last ;
(* last id the client got is equivalent to the current one *)
last_msg_gen := msg_gen ;
wait2 sub last deadline ;
wait2 sub last timer ;
Thread.delay 0.05 ;
grab_nonempty_range ()
) else
Expand Down Expand Up @@ -705,14 +710,19 @@ let from ~__context ~classes ~token ~timeout =
)
in
let subs = List.map Subscription.of_string classes in
let deadline = Unix.gettimeofday () +. timeout in
let duration =
timeout
|> Clock.Timer.s_to_span
|> Option.value ~default:Mtime.Span.(24 * hour)
in
let timer = Clock.Timer.start ~duration in
(* We need to iterate because it's possible for an empty event set
to be generated if we peek in-between a Modify and a Delete; we'll
miss the Delete event and fail to generate the Modify because the
snapshot can't be taken. *)
let rec loop () =
let event_from = from_inner __context session subs from from_t deadline in
if event_from.events = [] && Unix.gettimeofday () < deadline then (
let event_from = from_inner __context session subs from from_t timer in
if event_from.events = [] && not (Clock.Timer.has_expired timer) then (
debug "suppressing empty event.from" ;
loop ()
) else
Expand Down

0 comments on commit 83f4517

Please sign in to comment.