-
Notifications
You must be signed in to change notification settings - Fork 11
/
lisp-api.lisp
141 lines (125 loc) · 5.87 KB
/
lisp-api.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
(in-package #:pzmq)
(defmacro with-message (name &body body)
"Initialize and free ZMQ message around body."
`(with-foreign-object (,name '(:struct %msg))
(msg-init ,name)
(unwind-protect (progn ,@body)
(msg-close ,name))))
(defmacro with-messages ((&rest names) &body body)
(if (not names) `(progn ,@body)
`(with-message ,(first names)
(with-messages ,(rest names)
,@body))))
(defun recv-string (socket &key dontwait (encoding cffi:*default-foreign-encoding*))
"Receive a message part from a socket as a string.
@return{A string as the primary value, and the value of the socket
option :rcvmore as the secondary value.}"
(with-message msg
(msg-recv msg socket :dontwait dontwait)
(values
(foreign-string-to-lisp (msg-data msg) :count (msg-size msg) :encoding encoding)
(getsockopt socket :rcvmore))))
(defun recv-octets (socket &key dontwait)
"Receive a message part from a socket as an octet vector.
@return{A (vector (unsigned-byte 8)) as the primary value, and the value
of the socket option :rcvmore as the secondary value.}"
(with-message msg
(msg-recv msg socket :dontwait dontwait)
(values
(foreign-array-to-lisp (msg-data msg) `(:array :unsigned-char ,(msg-size msg))
:element-type '(unsigned-byte 8))
(getsockopt socket :rcvmore))))
(defvar *default-context* nil
"Implicit context from @fun{WITH-CONTEXT} for @fun{WITH-SOCKET}.")
(defmacro with-context (name-and-options &body body)
"Initialize and terminate ZMQ context around body.
Use NIL for @em{anonymous context}, stored in @variable{*DEFAULT-CONTEXT*}.
Omit @fun{WITH-CONTEXT} altogether, and @fun{WITH-SOCKET} will establish it by
itself.
Note: unwind-protected @fun{CTX-TERM} will not return until all governed
sockets have sent all queued messages, unless they limited their wait time
with :LINGER socket parameter.
@arg[name-and-options]{name | (name options)}
@arg[name]{name | NIL}
@arg[options]{:io-threads INT :max-sockets INT; as per @fun{CTX-SET}}"
(let ((name name-and-options)
(options nil))
(when (listp name)
(setf name (car name-and-options)
options (cdr name-and-options)))
`(let* ((*default-context* (ctx-new))
,@(if name `((,name *default-context*))))
(unwind-protect
(progn
,(when options
`(ctx-set *default-context* ,@options))
,@body)
(ctx-term ,(or name '*default-context*))))))
(defmacro with-socket (name-and-context type-and-options &body body)
"Initialize and close ZMQ socket around body. Type is one of the types accepted
by @fun{SOCKET}. Options are passed to @fun{SETSOCKOPT} one by one.
When TYPE is :SUB, and :SUBSCRIBE is not given in OPTIONS, imply subscribe to all.
If this is undesirable, provide :SUBSCRIBE NIL.
When context is not specified, it either comes from surrounding @fun{WITH-CONTEXT}
or @fun{WITH-SOCKET} in @variable{*DEFAULT-CONTEXT*}, or is established by this
@fun{WITH-SOCKET} and stored in @variable{*DEFAULT-CONTEXT*} for the timespan of
this block.
@arg[name-and-context]{name | (name context)}
@arg[type-and-options]{type | (type :option1 value1 :option2 value2 ...)}"
(let* ((context-p (listp name-and-context))
(options-p (listp type-and-options))
(name (if context-p (car name-and-context) name-and-context))
(context (if context-p (cadr name-and-context) '*default-context*))
(type (if options-p (car type-and-options) type-and-options))
(options (when options-p (cdr type-and-options)))
(implicit-context (gensym (string '#:implicit-context))))
(when (and (eq type :sub) (not (member :subscribe options)))
(setf options (list* :subscribe "" options)))
`(let ((,implicit-context (not (or ,context-p *default-context*))))
(when ,implicit-context (setf *default-context* (ctx-new)))
(unwind-protect
(let ((,name (socket ,context ,type)))
(unwind-protect
(progn
,@(loop
for (option value) on options by #'cddr
collect `(setsockopt ,name ,option ,value))
,@body)
(close ,name)))
(when ,implicit-context
(ctx-term (prog1 *default-context*
(setf *default-context* nil))))))))
(defmacro with-sockets ((&rest socket-definitions) &body body)
"Nest multiple sockets."
(loop for definition in (reverse socket-definitions)
for form = `(with-socket ,@definition ,@body)
then `(with-socket ,@definition ,form)
finally (return form)))
(defmacro with-poll-items (name (&rest items) &body body)
"Prepare POLLITEM array in NAME.
Without parentheses, an item indicates subscription to both :pollin and :pollout.
@arg[items]{(item ...)}
@arg[item]{name | (name [:pollin] [:pollout])}"
(let ((nitems (length items)))
`(with-foreign-object (,name '(:struct pollitem) ,nitems)
,@(loop
for item in items
for offset from 0
for (%socket . %events) = (if (atom item) (list item) item)
when (zerop (length %events))
do (setf %events (list :pollin :pollout))
collect `(with-foreign-slots
((socket events)
(mem-aptr ,name '(:struct pollitem) ,offset)
(:struct pollitem))
(setf socket ,%socket
events ',%events)))
(let ((,name (cons ,name ,nitems)))
,@body))))
(defun revents (items subscript)
"Return a list of events - :pollin, :pollout or both - that happened to an
indicated item, counting from 0.
@return{([:pollin] [:pollout])}"
(assert (< -1 subscript (cdr items)))
(let ((item-ptr (mem-aptr (car items) '(:struct pollitem) subscript)))
(foreign-slot-value item-ptr '(:struct pollitem) 'revents)))