Skip to content

Commit

Permalink
Merge pull request #5388 from Vincent-lau/private/shul2/revert-switch
Browse files Browse the repository at this point in the history
CA-388064: Revert "Protocol_{lwt,async}: process requests concurrently"
  • Loading branch information
robhoes authored Jan 25, 2024
2 parents 8ab1fe3 + 3e84ef2 commit 1e11e6a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 38 deletions.
3 changes: 1 addition & 2 deletions ocaml/message-switch/async/protocol_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ module M = struct

let map f t = Deferred.map ~f t

let iter_dontwait f t =
Deferred.don't_wait_for @@ Deferred.List.iter ~how:`Parallel t ~f
let iter f t = Deferred.List.iter t ~f

let any = Deferred.any

Expand Down
39 changes: 12 additions & 27 deletions ocaml/message-switch/core/make.ml
Original file line number Diff line number Diff line change
Expand Up @@ -352,42 +352,33 @@ functor

let listen ~process ~switch:port ~queue:name () =
let token = Printf.sprintf "%d" (Unix.getpid ()) in
let reconnect () =
M.connect port >>= fun request_conn ->
Connection.rpc request_conn (In.Login token) >>|= fun (_ : string) ->
M.connect port >>= fun reply_conn ->
Connection.rpc reply_conn (In.Login token) >>|= fun (_ : string) ->
return (Ok (request_conn, reply_conn))
in
reconnect () >>|= fun ((request_conn, reply_conn) as c) ->
M.connect port >>= fun c ->
Connection.rpc c (In.Login token) >>|= fun (_ : string) ->
Connection.rpc c (In.CreatePersistent name) >>|= fun _ ->
let request_shutdown = M.Ivar.create () in
let on_shutdown = M.Ivar.create () in
let mutex = M.Mutex.create () in
Connection.rpc request_conn (In.CreatePersistent name) >>|= fun _ ->
let t = {request_shutdown; on_shutdown} in
let rec loop c from =
let transfer = {In.from; timeout; queues= [name]} in
let frame = In.Transfer transfer in
let message = Connection.rpc request_conn frame in
let message = Connection.rpc c frame in
any [map (fun _ -> ()) message; M.Ivar.read request_shutdown]
>>= fun () ->
if is_determined (M.Ivar.read request_shutdown) then (
M.Ivar.fill on_shutdown () ; return (Ok ())
) else
message >>= function
| Error _e ->
M.Mutex.with_lock mutex (fun () ->
M.disconnect request_conn >>= fun () ->
M.disconnect reply_conn >>= fun () -> reconnect ()
)
>>|= fun c -> loop c from
M.connect port >>= fun c ->
Connection.rpc c (In.Login token) >>|= fun (_ : string) ->
loop c from
| Ok raw -> (
let transfer = Out.transfer_of_rpc (Jsonrpc.of_string raw) in
match transfer.Out.messages with
| [] ->
loop c from
| _ :: _ ->
iter_dontwait
iter
(fun (i, m) ->
process m.Message.payload >>= fun response ->
( match m.Message.kind with
Expand All @@ -403,20 +394,14 @@ functor
}
)
in
M.Mutex.with_lock mutex (fun () ->
Connection.rpc reply_conn request
)
>>= fun _ -> return ()
Connection.rpc c request >>= fun _ -> return ()
)
>>= fun () ->
let request = In.Ack i in
M.Mutex.with_lock mutex (fun () ->
Connection.rpc reply_conn request
)
>>= fun _ -> return ()
Connection.rpc c request >>= fun _ -> return ()
)
transfer.Out.messages ;
loop c (Some transfer.Out.next)
transfer.Out.messages
>>= fun () -> loop c (Some transfer.Out.next)
)
in
let _ = loop c None in
Expand Down
2 changes: 1 addition & 1 deletion ocaml/message-switch/core/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module type BACKEND = sig

val map : ('a -> 'b) -> 'a t -> 'b t

val iter_dontwait : ('a -> unit t) -> 'a list -> unit
val iter : ('a -> unit t) -> 'a list -> unit t

val any : 'a t list -> 'a t

Expand Down
2 changes: 1 addition & 1 deletion ocaml/message-switch/lwt/protocol_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module M = struct

let map = Lwt.map

let iter_dontwait f lst = Lwt.async (fun () -> Lwt_list.iter_p f lst)
let iter = Lwt_list.iter_s

let any = Lwt.choose

Expand Down
7 changes: 0 additions & 7 deletions ocaml/message-switch/switch/switch_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ module Lwt_result = struct
let ( >>= ) m f = m >>= fun x -> f (Stdlib.Result.get_ok x)
end

let exn_hook e =
let bt = Printexc.get_raw_backtrace () in
error "Caught exception in Lwt.async: %s" (Printexc.to_string e) ;
error "backtrace: %s" (Printexc.raw_backtrace_to_string bt)

let () = Lwt.async_exception_hook := exn_hook

let make_server config trace_config =
let open Config in
info "Started server on %s" config.path ;
Expand Down

0 comments on commit 1e11e6a

Please sign in to comment.