diff --git a/ocaml/tests/test_sm_features.ml b/ocaml/tests/test_sm_features.ml index 091d58d4f6e..43bce4c3807 100644 --- a/ocaml/tests/test_sm_features.ml +++ b/ocaml/tests/test_sm_features.ml @@ -20,7 +20,7 @@ type sm_data_sequence = { (* Text feature list we get back as part of sr_get_driver_info. *) raw: string list ; (* SMAPIv1 driver info. *) - smapiv1_features: Smint.feature list + smapiv1_features: Smint.Feature.t list ; (* SMAPIv2 driver info. *) smapiv2_features: string list ; (* SM object created in the database. *) @@ -40,7 +40,6 @@ let string_of_sm_object sm = ) let test_sequences = - let open Smint in [ (* Test NFS driver features as of Clearwater. *) { @@ -179,14 +178,14 @@ module ParseSMAPIv1Features = Generic.MakeStateless (struct module Io = struct type input_t = string list - type output_t = Smint.feature list + type output_t = Smint.Feature.t list let string_of_input_t = Test_printers.(list string) - let string_of_output_t = Test_printers.(list Smint.string_of_feature) + let string_of_output_t = Test_printers.(list Smint.Feature.to_string) end - let transform = Smint.parse_capability_int64_features + let transform = Smint.Feature.parse_capability_int64 let tests = `QuickAndAutoDocumented @@ -198,16 +197,16 @@ end) module CreateSMAPIv2Features = Generic.MakeStateless (struct module Io = struct - type input_t = Smint.feature list + type input_t = Smint.Feature.t list type output_t = string list - let string_of_input_t = Test_printers.(list Smint.string_of_feature) + let string_of_input_t = Test_printers.(list Smint.Feature.to_string) let string_of_output_t = Test_printers.(list string) end - let transform = List.map Smint.string_of_feature + let transform = List.map Smint.Feature.to_string let tests = `QuickAndAutoDocumented @@ -276,9 +275,10 @@ module CompatSMFeatures = Generic.MakeStateless (struct end let transform l = + let open Smint.Feature in List.split l |> fun (x, y) -> - (Smint.parse_string_int64_features x, Smint.parse_string_int64_features y) - |> fun (x, y) -> Smint.compat_features x y |> List.map Smint.unparse_feature + (parse_string_int64 x, parse_string_int64 y) |> fun (x, y) -> + compat_features x y |> List.map unparse let tests = let r1, r2 = test_intersection_sequences in diff --git a/ocaml/tests/test_storage_migrate_state.ml b/ocaml/tests/test_storage_migrate_state.ml index d822b7ef393..42087887995 100644 --- a/ocaml/tests/test_storage_migrate_state.ml +++ b/ocaml/tests/test_storage_migrate_state.ml @@ -44,14 +44,16 @@ let sample_send_state = } let sample_receive_state = + let open Storage_interface in Storage_migrate.State.Receive_state. { - sr= Storage_interface.Sr.of_string "my_sr" - ; dummy_vdi= Storage_interface.Vdi.of_string "dummy_vdi" - ; leaf_vdi= Storage_interface.Vdi.of_string "leaf_vdi" + sr= Sr.of_string "my_sr" + ; dummy_vdi= Vdi.of_string "dummy_vdi" + ; leaf_vdi= Vdi.of_string "leaf_vdi" ; leaf_dp= "leaf_dp" - ; parent_vdi= Storage_interface.Vdi.of_string "parent_vdi" - ; remote_vdi= Storage_interface.Vdi.of_string "remote_vdi" + ; parent_vdi= Vdi.of_string "parent_vdi" + ; remote_vdi= Vdi.of_string "remote_vdi" + ; mirror_vm= Vm.of_string "mirror_vm" } let sample_copy_state = diff --git a/ocaml/vhd-tool/cli/dune b/ocaml/vhd-tool/cli/dune index aca350c9f45..5dea7468c10 100644 --- a/ocaml/vhd-tool/cli/dune +++ b/ocaml/vhd-tool/cli/dune @@ -4,7 +4,7 @@ (libraries astring - local_lib + vhd_lib cmdliner cstruct forkexec diff --git a/ocaml/vhd-tool/cli/sparse_dd.ml b/ocaml/vhd-tool/cli/sparse_dd.ml index 19dc6422a27..e593a1bb049 100644 --- a/ocaml/vhd-tool/cli/sparse_dd.ml +++ b/ocaml/vhd-tool/cli/sparse_dd.ml @@ -65,6 +65,10 @@ let sni = ref None let cert_bundle_path = ref None +let dest_proto = ref None + +let nbd_export = ref "" + let string_opt = function None -> "None" | Some x -> x let machine_readable_progress = ref false @@ -158,6 +162,16 @@ let options = , (fun () -> string_opt !cert_bundle_path) , "path to a CA certificate bundle" ) + ; ( "dest-proto" + , Arg.String (fun x -> dest_proto := Some x) + , (fun () -> string_opt !dest_proto) + , "destination protocol to be used for copying the disk" + ) + ; ( "nbd-export" + , Arg.String (fun x -> nbd_export := x) + , (fun () -> !nbd_export) + , "nbd export name to be used, only useful when dest-proto is nbd" + ) ] let startswith prefix x = @@ -288,6 +302,15 @@ let _ = debug "Must have -size argument\n" ; exit 1 ) ; + let dest_proto = + match !dest_proto with + | Some "nbd" -> + Some (StreamCommon.Nbd !nbd_export) + | Some x -> + Some (StreamCommon.protocol_of_string x) + | None -> + None + in let size = !size in let base = !base in (* Helper function to bring an int into valid range *) @@ -485,7 +508,7 @@ let _ = in let t = stream_t >>= fun s -> - Impl.write_stream common s destination None !prezeroed progress None + Impl.write_stream common s destination dest_proto !prezeroed progress None !good_ciphersuites verify_cert in if destination_format = "vhd" then diff --git a/ocaml/vhd-tool/src/dune b/ocaml/vhd-tool/src/dune index f7ab6341f77..7aa9d0de704 100644 --- a/ocaml/vhd-tool/src/dune +++ b/ocaml/vhd-tool/src/dune @@ -4,7 +4,7 @@ (language c) (names direct_copy_stubs) ) - (name local_lib) + (name vhd_lib) (wrapped false) (libraries astring @@ -32,6 +32,7 @@ tapctl xapi-stdext-std xapi-stdext-unix + xapi-log xen-api-client-lwt ) (preprocess diff --git a/ocaml/vhd-tool/src/impl.ml b/ocaml/vhd-tool/src/impl.ml index 52f2b3aa501..d067846f565 100644 --- a/ocaml/vhd-tool/src/impl.ml +++ b/ocaml/vhd-tool/src/impl.ml @@ -17,6 +17,8 @@ open Lwt module F = Vhd_format.F.From_file (Vhd_format_lwt.IO) module In = Vhd_format.F.From_input (Input) +module D = Debug.Make (struct let name = "vhd_impl" end) + module Channel_In = Vhd_format.F.From_input (struct include Lwt @@ -218,7 +220,7 @@ let[@warning "-27"] stream_human _common _ s _ _ ?(progress = no_progress_bar) Printf.printf "# end of stream\n" ; return None -let stream_nbd _common c s prezeroed _ ?(progress = no_progress_bar) () = +let stream_nbd _common c s prezeroed ~export ?(progress = no_progress_bar) () = let open Nbd_unix in let c = { @@ -229,9 +231,10 @@ let stream_nbd _common c s prezeroed _ ?(progress = no_progress_bar) () = } in - Client.negotiate c "" >>= fun (server, _size, _flags) -> + Client.negotiate c export >>= fun (server, size, _flags) -> (* Work to do is: non-zero data to write + empty sectors if the target is not prezeroed *) + D.debug "%s nbd negotiation done, size is %Ld" __FUNCTION__ size ; let total_work = let open Vhd_format.F in Int64.( @@ -985,7 +988,7 @@ let write_stream common s destination destination_protocol prezeroed progress return (c, [NoProtocol; Human; Tar]) | File_descr fd -> Channels.of_raw_fd fd >>= fun c -> - return (c, [Nbd; NoProtocol; Chunked; Human; Tar]) + return (c, [dummy_nbd; NoProtocol; Chunked; Human; Tar]) | Sockaddr sockaddr -> let sock = socket sockaddr in Lwt.catch @@ -993,7 +996,7 @@ let write_stream common s destination destination_protocol prezeroed progress (fun e -> Lwt_unix.close sock >>= fun () -> Lwt.fail e) >>= fun () -> Channels.of_raw_fd sock >>= fun c -> - return (c, [Nbd; NoProtocol; Chunked; Human; Tar]) + return (c, [dummy_nbd; NoProtocol; Chunked; Human; Tar]) | Https uri' | Http uri' -> ( (* TODO: https is not currently implemented *) let port = @@ -1013,7 +1016,6 @@ let write_stream common s destination destination_protocol prezeroed progress let host = Scanf.ksscanf host (fun _ _ -> host) "[%s@]" Fun.id in Lwt_unix.getaddrinfo host (string_of_int port) [] >>= fun he -> if he = [] then raise Not_found ; - let sockaddr = (List.hd he).Unix.ai_addr in let sock = socket sockaddr in Lwt.catch @@ -1074,7 +1076,7 @@ let write_stream common s destination destination_protocol prezeroed progress List.mem_assoc te headers && List.assoc te headers = "nbd" in if advertises_nbd then - return (c, [Nbd]) + return (c, [dummy_nbd]) else return (c, [Chunked; NoProtocol]) else @@ -1088,10 +1090,15 @@ let write_stream common s destination destination_protocol prezeroed progress x | None -> let t = List.hd possible_protocols in - Printf.fprintf stderr "Using protocol: %s\n%!" (string_of_protocol t) ; + D.info "Using protocol: %s\n%!" (string_of_protocol t) ; t in - if not (List.mem destination_protocol possible_protocols) then + (* when checking for the validity of the protocol, we only care about the protocol + itself and not the export name, if it was nbd. Convert all Nbd export to Nbd "" *) + let equal_to_dest_proto = + ( = ) (match destination_protocol with Nbd _ -> dummy_nbd | y -> y) + in + if not (List.exists equal_to_dest_proto possible_protocols) then fail (Failure (Printf.sprintf "this destination only supports protocols: [ %s ]" @@ -1101,18 +1108,17 @@ let write_stream common s destination destination_protocol prezeroed progress else let start = Unix.gettimeofday () in ( match destination_protocol with - | Nbd -> - stream_nbd + | Nbd export -> + stream_nbd common c s prezeroed ~export ~progress () | Human -> - stream_human + stream_human common c s prezeroed tar_filename_prefix ~progress () | Chunked -> - stream_chunked + stream_chunked common c s prezeroed tar_filename_prefix ~progress () | Tar -> - stream_tar + stream_tar common c s prezeroed tar_filename_prefix ~progress () | NoProtocol -> - stream_raw + stream_raw common c s prezeroed tar_filename_prefix ~progress () ) - common c s prezeroed tar_filename_prefix ~progress () >>= fun p -> c.Channels.close () >>= fun () -> match p with @@ -1319,7 +1325,7 @@ let serve common_options source source_fd source_format source_protocol let supported_formats = ["raw"] in if not (List.mem destination_format supported_formats) then failwith (Printf.sprintf "%s is not a supported format" destination_format) ; - let supported_protocols = [NoProtocol; Chunked; Nbd; Tar] in + let supported_protocols = [NoProtocol; Chunked; dummy_nbd; Tar] in if not (List.mem source_protocol supported_protocols) then failwith (Printf.sprintf "%s is not a supported source protocol" @@ -1409,7 +1415,7 @@ let serve common_options source source_fd source_format source_protocol match (source_format, source_protocol) with | "raw", NoProtocol -> serve_raw_to_raw common_options size - | "raw", Nbd -> + | "raw", Nbd _ -> serve_nbd_to_raw common_options size | "raw", Chunked -> serve_chunked_to_raw common_options diff --git a/ocaml/vhd-tool/src/streamCommon.ml b/ocaml/vhd-tool/src/streamCommon.ml index 9ec8e243205..57aae1236d1 100644 --- a/ocaml/vhd-tool/src/streamCommon.ml +++ b/ocaml/vhd-tool/src/streamCommon.ml @@ -12,11 +12,20 @@ * GNU Lesser General Public License for more details. *) -type protocol = Nbd | Chunked | Human | Tar | NoProtocol +type protocol = + | Nbd of string (* export name used by the nbd client during negotiation*) + | Chunked + | Human + | Tar + | NoProtocol + +(** This dummy nbd has no export name in it, it is introduced for backwards compatability +reasons. *) +let dummy_nbd = Nbd "" let protocol_of_string = function | "nbd" -> - Nbd + dummy_nbd | "chunked" -> Chunked | "human" -> @@ -29,8 +38,8 @@ let protocol_of_string = function failwith (Printf.sprintf "Unsupported protocol: %s" x) let string_of_protocol = function - | Nbd -> - "nbd" + | Nbd export -> + "nbd:" ^ export | Chunked -> "chunked" | Human -> diff --git a/ocaml/vhd-tool/test/dune b/ocaml/vhd-tool/test/dune index 77bbe519bc3..b31c295ccd7 100644 --- a/ocaml/vhd-tool/test/dune +++ b/ocaml/vhd-tool/test/dune @@ -4,7 +4,7 @@ alcotest alcotest-lwt lwt - local_lib + vhd_lib vhd-format vhd-format-lwt ) diff --git a/ocaml/xapi-idl/storage/storage_interface.ml b/ocaml/xapi-idl/storage/storage_interface.ml index aa5754fabb9..43f347e0386 100644 --- a/ocaml/xapi-idl/storage/storage_interface.ml +++ b/ocaml/xapi-idl/storage/storage_interface.ml @@ -251,6 +251,8 @@ let string_of_vdi_info (x : vdi_info) = Jsonrpc.to_string (rpc_of vdi_info x) "datapaths". *) type dp = string [@@deriving rpcty] +type sock_path = string [@@deriving rpcty] + type dp_stat_t = { superstate: Vdi_automaton.state ; dps: (string * Vdi_automaton.state) list @@ -443,6 +445,8 @@ module StorageAPI (R : RPC) = struct let dp_p = Param.mk ~name:"dp" dp + let sock_path_p = Param.mk ~name:"sock_path" sock_path + let device_config_p = Param.mk ~name:"device_config" ~description:["Backend-specific keys to specify the storage for the SR"] @@ -512,6 +516,9 @@ module StorageAPI (R : RPC) = struct @-> returning unit_p err ) + (** [attach_info context dbg sr vdi dp vm] returns the information as returned + by the [attach3 dbg dp sr vdi vm _] call. Callers of this function should ensure + that VDIs are already attached before calling this function. *) let attach_info = let backend_p = Param.mk ~name:"backend" backend in declare "DP.attach_info" @@ -519,7 +526,7 @@ module StorageAPI (R : RPC) = struct "[DP.attach_info sr vdi dp]: returns the params of the dp (the \ return value of VDI.attach2)" ] - (dbg_p @-> sr_p @-> vdi_p @-> dp_p @-> returning backend_p err) + (dbg_p @-> sr_p @-> vdi_p @-> dp_p @-> vm_p @-> returning backend_p err) (** *) let diagnostics = @@ -824,7 +831,7 @@ module StorageAPI (R : RPC) = struct @-> returning backend_p err ) - (** [attach3 task dp sr vdi read_write] returns the [params] for a given + (** [attach3 task dp sr vdi vm read_write] returns the [params] for a given [vdi] in [sr] which can be written to if (but not necessarily only if) [read_write] is true *) let attach3 = @@ -906,8 +913,8 @@ module StorageAPI (R : RPC) = struct declare "VDI.set_content_id" [] (dbg_p @-> sr_p @-> vdi_p @-> content_id_p @-> returning unit_p err) - (** [compose task sr vdi1 vdi2] layers the updates from [vdi2] onto [vdi1], - modifying [vdi2] *) + (** [compose task sr parent child] layers the updates from [child] onto [parent], + modifying [child] *) let compose = let vdi1_p = Param.mk ~name:"vdi1" Vdi.t in let vdi2_p = Param.mk ~name:"vdi2" Vdi.t in @@ -982,6 +989,7 @@ module StorageAPI (R : RPC) = struct (dbg_p @-> sr_p @-> vdi_p + @-> vm_p @-> url_p @-> dest_p @-> verify_dest_p @@ -997,6 +1005,8 @@ module StorageAPI (R : RPC) = struct @-> sr_p @-> vdi_p @-> dp_p + @-> vm_p + @-> vm_p @-> url_p @-> dest_p @-> verify_dest_p @@ -1013,7 +1023,11 @@ module StorageAPI (R : RPC) = struct let result_p = Param.mk ~name:"result" Mirror.t in declare "DATA.MIRROR.stat" [] (dbg_p @-> id_p @-> returning result_p err) - (** Called on the receiving end *) + (** Called on the receiving end + @deprecated This function is deprecated, and is only here to keep backward + compatibility with old xapis that call Remote.DATA.MIRROR.receive_start during SXM. + Use the receive_start2 function instead. + *) let receive_start = let similar_p = Param.mk ~name:"similar" Mirror.similars in let result = Param.mk ~name:"result" Mirror.mirror_receive_result in @@ -1026,10 +1040,40 @@ module StorageAPI (R : RPC) = struct @-> returning result err ) + (** Called on the receiving end to prepare for receipt of the storage. This + function should be used in conjunction with [receive_finalize2]*) + let receive_start2 = + let similar_p = Param.mk ~name:"similar" Mirror.similars in + let result = Param.mk ~name:"result" Mirror.mirror_receive_result in + declare "DATA.MIRROR.receive_start2" [] + (dbg_p + @-> sr_p + @-> VDI.vdi_info_p + @-> id_p + @-> similar_p + @-> vm_p + @-> returning result err + ) + + (** Called on the receiving end + @deprecated This function is deprecated, and is only here to keep backward + compatibility with old xapis that call Remote.DATA.MIRROR.receive_finalize + during SXM. Use the receive_finalize2 function instead. + *) let receive_finalize = declare "DATA.MIRROR.receive_finalize" [] (dbg_p @-> id_p @-> returning unit_p err) + (** [receive_finalize2 dbg id] will stop the mirroring process and compose + the snapshot VDI with the mirror VDI. It also cleans up the storage resources + used by mirroring. It is called after the the source VM is paused. This fucntion + should be used in conjunction with [receive_start2] *) + let receive_finalize2 = + declare "DATA.MIRROR.receive_finalize2" [] + (dbg_p @-> id_p @-> returning unit_p err) + + (** [receive_cancel dbg id] is called in the case of migration failure to + do the clean up.*) let receive_cancel = declare "DATA.MIRROR.receive_cancel" [] (dbg_p @-> id_p @-> returning unit_p err) @@ -1039,6 +1083,32 @@ module StorageAPI (R : RPC) = struct Param.mk ~name:"mirrors" TypeCombinators.(list (pair Mirror.(id, t))) in declare "DATA.MIRROR.list" [] (dbg_p @-> returning result_p err) + + (** [import_activate dbg dp sr vdi vm] returns a server socket address to + which a fd can be passed via SCM_RIGHTS for mirroring purposes.*) + let import_activate = + declare "DATA.MIRROR.import_activate" [] + (dbg_p + @-> dp_p + @-> sr_p + @-> vdi_p + @-> vm_p + @-> returning sock_path_p err + ) + + (** [get_nbd_server dbg dp sr vdi vm] returns the address of a generic nbd + server that can be connected to. Depending on the backend, this will either + be a nbd server backed by tapdisk or qemu-dp. Note this is different + from [import_activate] as the returned server does not accept fds. *) + let get_nbd_server = + declare "DATA.MIRROR.get_nbd_server" [] + (dbg_p + @-> dp_p + @-> sr_p + @-> vdi_p + @-> vm_p + @-> returning sock_path_p err + ) end end @@ -1108,7 +1178,7 @@ module type Server_impl = sig -> unit val attach_info : - context -> dbg:debug_info -> sr:sr -> vdi:vdi -> dp:dp -> backend + context -> dbg:debug_info -> sr:sr -> vdi:vdi -> dp:dp -> vm:vm -> backend val diagnostics : context -> unit -> string @@ -1333,6 +1403,7 @@ module type Server_impl = sig -> dbg:debug_info -> sr:sr -> vdi:vdi + -> vm:vm -> url:string -> dest:sr -> verify_dest:bool @@ -1345,6 +1416,8 @@ module type Server_impl = sig -> sr:sr -> vdi:vdi -> dp:dp + -> mirror_vm:vm + -> copy_vm:vm -> url:string -> dest:sr -> verify_dest:bool @@ -1363,11 +1436,41 @@ module type Server_impl = sig -> similar:Mirror.similars -> Mirror.mirror_receive_result + val receive_start2 : + context + -> dbg:debug_info + -> sr:sr + -> vdi_info:vdi_info + -> id:Mirror.id + -> similar:Mirror.similars + -> vm:vm + -> Mirror.mirror_receive_result + val receive_finalize : context -> dbg:debug_info -> id:Mirror.id -> unit + val receive_finalize2 : context -> dbg:debug_info -> id:Mirror.id -> unit + val receive_cancel : context -> dbg:debug_info -> id:Mirror.id -> unit val list : context -> dbg:debug_info -> (Mirror.id * Mirror.t) list + + val import_activate : + context + -> dbg:debug_info + -> dp:dp + -> sr:sr + -> vdi:vdi + -> vm:vm + -> sock_path + + val get_nbd_server : + context + -> dbg:debug_info + -> dp:dp + -> sr:sr + -> vdi:vdi + -> vm:vm + -> sock_path end end @@ -1409,8 +1512,8 @@ module Server (Impl : Server_impl) () = struct S.DP.destroy2 (fun dbg dp sr vdi vm allow_leak -> Impl.DP.destroy2 () ~dbg ~dp ~sr ~vdi ~vm ~allow_leak ) ; - S.DP.attach_info (fun dbg sr vdi dp -> - Impl.DP.attach_info () ~dbg ~sr ~vdi ~dp + S.DP.attach_info (fun dbg sr vdi dp vm -> + Impl.DP.attach_info () ~dbg ~sr ~vdi ~dp ~vm ) ; S.DP.diagnostics (fun () -> Impl.DP.diagnostics () ()) ; S.DP.stat_vdi (fun dbg sr vdi () -> Impl.DP.stat_vdi () ~dbg ~sr ~vdi ()) ; @@ -1521,24 +1624,38 @@ module Server (Impl : Server_impl) () = struct Impl.VDI.list_changed_blocks () ~dbg ~sr ~vdi_from ~vdi_to ) ; S.get_by_name (fun dbg name -> Impl.get_by_name () ~dbg ~name) ; - S.DATA.copy (fun dbg sr vdi url dest verify_dest -> - Impl.DATA.copy () ~dbg ~sr ~vdi ~url ~dest ~verify_dest + S.DATA.copy (fun dbg sr vdi vm url dest verify_dest -> + Impl.DATA.copy () ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest ) ; - S.DATA.MIRROR.start (fun dbg sr vdi dp url dest verify_dest -> - Impl.DATA.MIRROR.start () ~dbg ~sr ~vdi ~dp ~url ~dest ~verify_dest + S.DATA.MIRROR.start + (fun dbg sr vdi dp mirror_vm copy_vm url dest verify_dest -> + Impl.DATA.MIRROR.start () ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url + ~dest ~verify_dest ) ; S.DATA.MIRROR.stop (fun dbg id -> Impl.DATA.MIRROR.stop () ~dbg ~id) ; S.DATA.MIRROR.stat (fun dbg id -> Impl.DATA.MIRROR.stat () ~dbg ~id) ; S.DATA.MIRROR.receive_start (fun dbg sr vdi_info id similar -> Impl.DATA.MIRROR.receive_start () ~dbg ~sr ~vdi_info ~id ~similar ) ; + S.DATA.MIRROR.receive_start2 (fun dbg sr vdi_info id similar vm -> + Impl.DATA.MIRROR.receive_start2 () ~dbg ~sr ~vdi_info ~id ~similar ~vm + ) ; S.DATA.MIRROR.receive_cancel (fun dbg id -> Impl.DATA.MIRROR.receive_cancel () ~dbg ~id ) ; S.DATA.MIRROR.receive_finalize (fun dbg id -> Impl.DATA.MIRROR.receive_finalize () ~dbg ~id ) ; + S.DATA.MIRROR.receive_finalize2 (fun dbg id -> + Impl.DATA.MIRROR.receive_finalize2 () ~dbg ~id + ) ; S.DATA.MIRROR.list (fun dbg -> Impl.DATA.MIRROR.list () ~dbg) ; + S.DATA.MIRROR.import_activate (fun dbg dp sr vdi vm -> + Impl.DATA.MIRROR.import_activate () ~dbg ~dp ~sr ~vdi ~vm + ) ; + S.DATA.MIRROR.get_nbd_server (fun dbg dp sr vdi vm -> + Impl.DATA.MIRROR.get_nbd_server () ~dbg ~dp ~sr ~vdi ~vm + ) ; S.Policy.get_backend_vm (fun dbg vm sr vdi -> Impl.Policy.get_backend_vm () ~dbg ~vm ~sr ~vdi ) ; diff --git a/ocaml/xapi-idl/storage/storage_skeleton.ml b/ocaml/xapi-idl/storage/storage_skeleton.ml index cced1a7f6f5..ab84ed7712e 100644 --- a/ocaml/xapi-idl/storage/storage_skeleton.ml +++ b/ocaml/xapi-idl/storage/storage_skeleton.ml @@ -39,7 +39,7 @@ module DP = struct let destroy2 ctx ~dbg ~dp ~sr ~vdi ~vm ~allow_leak = u "DP.destroy2" - let attach_info ctx ~dbg ~sr ~vdi ~dp = u "DP.attach_info" + let attach_info ctx ~dbg ~sr ~vdi ~dp ~vm = u "DP.attach_info" let diagnostics ctx () = u "DP.diagnostics" @@ -152,12 +152,13 @@ end let get_by_name ctx ~dbg ~name = u "get_by_name" module DATA = struct - let copy ctx ~dbg ~sr ~vdi ~url ~dest = u "DATA.copy" + let copy ctx ~dbg ~sr ~vdi ~vm ~url ~dest = u "DATA.copy" module MIRROR = struct (** [start task sr vdi url sr2] creates a VDI in remote [url]'s [sr2] and writes data synchronously. It returns the id of the VDI.*) - let start ctx ~dbg ~sr ~vdi ~dp ~url ~dest = u "DATA.MIRROR.start" + let start ctx ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest = + u "DATA.MIRROR.start" let stop ctx ~dbg ~id = u "DATA.MIRROR.stop" @@ -166,11 +167,22 @@ module DATA = struct let receive_start ctx ~dbg ~sr ~vdi_info ~id ~similar = u "DATA.MIRROR.receive_start" + let receive_start2 ctx ~dbg ~sr ~vdi_info ~id ~similar ~vm = + u "DATA.MIRROR.receive_start2" + let receive_finalize ctx ~dbg ~id = u "DATA.MIRROR.receive_finalize" + let receive_finalize2 ctx ~dbg ~id = u "DATA.MIRROR.receive_finalize2" + let receive_cancel ctx ~dbg ~id = u "DATA.MIRROR.receive_cancel" let list ctx ~dbg = u "DATA.MIRROR.list" + + let import_activate ctx ~dbg ~dp ~sr ~vdi ~vm = + u "DATA.MIRROR.import_activate" + + let get_nbd_server ctx ~dbg ~dp ~sr ~vdi ~vm = + u "DATA.MIRROR.get_nbd_server" end end diff --git a/ocaml/xapi-storage-cli/main.ml b/ocaml/xapi-storage-cli/main.ml index 9355cd5b4c3..c64a4f6fcd9 100644 --- a/ocaml/xapi-storage-cli/main.ml +++ b/ocaml/xapi-storage-cli/main.ml @@ -311,6 +311,10 @@ let on_vdi' f common_opts sr vdi = ) common_opts sr vdi +let mirror_vm = Vm.of_string "SXM_mirror" + +let copy_vm = Vm.of_string "SXM_copy" + let mirror_start common_opts sr vdi dp url dest verify_dest = on_vdi' (fun sr vdi -> @@ -319,7 +323,7 @@ let mirror_start common_opts sr vdi dp url dest verify_dest = let url = get_opt url "Need a URL" in let dest = get_opt dest "Need a destination SR" in let task = - Client.DATA.MIRROR.start dbg sr vdi dp url + Client.DATA.MIRROR.start dbg sr vdi dp mirror_vm copy_vm url (Storage_interface.Sr.of_string dest) verify_dest in diff --git a/ocaml/xapi-storage-script/main.ml b/ocaml/xapi-storage-script/main.ml index e29f4937ab5..48dd360012b 100644 --- a/ocaml/xapi-storage-script/main.ml +++ b/ocaml/xapi-storage-script/main.ml @@ -352,10 +352,16 @@ end let _nonpersistent = "NONPERSISTENT" +let _vdi_mirror_in = "VDI_MIRROR_IN" + let _clone_on_boot_key = "clone-on-boot" let _vdi_type_key = "vdi-type" +let _vdi_content_id_key = "content_id" + +let _sm_config_prefix_key = "_sm_config_" + let _snapshot_time_key = "snapshot_time" let _is_a_snapshot_key = "is_a_snapshot" @@ -749,7 +755,7 @@ let vdi_of_volume x = { vdi= Vdi.of_string x.Xapi_storage.Control.key ; uuid= x.Xapi_storage.Control.uuid - ; content_id= "" + ; content_id= find_string _vdi_content_id_key ~default:"" ; name_label= x.Xapi_storage.Control.name ; name_description= x.Xapi_storage.Control.description ; ty= find_string _vdi_type_key ~default:"" @@ -761,8 +767,26 @@ let vdi_of_volume x = ; read_only= not x.Xapi_storage.Control.read_write ; cbt_enabled= Option.value x.Xapi_storage.Control.cbt_enabled ~default:false ; virtual_size= x.Xapi_storage.Control.virtual_size - ; physical_utilisation= x.Xapi_storage.Control.physical_utilisation - ; sm_config= [] + ; physical_utilisation= + x.Xapi_storage.Control.physical_utilisation + (* All the sm_config stored is of the form (k, v) where k will be prefixed by + "_sm_config_". For example if the sm_config passed was ("base_mirror", 3), + then it will be stored as ()"_sm_config_base_mirror", mirror_id). The + code below removed this prefix and extracts the actual key. *) + ; sm_config= + List.filter_map + (fun (k, v) -> + if String.starts_with ~prefix:_sm_config_prefix_key k then + Some + ( String.sub k + (String.length _sm_config_prefix_key) + (String.length k - String.length _sm_config_prefix_key) + , v + ) + else + None + ) + x.Xapi_storage.Control.keys ; sharable= x.Xapi_storage.Control.sharable ; persistent= true } @@ -1479,6 +1503,10 @@ let bind ~volume_script_dir = |> wrap in S.VDI.attach3 vdi_attach3_impl ; + let dp_attach_info_impl dbg sr vdi dp vm = + vdi_attach3_impl dbg dp sr vdi vm () + in + S.DP.attach_info dp_attach_info_impl ; let vdi_activate_common dbg dp sr vdi' vm' readonly = (let vdi = Storage_interface.Vdi.string_of vdi' in let domain = domain_of ~dp ~vm' in @@ -1732,27 +1760,126 @@ let bind ~volume_script_dir = set ~dbg ~sr ~vdi ~key:_vdi_type_key ~value:"cbt_metadata" in S.VDI.data_destroy vdi_data_destroy_impl ; + let vdi_compose_impl dbg sr parent child = + wrap + @@ + let* sr = Attached_SRs.find sr in + let child = Storage_interface.Vdi.string_of child in + let parent = Storage_interface.Vdi.string_of parent in + return_volume_rpc (fun () -> + Volume_client.compose (volume_rpc ~dbg) dbg sr child parent + ) + in + S.VDI.compose vdi_compose_impl ; + let vdi_set_content_id_impl dbg sr vdi content_id = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi = Storage_interface.Vdi.string_of vdi in + let* () = set ~dbg ~sr ~vdi ~key:_vdi_content_id_key ~value:content_id in + return () + in + S.VDI.set_content_id vdi_set_content_id_impl ; + let vdi_add_to_sm_config_impl dbg sr vdi key value = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi = Storage_interface.Vdi.string_of vdi in + let* () = set ~dbg ~sr ~vdi ~key:(_sm_config_prefix_key ^ key) ~value in + return () + in + S.VDI.add_to_sm_config vdi_add_to_sm_config_impl ; + let vdi_remove_from_sm_config_impl dbg sr vdi key = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi = Storage_interface.Vdi.string_of vdi in + let* () = unset ~dbg ~sr ~vdi ~key:(_sm_config_prefix_key ^ key) in + return () + in + S.VDI.remove_from_sm_config vdi_remove_from_sm_config_impl ; + let data_import_activate_impl dbg _dp sr vdi' vm' = + wrap + @@ + let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = Storage_interface.Vm.string_of vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> + ( match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with + | None -> + return response + | Some temporary -> + stat ~dbg ~sr ~vdi:temporary + ) + >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, datapath, uri, domain) -> + if Datapath_plugins.supports_feature datapath _vdi_mirror_in then + return_data_rpc (fun () -> + Datapath_client.import_activate (rpc ~dbg) dbg uri domain + ) + else + fail (Storage_interface.Errors.Unimplemented _vdi_mirror_in) + in + S.DATA.MIRROR.import_activate data_import_activate_impl ; + let get_nbd_server_impl dbg _dp sr vdi' vm' = + wrap + @@ + let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = Storage_interface.Vm.string_of vm' in + vdi_attach_common dbg sr vdi domain >>>= function + | response -> ( + let convert_implementation = function + | Xapi_storage.Data.XenDisk {params; extra; backend_type} -> + Storage_interface.XenDisk {params; extra; backend_type} + | BlockDevice {path} -> + BlockDevice {path} + | File {path} -> + File {path} + | Nbd {uri} -> + Nbd {uri} + in + let _, blockdevices, _, nbds = + Storage_interface.implementations_of_backend + { + Storage_interface.implementations= + List.map convert_implementation + response.Xapi_storage.Data.implementations + } + in + match (blockdevices, nbds) with + | _, ({uri} as nbd) :: _ -> + info (fun m -> + m "%s qemu-dp nbd server address is %s" __FUNCTION__ uri + ) + >>= fun () -> + let socket, _export = Storage_interface.parse_nbd_uri nbd in + return socket + | _ -> + fail (backend_error "No nbd server found" []) + ) + in + S.DATA.MIRROR.get_nbd_server get_nbd_server_impl ; let u name _ = failwith ("Unimplemented: " ^ name) in S.get_by_name (u "get_by_name") ; - S.VDI.compose (u "VDI.compose") ; S.VDI.get_by_name (u "VDI.get_by_name") ; S.DATA.MIRROR.receive_start (u "DATA.MIRROR.receive_start") ; + S.DATA.MIRROR.receive_start2 (u "DATA.MIRROR.receive_start2") ; S.UPDATES.get (u "UPDATES.get") ; S.SR.update_snapshot_info_dest (u "SR.update_snapshot_info_dest") ; S.DATA.MIRROR.list (u "DATA.MIRROR.list") ; S.TASK.stat (u "TASK.stat") ; - S.VDI.remove_from_sm_config (u "VDI.remove_from_sm_config") ; S.DP.diagnostics (u "DP.diagnostics") ; S.TASK.destroy (u "TASK.destroy") ; S.DP.destroy (u "DP.destroy") ; - S.VDI.add_to_sm_config (u "VDI.add_to_sm_config") ; S.VDI.similar_content (u "VDI.similar_content") ; S.DATA.copy (u "DATA.copy") ; S.DP.stat_vdi (u "DP.stat_vdi") ; S.DATA.MIRROR.receive_finalize (u "DATA.MIRROR.receive_finalize") ; + S.DATA.MIRROR.receive_finalize2 (u "DATA.MIRROR.receive_finalize2") ; S.DP.create (u "DP.create") ; - S.VDI.set_content_id (u "VDI.set_content_id") ; - S.DP.attach_info (u "DP.attach_info") ; S.TASK.cancel (u "TASK.cancel") ; S.VDI.attach (u "VDI.attach") ; S.VDI.attach2 (u "VDI.attach2") ; diff --git a/ocaml/xapi/dune b/ocaml/xapi/dune index d2e2fb17de8..6b85aa17109 100644 --- a/ocaml/xapi/dune +++ b/ocaml/xapi/dune @@ -159,6 +159,7 @@ uri uuid uuidm + vhd_lib x509 xapi_aux xapi-backtrace diff --git a/ocaml/xapi/sm.ml b/ocaml/xapi/sm.ml index 40e9b11e3e2..1d198cf3f98 100644 --- a/ocaml/xapi/sm.ml +++ b/ocaml/xapi/sm.ml @@ -117,7 +117,7 @@ let sr_detach ~dbg dconf driver sr = let sr_probe ~dbg dconf driver sr_sm_config = with_dbg ~dbg ~name:"sr_probe" @@ fun di -> let dbg = Debug_info.to_string di in - if List.mem_assoc Sr_probe (features_of_driver driver) then + if Feature.(has_capability Sr_probe (features_of_driver driver)) then Locking_helpers.Named_mutex.execute serialize_attach_detach (fun () -> debug "sr_probe" driver (sprintf "sm_config=[%s]" diff --git a/ocaml/xapi/sm_exec.ml b/ocaml/xapi/sm_exec.ml index 28cdd11e07b..a0ab8d09e6b 100644 --- a/ocaml/xapi/sm_exec.ml +++ b/ocaml/xapi/sm_exec.ml @@ -551,8 +551,8 @@ let parse_sr_get_driver_info driver (xml : Xml.xml) = let strings = XMLRPC.From.array XMLRPC.From.string (safe_assoc "capabilities" info) in - let features = Smint.parse_capability_int64_features strings in - let text_features = List.map Smint.string_of_feature features in + let features = Smint.Feature.parse_capability_int64 strings in + let text_features = List.map Smint.Feature.to_string features in (* Parse the driver options *) let configuration = List.map diff --git a/ocaml/xapi/smint.ml b/ocaml/xapi/smint.ml index 8797e0d7cf6..a3c3a9f0454 100644 --- a/ocaml/xapi/smint.ml +++ b/ocaml/xapi/smint.ml @@ -21,99 +21,100 @@ open D type vdi_info = {vdi_info_uuid: string option; vdi_info_location: string} -(** Very primitive first attempt at a set of backend features *) -type capability = - | Sr_create - | Sr_delete - | Sr_attach - | Sr_detach - | Sr_scan - | Sr_probe - | Sr_update - | Sr_supports_local_caching - | Sr_stats - | Sr_metadata - | Sr_trim - | Sr_multipath - | Vdi_create - | Vdi_delete - | Vdi_attach - | Vdi_detach - | Vdi_mirror - | Vdi_clone - | Vdi_snapshot - | Vdi_resize - | Vdi_activate - | Vdi_activate_readonly - | Vdi_deactivate - | Vdi_update - | Vdi_introduce - | Vdi_resize_online - | Vdi_generate_config - | Vdi_attach_offline - | Vdi_reset_on_boot - | Vdi_configure_cbt - | Large_vdi (** Supports >2TB VDIs *) - | Thin_provisioning - | Vdi_read_caching - -type feature = capability * int64 - -let string_to_capability_table = - [ - ("SR_CREATE", Sr_create) - ; ("SR_DELETE", Sr_delete) - ; ("SR_ATTACH", Sr_attach) - ; ("SR_DETACH", Sr_detach) - ; ("SR_SCAN", Sr_scan) - ; ("SR_PROBE", Sr_probe) - ; ("SR_UPDATE", Sr_update) - ; ("SR_SUPPORTS_LOCAL_CACHING", Sr_supports_local_caching) - ; ("SR_METADATA", Sr_metadata) - ; ("SR_TRIM", Sr_trim) - ; ("SR_MULTIPATH", Sr_multipath) - ; ("SR_STATS", Sr_stats) - ; ("VDI_CREATE", Vdi_create) - ; ("VDI_DELETE", Vdi_delete) - ; ("VDI_ATTACH", Vdi_attach) - ; ("VDI_DETACH", Vdi_detach) - ; ("VDI_MIRROR", Vdi_mirror) - ; ("VDI_RESIZE", Vdi_resize) - ; ("VDI_RESIZE_ONLINE", Vdi_resize_online) - ; ("VDI_CLONE", Vdi_clone) - ; ("VDI_SNAPSHOT", Vdi_snapshot) - ; ("VDI_ACTIVATE", Vdi_activate) - ; ("VDI_ACTIVATE_READONLY", Vdi_activate_readonly) - ; ("VDI_DEACTIVATE", Vdi_deactivate) - ; ("VDI_UPDATE", Vdi_update) - ; ("VDI_INTRODUCE", Vdi_introduce) - ; ("VDI_GENERATE_CONFIG", Vdi_generate_config) - ; ("VDI_ATTACH_OFFLINE", Vdi_attach_offline) - ; ("VDI_RESET_ON_BOOT", Vdi_reset_on_boot) - ; ("VDI_CONFIG_CBT", Vdi_configure_cbt) - ; ("LARGE_VDI", Large_vdi) - ; ("THIN_PROVISIONING", Thin_provisioning) - ; ("VDI_READ_CACHING", Vdi_read_caching) - ] - -let capability_to_string_table = - List.map (fun (k, v) -> (v, k)) string_to_capability_table - -let string_of_capability c = List.assoc c capability_to_string_table - -let string_of_feature (c, v) = - Printf.sprintf "%s/%Ld" (string_of_capability c) v - -let has_capability (c : capability) fl = List.mem_assoc c fl - -let capability_of_feature : feature -> capability = fst - -let known_features = List.map fst string_to_capability_table - -let unparse_feature (f, v) = f ^ "/" ^ Int64.to_string v - -let parse_string_int64_features features = - let scan feature = +module Feature = struct + (** Very primitive first attempt at a set of backend features *) + type capability = + | Sr_create + | Sr_delete + | Sr_attach + | Sr_detach + | Sr_scan + | Sr_probe + | Sr_update + | Sr_supports_local_caching + | Sr_stats + | Sr_metadata + | Sr_trim + | Sr_multipath + | Vdi_create + | Vdi_delete + | Vdi_attach + | Vdi_detach + | Vdi_mirror + | Vdi_mirror_in + | Vdi_clone + | Vdi_snapshot + | Vdi_resize + | Vdi_activate + | Vdi_activate_readonly + | Vdi_deactivate + | Vdi_update + | Vdi_introduce + | Vdi_resize_online + | Vdi_generate_config + | Vdi_attach_offline + | Vdi_reset_on_boot + | Vdi_configure_cbt + | Vdi_compose + | Large_vdi (** Supports >2TB VDIs *) + | Thin_provisioning + | Vdi_read_caching + + type t = capability * int64 + + let string_to_capability_table = + [ + ("SR_CREATE", Sr_create) + ; ("SR_DELETE", Sr_delete) + ; ("SR_ATTACH", Sr_attach) + ; ("SR_DETACH", Sr_detach) + ; ("SR_SCAN", Sr_scan) + ; ("SR_PROBE", Sr_probe) + ; ("SR_UPDATE", Sr_update) + ; ("SR_SUPPORTS_LOCAL_CACHING", Sr_supports_local_caching) + ; ("SR_METADATA", Sr_metadata) + ; ("SR_TRIM", Sr_trim) + ; ("SR_MULTIPATH", Sr_multipath) + ; ("SR_STATS", Sr_stats) + ; ("VDI_CREATE", Vdi_create) + ; ("VDI_DELETE", Vdi_delete) + ; ("VDI_ATTACH", Vdi_attach) + ; ("VDI_DETACH", Vdi_detach) + ; ("VDI_MIRROR", Vdi_mirror) + ; ("VDI_MIRROR_IN", Vdi_mirror_in) + ; ("VDI_RESIZE", Vdi_resize) + ; ("VDI_RESIZE_ONLINE", Vdi_resize_online) + ; ("VDI_CLONE", Vdi_clone) + ; ("VDI_SNAPSHOT", Vdi_snapshot) + ; ("VDI_ACTIVATE", Vdi_activate) + ; ("VDI_ACTIVATE_READONLY", Vdi_activate_readonly) + ; ("VDI_DEACTIVATE", Vdi_deactivate) + ; ("VDI_UPDATE", Vdi_update) + ; ("VDI_INTRODUCE", Vdi_introduce) + ; ("VDI_GENERATE_CONFIG", Vdi_generate_config) + ; ("VDI_ATTACH_OFFLINE", Vdi_attach_offline) + ; ("VDI_RESET_ON_BOOT", Vdi_reset_on_boot) + ; ("VDI_CONFIG_CBT", Vdi_configure_cbt) + ; ("VDI_COMPOSE", Vdi_compose) + ; ("LARGE_VDI", Large_vdi) + ; ("THIN_PROVISIONING", Thin_provisioning) + ; ("VDI_READ_CACHING", Vdi_read_caching) + ] + + let capability_to_string_table = + List.map (fun (k, v) -> (v, k)) string_to_capability_table + + let known_features = List.map fst string_to_capability_table + + let capability_to_string c = List.assoc c capability_to_string_table + + let to_string (c, v) = Printf.sprintf "%s/%Ld" (capability_to_string c) v + + let capability_of : t -> capability = fst + + let unparse (f, v) = f ^ "/" ^ Int64.to_string v + + let string_int64_of_string_opt feature = match String.split_on_char '/' feature with | [] -> None @@ -131,30 +132,57 @@ let parse_string_int64_features features = | feature :: _ -> error "SM.feature: unknown feature %s" feature ; None - in - features - |> List.filter_map scan - |> List.sort_uniq (fun (x, _) (y, _) -> compare x y) -(** [compat_features features1 features2] finds the compatible features in the input -features lists. We assume features backwards compatible, i.e. if there are FOO/1 and + (** [compat_features features1 features2] finds the compatible features in the input + features lists. We assume features backwards compatible, i.e. if there are FOO/1 and FOO/2 are present, then we assume they can both do FOO/1*) -let compat_features features1 features2 = - let features2 = List.to_seq features2 |> Hashtbl.of_seq in - List.filter_map - (fun (f1, v1) -> - match Hashtbl.find_opt features2 f1 with - | Some v2 -> - Some (f1, Int64.min v1 v2) - | None -> - None - ) - features1 - -let parse_capability_int64_features strings = - List.map - (function c, v -> (List.assoc c string_to_capability_table, v)) - (parse_string_int64_features strings) + let compat_features features1 features2 = + let features2 = List.to_seq features2 |> Hashtbl.of_seq in + List.filter_map + (fun (f1, v1) -> + match Hashtbl.find_opt features2 f1 with + | Some v2 -> + Some (f1, Int64.min v1 v2) + | None -> + None + ) + features1 + + let of_string_int64_opt (c, v) = + List.assoc_opt c string_to_capability_table |> Option.map (fun c -> (c, v)) + + (** [has_capability c fl] will test weather the required capability [c] is present + in the feature list [fl]. Callers should use this function to test if a feature + is available rather than directly using membership functions on a feature list + as this function might have special logic for some features. *) + let has_capability (c : capability) (fl : t list) = + List.exists + (fun (c', _v) -> + match (c, c') with + | Vdi_mirror_in, Vdi_mirror -> + true + | c, c' when c = c' -> + true + | _ -> + false + ) + fl + + (** [parse_string_int64 features] takes a [features] list in its plain string + forms such as "VDI_MIRROR/2" and parses them into the form of (VDI_MIRROR, 2). + If the number is malformated, default to (VDI_MIRROR, 1). It will also deduplicate + based on the capability ONLY, and randomly choose a verion, based on the order + it appears in the input list. + *) + let parse_string_int64 features = + List.filter_map string_int64_of_string_opt features + |> List.sort_uniq (fun (x, _) (y, _) -> compare x y) + + (** [parse_capability_int64 features] is similar to [parse_string_int64_features features] + but parses the input list into a [t list] *) + let parse_capability_int64 features = + parse_string_int64 features |> List.filter_map of_string_int64_opt +end type sr_driver_info = { sr_driver_filename: string @@ -164,7 +192,7 @@ type sr_driver_info = { ; sr_driver_copyright: string ; sr_driver_version: string ; sr_driver_required_api_version: string - ; sr_driver_features: feature list + ; sr_driver_features: Feature.t list ; sr_driver_text_features: string list ; sr_driver_configuration: (string * string) list ; sr_driver_required_cluster_stack: string list diff --git a/ocaml/xapi/sparse_dd_wrapper.ml b/ocaml/xapi/sparse_dd_wrapper.ml index c2a0f2112a7..0195fc38884 100644 --- a/ocaml/xapi/sparse_dd_wrapper.ml +++ b/ocaml/xapi/sparse_dd_wrapper.ml @@ -72,7 +72,8 @@ end exception Cancelled (** Use the new external sparse_dd program *) -let dd_internal progress_cb base prezeroed verify_cert infile outfile size = +let dd_internal progress_cb base prezeroed verify_cert ?(proto = None) infile + outfile size = let pipe_read, pipe_write = Unix.pipe () in let to_close = ref [pipe_read; pipe_write] in let close x = @@ -87,6 +88,15 @@ let dd_internal progress_cb base prezeroed verify_cert infile outfile size = match Forkhelpers.with_logfile_fd "sparse_dd" (fun log_fd -> let sparse_dd_path = !Xapi_globs.sparse_dd in + let proto_args = + match proto with + | None -> + [] + | Some (StreamCommon.Nbd export) -> + ["-dest-proto"; "nbd"; "-nbd-export"; export] + | Some p -> + ["-dest-proto"; StreamCommon.string_of_protocol p] + in let verify_args = match verify_cert with | None -> @@ -119,6 +129,7 @@ let dd_internal progress_cb base prezeroed verify_cert infile outfile size = ; (if prezeroed then ["-prezeroed"] else []) ; (match base with None -> [] | Some x -> ["-base"; x]) ; verify_args + ; proto_args ] in debug "%s %s" sparse_dd_path (String.concat " " args) ; @@ -164,7 +175,7 @@ let dd_internal progress_cb base prezeroed verify_cert infile outfile size = | Forkhelpers.Success _ -> progress_cb (Finished None) | Forkhelpers.Failure (log, End_of_file) -> - error "Error while trying to read progress from sparse_dd" ; + error "Error while trying to read progress from sparse_dd: %s" log ; raise (Api_errors.Server_error (Api_errors.vdi_copy_failed, [log])) | Forkhelpers.Failure (log, exn) -> error "Failure from sparse_dd: %s raising %s" log @@ -179,13 +190,13 @@ let dd_internal progress_cb base prezeroed verify_cert infile outfile size = ) (fun () -> close pipe_read ; close pipe_write) -let dd ?(progress_cb = fun _ -> ()) ?base ~verify_cert prezeroed = +let dd ?(progress_cb = fun _ -> ()) ?base ~verify_cert ?proto prezeroed = dd_internal (function Continuing x -> progress_cb x | _ -> ()) - base prezeroed verify_cert + base prezeroed ~proto verify_cert -let start ?(progress_cb = fun _ -> ()) ?base ~verify_cert prezeroed infile - outfile size = +let start ?(progress_cb = fun _ -> ()) ?base ~verify_cert ?(proto = None) + prezeroed infile outfile size = let m = Mutex.create () in let c = Condition.create () in let pid = ref None in @@ -206,8 +217,8 @@ let start ?(progress_cb = fun _ -> ()) ?base ~verify_cert prezeroed infile let _ = Thread.create (fun () -> - dd_internal thread_progress_cb base prezeroed verify_cert infile outfile - size + dd_internal thread_progress_cb base prezeroed verify_cert ~proto infile + outfile size ) () in diff --git a/ocaml/xapi/storage_access.ml b/ocaml/xapi/storage_access.ml index c92651bc576..4568863f8ca 100644 --- a/ocaml/xapi/storage_access.ml +++ b/ocaml/xapi/storage_access.ml @@ -21,6 +21,7 @@ let finally = Xapi_stdext_pervasives.Pervasiveext.finally module XenAPI = Client.Client open Storage_interface +open Storage_utils module D = Debug.Make (struct let name = "storage_access" end) @@ -30,50 +31,6 @@ let s_of_vdi = Vdi.string_of let s_of_sr = Sr.string_of -let transform_storage_exn f = - let get_sr_ref sr_uuid = - Server_helpers.exec_with_new_task "transform_storage_exn" (fun __context -> - Db.SR.get_by_uuid ~__context ~uuid:sr_uuid - ) - in - try f () with - | Storage_error (Backend_error (code, params)) as e -> - Backtrace.reraise e (Api_errors.Server_error (code, params)) - | Storage_error (Backend_error_with_backtrace (code, backtrace :: params)) as - e -> - let backtrace = Backtrace.Interop.of_json "SM" backtrace in - Backtrace.add e backtrace ; - Backtrace.reraise e (Api_errors.Server_error (code, params)) - | Storage_error (Sr_unhealthy (sr, health)) as e -> - let advice = - match health with - | Unavailable -> - "try reboot" - | Unreachable -> - "try again later" - | _health -> - "" - in - let sr = get_sr_ref sr in - Backtrace.reraise e - (Api_errors.Server_error - ( Api_errors.sr_unhealthy - , [Ref.string_of sr; Storage_interface.show_sr_health health; advice] - ) - ) - | Api_errors.Server_error _ as e -> - raise e - | Storage_error (No_storage_plugin_for_sr sr) as e -> - let sr = get_sr_ref sr in - Backtrace.reraise e - (Api_errors.Server_error (Api_errors.sr_not_attached, [Ref.string_of sr]) - ) - | e -> - Backtrace.reraise e - (Api_errors.Server_error - (Api_errors.internal_error, [Printexc.to_string e]) - ) - (* Start a set of servers for all SMAPIv1 plugins *) let start_smapiv1_servers () = let drivers = Sm.supported_drivers () in diff --git a/ocaml/xapi/storage_access.mli b/ocaml/xapi/storage_access.mli index 28cf3108dee..b781e9e9f26 100644 --- a/ocaml/xapi/storage_access.mli +++ b/ocaml/xapi/storage_access.mli @@ -61,9 +61,6 @@ val reset : __context:Context.t -> vm:API.ref_VM -> unit (** [reset __context vm] declares that [vm] has reset and if it's a driver domain, we expect it to lose all state. *) -val transform_storage_exn : (unit -> 'a) -> 'a -(** [transform_storage_exn f] runs [f], rethrowing any storage error as a nice XenAPI error *) - val attach_and_activate : __context:Context.t -> vbd:API.ref_VBD diff --git a/ocaml/xapi/storage_migrate.ml b/ocaml/xapi/storage_migrate.ml index 3279846a5a5..2686208d733 100644 --- a/ocaml/xapi/storage_migrate.ml +++ b/ocaml/xapi/storage_migrate.ml @@ -38,6 +38,7 @@ module State = struct ; leaf_dp: dp ; parent_vdi: Vdi.t ; remote_vdi: Vdi.t + ; mirror_vm: Vm.t } [@@deriving rpcty] @@ -362,13 +363,11 @@ let tapdisk_of_attach_info (backend : Storage_interface.backend) = (Storage_interface.(rpc_of backend) backend |> Rpc.to_string) ; None -let vm_of_s = Storage_interface.Vm.of_string - -let with_activated_disk ~dbg ~sr ~vdi ~dp f = +let with_activated_disk ~dbg ~sr ~vdi ~dp ~vm f = let attached_vdi = Option.map (fun vdi -> - let backend = Local.VDI.attach3 dbg dp sr vdi (vm_of_s "0") false in + let backend = Local.VDI.attach3 dbg dp sr vdi vm false in (vdi, backend) ) vdi @@ -383,10 +382,10 @@ let with_activated_disk ~dbg ~sr ~vdi ~dp f = in match (files, blockdevs, nbds) with | {path} :: _, _, _ | _, {path} :: _, _ -> - Local.VDI.activate3 dbg dp sr vdi (vm_of_s "0") ; + Local.VDI.activate3 dbg dp sr vdi vm ; (path, false) | _, _, nbd :: _ -> - Local.VDI.activate3 dbg dp sr vdi (vm_of_s "0") ; + Local.VDI.activate3 dbg dp sr vdi vm ; let unix_socket_path, export_name = Storage_interface.parse_nbd_uri nbd in @@ -423,14 +422,12 @@ let with_activated_disk ~dbg ~sr ~vdi ~dp f = () ) path_and_nbd ; - Option.iter - (fun vdi -> Local.VDI.deactivate dbg dp sr vdi (vm_of_s "0")) - vdi + Option.iter (fun vdi -> Local.VDI.deactivate dbg dp sr vdi vm) vdi ) ) (fun () -> Option.iter - (fun (vdi, _) -> Local.VDI.detach dbg dp sr vdi (vm_of_s "0")) + (fun (vdi, _) -> Local.VDI.detach dbg dp sr vdi vm) attached_vdi ) @@ -462,7 +459,7 @@ they tend to be executed on the sender side. although there is not a hard rule on what is executed on the sender side, this provides some heuristics. *) module MigrateLocal = struct (** [copy_into_vdi] is similar to [copy_into_sr] but requires a [dest_vdi] parameter *) - let copy_into_vdi ~task ~dbg ~sr ~vdi ~url ~dest ~dest_vdi ~verify_dest = + let copy_into_vdi ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~dest_vdi ~verify_dest = let remote_url = Storage_utils.connection_args_of_uri ~verify_dest url in let module Remote = StorageAPI (Idl.Exn.GenClient (struct let rpc = @@ -532,14 +529,15 @@ module MigrateLocal = struct let dest_vdi_url = let url' = Http.Url.of_string url in Http.Url.set_uri url' - (Printf.sprintf "%s/nbd/%s/%s/%s" (Http.Url.get_uri url') + (Printf.sprintf "%s/nbdproxy/%s/%s/%s/%s" (Http.Url.get_uri url') + (Storage_interface.Vm.string_of vm) (Storage_interface.Sr.string_of dest) (Storage_interface.Vdi.string_of dest_vdi) remote_dp ) |> Http.Url.to_string in - debug "copy remote NBD URL = %s" dest_vdi_url ; + debug "%s copy remote NBD URL = %s" __FUNCTION__ dest_vdi_url ; let id = State.copy_id_of (sr, vdi) in debug "Persisting state for copy (id=%s)" id ; State.add id @@ -562,11 +560,24 @@ module MigrateLocal = struct finally (fun () -> debug "activating RW datapath %s on remote" remote_dp ; - ignore (Remote.VDI.attach2 dbg remote_dp dest dest_vdi true) ; - Remote.VDI.activate dbg remote_dp dest dest_vdi ; - with_activated_disk ~dbg ~sr ~vdi:base_vdi ~dp:base_dp + let backend = + Remote.VDI.attach3 dbg remote_dp dest dest_vdi vm true + in + let _, _, _, nbds = + Storage_interface.implementations_of_backend backend + in + let proto = + match nbds with + | [] -> + None + | uri :: _ -> + let _socket, export = Storage_interface.parse_nbd_uri uri in + Some (StreamCommon.Nbd export) + in + Remote.VDI.activate3 dbg remote_dp dest dest_vdi vm ; + with_activated_disk ~dbg ~sr ~vdi:base_vdi ~dp:base_dp ~vm (fun base_path -> - with_activated_disk ~dbg ~sr ~vdi:(Some vdi) ~dp:leaf_dp + with_activated_disk ~dbg ~sr ~vdi:(Some vdi) ~dp:leaf_dp ~vm (fun src -> let verify_cert = if verify_dest then Stunnel_client.pool () else None @@ -574,7 +585,7 @@ module MigrateLocal = struct let dd = Sparse_dd_wrapper.start ~progress_cb:(progress_callback 0.05 0.9 task) - ~verify_cert ?base:base_path true (Option.get src) + ~verify_cert ~proto ?base:base_path true (Option.get src) dest_vdi_url remote_vdi.virtual_size in Storage_task.with_cancel task @@ -608,7 +619,7 @@ module MigrateLocal = struct (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will find the nearest vdi on the [dest] sr, and if there is no such vdi, it will create one. *) - let copy_into_sr ~task ~dbg ~sr ~vdi ~url ~dest ~verify_dest = + let copy_into_sr ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = debug "copy sr:%s vdi:%s url:%s dest:%s verify_dest:%B" (Storage_interface.Sr.string_of sr) (Storage_interface.Vdi.string_of vdi) @@ -693,8 +704,8 @@ module MigrateLocal = struct Remote.VDI.create dbg dest {local_vdi with sm_config= []} in let remote_copy = - copy_into_vdi ~task ~dbg ~sr ~vdi ~url ~dest ~dest_vdi:remote_base.vdi - ~verify_dest + copy_into_vdi ~task ~dbg ~sr ~vdi ~vm ~url ~dest + ~dest_vdi:remote_base.vdi ~verify_dest |> vdi_info in let snapshot = Remote.VDI.snapshot dbg dest remote_copy in @@ -710,10 +721,17 @@ module MigrateLocal = struct | e -> raise (Storage_error (Internal_error (Printexc.to_string e))) - let start ~task ~dbg ~sr ~vdi ~dp ~url ~dest ~verify_dest = - SXM.info "%s sr:%s vdi:%s url:%s dest:%s verify_dest:%B" __FUNCTION__ + let start ~task ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest ~verify_dest + = + SXM.info + "%s sr:%s vdi:%s dp: %s mirror_vm: %s copy_vm: %s url:%s dest:%s \ + verify_dest:%B" + __FUNCTION__ (Storage_interface.Sr.string_of sr) (Storage_interface.Vdi.string_of vdi) + dp + (Storage_interface.Vm.string_of mirror_vm) + (Storage_interface.Vm.string_of copy_vm) url (Storage_interface.Sr.string_of dest) verify_dest ; @@ -729,8 +747,6 @@ module MigrateLocal = struct try List.find (fun x -> x.vdi = vdi) vdis with Not_found -> failwith "Local VDI not found" in - let id = State.mirror_id_of (sr, local_vdi.vdi) in - debug "Adding to active local mirrors before sending: id=%s" id ; let mirror_id = State.mirror_id_of (sr, local_vdi.vdi) in debug "%s: Adding to active local mirrors before sending: id=%s" __FUNCTION__ mirror_id ; @@ -769,18 +785,20 @@ module MigrateLocal = struct similar_vdis ) ) ; - let result_ty = - Remote.DATA.MIRROR.receive_start dbg dest local_vdi mirror_id similars + let (Mirror.Vhd_mirror result) = + Remote.DATA.MIRROR.receive_start2 dbg dest local_vdi mirror_id similars + mirror_vm in - let result = match result_ty with Mirror.Vhd_mirror x -> x in (* Enable mirroring on the local machine *) let mirror_dp = result.Mirror.mirror_datapath in let uri = - Printf.sprintf "/services/SM/nbd/%s/%s/%s" + Printf.sprintf "/services/SM/nbd/%s/%s/%s/%s" + (Storage_interface.Vm.string_of mirror_vm) (Storage_interface.Sr.string_of dest) (Storage_interface.Vdi.string_of result.Mirror.mirror_vdi.vdi) mirror_dp in + debug "%s: uri of http request for mirroring is %s" __FUNCTION__ uri ; let dest_url = Http.Url.set_uri remote_url uri in let request = Http.Request.make @@ -790,7 +808,7 @@ module MigrateLocal = struct let verify_cert = if verify_dest then Stunnel_client.pool () else None in let transport = Xmlrpc_client.transport_of_url ~verify_cert dest_url in debug "Searching for data path: %s" dp ; - let attach_info = Local.DP.attach_info "nbd" sr vdi dp in + let attach_info = Local.DP.attach_info dbg sr vdi dp mirror_vm in on_fail := (fun () -> Remote.DATA.MIRROR.receive_cancel dbg mirror_id) :: !on_fail ; let tapdev = @@ -895,23 +913,17 @@ module MigrateLocal = struct (* Copy the snapshot to the remote *) let new_parent = Storage_task.with_subtask task "copy" (fun () -> - copy_into_vdi ~task ~dbg ~sr ~vdi:snapshot.vdi ~url ~dest - ~dest_vdi:result.Mirror.copy_diffs_to ~verify_dest + copy_into_vdi ~task ~dbg ~sr ~vdi:snapshot.vdi ~vm:copy_vm ~url + ~dest ~dest_vdi:result.Mirror.copy_diffs_to ~verify_dest ) |> vdi_info in debug "Local VDI %s = remote VDI %s" (Storage_interface.Vdi.string_of snapshot.vdi) (Storage_interface.Vdi.string_of new_parent.vdi) ; - Remote.VDI.compose dbg dest result.Mirror.copy_diffs_to - result.Mirror.mirror_vdi.vdi ; - Remote.VDI.remove_from_sm_config dbg dest result.Mirror.mirror_vdi.vdi - "base_mirror" ; debug "Local VDI %s now mirrored to remote VDI: %s" (Storage_interface.Vdi.string_of local_vdi.vdi) (Storage_interface.Vdi.string_of result.Mirror.mirror_vdi.vdi) ; - debug "Destroying dummy VDI on remote" ; - Remote.VDI.destroy dbg dest result.Mirror.dummy_vdi ; debug "Destroying snapshot on src" ; Local.VDI.destroy dbg sr snapshot.vdi ; Some (Mirror_id mirror_id) @@ -1110,7 +1122,7 @@ end (** module [MigrateRemote] is similar to [MigrateLocal], but most of these functions tend to be executed on the receiver side. *) module MigrateRemote = struct - let receive_start ~dbg ~sr ~vdi_info ~id ~similar = + let receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm = let on_fail : (unit -> unit) list ref = ref [] in let vdis = Local.SR.scan dbg sr in (* We drop cbt_metadata VDIs that do not have any actual data *) @@ -1121,12 +1133,14 @@ module MigrateRemote = struct let leaf = Local.VDI.create dbg sr vdi_info in info "Created leaf VDI for mirror receive: %s" (string_of_vdi_info leaf) ; on_fail := (fun () -> Local.VDI.destroy dbg sr leaf.vdi) :: !on_fail ; + (* dummy VDI is created so that the leaf VDI becomes a differencing disk, + useful for calling VDI.compose later on *) let dummy = Local.VDI.snapshot dbg sr leaf in on_fail := (fun () -> Local.VDI.destroy dbg sr dummy.vdi) :: !on_fail ; - debug "Created dummy snapshot for mirror receive: %s" + debug "%s Created dummy snapshot for mirror receive: %s" __FUNCTION__ (string_of_vdi_info dummy) ; - let _ = Local.VDI.attach3 dbg leaf_dp sr leaf.vdi (vm_of_s "0") true in - Local.VDI.activate3 dbg leaf_dp sr leaf.vdi (vm_of_s "0") ; + let _ : backend = Local.VDI.attach3 dbg leaf_dp sr leaf.vdi vm true in + Local.VDI.activate3 dbg leaf_dp sr leaf.vdi vm ; let nearest = List.fold_left (fun acc content_id -> @@ -1185,6 +1199,7 @@ module MigrateRemote = struct ; leaf_dp ; parent_vdi= parent.vdi ; remote_vdi= vdi_info.vdi + ; mirror_vm= vm } ) ; let nearest_content_id = Option.map (fun x -> x.content_id) nearest in @@ -1206,12 +1221,38 @@ module MigrateRemote = struct !on_fail ; raise e + let receive_start ~dbg ~sr ~vdi_info ~id ~similar = + receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm:(Vm.of_string "0") + + let receive_start2 ~dbg ~sr ~vdi_info ~id ~similar ~vm = + receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm + let receive_finalize ~dbg ~id = let recv_state = State.find_active_receive_mirror id in let open State.Receive_state in Option.iter (fun r -> Local.DP.destroy dbg r.leaf_dp false) recv_state ; State.remove_receive_mirror id + let receive_finalize2 ~dbg ~id = + let recv_state = State.find_active_receive_mirror id in + let open State.Receive_state in + Option.iter + (fun r -> + SXM.info + "%s Mirror done. Compose on the dest sr %s parent %s and leaf %s" + __FUNCTION__ (Sr.string_of r.sr) + (Vdi.string_of r.parent_vdi) + (Vdi.string_of r.leaf_vdi) ; + Local.DP.destroy2 dbg r.leaf_dp r.sr r.leaf_vdi r.mirror_vm false ; + Local.VDI.compose dbg r.sr r.parent_vdi r.leaf_vdi ; + (* On SMAPIv3, compose would have removed the now invalid dummy vdi, so + there is no need to destroy it anymore, while this is necessary on SMAPIv1 SRs. *) + log_and_ignore_exn (fun () -> Local.VDI.destroy dbg r.sr r.dummy_vdi) ; + Local.VDI.remove_from_sm_config dbg r.sr r.leaf_vdi "base_mirror" + ) + recv_state ; + State.remove_receive_mirror id + let receive_cancel ~dbg ~id = let receive_state = State.find_active_receive_mirror id in let open State.Receive_state in @@ -1300,11 +1341,11 @@ let post_detach_hook ~sr ~vdi ~dp:_ = let t = Thread.create (fun () -> - debug "Calling receive_finalize" ; + debug "Calling receive_finalize2" ; log_and_ignore_exn (fun () -> - Remote.DATA.MIRROR.receive_finalize "Mirror-cleanup" id + Remote.DATA.MIRROR.receive_finalize2 "Mirror-cleanup" id ) ; - debug "Finished calling receive_finalize" ; + debug "Finished calling receive_finalize2" ; State.remove_local_mirror id ; debug "Removed active local mirror: %s" id ) @@ -1315,35 +1356,60 @@ let post_detach_hook ~sr ~vdi ~dp:_ = (Thread.id t) ) -let nbd_handler req s sr vdi dp = - debug "sr=%s vdi=%s dp=%s" sr vdi dp ; +let nbd_handler req s ?(vm = "0") sr vdi dp = + debug "%s: vm=%s sr=%s vdi=%s dp=%s" __FUNCTION__ vm sr vdi dp ; let sr, vdi = Storage_interface.(Sr.of_string sr, Vdi.of_string vdi) in - let attach_info = Local.DP.attach_info "nbd" sr vdi dp in req.Http.Request.close <- true ; - match tapdisk_of_attach_info attach_info with - | Some tapdev -> - let minor = Tapctl.get_minor tapdev in - let pid = Tapctl.get_tapdisk_pid tapdev in - let path = - Printf.sprintf "/var/run/blktap-control/nbdserver%d.%d" pid minor - in - Http_svr.headers s (Http.http_200_ok () @ ["Transfer-encoding: nbd"]) ; - let control_fd = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - finally - (fun () -> - Unix.connect control_fd (Unix.ADDR_UNIX path) ; - let msg = dp in - let len = String.length msg in - let written = Unixext.send_fd_substring control_fd msg 0 len [] s in - if written <> len then ( - error "Failed to transfer fd to %s" path ; - Http_svr.headers s (Http.http_404_missing ~version:"1.0" ()) ; - req.Http.Request.close <- true - ) - ) - (fun () -> Unix.close control_fd) - | None -> - () + let vm = Vm.of_string vm in + let path = + Storage_utils.transform_storage_exn (fun () -> + Local.DATA.MIRROR.import_activate "nbd" dp sr vdi vm + ) + in + Http_svr.headers s (Http.http_200_ok () @ ["Transfer-encoding: nbd"]) ; + let control_fd = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + finally + (fun () -> + Unix.connect control_fd (Unix.ADDR_UNIX path) ; + let msg = dp in + let len = String.length msg in + let written = Unixext.send_fd_substring control_fd msg 0 len [] s in + + if written <> len then ( + error "Failed to transfer fd to %s" path ; + Http_svr.headers s (Http.http_404_missing ~version:"1.0" ()) ; + req.Http.Request.close <- true + ) + ) + (fun () -> Unix.close control_fd) + +(** nbd_proxy is a http handler but will turn the http connection into an nbd connection. +It proxies the connection between the sender and the generic nbd server, as returned +by [get_nbd_server dp sr vdi vm]. *) +let nbd_proxy req s vm sr vdi dp = + debug "%s: vm=%s sr=%s vdi=%s dp=%s" __FUNCTION__ vm sr vdi dp ; + let sr, vdi = Storage_interface.(Sr.of_string sr, Vdi.of_string vdi) in + req.Http.Request.close <- true ; + let vm = Vm.of_string vm in + let path = + Storage_utils.transform_storage_exn (fun () -> + Local.DATA.MIRROR.get_nbd_server "nbd" dp sr vdi vm + ) + in + debug "%s got nbd server path %s" __FUNCTION__ path ; + Http_svr.headers s (Http.http_200_ok () @ ["Transfer-encoding: nbd"]) ; + let control_fd = Unixext.open_connection_unix_fd path in + finally + (fun () -> + let s' = Unix.dup s in + let control_fd' = Unix.dup control_fd in + debug "%s: Connected; running proxy (between fds: %d and %d)" __FUNCTION__ + (Unixext.int_of_file_descr control_fd') + (Unixext.int_of_file_descr s') ; + Unixext.proxy s' control_fd' ; + debug "%s: proxy exited" __FUNCTION__ + ) + (fun () -> Unix.close control_fd) let with_task_and_thread ~dbg f = let task = @@ -1374,16 +1440,16 @@ let with_task_and_thread ~dbg f = this way so that they all stay in one place rather than being spread around the file. *) -let copy ~dbg ~sr ~vdi ~url ~dest ~verify_dest = +let copy ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = with_task_and_thread ~dbg (fun task -> - MigrateLocal.copy_into_sr ~task ~dbg:(Debug_info.to_string dbg) ~sr ~vdi - ~url ~dest ~verify_dest + MigrateLocal.copy_into_sr ~task ~dbg:dbg.Debug_info.log ~sr ~vdi ~vm ~url + ~dest ~verify_dest ) -let start ~dbg ~sr ~vdi ~dp ~url ~dest ~verify_dest = +let start ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest ~verify_dest = with_task_and_thread ~dbg (fun task -> - MigrateLocal.start ~task ~dbg:(Debug_info.to_string dbg) ~sr ~vdi ~dp ~url - ~dest ~verify_dest + MigrateLocal.start ~task ~dbg:dbg.Debug_info.log ~sr ~vdi ~dp ~mirror_vm + ~copy_vm ~url ~dest ~verify_dest ) (* XXX: PR-1255: copy the xenopsd 'raise Exception' pattern *) @@ -1403,8 +1469,12 @@ let stat = MigrateLocal.stat let receive_start = MigrateRemote.receive_start +let receive_start2 = MigrateRemote.receive_start2 + let receive_finalize = MigrateRemote.receive_finalize +let receive_finalize2 = MigrateRemote.receive_finalize2 + let receive_cancel = MigrateRemote.receive_cancel (* The remote end of this call, SR.update_snapshot_info_dest, is implemented in diff --git a/ocaml/xapi/storage_mux.ml b/ocaml/xapi/storage_mux.ml index dc49d2e75b7..7acba0c8823 100644 --- a/ocaml/xapi/storage_mux.ml +++ b/ocaml/xapi/storage_mux.ml @@ -37,7 +37,7 @@ type plugin = { processor: processor ; backend_domain: string ; query_result: query_result - ; features: Smint.feature list + ; features: Smint.Feature.t list } let plugins : (sr, plugin) Hashtbl.t = Hashtbl.create 10 @@ -53,7 +53,7 @@ let debug_printer rpc call = let register sr rpc d info = with_lock m (fun () -> let features = - Smint.parse_capability_int64_features info.Storage_interface.features + Smint.Feature.parse_capability_int64 info.Storage_interface.features in Hashtbl.replace plugins sr { @@ -88,7 +88,7 @@ let sr_has_capability sr capability = with_lock m (fun () -> match Hashtbl.find_opt plugins sr with | Some x -> - Smint.has_capability capability x.features + Smint.Feature.has_capability capability x.features | None -> false ) @@ -253,7 +253,14 @@ module Mux = struct let diagnostics () = Storage_smapiv1_wrapper.Impl.DP.diagnostics () - let attach_info () = Storage_smapiv1_wrapper.Impl.DP.attach_info () + let attach_info _context ~dbg ~sr ~vdi ~dp ~vm = + with_dbg ~name:"DP.attach_info" ~dbg @@ fun di -> + info "%s dbg:%s sr:%s vdi:%s dp:%s vm:%s" __FUNCTION__ dbg (s_of_sr sr) + (s_of_vdi vdi) dp (s_of_vm vm) ; + let module C = StorageAPI (Idl.Exn.GenClient (struct + let rpc = of_sr sr + end)) in + C.DP.attach_info (Debug_info.to_string di) sr vdi dp vm let stat_vdi () = Storage_smapiv1_wrapper.Impl.DP.stat_vdi () end @@ -648,7 +655,9 @@ module Mux = struct | None -> failwith "DP not found" in - if (not read_write) && sr_has_capability sr Smint.Vdi_activate_readonly + if + (not read_write) + && sr_has_capability sr Smint.Feature.Vdi_activate_readonly then ( info "The VDI was attached read-only: calling activate_readonly" ; C.VDI.activate_readonly (Debug_info.to_string di) dp sr vdi vm @@ -826,33 +835,85 @@ module Mux = struct with_dbg ~name:"DATA.copy" ~dbg @@ fun dbg -> Storage_migrate.copy ~dbg module MIRROR = struct - let start () ~dbg = - with_dbg ~name:"DATA.MIRROR.start" ~dbg @@ fun dbg -> - Storage_migrate.start ~dbg - - let stop () ~dbg = - with_dbg ~name:"DATA.MIRROR.stop" ~dbg @@ fun {log= dbg; _} -> - Storage_migrate.stop ~dbg + let start () ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest ~verify_dest + = + with_dbg ~name:"DATA.MIRROR.start" ~dbg @@ fun di -> + info + "%s dbg:%s sr: %s vdi: %s dp:%s mirror_vm: %s copy_vm: %s url: %s \ + dest sr: %s verify_dest: %B" + __FUNCTION__ dbg (s_of_sr sr) (s_of_vdi vdi) dp (s_of_vm mirror_vm) + (s_of_vm copy_vm) url (s_of_sr dest) verify_dest ; + Storage_migrate.start ~dbg:di ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url + ~dest ~verify_dest + + let stop () ~dbg ~id = + with_dbg ~name:"DATA.MIRROR.stop" ~dbg @@ fun di -> + info "%s dbg:%s mirror_id: %s" __FUNCTION__ dbg id ; + Storage_migrate.stop ~dbg:di.log ~id let list () ~dbg = - with_dbg ~name:"DATA.MIRROR.list" ~dbg @@ fun {log= dbg; _} -> - Storage_migrate.list ~dbg - - let stat () ~dbg = - with_dbg ~name:"DATA.MIRROR.stat" ~dbg @@ fun {log= dbg; _} -> - Storage_migrate.stat ~dbg - - let receive_start () ~dbg = - with_dbg ~name:"DATA.MIRROR.receive_start" ~dbg @@ fun {log= dbg; _} -> - Storage_migrate.receive_start ~dbg - - let receive_finalize () ~dbg = - with_dbg ~name:"DATA.MIRROR.receive_finalize" ~dbg - @@ fun {log= dbg; _} -> Storage_migrate.receive_finalize ~dbg + with_dbg ~name:"DATA.MIRROR.list" ~dbg @@ fun di -> + info "%s dbg: %s" __FUNCTION__ dbg ; + Storage_migrate.list ~dbg:di.log + + let stat () ~dbg ~id = + with_dbg ~name:"DATA.MIRROR.stat" ~dbg @@ fun di -> + info "%s dbg: %s mirror_id: %s" __FUNCTION__ di.log id ; + Storage_migrate.stat ~dbg:di.log ~id + + let receive_start () ~dbg ~sr ~vdi_info ~id ~similar = + with_dbg ~name:"DATA.MIRROR.receive_start" ~dbg @@ fun di -> + info "%s dbg: %s sr: %s vdi_info: %s mirror_id: %s similar: %s" + __FUNCTION__ dbg (s_of_sr sr) + (string_of_vdi_info vdi_info) + id + (String.concat ";" similar) ; + Storage_migrate.receive_start ~dbg:di.log ~sr ~vdi_info ~id ~similar + + let receive_start2 () ~dbg ~sr ~vdi_info ~id ~similar ~vm = + with_dbg ~name:"DATA.MIRROR.receive_start2" ~dbg @@ fun di -> + info "%s dbg: %s sr: %s vdi_info: %s mirror_id: %s similar: %s vm: %s" + __FUNCTION__ dbg (s_of_sr sr) + (string_of_vdi_info vdi_info) + id + (String.concat ";" similar) + (s_of_vm vm) ; + info "%s dbg:%s" __FUNCTION__ dbg ; + Storage_migrate.receive_start2 ~dbg:di.log ~sr ~vdi_info ~id ~similar + ~vm + + let receive_finalize () ~dbg ~id = + with_dbg ~name:"DATA.MIRROR.receive_finalize" ~dbg @@ fun di -> + info "%s dbg: %s mirror_id: %s" __FUNCTION__ dbg id ; + Storage_migrate.receive_finalize ~dbg:di.log ~id + + let receive_finalize2 () ~dbg ~id = + with_dbg ~name:"DATA.MIRROR.receive_finalize2" ~dbg @@ fun di -> + info "%s dbg: %s mirror_id: %s" __FUNCTION__ dbg id ; + Storage_migrate.receive_finalize2 ~dbg:di.log ~id + + let receive_cancel () ~dbg ~id = + with_dbg ~name:"DATA.MIRROR.receive_cancel" ~dbg @@ fun di -> + info "%s dbg: %s mirror_id: %s" __FUNCTION__ dbg id ; + Storage_migrate.receive_cancel ~dbg:di.log ~id + + let import_activate () ~dbg ~dp ~sr ~vdi ~vm = + with_dbg ~name:"DATA.MIRROR.import_activate" ~dbg @@ fun di -> + info "%s dbg:%s dp:%s sr:%s vdi:%s vm:%s" __FUNCTION__ dbg dp + (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) ; + let module C = StorageAPI (Idl.Exn.GenClient (struct + let rpc = of_sr sr + end)) in + C.DATA.MIRROR.import_activate (Debug_info.to_string di) dp sr vdi vm - let receive_cancel () ~dbg = - with_dbg ~name:"DATA.MIRROR.receive_cancel" ~dbg @@ fun {log= dbg; _} -> - Storage_migrate.receive_cancel ~dbg + let get_nbd_server () ~dbg ~dp ~sr ~vdi ~vm = + with_dbg ~name:"DATA.MIRROR.get_nbd_server" ~dbg @@ fun di -> + info "%s dbg:%s dp:%s sr:%s vdi:%s vm:%s" __FUNCTION__ dbg dp + (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) ; + let module C = StorageAPI (Idl.Exn.GenClient (struct + let rpc = of_sr sr + end)) in + C.DATA.MIRROR.get_nbd_server (Debug_info.to_string di) dp sr vdi vm end end diff --git a/ocaml/xapi/storage_smapiv1.ml b/ocaml/xapi/storage_smapiv1.ml index 0bb0dd9d267..87a80d0cadc 100644 --- a/ocaml/xapi/storage_smapiv1.ml +++ b/ocaml/xapi/storage_smapiv1.ml @@ -607,7 +607,10 @@ module SMAPIv1 : Server_impl = struct ~key:"content_id" ) ; (* If the backend doesn't advertise the capability then do nothing *) - if List.mem_assoc Smint.Vdi_activate (Sm.features_of_driver _type) + if + Smint.Feature.( + has_capability Vdi_activate (Sm.features_of_driver _type) + ) then Sm.vdi_activate ~dbg device_config _type sr self read_write else @@ -638,7 +641,10 @@ module SMAPIv1 : Server_impl = struct ~value:Uuidx.(to_string (make ())) ) ; (* If the backend doesn't advertise the capability then do nothing *) - if List.mem_assoc Smint.Vdi_deactivate (Sm.features_of_driver _type) + if + Smint.Feature.( + has_capability Vdi_deactivate (Sm.features_of_driver _type) + ) then Sm.vdi_deactivate ~dbg device_config _type sr self else @@ -1202,12 +1208,12 @@ module SMAPIv1 : Server_impl = struct let get_by_name _context ~dbg:_ ~name:_ = assert false module DATA = struct - let copy _context ~dbg:_ ~sr:_ ~vdi:_ ~url:_ ~dest:_ ~verify_dest:_ = + let copy _context ~dbg:_ ~sr:_ ~vdi:_ ~vm:_ ~url:_ ~dest:_ ~verify_dest:_ = assert false module MIRROR = struct - let start _context ~dbg:_ ~sr:_ ~vdi:_ ~dp:_ ~url:_ ~dest:_ ~verify_dest:_ - = + let start _context ~dbg:_ ~sr:_ ~vdi:_ ~dp:_ ~mirror_vm:_ ~copy_vm:_ + ~url:_ ~dest:_ ~verify_dest:_ = assert false let stop _context ~dbg:_ ~id:_ = assert false @@ -1219,9 +1225,20 @@ module SMAPIv1 : Server_impl = struct let receive_start _context ~dbg:_ ~sr:_ ~vdi_info:_ ~id:_ ~similar:_ = assert false + let receive_start2 _context ~dbg:_ ~sr:_ ~vdi_info:_ ~id:_ ~similar:_ + ~vm:_ = + assert false + let receive_finalize _context ~dbg:_ ~id:_ = assert false + let receive_finalize2 _context ~dbg:_ ~id:_ = assert false + let receive_cancel _context ~dbg:_ ~id:_ = assert false + + let import_activate _context ~dbg:_ ~dp:_ ~sr:_ ~vdi:_ ~vm:_ = + assert false + + let get_nbd_server _context ~dbg:_ ~dp:_ ~sr:_ ~vdi:_ ~vm:_ = assert false end end diff --git a/ocaml/xapi/storage_smapiv1_wrapper.ml b/ocaml/xapi/storage_smapiv1_wrapper.ml index 55067efd9de..fbb30a2177f 100644 --- a/ocaml/xapi/storage_smapiv1_wrapper.ml +++ b/ocaml/xapi/storage_smapiv1_wrapper.ml @@ -932,46 +932,6 @@ functor let dbg = Debug_info.to_string di in Impl.get_by_name context ~dbg ~name - module DATA = struct - let copy context ~dbg ~sr ~vdi ~url ~dest = - info "DATA.copy dbg:%s sr:%s vdi:%s url:%s dest:%s" dbg (s_of_sr sr) - (s_of_vdi vdi) url (s_of_sr dest) ; - Impl.DATA.copy context ~dbg ~sr ~vdi ~url ~dest - - module MIRROR = struct - let start context ~dbg ~sr ~vdi ~dp ~url ~dest = - info "DATA.MIRROR.start dbg:%s sr:%s vdi:%s url:%s dest:%s" dbg - (s_of_sr sr) (s_of_vdi vdi) url (s_of_sr dest) ; - Impl.DATA.MIRROR.start context ~dbg ~sr ~vdi ~dp ~url ~dest - - let stop context ~dbg ~id = - info "DATA.MIRROR.stop dbg:%s id:%s" dbg id ; - Impl.DATA.MIRROR.stop context ~dbg ~id - - let list context ~dbg = - info "DATA.MIRROR.active dbg:%s" dbg ; - Impl.DATA.MIRROR.list context ~dbg - - let stat context ~dbg ~id = - info "DATA.MIRROR.stat dbg:%s id:%s" dbg id ; - Impl.DATA.MIRROR.stat context ~dbg ~id - - let receive_start context ~dbg ~sr ~vdi_info ~id ~similar = - info "DATA.MIRROR.receive_start dbg:%s sr:%s id:%s similar:[%s]" dbg - (s_of_sr sr) id - (String.concat "," similar) ; - Impl.DATA.MIRROR.receive_start context ~dbg ~sr ~vdi_info ~id ~similar - - let receive_finalize context ~dbg ~id = - info "DATA.MIRROR.receive_finalize dbg:%s id:%s" dbg id ; - Impl.DATA.MIRROR.receive_finalize context ~dbg ~id - - let receive_cancel context ~dbg ~id = - info "DATA.MIRROR.receive_cancel dbg:%s id:%s" dbg id ; - Impl.DATA.MIRROR.receive_cancel context ~dbg ~id - end - end - module DP = struct let create _context ~dbg:_ ~id = id @@ -1137,7 +1097,7 @@ functor in String.concat "" (List.map (fun x -> x ^ "\n") lines) - let attach_info _context ~dbg:_ ~sr ~vdi ~dp = + let attach_info _context ~dbg:_ ~sr ~vdi ~dp ~vm:_ = let srs = Host.list !Host.host in let sr_state = List.assoc sr srs in let vdi_state = Hashtbl.find sr_state.Sr.vdis vdi in @@ -1176,6 +1136,105 @@ functor ) end + module DATA = struct + let copy context ~dbg ~sr ~vdi ~vm ~url ~dest = + info "DATA.copy dbg:%s sr:%s vdi:%s url:%s dest:%s" dbg (s_of_sr sr) + (s_of_vdi vdi) url (s_of_sr dest) ; + Impl.DATA.copy context ~dbg ~sr ~vdi ~vm ~url ~dest + + module MIRROR = struct + let start context ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest = + info "DATA.MIRROR.start dbg:%s sr:%s vdi:%s url:%s dest:%s" dbg + (s_of_sr sr) (s_of_vdi vdi) url (s_of_sr dest) ; + Impl.DATA.MIRROR.start context ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm + ~url ~dest + + let stop context ~dbg ~id = + info "DATA.MIRROR.stop dbg:%s id:%s" dbg id ; + Impl.DATA.MIRROR.stop context ~dbg ~id + + let list context ~dbg = + info "DATA.MIRROR.active dbg:%s" dbg ; + Impl.DATA.MIRROR.list context ~dbg + + let stat context ~dbg ~id = + info "DATA.MIRROR.stat dbg:%s id:%s" dbg id ; + Impl.DATA.MIRROR.stat context ~dbg ~id + + let receive_start context ~dbg ~sr ~vdi_info ~id ~similar = + info "DATA.MIRROR.receive_start dbg:%s sr:%s id:%s similar:[%s]" dbg + (s_of_sr sr) id + (String.concat "," similar) ; + Impl.DATA.MIRROR.receive_start context ~dbg ~sr ~vdi_info ~id ~similar + + let receive_start2 context ~dbg ~sr ~vdi_info ~id ~similar ~vm = + info + "DATA.MIRROR.receive_start2 dbg:%s sr:%s id:%s similar:[%s] vm:%s" + dbg (s_of_sr sr) id + (String.concat "," similar) + (s_of_vm vm) ; + Impl.DATA.MIRROR.receive_start2 context ~dbg ~sr ~vdi_info ~id + ~similar ~vm + + let receive_finalize context ~dbg ~id = + info "DATA.MIRROR.receive_finalize dbg:%s id:%s" dbg id ; + Impl.DATA.MIRROR.receive_finalize context ~dbg ~id + + let receive_finalize2 context ~dbg ~id = + info "DATA.MIRROR.receive_finalize2 dbg:%s id:%s" dbg id ; + Impl.DATA.MIRROR.receive_finalize2 context ~dbg ~id + + let receive_cancel context ~dbg ~id = + info "DATA.MIRROR.receive_cancel dbg:%s id:%s" dbg id ; + Impl.DATA.MIRROR.receive_cancel context ~dbg ~id + + (* tapdisk supports three kind of nbd servers, the old style nbdserver, + the new style nbd server and a real nbd server. The old and new style nbd servers + are "special" nbd servers that accept fds passed via SCM_RIGHTS and handle + connection based on that fd. The real nbd server is a "normal" nbd server + that accepts nbd connections from nbd clients, and it does not support fd + passing. *) + let get_nbd_server_common context ~dbg ~dp ~sr ~vdi ~vm ~style = + info "%s DATA.MIRROR.get_nbd_server dbg:%s dp:%s sr:%s vdi:%s vm:%s" + __FUNCTION__ dbg dp (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) ; + let attach_info = + DP.attach_info context ~dbg:"nbd" ~sr ~vdi ~dp ~vm + in + match Storage_migrate.tapdisk_of_attach_info attach_info with + | Some tapdev -> + let minor = Tapctl.get_minor tapdev in + let pid = Tapctl.get_tapdisk_pid tapdev in + let path = + match style with + | `newstyle -> + Printf.sprintf "/var/run/blktap-control/nbdserver-new%d.%d" + pid minor + | `oldstyle -> + Printf.sprintf "/var/run/blktap-control/nbdserver%d.%d" pid + minor + | `real -> + Printf.sprintf "/var/run/blktap-control/nbd%d.%d" pid minor + in + debug "%s nbd server path is %s" __FUNCTION__ path ; + path + | None -> + raise + (Storage_interface.Storage_error + (Backend_error + ( Api_errors.internal_error + , ["No tapdisk attach info found"] + ) + ) + ) + + let import_activate context ~dbg ~dp ~sr ~vdi ~vm = + get_nbd_server_common context ~dbg ~dp ~sr ~vdi ~vm ~style:`oldstyle + + let get_nbd_server context ~dbg ~dp ~sr ~vdi ~vm = + get_nbd_server_common context ~dbg ~dp ~sr ~vdi ~vm ~style:`real + end + end + module SR = struct include Storage_skeleton.SR diff --git a/ocaml/xapi/storage_utils.ml b/ocaml/xapi/storage_utils.ml index 16397af6434..dd7d6b6e63d 100644 --- a/ocaml/xapi/storage_utils.ml +++ b/ocaml/xapi/storage_utils.ml @@ -12,6 +12,8 @@ * GNU Lesser General Public License for more details. *) +open Storage_interface + let string_of_vdi_type vdi_type = Rpc.string_of_rpc (API.rpc_of_vdi_type vdi_type) @@ -127,3 +129,47 @@ let rpc ~srcstr ~dststr {url; pool_secret; verify_cert} = intra_pool_rpc_of_ip ~srcstr ~dststr ~ip in redirectable_rpc ~original ~redirect_to_ip + +let transform_storage_exn f = + let get_sr_ref sr_uuid = + Server_helpers.exec_with_new_task "transform_storage_exn" (fun __context -> + Db.SR.get_by_uuid ~__context ~uuid:sr_uuid + ) + in + try f () with + | Storage_error (Backend_error (code, params)) as e -> + Backtrace.reraise e (Api_errors.Server_error (code, params)) + | Storage_error (Backend_error_with_backtrace (code, backtrace :: params)) as + e -> + let backtrace = Backtrace.Interop.of_json "SM" backtrace in + Backtrace.add e backtrace ; + Backtrace.reraise e (Api_errors.Server_error (code, params)) + | Storage_error (Sr_unhealthy (sr, health)) as e -> + let advice = + match health with + | Unavailable -> + "try reboot" + | Unreachable -> + "try again later" + | _health -> + "" + in + let sr = get_sr_ref sr in + Backtrace.reraise e + (Api_errors.Server_error + ( Api_errors.sr_unhealthy + , [Ref.string_of sr; Storage_interface.show_sr_health health; advice] + ) + ) + | Api_errors.Server_error _ as e -> + raise e + | Storage_error (No_storage_plugin_for_sr sr) as e -> + let sr = get_sr_ref sr in + Backtrace.reraise e + (Api_errors.Server_error (Api_errors.sr_not_attached, [Ref.string_of sr]) + ) + | e -> + Backtrace.reraise e + (Api_errors.Server_error + (Api_errors.internal_error, [Printexc.to_string e]) + ) diff --git a/ocaml/xapi/storage_utils.mli b/ocaml/xapi/storage_utils.mli new file mode 100644 index 00000000000..50e3a80e7f8 --- /dev/null +++ b/ocaml/xapi/storage_utils.mli @@ -0,0 +1,66 @@ +(* Copyright (C) Cloud Software Group Inc. + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +val string_of_vdi_type : + [< `cbt_metadata + | `crashdump + | `ephemeral + | `ha_statefile + | `metadata + | `pvs_cache + | `redo_log + | `rrd + | `suspend + | `system + | `user ] + -> string + +val vdi_type_of_string : + string + -> [> `cbt_metadata + | `crashdump + | `ephemeral + | `ha_statefile + | `metadata + | `pvs_cache + | `redo_log + | `rrd + | `suspend + | `system + | `user ] + +val redirectable_rpc : + redirect_to_ip:(ip:string -> Rpc.call -> Rpc.response) + -> original:(Rpc.call -> Rpc.response) + -> Rpc.call + -> Rpc.response + +type connection_args = { + url: Http.Url.t + ; pool_secret: SecretString.t option + ; verify_cert: Stunnel.verification_config option +} + +val localhost_connection_args : unit -> connection_args + +val intra_pool_connection_args_of_ip : string -> connection_args + +val connection_args_of_uri : verify_dest:bool -> string -> connection_args + +val intra_pool_rpc_of_ip : + srcstr:string -> dststr:string -> ip:string -> Rpc.call -> Rpc.response + +val rpc : + srcstr:string -> dststr:string -> connection_args -> Rpc.call -> Rpc.response + +val transform_storage_exn : (unit -> 'a) -> 'a +(** [transform_storage_exn f] runs [f], rethrowing any storage error as a nice XenAPI error *) diff --git a/ocaml/xapi/xapi_dr_task.ml b/ocaml/xapi/xapi_dr_task.ml index 415a4e45c8f..40a9a992c9e 100644 --- a/ocaml/xapi/xapi_dr_task.ml +++ b/ocaml/xapi/xapi_dr_task.ml @@ -101,7 +101,8 @@ let create ~__context ~_type ~device_config ~whitelist = (* Check if licence allows disaster recovery. *) Pool_features.assert_enabled ~__context ~f:Features.DR ; (* Check that the SR type supports metadata. *) - if not (List.mem_assoc Smint.Sr_metadata (Sm.features_of_driver _type)) then + if not Smint.Feature.(has_capability Sr_metadata (Sm.features_of_driver _type)) + then raise (Api_errors.Server_error ( Api_errors.operation_not_allowed diff --git a/ocaml/xapi/xapi_host.ml b/ocaml/xapi/xapi_host.ml index cd6ae3a7d35..cc3f48719db 100644 --- a/ocaml/xapi/xapi_host.ml +++ b/ocaml/xapi/xapi_host.ml @@ -2232,7 +2232,7 @@ let enable_local_storage_caching ~__context ~host ~sr = let shared = Db.SR.get_shared ~__context ~self:sr in let has_required_capability = let caps = Sm.features_of_driver ty in - List.mem_assoc Smint.Sr_supports_local_caching caps + Smint.Feature.(has_capability Sr_supports_local_caching caps) in debug "shared: %b. List.length pbds: %d. has_required_capability: %b" shared (List.length pbds) has_required_capability ; diff --git a/ocaml/xapi/xapi_pbd.ml b/ocaml/xapi/xapi_pbd.ml index 7ba1fd8642d..a9625dc3c62 100644 --- a/ocaml/xapi/xapi_pbd.ml +++ b/ocaml/xapi/xapi_pbd.ml @@ -188,7 +188,7 @@ let plug ~__context ~self = check_sharing_constraint ~__context ~sr ; let dbg = Ref.string_of (Context.get_task_id __context) in let device_config = Db.PBD.get_device_config ~__context ~self in - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.SR.attach dbg (Storage_interface.Sr.of_string (Db.SR.get_uuid ~__context ~self:sr) @@ -264,7 +264,7 @@ let unplug ~__context ~self = ) ; let dbg = Ref.string_of (Context.get_task_id __context) in let uuid = Db.SR.get_uuid ~__context ~self:sr in - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.SR.detach dbg (Storage_interface.Sr.of_string uuid) ) ; Storage_access.unbind ~__context ~pbd:self ; diff --git a/ocaml/xapi/xapi_pool.ml b/ocaml/xapi/xapi_pool.ml index 2f471932c14..b73d7fbf0dd 100644 --- a/ocaml/xapi/xapi_pool.ml +++ b/ocaml/xapi/xapi_pool.ml @@ -849,7 +849,8 @@ let pre_join_checks ~__context ~rpc ~session_id ~force = let features_compatible coor_features candidate_features = (* The pool features must not be reduced or downgraded, although it is fine the other way around. *) - Smint.compat_features coor_features candidate_features = coor_features + Smint.Feature.compat_features coor_features candidate_features + = coor_features in let pool_sms = Client.SM.get_all_records ~rpc ~session_id in List.iter @@ -3156,8 +3157,10 @@ let enable_local_storage_caching ~__context ~self:_ = (fun (_, _, srrec) -> (not srrec.API.sR_shared) && List.length srrec.API.sR_PBDs = 1 - && List.mem_assoc Smint.Sr_supports_local_caching - (Sm.features_of_driver srrec.API.sR_type) + && Smint.Feature.( + has_capability Sr_supports_local_caching + (Sm.features_of_driver srrec.API.sR_type) + ) ) hosts_and_srs in diff --git a/ocaml/xapi/xapi_services.ml b/ocaml/xapi/xapi_services.ml index 8a71c2aca0c..a413e4c3630 100644 --- a/ocaml/xapi/xapi_services.ml +++ b/ocaml/xapi/xapi_services.ml @@ -203,6 +203,12 @@ let put_handler (req : Http.Request.t) s _ = ignore (Import_raw_vdi.import (Some vdi) req s ()) | [""; services; "SM"; "nbd"; sr; vdi; dp] when services = _services -> Storage_migrate.nbd_handler req s sr vdi dp + | [""; services; "SM"; "nbd"; vm; sr; vdi; dp] when services = _services + -> + Storage_migrate.nbd_handler req s ~vm sr vdi dp + | [""; services; "SM"; "nbdproxy"; vm; sr; vdi; dp] + when services = _services -> + Storage_migrate.nbd_proxy req s vm sr vdi dp | _ -> Http_svr.headers s (Http.http_404_missing ~version:"1.0" ()) ; req.Http.Request.close <- true diff --git a/ocaml/xapi/xapi_sm.ml b/ocaml/xapi/xapi_sm.ml index 9badc179c06..769484ddd7f 100644 --- a/ocaml/xapi/xapi_sm.ml +++ b/ocaml/xapi/xapi_sm.ml @@ -36,7 +36,7 @@ let create_from_query_result ~__context q = let r = Ref.make () and u = Uuidx.to_string (Uuidx.make ()) in let open Storage_interface in if String.lowercase_ascii q.driver <> "storage_access" then ( - let features = Smint.parse_string_int64_features q.features in + let features = Smint.Feature.parse_string_int64 q.features in let capabilities = List.map fst features in info "%s Registering SM plugin %s (version %s)" __FUNCTION__ (String.lowercase_ascii q.driver) @@ -59,7 +59,7 @@ to pending features of host [self]. It then returns a list of currently pending let addto_pending_hosts_features ~__context self new_features = let host = Helpers.get_localhost ~__context in let new_features = - List.map (fun (f, v) -> Smint.unparse_feature (f, v)) new_features + List.map (fun (f, v) -> Smint.Feature.unparse (f, v)) new_features in let curr_pending_features = Db.SM.get_host_pending_features ~__context ~self @@ -74,7 +74,7 @@ let addto_pending_hosts_features ~__context self new_features = ) curr_pending_features ; List.map - (fun (h, f) -> (h, Smint.parse_string_int64_features f)) + (fun (h, f) -> (h, Smint.Feature.parse_string_int64 f)) curr_pending_features let valid_hosts_pending_features ~__context pending_features = @@ -84,14 +84,14 @@ let valid_hosts_pending_features ~__context pending_features = [] ) else List.map snd pending_features |> fun l -> - List.fold_left Smint.compat_features + List.fold_left Smint.Feature.compat_features (* The list in theory cannot be empty due to the if condition check, but do this just in case *) (List.nth_opt l 0 |> Option.fold ~none:[] ~some:Fun.id) (List.tl l) let remove_valid_features_from_pending ~__context ~self valid_features = - let valid_features = List.map Smint.unparse_feature valid_features in + let valid_features = List.map Smint.Feature.unparse valid_features in let new_pending_feature = Db.SM.get_host_pending_features ~__context ~self |> List.map (fun (h, pending_features) -> @@ -107,7 +107,7 @@ let update_from_query_result ~__context (self, r) q_result = let driver_filename = Sm_exec.cmd_name q_result.driver in let existing_features = Db.SM.get_features ~__context ~self in let new_features = - Smint.parse_string_int64_features q_result.features + Smint.Feature.parse_string_int64 q_result.features |> find_pending_features existing_features |> addto_pending_hosts_features ~__context self |> valid_hosts_pending_features ~__context diff --git a/ocaml/xapi/xapi_sr.ml b/ocaml/xapi/xapi_sr.ml index a40a644ba04..b6d8caf5dda 100644 --- a/ocaml/xapi/xapi_sr.ml +++ b/ocaml/xapi/xapi_sr.ml @@ -234,7 +234,7 @@ let call_probe ~__context ~host:_ ~device_config ~_type ~sm_config ~f = let rpc = rpc end)) in let dbg = Context.string_of_task __context in - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> Client.SR.probe dbg queue device_config sm_config |> f ) @@ -482,7 +482,7 @@ let find_or_create_rrd_vdi ~__context ~sr = let should_manage_stats ~__context sr = let sr_record = Db.SR.get_record_internal ~__context ~self:sr in let sr_features = Xapi_sr_operations.features_of_sr ~__context sr_record in - Smint.(has_capability Sr_stats sr_features) + Smint.Feature.(has_capability Sr_stats sr_features) && Helpers.i_am_srmaster ~__context ~sr let maybe_push_sr_rrds ~__context ~sr = @@ -570,7 +570,7 @@ let update ~__context ~sr = let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = rpc end)) in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> let sr' = Db.SR.get_uuid ~__context ~self:sr |> Storage_interface.Sr.of_string in @@ -764,7 +764,7 @@ let scan ~__context ~sr = end)) in let sr' = Ref.string_of sr in SRScanThrottle.execute (fun () -> - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> let sr_uuid = Db.SR.get_uuid ~__context ~self:sr in (* CA-399757: Do not update_vdis unless we are sure that the db was not changed during the scan. If it was, retry the scan operation. This @@ -851,13 +851,12 @@ let set_shared ~__context ~sr ~value = Db.SR.set_shared ~__context ~self:sr ~value let set_name_label ~__context ~sr ~value = - let open Storage_access in let task = Context.get_task_id __context in let sr' = Db.SR.get_uuid ~__context ~self:sr in let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.SR.set_name_label (Ref.string_of task) (Storage_interface.Sr.of_string sr') value @@ -865,13 +864,12 @@ let set_name_label ~__context ~sr ~value = Db.SR.set_name_label ~__context ~self:sr ~value let set_name_description ~__context ~sr ~value = - let open Storage_access in let task = Context.get_task_id __context in let sr' = Db.SR.get_uuid ~__context ~self:sr in let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.SR.set_name_description (Ref.string_of task) (Storage_interface.Sr.of_string sr') value diff --git a/ocaml/xapi/xapi_sr_operations.ml b/ocaml/xapi/xapi_sr_operations.ml index eef09a7d9eb..263f002d474 100644 --- a/ocaml/xapi/xapi_sr_operations.ml +++ b/ocaml/xapi/xapi_sr_operations.ml @@ -76,20 +76,21 @@ let disallowed_during_rpu : API.storage_operations_set = List.filter (fun x -> not (List.mem x all_rpu_ops)) all_ops let sm_cap_table : (API.storage_operations * _) list = + let open Smint.Feature in [ - (`vdi_create, Smint.Vdi_create) - ; (`vdi_destroy, Smint.Vdi_delete) - ; (`vdi_resize, Smint.Vdi_resize) - ; (`vdi_introduce, Smint.Vdi_introduce) - ; (`vdi_mirror, Smint.Vdi_mirror) - ; (`vdi_enable_cbt, Smint.Vdi_configure_cbt) - ; (`vdi_disable_cbt, Smint.Vdi_configure_cbt) - ; (`vdi_data_destroy, Smint.Vdi_configure_cbt) - ; (`vdi_list_changed_blocks, Smint.Vdi_configure_cbt) - ; (`vdi_set_on_boot, Smint.Vdi_reset_on_boot) - ; (`update, Smint.Sr_update) + (`vdi_create, Vdi_create) + ; (`vdi_destroy, Vdi_delete) + ; (`vdi_resize, Vdi_resize) + ; (`vdi_introduce, Vdi_introduce) + ; (`vdi_mirror, Vdi_mirror) + ; (`vdi_enable_cbt, Vdi_configure_cbt) + ; (`vdi_disable_cbt, Vdi_configure_cbt) + ; (`vdi_data_destroy, Vdi_configure_cbt) + ; (`vdi_list_changed_blocks, Vdi_configure_cbt) + ; (`vdi_set_on_boot, Vdi_reset_on_boot) + ; (`update, Sr_update) ; (* We fake clone ourselves *) - (`vdi_snapshot, Smint.Vdi_snapshot) + (`vdi_snapshot, Vdi_snapshot) ] type table = (API.storage_operations, (string * string list) option) Hashtbl.t @@ -104,7 +105,7 @@ let features_of_sr_internal ~__context ~_type = | (_, sm) :: _ -> List.filter_map (fun (name, v) -> - try Some (List.assoc name Smint.string_to_capability_table, v) + try Some (List.assoc name Smint.Feature.string_to_capability_table, v) with Not_found -> None ) sm.Db_actions.sM_features @@ -139,16 +140,14 @@ let valid_operations ~__context ?op record _ref' : table = Multiple simultaneous PBD.unplug operations are ok. *) let check_sm_features ~__context record = + let open Smint.Feature in (* First consider the backend SM features *) let sm_features = features_of_sr ~__context record in (* Then filter out the operations we don't want to see for the magic tools SR *) let sm_features = if record.Db_actions.sR_is_tools_sr then List.filter - (fun f -> - not - Smint.(List.mem (capability_of_feature f) [Vdi_create; Vdi_delete]) - ) + (fun f -> not (List.mem (capability_of f) [Vdi_create; Vdi_delete])) sm_features else sm_features @@ -157,7 +156,7 @@ let valid_operations ~__context ?op record _ref' : table = List.filter (fun op -> List.mem_assoc op sm_cap_table - && not (Smint.has_capability (List.assoc op sm_cap_table) sm_features) + && not (has_capability (List.assoc op sm_cap_table) sm_features) ) all_ops in diff --git a/ocaml/xapi/xapi_vdi.ml b/ocaml/xapi/xapi_vdi.ml index a2978de0b7f..3713f189040 100644 --- a/ocaml/xapi/xapi_vdi.ml +++ b/ocaml/xapi/xapi_vdi.ml @@ -23,7 +23,7 @@ open D (* current/allowed operations checking *) let feature_of_op = - let open Smint in + let open Smint.Feature in function | `forget | `copy | `force_unlock | `blocked -> None @@ -53,7 +53,7 @@ let check_sm_feature_error (op : API.vdi_operations) sm_features sr = | None -> Ok () | Some feature -> - if Smint.(has_capability feature sm_features) then + if Smint.Feature.(has_capability feature sm_features) then Ok () else Error (Api_errors.sr_operation_not_supported, [Ref.string_of sr]) @@ -641,7 +641,7 @@ let create ~__context ~name_label ~name_description ~sR ~virtual_size ~_type let rpc = rpc end)) in let sm_vdi = - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.VDI.create (Ref.string_of task) (Db.SR.get_uuid ~__context ~self:sR |> Storage_interface.Sr.of_string) vdi_info @@ -721,7 +721,6 @@ let introduce ~__context ~uuid ~name_label ~name_description ~sR ~_type ~sharable ~read_only:_ ~other_config ~location ~xenstore_data ~sm_config ~managed:_ ~virtual_size:_ ~physical_utilisation:_ ~metadata_of_pool:_ ~is_a_snapshot:_ ~snapshot_time:_ ~snapshot_of:_ = - let open Storage_access in debug "introduce uuid=%s name_label=%s sm_config=[ %s ]" uuid name_label (String.concat "; " (List.map (fun (k, v) -> k ^ " = " ^ v) sm_config)) ; Sm.assert_pbd_is_plugged ~__context ~sr:sR ; @@ -747,7 +746,7 @@ let introduce ~__context ~uuid ~name_label ~name_description ~sR ~_type end)) in Sm.assert_pbd_is_plugged ~__context ~sr:sR ; let vdi_info = - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.VDI.introduce (Ref.string_of task) sr' uuid sm_config location ) in @@ -835,7 +834,7 @@ let snapshot ~__context ~vdi ~driver_params = let rpc = Storage_access.rpc end)) in let newvdi = - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> try snapshot_and_clone C.VDI.snapshot ~__context ~vdi ~driver_params with Storage_interface.Storage_error (Unimplemented _) -> debug @@ -995,7 +994,7 @@ let destroy_and_data_destroy_common ~__context ~self | `data_destroy _ -> C.VDI.data_destroy in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> call_f (Ref.string_of task) (Db.SR.get_uuid ~__context ~self:sr |> Storage_interface.Sr.of_string) (Storage_interface.Vdi.of_string location) @@ -1042,7 +1041,7 @@ let data_destroy = _data_destroy ~timeout:4 let resize ~__context ~vdi ~size = Sm.assert_pbd_is_plugged ~__context ~sr:(Db.VDI.get_SR ~__context ~self:vdi) ; Xapi_vdi_helpers.assert_managed ~__context ~vdi ; - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in @@ -1068,7 +1067,7 @@ let generate_config ~__context ~host:_ ~vdi = ) let clone ~__context ~vdi ~driver_params = - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> try let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc @@ -1246,7 +1245,6 @@ let set_metadata_of_pool ~__context ~self ~value = let set_on_boot ~__context ~self ~value = let sr = Db.VDI.get_SR ~__context ~self in Sm.assert_pbd_is_plugged ~__context ~sr ; - let open Storage_access in let task = Context.get_task_id __context in let sr' = Db.SR.get_uuid ~__context ~self:sr |> Storage_interface.Sr.of_string @@ -1257,7 +1255,7 @@ let set_on_boot ~__context ~self ~value = let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.VDI.set_persistent (Ref.string_of task) sr' vdi' (value = `persist) ) ; Db.VDI.set_on_boot ~__context ~self ~value @@ -1266,7 +1264,6 @@ let set_allow_caching ~__context ~self ~value = Db.VDI.set_allow_caching ~__context ~self ~value let set_name_label ~__context ~self ~value = - let open Storage_access in let task = Context.get_task_id __context in let sr = Db.VDI.get_SR ~__context ~self in let sr' = @@ -1278,13 +1275,12 @@ let set_name_label ~__context ~self ~value = let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.VDI.set_name_label (Ref.string_of task) sr' vdi' value ) ; update ~__context ~vdi:self let set_name_description ~__context ~self ~value = - let open Storage_access in let task = Context.get_task_id __context in let sr = Db.VDI.get_SR ~__context ~self in let sr' = @@ -1296,7 +1292,7 @@ let set_name_description ~__context ~self ~value = let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in - transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.VDI.set_name_description (Ref.string_of task) sr' vdi' value ) ; update ~__context ~vdi:self @@ -1362,7 +1358,7 @@ let change_cbt_status ~__context ~self ~new_cbt_enabled ~caller_name = let call_f = if new_cbt_enabled then C.VDI.enable_cbt else C.VDI.disable_cbt in - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> call_f (Ref.string_of task) sr' vdi' ) ; Db.VDI.set_cbt_enabled ~__context ~self ~value:new_cbt_enabled @@ -1404,7 +1400,7 @@ let list_changed_blocks ~__context ~vdi_from ~vdi_to = let module C = Storage_interface.StorageAPI (Idl.Exn.GenClient (struct let rpc = Storage_access.rpc end)) in - Storage_access.transform_storage_exn (fun () -> + Storage_utils.transform_storage_exn (fun () -> C.VDI.list_changed_blocks (Ref.string_of task) sr' vdi_from vdi_to ) diff --git a/ocaml/xapi/xapi_vm_migrate.ml b/ocaml/xapi/xapi_vm_migrate.ml index 4ac14efa270..93aebf7933f 100644 --- a/ocaml/xapi/xapi_vm_migrate.ml +++ b/ocaml/xapi/xapi_vm_migrate.ml @@ -160,38 +160,34 @@ end)) open Storage_interface -let assert_sr_support_operations ~__context ~vdi_map ~remote ~ops = +let assert_sr_support_operations ~__context ~vdi_map ~remote ~local_ops + ~remote_ops = let op_supported_on_source_sr vdi ops = + let open Smint.Feature in (* Check VDIs must not be present on SR which doesn't have required capability *) let source_sr = Db.VDI.get_SR ~__context ~self:vdi in let sr_record = Db.SR.get_record_internal ~__context ~self:source_sr in let sr_features = Xapi_sr_operations.features_of_sr ~__context sr_record in - if not (List.for_all (fun op -> Smint.(has_capability op sr_features)) ops) - then + if not (List.for_all (fun op -> has_capability op sr_features) ops) then raise (Api_errors.Server_error (Api_errors.sr_does_not_support_migration, [Ref.string_of source_sr]) ) in let op_supported_on_dest_sr sr ops sm_record remote = + let open Smint.Feature in (* Check VDIs must not be mirrored to SR which doesn't have required capability *) let sr_type = XenAPI.SR.get_type ~rpc:remote.rpc ~session_id:remote.session ~self:sr in - let sm_capabilities = + let sm_features = match List.filter (fun (_, r) -> r.API.sM_type = sr_type) sm_record with | [(_, plugin)] -> - plugin.API.sM_capabilities + plugin.API.sM_features |> List.filter_map of_string_int64_opt | _ -> [] in - if - not - (List.for_all - (fun op -> List.mem Smint.(string_of_capability op) sm_capabilities) - ops - ) - then + if not (List.for_all (fun op -> has_capability op sm_features) ops) then raise (Api_errors.Server_error (Api_errors.sr_does_not_support_migration, [Ref.string_of sr]) @@ -216,8 +212,8 @@ let assert_sr_support_operations ~__context ~vdi_map ~remote ~ops = in List.filter (fun (vdi, sr) -> not (is_sr_matching vdi sr)) vdi_map |> List.iter (fun (vdi, sr) -> - op_supported_on_source_sr vdi ops ; - op_supported_on_dest_sr sr ops sm_record remote + op_supported_on_source_sr vdi local_ops ; + op_supported_on_dest_sr sr remote_ops sm_record remote ) (** Check that none of the VDIs that are mapped to a different SR have CBT @@ -483,6 +479,7 @@ let remove_stale_pcis ~__context ~vm = in List.iter remove stale_pcis +(** Called on the destination side *) let pool_migrate_complete ~__context ~vm ~host:_ = let id = Db.VM.get_uuid ~__context ~self:vm in debug "VM.pool_migrate_complete %s" id ; @@ -737,6 +734,10 @@ type vdi_mirror = { snapshot_of: [`VDI] API.Ref.t ; (* API's snapshot_of reference *) do_mirror: bool (* Whether we should mirror or just copy the VDI *) + ; mirror_vm: Vm.t + (* The domain slice to which SMAPI calls should be made when mirroring this vdi *) + ; copy_vm: Vm.t + (* The domain slice to which SMAPI calls should be made when copying this vdi *) } (* For VMs (not snapshots) xenopsd does not allow remapping, so we @@ -802,7 +803,28 @@ let get_vdi_mirror __context vm vdi do_mirror = Storage_interface.Sr.of_string (Db.SR.get_uuid ~__context ~self:(Db.VDI.get_SR ~__context ~self:vdi)) in - {vdi; dp; location; sr; xenops_locator; size; snapshot_of; do_mirror} + let hash x = + let s = Digest.string x |> Digest.to_hex in + String.sub s 0 5 + in + let copy_vm = + Ref.string_of vm |> hash |> ( ^ ) "COPY" |> Storage_interface.Vm.of_string + in + let mirror_vm = + Ref.string_of vm |> hash |> ( ^ ) "MIR" |> Storage_interface.Vm.of_string + in + { + vdi + ; dp + ; location + ; sr + ; xenops_locator + ; size + ; snapshot_of + ; do_mirror + ; copy_vm + ; mirror_vm + } (* We ignore empty or CD VBDs - nothing to do there. Possible redundancy here: I don't think any VBDs other than CD VBDs can be 'empty' *) @@ -928,6 +950,8 @@ let vdi_copy_fun __context dbg vdi_map remote is_intra_pool remote_vdis so_far let with_remote_vdi remote_vdi cont = debug "Executing remote scan to ensure VDI is known to xapi" ; let remote_vdi_str = Storage_interface.Vdi.string_of remote_vdi in + debug "%s Executing remote scan to ensure VDI %s is known to xapi " + __FUNCTION__ remote_vdi_str ; XenAPI.SR.scan ~rpc:remote.rpc ~session_id:remote.session ~sr:dest_sr_ref ; let query = Printf.sprintf "(field \"location\"=\"%s\") and (field \"SR\"=\"%s\")" @@ -985,8 +1009,8 @@ let vdi_copy_fun __context dbg vdi_map remote is_intra_pool remote_vdis so_far let mirror_to_remote new_dp = let task = if not vconf.do_mirror then - SMAPI.DATA.copy dbg vconf.sr vconf.location remote.sm_url dest_sr - is_intra_pool + SMAPI.DATA.copy dbg vconf.sr vconf.location vconf.copy_vm remote.sm_url + dest_sr is_intra_pool else (* Though we have no intention of "write", here we use the same mode as the associated VBD on a mirrored VDIs (i.e. always RW). This avoids problem @@ -994,17 +1018,25 @@ let vdi_copy_fun __context dbg vdi_map remote is_intra_pool remote_vdis so_far let read_write = true in (* DP set up is only essential for MIRROR.start/stop due to their open ended pattern. It's not necessary for copy which will take care of that itself. *) - let vm = Storage_interface.Vm.of_string "0" in ignore - (SMAPI.VDI.attach3 dbg new_dp vconf.sr vconf.location vm read_write) ; - SMAPI.VDI.activate dbg new_dp vconf.sr vconf.location ; + (SMAPI.VDI.attach3 dbg new_dp vconf.sr vconf.location vconf.mirror_vm + read_write + ) ; + SMAPI.VDI.activate3 dbg new_dp vconf.sr vconf.location vconf.mirror_vm ; let id = Storage_migrate.State.mirror_id_of (vconf.sr, vconf.location) in + debug "%s mirror_vm is %s copy_vm is %s" __FUNCTION__ + (Vm.string_of vconf.mirror_vm) + (Vm.string_of vconf.copy_vm) ; (* Layering violation!! *) ignore (Storage_access.register_mirror __context id) ; - SMAPI.DATA.MIRROR.start dbg vconf.sr vconf.location new_dp remote.sm_url - dest_sr is_intra_pool + (* XXX I really do not understand why we need to pass copy_vm and then + mirror_vm. The storage_interface defines mirror_vm first. But logging + suggests that Storage_mux receives copy_vm first so I had to reverse the ordering + of these two params to make it work. *) + SMAPI.DATA.MIRROR.start dbg vconf.sr vconf.location new_dp vconf.copy_vm + vconf.mirror_vm remote.sm_url dest_sr is_intra_pool in let mapfn x = let total = Int64.to_float total_size in @@ -1509,7 +1541,8 @@ let migrate_send' ~__context ~vm ~dest ~live:_ ~vdi_map ~vif_map ~vgpu_map ) vifs in - (* Destroy the local datapaths - this allows the VDIs to properly detach, invoking the migrate_finalize calls *) + (* Destroy the local datapaths - this allows the VDIs to properly detach, + invoking the migrate_finalize calls *) List.iter (fun mirror_record -> if mirror_record.mr_mirrored then @@ -1531,7 +1564,8 @@ let migrate_send' ~__context ~vm ~dest ~live:_ ~vdi_map ~vif_map ~vgpu_map TaskHelper.exn_if_cancelling ~__context ; TaskHelper.set_not_cancellable ~__context ) ; - (* It's acceptable for the VM not to exist at this point; shutdown commutes with storage migrate *) + (* It's acceptable for the VM not to exist at this point; shutdown commutes + with storage migrate *) ( try Xapi_xenops.Events_from_xenopsd.with_suppressed queue_name dbg vm_uuid (fun () -> @@ -1747,7 +1781,10 @@ let assert_can_migrate ~__context ~vm ~dest ~live:_ ~vdi_map ~vif_map ~options ) vms_vdis ; (* operations required for migration *) - let required_sr_operations = [Smint.Vdi_mirror; Smint.Vdi_snapshot] in + let required_src_sr_operations = Smint.Feature.[Vdi_snapshot; Vdi_mirror] in + let required_dst_sr_operations = + Smint.Feature.[Vdi_snapshot; Vdi_mirror_in] + in let host_from = Helpers.LocalObject source_host_ref in ( match migration_type ~__context ~remote with | `intra_pool -> @@ -1760,7 +1797,8 @@ let assert_can_migrate ~__context ~vm ~dest ~live:_ ~vdi_map ~vif_map ~options (Api_errors.Server_error (Api_errors.not_supported_during_upgrade, [])) ; (* Check VDIs are not migrating to or from an SR which doesn't have required_sr_operations *) assert_sr_support_operations ~__context ~vdi_map ~remote - ~ops:required_sr_operations ; + ~local_ops:required_src_sr_operations + ~remote_ops:required_dst_sr_operations ; let snapshot = Db.VM.get_record ~__context ~self:vm in let do_cpuid_check = not force in Xapi_vm_helpers.assert_can_boot_here ~__context ~self:vm @@ -1792,7 +1830,8 @@ let assert_can_migrate ~__context ~vm ~dest ~live:_ ~vdi_map ~vif_map ~options let power_state = Db.VM.get_power_state ~__context ~self:vm in (* Check VDIs are not migrating to or from an SR which doesn't have required_sr_operations *) assert_sr_support_operations ~__context ~vdi_map ~remote - ~ops:required_sr_operations ; + ~local_ops:required_src_sr_operations + ~remote_ops:required_dst_sr_operations ; (* The copy mode is only allow on stopped VM *) if (not force) && copy && power_state <> `Halted then raise diff --git a/ocaml/xapi/xapi_vm_snapshot.ml b/ocaml/xapi/xapi_vm_snapshot.ml index 49f745a8845..4ef856d1630 100644 --- a/ocaml/xapi/xapi_vm_snapshot.ml +++ b/ocaml/xapi/xapi_vm_snapshot.ml @@ -104,7 +104,7 @@ let checkpoint ~__context ~vm ~new_name = in (* Check if SR has snapshot feature *) let sr_has_snapshot_feature sr = - Smint.has_capability Vdi_snapshot + Smint.Feature.(has_capability Vdi_snapshot) (Xapi_sr_operations.features_of_sr ~__context sr) in List.iter diff --git a/ocaml/xenopsd/lib/xenops_server.ml b/ocaml/xenopsd/lib/xenops_server.ml index f4c784faa11..a5719582b77 100644 --- a/ocaml/xenopsd/lib/xenops_server.ml +++ b/ocaml/xenopsd/lib/xenops_server.ml @@ -2725,6 +2725,23 @@ and perform_exn ?subtask ?result (op : operation) (t : Xenops_task.task_handle) ) ; debug "VM.migrate: Synchronisation point 1" in + let pause_src_vm () = + debug + "VM.migrate: pause src vm before allowing destination to proceed" ; + (* cleanup tmp src VM *) + let atomics = + [ + VM_hook_script_stable + ( id + , Xenops_hooks.VM_pre_destroy + , Xenops_hooks.reason__suspend + , new_src_id + ) + ] + @ atomics_of_operation (VM_shutdown (new_src_id, None)) + in + perform_atomics atomics t + in let final_handshake () = Handshake.send vm_fd Handshake.Success ; debug "VM.migrate: Synchronisation point 3" ; @@ -2765,7 +2782,10 @@ and perform_exn ?subtask ?result (op : operation) (t : Xenops_task.task_handle) the main VM migration sequence. *) match VGPU_DB.ids id with | [] -> - first_handshake () ; save () ; final_handshake () + first_handshake () ; + save () ; + pause_src_vm () ; + final_handshake () | (_vm_id, dev_id) :: _ -> let url = make_url "/migrate/vgpu/" @@ -2782,20 +2802,12 @@ and perform_exn ?subtask ?result (op : operation) (t : Xenops_task.task_handle) first_handshake () ; save ~vgpu_fd:(FD vgpu_fd) () ) ; + pause_src_vm () ; final_handshake () ) ; - (* cleanup tmp src VM *) - let atomics = - [ - VM_hook_script_stable - ( id - , Xenops_hooks.VM_pre_destroy - , Xenops_hooks.reason__suspend - , new_src_id - ) - ] - @ atomics_of_operation (VM_shutdown (new_src_id, None)) - @ [ + let cleanup_src_vm () = + let atomics = + [ VM_hook_script_stable ( id , Xenops_hooks.VM_post_destroy @@ -2804,8 +2816,10 @@ and perform_exn ?subtask ?result (op : operation) (t : Xenops_task.task_handle) ) ; VM_remove new_src_id ] + in + perform_atomics atomics t in - perform_atomics atomics t + cleanup_src_vm () | VM_receive_memory { vmr_id= id diff --git a/quality-gate.sh b/quality-gate.sh index a7ffefea72b..cfef6614e00 100755 --- a/quality-gate.sh +++ b/quality-gate.sh @@ -25,7 +25,7 @@ verify-cert () { } mli-files () { - N=497 + N=496 # do not count ml files from the tests in ocaml/{tests/quicktest} MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;) MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)