1 /*
2 This Source Code Form is subject to the terms of the Mozilla Public
3 License, v. 2.0. If a copy of the MPL was not distributed with this
4 file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 */
6 
7 /**
8 $(ZMQD) – a thin wrapper around the low-level C API of the
9 $(LINK2 http://zeromq.org,$(ZMQ)) messaging framework.
10 
11 Most functions in this module have a one-to-one relationship with functions
12 in the underlying C API.  Some adaptations have been made to make the API
13 safer, easier and more pleasant to use; most importantly:
14 $(UL
15     $(LI
16         Errors are signalled by means of exceptions rather than return
17         codes.  The $(REF ZmqException) class provides
18         a standard textual message for any error condition, but it also
19         provides access to the $(D errno) code set by the C function
20         that reported the error.)
21     $(LI
22         Functions are marked with $(D @safe), $(D pure) and $(D nothrow)
23         as appropriate, thus facilitating their use in high-level D code.)
24     $(LI
25         Memory and resources (i.e. contexts, sockets and messages) are
26         automatically managed, thus preventing leaks.)
27     $(LI
28         Context, socket and message options are implemented as properties.)
29 )
30 The names of functions and types in $(ZMQD) are very similar to those in
31 $(ZMQ), but they follow the D naming conventions.  Thus, the library should
32 feel both familiar to $(ZMQ) users and natural to D users.  A notable
33 deviation from the C API is that message parts are consistently called
34 "frames".  For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send())
35 and so on.  (Multipart messages were a late addition to $(ZMQ), and the "msg"
36 function names were well established at that point.  The library's
37 developers have admitted that this is somewhat confusing, and the newer
38 CZMQ API consistently uses "frame" in function names.)
39 
40 Due to the close correspondence with the C API, this documentation has
41 intentionally been kept sparse. There is really no reason to repeat the
42 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here.
43 Instead, the documentation for each function contains a "Corresponds to"
44 section that links to the appropriate pages in the $(ZMQ) reference.  Any
45 details given in the present documentation mostly concern the D-specific
46 adaptations that have been made.
47 
48 Also note that the examples generally only use the INPROC transport.  The
49 reason for this is that the examples double as unittests, and we want to
50 avoid firewall troubles and other issues that could arise with the use of
network protocols such as TCP, PGM, etc.  (The IPC protocol is not an option
either, since it is not supported on Windows.)  In any case, the examples are
only short snippets that demonstrate the syntax; for more comprehensive and
realistic
54 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all,
55 $(ZMQ) Guide).  Many of the examples in the Guide have been translated to
56 D, and can be found in the
57 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples))
58 subdirectory of the $(ZMQD) source repository.
59 
60 Version:
61     1.2.0 (compatible with $(ZMQ) >= 4.3.0)
62 Authors:
63     $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad)
64 Copyright:
65     Copyright (c) 2013–2019, Lars T. Kyllingstad. All rights reserved.
66 License:
67     $(ZMQD) is released under the terms of the
68     $(LINK2 https://www.mozilla.org/MPL/2.0/index.txt,Mozilla Public License v. 2.0).$(BR)
69     Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site)
70     for details about $(ZMQ) licensing.
71 Macros:
72     D      = <code>$0</code>
73     EM     = <em>$0</em>
74     LDOTS  = &hellip;
75     QUOTE  = <blockquote>$0</blockquote>
76     FREF   = $(D $(LINK2 #$1,$1()))
77     REF    = $(D $(LINK2 #$1,$1))
78     COREF  = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2))
79     OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1))
80     STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
81     ANCHOR = <span id="$1"></span>
82     ZMQ    = ZeroMQ
83     ZMQAPI = $(LINK2 http://api.zeromq.org/4-3:$1,$+)
84     ZMQD   = $(EM zmqd)
85     ZMQREF = $(D $(ZMQAPI $1,$1))
86 */
87 module zmqd;
88 
89 import core.time;
90 import std.typecons;
91 import deimos.zmq.zmq;
92 
93 
94 version(Windows) {
95     import core.sys.windows.winsock2: SOCKET;
96 }
97 
98 // Compile with debug=ZMQD_DisableCurveTests to be able to successfully
99 // run unittests even if the local ZMQ library is not compiled with Curve
100 // support.
101 debug (ZMQD_DisableCurveTests) { } else debug = WithCurveTests;
102 
// Compatibility check (unittest builds only): compares the version of the
// ZeroMQ library reported by zmqVersion() with the version constants of the
// binding, and writes a warning/note to stderr when they disagree.
version(unittest) static this()
{
    import std.algorithm: max, min;
    import std.stdio: stderr;

    const lib = zmqVersion();
    immutable sameMajorMinor =
        lib.major == ZMQ_VERSION_MAJOR && lib.minor == ZMQ_VERSION_MINOR;
    if (!sameMajorMinor) {
        stderr.writefln(
            "Warning: Potential ZeroMQ header/library incompatibility: "
            ~"The header (binding) is for version %d.%d.%d, "
            ~"while the library is version %d.%d.%d. Unittests may fail.",
            ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH,
            lib.major, lib.minor, lib.patch);
    }

    // Known incompatibilities: a note is printed when the binding version
    // and the library version lie on opposite sides of one of these
    // boundaries.  Only the first matching boundary is reported.
    const libVersion = ZMQ_MAKE_VERSION(lib.major, lib.minor, lib.patch);
    const older = min(ZMQ_VERSION, libVersion);
    const newer = max(ZMQ_VERSION, libVersion);
    const v410 = ZMQ_MAKE_VERSION(4, 1, 0);
    const v411 = ZMQ_MAKE_VERSION(4, 1, 1);
    if (older < v410 && newer >= v410) {
        stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions");
    } else if (older < v411 && newer >= v411) {
        stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions");
    }
}
127 
128 
129 @safe:
130 
// Returns a pointer to the first element of arr, or null if arr is empty.
T* ptr(T)(T[] arr)
{
    if (arr.length > 0) return &arr[0];
    return null;
}
132 
/**
Reports the $(ZMQ) library version.

Returns:
    A $(STDREF typecons,Tuple) whose three integer fields, $(D major),
    $(D minor) and $(D patch), hold the respective versioning levels as
    filled in by $(D zmq_version()).
Corresponds_to:
    $(ZMQREF zmq_version())
*/
Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow
{
    typeof(return) result;
    // The C function writes each version component through a pointer.
    scopedTrusted!zmq_version(&result.major, &result.minor, &result.patch);
    return result;
}
148 
149 
/**
Checks for a $(ZMQ) capability.

Returns:
    $(D true) if $(D zmq_has()) returns a nonzero value for
    $(D capability), and $(D false) otherwise.
Corresponds_to:
    $(ZMQREF zmq_has())
*/
bool zmqHas(const char[] capability) nothrow
{
    // The C function expects a zero-terminated string.
    immutable result = trusted!zmq_has(zeroTermString(capability));
    return result != 0;
}

unittest
{
    assert (!zmqHas("this ain't a capability"));
}
165 
166 
/**
The various socket types.

These are described in the $(ZMQREF zmq_socket()) reference.

Each member has the same value as the C constant named in its description,
so a $(D SocketType) can be handed unchanged to the C API (as is done when
a $(D Socket) is constructed).
*/
enum SocketType
{
    req     = ZMQ_REQ,      /// Corresponds to $(D ZMQ_REQ)
    rep     = ZMQ_REP,      /// Corresponds to $(D ZMQ_REP)
    dealer  = ZMQ_DEALER,   /// Corresponds to $(D ZMQ_DEALER)
    router  = ZMQ_ROUTER,   /// Corresponds to $(D ZMQ_ROUTER)
    pub     = ZMQ_PUB,      /// Corresponds to $(D ZMQ_PUB)
    sub     = ZMQ_SUB,      /// Corresponds to $(D ZMQ_SUB)
    xpub    = ZMQ_XPUB,     /// Corresponds to $(D ZMQ_XPUB)
    xsub    = ZMQ_XSUB,     /// Corresponds to $(D ZMQ_XSUB)
    push    = ZMQ_PUSH,     /// Corresponds to $(D ZMQ_PUSH)
    pull    = ZMQ_PULL,     /// Corresponds to $(D ZMQ_PULL)
    pair    = ZMQ_PAIR,     /// Corresponds to $(D ZMQ_PAIR)
    stream  = ZMQ_STREAM,   /// Corresponds to $(D ZMQ_STREAM)
}
187 
188 
/**
_Security mechanisms.

Each member has the same value as the corresponding C constant
($(D ZMQ_NULL), $(D ZMQ_PLAIN) or $(D ZMQ_CURVE)).
*/
enum Security
{
    /// $(ZMQAPI zmq_null,NULL): No security or confidentiality.
    none  = ZMQ_NULL,

    /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication.
    plain = ZMQ_PLAIN,

    /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality.
    curve = ZMQ_CURVE,
}
201 
202 
/**
A special $(COREF time,Duration) value used to signify an infinite
timeout or time interval in certain contexts.

The places where this value may be used are:
$(UL
    $(LI $(LINK2 #Socket.misc_options,certain socket options))
    $(LI $(FREF poll))
)

Note:
Since $(D core.time.Duration) doesn't reserve such a special value, the
actual value of $(D infiniteDuration) is $(COREF time,Duration.max).
Properties that accept it do not convert it to milliseconds; for example,
the $(D linger) and $(D receiveTimeout) properties of $(REF Socket)
translate between $(D infiniteDuration) and the option value -1.
*/
enum infiniteDuration = Duration.max;
218 
219 
220 /**
221 An object that encapsulates a $(ZMQ) socket.
222 
223 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
224 must always be explicitly initialized with a constructor (see
225 $(FREF _Socket.this)):
226 ---
227 Socket s;                     // Not a valid socket yet
228 s = Socket(SocketType.push);  // ...but now it is.
229 ---
230 This $(D struct) is noncopyable, which means that a socket is always
231 uniquely managed by a single $(D Socket) object.  Functions that will
232 inspect or use the socket, but not take ownership of it, should take
233 a $(D ref Socket) parameter.  Use $(STDREF algorithm,move) to move
234 a $(D Socket) to a different location (e.g. into a sink function that
235 takes it by value, or into a new variable).
236 
237 The socket is automatically closed when the $(D Socket) object goes out
238 of scope.
239 
240 Linger_period:
241 Note that Socket by default sets the socket's linger period to zero.
242 This deviates from the $(ZMQ) default (which is an infinite linger period).
243 */
244 struct Socket
245 {
246 @safe:
247     /**
248     Creates a new $(ZMQ) socket.
249 
250     If $(D context) is not specified, the default _context (as returned
251     by $(FREF defaultContext)) is used.
252 
253     Throws:
254         $(REF ZmqException) if $(ZMQ) reports an error.
255     Corresponds_to:
256         $(ZMQREF zmq_socket())
257     */
258     this(SocketType type)
259     {
260         this(defaultContext(), type);
261     }
262 
263     /// ditto
264     this(Context context, SocketType type)
265     {
266         m_context = context;
267         m_type = type;
268         m_socket = trusted!zmq_socket(context.handle, type);
269         if (m_socket == null) {
270             throw new ZmqException;
271         }
272         linger = 0.msecs;
273     }
274 
275     /// With default context:
276     unittest
277     {
278         auto sck = Socket(SocketType.push);
279         assert (sck.initialized);
280     }
281     /// With explicit context:
282     unittest
283     {
284         auto ctx = Context();
285         auto sck = Socket(ctx, SocketType.push);
286         assert (sck.initialized);
287     }
288 
289     // Socket objects are noncopyable.
290     @disable this(this);
291 
292     unittest // Verify that move semantics work.
293     {
294         import std.algorithm: move;
295         struct SocketOwner
296         {
297             this(Socket s) { owned = trusted!move(s); }
298             Socket owned;
299         }
300         auto socket = Socket(SocketType.req);
301         const socketPtr = socket.handle;
302         assert (socketPtr != null);
303 
304         auto owner = SocketOwner(trusted!move(socket));
305         assert (socket.handle == null);
306         assert (!socket.m_context.initialized);
307         assert (owner.owned.handle == socketPtr);
308         assert (owner.owned.m_context.initialized);
309     }
310 
311     // Closes the socket on desctruction.
312     ~this() nothrow { nothrowClose(); }
313 
314     /**
315     Closes the $(ZMQ) socket.
316 
317     Note that the socket will be closed automatically upon destruction,
318     so it is usually not necessary to call this method manually.
319 
320     Throws:
321         $(REF ZmqException) if $(ZMQ) reports an error.
322     Corresponds_to:
323         $(ZMQREF zmq_close())
324     */
325     void close()
326     {
327         if (!nothrowClose()) throw new ZmqException;
328     }
329 
330     ///
331     unittest
332     {
333         auto s = Socket(SocketType.pair);
334         assert (s.initialized);
335         s.close();
336         assert (!s.initialized);
337     }
338 
339     /**
340     Starts accepting incoming connections on $(D endpoint).
341 
342     Throws:
343         $(REF ZmqException) if $(ZMQ) reports an error.
344     Corresponds_to:
345         $(ZMQREF zmq_bind())
346     */
347     void bind(const char[] endpoint)
348     {
349         if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) {
350             throw new ZmqException;
351         }
352     }
353 
354     ///
355     unittest
356     {
357         auto s = Socket(SocketType.pub);
358         s.bind("inproc://zmqd_bind_example");
359     }
360 
361     /**
362     Stops accepting incoming connections on $(D endpoint).
363 
364     Throws:
365         $(REF ZmqException) if $(ZMQ) reports an error.
366     Corresponds_to:
367         $(ZMQREF zmq_unbind())
368     */
369     void unbind(const char[] endpoint)
370     {
371         if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) {
372             throw new ZmqException;
373         }
374     }
375 
376     unittest
377     {
378         auto s = Socket(SocketType.pub);
379         s.bind("inproc://zmqd_unbind_example");
380         // Do some work...
381         s.unbind("inproc://zmqd_unbind_example");
382     }
383 
384     /**
385     Creates an outgoing connection to $(D endpoint).
386 
387     Throws:
388         $(REF ZmqException) if $(ZMQ) reports an error.
389     Corresponds_to:
390         $(ZMQREF zmq_connect())
391     */
392     void connect(const char[] endpoint)
393     {
394         if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) {
395             throw new ZmqException;
396         }
397     }
398 
399     ///
400     unittest
401     {
402         auto s = Socket(SocketType.sub);
403         s.connect("inproc://zmqd_connect_example");
404     }
405 
406     /**
407     Disconnects the socket from $(D endpoint).
408 
409     Throws:
410         $(REF ZmqException) if $(ZMQ) reports an error.
411     Corresponds_to:
412         $(ZMQREF zmq_disconnect())
413     */
414     void disconnect(const char[] endpoint)
415     {
416         if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) {
417             throw new ZmqException;
418         }
419     }
420 
421     ///
422     unittest
423     {
424         auto s = Socket(SocketType.sub);
425         s.connect("inproc://zmqd_disconnect_example");
426         // Do some work...
427         s.disconnect("inproc://zmqd_disconnect_example");
428     }
429 
430     /**
431     Sends a message frame.
432 
433     $(D _send) blocks until the frame has been queued on the socket.
434     $(D trySend) performs the operation in non-blocking mode, and returns
435     a $(D bool) value that signifies whether the frame was queued on the
436     socket.
437 
438     The $(D more) parameter specifies whether this is a multipart message
439     and there are _more frames to follow.
440 
441     The $(D char[]) overload is a convenience function that simply casts
442     the string argument to $(D ubyte[]).
443 
444     Throws:
445         $(REF ZmqException) if $(ZMQ) reports an error.
446     Corresponds_to:
447         $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
448         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
449         $(D more == true)).
450     */
451     void send(const ubyte[] data, bool more = false)
452     {
453         immutable flags = more ? ZMQ_SNDMORE : 0;
454         if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) {
455             throw new ZmqException;
456         }
457     }
458 
459     /// ditto
460     void send(const char[] data, bool more = false)
461     {
462         send(cast(const(ubyte)[]) data, more);
463     }
464 
465     /// ditto
466     bool trySend(const ubyte[] data, bool more = false)
467     {
468         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
469         if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) {
470             import core.stdc.errno: EAGAIN;
471             if (trusted!zmq_errno() == EAGAIN) return false;
472             else throw new ZmqException;
473         }
474         return true;
475     }
476 
477     /// ditto
478     bool trySend(const char[] data, bool more = false)
479     {
480         return trySend(cast(const(ubyte)[]) data, more);
481     }
482 
483     ///
484     unittest
485     {
486         auto sck = Socket(SocketType.pub);
487         sck.send(cast(ubyte[]) [11, 226, 92]);
488         sck.send("Hello World!");
489     }
490 
491     /**
492     Sends a message frame.
493 
494     $(D _send) blocks until the frame has been queued on the socket.
495     $(D trySend) performs the operation in non-blocking mode, and returns
496     a $(D bool) value that signifies whether the frame was queued on the
497     socket.
498 
499     The $(D more) parameter specifies whether this is a multipart message
500     and there are _more frames to follow.
501 
502     Throws:
503         $(REF ZmqException) if $(ZMQ) reports an error.
504     Corresponds_to:
505         $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
506         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
507         $(D more == true)).
508     */
509     void send(ref Frame msg, bool more = false)
510     {
511         immutable flags = more ? ZMQ_SNDMORE : 0;
512         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
513             throw new ZmqException;
514         }
515     }
516 
517     /// ditto
518     bool trySend(ref Frame msg, bool more = false)
519     {
520         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
521         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
522             import core.stdc.errno: EAGAIN;
523             if (trusted!zmq_errno() == EAGAIN) return false;
524             else throw new ZmqException;
525         }
526         return true;
527     }
528 
529     ///
530     unittest
531     {
532         auto sck = Socket(SocketType.pub);
533         auto msg = Frame(12);
534         msg.data.asString()[] = "Hello World!";
535         sck.send(msg);
536     }
537 
538     /**
539     Sends a constant-memory message frame.
540 
541     $(D _sendConst) blocks until the frame has been queued on the socket.
542     $(D trySendConst) performs the operation in non-blocking mode, and returns
543     a $(D bool) value that signifies whether the frame was queued on the
544     socket.
545 
546     The $(D more) parameter specifies whether this is a multipart message
547     and there are _more frames to follow.
548 
549     Throws:
550         $(REF ZmqException) if $(ZMQ) reports an error.
551     Corresponds_to:
552         $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the
553         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
554         $(D more == true)).
555     */
556     void sendConst(immutable ubyte[] data, bool more = false)
557     {
558         immutable flags = more ? ZMQ_SNDMORE : 0;
559         if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) {
560             throw new ZmqException;
561         }
562     }
563 
564     /// ditto
565     void sendConst(string data, bool more = false)
566     {
567         sendConst(cast(immutable(ubyte)[]) data, more);
568     }
569 
570     /// ditto
571     bool trySendConst(immutable ubyte[] data, bool more = false)
572     {
573         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
574         if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) {
575             import core.stdc.errno: EAGAIN;
576             if (trusted!zmq_errno() == EAGAIN) return false;
577             else throw new ZmqException;
578         }
579         return true;
580     }
581 
582     /// ditto
583     bool trySendConst(string data, bool more = false)
584     {
585         return trySend(cast(immutable(ubyte)[]) data, more);
586     }
587 
588     ///
589     unittest
590     {
591         static immutable arr = cast(ubyte[]) [11, 226, 92];
592         auto sck = Socket(SocketType.pub);
593         sck.sendConst(arr);
594         sck.sendConst("Hello World!");
595     }
596 
597     /**
598     Receives a message frame.
599 
600     $(D _receive) blocks until the request can be satisfied, and returns the
601     number of bytes in the frame.
602     $(D tryReceive) performs the operation in non-blocking mode, and returns
603     a $(STDREF typecons,Tuple) which contains the size of the frame along
604     with a $(D bool) value that signifies whether a frame was received.
605     (If the latter is $(D false), the former is always set to zero.)
606 
607     Throws:
608         $(REF ZmqException) if $(ZMQ) reports an error.
609     Corresponds_to:
610         $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
611         of $(D tryReceive)).
612 
613     */
614     size_t receive(scope ubyte[] data)
615     {
616         immutable len = scopedTrusted!zmq_recv(m_socket, ptr(data), data.length, 0);
617         if (len >= 0) {
618             import std.conv;
619             return to!size_t(len);
620         } else {
621             throw new ZmqException;
622         }
623     }
624 
625     /// ditto
626     Tuple!(size_t, bool) tryReceive(ubyte[] data)
627     {
628         immutable len = trusted!zmq_recv(m_socket, ptr(data), data.length, ZMQ_DONTWAIT);
629         if (len >= 0) {
630             import std.conv;
631             return typeof(return)(to!size_t(len), true);
632         } else {
633             import core.stdc.errno: EAGAIN;
634             if (trusted!zmq_errno() == EAGAIN) {
635                 return typeof(return)(0, false);
636             } else {
637                 throw new ZmqException;
638             }
639         }
640     }
641 
642     ///
643     unittest
644     {
645         // Sender
646         auto snd = Socket(SocketType.req);
647         snd.connect("inproc://zmqd_receive_example");
648         snd.send("Hello World!");
649 
650         // Receiver
651         import std.string: representation;
652         auto rcv = Socket(SocketType.rep);
653         rcv.bind("inproc://zmqd_receive_example");
654         char[256] buf;
655         immutable len  = rcv.receive(buf.representation);
656         assert (buf[0 .. len] == "Hello World!");
657     }
658 
659     @system unittest
660     {
661         auto snd = Socket(SocketType.pair);
662         snd.bind("inproc://zmqd_tryReceive_example");
663         auto rcv = Socket(SocketType.pair);
664         rcv.connect("inproc://zmqd_tryReceive_example");
665 
666         ubyte[256] buf;
667         auto r1 = rcv.tryReceive(buf);
668         assert (!r1[1]);
669 
670         import core.thread, core.time, std.string;
671         snd.send("Hello World!");
672         Thread.sleep(100.msecs); // Wait for message to be transferred...
673         auto r2 = rcv.tryReceive(buf);
674         assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation);
675     }
676 
677     /**
678     Receives a message frame.
679 
680     $(D _receive) blocks until the request can be satisfied, and returns the
681     number of bytes in the frame.
682     $(D tryReceive) performs the operation in non-blocking mode, and returns
683     a $(STDREF typecons,Tuple) which contains the size of the frame along
684     with a $(D bool) value that signifies whether a frame was received.
685     (If the latter is $(D false), the former is always set to zero.)
686 
687     Throws:
688         $(REF ZmqException) if $(ZMQ) reports an error.
689     Corresponds_to:
690         $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
691         of $(D tryReceive)).
692 
693     */
694     size_t receive(ref Frame msg)
695     {
696         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0);
697         if (len >= 0) {
698             import std.conv;
699             return to!size_t(len);
700         } else {
701             throw new ZmqException;
702         }
703     }
704 
705     /// ditto
706     Tuple!(size_t, bool) tryReceive(ref Frame msg)
707     {
708         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT);
709         if (len >= 0) {
710             import std.conv;
711             return typeof(return)(to!size_t(len), true);
712         } else {
713             import core.stdc.errno: EAGAIN;
714             if (trusted!zmq_errno() == EAGAIN) {
715                 return typeof(return)(0, false);
716             } else {
717                 throw new ZmqException;
718             }
719         }
720     }
721 
722     ///
723     unittest
724     {
725         // Sender
726         auto snd = Socket(SocketType.req);
727         snd.connect("inproc://zmqd_msg_receive_example");
728         snd.send("Hello World!");
729 
730         // Receiver
731         import std.string: representation;
732         auto rcv = Socket(SocketType.rep);
733         rcv.bind("inproc://zmqd_msg_receive_example");
734         auto msg = Frame();
735         rcv.receive(msg);
736         assert (msg.data.asString() == "Hello World!");
737     }
738 
739     @system unittest
740     {
741         auto snd = Socket(SocketType.pair);
742         snd.bind("inproc://zmqd_msg_tryReceive_example");
743         auto rcv = Socket(SocketType.pair);
744         rcv.connect("inproc://zmqd_msg_tryReceive_example");
745 
746         auto msg = Frame();
747         auto r1 = rcv.tryReceive(msg);
748         assert (!r1[1]);
749 
750         import core.thread, core.time, std.string;
751         snd.send("Hello World!");
752         Thread.sleep(100.msecs); // Wait for message to be transferred...
753         auto r2 = rcv.tryReceive(msg);
754         assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation);
755     }
756 
757     /**
758     Whether there are _more message frames to follow.
759 
760     Throws:
761         $(REF ZmqException) if $(ZMQ) reports an error.
762     Corresponds_to:
763         $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE).
764     */
765     @property bool more() { return !!getOption!int(ZMQ_RCVMORE); }
766 
767     // TODO: Better unittest/example
768     unittest
769     {
770         auto sck = Socket(SocketType.req);
771         assert (!sck.more);
772     }
773 
774     /** $(ANCHOR Socket.misc_options)
775     Miscellaneous socket options.
776 
777     Each of these corresponds to an option passed to
778     $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For
779     example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
780     $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.
781 
782     Notes:
783     $(UL
784         $(LI For convenience, the setter for the $(D identity) property
785             accepts strings.  To retrieve a string with the getter, use
786             the $(FREF asString) function.
787             ---
788             sck.identity = "foobar";
789             assert (sck.identity.asString() == "foobar");
790             ---
791             )
792         $(LI The $(D linger), $(D receiveTimeout), $(D sendTimeout) and
793             $(D handshakeInterval) properties may have the special _value
            $(REF infiniteDuration).  This is translated to an option
795             _value of -1 or 0 (depending on which property is being set)
796             in the C API.)
797         $(LI Some options have array _type, and these allow the user to supply
798             a buffer in which to store the _value, to avoid a GC allocation.
799             The return _value is then a slice of this buffer.
800             These are not marked as $(D @property), but are prefixed with
801             "get" (e.g. $(D getIdentity())).  A user-supplied buffer is
802             $(EM required) for some options, namely $(D getPlainUsername())
803             and $(D getPlainPassword()), and these do not have $(D @property)
804             versions.  $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85())
805             require buffers which are at least 32 and 41 bytes long,
806             respectively.)
807         $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
808             treated differently from the others; see $(FREF Socket.subscribe)
809             and $(FREF Socket.unsubscribe))
810     )
811 
812     Throws:
813         $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
814         $(STDREF conv,ConvOverflowException) if a given $(D Duration) is
815             longer than the number of milliseconds that will fit in an $(D int)
816             (only applies to properties of $(COREF time,Duration) _type).$(BR)
817         $(COREF exception,RangeError) if the $(D dest) buffers passed to
818             $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than
819             32 or 41 bytes long, respectively.
820     Corresponds_to:
821         $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()).
822     */
823     @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); }
824     /// ditto
825     @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); }
826 
827     /// ditto
828     @property ubyte[] identity() { return getIdentity(new ubyte[255]); }
829     /// ditto
830     ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); }
831     /// ditto
832     @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); }
833     /// ditto
834     @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); }
835 
836     /// ditto
837     @property int rate() { return getOption!int(ZMQ_RATE); }
838     /// ditto
839     @property void rate(int value) { setOption(ZMQ_RATE, value); }
840 
841     /// ditto
842     @property Duration recoveryInterval()
843     {
844         return msecs(getOption!int(ZMQ_RECOVERY_IVL));
845     }
846     /// ditto
847     @property void recoveryInterval(Duration value)
848     {
849         import std.conv: to;
850         setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"()));
851     }
852 
853     /// ditto
854     @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); }
855     /// ditto
856     @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }
857 
858     /// ditto
859     @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
860     /// ditto
861     @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }
862 
863     /// ditto
864     @property FD fd() { return getOption!FD(ZMQ_FD); }
865 
866     /// ditto
867     @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); }
868 
869     /// ditto
870     @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }
871 
872     /// ditto
873     @property Duration linger()
874     {
875         const auto value = getOption!int(ZMQ_LINGER);
876         return value == -1 ? infiniteDuration : value.msecs;
877     }
878     /// ditto
879     @property void linger(Duration value)
880     {
881         import std.conv: to;
882         setOption(
883             ZMQ_LINGER,
884             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
885     }
886 
887     /// ditto
888     @property Duration reconnectionInterval()
889     {
890         return getOption!int(ZMQ_RECONNECT_IVL).msecs;
891     }
892     /// ditto
893     @property void reconnectionInterval(Duration value)
894     {
895         import std.conv: to;
896         setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"()));
897     }
898 
899     /// ditto
900     @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
901     /// ditto
902     @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }
903 
904     /// ditto
905     @property Duration maxReconnectionInterval()
906     {
907         return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs;
908     }
909     /// ditto
910     @property void maxReconnectionInterval(Duration value)
911     {
912         import std.conv: to;
913         setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"()));
914     }
915 
916     /// ditto
917     @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
918     /// ditto
919     @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }
920 
921     /// ditto
922     @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
923     /// ditto
924     @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }
925 
926     /// ditto
927     @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
928     /// ditto
929     @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }
930 
931     /// ditto
932     @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
933     /// ditto
934     @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }
935 
936     /// ditto
937     @property Duration receiveTimeout()
938     {
939         const value = getOption!int(ZMQ_RCVTIMEO);
940         return value == -1 ? infiniteDuration : value.msecs;
941     }
942     /// ditto
943     @property void receiveTimeout(Duration value)
944     {
945         import std.conv: to;
946         setOption(
947             ZMQ_RCVTIMEO,
948             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
949     }
950 
951     /// ditto
952     @property Duration sendTimeout()
953     {
954         const value = getOption!int(ZMQ_SNDTIMEO);
955         return value == -1 ? infiniteDuration : value.msecs;
956     }
957     /// ditto
958     @property void sendTimeout(Duration value)
959     {
960         import std.conv: to;
961         setOption(
962             ZMQ_SNDTIMEO,
963             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
964     }
965 
966     /// ditto
967     @property char[] lastEndpoint() @trusted
968     {
969         // This function is not @safe because it calls a @system function
970         // (zmq_getsockopt) and takes the address of a local (len).
971         auto buf = new char[1024];
972         size_t len = buf.length;
973         if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
974             throw new ZmqException;
975         }
976         return buf[0 .. len-1];
977     }
978 
979     /// ditto
980     @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); }
981 
982     /// ditto
983     @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); }
984     /// ditto
985     @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); }
986 
987     /// ditto
988     @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); }
989     /// ditto
990     @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); }
991 
992     /// ditto
993     @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); }
994     /// ditto
995     @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); }
996 
997     /// ditto
998     @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); }
999     /// ditto
1000     @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); }
1001 
1002     /// ditto
1003     @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); }
1004     /// ditto
1005     @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); }
1006 
1007     /// ditto
1008     @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); }
1009 
1010     /// ditto
1011     @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); }
1012     /// ditto
1013     @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); }
1014 
1015     /// ditto
1016     @property Security mechanism()
1017     {
1018         import std.conv;
1019         return to!Security(getOption!int(ZMQ_MECHANISM));
1020     }
1021 
1022     /// ditto
1023     @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); }
1024     /// ditto
1025     @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); }
1026 
1027     /// ditto
1028     char[] getPlainUsername(char[] dest)
1029     {
1030         return getCStringOption(ZMQ_PLAIN_USERNAME, dest);
1031     }
1032     /// ditto
1033     @property void plainUsername(const(char)[] value)
1034     {
1035         setArrayOption(ZMQ_PLAIN_USERNAME, value);
1036     }
1037 
1038     /// ditto
1039     char[] getPlainPassword(char[] dest)
1040     {
1041         return getCStringOption(ZMQ_PLAIN_PASSWORD, dest);
1042     }
1043     /// ditto
1044     @property void plainPassword(const(char)[] value)
1045     {
1046         setArrayOption(ZMQ_PLAIN_PASSWORD, value);
1047     }
1048 
1049     /// ditto
1050     @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); }
1051     /// ditto
1052     @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); }
1053 
1054     /// ditto
1055     @property ubyte[] curvePublicKey()
1056     {
1057         return getCurvePublicKey(new ubyte[keyBufSizeBin]);
1058     }
1059     /// ditto
1060     ubyte[] getCurvePublicKey(ubyte[] dest)
1061     {
1062         return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest);
1063     }
1064     /// ditto
1065     @property char[] curvePublicKeyZ85()
1066     {
1067         return getCurvePublicKeyZ85(new char[keyBufSizeZ85]);
1068     }
1069     /// ditto
1070     char[] getCurvePublicKeyZ85(char[] dest)
1071     {
1072         return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest);
1073     }
1074     /// ditto
1075     @property void curvePublicKey(const(ubyte)[] value)
1076     {
1077         setCurveKey(ZMQ_CURVE_PUBLICKEY, value);
1078     }
1079     /// ditto
1080     @property void curvePublicKeyZ85(const(char)[] value)
1081     {
1082         setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value);
1083     }
1084 
1085     /// ditto
1086     @property ubyte[] curveSecretKey()
1087     {
1088         return getCurveSecretKey(new ubyte[keyBufSizeBin]);
1089     }
1090     /// ditto
1091     ubyte[] getCurveSecretKey(ubyte[] dest)
1092     {
1093         return getCurveKey(ZMQ_CURVE_SECRETKEY, dest);
1094     }
1095     /// ditto
1096     @property char[] curveSecretKeyZ85()
1097     {
1098         return getCurveSecretKeyZ85(new char[keyBufSizeZ85]);
1099     }
1100     /// ditto
1101     char[] getCurveSecretKeyZ85(char[] dest)
1102     {
1103         return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest);
1104     }
1105     /// ditto
1106     @property void curveSecretKey(const(ubyte)[] value)
1107     {
1108         setCurveKey(ZMQ_CURVE_SECRETKEY, value);
1109     }
1110     /// ditto
1111     @property void curveSecretKeyZ85(const(char)[] value)
1112     {
1113         setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value);
1114     }
1115 
1116     /// ditto
1117     @property ubyte[] curveServerKey()
1118     {
1119         return getCurveServerKey(new ubyte[keyBufSizeBin]);
1120     }
1121     /// ditto
1122     ubyte[] getCurveServerKey(ubyte[] dest)
1123     {
1124         return getCurveKey(ZMQ_CURVE_SERVERKEY, dest);
1125     }
1126     /// ditto
1127     @property char[] curveServerKeyZ85()
1128     {
1129         return getCurveServerKeyZ85(new char[keyBufSizeZ85]);
1130     }
1131     /// ditto
1132     char[] getCurveServerKeyZ85(char[] dest)
1133     {
1134         return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest);
1135     }
1136     /// ditto
1137     @property void curveServerKey(const(ubyte)[] value)
1138     {
1139         setCurveKey(ZMQ_CURVE_SERVERKEY, value);
1140     }
1141     /// ditto
1142     @property void curveServerKeyZ85(const(char)[] value)
1143     {
1144         setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value);
1145     }
1146 
1147     /// ditto
1148     @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); }
1149 
1150     /// ditto
1151     @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); }
1152 
1153     /// ditto
1154     @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); }
1155 
1156     /// ditto
1157     @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); }
1158 
1159     /// ditto
1160     @property char[] zapDomain() { return getZapDomain(new char[256]); }
1161     /// ditto
1162     char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); }
1163     /// ditto
1164     @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); }
1165 
1166     /// ditto
1167     @property void routerHandover(bool value) { setOption(ZMQ_ROUTER_HANDOVER, value ? 1 : 0); }
1168 
1169     /// ditto
1170     @property void connectionRID(const ubyte[] value) { setArrayOption(ZMQ_CONNECT_RID, value); }
1171 
1172     /// ditto
1173     @property int typeOfService() { return getOption!int(ZMQ_TOS); }
1174     /// ditto
1175     @property void typeOfService(int value) { setOption(ZMQ_TOS, value); }
1176 
1177     /// ditto
1178     @property bool gssapiPlaintext() { return !!getOption!int(ZMQ_GSSAPI_PLAINTEXT); }
1179     /// ditto
1180     @property void gssapiPlaintext(bool value) { setOption(ZMQ_GSSAPI_PLAINTEXT, value ? 1 : 0); }
1181 
1182     /// ditto
1183     @property char[] gssapiPrincipal() { return getGssapiPrincipal(new char[256]); }
1184     /// ditto
1185     char[] getGssapiPrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_PRINCIPAL, dest); }
1186     /// ditto
1187     @property void gssapiPrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_PRINCIPAL, value); }
1188 
1189     /// ditto
1190     @property bool gssapiServer() { return !!getOption!int(ZMQ_GSSAPI_SERVER); }
1191     /// ditto
1192     @property void gssapiServer(bool value) { setOption(ZMQ_GSSAPI_SERVER, value ? 1 : 0); }
1193 
1194     /// ditto
1195     @property char[] gssapiServicePrincipal() { return getGssapiServicePrincipal(new char[256]); }
1196     /// ditto
1197     char[] getGssapiServicePrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, dest); }
1198     /// ditto
1199     @property void gssapiServicePrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, value); }
1200 
1201     /// ditto
1202     @property Duration handshakeInterval()
1203     {
1204         const auto value = getOption!int(ZMQ_HANDSHAKE_IVL);
1205         return value == 0 ? infiniteDuration : msecs(value);
1206     }
1207     /// ditto
1208     @property void handshakeInterval(Duration value)
1209     {
1210         import std.conv: to;
1211         setOption(
1212             ZMQ_HANDSHAKE_IVL,
1213             value == infiniteDuration ? 0 : to!int(value.total!"msecs"()));
1214     }
1215 
1216     // deprecated options
1217     deprecated("Use the 'immediate' property instead")
1218     @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); }
1219 
1220     deprecated("Use the 'immediate' property instead")
1221     @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); }
1222 
1223     deprecated("Use !ipv6 instead")
1224     @property bool ipv4Only() { return !ipv6; }
1225 
1226     deprecated("Use ipv6 = !value instead")
1227     @property void ipv4Only(bool value) { ipv6 = !value; }
1228 
    unittest
    {
        // Round-trip test for the socket options: each readable option is
        // set and then read back on a bound XPUB socket.  The order of the
        // security-related statements matters because they assert on the
        // resulting `mechanism` after each change.
        auto s = Socket(SocketType.xpub);
        const e = "inproc://unittest2";
        s.bind(e);
        import core.time;
        s.threadAffinity = 1;
        assert(s.threadAffinity == 1);
        // identity accepts both raw bytes and text; it reads back as bytes.
        s.identity = cast(ubyte[]) [ 65, 66, 67 ];
        assert(s.identity == [65, 66, 67]);
        s.identity = "foo";
        assert(s.identity == [102, 111, 111]);
        s.rate = 200;
        assert(s.rate == 200);
        s.recoveryInterval = 5.seconds;
        assert(s.recoveryInterval == 5_000.msecs);
        s.sendBufferSize = 500;
        assert(s.sendBufferSize == 500);
        s.receiveBufferSize = 600;
        assert(s.receiveBufferSize == 600);
        version(Posix) {
            assert(s.fd > 2); // 0, 1 and 2 are the standard streams
        }
        assert(s.events == PollFlags.pollOut);
        assert(s.type == SocketType.xpub);
        // Duration-valued options: zero, a sub-second value given in
        // microseconds, and the infiniteDuration sentinel.
        s.linger = Duration.zero;
        assert(s.linger == Duration.zero);
        s.linger = 100_000.usecs;
        assert(s.linger == 100.msecs);
        s.linger = infiniteDuration;
        assert(s.linger == infiniteDuration);
        s.reconnectionInterval = 200_000.usecs;
        assert(s.reconnectionInterval == 200.msecs);
        s.backlog = 50;
        assert(s.backlog == 50);
        s.maxReconnectionInterval = 300_000.usecs;
        assert(s.maxReconnectionInterval == 300.msecs);
        s.maxMsgSize = 1000;
        assert(s.maxMsgSize == 1000);
        s.sendHWM = 500;
        assert(s.sendHWM == 500);
        s.receiveHWM = 600;
        assert(s.receiveHWM == 600);
        s.multicastHops = 2;
        assert(s.multicastHops == 2);
        s.receiveTimeout = 3.seconds;
        assert(s.receiveTimeout == 3_000_000.usecs);
        s.receiveTimeout = infiniteDuration;
        assert(s.receiveTimeout == infiniteDuration);
        s.sendTimeout = 2_000_000.usecs;
        assert(s.sendTimeout == 2.seconds);
        s.sendTimeout = infiniteDuration;
        assert(s.sendTimeout == infiniteDuration);
        s.tcpKeepalive = 1;
        assert(s.tcpKeepalive == 1);
        s.tcpKeepaliveCnt = 1;
        assert(s.tcpKeepaliveCnt == 1);
        s.tcpKeepaliveIdle = 0;
        assert(s.tcpKeepaliveIdle == 0);
        s.tcpKeepaliveIntvl = 0;
        assert(s.tcpKeepaliveIntvl == 0);
        s.immediate = true;
        assert(s.immediate);
        s.ipv6 = true;
        assert(s.ipv6);
        // Security options: each change is expected to be reflected in the
        // value reported by `mechanism`.
        s.plainServer = true;
        assert(s.mechanism == Security.plain);
        assert(s.plainServer);
        debug (WithCurveTests) {
            assert(!s.curveServer);
            s.curveServer = true;
            assert(s.mechanism == Security.curve);
            assert(!s.plainServer);
            assert(s.curveServer);
        }
        s.plainServer = false;
        assert(s.mechanism == Security.none);
        assert(!s.plainServer);
        debug (WithCurveTests) assert(!s.curveServer);
        s.plainUsername = "foobar";
        assert(s.getPlainUsername(new char[8]) == "foobar");
        assert(s.mechanism == Security.plain);
        s.plainUsername = null;
        assert(s.mechanism == Security.none);
        s.plainPassword = "xyz";
        assert(s.getPlainPassword(new char[8]) == "xyz");
        assert(s.mechanism == Security.plain);
        s.plainPassword = null;
        assert(s.mechanism == Security.none);
        s.zapDomain = "my_zap_domain";
        assert(s.zapDomain == "my_zap_domain");
        s.typeOfService = 1;
        assert(s.typeOfService == 1);
        // The GSSAPI options are only exercised when zmqHas reports the
        // "gssapi" capability.
        if (zmqHas("gssapi")) {
            s.gssapiPlaintext = true;
            assert(s.gssapiPlaintext);
            s.gssapiPrincipal = "myPrincipal";
            assert(s.gssapiPrincipal == "myPrincipal");
            s.gssapiServer = true;
            assert(s.gssapiServer);
            s.gssapiServicePrincipal = "myServicePrincipal";
            assert(s.gssapiServicePrincipal == "myServicePrincipal");
        }
        s.handshakeInterval = 60000.msecs;
        assert(s.handshakeInterval == 60000.msecs);

        // Test write-only options
        s.conflate = true;
    }
1338 
    debug (WithCurveTests) @system unittest
    {
        // The CURVE key options require some special setup, so we test them
        // separately.
        import std.array, std.range;
        // Two distinct 32-byte keys (bytes 0..31 and 32..63) plus their Z85
        // encodings, and an all-zero key used to reset options.
        auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array();
        auto z85Key1 = z85Encode(binKey1);
        auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array();
        auto z85Key2 = z85Encode(binKey2);
        auto zeroKey = repeat(cast(ubyte) 0).take(32).array();
        assert (z85Key1 != z85Key2);

        // Start with all three keys zeroed so that each section below can
        // check that the other two keys are left untouched.
        auto s = Socket(SocketType.req);
        s.curvePublicKey = zeroKey;
        s.curveSecretKey = zeroKey;
        s.curveServerKey = zeroKey;

        // Public key: set in binary, read in both forms; set in Z85, read
        // in both forms; then reset.
        s.curvePublicKey = binKey1;
        assert (s.curvePublicKey == binKey1);
        assert (s.curvePublicKeyZ85 == z85Key1);
        s.curvePublicKeyZ85 = z85Key2;
        assert (s.curvePublicKey == binKey2);
        assert (s.curvePublicKeyZ85 == z85Key2);
        assert (s.curveSecretKey == zeroKey);
        assert (s.curveServerKey == zeroKey);
        s.curvePublicKey = zeroKey;

        // Secret key: same sequence.
        s.curveSecretKey = binKey1;
        assert (s.curveSecretKey == binKey1);
        assert (s.curveSecretKeyZ85 == z85Key1);
        s.curveSecretKeyZ85 = z85Key2;
        assert (s.curveSecretKey == binKey2);
        assert (s.curveSecretKeyZ85 == z85Key2);
        assert (s.curvePublicKey == zeroKey);
        assert (s.curveServerKey == zeroKey);
        s.curveSecretKey = zeroKey;

        // Server key: same sequence.
        s.curveServerKey = binKey1;
        assert (s.curveServerKey == binKey1);
        assert (s.curveServerKeyZ85 == z85Key1);
        s.curveServerKeyZ85 = z85Key2;
        assert (s.curveServerKey == binKey2);
        assert (s.curveServerKeyZ85 == z85Key2);
        assert (s.curvePublicKey == zeroKey);
        assert (s.curveSecretKey == zeroKey);
        s.curveServerKey = zeroKey;
    }
1386 
    unittest
    {
        // Some options are only applicable to specific socket types.
        // These are all write-only here, so the test only checks that the
        // setters do not throw.
        auto rt = Socket(SocketType.router);
        rt.routerMandatory = true;
        rt.probeRouter = true;
        rt.routerHandover = true;
        rt.connectionRID = cast(ubyte[]) [ 10, 20, 30 ];
        auto xp = Socket(SocketType.xpub);
        xp.xpubVerbose = true;
        auto rq = Socket(SocketType.req);
        rq.reqCorrelate = true;
        rq.reqRelaxed = true;
    }
1401 
    deprecated unittest
    {
        // Test deprecated socket options
        // (the unittest itself is marked deprecated so that using the
        // deprecated properties does not trigger deprecation messages).
        auto s = Socket(SocketType.req);
        // Expected state of a freshly created socket.
        assert(s.ipv4Only);
        assert(!s.delayAttachOnConnect);

        // Test setters and getters together
        s.ipv4Only = false;
        assert(!s.ipv4Only);
        s.delayAttachOnConnect = true;
        assert(s.delayAttachOnConnect);
    }
1415 
    /**
    Establishes a message filter.

    The filter prefix may be given either as raw bytes or as text; both
    overloads pass the data on unchanged as the option value.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_SUBSCRIBE).
    */
    void subscribe(const ubyte[] filterPrefix)
    {
        setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
    }
    /// ditto
    void subscribe(const  char[] filterPrefix)
    {
        setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
    }
1433 
1434     ///
    unittest
    {
        // Create a subscriber that accepts all messages that start with
        // the prefixes "foo" or "bar".
        // Each subscribe() call adds one more filter.
        auto sck = Socket(SocketType.sub);
        sck.subscribe("foo");
        sck.subscribe("bar");
    }
1443 
    @system unittest
    {
        // End-to-end check of subscribe() over an inproc PUB/SUB pair.
        // Local helper: block the current thread for `ms` milliseconds.
        void sleep(int ms) {
            import core.thread, core.time;
            Thread.sleep(dur!"msecs"(ms));
        }
        auto pub = Socket(SocketType.pub);
        pub.bind("inproc://zmqd_subscribe_unittest");
        auto sub = Socket(SocketType.sub);
        sub.connect("inproc://zmqd_subscribe_unittest");

        // Sent before any filter is installed; the assertions below expect
        // that this message is never delivered to `sub`.
        pub.send("Hello");
        sleep(100);
        // Install one text filter and one raw-byte filter.
        sub.subscribe("He");
        sub.subscribe(cast(ubyte[])['W', 'o']);
        sleep(100);
        pub.send("Heeee");
        pub.send("World");
        sleep(100);
        // Both matching messages must arrive, in the order they were sent.
        ubyte[5] buf;
        sub.receive(buf);
        assert(buf.asString() == "Heeee");
        sub.receive(buf);
        assert(buf.asString() == "World");
    }
1469 
    /**
    Removes a message filter.

    The filter prefix may be given either as raw bytes or as text; both
    overloads pass the data on unchanged as the option value.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_UNSUBSCRIBE).
    */
    void unsubscribe(const ubyte[] filterPrefix)
    {
        setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
    }
    /// ditto
    void unsubscribe(const char[] filterPrefix)
    {
        setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
    }
1487 
1488     ///
    unittest
    {
        // Subscribe to messages that start with "foo" or "bar".
        auto sck = Socket(SocketType.sub);
        sck.subscribe("foo");
        sck.subscribe("bar");
        // ...
        // From now on, only accept messages that start with "bar"
        // (the "foo" filter is removed, the "bar" filter stays in place).
        sck.unsubscribe("foo");
    }
1499 
1500     /**
1501     Spawns a PAIR socket that publishes socket state changes (_events) over
1502     the INPROC transport to the given _endpoint.
1503 
1504     Which event types should be published may be selected by bitwise-ORing
1505     together different $(REF EventType) flags in the $(D events) parameter.
1506 
1507     Throws:
1508         $(REF ZmqException) if $(ZMQ) reports an error.
1509     Corresponds_to:
1510         $(ZMQREF zmq_socket_monitor())
1511     See_also:
1512         $(FREF receiveEvent), which receives and parses event messages.
1513     */
1514     void monitor(const char[] endpoint, EventType events = EventType.all)
1515     {
1516         if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) {
1517             throw new ZmqException;
1518         }
1519     }
1520 
1521     ///
    unittest
    {
        // Publish only "accepted" and "closed" events of a PUB socket on
        // the given inproc endpoint.
        auto sck = Socket(SocketType.pub);
        sck.monitor("inproc://zmqd_monitor_unittest",
                    EventType.accepted | EventType.closed);
    }
1528 
1529     /**
1530     The $(D void*) pointer used by the underlying C API to refer to the socket.
1531 
1532     If the object has not been initialized, this function returns $(D null).
1533     */
1534     @property inout(void)* handle() inout pure nothrow
1535     {
1536         return m_socket;
1537     }
1538 
1539     /**
1540     Whether this $(REF Socket) object has been _initialized, i.e. whether it
1541     refers to a valid $(ZMQ) socket.
1542     */
1543     @property bool initialized() const pure nothrow
1544     {
1545         return m_socket != null;
1546     }
1547 
1548     ///
    unittest
    {
        // A default-constructed Socket is not initialized ...
        Socket sck;
        assert (!sck.initialized);
        // ... becomes initialized once a real socket is assigned ...
        sck = Socket(SocketType.sub);
        assert (sck.initialized);
        // ... and is uninitialized again after close().
        sck.close();
        assert (!sck.initialized);
    }
1558 
1559 private:
1560     // Helper function for ~this() and close()
1561     bool nothrowClose() nothrow
1562     {
1563         if (m_socket != null) {
1564             if (trusted!zmq_close(m_socket) != 0) return false;
1565             m_socket = null;
1566         }
1567         return true;
1568     }
1569 
1570     import std.traits;
1571 
1572     T getOption(T)(int option) @trusted
1573         if (isScalarType!T)
1574     {
1575         T buf;
1576         auto len = T.sizeof;
1577         if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) {
1578             throw new ZmqException;
1579         }
1580         assert(len == T.sizeof);
1581         return buf;
1582     }
1583 
1584     void setOption(T)(int option, T value) @trusted
1585         if (isScalarType!T)
1586     {
1587         if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) {
1588             throw new ZmqException;
1589         }
1590     }
1591 
1592     T[] getArrayOption(T)(int option, T[] buf) @trusted
1593         if (isScalarType!T)
1594     {
1595         static assert (T.sizeof == 1);
1596         auto len = buf.length;
1597         if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) {
1598             throw new ZmqException;
1599         }
1600         return buf[0 .. len];
1601     }
1602 
1603     void setArrayOption()(int option, const void[] value)
1604     {
1605         if (trusted!zmq_setsockopt(m_socket, option, ptr(value), value.length) != 0) {
1606             throw new ZmqException;
1607         }
1608     }
1609 
1610     char[] getCStringOption(int option, char[] buf)
1611     {
1612         auto ret = getArrayOption(option, buf);
1613         assert (ret.length && ret[$-1] == '\0');
1614         return ret[0 .. $-1];
1615     }
1616 
    // Sizes used by the CURVE key accessors.  The "Buf" variants are the
    // buffer sizes required when reading a key; the Z85 buffer has one
    // extra element for the trailing NUL that getCStringOption() strips.
    enum : size_t
    {
        keySizeBin    = 32,
        keyBufSizeBin = keySizeBin,
        keySizeZ85    = 40,
        keyBufSizeZ85 = keySizeZ85 + 1,
    }
1624 
1625     ubyte[] getCurveKey(int option, ubyte[] buf)
1626     {
1627         if (buf.length < keyBufSizeBin) {
1628             import core.exception: RangeError;
1629             throw new RangeError;
1630         }
1631         return getArrayOption(option, buf[0 .. keyBufSizeBin]);
1632     }
1633 
1634     char[] getCurveKeyZ85(int option, char[] buf)
1635     {
1636         if (buf.length < keyBufSizeZ85) {
1637             import core.exception: RangeError;
1638             throw new RangeError;
1639         }
1640         return getCStringOption(option, buf[0 .. keyBufSizeZ85]);
1641     }
1642 
1643     void setCurveKey(int option, const ubyte[] value)
1644     {
1645         if (value.length != keySizeBin) throw new Exception("Invalid key size");
1646         setArrayOption(option, value);
1647     }
1648 
1649     void setCurveKeyZ85(int option, const char[] value)
1650     {
1651         if (value.length != keySizeZ85) throw new Exception("Invalid key size");
1652         setArrayOption(option, value);
1653     }
1654 
    // NOTE(review): presumably the context the socket was created in and
    // the type it was created with; neither is used in this part of the
    // file, so confirm against the constructor.
    Context m_context;
    SocketType m_type;
    // Raw socket pointer passed to the zmq_* C functions; null when the
    // object is uninitialized or has been closed.
    void* m_socket;
1658 }
1659 
unittest
{
    // Basic smoke test: send a 12-byte message over an inproc PAIR
    // connection and check that it arrives intact.
    auto s1 = Socket(SocketType.pair);
    auto s2 = Socket(SocketType.pair);
    s1.bind("inproc://unittest");
    s2.connect("inproc://unittest");
    s1.send("Hello World!");
    ubyte[12] buf;
    const len = s2.receive(buf[]);
    assert (len == 12);
    assert (buf == "Hello World!");
}
1672 
1673 
// Select the native descriptor type for the target platform.  There is no
// fallback branch, so PlatformFD (and therefore FD) is undefined on
// platforms that are neither Windows nor Posix.
version (Windows) {
    alias PlatformFD = SOCKET;
} else version (Posix) {
    alias PlatformFD = int;
}

/**
The native socket file descriptor type.

This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems.
*/
alias FD = PlatformFD;
1686 
1687 
1688 /**
1689 Starts the built-in $(ZMQ) _proxy.
1690 
1691 This function never returns normally, but it may throw an exception.  This could
1692 happen if the context associated with either of the specified sockets is
1693 manually destroyed in a different thread.
1694 
1695 Throws:
1696     $(REF ZmqException) if $(ZMQ) reports an error.
1697 Corresponds_to:
1698     $(ZMQREF zmq_proxy())
1699 See_Also:
1700     $(FREF steerableProxy)
1701 */
void proxy(ref Socket frontend, ref Socket backend)
{
    // No capture socket: null is passed in its place.  The C function is
    // only expected to come back with -1, so reaching the next line always
    // ends in an exception.
    const status = trusted!zmq_proxy(frontend.handle, backend.handle, null);
    assert (status == -1);
    throw new ZmqException;
}

/// ditto
void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
{
    // Same as above, but with a capture socket passed as the third argument.
    const status = trusted!zmq_proxy(
        frontend.handle, backend.handle, capture.handle);
    assert (status == -1);
    throw new ZmqException;
}
1716 
1717 
1718 /**
1719 Starts the built-in $(ZMQ) proxy with _control flow.
1720 
1721 Note that the order of the two last parameters is reversed compared to
1722 $(ZMQREF zmq_proxy_steerable()).  That is, the $(D control) socket always
1723 comes before the $(D capture) socket.  Furthermore, unlike in $(ZMQ),
1724 $(D control) is mandatory.  (Without the _control socket one can simply
1725 use $(FREF proxy).)
1726 
1727 Throws:
1728     $(REF ZmqException) if $(ZMQ) reports an error.
1729 Corresponds_to:
1730     $(ZMQREF zmq_proxy_steerable())
1731 See_Also:
1732     $(FREF proxy)
1733 */
void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control)
{
    // No capture socket: null goes into the capture slot of the C call,
    // which takes (frontend, backend, capture, control) in that order.
    if (trusted!zmq_proxy_steerable(
            frontend.handle, backend.handle, null, control.handle) == -1) {
        throw new ZmqException;
    }
}

/// ditto
void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture)
{
    // Note the swapped order of `capture` and `control` in the C call.
    if (trusted!zmq_proxy_steerable(
            frontend.handle, backend.handle, capture.handle, control.handle) == -1) {
        throw new ZmqException;
    }
}
1748 
@system unittest
{
    // Runs a ROUTER/DEALER steerable proxy in a separate thread, sends one
    // request/reply pair through it and then terminates it through the
    // control socket.
    import core.thread;
    auto t = new Thread(() {
        auto frontend = Socket(SocketType.router);
        frontend.bind("inproc://zmqd_steerableProxy_unittest_fe");
        auto backend  = Socket(SocketType.dealer);
        backend.bind("inproc://zmqd_steerableProxy_unittest_be");
        auto controllee = Socket(SocketType.pair);
        controllee.bind("inproc://zmqd_steerableProxy_unittest_ctl");
        steerableProxy(frontend, backend, controllee);
    });
    t.start();
    auto client = Socket(SocketType.req);
    client.connect("inproc://zmqd_steerableProxy_unittest_fe");
    auto server  = Socket(SocketType.rep);
    server.connect("inproc://zmqd_steerableProxy_unittest_be");
    auto controller = Socket(SocketType.pair);
    controller.connect("inproc://zmqd_steerableProxy_unittest_ctl");

    // Request: a single byte (86) from client to server.
    auto cf = Frame(1);
    cf.data[0] = 86;
    client.send(cf);
    auto sf = Frame();
    server.receive(sf);
    assert(sf.size == 1 && sf.data[0] == 86);
    // Reply: the same frame with its byte changed to 87.
    sf.data[0] = 87;
    server.send(sf);
    client.receive(cf);
    assert(cf.size == 1 && cf.data[0] == 87);

    // Ask the proxy to stop, then wait for its thread to finish.
    controller.send("TERMINATE");
    t.join();
}
1783 
@system unittest
{
    // Runs a PULL/PUSH steerable proxy with a capture socket in a separate
    // thread and checks that a message reaches both the backend and the
    // capture socket.
    import core.thread;
    auto t = new Thread(() {
        auto frontend = Socket(SocketType.pull);
        frontend.bind("inproc://zmqd_steerableProxy2_unittest_fe");
        auto backend  = Socket(SocketType.push);
        backend.bind("inproc://zmqd_steerableProxy2_unittest_be");
        auto controllee = Socket(SocketType.pair);
        controllee.bind("inproc://zmqd_steerableProxy2_unittest_ctl");
        auto capture = Socket(SocketType.push);
        capture.bind("inproc://zmqd_steerableProxy2_unittest_cpt");
        steerableProxy(frontend, backend, controllee, capture);
    });
    t.start();
    auto client = Socket(SocketType.push);
    client.connect("inproc://zmqd_steerableProxy2_unittest_fe");
    auto server  = Socket(SocketType.pull);
    server.connect("inproc://zmqd_steerableProxy2_unittest_be");
    auto controller = Socket(SocketType.pair);
    controller.connect("inproc://zmqd_steerableProxy2_unittest_ctl");
    auto capturer = Socket(SocketType.pull);
    capturer.connect("inproc://zmqd_steerableProxy2_unittest_cpt");

    // One single-byte message (86) is pushed into the proxy ...
    auto cf = Frame(1);
    cf.data[0] = 86;
    client.send(cf);
    // ... and must come out at the backend ...
    auto sf = Frame();
    server.receive(sf);
    assert(sf.size == 1 && sf.data[0] == 86);
    // ... as well as at the capture socket.
    auto pf = Frame();
    capturer.receive(pf);
    assert(pf.size == 1 && pf.data[0] == 86);

    // Ask the proxy to stop, then wait for its thread to finish.
    controller.send("TERMINATE");
    t.join();
}
1821 
1822 
deprecated("zmqd.poll() has a new signature as of v0.4")
uint poll(zmq_pollitem_t[] items, Duration timeout = infiniteDuration)
{
    import std.conv: to;

    // The C function takes the item count as an int.
    const itemCount = to!int(items.length);

    // infiniteDuration is expressed as -1 in the C API; anything else is
    // passed on as a number of milliseconds.
    int timeoutMsecs = -1;
    if (timeout != infiniteDuration) {
        timeoutMsecs = to!int(timeout.total!"msecs"());
    }

    const signalled = trusted!zmq_poll(ptr(items), itemCount, timeoutMsecs);
    if (signalled < 0) throw new ZmqException;
    return cast(uint) signalled;
}
1834 
1835 
/**
Input/output multiplexing.

Polls the given $(REF PollItem)s for the events they request.

The $(D timeout) parameter may have the special value $(REF infiniteDuration)
which means no _timeout.  This is translated to an argument value of -1 in the
C API.  Any other value is passed on as a number of milliseconds.

Returns:
    The number of $(REF PollItem) structures with events signalled in
    $(REF PollItem.returnedEvents), or 0 if no events have been signalled.
Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
    $(ZMQREF zmq_poll())
*/
uint poll(PollItem[] items, Duration timeout = infiniteDuration) @trusted
{
    // The PollItem array is reinterpreted in place as an array of
    // zmq_pollitem_t so that no converted copy has to be allocated.
    // That is only valid if the two types have exactly the same size,
    // which is verified at compile time.
    static assert (PollItem.sizeof == zmq_pollitem_t.sizeof);

    import std.conv: to;
    auto rawItems = cast(zmq_pollitem_t*) items.ptr;
    const itemCount = to!int(items.length);
    const timeoutMsecs = (timeout == infiniteDuration)
        ? -1
        : to!int(timeout.total!"msecs"());

    const signalled = zmq_poll(rawItems, itemCount, timeoutMsecs);
    if (signalled < 0) throw new ZmqException;
    return cast(uint) signalled;
}
1867 
1868 
///
@system unittest
{
    // A ZeroMQ socket which will have a message waiting for it.
    auto puller = zmqd.Socket(zmqd.SocketType.pull);
    puller.bind("inproc://zmqd_poll_example");

    // A standard (non-ZeroMQ) UDP socket.
    import std.socket;
    auto udpSocket = new std.socket.Socket(
        AddressFamily.INET,
        std.socket.SocketType.DGRAM);
    udpSocket.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));

    // A ZeroMQ socket that sends the message to the first one.
    auto pusher = zmqd.Socket(zmqd.SocketType.push);
    pusher.connect("inproc://zmqd_poll_example");
    pusher.send("test");

    // Give the message a moment to arrive.
    import core.thread: Thread;
    Thread.sleep(10.msecs);

    auto pollItems = [
        PollItem(puller, PollFlags.pollIn),
        PollItem(udpSocket, PollFlags.pollIn | PollFlags.pollOut),
        PollItem(pusher, PollFlags.pollIn),
    ];

    const signalledCount = poll(pollItems, 100.msecs);
    assert (signalledCount == 2);
    assert (pollItems[0].returnedEvents == PollFlags.pollIn);
    assert (pollItems[1].returnedEvents == PollFlags.pollOut);
    assert (pollItems[2].returnedEvents == 0);
    udpSocket.close();
}
1901 
1902 
/**
$(FREF poll) event flags.

These are described in the $(ZMQREF zmq_poll()) manual.  Several flags may
be requested at once by combining them with the bitwise OR operator, e.g.
$(D PollFlags.pollIn | PollFlags.pollOut).
*/
enum PollFlags
{
    pollIn = ZMQ_POLLIN,    /// Corresponds to $(D ZMQ_POLLIN)
    pollOut = ZMQ_POLLOUT,  /// Corresponds to $(D ZMQ_POLLOUT)
    pollErr = ZMQ_POLLERR,  /// Corresponds to $(D ZMQ_POLLERR)
}
1914 
1915 
/++
Describes one socket to be watched by $(FREF poll): which socket it is,
which events are requested for it and, once $(FREF poll) has returned,
which events actually occurred.

Warning:
    $(D PollItem) objects do not store $(STDREF socket,Socket) references,
    only the corresponding native file descriptors.  This means that the
    references have to be stored elsewhere, or the objects may be garbage
    collected, invalidating the sockets before or while $(FREF poll) executes.
    ---
    // Not OK
    auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);

    // OK
    auto s = new std.socket.Socket(/*...*/);
    auto p2 = PollItem(s, PollFlags.pollIn);
    ---
Corresponds_to:
    $(D $(ZMQAPI zmq_poll,zmq_pollitem_t))
+/
struct PollItem
{
    /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket.
    this(ref zmqd.Socket socket, PollFlags events) nothrow
    {
        // ZeroMQ socket: the handle is stored and the file descriptor
        // slot is left at zero.
        const short requested = cast(short) events;
        m_pollItem = zmq_pollitem_t(socket.handle, 0, requested, 0);
    }

    import std.socket;
    /**
    Constructs a $(REF PollItem) for monitoring a standard socket referenced
    by a $(STDREF socket,Socket).

    Only the native handle of $(D socket) is stored; see the warning in the
    $(REF PollItem) documentation.
    */
    this(std.socket.Socket socket, PollFlags events) @system
    {
        // Delegates to the file descriptor constructor.
        this(socket.handle, events);
    }

    /**
    Constructs a $(REF PollItem) for monitoring a standard socket referenced
    by a native file descriptor.
    */
    this(FD fd, PollFlags events) pure nothrow
    {
        // Standard socket: the ZeroMQ socket slot is null and the file
        // descriptor identifies what to poll.
        const short requested = cast(short) events;
        m_pollItem = zmq_pollitem_t(null, fd, requested, 0);
    }

    /**
    Requested _events.

    Corresponds_to:
        $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events))
    */
    @property void requestedEvents(PollFlags events) pure nothrow
    {
        const short requested = cast(short) events;
        m_pollItem.events = requested;
    }

    /// ditto
    @property PollFlags requestedEvents() const pure nothrow
    {
        return cast(PollFlags) m_pollItem.events;
    }

    /**
    Returned events.

    Corresponds_to:
        $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents))
    */
    @property PollFlags returnedEvents() const pure nothrow
    {
        return cast(PollFlags) m_pollItem.revents;
    }

private:
    // The wrapped C structure.  It must remain the only data member:
    // poll() relies on PollItem having the same size as zmq_pollitem_t.
    zmq_pollitem_t m_pollItem;
}
1995 
1996 
/**
An object that encapsulates a $(ZMQ) message frame.

This $(D struct) is a wrapper around a $(D zmq_msg_t) object.
A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it
should always be explicitly initialized upon construction using
$(FREF _Frame.opCall).  Alternatively, it may be initialized later with
$(FREF _Frame.rebuild).
---
Frame msg1;                 // Invalid frame
auto msg2 = Frame();        // Empty frame
auto msg3 = Frame(1024);    // 1K frame
msg1.rebuild(2048);         // msg1 now has size 2K
msg2.rebuild(2048);         // ...and so does msg2
---
When a $(D Frame) object is destroyed, $(ZMQREF zmq_msg_close()) is
called on the underlying $(D zmq_msg_t).

A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy)
for this.
*/
struct Frame
{
@safe:
    /**
    Initializes an empty $(ZMQ) message frame.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init())
    */
    static Frame opCall()
    {
        Frame f;
        f.init();
        return f;
    }

    ///
    unittest
    {
        auto msg = Frame();
        assert(msg.size == 0);
    }

    /** $(ANCHOR Frame.opCall_size)
    Initializes a $(ZMQ) message frame of the specified _size.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init_size())
    */
    static Frame opCall(size_t size)
    {
        Frame m;
        m.init(size);
        return m;
    }

    ///
    unittest
    {
        auto msg = Frame(123);
        assert(msg.size == 123);
    }

    /** $(ANCHOR Frame.opCall_data)
    Initializes a $(ZMQ) message frame from a supplied buffer.

    If $(D free) is not specified, $(D data) $(EM must) refer to a slice of
    memory which has been allocated by the garbage collector (typically using
    operator $(D new)).  Ownership will then be transferred temporarily to
    $(ZMQ), and then transferred back to the GC when $(ZMQ) is done using the
    buffer.

    For memory which is $(EM not) garbage collected, the argument $(D free)
    must be a pointer to a function that will release the memory, which will
    be called when $(ZMQ) no longer needs the buffer.  $(D free) must point to
    a function with the following signature:
    ---
    extern(C) void f(void* d, void* h) nothrow;
    ---
    When it is called, $(D d) will be equal to $(D data.ptr), while $(D h) will
    be equal to $(D hint) (which is optional and null by default).

    $(D free) is passed directly to the underlying $(ZMQ) C function, which is
    why it needs to be $(D extern(C)).

    Warning:
        Some care must be taken when using this function, as there is no telling
        when, whether, or in which thread $(ZMQ) relinquishes ownership of the
        buffer.  Client code should therefore avoid retaining any references to
        it, including slices that contain, overlap with or are contained in
        $(D data).  For the "non-GC" version, client code should in general not
        retain slices or pointers to any memory which will be released when
        $(D free) is called.
    See_Also:
        $(REF Frame.FreeData)
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init_data())
    */
    static Frame opCall(ubyte[] data) @system
    {
        Frame m;
        m.init(data);
        return m;
    }

    /// ditto
    static Frame opCall(ubyte[] data, FreeData free, void* hint = null) @system
    {
        Frame m;
        m.init(data, free, hint);
        return m;
    }

    ///
    @system unittest
    {
        // Garbage-collected memory
        auto buf = new ubyte[123];
        auto msg = Frame(buf);
        assert(msg.size == buf.length);
        assert(msg.data.ptr == buf.ptr);
    }

    ///
    @system unittest
    {
        // Manually managed memory
        import core.stdc.stdlib: malloc, free;
        static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
        auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
        auto msg = Frame(buf, &myFree);
        assert(msg.size == buf.length);
        assert(msg.data.ptr == buf.ptr);
    }

    @system unittest
    {
        // Non-documentation unittest.  Let's see if the data *actually* gets freed.
        ubyte buffer = 81;
        bool released = false;
        static extern(C) void myFree(void* data, void* hint) nothrow
        {
            auto typedData = cast(ubyte*) data;
            auto typedHint = cast(bool*) hint;
            assert(*typedData == 81);
            assert(!*typedHint);
            *typedData = 0;
            *typedHint = true;
        }
        // New scope, so the frame gets released at the end.
        {
            auto frame = Frame((&buffer)[0 .. 1], &myFree, &released);
            assert(frame.size == 1);
            assert(frame.data.ptr == &buffer);
            assert(buffer == 81);
            assert(!released);
        }
        assert(buffer == 0);
        assert(released);
    }

    /**
    The function pointer type for memory-freeing callback functions passed to
    $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[], _FreeData, void*))).
    */
    @system alias extern(C) void function(void*, void*) nothrow FreeData;

    /**
    Reinitializes the Frame object as an empty message.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed  with
    $(D $(LINK2 #Frame.opCall,Frame())).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init())
    */
    void rebuild()
    {
        close();
        init();
    }

    ///
    unittest
    {
        auto msg = Frame(256);
        assert (msg.size == 256);
        msg.rebuild();
        assert (msg.size == 0);
    }

    /**
    Reinitializes the Frame object to a specified size.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed  with
    $(D $(LINK2 #Frame.opCall_size,Frame(size))).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()).
    */
    void rebuild(size_t size)
    {
        close();
        init(size);
    }

    ///
    unittest
    {
        auto msg = Frame(256);
        assert (msg.size == 256);
        msg.rebuild(1024);
        assert (msg.size == 1024);
    }

    /**
    Reinitializes the Frame object from a supplied buffer.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed  with
    $(D Frame(data)) or $(D Frame(data, free, hint)).

    Some care must be taken when using these functions.  Please read the
    $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[]))) documentation.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()).
    */
    void rebuild(ubyte[] data) @system
    {
        close();
        init(data);
    }

    /// ditto
    void rebuild(ubyte[] data, FreeData free, void* hint = null) @system
    {
        close();
        init(data, free, hint);
    }

    ///
    @system unittest
    {
        // Garbage-collected memory
        auto msg = Frame(256);
        assert (msg.size == 256);
        auto buf = new ubyte[123];
        msg.rebuild(buf);
        assert(msg.size == buf.length);
        assert(msg.data.ptr == buf.ptr);
    }

    ///
    @system unittest
    {
        // Manually managed memory
        import core.stdc.stdlib: malloc, free;
        static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }

        auto msg = Frame(256);
        assert (msg.size == 256);
        auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
        msg.rebuild(buf, &myFree);
        assert(msg.size == buf.length);
        assert(msg.data.ptr == buf.ptr);
    }

    @disable this(this);

    /**
    Releases the $(ZMQ) message frame when the $(D Frame) is destroyed.

    This destructor never throws, which means that any errors will go
    undetected.  If this is undesirable, call $(FREF Frame.close) before
    the $(D Frame) is destroyed.

    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    ~this() nothrow
    {
        if (m_initialized) {
            immutable rc = trusted!zmq_msg_close(&m_msg);
            assert(rc == 0, "zmq_msg_close failed: Invalid message frame");
        }
    }

    /**
    Releases the $(ZMQ) message frame.

    Note that the frame will be automatically released when the $(D Frame)
    object is destroyed, so it is often not necessary to call this method
    manually.  Calling this method on a frame which is not initialized
    (for example one that has already been closed) does nothing.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    void close()
    {
        if (m_initialized) {
            if (trusted!zmq_msg_close(&m_msg) != 0) {
                throw new ZmqException;
            }
            m_initialized = false;
        }
    }

    /**
    Copies frame content to another message frame.

    $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest))
    copies the contents of this $(D Frame) into $(D dest).  $(D dest) must
    be a valid (i.e. initialized) $(D Frame); this is checked by an
    $(D in) contract.

    Warning:
        These functions may not do what you think they do.  Please refer
        to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_copy())
    */
    Frame copy()
        in { assert(m_initialized); }
        body
    {
        auto cp = Frame();
        copyTo(cp);
        return cp;
    }

    /// ditto
    void copyTo(ref Frame dest)
        in
        {
            assert(m_initialized);
            // The destination is handed to the C function as an existing
            // message, so it must have been initialized as well.
            assert(dest.m_initialized);
        }
        body
    {
        if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg1 = Frame(3);
        msg1.data[] = "foo".representation;
        auto msg2 = msg1.copy();
        assert (msg2.data.asString() == "foo");
    }

    /**
    Moves frame content to another message frame.

    $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest))
    moves the contents of this $(D Frame) to $(D dest).  $(D dest) must
    be a valid (i.e. initialized) $(D Frame); this is checked by an
    $(D in) contract.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_move())
    */
    Frame move()
        in { assert(m_initialized); }
        body
    {
        auto m = Frame();
        moveTo(m);
        return m;
    }

    /// ditto
    void moveTo(ref Frame dest)
        in
        {
            assert(m_initialized);
            // The destination is handed to the C function as an existing
            // message, so it must have been initialized as well.
            assert(dest.m_initialized);
        }
        body
    {
        if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg1 = Frame(3);
        msg1.data[] = "foo".representation;
        auto msg2 = msg1.move();
        assert (msg1.size == 0);
        assert (msg2.data.asString() == "foo");
    }

    /**
    The message frame content _size in bytes.

    Corresponds_to:
        $(ZMQREF zmq_msg_size())
    */
    @property size_t size() nothrow
        in { assert(m_initialized); }
        body
    {
        return trusted!zmq_msg_size(&m_msg);
    }

    ///
    unittest
    {
        auto msg = Frame(123);
        assert(msg.size == 123);
    }

    /**
    Retrieves the message frame content.

    The returned slice refers directly to the memory reported by
    $(ZMQREF zmq_msg_data()); it is not a copy.

    Corresponds_to:
        $(ZMQREF zmq_msg_data())
    */
    @property ubyte[] data() @trusted nothrow
        in { assert(m_initialized); }
        body
    {
        return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size];
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg = Frame(3);
        assert(msg.data.length == 3);
        msg.data[] = "foo".representation; // Slice operator -> array copy.
        assert(msg.data.asString() == "foo");
    }

    /**
    Whether there are _more message frames to retrieve.

    Corresponds_to:
        $(ZMQREF zmq_msg_more())
    */
    @property bool more() nothrow
        in { assert(m_initialized); }
        body
    {
        return !!trusted!zmq_msg_more(&m_msg);
    }

    /**
    The file descriptor of the socket the message was read from.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_get()) with $(D ZMQ_SRCFD).
    */
    @property FD sourceFD()
    {
        return cast(FD) getProperty(ZMQ_SRCFD);
    }

    /**
    Whether the message MAY share underlying storage with another copy.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_get()) with $(D ZMQ_SHARED).
    */
    @property bool sharedStorage()
    {
        return !!getProperty(ZMQ_SHARED);
    }

    unittest
    {
        import std.string: representation;
        auto msg1 = Frame(100); // Big message to avoid small-string optimisation
        msg1.data[0 .. 3] = "foo".representation;
        assert (!msg1.sharedStorage);
        auto msg2 = msg1.copy();
        assert (msg2.sharedStorage); // Warning: The ZMQ docs only say that this MAY be true
        assert (msg2.data[0 .. 3].asString() == "foo");
    }

    /**
    Gets message _metadata.

    $(D metadataUnsafe()) is faster than $(D metadata()) because it
    directly returns the array which comes from $(ZMQREF zmq_msg_gets()),
    whereas the latter returns a freshly GC-allocated copy of it.  However,
    the array returned by $(D metadataUnsafe()) is owned by the underlying
    $(ZMQ) message and gets destroyed along with it, so care must be
    taken when using this function.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_gets())
    */
    char[] metadata(const char[] property) @trusted
        in { assert(m_initialized); }
        body
    {
        return metadataUnsafe(property).dup;
    }

    /// ditto
    const(char)[] metadataUnsafe(const char[] property) @system
        in { assert(m_initialized); }
        body
    {
        if (auto value = zmq_msg_gets(handle, zeroTermString(property))) {
            import std.string: fromStringz;
            return fromStringz(value);
        } else {
            throw new ZmqException();
        }
    }

    /**
    A pointer to the underlying $(D zmq_msg_t).
    */
    @property inout(zmq_msg_t)* handle() inout pure nothrow
    {
        return &m_msg;
    }

private:
    // Initializes m_msg as an empty message (zmq_msg_init).
    // Throws ZmqException if the C function reports an error.
    private void init()
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (trusted!zmq_msg_init(&m_msg) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Initializes m_msg as a message of the given size (zmq_msg_init_size).
    // Throws ZmqException if the C function reports an error.
    private void init(size_t size)
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (trusted!zmq_msg_init_size(&m_msg, size) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Initializes m_msg from a buffer that is expected to be GC-allocated.
    //
    // While the message refers to the buffer, the buffer is registered as a
    // GC root and its memory block is flagged NO_MOVE.  Both are undone by
    // the free callback that is handed to zmq_msg_init_data(), or right away
    // if initialization fails.
    private void init(ubyte[] data) @system
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        import core.memory;

        // Free callback.  pinnedBlock is the block whose NO_MOVE flag was
        // set by this function, or null if there is nothing to clear
        // (the buffer is not inside a GC block, or the block already had
        // NO_MOVE set by someone else, in which case the flag is not ours
        // to remove).
        static extern(C) void zmqd_Frame_init_gcFree(void* dataPtr, void* pinnedBlock) nothrow
        {
            GC.removeRoot(dataPtr);
            if (pinnedBlock !is null) {
                GC.clrAttr(pinnedBlock, GC.BlkAttr.NO_MOVE);
            }
        }

        GC.addRoot(data.ptr);
        scope(failure) GC.removeRoot(data.ptr);

        // Only set NO_MOVE, and later clear it again, if the block did not
        // already carry the flag before this frame took hold of it.
        auto block = GC.addrOf(data.ptr);
        immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE);
        if (movable) GC.setAttr(block, GC.BlkAttr.NO_MOVE);
        scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);

        init(data, &zmqd_Frame_init_gcFree, movable ? block : null);
    }

    // Initializes m_msg from a caller-supplied buffer (zmq_msg_init_data).
    // free and hint are passed through to the C function unchanged.
    // Throws ZmqException if the C function reports an error.
    void init(ubyte[] data, FreeData free, void* hint) @system
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (zmq_msg_init_data(&m_msg, data.ptr, data.length, free, hint) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Reads an integer message property (zmq_msg_get).
    // Throws ZmqException if the C function returns -1.
    int getProperty(int property) @trusted
        in { assert(m_initialized); }
        body
    {
        const value = zmq_msg_get(&m_msg, property);
        if (value == -1) {
            throw new ZmqException;
        }
        return value;
    }

    // Whether m_msg currently holds an initialized zmq_msg_t.
    bool m_initialized;
    zmq_msg_t m_msg;
}
2616 
unittest
{
    // Two pair sockets connected over a unique inproc endpoint.
    const endpoint = uniqueUrl("inproc");
    auto sender = Socket(SocketType.pair);
    auto receiver = Socket(SocketType.pair);
    sender.bind(endpoint);
    receiver.connect(endpoint);

    // First round: a 123-byte frame filled with 'a'.
    auto sentA = Frame(123);
    sentA.data[] = 'a';
    sender.send(sentA);
    auto receivedA = Frame();
    receiver.receive(receivedA);
    assert(receivedA.size == 123);
    foreach (octet; receivedA.data) assert(octet == 'a');

    // Second round: a 10-byte frame filled with 'b'.
    auto sentB = Frame(10);
    sentB.data[] = 'b';
    sender.send(sentB);
    auto receivedB = Frame();
    receiver.receive(receivedB);
    assert(receivedB.size == 10);
    foreach (octet; receivedB.data) assert(octet == 'b');
}
2641 
// Backward-compatibility alias for code that still uses the former name of Frame.
deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame;
2643 
2644 
/**
A global context which is used by default by all sockets, unless they are
explicitly constructed with a different context.

The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
has the following to say) about context creation:
$(QUOTE
    You should create and use exactly one context in your process.
    [$(LDOTS)] If at runtime a process has two contexts, these are
    like separate $(ZMQ) instances. If that's explicitly what you
    want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
    at the start of your main line code, and one $(D zmq_ctx_destroy())
    at the end.)
)
By using $(D defaultContext()), this is exactly what you achieve.  The
context is created the first time the function is called, and is
automatically destroyed when the program ends.

This function is thread safe.

Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
See_also:
    $(REF Context)
*/
Context defaultContext() @trusted
{
    // For future reference: This is the low-lock singleton pattern. See:
    // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
    static bool checkedByThisThread;    // one flag per thread
    __gshared Context sharedContext;    // one context for the whole process

    // Fast path: this thread has already been through the synchronized
    // section below, so the shared context is known to exist.
    if (checkedByThisThread) return sharedContext;

    synchronized {
        // Only the first thread to get here creates the context.
        if (!sharedContext.initialized) {
            sharedContext = Context();
        }
        checkedByThisThread = true;
    }
    return sharedContext;
}
2686 
@system unittest
{
    import core.thread;

    // Fetch the default context from two different threads...
    Context fromWorker, fromMain;
    auto worker = new Thread(() { fromWorker = defaultContext(); });
    worker.start();
    fromMain = defaultContext();
    worker.join();

    // ...and check that both got the very same, valid context.
    assert(fromWorker.handle !is null);
    assert(fromWorker.handle == fromMain.handle);
}
2698 
2699 
2700 /**
2701 An object that encapsulates a $(ZMQ) context.
2702 
2703 In most programs, it is not necessary to use this type directly,
2704 as $(REF Socket) will use a default global context if not explicitly
2705 provided with one.  See $(FREF defaultContext) for details.
2706 
2707 A default-initialized $(D Context) is not a valid $(ZMQ) context; it
2708 must always be explicitly initialized with $(FREF _Context.opCall):
2709 ---
2710 Context ctx;        // Not a valid context yet
2711 ctx = Context();    // ...but now it is.
2712 ---
2713 $(D Context) objects can be passed around by value, and two copies will
2714 refer to the same context.  The underlying context is managed using
2715 reference counting, so that when the last copy of a $(D Context) goes
2716 out of scope, the context is automatically destroyed.  The reference
2717 counting is performed in a thread safe manner, so that the same context
2718 can be shared between multiple threads.  ($(ZMQ) guarantees the thread
2719 safety of other context operations.)
2720 
2721 See_also:
2722     $(FREF defaultContext)
2723 */
2724 struct Context
2725 {
2726 @safe:
2727     /**
2728     Creates a new $(ZMQ) context.
2729 
2730     Returns:
2731         A $(REF Context) object that encapsulates the new context.
2732     Throws:
2733         $(REF ZmqException) if $(ZMQ) reports an error.
2734     Corresponds_to:
2735         $(ZMQREF zmq_ctx_new())
2736     */
2737     static Context opCall() @trusted // because of the cast
2738     {
2739         if (auto c = trusted!zmq_ctx_new()) {
2740             Context ctx;
2741             // Casting from/to shared is OK since ZMQ contexts are thread safe.
2742             static Exception release(shared(void)* ptr) @trusted nothrow
2743             {
2744                 return zmq_ctx_term(cast(void*) ptr) == 0
2745                     ? null
2746                     : new ZmqException;
2747             }
2748             ctx.m_resource = SharedResource(cast(shared) c, &release);
2749             return ctx;
2750         } else {
2751             throw new ZmqException;
2752         }
2753     }
2754 
2755     ///
2756     unittest
2757     {
2758         auto ctx = Context();
2759         assert (ctx.initialized);
2760     }
2761 
    /**
    Detaches from the $(ZMQ) context.

    If this is the last reference to the context, it will be destroyed with
    $(ZMQREF zmq_ctx_term()).  Afterwards, this $(D Context) object is no
    longer initialized.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    */
    void detach()
    {
        // The work is done by the underlying shared resource.
        m_resource.detach();
    }
2775 
2776     ///
2777     unittest
2778     {
2779         auto ctx = Context();
2780         assert (ctx.initialized);
2781         ctx.detach();
2782         assert (!ctx.initialized);
2783     }
2784 
2785     /**
2786     Forcefully terminates the context.
2787 
2788     By using this function, one effectively circumvents the reference-counting
2789     mechanism for managing the context.  After it returns, all other
2790     $(D Context) objects that used to refer to the same context will be in a
2791     state which is functionally equivalent to the default-initialized state
2792     (i.e., $(REF Context.initialized) is $(D false) and $(REF Context.handle)
2793     is $(D null)).
2794 
2795     Throws:
2796         $(REF ZmqException) if $(ZMQ) reports an error.
2797     Corresponds_to:
2798         $(ZMQREF zmq_ctx_term())
2799     */
2800     void terminate() @system
2801     {
2802         m_resource.forceRelease();
2803     }
2804 
2805     ///
2806     @system unittest
2807     {
2808         auto ctx1 = Context();
2809         auto ctx2 = ctx1;
2810         assert (ctx1.initialized);
2811         assert (ctx2.initialized);
2812         assert (ctx1.handle == ctx2.handle);
2813         ctx2.terminate();
2814         assert (!ctx1.initialized);
2815         assert (!ctx2.initialized);
2816         assert (ctx1.handle == null);
2817         assert (ctx2.handle == null);
2818     }
2819 
2820     @system unittest
2821     {
2822         auto ctx = Context();
2823 
2824         void threadFunc()
2825         {
2826             auto server = Socket(ctx, SocketType.rep);
2827             server.bind("inproc://Context_terminate_unittest");
2828             try {
2829                 server.receive(null);
2830                 server.send("");
2831                 server.receive(null);
2832                 assert (false, "Never get here");
2833             } catch (ZmqException e) {
2834                 assert (e.errno == ETERM);
2835             }
2836         }
2837 
2838         import core.thread: Thread;
2839         auto thread = new Thread(&threadFunc);
2840         thread.start();
2841 
2842         auto client = Socket(ctx, SocketType.req);
2843         client.connect("inproc://Context_terminate_unittest");
2844         client.send("");
2845         client.receive(null);
2846         client.close();
2847 
2848         ctx.terminate();
2849         thread.join();
2850     }
2851 
2852 
2853     /**
2854     The number of I/O threads.
2855 
2856     Throws:
2857         $(REF ZmqException) if $(ZMQ) reports an error.
2858     Corresponds_to:
2859         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2860         $(D ZMQ_IO_THREADS).
2861     */
2862     @property int ioThreads()
2863     {
2864         return getOption(ZMQ_IO_THREADS);
2865     }
2866 
2867     /// ditto
2868     @property void ioThreads(int value)
2869     {
2870         setOption(ZMQ_IO_THREADS, value);
2871     }
2872 
2873     ///
2874     unittest
2875     {
2876         auto ctx = Context();
2877         ctx.ioThreads = 3;
2878         assert (ctx.ioThreads == 3);
2879     }
2880 
2881     /**
2882     The maximum number of sockets.
2883 
2884     Throws:
2885         $(REF ZmqException) if $(ZMQ) reports an error.
2886     Corresponds_to:
2887         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2888         $(D ZMQ_MAX_SOCKETS).
2889     */
2890     @property int maxSockets()
2891     {
2892         return getOption(ZMQ_MAX_SOCKETS);
2893     }
2894 
2895     /// ditto
2896     @property void maxSockets(int value)
2897     {
2898         setOption(ZMQ_MAX_SOCKETS, value);
2899     }
2900 
2901     ///
2902     unittest
2903     {
2904         auto ctx = Context();
2905         ctx.maxSockets = 512;
2906         assert (ctx.maxSockets == 512);
2907     }
2908 
2909     /**
2910     The largest configurable number of sockets.
2911 
2912     Throws:
2913         $(REF ZmqException) if $(ZMQ) reports an error.
2914     Corresponds_to:
2915         $(ZMQREF zmq_ctx_get()) with $(D ZMQ_SOCKET_LIMIT).
2916     */
2917     @property int socketLimit()
2918     {
2919         return getOption(ZMQ_SOCKET_LIMIT);
2920     }
2921 
2922     ///
2923     unittest
2924     {
2925         auto ctx = Context();
2926         assert (ctx.socketLimit > 0);
2927     }
2928 
2929     /**
2930     IPv6 option.
2931 
2932     Throws:
2933         $(REF ZmqException) if $(ZMQ) reports an error.
2934     Corresponds_to:
2935         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2936         $(D ZMQ_IPV6).
2937     */
2938     @property bool ipv6()
2939     {
2940         return !!getOption(ZMQ_IPV6);
2941     }
2942 
2943     /// ditto
2944     @property void ipv6(bool value)
2945     {
2946         setOption(ZMQ_IPV6, value ? 1 : 0);
2947     }
2948 
2949     ///
2950     unittest
2951     {
2952         auto ctx = Context();
2953         ctx.ipv6 = true;
2954         assert (ctx.ipv6 == true);
2955     }
2956 
2957     /**
2958     The $(D void*) pointer used by the underlying C API to refer to the context.
2959 
2960     If the object has not been initialized, this function returns $(D null).
2961     */
2962     @property inout(void)* handle() inout @trusted pure nothrow
2963     {
2964         // ZMQ contexts are thread safe, so casting away shared is OK.
2965         return cast(typeof(return)) m_resource.handle;
2966     }
2967 
2968     /**
2969     Whether this $(REF Context) object has been _initialized, i.e. whether it
2970     refers to a valid $(ZMQ) context.
2971     */
2972     @property bool initialized() const pure nothrow
2973     {
2974         return m_resource.handle != null;
2975     }
2976 
2977     ///
2978     unittest
2979     {
2980         Context ctx;
2981         assert (!ctx.initialized);
2982         ctx = Context();
2983         assert (ctx.initialized);
2984         ctx.detach();
2985         assert (!ctx.initialized);
2986     }
2987 
2988 private:
2989     int getOption(int option)
2990     {
2991         immutable value = trusted!zmq_ctx_get(this.handle, option);
2992         if (value < 0) {
2993             throw new ZmqException;
2994         }
2995         return value;
2996     }
2997 
2998     void setOption(int option, int value)
2999     {
3000         if (trusted!zmq_ctx_set(this.handle, option, value) != 0) {
3001             throw new ZmqException;
3002         }
3003     }
3004 
3005     SharedResource m_resource;
3006 }
3007 
3008 
3009 /**
3010 Socket event types.
3011 
3012 These are used together with $(FREF Socket.monitor), and are described
3013 in the $(ZMQREF zmq_socket_monitor()) reference.
3014 */
enum EventType
{
    /// Corresponds to $(D ZMQ_EVENT_CONNECTED).
    connected = ZMQ_EVENT_CONNECTED,
    /// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED).
    connectDelayed = ZMQ_EVENT_CONNECT_DELAYED,
    /// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED).
    connectRetried = ZMQ_EVENT_CONNECT_RETRIED,
    /// Corresponds to $(D ZMQ_EVENT_LISTENING).
    listening = ZMQ_EVENT_LISTENING,
    /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED).
    bindFailed = ZMQ_EVENT_BIND_FAILED,
    /// Corresponds to $(D ZMQ_EVENT_ACCEPTED).
    accepted = ZMQ_EVENT_ACCEPTED,
    /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED).
    acceptFailed = ZMQ_EVENT_ACCEPT_FAILED,
    /// Corresponds to $(D ZMQ_EVENT_CLOSED).
    closed = ZMQ_EVENT_CLOSED,
    /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED).
    closeFailed = ZMQ_EVENT_CLOSE_FAILED,
    /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED).
    disconnected = ZMQ_EVENT_DISCONNECTED,
    /// Corresponds to $(D ZMQ_EVENT_MONITOR_STOPPED).
    monitorStopped = ZMQ_EVENT_MONITOR_STOPPED,
    /// Corresponds to $(D ZMQ_EVENT_ALL).
    all = ZMQ_EVENT_ALL,
    /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL).
    handshakeFailedNoDetail = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL,
    /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_SUCCEEDED).
    handshakeSucceeded = ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
    /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL).
    handshakeFailedProtocol = ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL,
    /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_AUTH).
    handshakeFailedAuth = ZMQ_EVENT_HANDSHAKE_FAILED_AUTH,
}
3034 
3035 
3036 /**
3037 Protocol error codes.
3038 
3039 These may be returned from $(FREF Event.protocolError), and are described
3040 in the $(ZMQREF zmq_socket_monitor()) reference.
3041 */
enum ProtocolError
{
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED).
    zmtpUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND).
    zmtpUnexpectedCommand = ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE).
    zmtpInvalidSequence = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE).
    zmtpKeyExchange = ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED).
    zmtpMalformedCommandUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE).
    zmtpMalformedCommandMessage = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO).
    zmtpMalformedCommandHello = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE).
    zmtpMalformedCommandInitiate = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR).
    zmtpMalformedCommandError = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY).
    zmtpMalformedCommandReady = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME).
    zmtpMalformedCommandWelcome = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA).
    zmtpInvalidMetadata = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC).
    zmtpCryptographic = ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH).
    zmtpMechanismMismatch = ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED).
    zapUnspecified = ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY).
    zapMalformedReply = ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID).
    zapBadRequestID = ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION).
    zapBadVersion = ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE).
    zapInvalidStatusCode = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE,
    /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA).
    zapInvalidMetadata = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA,
}
3065 
3066 
3067 /**
3068 Receives a message on the given _socket and interprets it as a _socket
3069 state change event.
3070 
3071 $(D socket) must be a PAIR _socket which is connected to an endpoint
3072 created via a $(FREF Socket.monitor) call.  $(D receiveEvent()) receives
3073 one message on the _socket, parses its contents according to the
3074 specification in the $(ZMQREF zmq_socket_monitor()) reference,
3075 and returns the event information as an $(REF Event) object.
3076 
3077 Throws:
3078     $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
3079     $(REF InvalidEventException) if the received message could not
3080     be interpreted as an event message.
3081 See_also:
3082     $(FREF Socket.monitor), for monitoring _socket state changes.
3083 */
Event receiveEvent(ref Socket socket) @system
{
    // The first frame holds a 16-bit event number immediately followed by a
    // 32-bit event value.
    enum eventFrameSize = ushort.sizeof + int.sizeof;
    auto eventFrame = Frame();
    if (socket.receive(eventFrame) != eventFrameSize) {
        throw new InvalidEventException;
    }
    // The second frame holds the affected endpoint address.
    auto addrFrame = Frame();
    socket.receive(addrFrame);

    // Defensive check so that the slices below can never go out of bounds.
    const payload = eventFrame.data;
    if (payload.length != eventFrameSize) {
        throw new InvalidEventException;
    }
    try {
        import std.conv: to;
        // Copy the fields out bytewise.  The event value starts at BYTE
        // offset ushort.sizeof; adding that offset to an already-cast int
        // pointer would advance by whole ints and read past the frame.
        // Copying also avoids dereferencing a misaligned int pointer.
        ushort rawType;
        int rawValue;
        (cast(ubyte*) &rawType)[0 .. ushort.sizeof] = payload[0 .. ushort.sizeof];
        (cast(ubyte*) &rawValue)[0 .. int.sizeof] = payload[ushort.sizeof .. eventFrameSize];

        // to!EventType throws if the number is not a known event type.
        immutable event = to!EventType(rawType);
        immutable value = rawValue;
        immutable addr = (cast(char[]) addrFrame.data).idup;
        return Event(event, addr, value);
    } catch (Exception e) {
        // Any exception thrown within the try block signifies that there
        // is something wrong with the event message.
        throw new InvalidEventException;
    }
}
3105 
3106 // Socket monitoring only works for connection-oriented sockets, and
3107 // IPC is not supported on Windows.
version (Posix) @system unittest
{
    // Filled in by the collector thread, inspected after it has been joined.
    Event[] received;
    void collectEvents()
    {
        auto monitorSocket = Socket(SocketType.pair);
        monitorSocket.connect("inproc://zmqd_receiveEvent_unittest_monitor");
        // Keep receiving until the monitored socket reports that it closed.
        do {
            received ~= receiveEvent(monitorSocket);
        } while (received[$-1].type != EventType.closed);
    }
    import core.thread;
    auto collectorThread = new Thread(&collectEvents);
    collectorThread.start();

    // Produces the bind/connect/disconnect/unbind sequence being monitored.
    static void generateEvents()
    {
        import core.time;
        auto listener = Socket(SocketType.pair);
        listener.monitor("inproc://zmqd_receiveEvent_unittest_monitor");
        listener.bind("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        auto peer = Socket(SocketType.pair);
        peer.connect("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        peer.disconnect("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        listener.unbind("ipc://zmqd_receiveEvent_unittest");
    }
    generateEvents();
    collectorThread.join();

    assert (received.length == 4);
    foreach (ev; received) {
        assert (ev.address == "ipc://zmqd_receiveEvent_unittest");
    }
    assert (received[0].type == EventType.listening);
    assert (received[1].type == EventType.accepted);
    assert (received[2].type == EventType.handshakeSucceeded);
    assert (received[3].type == EventType.closed);

    // A "listening" event carries a file descriptor, but neither an errno
    // code nor a reconnect interval.
    import std.exception;
    assertNotThrown!Error(received[0].fd);
    assertThrown!Error(received[0].errno);
    assertThrown!Error(received[0].interval);
}
3152 
3153 
3154 /**
3155 Information about a socket state change.
3156 
3157 See_also:
3158     $(FREF receiveEvent)
3159 */
struct Event
{
    /// The event _type.
    @property EventType type() const pure nothrow
    {
        return m_type;
    }

    /// The peer _address.
    @property string address() const pure nothrow
    {
        return m_address;
    }

    /**
    The socket file descriptor.

    This property function may only be called if $(REF Event.type) is one of:
    $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disconnected).

    Throws:
        $(D Error) if the property is called for a wrong event type.
    */
    @property FD fd() const pure nothrow
    {
        // The final switch forces every EventType member to be listed, so a
        // newly added event type cannot be forgotten here.
        final switch (m_type) {
            case EventType.connected:
            case EventType.listening:
            case EventType.accepted:
            case EventType.closed:
            case EventType.disconnected:
                return cast(typeof(return)) m_value;
            case EventType.connectDelayed:
            case EventType.connectRetried:
            case EventType.bindFailed:
            case EventType.acceptFailed:
            case EventType.closeFailed:
            case EventType.monitorStopped:
            case EventType.handshakeFailedNoDetail:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();
            case EventType.all:
                // "all" is not expected as the type of an actual event.
                assert (false);
        }
    }

    /**
    The $(D _errno) code for the error which triggered the event.

    This property function may only be called if $(REF Event.type) is either
    $(D bindFailed), $(D acceptFailed), $(D closeFailed) or
    $(D handshakeFailedNoDetail).

    Throws:
        $(D Error) if the property is called for a wrong event type.
    */
    @property int errno() const pure nothrow
    {
        final switch (m_type) {
            case EventType.bindFailed:
            case EventType.acceptFailed:
            case EventType.closeFailed:
            case EventType.handshakeFailedNoDetail:
                return m_value;
            case EventType.connected:
            case EventType.connectDelayed:
            case EventType.connectRetried:
            case EventType.listening:
            case EventType.accepted:
            case EventType.closed:
            case EventType.disconnected:
            case EventType.monitorStopped:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();
            case EventType.all:
                assert (false);
        }
    }

    /**
    The reconnect interval.

    This property function may only be called if $(REF Event.type) is
    $(D connectRetried).

    Throws:
        $(D Error) if the property is called for a wrong event type.
    */
    @property Duration interval() const pure nothrow
    {
        final switch (m_type) {
            case EventType.connectRetried:
                // The event value is interpreted as a number of milliseconds.
                return m_value.msecs;
            case EventType.connected:
            case EventType.connectDelayed:
            case EventType.listening:
            case EventType.bindFailed:
            case EventType.accepted:
            case EventType.acceptFailed:
            case EventType.closed:
            case EventType.closeFailed:
            case EventType.disconnected:
            case EventType.monitorStopped:
            case EventType.handshakeFailedNoDetail:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();
            case EventType.all:
                assert (false);
        }
    }

    /**
    The protocol error code.

    This property function may only be called if $(REF Event.type) is
    $(D handshakeFailedProtocol).

    Throws:
        $(D Error) if the property is called for a wrong event type.$(BR)
        $(STDREF conv,ConvException) if the event value does not correspond
        to any member of $(REF ProtocolError).
    */
    @property ProtocolError protocolError() const pure
    {
        final switch (m_type) {
            case EventType.handshakeFailedProtocol:
                import std.conv: to;
                // Checked conversion: values that are not ProtocolError
                // members are rejected, which is why this property is not
                // nothrow.
                return to!ProtocolError(m_value);
            case EventType.connectRetried:
            case EventType.connected:
            case EventType.connectDelayed:
            case EventType.listening:
            case EventType.bindFailed:
            case EventType.accepted:
            case EventType.acceptFailed:
            case EventType.closed:
            case EventType.closeFailed:
            case EventType.disconnected:
            case EventType.monitorStopped:
            case EventType.handshakeFailedNoDetail:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();
            case EventType.all:
                assert (false);
        }
    }

    /**
    The status code returned by the ZAP handler.

    This property function may only be called if $(REF Event.type) is
    $(D handshakeFailedAuth).

    Throws:
        $(D Error) if the property is called for a wrong event type.
    */
    @property int statusCode() const pure nothrow
    {
        final switch (m_type) {
            case EventType.handshakeFailedAuth:
                return m_value;
            case EventType.connectRetried:
            case EventType.connected:
            case EventType.connectDelayed:
            case EventType.listening:
            case EventType.bindFailed:
            case EventType.accepted:
            case EventType.acceptFailed:
            case EventType.closed:
            case EventType.closeFailed:
            case EventType.disconnected:
            case EventType.monitorStopped:
            case EventType.handshakeFailedNoDetail:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
                throw invalidProperty();
            case EventType.all:
                assert (false);
        }
    }

private:
    // Stores the three fields as given; no validation is performed here.
    this(EventType type, string address, int value) pure nothrow
    {
        m_type = type;
        m_address = address;
        m_value = value;
    }

    // Builds the Error thrown when a property is read for an event type that
    // does not provide it.  `name` defaults to __FUNCTION__ and is embedded
    // in the message text.
    Error invalidProperty(string name = __FUNCTION__)() const pure nothrow
    {
        try {
            import std.conv: text;
            return new Error(text("Property '", name,
                                  "' not available for event type '",
                                  m_type, "'"));
        } catch (Exception e) {
            // Formatting the message is not expected to fail.
            assert(false);
        }
    }

    EventType m_type;
    string m_address;
    // Raw event value; its meaning depends on m_type (see the properties).
    int m_value;
}
3370 
3371 
3372 /**
3373 Encodes a binary key as Z85 printable text.
3374 
3375 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1),
3376 which will be used to store the return value plus a terminating zero byte.
3377 If $(D dest) is omitted, a new array will be created.
3378 
3379 Returns:
3380     An array of size $(D 5*data.length/4) which contains the Z85-encoded text,
3381     excluding the terminating zero byte.  This will be a slice of $(D dest) if
3382     it is provided.
3383 Throws:
3384     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
3385     $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length)
3386     is not a multiple of 4).
3387 Corresponds_to:
3388     $(ZMQREF zmq_z85_encode())
3389 */
char[] z85Encode(ubyte[] data, char[] dest)
// TODO: Make data const when we update to ZMQ 4.1
{
    import core.exception: RangeError;
    // Length of the encoded text: 5 characters for every 4 input bytes.
    immutable encodedLength = 5 * data.length / 4;
    // One extra element is needed for the terminating zero byte.
    if (encodedLength + 1 > dest.length) {
        throw new RangeError;
    }
    immutable failed =
        trusted!zmq_z85_encode(ptr(dest), ptr(data), data.length) == null;
    if (failed) {
        throw new ZmqException;
    }
    // Hand back only the text, without the terminator.
    return dest[0 .. encodedLength];
}
3401 
3402 /// ditto
char[] z85Encode(ubyte[] data)
{
    // Room for the encoded text plus the terminating zero byte.
    auto buffer = new char[5*data.length/4 + 1];
    return z85Encode(data, buffer);
}
3407 
debug (WithCurveTests) @system unittest // @system because of assertThrown
{
    import core.exception: RangeError;
    import std.exception: assertThrown;

    // TODO: Make data immutable when we update to ZMQ 4.1
    auto key = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
    immutable expected = "HelloWorld";

    // Allocating overload.
    assert (z85Encode(key) == expected);

    // Overload with a caller-supplied buffer: the result aliases the buffer.
    auto storage = new char[11];
    auto encoded = z85Encode(key, storage);
    assert (encoded == expected);
    assert (storage.ptr == encoded.ptr);

    // A buffer without room for the terminator is rejected, and so is input
    // whose length is not a multiple of 4.
    assertThrown!RangeError(z85Encode(key, new char[10]));
    assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5]));
}
3425 
3426 
3427 /**
3428 Decodes a binary key from Z85 printable _text.
3429 
$(D dest) must be an array whose length is at least $(D 4*text.length/5),
which will be used to store the return value.
3432 If $(D dest) is omitted, a new array will be created.
3433 
3434 Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero
3435 byte will be appended to $(D text) if it does not contain one already.  However,
3436 this may trigger a (possibly unwanted) GC allocation.  To avoid this, either
3437 make sure that the last character in $(D text) is $(D '\0'), or use
3438 $(OBJREF assumeSafeAppend) on the array before calling this function (provided
3439 this is safe).
3440 
3441 Returns:
    An array of size $(D 4*text.length/5) which contains the decoded data.
    This will be a slice of $(D dest) if it is provided.
3444 Throws:
3445     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
    $(REF ZmqException) if $(ZMQ) reports an error (i.e., if the length of
    $(D text), not counting a terminating zero byte, is not a multiple of 5).
3448 Corresponds_to:
3449     $(ZMQREF zmq_z85_decode())
3450 */
ubyte[] z85Decode(char[] text, ubyte[] dest)
// TODO: Make text const when we update to ZMQ 4.1
{
    import core.exception: RangeError;
    // Length of the decoded data: 4 bytes for every 5 input characters.
    immutable len = 4 * text.length/5;
    if (dest.length < len) throw new RangeError;
    // Empty input decodes to empty output.  Returning here avoids indexing
    // the last element of an empty array below, and avoids calling into the
    // C library when there is nothing to decode.
    if (text.length == 0) return dest[0 .. 0];
    // The C function expects a zero-terminated string.
    if (text[$-1] != '\0') text ~= '\0';
    if (trusted!zmq_z85_decode(ptr(dest), ptr(text)) == null) {
        throw new ZmqException;
    }
    return dest[0 .. len];
}
3463 
3464 /// ditto
ubyte[] z85Decode(char[] text)
{
    // Exactly as many bytes as the decoded data needs.
    auto buffer = new ubyte[4*text.length/5];
    return z85Decode(text, buffer);
}
3469 
debug (WithCurveTests) @system unittest // @system because of assertThrown
{
    import core.exception: RangeError;
    import std.exception: assertThrown;

    // TODO: Make data immutable when we update to ZMQ 4.1
    auto encoded = "HelloWorld".dup;
    immutable expected = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];

    // Allocating overload, without and with an explicit terminator.
    assert (z85Decode(encoded) == expected);
    assert (z85Decode(encoded~'\0') == expected);

    // Overload with a caller-supplied buffer: the result aliases the buffer.
    auto storage = new ubyte[8];
    auto decoded = z85Decode(encoded, storage);
    assert (decoded == expected);
    assert (storage.ptr == decoded.ptr);

    // A too-small buffer is rejected, and so is text whose length is not a
    // multiple of 5.
    assertThrown!RangeError(z85Decode(encoded, new ubyte[7]));
    assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup));
}
3488 
3489 
3490 /**
3491 Generates a new Curve key pair.
3492 
3493 To avoid a memory allocation, preallocated buffers may optionally be supplied
3494 for the two keys.  Each of these must have a length of at least 41 bytes, enough
3495 for a 40-character Z85-encoded key plus a terminating zero byte.  If either
3496 buffer is omitted/$(D null), a new one will be created.
3497 
3498 Returns:
3499     A tuple that contains the two keys.  Each of these will have a length of
3500     40 characters, and will be slices of the input buffers if such have been
3501     provided.
3502 Throws:
3503     $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are
3504         not $(D null) but have a length of less than 41 characters.$(BR)
3505     $(REF ZmqException) if $(ZMQ) reports an error.
3506 Corresponds_to:
3507     $(ZMQREF zmq_curve_keypair())
3508 */
Tuple!(char[], "publicKey", char[], "secretKey")
    curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null)
{
    import core.exception: RangeError;
    // A Z85-encoded key is 40 characters long; each buffer additionally
    // needs room for a terminating zero byte.
    enum keyLength = 40;
    enum bufferLength = keyLength + 1;

    if (publicKeyBuf is null) {
        publicKeyBuf = new char[bufferLength];
    } else if (publicKeyBuf.length < bufferLength) {
        throw new RangeError;
    }
    if (secretKeyBuf is null) {
        secretKeyBuf = new char[bufferLength];
    } else if (secretKeyBuf.length < bufferLength) {
        throw new RangeError;
    }

    immutable status =
        trusted!zmq_curve_keypair(ptr(publicKeyBuf), ptr(secretKeyBuf));
    if (status != 0) {
        throw new ZmqException;
    }
    // Return the keys without their terminators.
    return typeof(return)(publicKeyBuf[0 .. keyLength],
                          secretKeyBuf[0 .. keyLength]);
}
3523 
3524 ///
debug (WithCurveTests) unittest
{
    // Server side: enable CURVE and install the server's secret key.
    auto serverKeys = curveKeyPair();
    auto server = Socket(SocketType.rep);
    server.curveServer = true;
    server.curveSecretKeyZ85 = serverKeys.secretKey;
    server.bind("inproc://curveKeyPair_test");

    // Client side: install its own key pair plus the server's public key.
    auto clientKeys = curveKeyPair();
    auto client = Socket(SocketType.req);
    client.curvePublicKeyZ85 = clientKeys.publicKey;
    client.curveSecretKeyZ85 = clientKeys.secretKey;
    client.curveServerKeyZ85 = serverKeys.publicKey;
    client.connect("inproc://curveKeyPair_test");
    client.send("hello");

    ubyte[5] received;
    assert (server.receive(received) == 5);
    assert (received.asString() == "hello");
}
3545 
debug (WithCurveTests) @system unittest
{
    import core.exception, std.exception;

    // Without buffers: freshly allocated 40-character keys.
    auto allocated = curveKeyPair();
    assert (allocated.publicKey.length == 40);
    assert (allocated.secretKey.length == 40);

    // With buffers: the returned keys are slices of the supplied storage.
    char[82] storage;
    auto inPlace = curveKeyPair(storage[0 .. 41], storage[41 .. 82]);
    assert (inPlace.publicKey.length == 40);
    assert (inPlace.publicKey.ptr == storage.ptr);
    assert (inPlace.secretKey.length == 40);
    assert (inPlace.secretKey.ptr == storage.ptr + 41);

    // Too-small buffers are rejected and the storage is left untouched.
    char[82] snapshot = storage;
    assertThrown!RangeError(curveKeyPair(storage[0 .. 40], storage[41 .. 82]));
    assertThrown!RangeError(curveKeyPair(storage[0 .. 41], storage[42 .. 82]));
    assert (snapshot[] == storage[]);
}
3565 
3566 
3567 /**
3568 Utility function which interprets and validates a byte array as a UTF-8 string.
3569 
3570 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
3571 the message _data contains plain text.  $(D asString()) allows for easy and
3572 safe interpretation of raw _data as characters.  It checks that $(D data) is
3573 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
3574 the same memory region.
3575 
3576 Throws:
3577     $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
3578 See_also:
3579     $(STDREF string,representation), which performs the opposite operation.
3580 */
inout(char)[] asString(inout(ubyte)[] data) pure
{
    import std.utf: validate;
    // Reinterpret the bytes as characters; no copy is made, so the result
    // refers to the same memory as `data`.
    auto text = cast(inout(char)[]) data;
    // Throws UTFException if the bytes are not well-formed UTF-8.
    validate(text);
    return text;
}
3588 
///
unittest
{
    // A connected pair of in-process sockets.
    auto sender = Socket(SocketType.pair);
    sender.bind("inproc://zmqd_asString_example");
    auto receiver = Socket(SocketType.pair);
    receiver.connect("inproc://zmqd_asString_example");

    // Fill a 12-byte frame by writing text through its character view.
    auto frame = Frame(12);
    frame.data.asString()[] = "Hello World!";
    sender.send(frame);

    // Read the raw bytes back and interpret them as text again.
    ubyte[12] buf;
    receiver.receive(buf);
    assert(buf.asString() == "Hello World!");
}
3605 
unittest
{
    import std.exception: assertThrown;
    import std.utf: UTFException;

    // Valid input: the text compares equal and aliases the input memory.
    auto raw = cast(ubyte[]) ['f', 'o', 'o'];
    auto decoded = raw.asString();
    assert (decoded == "foo");
    assert (cast(void*) ptr(raw) == cast(void*) ptr(decoded));

    // Invalid UTF-8 input must be rejected with a UTFException.
    auto invalid = cast(ubyte[]) [100, 252, 1];
    assertThrown!UTFException(asString(invalid));
}
3618 
3619 
/**
The exception type thrown whenever one of the underlying $(ZMQ) C functions
reports an error.

Its message is the standard error text obtained from
$(ZMQREF zmq_strerror()), and the $(D errno) code set by the failing $(ZMQ)
function is available through the $(D errno) member.
*/
class ZmqException : Exception
{
    /**
    The $(D _errno) code that was set by the $(ZMQ) function which reported
    the error.

    Corresponds_to:
        $(ZMQREF zmq_errno())
    */
    immutable int errno;

private:
    // Captures the current ZeroMQ error code and uses the matching
    // zmq_strerror() text as the exception message.  `file` and `line`
    // default to the location of the constructor call.
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        import std.conv: to;

        immutable code = trusted!zmq_errno();
        this.errno = code;

        string description;
        try {
            description = trusted!(to!string)(trusted!zmq_strerror(code));
        } catch (Exception e) {
            // Not expected to happen; the try block is only here so that
            // the constructor can be nothrow.
        }
        // Still, make sure we actually got a message.
        assert(description.length);
        super(description, file, line);
    }
}
3652 
3653 
/**
The exception thrown by $(FREF receiveEvent) when a received message cannot
be interpreted as an event description.
*/
class InvalidEventException : Exception
{
private:
    // Builds the exception with its fixed message; `file` and `line` default
    // to the location of the constructor call.
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        enum description = "The received message is not an event message";
        super(description, file, line);
    }
}
3666 
3667 
3668 // =============================================================================
3669 // Everything below is internal
3670 // =============================================================================
3671 private:
3672 
3673 
// A reference-counted holder for an opaque resource handle.
//
// All copies of a SharedResource refer to one shared payload that stores the
// handle, the reference count and the function used to release the handle.
// When the count drops to zero (and the handle has not already been cleared
// by forceRelease()), the release function is called.
struct SharedResource
{
@safe:
    // Signature of the release function.  It receives the handle and returns
    // an exception object to report failure, or null on success.
    alias Release = Exception function(shared(void)*) nothrow;

    // Takes ownership of `ptr` (which must be non-null), with an initial
    // reference count of one.
    this(shared(void)* ptr, Release release) nothrow
        in { assert(ptr); } body
    {
        m_payload = new shared(Payload)(1, ptr, release);
    }

    // Copying adds a reference, unless there is nothing to refer to.
    this(this) nothrow
    {
        if (m_payload !is null) incRefCount();
    }

    // Drops this reference.  An exception returned by the release function
    // cannot be propagated from here and is discarded.
    ~this() nothrow
    {
        nothrowDetach();
    }

    // Drops the current reference (which may throw if releasing fails) and
    // then takes over the reference held by `rhs`.
    void opAssign(SharedResource rhs)
    {
        detach();
        auto adopted = rhs.m_payload;
        rhs.m_payload = null;   // keeps rhs's destructor from decrementing
        m_payload = adopted;
    }

    // Drops this reference, releasing the resource if it was the last one.
    // Throws whatever exception the release function returns.
    void detach()
    {
        auto failure = nothrowDetach();
        if (failure !is null) throw failure;
    }

    // Drops this reference and releases the resource right away, regardless
    // of how many other references exist.  The shared handle is set to null,
    // so the remaining references see a null handle afterwards.  Throws
    // whatever exception the release function returns.
    void forceRelease() @system
    {
        if (m_payload is null) return;
        scope(exit) m_payload = null;
        decRefCount();

        if (m_payload.handle == null) return;   // already released
        // Scope guards run in reverse order, so the handle is cleared
        // before m_payload itself is set to null.
        scope(exit) m_payload.handle = null;
        auto failure = m_payload.release(m_payload.handle);
        if (failure !is null) throw failure;
    }

    // The resource handle, or null if this object refers to nothing or the
    // resource has been force-released.
    @property inout(shared(void))* handle() inout pure nothrow
    {
        if (m_payload is null) return null;
        return m_payload.handle;
    }

private:
    // Atomically adds one to the reference count.
    void incRefCount() @trusted nothrow
    {
        import core.atomic: atomicOp;
        assert (m_payload !is null && m_payload.refCount > 0);
        atomicOp!"+="(m_payload.refCount, 1);
    }

    // Atomically subtracts one from the reference count and returns the
    // new count.
    int decRefCount() @trusted nothrow
    {
        import core.atomic: atomicOp;
        assert (m_payload !is null && m_payload.refCount > 0);
        return atomicOp!"-="(m_payload.refCount, 1);
    }

    // Non-throwing core of detach(): drops this reference and, if it was the
    // last one and the handle is still set, calls the release function.
    // Returns the release function's result in that case, otherwise null.
    Exception nothrowDetach() @trusted nothrow
        out { assert (m_payload is null); }
        body
    {
        if (m_payload is null) return null;
        scope(exit) m_payload = null;

        immutable remaining = decRefCount();
        if (remaining >= 1 || m_payload.handle == null) return null;
        return m_payload.release(m_payload.handle);
    }

    // The state shared between all copies.
    struct Payload
    {
        int refCount;
        void* handle;
        Release release;
    }
    shared(Payload)* m_payload;

    // A payload, when present, always has a positive reference count and
    // a release function.
    invariant()
    {
        if (m_payload !is null) {
            assert (m_payload.refCount > 0);
            assert (m_payload.release !is null);
        }
    }
}
3774 
@system unittest
{
    import std.exception: assertNotThrown, assertThrown;

    // Stand-in release function: zeroes the pointed-to int, and reports an
    // error if the int has already been zeroed.
    static Exception zeroOut(shared(void)* p) @trusted nothrow
    {
        auto target = cast(shared(int)*) p;
        if (*target != 0) {
            *target = 0;
            return null;
        }
        return new Exception("double release");
    }

    shared int i = 1;

    {
        // Construction exposes the handle and releases nothing.
        auto first = SharedResource(&i, &zeroOut);
        assert (i == 1);
        assert (first.handle == &i);

        // A copy (postblit) shares the same handle.
        auto second = first;
        assert (i == 1);
        assert (second.handle == &i);

        {
            // A default-initialized object has a null handle and can be
            // detached without error.
            SharedResource blank;
            assert (blank.handle == null);
            assertNotThrown(blank.detach());

            // Assignment to an object that currently holds nothing.
            blank = second;
            assert (i == 1);
            assert (blank.handle == &i);

            // Assignment to the only holder of another resource releases
            // that resource (j is zeroed) and leaves i alone.
            shared int j = 2;
            auto other = SharedResource(&j, &zeroOut);
            assert (other.handle == &j);
            other = second;
            assert (j == 0);
            assert (i == 1);
            assert (other.handle == &i);

            // Explicit detach() by the only holder releases the resource.
            shared int k = 3;
            auto extra = SharedResource(&k, &zeroOut);
            assertNotThrown(extra.detach());
            assert(k == 0);

            // k is now 0, so releasing it again fails; the failure must
            // surface both from detach() and from assignment.
            extra = SharedResource(&k, &zeroOut);
            assertThrown!Exception(extra.detach());
            extra = SharedResource(&k, &zeroOut);
            assertThrown!Exception(extra = second);
        }

        // References to i are still alive, so it must not be released yet.
        assert (i == 1);
        assert (first.handle == &i);
        assert (second.handle == &i);
    }
    // All references are gone now, so i must have been released.
    assert (i == 0);
}
3842 
// Thread safety test
@system unittest
{
    import core.thread: ThreadGroup;

    enum threadCount = 100;
    enum copyCount = 1000;

    // Stand-in release function: zeroes the pointed-to int, and reports an
    // error if the int has already been zeroed.
    static Exception zeroOut(shared(void)* p) @trusted nothrow
    {
        auto target = cast(shared(int)*) p;
        if (*target != 0) {
            *target = 0;
            return null;
        }
        return new Exception("double release");
    }

    shared int raw = 1;
    {
        auto rs = SharedResource(&raw, &zeroOut);

        // Work done by every thread: take one copy of the resource, then
        // repeatedly create and destroy further copies of it, checking that
        // the resource is never released along the way.
        void hammer()
        {
            auto a = rs;
            foreach (j; 0 .. copyCount) {
                auto b = a;
                assert (b.handle == &raw);
                assert (raw == 1);
            }
        }

        auto workers = new ThreadGroup;
        foreach (t; 0 .. threadCount) {
            workers.create(&hammer);
        }
        workers.joinAll();

        // The original reference is still alive here.
        assert (rs.handle == &raw);
        assert (raw == 1);
    }
    // The last reference is gone, so the resource must have been released.
    assert (raw == 0);
}
3880 
// forceRelease() test
@system unittest
{
    // Stand-in release function: zeroes the pointed-to int, and reports an
    // error if the int has already been zeroed.
    static Exception myFree(shared(void)* p) @trusted nothrow
    {
        auto v = cast(shared(int)*) p;
        if (*v == 0) {
            return new Exception("double release");
        } else {
            *v = 0;
            return null;
        }
    }

    shared int i = 1;

    // Two references to the same resource.
    auto ra = SharedResource(&i, &myFree);
    auto rb = ra;
    assert (ra.handle == &i);
    assert (rb.handle == ra.handle);
    assert (i == 1);

    // Forcing the release through one reference must call the release
    // function immediately, even though another reference still exists...
    rb.forceRelease();
    assert (i == 0);

    // ...and must leave both references with a null handle.
    assert (rb.handle == null);
    assert (ra.handle == null);
}
3906 
3907 
// Test helper: builds an endpoint of the form "<p>://<random UUID>", so that
// each call yields a different address.  The `n` parameter is not used by
// the body.
version(unittest) private string uniqueUrl(string p, int n = __LINE__)
{
    import std.uuid: randomUUID;
    immutable suffix = randomUUID().toString();
    return p ~ "://" ~ suffix;
}
3913 
3914 
// Calls `func` with the given arguments from inside a @trusted function and
// returns whatever `func` returns (which may be nothing at all).
private auto trusted(alias func, Args...)(auto ref Args args)
    @trusted
{
    return func(args);
}
3919 
// Same as trusted(), except that the arguments are declared `scope`:
// calls `func` with them from inside a @trusted function and returns
// whatever `func` returns.
private auto scopedTrusted(alias func, Args...)(auto ref scope Args args)
    @trusted
{
    return func(args);
}
3924 
3925 
// A replacement for std.string.toStringz(), which is unsafe.  This version is
// tailored to the string sizes we are likely to encounter here: the input is
// copied into a function-static buffer, a terminating '\0' is appended, and
// a pointer to the start of the buffer is returned.
//
// The buffer is reused by later calls most of the time, so the result must be
// used immediately upon return and never stored.
const(char)* zeroTermString(const char[] s) nothrow
{
    import std.algorithm: max;

    static char[] scratch;

    // Room for the characters plus the terminator; the buffer only ever
    // grows, and is never made smaller than 1023 characters.
    immutable needed = s.length + 1;
    if (needed > scratch.length) scratch.length = max(needed, 1023);

    scratch[0 .. s.length] = s;
    scratch[s.length] = '\0';
    return ptr(scratch);
}
3940 
@system unittest
{
    // Each result holds the input followed by a terminating zero.
    auto first = zeroTermString("Hello World!");
    assert (first[0 .. 13] == "Hello World!\0");
    auto second = zeroTermString("foo");
    assert (second[0 .. 4] == "foo\0");

    // Both calls hand back a pointer to the same, reused buffer.
    assert (first == second);
}