Skip to content

Commit

Permalink
Merge pull request xapi-project#5506 from psafont/private/paus/twins
Browse files Browse the repository at this point in the history
  • Loading branch information
psafont authored Mar 27, 2024
2 parents 7f1d315 + 7268e36 commit add5d2c
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 79 deletions.
138 changes: 66 additions & 72 deletions ocaml/xapi-guard/lib/disk_cache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,28 @@ let unlink_safe file =

type valid_file = t * string

type file =
| Latest of valid_file
| Outdated of valid_file
| Temporary of string
| Invalid of string

let path_of_key root (uuid, timestamp, key) =
root
// Uuidm.to_string uuid
type file = Latest of valid_file | Outdated of valid_file | Invalid of string

let print_key (uuid, timestamp, key) =
Uuidm.to_string uuid
// Types.Tpm.(serialize_key key |> string_of_int)
// Mtime.(to_uint64_ns timestamp |> Int64.to_string)

let path_of_key root key = root // print_key key

let key_of_path path =
let ( let* ) = Option.bind in
let key_dir = Filename.(dirname path) in
let* uuid = Filename.(basename (dirname key_dir)) |> Uuidm.of_string in
let* key =
Filename.basename key_dir
|> int_of_string_opt
|> Option.map Types.Tpm.deserialize_key
|> Option.map (fun e ->
Types.Tpm.deserialize_key e
|> Result.map_error (fun msg -> D.info "Invalid key found: %s" msg)
|> Result.to_option
)
|> Option.join
in
let* timestamp =
Filename.basename path
Expand All @@ -90,24 +92,17 @@ let key_of_path path =
in
Some ((uuid, timestamp, key), path)

let path_is_temp path =
let pathlen = String.length path in
String.ends_with ~suffix:".pre" path
&& key_of_path (String.sub path 0 (pathlen - 4)) |> Option.is_some

let temp_of_path path = path ^ ".pre"
let only_latest = function
| Latest f ->
Either.Left f
| Outdated (_, p) | Invalid p ->
Right p

let sort_updates contents =
let classify elem =
match key_of_path elem with
| None ->
let file =
if path_is_temp elem then
Temporary elem
else
Invalid elem
in
Either.Right file
Either.Right (Invalid elem)
| Some valid_file ->
Either.Left valid_file
in
Expand Down Expand Up @@ -152,7 +147,7 @@ let read_from ~filename =

let persist_to ~filename:f_path ~contents =
let atomic_write_to_file ~perm f =
let tmp_path = temp_of_path f_path in
let tmp_path = f_path ^ ".pre" in
let dirname = Filename.dirname f_path in
let flags = Unix.[O_WRONLY; O_CREAT; O_SYNC] in
let* fd_tmp = Lwt_unix.openfile tmp_path flags perm in
Expand Down Expand Up @@ -285,16 +280,10 @@ end = struct
let updates = sort_updates contents in

(* 2. Pick latest *)
let only_latest = function
| Latest (_, p) ->
Either.Left p
| Temporary p | Outdated (_, p) | Invalid p ->
Right p
in
let latest, _ = List.partition_map only_latest updates in

(* 3. fall back to remote read if needed *)
let get_contents path =
let get_contents (_, path) =
Lwt.catch (fun () -> read_from ~filename:path) (fun _ -> read_remote ())
in

Expand Down Expand Up @@ -382,43 +371,38 @@ module Watcher : sig
end = struct
type push_cache = File of valid_file | Update_all | Wait

(* Outdated and invalid files can be deleted, keep temporary files just in case
they need to be recovered *)
let discarder = function
| Latest _ as f ->
Either.Left f
| Temporary _ as f ->
Left f
| Outdated (_, p) ->
Right p
| Invalid p ->
Right p

let get_latest_and_delete_rest root =
let* files = get_all_contents root in
let keep, to_delete = List.partition_map discarder files in
let latest, to_delete = List.partition_map only_latest files in
let* () = Lwt_list.iter_p unlink_safe to_delete in
(* Ignore temporaty files *)
let latest =
List.filter_map (function Latest f -> Some f | _ -> None) keep
in
Lwt.return latest

let retry_push push (uuid, timestamp, key) contents =
let __FUN = __FUNCTION__ in
let push' () = push (uuid, timestamp, key) contents in
let rec retry k =
let counter = Mtime_clock.counter () in
let rec retry is_first_try =
let on_error e =
D.info "%s: Error on push, attempt %i. Reason: %s" __FUN k
(Printexc.to_string e) ;
if is_first_try then
D.debug "%s: Error on push, retrying. Reason: %s" __FUN
(Printexc.to_string e) ;
let* () = Lwt_unix.sleep 0.1 in
retry (k + 1)
retry false
in
Lwt.try_bind push'
(function Ok () -> Lwt.return_unit | Error e -> on_error e)
(function
| Ok () -> Lwt.return (not is_first_try) | Error e -> on_error e
)
on_error
in
retry 1
let* failed = retry true in
( if failed then
let elapsed = Mtime_clock.count counter in
D.debug "%s: Pushed %s after trying for %s" __FUN
(print_key (uuid, timestamp, key))
(Fmt.to_to_string Mtime.Span.pp elapsed)
) ;
Lwt.return_unit

let push_file push (key, path) =
let __FUN = __FUNCTION__ in
Expand Down Expand Up @@ -519,30 +503,28 @@ end
(** Module use to change the cache contents before the reader and writer start
running *)
module Setup : sig
val retime_cache_contents : Types.Service.t -> unit Lwt.t
val retime_cache_contents : Types.Service.t -> t List.t Lwt.t
(** [retime_cache_contents typ] retimes the current cache contents so they
are time congruently with the current execution and returns the keys of
valid files that are yet to be pushed *)
end = struct
type file_action =
| Keep of file
| Delete of string
| Move of {from: string; into: string}

let get_fs_action root now = function
let get_fs_action root now acc = function
| Latest ((uuid, timestamp, key), from) as latest ->
if Mtime.is_later ~than:now timestamp then
let timestamp = now in
let into = path_of_key root (uuid, timestamp, key) in
Move {from; into}
((uuid, timestamp, key) :: acc, Move {from; into})
else
Keep latest
| Temporary _ as temp ->
Keep temp
((uuid, timestamp, key) :: acc, Keep latest)
| Invalid p | Outdated (_, p) ->
Delete p
(acc, Delete p)

let commit __FUN = function
| Keep (Temporary p) ->
D.warn "%s: Found temporary file, ignoring '%s'" __FUN p ;
Lwt.return_unit
| Keep _ ->
Lwt.return_unit
| Delete p ->
Expand Down Expand Up @@ -585,19 +567,31 @@ end = struct
let now = Mtime_clock.now () in
let root = cache_of typ in
let* contents = get_all_contents root in
let* () =
contents
|> List.map (get_fs_action root now)
|> Lwt_list.iter_p (commit __FUNCTION__)
let pending, actions =
contents |> List.fold_left_map (get_fs_action root now) []
in
delete_empty_dirs ~delete_root:false root
let* () = Lwt_list.iter_p (commit __FUNCTION__) actions in
let* () = delete_empty_dirs ~delete_root:false root in
Lwt.return pending
end

let setup typ read write =
let* () = Setup.retime_cache_contents typ in
let queue, push = Lwt_bounded_stream.create 2 in
let* pending = Setup.retime_cache_contents typ in
let capacity = 512 in
let queue, push = Lwt_bounded_stream.create capacity in
let lock = Lwt_mutex.create () in
let q = {queue; push; lock; state= Disengaged} in
let state =
if pending = [] then
Direct
else if List.length pending < capacity then
let () =
List.iter (fun e -> Option.value ~default:() (push (Some e))) pending
in
Engaged
else
Disengaged
in
let q = {queue; push; lock; state} in
Lwt.return
( Writer.with_cache ~direct:(read, write) typ q
, Watcher.watch ~direct:write typ q
Expand Down
8 changes: 4 additions & 4 deletions ocaml/xapi-guard/lib/types.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ module Tpm = struct

let deserialize_key = function
| 0 ->
Perm
Ok Perm
| 1 ->
Save
Ok Save
| 2 ->
Volatile
Ok Volatile
| s ->
Fmt.invalid_arg "Unknown TPM state key: %i" s
Error Printf.(sprintf "Unknown TPM state key: %i" s)

let empty_state = ""

Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi-guard/lib/types.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module Tpm : sig
(** [key_of_swtpm path] returns a state key represented by [path]. These paths
are parts of the requests generated by SWTPM and may contain slashes *)

val deserialize_key : int -> key
val deserialize_key : int -> (key, string) Result.t

val serialize_key : key -> int
(** [serialize key] returns the state key represented by [key]. *)
Expand Down
5 changes: 3 additions & 2 deletions ocaml/xapi-guard/test/cache_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module TPMs = struct
let request_persist uuid write =
let __FUN = __FUNCTION__ in

let key = Tpm.deserialize_key (Random.int 3) in
let key = Tpm.deserialize_key (Random.int 3) |> Result.get_ok in

let time = Mtime_clock.now () in
let serial_n = Atomic.fetch_and_add writes_created 1 in
Expand All @@ -31,7 +31,7 @@ module TPMs = struct
let request_read uuid read =
let __FUN = __FUNCTION__ in

let key = Tpm.deserialize_key (Random.int 3) in
let key = Tpm.deserialize_key (Random.int 3) |> Result.get_ok in

let time = Mtime_clock.now () in
let serial_n = Atomic.fetch_and_add reads_created 1 in
Expand Down Expand Up @@ -200,5 +200,6 @@ let main () =
Lwt.return_unit

let () =
Debug.log_to_stdout () ;
setup_log @@ Some Logs.Debug ;
Lwt_main.run (main ())

0 comments on commit add5d2c

Please sign in to comment.