1 /*
2 This Source Code Form is subject to the terms of the Mozilla Public
3 License, v. 2.0. If a copy of the MPL was not distributed with this
4 file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 */
6 
7 /**
8 $(ZMQD) – a thin wrapper around the low-level C API of the
9 $(LINK2 http://zeromq.org,$(ZMQ)) messaging framework.
10 
11 Most functions in this module have a one-to-one relationship with functions
12 in the underlying C API.  Some adaptations have been made to make the API
13 safer, easier and more pleasant to use; most importantly:
14 $(UL
15     $(LI
16         Errors are signalled by means of exceptions rather than return
17         codes.  The $(REF ZmqException) class provides
18         a standard textual message for any error condition, but it also
19         provides access to the $(D errno) code set by the C function
20         that reported the error.)
21     $(LI
22         Functions are marked with $(D @safe), $(D pure) and $(D nothrow)
23         as appropriate, thus facilitating their use in high-level D code.)
24     $(LI
25         Memory and resources (i.e. contexts, sockets and messages) are
26         automatically managed, thus preventing leaks.)
27     $(LI
28         Context, socket and message options are implemented as properties.)
29 )
30 The names of functions and types in $(ZMQD) are very similar to those in
31 $(ZMQ), but they follow the D naming conventions.  Thus, the library should
32 feel both familiar to $(ZMQ) users and natural to D users.  A notable
33 deviation from the C API is that message parts are consistently called
34 "frames".  For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send())
35 and so on.  (Multipart messages were a late addition to $(ZMQ), and the "msg"
36 function names were well established at that point.  The library's
37 developers have admitted that this is somewhat confusing, and the newer
38 CZMQ API consistently uses "frame" in function names.)
39 
40 Due to the close correspondence with the C API, this documentation has
41 intentionally been kept sparse. There is really no reason to repeat the
42 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here.
43 Instead, the documentation for each function contains a "Corresponds to"
44 section that links to the appropriate pages in the $(ZMQ) reference.  Any
45 details given in the present documentation mostly concern the D-specific
46 adaptations that have been made.
47 
48 Also note that the examples generally only use the INPROC transport.  The
49 reason for this is that the examples double as unittests, and we want to
50 avoid firewall troubles and other issues that could arise with the use of
51 network protocols such as TCP, PGM, etc., and the IPC protocol is not
52 supported on Windows.  Anyway, they are only short
53 snippets that demonstrate the syntax; for more comprehensive and realistic
54 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all,
55 $(ZMQ) Guide).  Many of the examples in the Guide have been translated to
56 D, and can be found in the
57 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples))
58 subdirectory of the $(ZMQD) source repository.
59 
60 Version:
61     1.2.0 (compatible with $(ZMQ) >= 4.3.0)
62 Authors:
63     $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad)
64 Copyright:
65     Copyright (c) 2013–2019, Lars T. Kyllingstad. All rights reserved.
66 License:
67     $(ZMQD) is released under the terms of the
68     $(LINK2 https://www.mozilla.org/MPL/2.0/index.txt,Mozilla Public License v. 2.0).$(BR)
69     Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site)
70     for details about $(ZMQ) licensing.
71 Macros:
72     D      = <code>$0</code>
73     EM     = <em>$0</em>
74     LDOTS  = &hellip;
75     QUOTE  = <blockquote>$0</blockquote>
76     FREF   = $(D $(LINK2 #$1,$1()))
77     REF    = $(D $(LINK2 #$1,$1))
78     COREF  = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2))
79     OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1))
80     STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
81     ANCHOR = <span id="$1"></span>
82     ZMQ    = ZeroMQ
83     ZMQAPI = $(LINK2 http://api.zeromq.org/4-3:$1,$+)
84     ZMQD   = $(EM zmqd)
85     ZMQREF = $(D $(ZMQAPI $1,$1))
86 */
87 module zmqd;
88 
89 import core.time;
90 import std.typecons;
91 import deimos.zmq.zmq;
92 
93 
94 version(Windows) {
95     import core.sys.windows.winsock2: SOCKET;
96 }
97 
98 // Compile with debug=ZMQD_DisableCurveTests to be able to successfully
99 // run unittests even if the local ZMQ library is not compiled with Curve
100 // support.
101 debug (ZMQD_DisableCurveTests) { } else debug = WithCurveTests;
102 
103 // Compatibility check
104 version(unittest) static this()
105 {
106     import std.stdio: stderr;
107     const v = zmqVersion();
108     if (v.major != ZMQ_VERSION_MAJOR || v.minor != ZMQ_VERSION_MINOR) {
109         stderr.writefln(
110             "Warning: Potential ZeroMQ header/library incompatibility: "
111             ~"The header (binding) is for version %d.%d.%d, "
112             ~"while the library is version %d.%d.%d. Unittests may fail.",
113             ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH,
114             v.major, v.minor, v.patch);
115     }
116     // Known incompatibilities
117     import std.algorithm: min, max;
118     const libVersion = ZMQ_MAKE_VERSION(v.major, v.minor, v.patch);
119     const older = min(ZMQ_VERSION, libVersion);
120     const newer = max(ZMQ_VERSION, libVersion);
121     if (older < ZMQ_MAKE_VERSION(4, 1, 0) && newer >= ZMQ_MAKE_VERSION(4, 1, 0)) {
122         stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions");
123     } else if (older < ZMQ_MAKE_VERSION(4, 1, 1) && newer >= ZMQ_MAKE_VERSION(4, 1, 1)) {
124         stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions");
125     }
126 }
127 
128 
129 @safe:
130 
131 T* ptr(T)(T[] arr) { return arr.length == 0 ? null : &arr[0]; }
132 
133 /**
134 Reports the $(ZMQ) library version.
135 
136 Returns:
137     A $(STDREF typecons,Tuple) with three integer fields that represent the
138     three versioning levels: $(D major), $(D minor) and $(D patch).
139 Corresponds_to:
140     $(ZMQREF zmq_version())
141 */
142 Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow
143 {
144     typeof(return) v;
145     scopedTrusted!zmq_version(&v.major, &v.minor, &v.patch);
146     return v;
147 }
148 
149 
150 /**
151 Checks for a $(ZMQ) capability.
152 
153 Corresponds_to:
154     $(ZMQREF zmq_has())
155 */
156 bool zmqHas(const char[] capability) nothrow
157 {
158     return 0 != trusted!zmq_has(zeroTermString(capability));
159 }
160 
161 unittest
162 {
163     assert (!zmqHas("this ain't a capability"));
164 }
165 
166 
167 /**
168 The various socket types.
169 
170 These are described in the $(ZMQREF zmq_socket()) reference.
171 */
172 enum SocketType
173 {
174     req     = ZMQ_REQ,      /// Corresponds to $(D ZMQ_REQ)
175     rep     = ZMQ_REP,      /// Corresponds to $(D ZMQ_REP)
176     dealer  = ZMQ_DEALER,   /// Corresponds to $(D ZMQ_DEALER)
177     router  = ZMQ_ROUTER,   /// Corresponds to $(D ZMQ_ROUTER)
178     pub     = ZMQ_PUB,      /// Corresponds to $(D ZMQ_PUB)
179     sub     = ZMQ_SUB,      /// Corresponds to $(D ZMQ_SUB)
180     xpub    = ZMQ_XPUB,     /// Corresponds to $(D ZMQ_XPUB)
181     xsub    = ZMQ_XSUB,     /// Corresponds to $(D ZMQ_XSUB)
182     push    = ZMQ_PUSH,     /// Corresponds to $(D ZMQ_PUSH)
183     pull    = ZMQ_PULL,     /// Corresponds to $(D ZMQ_PULL)
184     pair    = ZMQ_PAIR,     /// Corresponds to $(D ZMQ_PAIR)
185     stream  = ZMQ_STREAM,   /// Corresponds to $(D ZMQ_STREAM)
186 }
187 
188 
189 /// _Security mechanisms.
190 enum Security
191 {
192     /// $(ZMQAPI zmq_null,NULL): No security or confidentiality.
193     none  = ZMQ_NULL,
194 
195     /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication.
196     plain = ZMQ_PLAIN,
197 
198     /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality.
199     curve = ZMQ_CURVE,
200 }
201 
202 
203 /**
204 A special $(COREF time,Duration) value used to signify an infinite
205 timeout or time interval in certain contexts.
206 
207 The places where this value may be used are:
208 $(UL
209     $(LI $(LINK2 #Socket.misc_options,certain socket options))
210     $(LI $(FREF poll))
211 )
212 
213 Note:
214 Since $(D core.time.Duration) doesn't reserve such a special value, the
215 actual value of $(D infiniteDuration) is $(COREF time,Duration.max).
216 */
217 enum infiniteDuration = Duration.max;
218 
219 
220 /**
221 An object that encapsulates a $(ZMQ) socket.
222 
223 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
224 must always be explicitly initialized with a constructor (see
225 $(FREF _Socket.this)):
226 ---
227 Socket s;                     // Not a valid socket yet
228 s = Socket(SocketType.push);  // ...but now it is.
229 ---
230 This $(D struct) is noncopyable, which means that a socket is always
231 uniquely managed by a single $(D Socket) object.  Functions that will
232 inspect or use the socket, but not take ownership of it, should take
233 a $(D ref Socket) parameter.  Use $(STDREF algorithm,move) to move
234 a $(D Socket) to a different location (e.g. into a sink function that
235 takes it by value, or into a new variable).
236 
237 The socket is automatically closed when the $(D Socket) object goes out
238 of scope.
239 
240 Linger_period:
241 Note that Socket by default sets the socket's linger period to zero.
242 This deviates from the $(ZMQ) default (which is an infinite linger period).
243 */
244 struct Socket
245 {
246 @safe:
247     /**
248     Creates a new $(ZMQ) socket.
249 
250     If $(D context) is not specified, the default _context (as returned
251     by $(FREF defaultContext)) is used.
252 
253     Throws:
254         $(REF ZmqException) if $(ZMQ) reports an error.
255     Corresponds_to:
256         $(ZMQREF zmq_socket())
257     */
258     this(SocketType type)
259     {
260         this(defaultContext(), type);
261     }
262 
263     /// ditto
264     this(Context context, SocketType type)
265     {
266         m_context = context;
267         m_type = type;
268         m_socket = trusted!zmq_socket(context.handle, type);
269         if (m_socket == null) {
270             throw new ZmqException;
271         }
272         linger = 0.msecs;
273     }
274 
275     /// With default context:
276     unittest
277     {
278         auto sck = Socket(SocketType.push);
279         assert (sck.initialized);
280     }
281     /// With explicit context:
282     unittest
283     {
284         auto ctx = Context();
285         auto sck = Socket(ctx, SocketType.push);
286         assert (sck.initialized);
287     }
288 
289     // Socket objects are noncopyable.
290     @disable this(this);
291 
292     unittest // Verify that move semantics work.
293     {
294         import std.algorithm: move;
295         struct SocketOwner
296         {
297             this(Socket s) { owned = trusted!move(s); }
298             Socket owned;
299         }
300         auto socket = Socket(SocketType.req);
301         const socketPtr = socket.handle;
302         assert (socketPtr != null);
303 
304         auto owner = SocketOwner(trusted!move(socket));
305         assert (socket.handle == null);
306         assert (!socket.m_context.initialized);
307         assert (owner.owned.handle == socketPtr);
308         assert (owner.owned.m_context.initialized);
309     }
310 
311     // Closes the socket on desctruction.
312     ~this() nothrow { nothrowClose(); }
313 
314     /**
315     Closes the $(ZMQ) socket.
316 
317     Note that the socket will be closed automatically upon destruction,
318     so it is usually not necessary to call this method manually.
319 
320     Throws:
321         $(REF ZmqException) if $(ZMQ) reports an error.
322     Corresponds_to:
323         $(ZMQREF zmq_close())
324     */
325     void close()
326     {
327         if (!nothrowClose()) throw new ZmqException;
328     }
329 
330     ///
331     unittest
332     {
333         auto s = Socket(SocketType.pair);
334         assert (s.initialized);
335         s.close();
336         assert (!s.initialized);
337     }
338 
339     /**
340     Starts accepting incoming connections on $(D endpoint).
341 
342     Throws:
343         $(REF ZmqException) if $(ZMQ) reports an error.
344     Corresponds_to:
345         $(ZMQREF zmq_bind())
346     */
347     void bind(const char[] endpoint)
348     {
349         if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) {
350             throw new ZmqException;
351         }
352     }
353 
354     ///
355     unittest
356     {
357         auto s = Socket(SocketType.pub);
358         s.bind("inproc://zmqd_bind_example");
359     }
360 
361     /**
362     Stops accepting incoming connections on $(D endpoint).
363 
364     Throws:
365         $(REF ZmqException) if $(ZMQ) reports an error.
366     Corresponds_to:
367         $(ZMQREF zmq_unbind())
368     */
369     void unbind(const char[] endpoint)
370     {
371         if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) {
372             throw new ZmqException;
373         }
374     }
375 
376     unittest
377     {
378         auto s = Socket(SocketType.pub);
379         s.bind("inproc://zmqd_unbind_example");
380         // Do some work...
381         s.unbind("inproc://zmqd_unbind_example");
382     }
383 
384     /**
385     Creates an outgoing connection to $(D endpoint).
386 
387     Throws:
388         $(REF ZmqException) if $(ZMQ) reports an error.
389     Corresponds_to:
390         $(ZMQREF zmq_connect())
391     */
392     void connect(const char[] endpoint)
393     {
394         if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) {
395             throw new ZmqException;
396         }
397     }
398 
399     ///
400     unittest
401     {
402         auto s = Socket(SocketType.sub);
403         s.connect("inproc://zmqd_connect_example");
404     }
405 
406     /**
407     Disconnects the socket from $(D endpoint).
408 
409     Throws:
410         $(REF ZmqException) if $(ZMQ) reports an error.
411     Corresponds_to:
412         $(ZMQREF zmq_disconnect())
413     */
414     void disconnect(const char[] endpoint)
415     {
416         if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) {
417             throw new ZmqException;
418         }
419     }
420 
421     ///
422     unittest
423     {
424         auto s = Socket(SocketType.sub);
425         s.connect("inproc://zmqd_disconnect_example");
426         // Do some work...
427         s.disconnect("inproc://zmqd_disconnect_example");
428     }
429 
430     /**
431     Sends a message frame.
432 
433     $(D _send) blocks until the frame has been queued on the socket.
434     $(D trySend) performs the operation in non-blocking mode, and returns
435     a $(D bool) value that signifies whether the frame was queued on the
436     socket.
437 
438     The $(D more) parameter specifies whether this is a multipart message
439     and there are _more frames to follow.
440 
441     The $(D char[]) overload is a convenience function that simply casts
442     the string argument to $(D ubyte[]).
443 
444     Throws:
445         $(REF ZmqException) if $(ZMQ) reports an error.
446     Corresponds_to:
447         $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
448         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
449         $(D more == true)).
450     */
451     void send(const ubyte[] data, bool more = false)
452     {
453         immutable flags = more ? ZMQ_SNDMORE : 0;
454         if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) {
455             throw new ZmqException;
456         }
457     }
458 
459     /// ditto
460     void send(const char[] data, bool more = false)
461     {
462         send(cast(const(ubyte)[]) data, more);
463     }
464 
465     /// ditto
466     bool trySend(const ubyte[] data, bool more = false)
467     {
468         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
469         if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) {
470             import core.stdc.errno: EAGAIN, EINTR;
471             import std.algorithm : among;
472             if (trusted!zmq_errno().among(EAGAIN, EINTR)) return false;
473             else throw new ZmqException;
474         }
475         return true;
476     }
477 
478     /// ditto
479     bool trySend(const char[] data, bool more = false)
480     {
481         return trySend(cast(const(ubyte)[]) data, more);
482     }
483 
484     ///
485     unittest
486     {
487         auto sck = Socket(SocketType.pub);
488         sck.send(cast(ubyte[]) [11, 226, 92]);
489         sck.send("Hello World!");
490     }
491 
492     /**
493     Sends a message frame.
494 
495     $(D _send) blocks until the frame has been queued on the socket.
496     $(D trySend) performs the operation in non-blocking mode, and returns
497     a $(D bool) value that signifies whether the frame was queued on the
498     socket.
499 
500     The $(D more) parameter specifies whether this is a multipart message
501     and there are _more frames to follow.
502 
503     Throws:
504         $(REF ZmqException) if $(ZMQ) reports an error.
505     Corresponds_to:
506         $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
507         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
508         $(D more == true)).
509     */
510     void send(ref Frame msg, bool more = false)
511     {
512         immutable flags = more ? ZMQ_SNDMORE : 0;
513         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
514             throw new ZmqException;
515         }
516     }
517 
518     /// ditto
519     bool trySend(ref Frame msg, bool more = false)
520     {
521         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
522         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
523             import core.stdc.errno: EAGAIN, EINTR;
524             import std.algorithm : among;
525             if (trusted!zmq_errno().among(EAGAIN, EINTR)) return false;
526             else throw new ZmqException;
527         }
528         return true;
529     }
530 
531     ///
532     unittest
533     {
534         auto sck = Socket(SocketType.pub);
535         auto msg = Frame(12);
536         msg.data.asString()[] = "Hello World!";
537         sck.send(msg);
538     }
539 
540     /**
541     Sends a constant-memory message frame.
542 
543     $(D _sendConst) blocks until the frame has been queued on the socket.
544     $(D trySendConst) performs the operation in non-blocking mode, and returns
545     a $(D bool) value that signifies whether the frame was queued on the
546     socket.
547 
548     The $(D more) parameter specifies whether this is a multipart message
549     and there are _more frames to follow.
550 
551     Throws:
552         $(REF ZmqException) if $(ZMQ) reports an error.
553     Corresponds_to:
554         $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the
555         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
556         $(D more == true)).
557     */
558     void sendConst(immutable ubyte[] data, bool more = false)
559     {
560         immutable flags = more ? ZMQ_SNDMORE : 0;
561         if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) {
562             throw new ZmqException;
563         }
564     }
565 
566     /// ditto
567     void sendConst(string data, bool more = false)
568     {
569         sendConst(cast(immutable(ubyte)[]) data, more);
570     }
571 
572     /// ditto
573     bool trySendConst(immutable ubyte[] data, bool more = false)
574     {
575         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
576         if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) {
577             import core.stdc.errno: EAGAIN, EINTR;
578             import std.algorithm : among;
579             if (trusted!zmq_errno().among(EAGAIN, EINTR)) return false;
580             else throw new ZmqException;
581         }
582         return true;
583     }
584 
585     /// ditto
586     bool trySendConst(string data, bool more = false)
587     {
588         return trySend(cast(immutable(ubyte)[]) data, more);
589     }
590 
591     ///
592     unittest
593     {
594         static immutable arr = cast(ubyte[]) [11, 226, 92];
595         auto sck = Socket(SocketType.pub);
596         sck.sendConst(arr);
597         sck.sendConst("Hello World!");
598     }
599 
600     /**
601     Receives a message frame.
602 
603     $(D _receive) blocks until the request can be satisfied, and returns the
604     number of bytes in the frame.
605     $(D tryReceive) performs the operation in non-blocking mode, and returns
606     a $(STDREF typecons,Tuple) which contains the size of the frame along
607     with a $(D bool) value that signifies whether a frame was received.
608     (If the latter is $(D false), the former is always set to zero.)
609 
610     Throws:
611         $(REF ZmqException) if $(ZMQ) reports an error.
612     Corresponds_to:
613         $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
614         of $(D tryReceive)).
615 
616     */
617     size_t receive(scope ubyte[] data)
618     {
619         immutable len = scopedTrusted!zmq_recv(m_socket, ptr(data), data.length, 0);
620         if (len >= 0) {
621             import std.conv;
622             return to!size_t(len);
623         } else {
624             throw new ZmqException;
625         }
626     }
627 
628     /// ditto
629     Tuple!(size_t, bool) tryReceive(ubyte[] data)
630     {
631         immutable len = trusted!zmq_recv(m_socket, ptr(data), data.length, ZMQ_DONTWAIT);
632         if (len >= 0) {
633             import std.conv;
634             return typeof(return)(to!size_t(len), true);
635         } else {
636             import core.stdc.errno: EAGAIN, EINTR;
637             import std.algorithm : among;
638             if (trusted!zmq_errno().among(EAGAIN, EINTR)) {
639                 return typeof(return)(0, false);
640             } else {
641                 throw new ZmqException;
642             }
643         }
644     }
645 
646     ///
647     unittest
648     {
649         // Sender
650         auto snd = Socket(SocketType.req);
651         snd.connect("inproc://zmqd_receive_example");
652         snd.send("Hello World!");
653 
654         // Receiver
655         import std.string: representation;
656         auto rcv = Socket(SocketType.rep);
657         rcv.bind("inproc://zmqd_receive_example");
658         char[256] buf;
659         immutable len  = rcv.receive(buf.representation);
660         assert (buf[0 .. len] == "Hello World!");
661     }
662 
663     @system unittest
664     {
665         auto snd = Socket(SocketType.pair);
666         snd.bind("inproc://zmqd_tryReceive_example");
667         auto rcv = Socket(SocketType.pair);
668         rcv.connect("inproc://zmqd_tryReceive_example");
669 
670         ubyte[256] buf;
671         auto r1 = rcv.tryReceive(buf);
672         assert (!r1[1]);
673 
674         import core.thread, core.time, std.string;
675         snd.send("Hello World!");
676         Thread.sleep(100.msecs); // Wait for message to be transferred...
677         auto r2 = rcv.tryReceive(buf);
678         assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation);
679     }
680 
681     /**
682     Receives a message frame.
683 
684     $(D _receive) blocks until the request can be satisfied, and returns the
685     number of bytes in the frame.
686     $(D tryReceive) performs the operation in non-blocking mode, and returns
687     a $(STDREF typecons,Tuple) which contains the size of the frame along
688     with a $(D bool) value that signifies whether a frame was received.
689     (If the latter is $(D false), the former is always set to zero.)
690 
691     Throws:
692         $(REF ZmqException) if $(ZMQ) reports an error.
693     Corresponds_to:
694         $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
695         of $(D tryReceive)).
696 
697     */
698     size_t receive(ref Frame msg)
699     {
700         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0);
701         if (len >= 0) {
702             import std.conv;
703             return to!size_t(len);
704         } else {
705             throw new ZmqException;
706         }
707     }
708 
709     /// ditto
710     Tuple!(size_t, bool) tryReceive(ref Frame msg)
711     {
712         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT);
713         if (len >= 0) {
714             import std.conv;
715             return typeof(return)(to!size_t(len), true);
716         } else {
717             import core.stdc.errno: EAGAIN, EINTR;
718             import std.algorithm : among;
719             if (trusted!zmq_errno().among(EAGAIN, EINTR)) {
720                 return typeof(return)(0, false);
721             } else {
722                 throw new ZmqException;
723             }
724         }
725     }
726 
727     ///
728     unittest
729     {
730         // Sender
731         auto snd = Socket(SocketType.req);
732         snd.connect("inproc://zmqd_msg_receive_example");
733         snd.send("Hello World!");
734 
735         // Receiver
736         import std.string: representation;
737         auto rcv = Socket(SocketType.rep);
738         rcv.bind("inproc://zmqd_msg_receive_example");
739         auto msg = Frame();
740         rcv.receive(msg);
741         assert (msg.data.asString() == "Hello World!");
742     }
743 
744     @system unittest
745     {
746         auto snd = Socket(SocketType.pair);
747         snd.bind("inproc://zmqd_msg_tryReceive_example");
748         auto rcv = Socket(SocketType.pair);
749         rcv.connect("inproc://zmqd_msg_tryReceive_example");
750 
751         auto msg = Frame();
752         auto r1 = rcv.tryReceive(msg);
753         assert (!r1[1]);
754 
755         import core.thread, core.time, std.string;
756         snd.send("Hello World!");
757         Thread.sleep(100.msecs); // Wait for message to be transferred...
758         auto r2 = rcv.tryReceive(msg);
759         assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation);
760     }
761 
762     /**
763     Whether there are _more message frames to follow.
764 
765     Throws:
766         $(REF ZmqException) if $(ZMQ) reports an error.
767     Corresponds_to:
768         $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE).
769     */
770     @property bool more() { return !!getOption!int(ZMQ_RCVMORE); }
771 
772     // TODO: Better unittest/example
773     unittest
774     {
775         auto sck = Socket(SocketType.req);
776         assert (!sck.more);
777     }
778 
779     /** $(ANCHOR Socket.misc_options)
780     Miscellaneous socket options.
781 
782     Each of these corresponds to an option passed to
783     $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For
784     example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
785     $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.
786 
787     Notes:
788     $(UL
789         $(LI For convenience, the setter for the $(D identity) property
790             accepts strings.  To retrieve a string with the getter, use
791             the $(FREF asString) function.
792             ---
793             sck.identity = "foobar";
794             assert (sck.identity.asString() == "foobar");
795             ---
796             )
797         $(LI The $(D linger), $(D receiveTimeout), $(D sendTimeout) and
798             $(D handshakeInterval) properties may have the special _value
799             $(REF infiniteDuration).  This  is translated to an option
800             _value of -1 or 0 (depending on which property is being set)
801             in the C API.)
802         $(LI Some options have array _type, and these allow the user to supply
803             a buffer in which to store the _value, to avoid a GC allocation.
804             The return _value is then a slice of this buffer.
805             These are not marked as $(D @property), but are prefixed with
806             "get" (e.g. $(D getIdentity())).  A user-supplied buffer is
807             $(EM required) for some options, namely $(D getPlainUsername())
808             and $(D getPlainPassword()), and these do not have $(D @property)
809             versions.  $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85())
810             require buffers which are at least 32 and 41 bytes long,
811             respectively.)
812         $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
813             treated differently from the others; see $(FREF Socket.subscribe)
814             and $(FREF Socket.unsubscribe))
815     )
816 
817     Throws:
818         $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
819         $(STDREF conv,ConvOverflowException) if a given $(D Duration) is
820             longer than the number of milliseconds that will fit in an $(D int)
821             (only applies to properties of $(COREF time,Duration) _type).$(BR)
822         $(COREF exception,RangeError) if the $(D dest) buffers passed to
823             $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than
824             32 or 41 bytes long, respectively.
825     Corresponds_to:
826         $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()).
827     */
828     @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); }
829     /// ditto
830     @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); }
831 
832     /// ditto
833     @property ubyte[] identity() { return getIdentity(new ubyte[255]); }
834     /// ditto
835     ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); }
836     /// ditto
837     @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); }
838     /// ditto
839     @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); }
840 
841     /// ditto
842     @property int rate() { return getOption!int(ZMQ_RATE); }
843     /// ditto
844     @property void rate(int value) { setOption(ZMQ_RATE, value); }
845 
846     /// ditto
847     @property Duration recoveryInterval()
848     {
849         return msecs(getOption!int(ZMQ_RECOVERY_IVL));
850     }
851     /// ditto
852     @property void recoveryInterval(Duration value)
853     {
854         import std.conv: to;
855         setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"()));
856     }
857 
858     /// ditto
859     @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); }
860     /// ditto
861     @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }
862 
863     /// ditto
864     @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
865     /// ditto
866     @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }
867 
868     /// ditto
869     @property FD fd() { return getOption!FD(ZMQ_FD); }
870 
871     /// ditto
872     @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); }
873 
874     /// ditto
875     @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }
876 
877     /// ditto
878     @property Duration linger()
879     {
880         const auto value = getOption!int(ZMQ_LINGER);
881         return value == -1 ? infiniteDuration : value.msecs;
882     }
883     /// ditto
884     @property void linger(Duration value)
885     {
886         import std.conv: to;
887         setOption(
888             ZMQ_LINGER,
889             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
890     }
891 
892     /// ditto
893     @property Duration reconnectionInterval()
894     {
895         return getOption!int(ZMQ_RECONNECT_IVL).msecs;
896     }
897     /// ditto
898     @property void reconnectionInterval(Duration value)
899     {
900         import std.conv: to;
901         setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"()));
902     }
903 
904     /// ditto
905     @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
906     /// ditto
907     @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }
908 
909     /// ditto
910     @property Duration maxReconnectionInterval()
911     {
912         return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs;
913     }
914     /// ditto
915     @property void maxReconnectionInterval(Duration value)
916     {
917         import std.conv: to;
918         setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"()));
919     }
920 
921     /// ditto
922     @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
923     /// ditto
924     @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }
925 
926     /// ditto
927     @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
928     /// ditto
929     @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }
930 
931     /// ditto
932     @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
933     /// ditto
934     @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }
935 
936     /// ditto
937     @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
938     /// ditto
939     @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }
940 
941     /// ditto
942     @property Duration receiveTimeout()
943     {
944         const value = getOption!int(ZMQ_RCVTIMEO);
945         return value == -1 ? infiniteDuration : value.msecs;
946     }
947     /// ditto
948     @property void receiveTimeout(Duration value)
949     {
950         import std.conv: to;
951         setOption(
952             ZMQ_RCVTIMEO,
953             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
954     }
955 
956     /// ditto
957     @property Duration sendTimeout()
958     {
959         const value = getOption!int(ZMQ_SNDTIMEO);
960         return value == -1 ? infiniteDuration : value.msecs;
961     }
962     /// ditto
963     @property void sendTimeout(Duration value)
964     {
965         import std.conv: to;
966         setOption(
967             ZMQ_SNDTIMEO,
968             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
969     }
970 
971     /// ditto
972     @property char[] lastEndpoint() @trusted
973     {
974         // This function is not @safe because it calls a @system function
975         // (zmq_getsockopt) and takes the address of a local (len).
976         auto buf = new char[1024];
977         size_t len = buf.length;
978         if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
979             throw new ZmqException;
980         }
981         return buf[0 .. len-1];
982     }
983 
984     /// ditto
985     @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); }
986 
987     /// ditto
988     @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); }
989     /// ditto
990     @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); }
991 
992     /// ditto
993     @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); }
994     /// ditto
995     @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); }
996 
997     /// ditto
998     @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); }
999     /// ditto
1000     @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); }
1001 
1002     /// ditto
1003     @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); }
1004     /// ditto
1005     @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); }
1006 
1007     /// ditto
1008     @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); }
1009     /// ditto
1010     @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); }
1011 
1012     /// ditto
1013     @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); }
1014 
1015     /// ditto
1016     @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); }
1017     /// ditto
1018     @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); }
1019 
1020     /// ditto
1021     @property Security mechanism()
1022     {
1023         import std.conv;
1024         return to!Security(getOption!int(ZMQ_MECHANISM));
1025     }
1026 
1027     /// ditto
1028     @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); }
1029     /// ditto
1030     @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); }
1031 
1032     /// ditto
1033     char[] getPlainUsername(char[] dest)
1034     {
1035         return getCStringOption(ZMQ_PLAIN_USERNAME, dest);
1036     }
1037     /// ditto
1038     @property void plainUsername(const(char)[] value)
1039     {
1040         setArrayOption(ZMQ_PLAIN_USERNAME, value);
1041     }
1042 
1043     /// ditto
1044     char[] getPlainPassword(char[] dest)
1045     {
1046         return getCStringOption(ZMQ_PLAIN_PASSWORD, dest);
1047     }
1048     /// ditto
1049     @property void plainPassword(const(char)[] value)
1050     {
1051         setArrayOption(ZMQ_PLAIN_PASSWORD, value);
1052     }
1053 
1054     /// ditto
1055     @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); }
1056     /// ditto
1057     @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); }
1058 
1059     /// ditto
1060     @property ubyte[] curvePublicKey()
1061     {
1062         return getCurvePublicKey(new ubyte[keyBufSizeBin]);
1063     }
1064     /// ditto
1065     ubyte[] getCurvePublicKey(ubyte[] dest)
1066     {
1067         return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest);
1068     }
1069     /// ditto
1070     @property char[] curvePublicKeyZ85()
1071     {
1072         return getCurvePublicKeyZ85(new char[keyBufSizeZ85]);
1073     }
1074     /// ditto
1075     char[] getCurvePublicKeyZ85(char[] dest)
1076     {
1077         return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest);
1078     }
1079     /// ditto
1080     @property void curvePublicKey(const(ubyte)[] value)
1081     {
1082         setCurveKey(ZMQ_CURVE_PUBLICKEY, value);
1083     }
1084     /// ditto
1085     @property void curvePublicKeyZ85(const(char)[] value)
1086     {
1087         setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value);
1088     }
1089 
1090     /// ditto
1091     @property ubyte[] curveSecretKey()
1092     {
1093         return getCurveSecretKey(new ubyte[keyBufSizeBin]);
1094     }
1095     /// ditto
1096     ubyte[] getCurveSecretKey(ubyte[] dest)
1097     {
1098         return getCurveKey(ZMQ_CURVE_SECRETKEY, dest);
1099     }
1100     /// ditto
1101     @property char[] curveSecretKeyZ85()
1102     {
1103         return getCurveSecretKeyZ85(new char[keyBufSizeZ85]);
1104     }
1105     /// ditto
1106     char[] getCurveSecretKeyZ85(char[] dest)
1107     {
1108         return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest);
1109     }
1110     /// ditto
1111     @property void curveSecretKey(const(ubyte)[] value)
1112     {
1113         setCurveKey(ZMQ_CURVE_SECRETKEY, value);
1114     }
1115     /// ditto
1116     @property void curveSecretKeyZ85(const(char)[] value)
1117     {
1118         setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value);
1119     }
1120 
1121     /// ditto
1122     @property ubyte[] curveServerKey()
1123     {
1124         return getCurveServerKey(new ubyte[keyBufSizeBin]);
1125     }
1126     /// ditto
1127     ubyte[] getCurveServerKey(ubyte[] dest)
1128     {
1129         return getCurveKey(ZMQ_CURVE_SERVERKEY, dest);
1130     }
1131     /// ditto
1132     @property char[] curveServerKeyZ85()
1133     {
1134         return getCurveServerKeyZ85(new char[keyBufSizeZ85]);
1135     }
1136     /// ditto
1137     char[] getCurveServerKeyZ85(char[] dest)
1138     {
1139         return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest);
1140     }
1141     /// ditto
1142     @property void curveServerKey(const(ubyte)[] value)
1143     {
1144         setCurveKey(ZMQ_CURVE_SERVERKEY, value);
1145     }
1146     /// ditto
1147     @property void curveServerKeyZ85(const(char)[] value)
1148     {
1149         setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value);
1150     }
1151 
1152     /// ditto
1153     @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); }
1154 
1155     /// ditto
1156     @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); }
1157 
1158     /// ditto
1159     @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); }
1160 
1161     /// ditto
1162     @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); }
1163 
1164     /// ditto
1165     @property char[] zapDomain() { return getZapDomain(new char[256]); }
1166     /// ditto
1167     char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); }
1168     /// ditto
1169     @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); }
1170 
1171     /// ditto
1172     @property void routerHandover(bool value) { setOption(ZMQ_ROUTER_HANDOVER, value ? 1 : 0); }
1173 
1174     /// ditto
1175     @property void connectionRID(const ubyte[] value) { setArrayOption(ZMQ_CONNECT_RID, value); }
1176 
1177     /// ditto
1178     @property int typeOfService() { return getOption!int(ZMQ_TOS); }
1179     /// ditto
1180     @property void typeOfService(int value) { setOption(ZMQ_TOS, value); }
1181 
1182     /// ditto
1183     @property bool gssapiPlaintext() { return !!getOption!int(ZMQ_GSSAPI_PLAINTEXT); }
1184     /// ditto
1185     @property void gssapiPlaintext(bool value) { setOption(ZMQ_GSSAPI_PLAINTEXT, value ? 1 : 0); }
1186 
1187     /// ditto
1188     @property char[] gssapiPrincipal() { return getGssapiPrincipal(new char[256]); }
1189     /// ditto
1190     char[] getGssapiPrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_PRINCIPAL, dest); }
1191     /// ditto
1192     @property void gssapiPrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_PRINCIPAL, value); }
1193 
1194     /// ditto
1195     @property bool gssapiServer() { return !!getOption!int(ZMQ_GSSAPI_SERVER); }
1196     /// ditto
1197     @property void gssapiServer(bool value) { setOption(ZMQ_GSSAPI_SERVER, value ? 1 : 0); }
1198 
1199     /// ditto
1200     @property char[] gssapiServicePrincipal() { return getGssapiServicePrincipal(new char[256]); }
1201     /// ditto
1202     char[] getGssapiServicePrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, dest); }
1203     /// ditto
1204     @property void gssapiServicePrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, value); }
1205 
1206     /// ditto
1207     @property Duration handshakeInterval()
1208     {
1209         const auto value = getOption!int(ZMQ_HANDSHAKE_IVL);
1210         return value == 0 ? infiniteDuration : msecs(value);
1211     }
1212     /// ditto
1213     @property void handshakeInterval(Duration value)
1214     {
1215         import std.conv: to;
1216         setOption(
1217             ZMQ_HANDSHAKE_IVL,
1218             value == infiniteDuration ? 0 : to!int(value.total!"msecs"()));
1219     }
1220 
1221     // deprecated options
1222     deprecated("Use the 'immediate' property instead")
1223     @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); }
1224 
1225     deprecated("Use the 'immediate' property instead")
1226     @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); }
1227 
1228     deprecated("Use !ipv6 instead")
1229     @property bool ipv4Only() { return !ipv6; }
1230 
1231     deprecated("Use ipv6 = !value instead")
1232     @property void ipv4Only(bool value) { ipv6 = !value; }
1233 
1234     unittest
1235     {
1236         auto s = Socket(SocketType.xpub);
1237         const e = "inproc://unittest2";
1238         s.bind(e);
1239         import core.time;
1240         s.threadAffinity = 1;
1241         assert(s.threadAffinity == 1);
1242         s.identity = cast(ubyte[]) [ 65, 66, 67 ];
1243         assert(s.identity == [65, 66, 67]);
1244         s.identity = "foo";
1245         assert(s.identity == [102, 111, 111]);
1246         s.rate = 200;
1247         assert(s.rate == 200);
1248         s.recoveryInterval = 5.seconds;
1249         assert(s.recoveryInterval == 5_000.msecs);
1250         s.sendBufferSize = 500;
1251         assert(s.sendBufferSize == 500);
1252         s.receiveBufferSize = 600;
1253         assert(s.receiveBufferSize == 600);
1254         version(Posix) {
1255             assert(s.fd > 2); // 0, 1 and 2 are the standard streams
1256         }
1257         assert(s.events == PollFlags.pollOut);
1258         assert(s.type == SocketType.xpub);
1259         s.linger = Duration.zero;
1260         assert(s.linger == Duration.zero);
1261         s.linger = 100_000.usecs;
1262         assert(s.linger == 100.msecs);
1263         s.linger = infiniteDuration;
1264         assert(s.linger == infiniteDuration);
1265         s.reconnectionInterval = 200_000.usecs;
1266         assert(s.reconnectionInterval == 200.msecs);
1267         s.backlog = 50;
1268         assert(s.backlog == 50);
1269         s.maxReconnectionInterval = 300_000.usecs;
1270         assert(s.maxReconnectionInterval == 300.msecs);
1271         s.maxMsgSize = 1000;
1272         assert(s.maxMsgSize == 1000);
1273         s.sendHWM = 500;
1274         assert(s.sendHWM == 500);
1275         s.receiveHWM = 600;
1276         assert(s.receiveHWM == 600);
1277         s.multicastHops = 2;
1278         assert(s.multicastHops == 2);
1279         s.receiveTimeout = 3.seconds;
1280         assert(s.receiveTimeout == 3_000_000.usecs);
1281         s.receiveTimeout = infiniteDuration;
1282         assert(s.receiveTimeout == infiniteDuration);
1283         s.sendTimeout = 2_000_000.usecs;
1284         assert(s.sendTimeout == 2.seconds);
1285         s.sendTimeout = infiniteDuration;
1286         assert(s.sendTimeout == infiniteDuration);
1287         s.tcpKeepalive = 1;
1288         assert(s.tcpKeepalive == 1);
1289         s.tcpKeepaliveCnt = 1;
1290         assert(s.tcpKeepaliveCnt == 1);
1291         s.tcpKeepaliveIdle = 0;
1292         assert(s.tcpKeepaliveIdle == 0);
1293         s.tcpKeepaliveIntvl = 0;
1294         assert(s.tcpKeepaliveIntvl == 0);
1295         s.immediate = true;
1296         assert(s.immediate);
1297         s.ipv6 = true;
1298         assert(s.ipv6);
1299         s.plainServer = true;
1300         assert(s.mechanism == Security.plain);
1301         assert(s.plainServer);
1302         debug (WithCurveTests) {
1303             assert(!s.curveServer);
1304             s.curveServer = true;
1305             assert(s.mechanism == Security.curve);
1306             assert(!s.plainServer);
1307             assert(s.curveServer);
1308         }
1309         s.plainServer = false;
1310         assert(s.mechanism == Security.none);
1311         assert(!s.plainServer);
1312         debug (WithCurveTests) assert(!s.curveServer);
1313         s.plainUsername = "foobar";
1314         assert(s.getPlainUsername(new char[8]) == "foobar");
1315         assert(s.mechanism == Security.plain);
1316         s.plainUsername = null;
1317         assert(s.mechanism == Security.none);
1318         s.plainPassword = "xyz";
1319         assert(s.getPlainPassword(new char[8]) == "xyz");
1320         assert(s.mechanism == Security.plain);
1321         s.plainPassword = null;
1322         assert(s.mechanism == Security.none);
1323         s.zapDomain = "my_zap_domain";
1324         assert(s.zapDomain == "my_zap_domain");
1325         s.typeOfService = 1;
1326         assert(s.typeOfService == 1);
1327         if (zmqHas("gssapi")) {
1328             s.gssapiPlaintext = true;
1329             assert(s.gssapiPlaintext);
1330             s.gssapiPrincipal = "myPrincipal";
1331             assert(s.gssapiPrincipal == "myPrincipal");
1332             s.gssapiServer = true;
1333             assert(s.gssapiServer);
1334             s.gssapiServicePrincipal = "myServicePrincipal";
1335             assert(s.gssapiServicePrincipal == "myServicePrincipal");
1336         }
1337         s.handshakeInterval = 60000.msecs;
1338         assert(s.handshakeInterval == 60000.msecs);
1339 
1340         // Test write-only options
1341         s.conflate = true;
1342     }
1343 
1344     debug (WithCurveTests) @system unittest
1345     {
1346         // The CURVE key options require some special setup, so we test them
1347         // separately.
1348         import std.array, std.range;
1349         auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array();
1350         auto z85Key1 = z85Encode(binKey1);
1351         auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array();
1352         auto z85Key2 = z85Encode(binKey2);
1353         auto zeroKey = repeat(cast(ubyte) 0).take(32).array();
1354         assert (z85Key1 != z85Key2);
1355 
1356         auto s = Socket(SocketType.req);
1357         s.curvePublicKey = zeroKey;
1358         s.curveSecretKey = zeroKey;
1359         s.curveServerKey = zeroKey;
1360 
1361         s.curvePublicKey = binKey1;
1362         assert (s.curvePublicKey == binKey1);
1363         assert (s.curvePublicKeyZ85 == z85Key1);
1364         s.curvePublicKeyZ85 = z85Key2;
1365         assert (s.curvePublicKey == binKey2);
1366         assert (s.curvePublicKeyZ85 == z85Key2);
1367         assert (s.curveSecretKey == zeroKey);
1368         assert (s.curveServerKey == zeroKey);
1369         s.curvePublicKey = zeroKey;
1370 
1371         s.curveSecretKey = binKey1;
1372         assert (s.curveSecretKey == binKey1);
1373         assert (s.curveSecretKeyZ85 == z85Key1);
1374         s.curveSecretKeyZ85 = z85Key2;
1375         assert (s.curveSecretKey == binKey2);
1376         assert (s.curveSecretKeyZ85 == z85Key2);
1377         assert (s.curvePublicKey == zeroKey);
1378         assert (s.curveServerKey == zeroKey);
1379         s.curveSecretKey = zeroKey;
1380 
1381         s.curveServerKey = binKey1;
1382         assert (s.curveServerKey == binKey1);
1383         assert (s.curveServerKeyZ85 == z85Key1);
1384         s.curveServerKeyZ85 = z85Key2;
1385         assert (s.curveServerKey == binKey2);
1386         assert (s.curveServerKeyZ85 == z85Key2);
1387         assert (s.curvePublicKey == zeroKey);
1388         assert (s.curveSecretKey == zeroKey);
1389         s.curveServerKey = zeroKey;
1390     }
1391 
1392     unittest
1393     {
1394         // Some options are only applicable to specific socket types.
1395         auto rt = Socket(SocketType.router);
1396         rt.routerMandatory = true;
1397         rt.probeRouter = true;
1398         rt.routerHandover = true;
1399         rt.connectionRID = cast(ubyte[]) [ 10, 20, 30 ];
1400         auto xp = Socket(SocketType.xpub);
1401         xp.xpubVerbose = true;
1402         auto rq = Socket(SocketType.req);
1403         rq.reqCorrelate = true;
1404         rq.reqRelaxed = true;
1405     }
1406 
1407     deprecated unittest
1408     {
1409         // Test deprecated socket options
1410         auto s = Socket(SocketType.req);
1411         assert(s.ipv4Only);
1412         assert(!s.delayAttachOnConnect);
1413 
1414         // Test setters and getters together
1415         s.ipv4Only = false;
1416         assert(!s.ipv4Only);
1417         s.delayAttachOnConnect = true;
1418         assert(s.delayAttachOnConnect);
1419     }
1420 
1421     /**
1422     Establishes a message filter.
1423 
1424     Throws:
1425         $(REF ZmqException) if $(ZMQ) reports an error.
1426     Corresponds_to:
1427         $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1428     */
1429     void subscribe(const ubyte[] filterPrefix)
1430     {
1431         setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
1432     }
1433     /// ditto
1434     void subscribe(const  char[] filterPrefix)
1435     {
1436         setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
1437     }
1438 
1439     ///
1440     unittest
1441     {
1442         // Create a subscriber that accepts all messages that start with
1443         // the prefixes "foo" or "bar".
1444         auto sck = Socket(SocketType.sub);
1445         sck.subscribe("foo");
1446         sck.subscribe("bar");
1447     }
1448 
1449     @system unittest
1450     {
1451         void sleep(int ms) {
1452             import core.thread, core.time;
1453             Thread.sleep(dur!"msecs"(ms));
1454         }
1455         auto pub = Socket(SocketType.pub);
1456         pub.bind("inproc://zmqd_subscribe_unittest");
1457         auto sub = Socket(SocketType.sub);
1458         sub.connect("inproc://zmqd_subscribe_unittest");
1459 
1460         pub.send("Hello");
1461         sleep(100);
1462         sub.subscribe("He");
1463         sub.subscribe(cast(ubyte[])['W', 'o']);
1464         sleep(100);
1465         pub.send("Heeee");
1466         pub.send("World");
1467         sleep(100);
1468         ubyte[5] buf;
1469         sub.receive(buf);
1470         assert(buf.asString() == "Heeee");
1471         sub.receive(buf);
1472         assert(buf.asString() == "World");
1473     }
1474 
1475     /**
1476     Removes a message filter.
1477 
1478     Throws:
1479         $(REF ZmqException) if $(ZMQ) reports an error.
1480     Corresponds_to:
1481         $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1482     */
1483     void unsubscribe(const ubyte[] filterPrefix)
1484     {
1485         setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
1486     }
1487     /// ditto
1488     void unsubscribe(const char[] filterPrefix)
1489     {
1490         setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
1491     }
1492 
1493     ///
1494     unittest
1495     {
1496         // Subscribe to messages that start with "foo" or "bar".
1497         auto sck = Socket(SocketType.sub);
1498         sck.subscribe("foo");
1499         sck.subscribe("bar");
1500         // ...
1501         // From now on, only accept messages that start with "bar"
1502         sck.unsubscribe("foo");
1503     }
1504 
1505     /**
1506     Spawns a PAIR socket that publishes socket state changes (_events) over
1507     the INPROC transport to the given _endpoint.
1508 
1509     Which event types should be published may be selected by bitwise-ORing
1510     together different $(REF EventType) flags in the $(D events) parameter.
1511 
1512     Throws:
1513         $(REF ZmqException) if $(ZMQ) reports an error.
1514     Corresponds_to:
1515         $(ZMQREF zmq_socket_monitor())
1516     See_also:
1517         $(FREF receiveEvent), which receives and parses event messages.
1518     */
1519     void monitor(const char[] endpoint, EventType events = EventType.all)
1520     {
1521         if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) {
1522             throw new ZmqException;
1523         }
1524     }
1525 
1526     ///
1527     unittest
1528     {
1529         auto sck = Socket(SocketType.pub);
1530         sck.monitor("inproc://zmqd_monitor_unittest",
1531                     EventType.accepted | EventType.closed);
1532     }
1533 
1534     /**
1535     The $(D void*) pointer used by the underlying C API to refer to the socket.
1536 
1537     If the object has not been initialized, this function returns $(D null).
1538     */
1539     @property inout(void)* handle() inout pure nothrow
1540     {
1541         return m_socket;
1542     }
1543 
1544     /**
1545     Whether this $(REF Socket) object has been _initialized, i.e. whether it
1546     refers to a valid $(ZMQ) socket.
1547     */
1548     @property bool initialized() const pure nothrow
1549     {
1550         return m_socket != null;
1551     }
1552 
1553     ///
1554     unittest
1555     {
1556         Socket sck;
1557         assert (!sck.initialized);
1558         sck = Socket(SocketType.sub);
1559         assert (sck.initialized);
1560         sck.close();
1561         assert (!sck.initialized);
1562     }
1563 
1564 private:
1565     // Helper function for ~this() and close()
1566     bool nothrowClose() nothrow
1567     {
1568         if (m_socket != null) {
1569             if (trusted!zmq_close(m_socket) != 0) return false;
1570             m_socket = null;
1571         }
1572         return true;
1573     }
1574 
1575     import std.traits;
1576 
1577     T getOption(T)(int option) @trusted
1578         if (isScalarType!T)
1579     {
1580         T buf;
1581         auto len = T.sizeof;
1582         if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) {
1583             throw new ZmqException;
1584         }
1585         assert(len == T.sizeof);
1586         return buf;
1587     }
1588 
1589     void setOption(T)(int option, T value) @trusted
1590         if (isScalarType!T)
1591     {
1592         if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) {
1593             throw new ZmqException;
1594         }
1595     }
1596 
1597     T[] getArrayOption(T)(int option, T[] buf) @trusted
1598         if (isScalarType!T)
1599     {
1600         static assert (T.sizeof == 1);
1601         auto len = buf.length;
1602         if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) {
1603             throw new ZmqException;
1604         }
1605         return buf[0 .. len];
1606     }
1607 
1608     void setArrayOption()(int option, const void[] value)
1609     {
1610         if (trusted!zmq_setsockopt(m_socket, option, ptr(value), value.length) != 0) {
1611             throw new ZmqException;
1612         }
1613     }
1614 
1615     char[] getCStringOption(int option, char[] buf)
1616     {
1617         auto ret = getArrayOption(option, buf);
1618         assert (ret.length && ret[$-1] == '\0');
1619         return ret[0 .. $-1];
1620     }
1621 
1622     enum : size_t
1623     {
1624         keySizeBin    = 32,
1625         keyBufSizeBin = keySizeBin,
1626         keySizeZ85    = 40,
1627         keyBufSizeZ85 = keySizeZ85 + 1,
1628     }
1629 
1630     ubyte[] getCurveKey(int option, ubyte[] buf)
1631     {
1632         if (buf.length < keyBufSizeBin) {
1633             import core.exception: RangeError;
1634             throw new RangeError;
1635         }
1636         return getArrayOption(option, buf[0 .. keyBufSizeBin]);
1637     }
1638 
1639     char[] getCurveKeyZ85(int option, char[] buf)
1640     {
1641         if (buf.length < keyBufSizeZ85) {
1642             import core.exception: RangeError;
1643             throw new RangeError;
1644         }
1645         return getCStringOption(option, buf[0 .. keyBufSizeZ85]);
1646     }
1647 
1648     void setCurveKey(int option, const ubyte[] value)
1649     {
1650         if (value.length != keySizeBin) throw new Exception("Invalid key size");
1651         setArrayOption(option, value);
1652     }
1653 
1654     void setCurveKeyZ85(int option, const char[] value)
1655     {
1656         if (value.length != keySizeZ85) throw new Exception("Invalid key size");
1657         setArrayOption(option, value);
1658     }
1659 
1660     Context m_context;
1661     SocketType m_type;
1662     void* m_socket;
1663 }
1664 
1665 unittest
1666 {
1667     auto s1 = Socket(SocketType.pair);
1668     auto s2 = Socket(SocketType.pair);
1669     s1.bind("inproc://unittest");
1670     s2.connect("inproc://unittest");
1671     s1.send("Hello World!");
1672     ubyte[12] buf;
1673     const len = s2.receive(buf[]);
1674     assert (len == 12);
1675     assert (buf == "Hello World!");
1676 }
1677 
1678 
1679 version (Windows) {
1680     alias PlatformFD = SOCKET;
1681 } else version (Posix) {
1682     alias PlatformFD = int;
1683 }
1684 
1685 /**
1686 The native socket file descriptor type.
1687 
1688 This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems.
1689 */
1690 alias FD = PlatformFD;
1691 
1692 
1693 /**
1694 Starts the built-in $(ZMQ) _proxy.
1695 
1696 This function never returns normally, but it may throw an exception.  This could
1697 happen if the context associated with either of the specified sockets is
1698 manually destroyed in a different thread.
1699 
1700 Throws:
1701     $(REF ZmqException) if $(ZMQ) reports an error.
1702 Corresponds_to:
1703     $(ZMQREF zmq_proxy())
1704 See_Also:
1705     $(FREF steerableProxy)
1706 */
1707 void proxy(ref Socket frontend, ref Socket backend)
1708 {
1709     const rc = trusted!zmq_proxy(frontend.handle, backend.handle, null);
1710     assert (rc == -1);
1711     throw new ZmqException;
1712 }
1713 
1714 /// ditto
1715 void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
1716 {
1717     const rc = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle);
1718     assert (rc == -1);
1719     throw new ZmqException;
1720 }
1721 
1722 
1723 /**
1724 Starts the built-in $(ZMQ) proxy with _control flow.
1725 
1726 Note that the order of the two last parameters is reversed compared to
1727 $(ZMQREF zmq_proxy_steerable()).  That is, the $(D control) socket always
1728 comes before the $(D capture) socket.  Furthermore, unlike in $(ZMQ),
1729 $(D control) is mandatory.  (Without the _control socket one can simply
1730 use $(FREF proxy).)
1731 
1732 Throws:
1733     $(REF ZmqException) if $(ZMQ) reports an error.
1734 Corresponds_to:
1735     $(ZMQREF zmq_proxy_steerable())
1736 See_Also:
1737     $(FREF proxy)
1738 */
1739 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control)
1740 {
1741     const rc = trusted!zmq_proxy_steerable(
1742         frontend.handle, backend.handle, null, control.handle);
1743     if (rc == -1) throw new ZmqException;
1744 }
1745 
1746 /// ditto
1747 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture)
1748 {
1749     const rc = trusted!zmq_proxy_steerable(
1750         frontend.handle, backend.handle, capture.handle, control.handle);
1751     if (rc == -1) throw new ZmqException;
1752 }
1753 
1754 @system unittest
1755 {
1756     import core.thread;
1757     auto t = new Thread(() {
1758         auto frontend = Socket(SocketType.router);
1759         frontend.bind("inproc://zmqd_steerableProxy_unittest_fe");
1760         auto backend  = Socket(SocketType.dealer);
1761         backend.bind("inproc://zmqd_steerableProxy_unittest_be");
1762         auto controllee = Socket(SocketType.pair);
1763         controllee.bind("inproc://zmqd_steerableProxy_unittest_ctl");
1764         steerableProxy(frontend, backend, controllee);
1765     });
1766     t.start();
1767     auto client = Socket(SocketType.req);
1768     client.connect("inproc://zmqd_steerableProxy_unittest_fe");
1769     auto server  = Socket(SocketType.rep);
1770     server.connect("inproc://zmqd_steerableProxy_unittest_be");
1771     auto controller = Socket(SocketType.pair);
1772     controller.connect("inproc://zmqd_steerableProxy_unittest_ctl");
1773 
1774     auto cf = Frame(1);
1775     cf.data[0] = 86;
1776     client.send(cf);
1777     auto sf = Frame();
1778     server.receive(sf);
1779     assert(sf.size == 1 && sf.data[0] == 86);
1780     sf.data[0] = 87;
1781     server.send(sf);
1782     client.receive(cf);
1783     assert(cf.size == 1 && cf.data[0] == 87);
1784 
1785     controller.send("TERMINATE");
1786     t.join();
1787 }
1788 
1789 @system unittest
1790 {
1791     import core.thread;
1792     auto t = new Thread(() {
1793         auto frontend = Socket(SocketType.pull);
1794         frontend.bind("inproc://zmqd_steerableProxy2_unittest_fe");
1795         auto backend  = Socket(SocketType.push);
1796         backend.bind("inproc://zmqd_steerableProxy2_unittest_be");
1797         auto controllee = Socket(SocketType.pair);
1798         controllee.bind("inproc://zmqd_steerableProxy2_unittest_ctl");
1799         auto capture = Socket(SocketType.push);
1800         capture.bind("inproc://zmqd_steerableProxy2_unittest_cpt");
1801         steerableProxy(frontend, backend, controllee, capture);
1802     });
1803     t.start();
1804     auto client = Socket(SocketType.push);
1805     client.connect("inproc://zmqd_steerableProxy2_unittest_fe");
1806     auto server  = Socket(SocketType.pull);
1807     server.connect("inproc://zmqd_steerableProxy2_unittest_be");
1808     auto controller = Socket(SocketType.pair);
1809     controller.connect("inproc://zmqd_steerableProxy2_unittest_ctl");
1810     auto capturer = Socket(SocketType.pull);
1811     capturer.connect("inproc://zmqd_steerableProxy2_unittest_cpt");
1812 
1813     auto cf = Frame(1);
1814     cf.data[0] = 86;
1815     client.send(cf);
1816     auto sf = Frame();
1817     server.receive(sf);
1818     assert(sf.size == 1 && sf.data[0] == 86);
1819     auto pf = Frame();
1820     capturer.receive(pf);
1821     assert(pf.size == 1 && pf.data[0] == 86);
1822 
1823     controller.send("TERMINATE");
1824     t.join();
1825 }
1826 
1827 
1828 deprecated("zmqd.poll() has a new signature as of v0.4")
1829 uint poll(zmq_pollitem_t[] items, Duration timeout = infiniteDuration)
1830 {
1831     import std.conv: to;
1832     const n = trusted!zmq_poll(
1833         ptr(items),
1834         to!int(items.length),
1835         timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"()));
1836     if (n < 0) throw new ZmqException;
1837     return cast(uint) n;
1838 }
1839 
1840 
1841 /**
1842 Input/output multiplexing.
1843 
1844 The $(D timeout) parameter may have the special value $(REF infiniteDuration)
1845 which means no _timeout.  This is translated to an argument value of -1 in the
1846 C API.
1847 
1848 Returns:
1849     The number of $(REF PollItem) structures with events signalled in
1850     $(REF PollItem.returnedEvents), or 0 if no events have been signalled.
1851 Throws:
1852     $(REF ZmqException) if $(ZMQ) reports an error.
1853 Corresponds_to:
1854     $(ZMQREF zmq_poll())
1855 */
1856 uint poll(PollItem[] items, Duration timeout = infiniteDuration) @trusted
1857 {
1858     // Here we use a trick where we pretend the array of PollItems is
1859     // actually an array of zmq_pollitem_t, to avoid an unnecessary
1860     // allocation.  For this to work, PollItem must have the exact
1861     // same size as zmq_pollitem_t.
1862     static assert (PollItem.sizeof == zmq_pollitem_t.sizeof);
1863 
1864     import std.conv: to;
1865     const n = zmq_poll(
1866         cast(zmq_pollitem_t*) items.ptr,
1867         to!int(items.length),
1868         timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"()));
1869     if (n < 0) throw new ZmqException;
1870     return cast(uint) n;
1871 }
1872 
1873 
1874 ///
1875 @system unittest
1876 {
1877     auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
1878     socket1.bind("inproc://zmqd_poll_example");
1879 
1880     import std.socket;
1881     auto socket2 = new std.socket.Socket(
1882         AddressFamily.INET,
1883         std.socket.SocketType.DGRAM);
1884     socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));
1885 
1886     auto socket3 = zmqd.Socket(zmqd.SocketType.push);
1887     socket3.connect("inproc://zmqd_poll_example");
1888     socket3.send("test");
1889 
1890     import core.thread: Thread;
1891     Thread.sleep(10.msecs);
1892 
1893     auto items = [
1894         PollItem(socket1, PollFlags.pollIn),
1895         PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
1896         PollItem(socket3, PollFlags.pollIn),
1897     ];
1898 
1899     const n = poll(items, 100.msecs);
1900     assert (n == 2);
1901     assert (items[0].returnedEvents == PollFlags.pollIn);
1902     assert (items[1].returnedEvents == PollFlags.pollOut);
1903     assert (items[2].returnedEvents == 0);
1904     socket2.close();
1905 }
1906 
1907 
1908 /**
1909 $(FREF poll) event flags.
1910 
1911 These are described in the $(ZMQREF zmq_poll()) manual.
1912 */
1913 enum PollFlags
1914 {
1915     pollIn = ZMQ_POLLIN,    /// Corresponds to $(D ZMQ_POLLIN)
1916     pollOut = ZMQ_POLLOUT,  /// Corresponds to $(D ZMQ_POLLOUT)
1917     pollErr = ZMQ_POLLERR,  /// Corresponds to $(D ZMQ_POLLERR)
1918 }
1919 
1920 
1921 /++
1922 A structure that specifies a socket to be monitored by $(FREF poll) as well
1923 as the events to poll for, and, when $(FREF poll) returns, the events that
1924 occurred.
1925 
1926 Warning:
1927     $(D PollItem) objects do not store $(STDREF socket,Socket) references,
1928     only the corresponding native file descriptors.  This means that the
1929     references have to be stored elsewhere, or the objects may be garbage
1930     collected, invalidating the sockets before or while $(FREF poll) executes.
1931     ---
1932     // Not OK
1933     auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);
1934 
1935     // OK
1936     auto s = new std.socket.Socket(/*...*/);
1937     auto p2 = PollItem(s, PollFlags.pollIn);
1938     ---
1939 Corresponds_to:
1940     $(D $(ZMQAPI zmq_poll,zmq_pollitem_t))
1941 +/
1942 struct PollItem
1943 {
1944     /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket.
1945     this(ref zmqd.Socket socket, PollFlags events) nothrow
1946     {
1947         m_pollItem = zmq_pollitem_t(socket.handle, 0, cast(short) events, 0);
1948     }
1949 
1950     import std.socket;
1951     /**
1952     Constructs a $(REF PollItem) for monitoring a standard socket referenced
1953     by a $(STDREF socket,Socket).
1954     */
1955     this(std.socket.Socket socket, PollFlags events) @system
1956     {
1957         this(socket.handle, events);
1958     }
1959 
1960     /**
1961     Constructs a $(REF PollItem) for monitoring a standard socket referenced
1962     by a native file descriptor.
1963     */
1964     this(FD fd, PollFlags events) pure nothrow
1965     {
1966         m_pollItem = zmq_pollitem_t(null, fd, cast(short) events, 0);
1967     }
1968 
1969     /**
1970     Requested _events.
1971 
1972     Corresponds_to:
1973         $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events))
1974     */
1975     @property void requestedEvents(PollFlags events) pure nothrow
1976     {
1977         m_pollItem.events = cast(short) events;
1978     }
1979 
1980     /// ditto
1981     @property PollFlags requestedEvents() const pure nothrow
1982     {
1983         return cast(typeof(return)) m_pollItem.events;
1984     }
1985 
1986     /**
1987     Returned events.
1988 
1989     Corresponds_to:
1990         $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents))
1991     */
1992     @property PollFlags returnedEvents() const pure nothrow
1993     {
1994         return cast(typeof(return)) m_pollItem.revents;
1995     }
1996 
1997 private:
1998     zmq_pollitem_t m_pollItem;
1999 }
2000 
2001 
2002 /**
2003 An object that encapsulates a $(ZMQ) message frame.
2004 
2005 This $(D struct) is a wrapper around a $(D zmq_msg_t) object.
2006 A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it
2007 should always be explicitly initialized upon construction using
2008 $(FREF _Frame.opCall).  Alternatively, it may be initialized later with
2009 $(FREF _Frame.rebuild).
2010 ---
2011 Frame msg1;                 // Invalid frame
2012 auto msg2 = Frame();        // Empty frame
2013 auto msg3 = Frame(1024);    // 1K frame
2014 msg1.rebuild(2048);         // msg1 now has size 2K
2015 msg2.rebuild(2048);         // ...and so does msg2
2016 ---
2017 When a $(D Frame) object is destroyed, $(ZMQREF zmq_msg_close()) is
2018 called on the underlying $(D zmq_msg_t).
2019 
2020 A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy)
2021 for this.
2022 */
2023 struct Frame
2024 {
2025 @safe:
2026     /**
2027     Initializes an empty $(ZMQ) message frame.
2028 
2029     Throws:
2030         $(REF ZmqException) if $(ZMQ) reports an error.
2031     Corresponds_to:
2032         $(ZMQREF zmq_msg_init())
2033     */
2034     static Frame opCall()
2035     {
2036         Frame f;
2037         f.init();
2038         return f;
2039     }
2040 
2041     ///
2042     unittest
2043     {
2044         auto msg = Frame();
2045         assert(msg.size == 0);
2046     }
2047 
2048     /** $(ANCHOR Frame.opCall_size)
2049     Initializes a $(ZMQ) message frame of the specified _size.
2050 
2051     Throws:
2052         $(REF ZmqException) if $(ZMQ) reports an error.
2053     Corresponds_to:
2054         $(ZMQREF zmq_msg_init_size())
2055     */
2056     static Frame opCall(size_t size)
2057     {
2058         Frame m;
2059         m.init(size);
2060         return m;
2061     }
2062 
2063     ///
2064     unittest
2065     {
2066         auto msg = Frame(123);
2067         assert(msg.size == 123);
2068     }
2069 
2070     /** $(ANCHOR Frame.opCall_data)
2071     Initializes a $(ZMQ) message frame from a supplied buffer.
2072 
2073     If $(D free) is not specified, $(D data) $(EM must) refer to a slice of
2074     memory which has been allocated by the garbage collector (typically using
2075     operator $(D new)).  Ownership will then be transferred temporarily to
2076     $(ZMQ), and then transferred back to the GC when $(ZMQ) is done using the
2077     buffer.
2078 
2079     For memory which is $(EM not) garbage collected, the argument $(D free)
2080     must be a pointer to a function that will release the memory, which will
2081     be called when $(ZMQ) no longer needs the buffer.  $(D free) must point to
2082     a function with the following signature:
2083     ---
2084     extern(C) void f(void* d, void* h) nothrow;
2085     ---
2086     When it is called, $(D d) will be equal to $(D data.ptr), while $(D h) will
2087     be equal to $(D hint) (which is optional and null by default).
2088 
2089     $(D free) is passed directly to the underlying $(ZMQ) C function, which is
2090     why it needs to be $(D extern(C)).
2091 
2092     Warning:
2093         Some care must be taken when using this function, as there is no telling
2094         when, whether, or in which thread $(ZMQ) relinquishes ownership of the
2095         buffer.  Client code should therefore avoid retaining any references to
2096         it, including slices that contain, overlap with or are contained in
2097         $(D data).  For the "non-GC" version, client code should in general not
2098         retain slices or pointers to any memory which will be released when
2099         $(D free) is called.
2100     See_Also:
2101         $(REF Frame.FreeData)
2102     Throws:
2103         $(REF ZmqException) if $(ZMQ) reports an error.
2104     Corresponds_to:
2105         $(ZMQREF zmq_msg_init_data())
2106     */
2107     static Frame opCall(ubyte[] data) @system
2108     {
2109         Frame m;
2110         m.init(data);
2111         return m;
2112     }
2113 
2114     /// ditto
2115     static Frame opCall(ubyte[] data, FreeData free, void* hint = null) @system
2116     {
2117         Frame m;
2118         m.init(data, free, hint);
2119         return m;
2120     }
2121 
2122     ///
2123     @system unittest
2124     {
2125         // Garbage-collected memory
2126         auto buf = new ubyte[123];
2127         auto msg = Frame(buf);
2128         assert(msg.size == buf.length);
2129         assert(msg.data.ptr == buf.ptr);
2130     }
2131 
2132     ///
2133     @system unittest
2134     {
2135         // Manually managed memory
2136         import core.stdc.stdlib: malloc, free;
2137         static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
2138         auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
2139         auto msg = Frame(buf, &myFree);
2140         assert(msg.size == buf.length);
2141         assert(msg.data.ptr == buf.ptr);
2142     }
2143 
2144     @system unittest
2145     {
2146         // Non-documentation unittest.  Let's see if the data *actually* gets freed.
2147         ubyte buffer = 81;
2148         bool released = false;
2149         static extern(C) void myFree(void* data, void* hint) nothrow
2150         {
2151             auto typedData = cast(ubyte*) data;
2152             auto typedHint = cast(bool*) hint;
2153             assert(*typedData == 81);
2154             assert(!*typedHint);
2155             *typedData = 0;
2156             *typedHint = true;
2157         }
2158         // New scope, so the frame gets released at the end.
2159         {
2160             auto frame = Frame((&buffer)[0 .. 1], &myFree, &released);
2161             assert(frame.size == 1);
2162             assert(frame.data.ptr == &buffer);
2163             assert(buffer == 81);
2164             assert(!released);
2165         }
2166         assert(buffer == 0);
2167         assert(released);
2168     }
2169 
2170     /**
2171     The function pointer type for memory-freeing callback functions passed to
2172     $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[], _FreeData, void*))).
2173     */
2174     @system alias extern(C) void function(void*, void*) nothrow FreeData;
2175 
2176     /**
2177     Reinitializes the Frame object as an empty message.
2178 
2179     This function will first call $(FREF Frame.close) to release the
2180     resources associated with the message frame, and then it will
2181     initialize it anew, exactly as if it were constructed  with
2182     $(D $(LINK2 #Frame.opCall,Frame())).
2183 
2184     Throws:
2185         $(REF ZmqException) if $(ZMQ) reports an error.
2186     Corresponds_to:
2187         $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init())
2188     */
2189     void rebuild()
2190     {
2191         close();
2192         init();
2193     }
2194 
2195     ///
2196     unittest
2197     {
2198         auto msg = Frame(256);
2199         assert (msg.size == 256);
2200         msg.rebuild();
2201         assert (msg.size == 0);
2202     }
2203 
2204     /**
2205     Reinitializes the Frame object to a specified size.
2206 
2207     This function will first call $(FREF Frame.close) to release the
2208     resources associated with the message frame, and then it will
2209     initialize it anew, exactly as if it were constructed  with
2210     $(D $(LINK2 #Frame.opCall_size,Frame(size))).
2211 
2212     Throws:
2213         $(REF ZmqException) if $(ZMQ) reports an error.
2214     Corresponds_to:
2215         $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()).
2216     */
2217     void rebuild(size_t size)
2218     {
2219         close();
2220         init(size);
2221     }
2222 
2223     ///
2224     unittest
2225     {
2226         auto msg = Frame(256);
2227         assert (msg.size == 256);
2228         msg.rebuild(1024);
2229         assert (msg.size == 1024);
2230     }
2231 
2232     /**
2233     Reinitializes the Frame object from a supplied buffer.
2234 
2235     This function will first call $(FREF Frame.close) to release the
2236     resources associated with the message frame, and then it will
2237     initialize it anew, exactly as if it were constructed  with
2238     $(D Frame(data)) or $(D Frame(data, free, hint)).
2239 
2240     Some care must be taken when using these functions.  Please read the
2241     $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[]))) documentation.
2242 
2243     Throws:
2244         $(REF ZmqException) if $(ZMQ) reports an error.
2245     Corresponds_to:
2246         $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()).
2247     */
2248     void rebuild(ubyte[] data) @system
2249     {
2250         close();
2251         init(data);
2252     }
2253 
2254     /// ditto
2255     void rebuild(ubyte[] data, FreeData free, void* hint = null) @system
2256     {
2257         close();
2258         init(data, free, hint);
2259     }
2260 
2261     ///
2262     @system unittest
2263     {
2264         // Garbage-collected memory
2265         auto msg = Frame(256);
2266         assert (msg.size == 256);
2267         auto buf = new ubyte[123];
2268         msg.rebuild(buf);
2269         assert(msg.size == buf.length);
2270         assert(msg.data.ptr == buf.ptr);
2271     }
2272 
2273     ///
2274     @system unittest
2275     {
2276         // Manually managed memory
2277         import core.stdc.stdlib: malloc, free;
2278         static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
2279 
2280         auto msg = Frame(256);
2281         assert (msg.size == 256);
2282         auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
2283         msg.rebuild(buf, &myFree);
2284         assert(msg.size == buf.length);
2285         assert(msg.data.ptr == buf.ptr);
2286     }
2287 
2288     @disable this(this);
2289 
2290     /**
2291     Releases the $(ZMQ) message frame when the $(D Frame) is destroyed.
2292 
2293     This destructor never throws, which means that any errors will go
2294     undetected.  If this is undesirable, call $(FREF Frame.close) before
2295     the $(D Frame) is destroyed.
2296 
2297     Corresponds_to:
2298         $(ZMQREF zmq_msg_close())
2299     */
2300     ~this() nothrow
2301     {
2302         if (m_initialized) {
2303             immutable rc = trusted!zmq_msg_close(&m_msg);
2304             assert(rc == 0, "zmq_msg_close failed: Invalid message frame");
2305         }
2306     }
2307 
2308     /**
2309     Releases the $(ZMQ) message frame.
2310 
2311     Note that the frame will be automatically released when the $(D Frame)
2312     object is destroyed, so it is often not necessary to call this method
2313     manually.
2314 
2315     Throws:
2316         $(REF ZmqException) if $(ZMQ) reports an error.
2317     Corresponds_to:
2318         $(ZMQREF zmq_msg_close())
2319     */
2320     void close()
2321     {
2322         if (m_initialized) {
2323             if (trusted!zmq_msg_close(&m_msg) != 0) {
2324                 throw new ZmqException;
2325             }
2326             m_initialized = false;
2327         }
2328     }
2329 
2330     /**
2331     Copies frame content to another message frame.
2332 
2333     $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest))
2334     copies the contents of this $(D Frame) into $(D dest).  $(D dest) must
2335     be a valid (i.e. initialized) $(D Frame).
2336 
2337     Warning:
2338         These functions may not do what you think they do.  Please refer
2339         to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
2340     Throws:
2341         $(REF ZmqException) if $(ZMQ) reports an error.
2342     Corresponds_to:
2343         $(ZMQREF zmq_msg_copy())
2344     */
2345     Frame copy()
2346         in(m_initialized)
2347     {
2348         auto cp = Frame();
2349         copyTo(cp);
2350         return cp;
2351     }
2352 
2353     /// ditto
2354     void copyTo(ref Frame dest)
2355         in(m_initialized)
2356     {
2357         if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) {
2358             throw new ZmqException;
2359         }
2360     }
2361 
2362     ///
2363     unittest
2364     {
2365         import std.string: representation;
2366         auto msg1 = Frame(3);
2367         msg1.data[] = "foo".representation;
2368         auto msg2 = msg1.copy();
2369         assert (msg2.data.asString() == "foo");
2370     }
2371 
2372     /**
2373     Moves frame content to another message frame.
2374 
2375     $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest))
2376     moves the contents of this $(D Frame) to $(D dest).  $(D dest) must
2377     be a valid (i.e. initialized) $(D Frame).
2378 
2379     Throws:
2380         $(REF ZmqException) if $(ZMQ) reports an error.
2381     Corresponds_to:
2382         $(ZMQREF zmq_msg_move())
2383     */
2384     Frame move()
2385         in(m_initialized)
2386     {
2387         auto m = Frame();
2388         moveTo(m);
2389         return m;
2390     }
2391 
2392     /// ditto
2393     void moveTo(ref Frame dest)
2394         in(m_initialized)
2395     {
2396         if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) {
2397             throw new ZmqException;
2398         }
2399     }
2400 
2401     ///
2402     unittest
2403     {
2404         import std.string: representation;
2405         auto msg1 = Frame(3);
2406         msg1.data[] = "foo".representation;
2407         auto msg2 = msg1.move();
2408         assert (msg1.size == 0);
2409         assert (msg2.data.asString() == "foo");
2410     }
2411 
2412     /**
2413     The message frame content _size in bytes.
2414 
2415     Corresponds_to:
2416         $(ZMQREF zmq_msg_size())
2417     */
2418     @property size_t size() nothrow
2419         in(m_initialized)
2420     {
2421         return trusted!zmq_msg_size(&m_msg);
2422     }
2423 
2424     ///
2425     unittest
2426     {
2427         auto msg = Frame(123);
2428         assert(msg.size == 123);
2429     }
2430 
2431     /**
2432     Retrieves the message frame content.
2433 
2434     Corresponds_to:
2435         $(ZMQREF zmq_msg_data())
2436     */
2437     @property ubyte[] data() @trusted nothrow
2438         in(m_initialized)
2439     {
2440         return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size];
2441     }
2442 
2443     ///
2444     unittest
2445     {
2446         import std.string: representation;
2447         auto msg = Frame(3);
2448         assert(msg.data.length == 3);
2449         msg.data[] = "foo".representation; // Slice operator -> array copy.
2450         assert(msg.data.asString() == "foo");
2451     }
2452 
2453     /**
2454     Whether there are _more message frames to retrieve.
2455 
2456     Corresponds_to:
2457         $(ZMQREF zmq_msg_more())
2458     */
2459     @property bool more() nothrow
2460         in(m_initialized)
2461     {
2462         return !!trusted!zmq_msg_more(&m_msg);
2463     }
2464 
2465     /**
2466     The file descriptor of the socket the message was read from.
2467 
2468     Throws:
2469         $(REF ZmqException) if $(ZMQ) reports an error.
2470     Corresponds_to:
2471         $(ZMQREF zmq_msg_get()) with $(D ZMQ_SRCFD).
2472     */
2473     @property FD sourceFD()
2474     {
2475         return cast(FD) getProperty(ZMQ_SRCFD);
2476     }
2477 
2478     /**
2479     Whether the message MAY share underlying storage with another copy.
2480 
2481     Throws:
2482         $(REF ZmqException) if $(ZMQ) reports an error.
2483     Corresponds_to:
2484         $(ZMQREF zmq_msg_get()) with $(D ZMQ_SHARED).
2485     */
2486     @property bool sharedStorage()
2487     {
2488         return !!getProperty(ZMQ_SHARED);
2489     }
2490 
2491     unittest
2492     {
2493         import std.string: representation;
2494         auto msg1 = Frame(100); // Big message to avoid small-string optimisation
2495         msg1.data[0 .. 3] = "foo".representation;
2496         assert (!msg1.sharedStorage);
2497         auto msg2 = msg1.copy();
2498         assert (msg2.sharedStorage); // Warning: The ZMQ docs only say that this MAY be true
2499         assert (msg2.data[0 .. 3].asString() == "foo");
2500     }
2501 
2502     /**
2503     Gets message _metadata.
2504 
2505     $(D metadataUnsafe()) is faster than $(D metadata()) because it
2506     directly returns the array which comes from $(ZMQREF zmq_msg_gets()),
2507     whereas the latter returns a freshly GC-allocated copy of it.  However,
2508     the array returned by $(D metadataUnsafe()) is owned by the underlying
2509     $(ZMQ) message and gets destroyed along with it, so care must be
2510     taken when using this function.
2511 
2512     Throws:
2513         $(REF ZmqException) if $(ZMQ) reports an error.
2514     Corresponds_to:
2515         $(ZMQREF zmq_msg_gets())
2516     */
2517     char[] metadata(const char[] property) @trusted
2518         in(m_initialized)
2519     {
2520         return metadataUnsafe(property).dup;
2521     }
2522 
2523     /// ditto
2524     const(char)[] metadataUnsafe(const char[] property) @system
2525         in(m_initialized)
2526     {
2527         if (auto value = zmq_msg_gets(handle, zeroTermString(property))) {
2528             import std.string: fromStringz;
2529             return fromStringz(value);
2530         } else {
2531             throw new ZmqException();
2532         }
2533     }
2534 
2535     /**
2536     A pointer to the underlying $(D zmq_msg_t).
2537     */
2538     @property inout(zmq_msg_t)* handle() inout pure nothrow
2539     {
2540         return &m_msg;
2541     }
2542 
2543 private:
2544     private void init()
2545         in(!m_initialized)
2546         out(; m_initialized)
2547     {
2548         if (trusted!zmq_msg_init(&m_msg) != 0) {
2549             throw new ZmqException;
2550         }
2551         m_initialized = true;
2552     }
2553 
2554     private void init(size_t size)
2555         in(!m_initialized)
2556         out(; m_initialized)
2557     {
2558         if (trusted!zmq_msg_init_size(&m_msg, size) != 0) {
2559             throw new ZmqException;
2560         }
2561         m_initialized = true;
2562     }
2563 
2564     private void init(ubyte[] data) @system
2565         in(!m_initialized)
2566         out(; m_initialized)
2567     {
2568         import core.memory;
2569         static extern(C) void zmqd_Frame_init_gcFree(void* dataPtr, void* block) nothrow
2570         {
2571             GC.removeRoot(dataPtr);
2572             GC.clrAttr(block, GC.BlkAttr.NO_MOVE);
2573         }
2574 
2575         GC.addRoot(data.ptr);
2576         scope(failure) GC.removeRoot(data.ptr);
2577 
2578         auto block = GC.addrOf(data.ptr);
2579         immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE);
2580         GC.setAttr(block, GC.BlkAttr.NO_MOVE);
2581         scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);
2582 
2583         init(data, &zmqd_Frame_init_gcFree, block);
2584     }
2585 
2586     void init(ubyte[] data, FreeData free, void* hint) @system
2587         in(!m_initialized)
2588         out(; m_initialized)
2589     {
2590         if (zmq_msg_init_data(&m_msg, data.ptr, data.length, free, hint) != 0) {
2591             throw new ZmqException;
2592         }
2593         m_initialized = true;
2594     }
2595 
2596     int getProperty(int property) @trusted
2597     {
2598         const value = zmq_msg_get(&m_msg, property);
2599         if (value == -1) {
2600             throw new ZmqException;
2601         }
2602         return value;
2603     }
2604 
2605     bool m_initialized;
2606     zmq_msg_t m_msg;
2607 }
2608 
2609 unittest
2610 {
2611     const url = uniqueUrl("inproc");
2612     auto s1 = Socket(SocketType.pair);
2613     auto s2 = Socket(SocketType.pair);
2614     s1.bind(url);
2615     s2.connect(url);
2616 
2617     auto m1a = Frame(123);
2618     m1a.data[] = 'a';
2619     s1.send(m1a);
2620     auto m2a = Frame();
2621     s2.receive(m2a);
2622     assert(m2a.size == 123);
2623     foreach (e; m2a.data) assert(e == 'a');
2624 
2625     auto m1b = Frame(10);
2626     m1b.data[] = 'b';
2627     s1.send(m1b);
2628     auto m2b = Frame();
2629     s2.receive(m2b);
2630     assert(m2b.size == 10);
2631     foreach (e; m2b.data) assert(e == 'b');
2632 }
2633 
2634 deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame;
2635 
2636 
2637 /**
2638 A global context which is used by default by all sockets, unless they are
2639 explicitly constructed with a different context.
2640 
2641 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
2642 has the following to say) about context creation:
2643 $(QUOTE
2644     You should create and use exactly one context in your process.
2645     [$(LDOTS)] If at runtime a process has two contexts, these are
2646     like separate $(ZMQ) instances. If that's explicitly what you
2647     want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
2648     at the start of your main line code, and one $(D zmq_ctx_destroy())
2649     at the end.)
2650 )
2651 By using $(D defaultContext()), this is exactly what you achieve.  The
2652 context is created the first time the function is called, and is
2653 automatically destroyed when the program ends.
2654 
2655 This function is thread safe.
2656 
2657 Throws:
2658     $(REF ZmqException) if $(ZMQ) reports an error.
2659 See_also:
2660     $(REF Context)
2661 */
2662 Context defaultContext() @trusted
2663 {
2664     // For future reference: This is the low-lock singleton pattern. See:
2665     // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
2666     static bool instantiated;
2667     __gshared Context ctx;
2668     if (!instantiated) {
2669         synchronized {
2670             if (!ctx.initialized) {
2671                 ctx = Context();
2672             }
2673             instantiated = true;
2674         }
2675     }
2676     return ctx;
2677 }
2678 
2679 @system unittest
2680 {
2681     import core.thread;
2682     Context c1, c2;
2683     auto t = new Thread(() { c1 = defaultContext(); });
2684     t.start();
2685     c2 = defaultContext();
2686     t.join();
2687     assert(c1.handle !is null);
2688     assert(c1.handle == c2.handle);
2689 }
2690 
2691 
2692 /**
2693 An object that encapsulates a $(ZMQ) context.
2694 
2695 In most programs, it is not necessary to use this type directly,
2696 as $(REF Socket) will use a default global context if not explicitly
2697 provided with one.  See $(FREF defaultContext) for details.
2698 
2699 A default-initialized $(D Context) is not a valid $(ZMQ) context; it
2700 must always be explicitly initialized with $(FREF _Context.opCall):
2701 ---
2702 Context ctx;        // Not a valid context yet
2703 ctx = Context();    // ...but now it is.
2704 ---
2705 $(D Context) objects can be passed around by value, and two copies will
2706 refer to the same context.  The underlying context is managed using
2707 reference counting, so that when the last copy of a $(D Context) goes
2708 out of scope, the context is automatically destroyed.  The reference
2709 counting is performed in a thread safe manner, so that the same context
2710 can be shared between multiple threads.  ($(ZMQ) guarantees the thread
2711 safety of other context operations.)
2712 
2713 See_also:
2714     $(FREF defaultContext)
2715 */
2716 struct Context
2717 {
2718 @safe:
2719     /**
2720     Creates a new $(ZMQ) context.
2721 
2722     Returns:
2723         A $(REF Context) object that encapsulates the new context.
2724     Throws:
2725         $(REF ZmqException) if $(ZMQ) reports an error.
2726     Corresponds_to:
2727         $(ZMQREF zmq_ctx_new())
2728     */
2729     static Context opCall() @trusted // because of the cast
2730     {
2731         if (auto c = trusted!zmq_ctx_new()) {
2732             Context ctx;
2733             // Casting from/to shared is OK since ZMQ contexts are thread safe.
2734             static Exception release(shared(void)* ptr) @trusted nothrow
2735             {
2736                 return zmq_ctx_term(cast(void*) ptr) == 0
2737                     ? null
2738                     : new ZmqException;
2739             }
2740             ctx.m_resource = SharedResource(cast(shared) c, &release);
2741             return ctx;
2742         } else {
2743             throw new ZmqException;
2744         }
2745     }
2746 
2747     ///
2748     unittest
2749     {
2750         auto ctx = Context();
2751         assert (ctx.initialized);
2752     }
2753 
2754     /**
2755     Detaches from the $(ZMQ) context.
2756 
2757     If this is the last reference to the context, it will be destroyed with
2758     $(ZMQREF zmq_ctx_term()).
2759 
2760     Throws:
2761         $(REF ZmqException) if $(ZMQ) reports an error.
2762     */
2763     void detach()
2764     {
2765         m_resource.detach();
2766     }
2767 
2768     ///
2769     unittest
2770     {
2771         auto ctx = Context();
2772         assert (ctx.initialized);
2773         ctx.detach();
2774         assert (!ctx.initialized);
2775     }
2776 
2777     /**
2778     Forcefully terminates the context.
2779 
2780     By using this function, one effectively circumvents the reference-counting
2781     mechanism for managing the context.  After it returns, all other
2782     $(D Context) objects that used to refer to the same context will be in a
2783     state which is functionally equivalent to the default-initialized state
2784     (i.e., $(REF Context.initialized) is $(D false) and $(REF Context.handle)
2785     is $(D null)).
2786 
2787     Throws:
2788         $(REF ZmqException) if $(ZMQ) reports an error.
2789     Corresponds_to:
2790         $(ZMQREF zmq_ctx_term())
2791     */
2792     void terminate() @system
2793     {
2794         m_resource.forceRelease();
2795     }
2796 
2797     ///
2798     @system unittest
2799     {
2800         auto ctx1 = Context();
2801         auto ctx2 = ctx1;
2802         assert (ctx1.initialized);
2803         assert (ctx2.initialized);
2804         assert (ctx1.handle == ctx2.handle);
2805         ctx2.terminate();
2806         assert (!ctx1.initialized);
2807         assert (!ctx2.initialized);
2808         assert (ctx1.handle == null);
2809         assert (ctx2.handle == null);
2810     }
2811 
2812     @system unittest
2813     {
2814         auto ctx = Context();
2815 
2816         void threadFunc()
2817         {
2818             auto server = Socket(ctx, SocketType.rep);
2819             server.bind("inproc://Context_terminate_unittest");
2820             try {
2821                 server.receive(null);
2822                 server.send("");
2823                 server.receive(null);
2824                 assert (false, "Never get here");
2825             } catch (ZmqException e) {
2826                 assert (e.errno == ETERM);
2827             }
2828         }
2829 
2830         import core.thread: Thread;
2831         auto thread = new Thread(&threadFunc);
2832         thread.start();
2833 
2834         auto client = Socket(ctx, SocketType.req);
2835         client.connect("inproc://Context_terminate_unittest");
2836         client.send("");
2837         client.receive(null);
2838         client.close();
2839 
2840         ctx.terminate();
2841         thread.join();
2842     }
2843 
2844 
2845     /**
2846     The number of I/O threads.
2847 
2848     Throws:
2849         $(REF ZmqException) if $(ZMQ) reports an error.
2850     Corresponds_to:
2851         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2852         $(D ZMQ_IO_THREADS).
2853     */
2854     @property int ioThreads()
2855     {
2856         return getOption(ZMQ_IO_THREADS);
2857     }
2858 
2859     /// ditto
2860     @property void ioThreads(int value)
2861     {
2862         setOption(ZMQ_IO_THREADS, value);
2863     }
2864 
2865     ///
2866     unittest
2867     {
2868         auto ctx = Context();
2869         ctx.ioThreads = 3;
2870         assert (ctx.ioThreads == 3);
2871     }
2872 
2873     /**
2874     The maximum number of sockets.
2875 
2876     Throws:
2877         $(REF ZmqException) if $(ZMQ) reports an error.
2878     Corresponds_to:
2879         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2880         $(D ZMQ_MAX_SOCKETS).
2881     */
2882     @property int maxSockets()
2883     {
2884         return getOption(ZMQ_MAX_SOCKETS);
2885     }
2886 
2887     /// ditto
2888     @property void maxSockets(int value)
2889     {
2890         setOption(ZMQ_MAX_SOCKETS, value);
2891     }
2892 
2893     ///
2894     unittest
2895     {
2896         auto ctx = Context();
2897         ctx.maxSockets = 512;
2898         assert (ctx.maxSockets == 512);
2899     }
2900 
2901     /**
2902     The largest configurable number of sockets.
2903 
2904     Throws:
2905         $(REF ZmqException) if $(ZMQ) reports an error.
2906     Corresponds_to:
2907         $(ZMQREF zmq_ctx_get()) with $(D ZMQ_SOCKET_LIMIT).
2908     */
2909     @property int socketLimit()
2910     {
2911         return getOption(ZMQ_SOCKET_LIMIT);
2912     }
2913 
2914     ///
2915     unittest
2916     {
2917         auto ctx = Context();
2918         assert (ctx.socketLimit > 0);
2919     }
2920 
2921     /**
2922     IPv6 option.
2923 
2924     Throws:
2925         $(REF ZmqException) if $(ZMQ) reports an error.
2926     Corresponds_to:
2927         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2928         $(D ZMQ_IPV6).
2929     */
2930     @property bool ipv6()
2931     {
2932         return !!getOption(ZMQ_IPV6);
2933     }
2934 
2935     /// ditto
2936     @property void ipv6(bool value)
2937     {
2938         setOption(ZMQ_IPV6, value ? 1 : 0);
2939     }
2940 
2941     ///
2942     unittest
2943     {
2944         auto ctx = Context();
2945         ctx.ipv6 = true;
2946         assert (ctx.ipv6 == true);
2947     }
2948 
2949     /**
2950     The $(D void*) pointer used by the underlying C API to refer to the context.
2951 
2952     If the object has not been initialized, this function returns $(D null).
2953     */
2954     @property inout(void)* handle() inout @trusted pure nothrow
2955     {
2956         // ZMQ contexts are thread safe, so casting away shared is OK.
2957         return cast(typeof(return)) m_resource.handle;
2958     }
2959 
2960     /**
2961     Whether this $(REF Context) object has been _initialized, i.e. whether it
2962     refers to a valid $(ZMQ) context.
2963     */
2964     @property bool initialized() const pure nothrow
2965     {
2966         return m_resource.handle != null;
2967     }
2968 
2969     ///
2970     unittest
2971     {
2972         Context ctx;
2973         assert (!ctx.initialized);
2974         ctx = Context();
2975         assert (ctx.initialized);
2976         ctx.detach();
2977         assert (!ctx.initialized);
2978     }
2979 
2980 private:
2981     int getOption(int option)
2982     {
2983         immutable value = trusted!zmq_ctx_get(this.handle, option);
2984         if (value < 0) {
2985             throw new ZmqException;
2986         }
2987         return value;
2988     }
2989 
2990     void setOption(int option, int value)
2991     {
2992         if (trusted!zmq_ctx_set(this.handle, option, value) != 0) {
2993             throw new ZmqException;
2994         }
2995     }
2996 
2997     SharedResource m_resource;
2998 }
2999 
3000 
3001 /**
3002 Socket event types.
3003 
3004 These are used together with $(FREF Socket.monitor), and are described
3005 in the $(ZMQREF zmq_socket_monitor()) reference.
3006 */
3007 enum EventType
3008 {
3009     connected       = ZMQ_EVENT_CONNECTED,      /// Corresponds to $(D ZMQ_EVENT_CONNECTED).
3010     connectDelayed  = ZMQ_EVENT_CONNECT_DELAYED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED).
3011     connectRetried  = ZMQ_EVENT_CONNECT_RETRIED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED).
3012     listening       = ZMQ_EVENT_LISTENING,      /// Corresponds to $(D ZMQ_EVENT_LISTENING).
3013     bindFailed      = ZMQ_EVENT_BIND_FAILED,    /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED).
3014     accepted        = ZMQ_EVENT_ACCEPTED,       /// Corresponds to $(D ZMQ_EVENT_ACCEPTED).
3015     acceptFailed    = ZMQ_EVENT_ACCEPT_FAILED,  /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED).
3016     closed          = ZMQ_EVENT_CLOSED,         /// Corresponds to $(D ZMQ_EVENT_CLOSED).
3017     closeFailed     = ZMQ_EVENT_CLOSE_FAILED,   /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED).
3018     disconnected    = ZMQ_EVENT_DISCONNECTED,   /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED).
3019     monitorStopped  = ZMQ_EVENT_MONITOR_STOPPED,/// Corresponds to $(D ZMQ_EVENT_MONITOR_STOPPED).
3020     all             = ZMQ_EVENT_ALL,            /// Corresponds to $(D ZMQ_EVENT_ALL).
3021     handshakeFailedNoDetail = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL).
3022     handshakeSucceeded      = ZMQ_EVENT_HANDSHAKE_SUCCEEDED,        /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_SUCCEEDED).
3023     handshakeFailedProtocol = ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL,  /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL).
3024     handshakeFailedAuth     = ZMQ_EVENT_HANDSHAKE_FAILED_AUTH,      /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_AUTH).
3025 }
3026 
3027 
3028 /**
3029 Protocol error codes.
3030 
3031 These may be returned from $(FREF Event.protocolError), and are described
3032 in the $(ZMQREF zmq_socket_monitor()) reference.
3033 */
3034 enum ProtocolError
3035 {
3036     zmtpUnspecified                 = ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED,                  /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED).
3037     zmtpUnexpectedCommand           = ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND,           /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND).
3038     zmtpInvalidSequence             = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE,             /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE).
3039     zmtpKeyExchange                 = ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE,                 /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE).
3040     zmtpMalformedCommandUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED,/// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED).
3041     zmtpMalformedCommandMessage     = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE,    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE).
3042     zmtpMalformedCommandHello       = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO,      /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO).
3043     zmtpMalformedCommandInitiate    = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE,   /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE).
3044     zmtpMalformedCommandError       = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR,      /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR).
3045     zmtpMalformedCommandReady       = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY,      /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY).
3046     zmtpMalformedCommandWelcome     = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME,    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME).
3047     zmtpInvalidMetadata             = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA,             /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA).
3048     zmtpCryptographic               = ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC,                /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC).
3049     zmtpMechanismMismatch           = ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH,           /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH).
3050     zapUnspecified                  = ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED,                   /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED).
3051     zapMalformedReply               = ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY,               /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY).
3052     zapBadRequestID                 = ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID,                /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID).
3053     zapBadVersion                   = ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION,                   /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION).
3054     zapInvalidStatusCode            = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE,           /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE).
3055     zapInvalidMetadata              = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA,              /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA).
3056 }
3057 
3058 
3059 /**
3060 Receives a message on the given _socket and interprets it as a _socket
3061 state change event.
3062 
3063 $(D socket) must be a PAIR _socket which is connected to an endpoint
3064 created via a $(FREF Socket.monitor) call.  $(D receiveEvent()) receives
3065 one message on the _socket, parses its contents according to the
3066 specification in the $(ZMQREF zmq_socket_monitor()) reference,
3067 and returns the event information as an $(REF Event) object.
3068 
3069 Throws:
3070     $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
3071     $(REF InvalidEventException) if the received message could not
3072     be interpreted as an event message.
3073 See_also:
3074     $(FREF Socket.monitor), for monitoring _socket state changes.
3075 */
3076 Event receiveEvent(ref Socket socket) @system
3077 {
3078     enum eventFrameSize = ushort.sizeof + int.sizeof;
3079     auto eventFrame = Frame();
3080     if (socket.receive(eventFrame) != eventFrameSize) {
3081         throw new InvalidEventException;
3082     }
3083     auto addrFrame = Frame();
3084     socket.receive(addrFrame);
3085     try {
3086         import std.conv: to;
3087         immutable event = to!EventType(*(cast(const(ushort)*) eventFrame.data.ptr));
3088         immutable value = *(cast(const(int)*) eventFrame.data.ptr + ushort.sizeof);
3089         immutable addr = (cast(char[]) addrFrame.data).idup;
3090         return Event(event, addr, value);
3091     } catch (Exception e) {
3092         // Any exception thrown within the try block signifies that there
3093         // is something wrong with the event message.
3094         throw new InvalidEventException;
3095     }
3096 }
3097 
3098 // Socket monitoring only works for connection-oriented sockets, and
3099 // IPC is not supported on Windows.
3100 version (Posix) @system unittest
3101 {
3102     Event[] events;
3103     void eventCollector()
3104     {
3105         auto coll = Socket(SocketType.pair);
3106         coll.connect("inproc://zmqd_receiveEvent_unittest_monitor");
3107         do {
3108             events ~= receiveEvent(coll);
3109         } while (events[$-1].type != EventType.closed);
3110     }
3111     import core.thread;
3112     auto collector = new Thread(&eventCollector);
3113     collector.start();
3114 
3115     static void eventGenerator()
3116     {
3117         auto sck1 = Socket(SocketType.pair);
3118         sck1.monitor("inproc://zmqd_receiveEvent_unittest_monitor");
3119         sck1.bind("ipc://zmqd_receiveEvent_unittest");
3120         import core.time;
3121         Thread.sleep(100.msecs);
3122         auto sck2 = Socket(SocketType.pair);
3123         sck2.connect("ipc://zmqd_receiveEvent_unittest");
3124         Thread.sleep(100.msecs);
3125         sck2.disconnect("ipc://zmqd_receiveEvent_unittest");
3126         Thread.sleep(100.msecs);
3127         sck1.unbind("ipc://zmqd_receiveEvent_unittest");
3128     }
3129     eventGenerator();
3130     collector.join();
3131     assert (events.length == 4);
3132     foreach (ev; events) {
3133         assert (ev.address == "ipc://zmqd_receiveEvent_unittest");
3134     }
3135     assert (events[0].type == EventType.listening);
3136     assert (events[1].type == EventType.accepted);
3137     assert (events[2].type == EventType.handshakeSucceeded);
3138     assert (events[3].type == EventType.closed);
3139     import std.exception;
3140     assertNotThrown!Error(events[0].fd);
3141     assertThrown!Error(events[0].errno);
3142     assertThrown!Error(events[0].interval);
3143 }
3144 
3145 
3146 /**
3147 Information about a socket state change.
3148 
3149 See_also:
3150     $(FREF receiveEvent)
3151 */
3152 struct Event
3153 {
3154     /// The event _type.
3155     @property EventType type() const pure nothrow
3156     {
3157         return m_type;
3158     }
3159 
3160     /// The peer _address.
3161     @property string address() const pure nothrow
3162     {
3163         return m_address;
3164     }
3165 
3166     /**
3167     The socket file descriptor.
3168 
3169     This property function may only be called if $(REF Event.type) is one of:
3170     $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disonnected).
3171 
3172     Throws:
3173         $(D Error) if the property is called for a wrong event type.
3174     */
3175     @property FD fd() const pure nothrow
3176     {
3177         final switch (m_type) {
3178             case EventType.connected:
3179             case EventType.listening:
3180             case EventType.accepted:
3181             case EventType.closed:
3182             case EventType.disconnected:
3183                 return cast(typeof(return)) m_value;
3184             case EventType.connectDelayed:
3185             case EventType.connectRetried:
3186             case EventType.bindFailed:
3187             case EventType.acceptFailed:
3188             case EventType.closeFailed:
3189             case EventType.monitorStopped:
3190             case EventType.handshakeFailedNoDetail:
3191             case EventType.handshakeSucceeded:
3192             case EventType.handshakeFailedProtocol:
3193             case EventType.handshakeFailedAuth:
3194                 throw invalidProperty();
3195             case EventType.all:
3196                 assert (false);
3197         }
3198     }
3199 
3200     /**
3201     The $(D _errno) code for the error which triggered the event.
3202 
3203     This property function may only be called if $(REF Event.type) is either
3204     $(D bindFailed), $(D acceptFailed), $(D closeFailed) or
3205     $(D handshakeFailedNoDetail).
3206 
3207     Throws:
3208         $(D Error) if the property is called for a wrong event type.
3209     */
3210     @property int errno() const pure nothrow
3211     {
3212         final switch (m_type) {
3213             case EventType.bindFailed:
3214             case EventType.acceptFailed:
3215             case EventType.closeFailed:
3216             case EventType.handshakeFailedNoDetail:
3217                 return m_value;
3218             case EventType.connected:
3219             case EventType.connectDelayed:
3220             case EventType.connectRetried:
3221             case EventType.listening:
3222             case EventType.accepted:
3223             case EventType.closed:
3224             case EventType.disconnected:
3225             case EventType.monitorStopped:
3226             case EventType.handshakeSucceeded:
3227             case EventType.handshakeFailedProtocol:
3228             case EventType.handshakeFailedAuth:
3229                 throw invalidProperty();
3230             case EventType.all:
3231                 assert (false);
3232         }
3233     }
3234 
3235     /**
3236     The reconnect interval.
3237 
3238     This property function may only be called if $(REF Event.type) is
3239     $(D connectRetried).
3240 
3241     Throws:
3242         $(D Error) if the property is called for a wrong event type.
3243     */
3244     @property Duration interval() const pure nothrow
3245     {
3246         final switch (m_type) {
3247             case EventType.connectRetried:
3248                 return m_value.msecs;
3249             case EventType.connected:
3250             case EventType.connectDelayed:
3251             case EventType.listening:
3252             case EventType.bindFailed:
3253             case EventType.accepted:
3254             case EventType.acceptFailed:
3255             case EventType.closed:
3256             case EventType.closeFailed:
3257             case EventType.disconnected:
3258             case EventType.monitorStopped:
3259             case EventType.handshakeFailedNoDetail:
3260             case EventType.handshakeSucceeded:
3261             case EventType.handshakeFailedProtocol:
3262             case EventType.handshakeFailedAuth:
3263                 throw invalidProperty();
3264             case EventType.all:
3265                 assert (false);
3266         }
3267     }
3268 
3269     /**
3270     The protocol error code.
3271 
3272     This property function may only be called if $(REF Event.type) is
3273     $(D handshakeFailedProtocol).
3274 
3275     Throws:
3276         $(D Error) if the property is called for a wrong event type.
3277     */
3278     @property ProtocolError protocolError() const pure
3279     {
3280         final switch (m_type) {
3281             case EventType.handshakeFailedProtocol:
3282                 import std.conv: to;
3283                 return to!ProtocolError(m_value);
3284             case EventType.connectRetried:
3285             case EventType.connected:
3286             case EventType.connectDelayed:
3287             case EventType.listening:
3288             case EventType.bindFailed:
3289             case EventType.accepted:
3290             case EventType.acceptFailed:
3291             case EventType.closed:
3292             case EventType.closeFailed:
3293             case EventType.disconnected:
3294             case EventType.monitorStopped:
3295             case EventType.handshakeFailedNoDetail:
3296             case EventType.handshakeSucceeded:
3297             case EventType.handshakeFailedAuth:
3298                 throw invalidProperty();
3299             case EventType.all:
3300                 assert (false);
3301         }
3302     }
3303 
3304     /**
3305     The status code returned by the ZAP handler.
3306 
3307     This property function may only be called if $(REF Event.type) is
3308     $(D handshakeFailedAuth).
3309 
3310     Throws:
3311         $(D Error) if the property is called for a wrong event type.
3312     */
3313     @property int statusCode() const pure
3314     {
3315         final switch (m_type) {
3316             case EventType.handshakeFailedAuth:
3317                 return m_value;
3318             case EventType.connectRetried:
3319             case EventType.connected:
3320             case EventType.connectDelayed:
3321             case EventType.listening:
3322             case EventType.bindFailed:
3323             case EventType.accepted:
3324             case EventType.acceptFailed:
3325             case EventType.closed:
3326             case EventType.closeFailed:
3327             case EventType.disconnected:
3328             case EventType.monitorStopped:
3329             case EventType.handshakeFailedNoDetail:
3330             case EventType.handshakeSucceeded:
3331             case EventType.handshakeFailedProtocol:
3332                 throw invalidProperty();
3333             case EventType.all:
3334                 assert (false);
3335         }
3336     }
3337 
3338 private:
3339     this(EventType type, string address, int value) pure nothrow
3340     {
3341         m_type = type;
3342         m_address = address;
3343         m_value = value;
3344     }
3345 
3346     Error invalidProperty(string name = __FUNCTION__)() const pure nothrow
3347     {
3348         try {
3349             import std.conv: text;
3350             return new Error(text("Property '", name,
3351                                   "' not available for event type '",
3352                                   m_type, "'"));
3353         } catch (Exception e) {
3354             assert(false);
3355         }
3356     }
3357 
3358     EventType m_type;
3359     string m_address;
3360     int m_value;
3361 }
3362 
3363 
3364 /**
3365 Encodes a binary key as Z85 printable text.
3366 
3367 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1),
3368 which will be used to store the return value plus a terminating zero byte.
3369 If $(D dest) is omitted, a new array will be created.
3370 
3371 Returns:
3372     An array of size $(D 5*data.length/4) which contains the Z85-encoded text,
3373     excluding the terminating zero byte.  This will be a slice of $(D dest) if
3374     it is provided.
3375 Throws:
3376     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
3377     $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length)
3378     is not a multiple of 4).
3379 Corresponds_to:
3380     $(ZMQREF zmq_z85_encode())
3381 */
3382 char[] z85Encode(ubyte[] data, char[] dest)
3383 // TODO: Make data const when we update to ZMQ 4.1
3384 {
3385     import core.exception: RangeError;
3386     immutable len = 5 * data.length / 4;
3387     if (dest.length < len+1) throw new RangeError;
3388     if (trusted!zmq_z85_encode(ptr(dest), ptr(data), data.length) == null) {
3389         throw new ZmqException;
3390     }
3391     return dest[0 .. len];
3392 }
3393 
3394 /// ditto
3395 char[] z85Encode(ubyte[] data)
3396 {
3397     return z85Encode(data, new char[5*data.length/4 + 1]);
3398 }
3399 
3400 debug (WithCurveTests) @system unittest // @system because of assertThrown
3401 {
3402     // TODO: Make data immutable when we update to ZMQ 4.1
3403     auto data = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
3404     immutable encoded = "HelloWorld";
3405     assert (z85Encode(data) == encoded);
3406 
3407     auto buffer = new char[11];
3408     auto result = z85Encode(data, buffer);
3409     assert (result == encoded);
3410     assert (buffer.ptr == result.ptr);
3411 
3412     import core.exception: RangeError;
3413     import std.exception: assertThrown;
3414     assertThrown!RangeError(z85Encode(data, new char[10]));
3415     assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5]));
3416 }
3417 
3418 
3419 /**
3420 Decodes a binary key from Z85 printable _text.
3421 
3422 $(D dest) must be an array whose length is at least $(D 4*data.length/5),
3423 which will be used to store the return value.
3424 If $(D dest) is omitted, a new array will be created.
3425 
3426 Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero
3427 byte will be appended to $(D text) if it does not contain one already.  However,
3428 this may trigger a (possibly unwanted) GC allocation.  To avoid this, either
3429 make sure that the last character in $(D text) is $(D '\0'), or use
3430 $(OBJREF assumeSafeAppend) on the array before calling this function (provided
3431 this is safe).
3432 
3433 Returns:
3434     An array of size $(D 4*data.length/5) which contains the decoded data.
3435     This will be a slice of $(D dest) if it is provided.
3436 Throws:
3437     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
3438     $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length)
3439     is not a multiple of 5).
3440 Corresponds_to:
3441     $(ZMQREF zmq_z85_decode())
3442 */
3443 ubyte[] z85Decode(char[] text, ubyte[] dest)
3444 // TODO: Make text const when we update to ZMQ 4.1
3445 {
3446     import core.exception: RangeError;
3447     immutable len = 4 * text.length/5;
3448     if (dest.length < len) throw new RangeError;
3449     if (text[$-1] != '\0') text ~= '\0';
3450     if (trusted!zmq_z85_decode(ptr(dest), ptr(text)) == null) {
3451         throw new ZmqException;
3452     }
3453     return dest[0 .. len];
3454 }
3455 
3456 /// ditto
3457 ubyte[] z85Decode(char[] text)
3458 {
3459     return z85Decode(text, new ubyte[4*text.length/5]);
3460 }
3461 
3462 debug (WithCurveTests) @system unittest // @system because of assertThrown
3463 {
3464     // TODO: Make data immutable when we update to ZMQ 4.1
3465     auto text = "HelloWorld".dup;
3466     immutable decoded = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
3467     assert (z85Decode(text) == decoded);
3468     assert (z85Decode(text~'\0') == decoded);
3469 
3470     auto buffer = new ubyte[8];
3471     auto result = z85Decode(text, buffer);
3472     assert (result == decoded);
3473     assert (buffer.ptr == result.ptr);
3474 
3475     import core.exception: RangeError;
3476     import std.exception: assertThrown;
3477     assertThrown!RangeError(z85Decode(text, new ubyte[7]));
3478     assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup));
3479 }
3480 
3481 
3482 /**
3483 Generates a new Curve key pair.
3484 
3485 To avoid a memory allocation, preallocated buffers may optionally be supplied
3486 for the two keys.  Each of these must have a length of at least 41 bytes, enough
3487 for a 40-character Z85-encoded key plus a terminating zero byte.  If either
3488 buffer is omitted/$(D null), a new one will be created.
3489 
3490 Returns:
3491     A tuple that contains the two keys.  Each of these will have a length of
3492     40 characters, and will be slices of the input buffers if such have been
3493     provided.
3494 Throws:
3495     $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are
3496         not $(D null) but have a length of less than 41 characters.$(BR)
3497     $(REF ZmqException) if $(ZMQ) reports an error.
3498 Corresponds_to:
3499     $(ZMQREF zmq_curve_keypair())
3500 */
3501 Tuple!(char[], "publicKey", char[], "secretKey")
3502     curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null)
3503 {
3504     import core.exception: RangeError;
3505     if (publicKeyBuf is null)           publicKeyBuf = new char[41];
3506     else if (publicKeyBuf.length < 41)  throw new RangeError;
3507     if (secretKeyBuf is null)           secretKeyBuf = new char[41];
3508     else if (secretKeyBuf.length < 41)  throw new RangeError;
3509 
3510     if (trusted!zmq_curve_keypair(ptr(publicKeyBuf), ptr(secretKeyBuf)) != 0) {
3511         throw new ZmqException;
3512     }
3513     return typeof(return)(publicKeyBuf[0 .. 40], secretKeyBuf[0 .. 40]);
3514 }
3515 
3516 ///
3517 debug (WithCurveTests) unittest
3518 {
3519     auto server = Socket(SocketType.rep);
3520     auto serverKeys = curveKeyPair();
3521     server.curveServer = true;
3522     server.curveSecretKeyZ85 = serverKeys.secretKey;
3523     server.bind("inproc://curveKeyPair_test");
3524 
3525     auto client = Socket(SocketType.req);
3526     auto clientKeys = curveKeyPair();
3527     client.curvePublicKeyZ85 = clientKeys.publicKey;
3528     client.curveSecretKeyZ85 = clientKeys.secretKey;
3529     client.curveServerKeyZ85 = serverKeys.publicKey;
3530     client.connect("inproc://curveKeyPair_test");
3531     client.send("hello");
3532 
3533     ubyte[5] buf;
3534     assert (server.receive(buf) == 5);
3535     assert (buf.asString() == "hello");
3536 }
3537 
3538 debug (WithCurveTests) @system unittest
3539 {
3540     auto k1 = curveKeyPair();
3541     assert (k1.publicKey.length == 40);
3542     assert (k1.secretKey.length == 40);
3543 
3544     char[82] buf;
3545     auto k2 = curveKeyPair(buf[0 .. 41], buf[41 .. 82]);
3546     assert (k2.publicKey.length == 40);
3547     assert (k2.publicKey.ptr == buf.ptr);
3548     assert (k2.secretKey.length == 40);
3549     assert (k2.secretKey.ptr == buf.ptr + 41);
3550 
3551     char[82] backup = buf;
3552     import core.exception, std.exception;
3553     assertThrown!RangeError(curveKeyPair(buf[0 .. 40], buf[41 .. 82]));
3554     assertThrown!RangeError(curveKeyPair(buf[0 .. 41], buf[42 .. 82]));
3555     assert (backup[] == buf[]);
3556 }
3557 
3558 
3559 /**
3560 Utility function which interprets and validates a byte array as a UTF-8 string.
3561 
3562 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
3563 the message _data contains plain text.  $(D asString()) allows for easy and
3564 safe interpretation of raw _data as characters.  It checks that $(D data) is
3565 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
3566 the same memory region.
3567 
3568 Throws:
3569     $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
3570 See_also:
3571     $(STDREF string,representation), which performs the opposite operation.
3572 */
3573 inout(char)[] asString(inout(ubyte)[] data) pure
3574 {
3575     auto s = cast(typeof(return)) data;
3576     import std.utf: validate;
3577     validate(s);
3578     return s;
3579 }
3580 
3581 ///
3582 unittest
3583 {
3584     auto s1 = Socket(SocketType.pair);
3585     s1.bind("inproc://zmqd_asString_example");
3586     auto s2 = Socket(SocketType.pair);
3587     s2.connect("inproc://zmqd_asString_example");
3588 
3589     auto msg = Frame(12);
3590     msg.data.asString()[] = "Hello World!";
3591     s1.send(msg);
3592 
3593     ubyte[12] buf;
3594     s2.receive(buf);
3595     assert(buf.asString() == "Hello World!");
3596 }
3597 
3598 unittest
3599 {
3600     auto bytes = cast(ubyte[]) ['f', 'o', 'o'];
3601     auto text = bytes.asString();
3602     assert (text == "foo");
3603     assert (cast(void*) ptr(bytes) == cast(void*) ptr(text));
3604 
3605     import std.exception: assertThrown;
3606     import std.utf: UTFException;
3607     auto b = cast(ubyte[]) [100, 252, 1];
3608     assertThrown!UTFException(asString(b));
3609 }
3610 
3611 
3612 /**
3613 A class for exceptions thrown when any of the underlying $(ZMQ) C functions
3614 report an error.
3615 
3616 The exception provides a standard error message obtained with
3617 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ)
3618 function which reported the error.
3619 */
3620 class ZmqException : Exception
3621 {
3622     /**
3623     The $(D _errno) code that was set by the $(ZMQ) function that reported
3624     the error.
3625 
3626     Corresponds_to:
3627         $(ZMQREF zmq_errno())
3628     */
3629     immutable int errno;
3630 
3631 private:
3632     this(string file = __FILE__, int line = __LINE__) nothrow
3633     {
3634         import std.conv;
3635         this.errno = trusted!zmq_errno();
3636         string msg;
3637         try {
3638             msg = trusted!(to!string)(trusted!zmq_strerror(this.errno));
3639         } catch (Exception e) { /* We never get here */ }
3640         assert(msg.length);     // Still, let's assert as much.
3641         super(msg, file, line);
3642     }
3643 }
3644 
3645 
3646 /**
3647 Exception thrown by $(FREF receiveEvent) on failure to interpret a
3648 received message as an event description.
3649 */
3650 class InvalidEventException : Exception
3651 {
3652 private:
3653     this(string file = __FILE__, int line = __LINE__) nothrow
3654     {
3655         super("The received message is not an event message", file, line);
3656     }
3657 }
3658 
3659 
3660 // =============================================================================
3661 // Everything below is internal
3662 // =============================================================================
3663 private:
3664 
3665 
3666 struct SharedResource
3667 {
3668 @safe:
3669     alias Exception function(shared(void)*) nothrow Release;
3670 
3671     this(shared(void)* ptr, Release release) nothrow
3672         in(ptr)
3673     {
3674         m_payload = new shared(Payload)(1, ptr, release);
3675     }
3676 
3677     this(this) nothrow
3678     {
3679         if (m_payload) {
3680             incRefCount();
3681         }
3682     }
3683 
3684     ~this() nothrow
3685     {
3686         nothrowDetach();
3687     }
3688 
3689     void opAssign(SharedResource rhs)
3690     {
3691         detach();
3692         m_payload = rhs.m_payload;
3693         rhs.m_payload = null;
3694     }
3695 
3696     void detach()
3697     {
3698         if (auto ex = nothrowDetach()) throw ex;
3699     }
3700 
3701     void forceRelease() @system
3702     {
3703         if (m_payload) {
3704             scope(exit) m_payload = null;
3705             decRefCount();
3706             if (m_payload.handle != null) {
3707                 scope(exit) m_payload.handle = null;
3708                 if (auto ex = m_payload.release(m_payload.handle)) {
3709                     throw ex;
3710                 }
3711             }
3712         }
3713     }
3714 
3715     @property inout(shared(void))* handle() inout pure nothrow
3716     {
3717         if (m_payload) {
3718             return m_payload.handle;
3719         } else {
3720             return null;
3721         }
3722     }
3723 
3724 private:
3725     void incRefCount() @trusted nothrow
3726     {
3727         assert (m_payload !is null && m_payload.refCount > 0);
3728         import core.atomic: atomicOp;
3729         atomicOp!"+="(m_payload.refCount, 1);
3730     }
3731 
3732     int decRefCount() @trusted nothrow
3733     {
3734         assert (m_payload !is null && m_payload.refCount > 0);
3735         import core.atomic: atomicOp;
3736         return atomicOp!"-="(m_payload.refCount, 1);
3737     }
3738 
3739     Exception nothrowDetach() @trusted nothrow
3740         out(; m_payload is null)
3741     {
3742         if (m_payload) {
3743             scope(exit) m_payload = null;
3744             if (decRefCount() < 1 && m_payload.handle != null) {
3745                 return m_payload.release(m_payload.handle);
3746             }
3747         }
3748         return null;
3749     }
3750 
3751     struct Payload
3752     {
3753         int refCount;
3754         void* handle;
3755         Release release;
3756     }
3757     shared(Payload)* m_payload;
3758 
3759     invariant()
3760     {
3761         assert (m_payload is null ||
3762             (m_payload.refCount > 0 && m_payload.release !is null));
3763     }
3764 }
3765 
3766 @system unittest
3767 {
3768     import std.exception: assertNotThrown, assertThrown;
3769     static Exception myFree(shared(void)* p) @trusted nothrow
3770     {
3771         auto v = cast(shared(int)*) p;
3772         if (*v == 0) {
3773             return new Exception("double release");
3774         } else {
3775             *v = 0;
3776             return null;
3777         }
3778     }
3779 
3780     shared int i = 1;
3781 
3782     {
3783         // Test constructor and properties.
3784         auto ra = SharedResource(&i, &myFree);
3785         assert (i == 1);
3786         assert (ra.handle == &i);
3787 
3788         // Test postblit constructor
3789         auto rb = ra;
3790         assert (i == 1);
3791         assert (rb.handle == &i);
3792 
3793         {
3794             // Test properties and free() with default-initialized object.
3795             SharedResource rc;
3796             assert (rc.handle == null);
3797             assertNotThrown(rc.detach());
3798 
3799             // Test assignment, both with and without detachment
3800             rc = rb;
3801             assert (i == 1);
3802             assert (rc.handle == &i);
3803 
3804             shared int j = 2;
3805             auto rd = SharedResource(&j, &myFree);
3806             assert (rd.handle == &j);
3807             rd = rb;
3808             assert (j == 0);
3809             assert (i == 1);
3810             assert (rd.handle == &i);
3811 
3812             // Test explicit detach()
3813             shared int k = 3;
3814             auto re = SharedResource(&k, &myFree);
3815             assertNotThrown(re.detach());
3816             assert(k == 0);
3817 
3818             // Test failure to free and assign (myFree(&k) fails when k == 0)
3819             re = SharedResource(&k, &myFree);
3820             assertThrown!Exception(re.detach()); // We defined free(k == 0) as an error
3821             re = SharedResource(&k, &myFree);
3822             assertThrown!Exception(re = rb);
3823         }
3824 
3825         // i should not be "freed" yet
3826         assert (i == 1);
3827         assert (ra.handle == &i);
3828         assert (rb.handle == &i);
3829     }
3830     // ...but now it should.
3831     assert (i == 0);
3832 }
3833 
3834 // Thread safety test
3835 @system unittest
3836 {
3837     enum threadCount = 100;
3838     enum copyCount = 1000;
3839     static Exception myFree(shared(void)* p) @trusted nothrow
3840     {
3841         auto v = cast(shared(int)*) p;
3842         if (*v == 0) {
3843             return new Exception("double release");
3844         } else {
3845             *v = 0;
3846             return null;
3847         }
3848     }
3849     shared int raw = 1;
3850     {
3851         auto rs = SharedResource(&raw, &myFree);
3852 
3853         import core.thread;
3854         auto group = new ThreadGroup;
3855         foreach (i; 0 .. threadCount) {
3856             group.create(() {
3857                 auto a = rs;
3858                 foreach (j; 0 .. copyCount) {
3859                     auto b = a;
3860                     assert (b.handle == &raw);
3861                     assert (raw == 1);
3862                 }
3863             });
3864         }
3865         group.joinAll();
3866         assert (rs.handle == &raw);
3867         assert (raw == 1);
3868     }
3869     assert (raw == 0);
3870 }
3871 
3872 // forceRelease() test
3873 @system unittest
3874 {
3875     import std.exception: assertNotThrown, assertThrown;
3876     static Exception myFree(shared(void)* p) @trusted nothrow
3877     {
3878         auto v = cast(shared(int)*) p;
3879         if (*v == 0) {
3880             return new Exception("double release");
3881         } else {
3882             *v = 0;
3883             return null;
3884         }
3885     }
3886 
3887     shared int i = 1;
3888 
3889     auto ra = SharedResource(&i, &myFree);
3890     auto rb = ra;
3891     assert (ra.handle == &i);
3892     assert (rb.handle == ra.handle);
3893     rb.forceRelease();
3894     assert (rb.handle == null);
3895     assert (ra.handle == null);
3896 }
3897 
3898 
3899 version(unittest) private string uniqueUrl(string p, int n = __LINE__)
3900 {
3901     import std.uuid;
3902     return p ~ "://" ~ randomUUID().toString();
3903 }
3904 
3905 
3906 private auto trusted(alias func, Args...)(auto ref Args args) @trusted
3907 {
3908     return func(args);
3909 }
3910 
3911 private auto scopedTrusted(alias func, Args...)(auto ref scope Args args) @trusted
3912 {
3913     return func(args);
3914 }
3915 
3916 
3917 // std.string.toStringz() is unsafe, so we provide our own implementation
3918 // tailored to the string sizes we are likely to encounter here.
3919 // Note that this implementation requires that the string be used immediately
3920 // upon return, and not stored, as the buffer will be reused most of the time.
3921 const(char)* zeroTermString(const char[] s) nothrow
3922 {
3923     import std.algorithm: max;
3924     static char[] buf;
3925     immutable len = s.length + 1;
3926     if (buf.length < len) buf.length = max(len, 1023);
3927     buf[0 .. s.length] = s;
3928     buf[s.length] = '\0';
3929     return ptr(buf);
3930 }
3931 
3932 @system unittest
3933 {
3934     auto c1 = zeroTermString("Hello World!");
3935     assert (c1[0 .. 13] == "Hello World!\0");
3936     auto c2 = zeroTermString("foo");
3937     assert (c2[0 .. 4] == "foo\0");
3938     assert (c1 == c2);
3939 }