Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix curl error #7

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
fail-fast: false
matrix:
os:
- ubuntu-latest
- ubuntu-18.04
ocaml-version:
- 4.11.2

Expand Down
178 changes: 160 additions & 18 deletions agent/agent.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ let run_command ~command =
let out = Lwt_io.(read_lines (of_fd ~mode:Input lwt_ofd)) in
let lwt_efd, efd = Lwt_unix.pipe_in () in
let err = Lwt_io.(read_lines (of_fd ~mode:Input lwt_efd)) in
let status = Lwt_process.exec ~env:(Array.append !env additional_env) ~stdout:(`FD_copy ofd) ~stderr:(`FD_copy efd) command in
let status =
let result = Lwt_process.exec ~env:(Array.append !env additional_env) ~stdout:(`FD_copy ofd) ~stderr:(`FD_copy efd) command in
let parse_result = function
| Unix.WEXITED 0 ->
Lwt.return true
| Unix.WEXITED _ | WSIGNALED _ | WSTOPPED _ ->
Lwt.return false
in
Lwt.bind result parse_result
in
Lwt.return (status, out, err)

let authentication key req =
Expand All @@ -68,8 +77,129 @@ let authentication key req =
| _ ->
false

module File_transfer = struct
type t = {
src : string;
dst : string;
} [@@deriving of_yojson]

type direction = Upload | Download

let process_body body =
let json = Yojson.Safe.from_string body in
of_yojson json |> R.reword_error R.msg

let upload cmd out_push err_push () =
let%lwt f = Lwt_io.open_file ~mode:Input cmd.src in
let safe_read ic =
let buf = Bytes.create 4096 in
let aux () =
let%lwt data = Lwt_io.read_into ic buf 0 4096 in
match data with
| 0 -> Lwt.return None
| i -> Lwt.return @@ Some (Bytes.sub_string buf 0 i)
in
Lwt_stream.from aux
in
let body = Cohttp_lwt.Body.of_stream (safe_read f) in
let uri = Uri.of_string cmd.dst in
let%lwt result =
match%lwt Cohttp_lwt_unix.Client.put ~chunked:false ~body uri with
| response, rbody -> begin
match response.status with
| #Cohttp.Code.success_status ->
let%lwt () = Cohttp_lwt.Body.drain_body rbody in
let () = out_push (Some "Upload successful.\n") in
Lwt.return true
| #Cohttp.Code.server_error_status ->
let%lwt msg = Cohttp_lwt.Body.to_string rbody in
let () = err_push (Some (Fmt.str "Upload failed from server with body:\n %s\n" msg)) in
Lwt.return false
| _ as c ->
let%lwt msg = Cohttp_lwt.Body.to_string rbody in
let code = Cohttp.Code.string_of_status c in
let () = err_push (Some (Fmt.str "Upload failed from server with code %s and body:\n %s\n" code msg)) in
Lwt.return false
end
| exception e ->
let () = err_push (Some (Fmt.str "Upload failed with exception %a" Fmt.exn e)) in
Lwt.return false
in
let%lwt () = Lwt_io.close f in
Lwt.return result

let download cmd out_push err_push () =
let (let*) = Lwt.bind in
let uri = Uri.of_string cmd.src in
let* response, rbody = Cohttp_lwt_unix.Client.get uri in
let* result = match response.status with
| #Cohttp.Code.success_status ->
let* f = Lwt_io.open_file ~mode:Output cmd.dst in
let body = Cohttp_lwt.Body.to_stream rbody in
let safe_write oc lines = Lwt_stream.iter_s (fun line -> Lwt_io.write oc line) lines in
let* () = safe_write f body in
let* () = Lwt_io.close f in
let () = out_push (Some "Download successful.\n") in
Lwt.return true
| #Cohttp.Code.server_error_status ->
let* msg = Cohttp_lwt.Body.to_string rbody in
let () = err_push (Some (Fmt.str "Download failed from server with body:\n %s\n" msg)) in
Lwt.return false
| _ as c ->
let* msg = Cohttp_lwt.Body.to_string rbody in
let code = Cohttp.Code.string_of_status c in
let () = err_push (Some (Fmt.str "Download failed from server with code %s and body:\n %s\n" code msg)) in
Lwt.return false
in
Lwt.return result

let close out_push err_push =
out_push (None);
err_push (None);
()

let process direction body =
let (let+) = Lwt_result.bind in
let (let*) = Lwt.bind in
let+ cmd = Lwt.return @@ process_body body in
let out_stream, out_push = Lwt_stream.create () in
let err_stream, err_push = Lwt_stream.create () in
let rec promise n () =
let* result = match direction with
| Upload -> upload cmd out_push err_push ()
| Download -> download cmd out_push err_push ()
in
match result with
| true ->
let () = close out_push err_push in Lwt.return true
| false -> if n <= 1 then let () = close out_push err_push in Lwt.return false else promise (n - 1) ()
in
Lwt_result.return (promise 3 (), out_stream, err_stream)

let handle direction body =
let (let+) = Lwt.bind in
let+ result = process direction body in
match result with
| Ok (p, out, err) -> Lwt.return (p, out, err)
| Error `Msg e ->
let out_stream, out_push = Lwt_stream.create () in
let err_stream, err_push = Lwt_stream.create () in
let () = out_push (Some (Fmt.str "Failed to transfer file: %s" e)) in
let () = out_push (None) in
let () = err_push (None) in
let promise = Lwt.return false in
Lwt.return (promise, out_stream, err_stream)

let handle_upload body =
handle Upload body

let handle_download body =
handle Download body
end

let http_command _req body =
let%lwt command = Cohttp_lwt.Body.to_string body in
let (let+) = Lwt.bind in
let+ command = Cohttp_lwt.Body.to_string body in
let promise =
run_command ~command:(Lwt_process.shell command)
in
Expand All @@ -78,25 +208,34 @@ let http_command _req body =
cmds := (id, promise) :: !cmds ;
Server.respond_string ~status:`Accepted ~body:(string_of_int id) ()

let http_file_transfer fn _req body =
let (let+) = Lwt.bind in
let+ raw_body = Cohttp_lwt.Body.to_string body in
let promise = fn raw_body in
let id = !next_id in
next_id := id + 1 ;
cmds := (id, promise) :: !cmds ;
Server.respond_string ~status:`Accepted ~body:(string_of_int id) ()

(* TOOD: Figure out how to handle this better. Probably should use norest once that gets open sourced.*)
let get_command req =
let uri = Cohttp_lwt_unix.Request.uri req in
let bind = R.bind in
let%bind str_id =
let (let+) = R.bind in
let+ str_id =
match Uri.get_query_param uri "id" with
| None ->
R.error (`Bad_request, "must supply an id parameter that is a number.")
| Some i ->
R.ok i
in
let%bind id =
let+ id =
match int_of_string_opt str_id with
| None ->
R.error (`Bad_request, "Your id parameter must be a number.")
| Some x ->
R.ok x
in
let%bind promise =
let+ promise =
match List.assoc_opt id !cmds with
| None ->
R.error (`Not_found, "No such command.")
Expand All @@ -120,33 +259,32 @@ let handle_output job_id out err =
Lwt.return (String.concat "\n" (lines))

let http_check_command req body =
let%lwt () = Cohttp_lwt.Body.drain_body body in
let (let+) = Lwt.bind in
let+ () = Cohttp_lwt.Body.drain_body body in
match get_command req with
| Error (code, msg) ->
Server.respond_string ~status:code ~body:msg ()
| Ok (job_id, p) -> (
let%lwt status, out, err = p in
let+ status, out, err = p in
match Lwt.state status with
| Lwt.Sleep ->
let%lwt current_logs = handle_output job_id out err in
let+ current_logs = handle_output job_id out err in
Server.respond_string ~status:`Accepted
~body:current_logs ()
| Lwt.Fail e ->
Server.respond_string ~status:`Internal_server_error
~body:(sprintf "Something went very wrong will running command: %s." (Printexc.to_string e)) ()
~body:(sprintf "Something went wrong while processing command: %s." (Printexc.to_string e)) ()
| Lwt.Return (s) -> (
let%lwt current_logs = handle_output job_id out err in
let+ current_logs = handle_output job_id out err in
match s with
| Unix.WEXITED 0 ->
| true ->
Server.respond_string ~status:`OK ~body:current_logs ()
| Unix.WEXITED i ->
Server.respond_string ~status:`Unprocessable_entity ~body:(Printf.sprintf "job failed with error code %d, logs:\n%s\n" i current_logs) ()
| _ ->
Server.respond_string ~status:`Unprocessable_entity
~body:(sprintf "failure to run command. logs are \n%s\n" current_logs) () ) )
| false ->
Server.respond_string ~status:`Unprocessable_entity ~body:(Printf.sprintf "job failed error, logs:\n%s\n" current_logs) ()))

let http_set_env _req body =
let%lwt json_body = Cohttp_lwt.Body.to_string body in
let (let+) = Lwt.bind in
let+ json_body = Cohttp_lwt.Body.to_string body in
let json = Yojson.Basic.from_string json_body in
(* TODO: if this isn't an assoc, this will crash. *)
let dirty_pairs = Yojson.Basic.Util.to_assoc json in
Expand Down Expand Up @@ -182,6 +320,10 @@ let router req body =
match Uri.path uri with
| "/command" ->
http_command req body
| "/upload" ->
http_file_transfer File_transfer.handle_upload req body
| "/download" ->
http_file_transfer File_transfer.handle_download req body
| "/check_command" ->
http_check_command req body
| "/set_env" ->
Expand Down
4 changes: 2 additions & 2 deletions agent/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(executable
(name agent)
(libraries lwt.unix cmdliner cohttp-lwt-unix yojson rresult)
(preprocess (pps lwt_ppx ocaml-monadic)))
(libraries lwt.unix cmdliner cohttp-lwt-unix yojson rresult ppx_deriving_yojson.runtime)
(preprocess (pps lwt_ppx ppx_deriving_yojson)))

(alias
(name makecloud-agent)
Expand Down
2 changes: 1 addition & 1 deletion cli/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(name main)
(public_name mc)
(libraries engine lwt.unix cmdliner yaml.unix aws-ec2 aws-lwt aws-s3-lwt rresult uuidm digestif astring)
(preprocess (pps lwt_ppx ppx_defer ocaml-monadic )))
(preprocess (pps lwt_ppx ppx_defer)))

(alias
(name makecloud-cli)
Expand Down
4 changes: 2 additions & 2 deletions cli/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ open Engine.Lib

let main repo_dir nocache deploy target_nodes dont_delete profile =
let dont_delete = match dont_delete with | Some x -> [x] | None -> [] in
let params = Engine.Lib.make_params ~repo_dir ~nocache ~deploy ~target_nodes ~dont_delete ?aws_profile:profile in
let params = Engine.Lib.make_params ~repo_dir ~nocache ~deploy ~target_nodes ~dont_delete ~aws_profile:profile in
Lwt_main.run (Engine.Runner.main params)

let check repo_dir =
Expand Down Expand Up @@ -117,7 +117,7 @@ let show_all_cache profile repo_dir =
let print_node_cache (n : Engine.Node.real_node) =
let name = Engine.Node.node_to_string (Engine.Node.Rnode n) in
let%lwt rstatus =
Engine.Runner.AwsRunner.check_cache ?profile ~settings ~cwd:repo_dir ~n
Engine.Runner.AwsRunner.check_cache ~profile ~settings ~cwd:repo_dir ~n
in
let status = R.is_ok rstatus in
let%lwt hash = Engine.Node.hash_of_node repo_dir n in
Expand Down
2 changes: 1 addition & 1 deletion engine/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
(name engine)
(libraries jingoo lwt.unix cmdliner yaml.unix aws-ec2 aws-lwt aws-s3-lwt rresult uuidm digestif astring ppx_protocol_conv_yaml ppx_protocol_conv irmin-unix )
(preprocessor_deps "userdata_scripts/linux.txt" "userdata_scripts/windows.txt")
(preprocess (pps lwt_ppx ppx_defer ocaml-monadic ppx_protocol_conv ppx_deriving.show ppx_deriving.eq ppx_blob ppx_irmin)))
(preprocess (pps lwt_ppx ppx_protocol_conv ppx_deriving.show ppx_deriving.eq ppx_blob ppx_irmin)))

6 changes: 3 additions & 3 deletions engine/lib.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type run_parameters =
; aws_profile : string option
; guid : Uuidm.t }

let make_params ?aws_profile ~repo_dir ~nocache ~deploy ~target_nodes ~dont_delete =
let make_params ~aws_profile ~repo_dir ~nocache ~deploy ~target_nodes ~dont_delete =
let guid = Uuidm.v4_gen (Random.State.make_self_init ()) () in
{ repo_dir; nocache; deploy; target_nodes; guid; dont_delete; aws_profile }

Expand Down Expand Up @@ -46,8 +46,8 @@ let get_string str =
R.error (R.msg "not a string")

let get_string_from_attrib_list assoc_list key =
let bind = R.bind in
let%bind pair = get_value assoc_list key in
let ( let* ) = R.bind in
let* pair = get_value assoc_list key in
get_string (snd pair)

let result_fold fn acc input_list =
Expand Down
Loading