Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix async backend #20

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .merlin
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ PKG oUnit
PKG result
PKG bigstring
PKG ppx_deriving.std
PKG core
PKG async

S lib
Expand Down
92 changes: 47 additions & 45 deletions lib/nanomsg_async.ml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
open Core.Std
open Async.Std

module Bytes = Caml.Bytes

open Nanomsg_utils
open Nanomsg

Expand All @@ -12,80 +10,84 @@ let ready sock io_event =
| `Read -> recv_fd
in
f sock |> function
| `Error _ -> return @@ `Bad_fd
| `Ok fd ->
| Error _ -> return @@ `Bad_fd
| Ok fd ->
let fd = Fd.create ~avoid_nonblock_if_possible:true
(Fd.Kind.Socket `Passive) fd
Info.(of_string "nanomsg pollfd") in
Fd.ready_to fd io_event

let send_buf blitf lenf sock buf pos len =
if pos < 0 || len < 0 || pos + len > lenf buf
then return @@ Result.Error ("Internal", "bounds")
module type STRING = sig
type t
val length : t -> int
include Blit.S_distinct with type src = t and type dst = Bigstring.t
end

let send_buf (type t) (module S : STRING with type t = t) ?(pos=0) ?len sock buf =
let buflen = S.length buf in
let len = Option.value len ~default:buflen in
if pos < 0 || len < 0 || pos + len > buflen
then return @@ Result.fail ("Internal", "bounds")
else
C.nn_allocmsg (Unsigned.Size_t.of_int len) 0 |> function
| None -> return @@ error ()
| Some nn_buf ->
let nn_buf_p = Ctypes.(allocate (ptr void) nn_buf) in
let ba = Ctypes.(bigarray_of_ptr array1 len
Bigarray.char @@ from_voidp char nn_buf) in
blitf buf pos ba 0 len;
S.blit buf pos ba 0 len;
ready sock `Write >>| function
| `Bad_fd | `Closed -> Result.Error ("Internal", "`Bad_fd | `Closed")
| `Bad_fd | `Closed -> Result.fail ("Internal", "`Bad_fd | `Closed")
| `Ready ->
let _ =
C.nn_send (Obj.magic sock : int)
nn_buf_p (Unsigned.Size_t.of_int (-1))
Symbol.(value_of_name_exn "NN_DONTWAIT") in
Result.Ok ()

let send_bigstring_buf = send_buf Bigstring.blit Bigstring.size
let send_bytes_buf = send_buf Bigstring.blit_of_bytes Bytes.length

let send_bigstring sock buf =
send_bigstring_buf sock buf 0 @@ Bigstring.size buf

let send_bytes sock b =
send_bytes_buf sock b 0 (Bytes.length b)

let send_string_buf sock s pos len =
send_bytes_buf sock (Bytes.unsafe_of_string s) pos len

let send_string sock s =
send_bytes_buf sock (Bytes.unsafe_of_string s) 0 (String.length s)
Result.return ()

let recv sock f =
let recv sock =
let open Ctypes in
let ba_start_p = allocate (ptr void) null in
ready sock `Read >>= function
ready sock `Read >>| function
| `Bad_fd | `Closed ->
return @@ Result.Error ("Internal", "`Bad_fd | `Closed")
Result.fail ("Internal", "`Bad_fd | `Closed")
| `Ready ->
let nb_recv = C.nn_recv (Obj.magic sock : int)
ba_start_p (Unsigned.Size_t.of_int (-1))
Symbol.(value_of_name_exn "NN_DONTWAIT") in
let ba_start = !@ ba_start_p in
let ba = bigarray_of_ptr array1 nb_recv
Bigarray.char (from_voidp char ba_start) in
f ba >>| fun res ->
let (_:int) = C.nn_freemsg ba_start in
Result.Ok res
Result.return ba

let recv_bytes_buf sock buf pos =
recv sock (fun ba ->
let len = Bigstring.size ba in
Bigstring.blit_to_bytes ba 0 buf pos len;
return len
)
module String = struct
module S = struct
include String
type src = string
type dst = Bigstring.t
include Bigstring.From_string
end

let recv_bytes sock =
recv sock (fun ba ->
let len = Bigstring.size ba in
let buf = Bytes.create len in
Bigstring.blit_to_bytes ba 0 buf 0 len;
return buf
)
let send = send_buf (module S : STRING with type t = String.t)
let recv ?(pos=0) ?len sock buf =
let maxlen = String.length buf - pos in
let maxlen = Option.value_map len ~default:maxlen ~f:(fun l -> Int.min l maxlen) in
recv sock >>| function
| Error err -> Error err
| Ok ba ->
let len = Int.min maxlen (Bigstring.length ba) in
Bigstring.To_string.blit ~src:ba ~src_pos:0 ~dst:buf ~dst_pos:pos ~len;
Result.return ()
end

let recv_string sock =
recv_bytes sock >>| Nanomsg_utils.Res.map Bytes.unsafe_to_string
module Bigstring = struct
module BS = struct
include Bigstring
type src = t
type dst = t
end

let send = send_buf (module BS : STRING with type t = Bigstring.t)
let recv = recv
end
39 changes: 9 additions & 30 deletions lib/nanomsg_async.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,12 @@ open Core.Std
open Async.Std
open Nanomsg

(** {1 Asynchronous I/O} *)

(** {2 Zero-copy I/O} *)

val send_bigstring : socket -> Bigstring.t ->
(unit, error) Result.result Deferred.t

val send_bigstring_buf : socket -> Bigstring.t -> int -> int ->
(unit, error) Result.result Deferred.t

val send_string : socket -> string -> (unit, error) Result.result Deferred.t

val send_string_buf : socket -> string -> int -> int ->
(unit, error) Result.result Deferred.t

val send_bytes : socket -> Bytes.t -> (unit, error) Result.result Deferred.t

val send_bytes_buf : socket -> Bytes.t -> int -> int ->
(unit, error) Result.result Deferred.t

val recv : socket -> (Bigstring.t -> 'a Deferred.t) -> ('a, error) Result.result Deferred.t
(** [recv sock f] applies [f] to the received message. The
argument of [f] gets unallocated after [f] returns, so make sure
[f] {b never} let a reference to its argument escape. *)

(** {2 Legacy I/O} *)

val recv_string : socket -> (string, error) Result.result Deferred.t
val recv_bytes : socket -> (Bytes.t, error) Result.result Deferred.t
val recv_bytes_buf : socket -> Bytes.t -> int -> (int, error) Result.result Deferred.t
module String : sig
val send : ?pos:int -> ?len:int -> socket -> String.t -> (unit, error) Result.t Deferred.t
val recv : ?pos:int -> ?len:int -> socket -> String.t -> (unit, error) Result.t Deferred.t
end

module Bigstring : sig
val send : ?pos:int -> ?len:int -> socket -> Bigstring.t -> (unit, error) Result.t Deferred.t
val recv : socket -> (Bigstring.t, error) Result.t Deferred.t
end