diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index 0f0a6e2c248..5e8b9bb0650 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -62,18 +62,15 @@ let unlink_safe file = type valid_file = t * string -type file = - | Latest of valid_file - | Outdated of valid_file - | Temporary of string - | Invalid of string - -let path_of_key root (uuid, timestamp, key) = - root - // Uuidm.to_string uuid +type file = Latest of valid_file | Outdated of valid_file | Invalid of string + +let print_key (uuid, timestamp, key) = + Uuidm.to_string uuid // Types.Tpm.(serialize_key key |> string_of_int) // Mtime.(to_uint64_ns timestamp |> Int64.to_string) +let path_of_key root key = root // print_key key + let key_of_path path = let ( let* ) = Option.bind in let key_dir = Filename.(dirname path) in @@ -81,7 +78,12 @@ let key_of_path path = let* key = Filename.basename key_dir |> int_of_string_opt - |> Option.map Types.Tpm.deserialize_key + |> Option.map (fun e -> + Types.Tpm.deserialize_key e + |> Result.map_error (fun msg -> D.info "Invalid key found: %s" msg) + |> Result.to_option + ) + |> Option.join in let* timestamp = Filename.basename path @@ -90,24 +92,17 @@ let key_of_path path = in Some ((uuid, timestamp, key), path) -let path_is_temp path = - let pathlen = String.length path in - String.ends_with ~suffix:".pre" path - && key_of_path (String.sub path 0 (pathlen - 4)) |> Option.is_some - -let temp_of_path path = path ^ ".pre" +let only_latest = function + | Latest f -> + Either.Left f + | Outdated (_, p) | Invalid p -> + Right p let sort_updates contents = let classify elem = match key_of_path elem with | None -> - let file = - if path_is_temp elem then - Temporary elem - else - Invalid elem - in - Either.Right file + Either.Right (Invalid elem) | Some valid_file -> Either.Left valid_file in @@ -152,7 +147,7 @@ let read_from ~filename = let persist_to ~filename:f_path ~contents = let atomic_write_to_file ~perm f = - let tmp_path = temp_of_path f_path in + let tmp_path = f_path ^ ".pre" in let dirname = Filename.dirname f_path in let flags = Unix.[O_WRONLY; O_CREAT; O_SYNC] in let* fd_tmp = Lwt_unix.openfile tmp_path flags perm in @@ -285,16 +280,10 @@ end = struct let updates = sort_updates contents in (* 2. Pick latest *) - let only_latest = function - | Latest (_, p) -> - Either.Left p - | Temporary p | Outdated (_, p) | Invalid p -> - Right p - in let latest, _ = List.partition_map only_latest updates in (* 3. fall back to remote read if needed *) - let get_contents path = + let get_contents (_, path) = Lwt.catch (fun () -> read_from ~filename:path) (fun _ -> read_remote ()) in @@ -382,43 +371,38 @@ module Watcher : sig end = struct type push_cache = File of valid_file | Update_all | Wait - (* Outdated and invalid files can be deleted, keep temporary files just in case - they need to be recovered *) - let discarder = function - | Latest _ as f -> - Either.Left f - | Temporary _ as f -> - Left f - | Outdated (_, p) -> - Right p - | Invalid p -> - Right p - let get_latest_and_delete_rest root = let* files = get_all_contents root in - let keep, to_delete = List.partition_map discarder files in + let latest, to_delete = List.partition_map only_latest files in let* () = Lwt_list.iter_p unlink_safe to_delete in - (* Ignore temporaty files *) - let latest = - List.filter_map (function Latest f -> Some f | _ -> None) keep - in Lwt.return latest let retry_push push (uuid, timestamp, key) contents = let __FUN = __FUNCTION__ in let push' () = push (uuid, timestamp, key) contents in - let rec retry k = + let counter = Mtime_clock.counter () in + let rec retry is_first_try = let on_error e = - D.info "%s: Error on push, attempt %i. Reason: %s" __FUN k - (Printexc.to_string e) ; + if is_first_try then + D.debug "%s: Error on push, retrying. Reason: %s" __FUN + (Printexc.to_string e) ; let* () = Lwt_unix.sleep 0.1 in - retry (k + 1) + retry false in Lwt.try_bind push' - (function Ok () -> Lwt.return_unit | Error e -> on_error e) + (function + | Ok () -> Lwt.return (not is_first_try) | Error e -> on_error e + ) on_error in - retry 1 + let* failed = retry true in + ( if failed then + let elapsed = Mtime_clock.count counter in + D.debug "%s: Pushed %s after trying for %s" __FUN + (print_key (uuid, timestamp, key)) + (Fmt.to_to_string Mtime.Span.pp elapsed) + ) ; + Lwt.return_unit let push_file push (key, path) = let __FUN = __FUNCTION__ in @@ -519,30 +503,28 @@ end (** Module use to change the cache contents before the reader and writer start running *) module Setup : sig - val retime_cache_contents : Types.Service.t -> unit Lwt.t + val retime_cache_contents : Types.Service.t -> t List.t Lwt.t + (** [retime_cache_contents typ] retimes the current cache contents so they + are time congruently with the current execution and returns the keys of + valid files that are yet to be pushed *) end = struct type file_action = | Keep of file | Delete of string | Move of {from: string; into: string} - let get_fs_action root now = function + let get_fs_action root now acc = function | Latest ((uuid, timestamp, key), from) as latest -> if Mtime.is_later ~than:now timestamp then let timestamp = now in let into = path_of_key root (uuid, timestamp, key) in - Move {from; into} + ((uuid, timestamp, key) :: acc, Move {from; into}) else - Keep latest - | Temporary _ as temp -> - Keep temp + ((uuid, timestamp, key) :: acc, Keep latest) | Invalid p | Outdated (_, p) -> - Delete p + (acc, Delete p) let commit __FUN = function - | Keep (Temporary p) -> - D.warn "%s: Found temporary file, ignoring '%s'" __FUN p ; - Lwt.return_unit | Keep _ -> Lwt.return_unit | Delete p -> @@ -585,19 +567,31 @@ end = struct let now = Mtime_clock.now () in let root = cache_of typ in let* contents = get_all_contents root in - let* () = - contents - |> List.map (get_fs_action root now) - |> Lwt_list.iter_p (commit __FUNCTION__) + let pending, actions = + contents |> List.fold_left_map (get_fs_action root now) [] in - delete_empty_dirs ~delete_root:false root + let* () = Lwt_list.iter_p (commit __FUNCTION__) actions in + let* () = delete_empty_dirs ~delete_root:false root in + Lwt.return pending end let setup typ read write = - let* () = Setup.retime_cache_contents typ in - let queue, push = Lwt_bounded_stream.create 2 in + let* pending = Setup.retime_cache_contents typ in + let capacity = 512 in + let queue, push = Lwt_bounded_stream.create capacity in let lock = Lwt_mutex.create () in - let q = {queue; push; lock; state= Disengaged} in + let state = + if pending = [] then + Direct + else if List.length pending < capacity then + let () = + List.iter (fun e -> Option.value ~default:() (push (Some e))) pending + in + Engaged + else + Disengaged + in + let q = {queue; push; lock; state} in Lwt.return ( Writer.with_cache ~direct:(read, write) typ q , Watcher.watch ~direct:write typ q diff --git a/ocaml/xapi-guard/lib/types.ml b/ocaml/xapi-guard/lib/types.ml index 3f2b41c7682..ff6dbc1dd3c 100644 --- a/ocaml/xapi-guard/lib/types.ml +++ b/ocaml/xapi-guard/lib/types.ml @@ -28,13 +28,13 @@ module Tpm = struct let deserialize_key = function | 0 -> - Perm + Ok Perm | 1 -> - Save + Ok Save | 2 -> - Volatile + Ok Volatile | s -> - Fmt.invalid_arg "Unknown TPM state key: %i" s + Error Printf.(sprintf "Unknown TPM state key: %i" s) let empty_state = "" diff --git a/ocaml/xapi-guard/lib/types.mli b/ocaml/xapi-guard/lib/types.mli index f210ea8c96a..06b811ba30c 100644 --- a/ocaml/xapi-guard/lib/types.mli +++ b/ocaml/xapi-guard/lib/types.mli @@ -17,7 +17,7 @@ module Tpm : sig (** [key_of_swtpm path] returns a state key represented by [path]. These paths are parts of the requests generated by SWTPM and may contain slashes *) - val deserialize_key : int -> key + val deserialize_key : int -> (key, string) Result.t val serialize_key : key -> int (** [serialize key] returns the state key represented by [key]. *) diff --git a/ocaml/xapi-guard/test/cache_test.ml b/ocaml/xapi-guard/test/cache_test.ml index 97b144839a6..3e51cab2c35 100644 --- a/ocaml/xapi-guard/test/cache_test.ml +++ b/ocaml/xapi-guard/test/cache_test.ml @@ -12,7 +12,7 @@ module TPMs = struct let request_persist uuid write = let __FUN = __FUNCTION__ in - let key = Tpm.deserialize_key (Random.int 3) in + let key = Tpm.deserialize_key (Random.int 3) |> Result.get_ok in let time = Mtime_clock.now () in let serial_n = Atomic.fetch_and_add writes_created 1 in @@ -31,7 +31,7 @@ module TPMs = struct let request_read uuid read = let __FUN = __FUNCTION__ in - let key = Tpm.deserialize_key (Random.int 3) in + let key = Tpm.deserialize_key (Random.int 3) |> Result.get_ok in let time = Mtime_clock.now () in let serial_n = Atomic.fetch_and_add reads_created 1 in @@ -200,5 +200,6 @@ let main () = Lwt.return_unit let () = + Debug.log_to_stdout () ; setup_log @@ Some Logs.Debug ; Lwt_main.run (main ())