Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix curl error #7

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 154 additions & 18 deletions agent/agent.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ let run_command ~command =
let out = Lwt_io.(read_lines (of_fd ~mode:Input lwt_ofd)) in
let lwt_efd, efd = Lwt_unix.pipe_in () in
let err = Lwt_io.(read_lines (of_fd ~mode:Input lwt_efd)) in
let status = Lwt_process.exec ~env:(Array.append !env additional_env) ~stdout:(`FD_copy ofd) ~stderr:(`FD_copy efd) command in
let status =
let result = Lwt_process.exec ~env:(Array.append !env additional_env) ~stdout:(`FD_copy ofd) ~stderr:(`FD_copy efd) command in
let parse_result = function
| Unix.WEXITED 0 ->
Lwt.return true
| Unix.WEXITED _ | WSIGNALED _ | WSTOPPED _ ->
Lwt.return false
in
Lwt.bind result parse_result
in
Lwt.return (status, out, err)

let authentication key req =
Expand All @@ -68,8 +77,123 @@ let authentication key req =
| _ ->
false

module File_transfer = struct
type t = {
src : string;
dst : string;
} [@@deriving of_yojson]

type direction = Upload | Download

let process_body body =
let json = Yojson.Safe.from_string body in
of_yojson json |> R.reword_error R.msg

let upload cmd out_push err_push () =
let (let*) = Lwt.bind in
let* f = Lwt_io.open_file ~mode:Input cmd.src in
let safe_read ic =
let buf = Bytes.create 4096 in
let aux () =
let* data = Lwt_io.read_into ic buf 0 4096 in
match data with
| 0 -> Lwt.return None
| i -> Lwt.return @@ Some (Bytes.sub_string buf 0 i)
in
Lwt_stream.from aux
in
let body = Cohttp_lwt.Body.of_stream (safe_read f) in
let uri = Uri.of_string cmd.dst in
let* response, rbody = Cohttp_lwt_unix.Client.put ~chunked:false ~body uri in
let* result = match response.status with
| #Cohttp.Code.success_status ->
let () = out_push (Some "Upload successful.\n") in
Lwt.return true
| #Cohttp.Code.server_error_status ->
let* msg = Cohttp_lwt.Body.to_string rbody in
let () = err_push (Some (Fmt.str "Upload failed from server with body:\n %s\n" msg)) in
Lwt.return false
| _ as c ->
let* msg = Cohttp_lwt.Body.to_string rbody in
let code = Cohttp.Code.string_of_status c in
let () = err_push (Some (Fmt.str "Upload failed from server with code %s and body:\n %s\n" code msg)) in
Lwt.return false
in
let* () = Lwt_io.close f in
Lwt.return result

let download cmd out_push err_push () =
let (let*) = Lwt.bind in
let uri = Uri.of_string cmd.src in
let* response, rbody = Cohttp_lwt_unix.Client.get uri in
let* result = match response.status with
| #Cohttp.Code.success_status ->
let* f = Lwt_io.open_file ~mode:Output cmd.dst in
let body = Cohttp_lwt.Body.to_stream rbody in
let safe_write oc lines = Lwt_stream.iter_s (fun line -> Lwt_io.write oc line) lines in
let* () = safe_write f body in
let* () = Lwt_io.close f in
let () = out_push (Some "Download successful.\n") in
Lwt.return true
| #Cohttp.Code.server_error_status ->
let* msg = Cohttp_lwt.Body.to_string rbody in
let () = err_push (Some (Fmt.str "Download failed from server with body:\n %s\n" msg)) in
Lwt.return false
| _ as c ->
let* msg = Cohttp_lwt.Body.to_string rbody in
let code = Cohttp.Code.string_of_status c in
let () = err_push (Some (Fmt.str "Download failed from server with code %s and body:\n %s\n" code msg)) in
Lwt.return false
in
Lwt.return result

let close out_push err_push =
out_push (None);
err_push (None);
()

let process direction body =
let (let+) = Lwt_result.bind in
let (let*) = Lwt.bind in
let+ cmd = Lwt.return @@ process_body body in
let out_stream, out_push = Lwt_stream.create () in
let err_stream, err_push = Lwt_stream.create () in
let rec promise n () =
let* result = match direction with
| Upload -> upload cmd out_push err_push ()
| Download -> download cmd out_push err_push ()
in
match result with
| true ->
let () = close out_push err_push in Lwt.return true
| false -> if n <= 1 then let () = close out_push err_push in Lwt.return false else promise (n - 1) ()
in
Lwt_result.return (promise 3 (), out_stream, err_stream)

let handle direction body =
let (let+) = Lwt.bind in
let+ result = process direction body in
match result with
| Ok (p, out, err) -> Lwt.return (p, out, err)
| Error `Msg e ->
let out_stream, out_push = Lwt_stream.create () in
let err_stream, err_push = Lwt_stream.create () in
let () = out_push (Some (Fmt.str "Failed to transfer file: %s" e)) in
let () = out_push (None) in
let () = err_push (None) in
let promise = Lwt.return false in
Lwt.return (promise, out_stream, err_stream)

let handle_upload body =
handle Upload body

let handle_download body =
handle Download body
end

let http_command _req body =
let%lwt command = Cohttp_lwt.Body.to_string body in
let (let+) = Lwt.bind in
let+ command = Cohttp_lwt.Body.to_string body in
let promise =
run_command ~command:(Lwt_process.shell command)
in
Expand All @@ -78,25 +202,34 @@ let http_command _req body =
cmds := (id, promise) :: !cmds ;
Server.respond_string ~status:`Accepted ~body:(string_of_int id) ()

let http_file_transfer fn _req body =
let (let+) = Lwt.bind in
let+ raw_body = Cohttp_lwt.Body.to_string body in
let promise = fn raw_body in
let id = !next_id in
next_id := id + 1 ;
cmds := (id, promise) :: !cmds ;
Server.respond_string ~status:`Accepted ~body:(string_of_int id) ()

(* TOOD: Figure out how to handle this better. Probably should use norest once that gets open sourced.*)
let get_command req =
let uri = Cohttp_lwt_unix.Request.uri req in
let bind = R.bind in
let%bind str_id =
let (let+) = R.bind in
let+ str_id =
match Uri.get_query_param uri "id" with
| None ->
R.error (`Bad_request, "must supply an id parameter that is a number.")
| Some i ->
R.ok i
in
let%bind id =
let+ id =
match int_of_string_opt str_id with
| None ->
R.error (`Bad_request, "Your id parameter must be a number.")
| Some x ->
R.ok x
in
let%bind promise =
let+ promise =
match List.assoc_opt id !cmds with
| None ->
R.error (`Not_found, "No such command.")
Expand All @@ -120,33 +253,32 @@ let handle_output job_id out err =
Lwt.return (String.concat "\n" (lines))

let http_check_command req body =
let%lwt () = Cohttp_lwt.Body.drain_body body in
let (let+) = Lwt.bind in
let+ () = Cohttp_lwt.Body.drain_body body in
match get_command req with
| Error (code, msg) ->
Server.respond_string ~status:code ~body:msg ()
| Ok (job_id, p) -> (
let%lwt status, out, err = p in
let+ status, out, err = p in
match Lwt.state status with
| Lwt.Sleep ->
let%lwt current_logs = handle_output job_id out err in
let+ current_logs = handle_output job_id out err in
Server.respond_string ~status:`Accepted
~body:current_logs ()
| Lwt.Fail e ->
Server.respond_string ~status:`Internal_server_error
~body:(sprintf "Something went very wrong will running command: %s." (Printexc.to_string e)) ()
~body:(sprintf "Something went wrong while processing command: %s." (Printexc.to_string e)) ()
| Lwt.Return (s) -> (
let%lwt current_logs = handle_output job_id out err in
let+ current_logs = handle_output job_id out err in
match s with
| Unix.WEXITED 0 ->
| true ->
Server.respond_string ~status:`OK ~body:current_logs ()
| Unix.WEXITED i ->
Server.respond_string ~status:`Unprocessable_entity ~body:(Printf.sprintf "job failed with error code %d, logs:\n%s\n" i current_logs) ()
| _ ->
Server.respond_string ~status:`Unprocessable_entity
~body:(sprintf "failure to run command. logs are \n%s\n" current_logs) () ) )
| false ->
Server.respond_string ~status:`Unprocessable_entity ~body:(Printf.sprintf "job failed error, logs:\n%s\n" current_logs) ()))

let http_set_env _req body =
let%lwt json_body = Cohttp_lwt.Body.to_string body in
let (let+) = Lwt.bind in
let+ json_body = Cohttp_lwt.Body.to_string body in
let json = Yojson.Basic.from_string json_body in
(* TODO: if this isn't an assoc, this will crash. *)
let dirty_pairs = Yojson.Basic.Util.to_assoc json in
Expand Down Expand Up @@ -182,6 +314,10 @@ let router req body =
match Uri.path uri with
| "/command" ->
http_command req body
| "/upload" ->
http_file_transfer File_transfer.handle_upload req body
| "/download" ->
http_file_transfer File_transfer.handle_download req body
| "/check_command" ->
http_check_command req body
| "/set_env" ->
Expand Down
4 changes: 2 additions & 2 deletions agent/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(executable
(name agent)
(libraries lwt.unix cmdliner cohttp-lwt-unix yojson rresult)
(preprocess (pps lwt_ppx ocaml-monadic)))
(libraries lwt.unix cmdliner cohttp-lwt-unix yojson rresult ppx_deriving_yojson.runtime)
(preprocess (pps ppx_deriving_yojson)))

(alias
(name makecloud-agent)
Expand Down
42 changes: 28 additions & 14 deletions engine/provider_aws.ml
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ module Aws : Provider_template.Provider = struct
Lwt.return None

(*TODO Please no more string types.*)
let send_command box s ~expire_time : (string, [> R.msg] * string) result Lwt.t =
let uri = Uri.of_string ("http://" ^ box ^ ":8000/command") in
let send_command cmd_uri s ~expire_time : (string, [> R.msg] * string) result Lwt.t =
let headers = Cohttp.Header.init_with "ApiKey" (Sys.getenv "MC_KEY") in
let rec repeat_until_ok f c =
match c with
Expand All @@ -209,7 +208,7 @@ module Aws : Provider_template.Provider = struct
in
let send_command () =
let body = Cohttp_lwt.Body.of_string s in
let%lwt resp, body = Cohttp_lwt_unix.Client.put uri ~headers ~body in
let%lwt resp, body = Cohttp_lwt_unix.Client.put cmd_uri ~headers ~body in
let%lwt body = Cohttp_lwt.Body.to_string body in
let process_response x =
match Cohttp.Response.status x with
Expand All @@ -230,18 +229,19 @@ module Aws : Provider_template.Provider = struct
process_response resp
in
let%lwt _resp, body =
match%lwt repeat_until_ok send_command 10 with
match%lwt repeat_until_ok send_command 60 with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good!

| Ok (r, b) ->
Lwt.return (r, b)
| Error _ ->
failwith
| Error (`Msg message, note) ->
Lwt.fail_with (Fmt.str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we leave this as failwith (or even Fmt.failwith)? The lwt docs mention that it might be better to prefer failwith over Lwt.fail_with if we don't attempt to store or catch the rejected promise further up the chain since failwith will result in the runtime recording the backtrace.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair question. In theory using the Lwt ppx should give decent backtraces. But that's not happening here so something is missing somewhere.

"Can't talk to an agent, this probably means the agent failed to \
install."
install. Error: %s - %s"
message note)
in
(*TODO cohttp_retry*)
let poll_agent () =
let check_uri =
Uri.of_string ("http://" ^ box ^ ":8000/check_command")
Uri.with_path cmd_uri ("/check_command")
in
let check_uri = Uri.add_query_param' check_uri ("id", body) in
match%lwt Cohttp_lwt_unix.Client.get check_uri ~headers with
Expand Down Expand Up @@ -332,19 +332,33 @@ module Aws : Provider_template.Provider = struct
let%bind _store_result = store_ami ?profile ~settings ~n ~guid waiting_image in
Lwt.return_ok waiting_image

let runcmd transfer t (params : Lib.run_parameters) (settings : Settings.t) (n : Node.real_node) guid (cmd : Command.t) :
let make_file_transfer_payload src dst =
let json : Yojson.Safe.t = `Assoc [("src", `String src); ("dst", `String dst)] in
Yojson.Safe.to_string json

let runcmd transfer_fn t (params : Lib.run_parameters) (settings : Settings.t) (n : Node.real_node) guid (cmd : Command.t) :
(string, [> R.msg] * string) result Lwt.t =
let%lwt () = Node.node_log n (Command.to_string cmd) in
let expire_time = 12 * Node.rnode_get_expire_time n in
let base_uri = Uri.make ~scheme:"http" ~port:8000 ~host:t.ip_address () in
match cmd with
| Command.(Run shell_cmd) ->
send_command t.ip_address ~expire_time shell_cmd
let u = Uri.with_path base_uri "/command" in
send_command u ~expire_time shell_cmd
| Upload (first_arg, second_arg) ->
send_command t.ip_address ~expire_time
@@ transfer ~first_arg ~second_arg ~verb:`Put
let u = Uri.with_path base_uri "/upload" in
let uri = Uri.to_string (transfer_fn (sprintf "/%s/%s/%s" guid n.name second_arg) `Put) in
let payload = make_file_transfer_payload first_arg uri in
send_command u ~expire_time payload
| Download (first_arg, second_arg) ->
send_command t.ip_address ~expire_time
@@ transfer ~first_arg ~second_arg ~verb:`Get
let u = Uri.with_path base_uri "/download" in
let uri = if Uri.of_string first_arg |> Uri.scheme |> Option.is_none then
Uri.to_string (transfer_fn (sprintf "/%s/%s" guid first_arg) `Get)
else
first_arg
in
let payload = make_file_transfer_payload uri second_arg in
send_command u ~expire_time payload
| Publish ->
let%lwt image_id = publish_image ?profile:params.aws_profile ~t ~settings ~n ~guid in
(match image_id with
Expand Down
2 changes: 1 addition & 1 deletion engine/provider_template.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module type Provider = sig
val wait_until_ready : t -> Node.real_node -> unit -> unit option Lwt.t

val runcmd :
(first_arg:string -> second_arg:string -> verb:verb -> string)
(string -> [`Get | `Put] -> Uri.t)
-> t
-> Lib.run_parameters
-> Settings.t
Expand Down
Loading