Skip to content


Test and improve Xapi periodic scheduler (#6155)
Browse files Browse the repository at this point in the history
There was some issue in the current code, from structure corruption to
thread safety.
Add some test and fix discovered issues.

More details on commit messages.
  • Loading branch information
psafont authored Dec 11, 2024
2 parents d8baca7 + 88dd4d9 commit 8941c9d
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 38 deletions.
6 changes: 3 additions & 3 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(public_name xapi-stdext-threads)
(name xapi_stdext_threads)
(modules :standard \ ipq scheduler threadext_test ipq_test)
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
Expand All @@ -22,8 +22,8 @@

(names threadext_test ipq_test)
(names threadext_test ipq_test scheduler_test)
(package xapi-stdext-threads)
(modules threadext_test ipq_test)
(modules threadext_test ipq_test scheduler_test)
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
89 changes: 56 additions & 33 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ let delay = Delay.make ()

let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""}

let (pending_event : t option ref) = ref None

let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()
Expand All @@ -48,48 +50,68 @@ module Clock = struct

let add_to_queue ?(signal = true) name ty start newfunc =
with_lock lock (fun () ->
let ( ++ ) = Clock.add_span in
Ipq.add queue
Ipq.ev= {func= newfunc; ty; name}
; Ipq.time= () ++ start
) ;
if signal then Delay.signal delay
let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= () ++ start}
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let remove_from_queue name =
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index

let add_periodic_pending () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ({ty= Periodic timer; _} as ev) ->
let ( ++ ) = Clock.add_span in
let item = {Ipq.ev; Ipq.time= () ++ timer} in
Ipq.add queue item ;
pending_event := None
| Some {ty= OneShot; _} ->
pending_event := None
| None ->

let loop () =
debug "%s started" __MODULE__ ;
while true do
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
if empty then
Thread.delay 10.0
(* Doesn't happen often - the queue isn't usually empty *)
let next = with_lock lock (fun () -> Ipq.maximum queue) in
let now = () in
if next.Ipq.time < now then (
let todo =
(with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev
let now = () in
let deadline, item =
with_lock lock @@ fun () ->
(* empty: wait till we get something *)
if Ipq.is_empty queue then
(Clock.add_span now 10.0, None)
let next = Ipq.maximum queue in
if Mtime.is_later next.Ipq.time ~than:now then
(* not expired: wait till time or interrupted *)
(next.Ipq.time, None)
else (
(* remove expired item *)
Ipq.pop_maximum queue |> ignore ;
(* save periodic to be scheduled again *)
if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ;
(now, Some next.Ipq.ev)
match item with
| Some todo ->
(try todo.func () with _ -> ()) ;
match todo.ty with
| OneShot ->
| Periodic timer ->
add_to_queue ~signal:false todo.ty timer todo.func
) else (* Sleep until next event. *)
add_periodic_pending ()
| None -> (
(* Sleep until next event. *)
let sleep =
Mtime.(span next.Ipq.time now)
|> Mtime.Span.(add ms)
|> Clock.span_to_s
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
try ignore (Delay.wait delay sleep)
with e ->
Expand All @@ -105,6 +127,7 @@ let loop () =
normal delay. New events may be missed."
detailed_msg ;
Thread.delay sleep
with _ ->
Expand Down
3 changes: 1 addition & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ type func_ty =
| OneShot (** Fire just once *)
| Periodic of float (** Fire periodically with a given period in seconds *)

val add_to_queue :
?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
(** Start a new timer. *)

val remove_from_queue : string -> unit
Expand Down
103 changes: 103 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
* Copyright (C) 2024 Cloud Software Group
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* GNU Lesser General Public License for more details.

module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let started = Atomic.make false

let start_schedule () =
if not ( started true) then
Thread.create Scheduler.loop () |> ignore

let send event data = Event.(send event data |> sync)

let receive event = Event.(receive event |> sync)

let elapsed_ms cnt =
let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in
Int64.(div elapsed_ns 1000000L |> to_int)

let is_less = Alcotest.(testable (pp int)) Stdlib.( > )

let test_single () =
let finished = Event.new_channel () in
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "result" true (receive finished)

let test_remove_self () =
let which = Event.new_channel () in
Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () ->
(* this should remove the periodic scheduling *)
Scheduler.remove_from_queue "self" ;
(* add an operation to stop the test *)
Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () ->
send which "stop"
) ;
send which "self"
) ;
start_schedule () ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "self" (receive which) ;
Alcotest.(check string) "same event name" "stop" (receive which) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 300 elapsed_ms

let test_empty () =
let finished = Event.new_channel () in
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "finished" true (receive finished) ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
let cnt = Mtime_clock.counter () in
Alcotest.(check bool) "finished" true (receive finished) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 100 elapsed_ms

let test_wakeup () =
let which = Event.new_channel () in
(* schedule a long event *)
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
send which "long"
) ;
start_schedule () ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
let cnt = Mtime_clock.counter () in
(* schedule a quick event, should wake up the loop *)
Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () ->
send which "quick"
) ;
Alcotest.(check string) "same event name" "quick" (receive which) ;
Scheduler.remove_from_queue "long" ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 150 elapsed_ms

let tests =
("test_single", `Quick, test_single)
; ("test_remove_self", `Quick, test_remove_self)
; ("test_empty", `Quick, test_empty)
; ("test_wakeup", `Quick, test_wakeup)

let () = "Scheduler" [("generic", tests)]
Empty file.

0 comments on commit 8941c9d

Please sign in to comment.