diff --git a/ocaml/xapi-aux/throttle.ml b/ocaml/xapi-aux/throttle.ml index 18e955e3c0..b55378b2f3 100644 --- a/ocaml/xapi-aux/throttle.ml +++ b/ocaml/xapi-aux/throttle.ml @@ -41,9 +41,20 @@ module Make (Size : SIZE) = struct end module Batching = struct - type t = {delay_before: Mtime.span; delay_between: Mtime.span} + type t = { + delay_initial: Mtime.span + ; delay_before: Mtime.span + ; delay_between: Mtime.span + } - let make ~delay_before ~delay_between = {delay_before; delay_between} + let make ~delay_before ~delay_between = + (* we are dividing, cannot overflow *) + let delay_initial = + Mtime.Span.to_float_ns delay_between /. 16. + |> Mtime.Span.of_float_ns + |> Option.get + in + {delay_initial; delay_before; delay_between} (** [perform_delay delay] calls {!val:Thread.delay} when [delay] is non-zero. @@ -55,11 +66,15 @@ module Batching = struct if Mtime.Span.is_longer delay ~than:Mtime.Span.min_span then Thread.delay (Clock.Timer.span_to_s delay) - let with_recursive_loop config f arg = - let rec self arg = - perform_delay config.delay_between ; - (f [@tailcall]) self arg + let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b + + let with_recursive_loop config f = + let rec self arg input = + let arg = span_min config.delay_between Mtime.Span.(2 * arg) in + perform_delay arg ; + (f [@tailcall]) (self arg) input in + let self0 arg input = (f [@tailcall]) (self arg) input in perform_delay config.delay_before ; - f self arg + f (self0 config.delay_initial) end diff --git a/ocaml/xapi-aux/throttle.mli b/ocaml/xapi-aux/throttle.mli index fb4212b565..2551194013 100644 --- a/ocaml/xapi-aux/throttle.mli +++ b/ocaml/xapi-aux/throttle.mli @@ -34,21 +34,27 @@ module Batching : sig *) val with_recursive_loop : t -> (('a -> 'b) -> 'a -> 'b) -> 'a -> 'b - (** [with_recursive_loop config f arg] calls [f self arg], where [self] can be used + (** [with_recursive config f arg] calls [f self arg], where [self] can be used for recursive calls. - A [delay_before] amount of seconds is inserted once, and [delay_between] is inserted between recursive calls: + [arg] is an argument that the implementation of [f] can change between recursive calls for its own purposes, + otherwise [()] can be used. + + A [delay_before] amount of seconds is inserted once, and [delay_between/8] is inserted between recursive calls, + except the first one, and delays increase exponentially until [delay_between] is reached {v delay_before f ... (self[@tailcall]) ... - delay_between f ... (self[@tailcall]) ... - delay_between + delay_between/8 f ... + (self[@tailcall]) ... + delay_between/4 + f ... v} - The delays are determined by [config] + The delays are determined by [config], and [delay_between] uses an exponential backoff, up to [config.delay_between] delay. *) end diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index 39d87363df..18195d0337 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -497,11 +497,11 @@ let rec next ~__context = in (* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *) let grab_nonempty_range = - Throttle.Batching.with_recursive_loop batching @@ fun self () -> + Throttle.Batching.with_recursive_loop batching @@ fun self arg -> let last_id, end_id = grab_range () in if last_id = end_id then let (_ : int64) = wait subscription end_id in - (self [@tailcall]) () + (self [@tailcall]) arg else (last_id, end_id) in @@ -608,7 +608,7 @@ let from_inner __context session subs from from_t timer batching = let msg_gen, messages, tableset, (creates, mods, deletes, last) = with_call session subs (fun sub -> let grab_nonempty_range = - Throttle.Batching.with_recursive_loop batching @@ fun self () -> + Throttle.Batching.with_recursive_loop batching @@ fun self arg -> let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last)) as result ) = @@ -627,7 +627,7 @@ let from_inner __context session subs from from_t timer batching = (* last id the client got is equivalent to the current one *) last_msg_gen := msg_gen ; wait2 sub last timer ; - (self [@tailcall]) () + (self [@tailcall]) arg ) else result in