1 /**
2 A thin wrapper around the low-level C API of the $(LINK2 http://zeromq.org,$(ZMQ))
3 messaging framework, for the $(LINK2 http://dlang.org,D programming language).
4 
5 Most functions in this module have a one-to-one relationship with functions
6 in the underlying C API.  Some adaptations have been made to make the API
7 safer, easier and more pleasant to use; most importantly:
8 $(UL
9     $(LI
10         Errors are signalled by means of exceptions rather than return
11         codes.  In particular, the $(REF ZmqException) class provides
12         a standard textual message for any error condition, but it also
13         provides access to the $(D errno) code set by the C function
14         that reported the error.)
15     $(LI
16         Functions are appropriately marked with $(D @safe), $(D pure)
17         and $(D nothrow), thus facilitating their use in high-level D code.)
18     $(LI
19         Memory and resources (i.e. contexts, sockets and messages) are
20         automatically managed, thus preventing leaks.)
21     $(LI
22         Context, socket and message options are implemented as properties.)
23 )
24 The names of functions and types in $(ZMQD) are very similar to those in
25 $(ZMQ), but they follow the D naming conventions.  Thus, the library should
26 feel both familiar to $(ZMQ) users and natural to D users.  A notable
27 deviation from the C API is that message parts are consistently called
28 "frames".  For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send())
29 and so on.  (Multipart messages were a late addition to $(ZMQ), and the "msg"
30 function names were well established at that point.  The library's
31 developers have admitted that this is somewhat confusing, and the newer
32 CZMQ API consistently uses "frame" in function names.)
33 
34 Due to the close correspondence with the C API, this documentation has
35 intentionally been kept sparse. There is really no reason to repeat the
36 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here.
37 Instead, the documentation for each function contains a "Corresponds to"
38 section that links to the appropriate pages in the $(ZMQ) reference.  Any
39 details given in the present documentation mostly concern the D-specific
40 adaptations that have been made.
41 
Also note that the examples generally only use the INPROC transport.  The
reason for this is that the examples double as unittests, and we want to
avoid firewall troubles and other issues that could arise with the use of
network protocols such as TCP, PGM, etc.  The IPC protocol is not an option
either, since it is not supported on Windows.  Anyway, they are only short
47 snippets that demonstrate the syntax; for more comprehensive and realistic
48 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all,
49 $(ZMQ) Guide).  Many of the examples in the Guide have been translated to
50 D, and can be found in the
51 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples))
52 subdirectory of the $(ZMQD) source repository.
53 
54 Version:
55     1.0.0-beta (compatible with $(ZMQ) >= 4.0.0)
56 Authors:
57     $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad)
58 Copyright:
59     Copyright (c) 2013–2014, Lars T. Kyllingstad. All rights reserved.
60 License:
61     $(ZMQD) is released under a BSD licence (see LICENCE.txt for details).$(BR)
62     Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site)
63     for details about $(ZMQ) licensing.
64 Macros:
65     D      = <code>$0</code>
66     EM     = <em>$0</em>
67     LDOTS  = &hellip;
68     QUOTE  = <blockquote>$0</blockquote>
69     FREF   = $(D $(LINK2 #$1,$1()))
70     REF    = $(D $(LINK2 #$1,$1))
71     COREF  = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2))
72     OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1))
73     STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
74     ZMQ    = &#x2205;MQ
75     ZMQAPI = $(LINK2 http://api.zeromq.org/4-0:$1,$+)
76     ZMQD   = $(ZMQ)D
77     ZMQREF = $(D $(ZMQAPI $1,$1))
78 */
79 module zmqd;
80 
81 import core.time;
82 import std.typecons;
83 import deimos.zmq.zmq;
84 
85 
86 version(Windows) {
87     import std.c.windows.winsock: SOCKET;
88 }
89 
90 // libsodium is enabled by default, since that is the case with ZeroMQ itself.
91 version (WithoutLibsodium) { } else version = WithLibsodium;
92 
93 // Compatibility check
version(unittest) static this()
{
    import std.algorithm: min, max;
    import std.stdio: stderr;

    // Compare the version the binding was written against with the version
    // reported by the library that is actually linked in.
    const lib = zmqVersion();
    immutable sameMajorMinor =
        lib.major == ZMQ_VERSION_MAJOR && lib.minor == ZMQ_VERSION_MINOR;
    if (!sameMajorMinor) {
        stderr.writefln(
            "Warning: Potential ZeroMQ header/library incompatibility: "
            ~"The header (binding) is for version %d.%d.%d, "
            ~"while the library is version %d.%d.%d. Unittests may fail.",
            ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH,
            lib.major, lib.minor, lib.patch);
    }

    // Known incompatibilities: print a note when the header version and the
    // library version lie on opposite sides of a listed version boundary.
    // Only the first matching boundary is reported.
    const libVersion = ZMQ_MAKE_VERSION(lib.major, lib.minor, lib.patch);
    const older = min(ZMQ_VERSION, libVersion);
    const newer = max(ZMQ_VERSION, libVersion);
    const abi410 = ZMQ_MAKE_VERSION(4, 1, 0);
    const abi411 = ZMQ_MAKE_VERSION(4, 1, 1);
    if (older < abi410 && abi410 <= newer) {
        stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions");
    } else if (older < abi411 && abi411 <= newer) {
        stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions");
    }
}
117 
118 
119 @safe:
120 
121 
122 /**
123 Reports the $(ZMQ) library version.
124 
125 Returns:
126     A $(STDREF typecons,Tuple) with three integer fields that represent the
127     three versioning levels: $(D major), $(D minor) and $(D patch).
128 Corresponds_to:
129     $(ZMQREF zmq_version())
130 */
Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow
{
    // Let the C function fill in three plain integers, then copy them into
    // the named fields of the returned tuple.
    int major, minor, patch;
    trusted!zmq_version(&major, &minor, &patch);

    typeof(return) ver;
    ver.major = major;
    ver.minor = minor;
    ver.patch = patch;
    return ver;
}
137 
138 
139 /**
140 The various socket types.
141 
142 These are described in the $(ZMQREF zmq_socket()) reference.
143 */
enum SocketType
{
    /// Corresponds to $(D ZMQ_REQ)
    req = ZMQ_REQ,
    /// Corresponds to $(D ZMQ_REP)
    rep = ZMQ_REP,
    /// Corresponds to $(D ZMQ_DEALER)
    dealer = ZMQ_DEALER,
    /// Corresponds to $(D ZMQ_ROUTER)
    router = ZMQ_ROUTER,
    /// Corresponds to $(D ZMQ_PUB)
    pub = ZMQ_PUB,
    /// Corresponds to $(D ZMQ_SUB)
    sub = ZMQ_SUB,
    /// Corresponds to $(D ZMQ_XPUB)
    xpub = ZMQ_XPUB,
    /// Corresponds to $(D ZMQ_XSUB)
    xsub = ZMQ_XSUB,
    /// Corresponds to $(D ZMQ_PUSH)
    push = ZMQ_PUSH,
    /// Corresponds to $(D ZMQ_PULL)
    pull = ZMQ_PULL,
    /// Corresponds to $(D ZMQ_PAIR)
    pair = ZMQ_PAIR,
    /// Corresponds to $(D ZMQ_STREAM)
    stream = ZMQ_STREAM,
}
159 
160 
161 /// Security mechanisms.
enum Security
{
    none  = ZMQ_NULL,   /// $(ZMQAPI zmq_null,NULL): No security or confidentiality.
    plain = ZMQ_PLAIN,  /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication.
    curve = ZMQ_CURVE,  /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality.
}
173 
174 
175 /**
176 An object that encapsulates a $(ZMQ) socket.
177 
178 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
179 must always be explicitly initialized with a constructor (see
180 $(FREF _Socket.this)):
181 ---
182 Socket s;                     // Not a valid socket yet
183 s = Socket(SocketType.push);  // ...but now it is.
184 ---
185 This $(D struct) is noncopyable, which means that a socket is always
186 uniquely managed by a single $(D Socket) object.  Functions that will
187 inspect or use the socket, but not take ownership of it, should take
188 a $(D ref Socket) parameter.  Use $(STDREF algorithm,move) to move
189 a $(D Socket) to a different location (e.g. into a sink function that
190 takes it by value, or into a new variable).
191 
192 The socket is automatically closed when the $(D Socket) object goes out
193 of scope.
194 
195 Linger_period:
196 Note that Socket by default sets the socket's linger period to zero.
197 This deviates from the $(ZMQ) default (which is an infinite linger period).
198 */
199 struct Socket
200 {
201 @safe:
202     /**
203     Creates a new $(ZMQ) socket.
204 
205     If $(D context) is not specified, the default context (as returned
206     by $(FREF defaultContext)) is used.
207 
208     Throws:
209         $(REF ZmqException) if $(ZMQ) reports an error.
210     Corresponds_to:
211         $(ZMQREF zmq_socket())
212     */
213     this(SocketType type)
214     {
215         this(defaultContext(), type);
216     }
217 
218     /// ditto
219     this(Context context, SocketType type)
220     {
221         m_context = context;
222         m_type = type;
223         m_socket = trusted!zmq_socket(context.handle, type);
224         if (m_socket == null) {
225             throw new ZmqException;
226         }
227         linger = 0.msecs;
228     }
229 
230     /// With default context:
231     unittest
232     {
233         auto sck = Socket(SocketType.push);
234         assert (sck.initialized);
235     }
236     /// With explicit context:
237     unittest
238     {
239         auto ctx = Context();
240         auto sck = Socket(ctx, SocketType.push);
241         assert (sck.initialized);
242     }
243 
    // Socket objects are noncopyable: the postblit is disabled, so a Socket
    // can only be moved, never copied.
    @disable this(this);
246 
247     unittest // Verify that move semantics work.
248     {
249         import std.algorithm: move;
250         struct SocketOwner
251         {
252             this(Socket s) { owned = trusted!move(s); }
253             Socket owned;
254         }
255         auto socket = Socket(SocketType.req);
256         const socketPtr = socket.handle;
257         assert (socketPtr != null);
258 
259         auto owner = SocketOwner(trusted!move(socket));
260         assert (socket.handle == null);
261         assert (!socket.m_context.initialized);
262         assert (owner.owned.handle == socketPtr);
263         assert (owner.owned.m_context.initialized);
264     }
265 
266     // Closes the socket on desctruction.
267     ~this() nothrow { nothrowClose(); }
268 
269     /**
270     Closes the $(ZMQ) socket.
271 
272     Note that the socket will be closed automatically upon destruction,
273     so it is usually not necessary to call this method manually.
274 
275     Throws:
276         $(REF ZmqException) if $(ZMQ) reports an error.
277     Corresponds_to:
278         $(ZMQREF zmq_close())
279     */
280     void close()
281     {
282         if (!nothrowClose()) throw new ZmqException;
283     }
284 
285     ///
286     unittest
287     {
288         auto s = Socket(SocketType.pair);
289         assert (s.initialized);
290         s.close();
291         assert (!s.initialized);
292     }
293 
294     /**
295     Starts accepting incoming connections on $(D endpoint).
296 
297     Throws:
298         $(REF ZmqException) if $(ZMQ) reports an error.
299     Corresponds_to:
300         $(ZMQREF zmq_bind())
301     */
302     void bind(const char[] endpoint)
303     {
304         if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) {
305             throw new ZmqException;
306         }
307     }
308 
309     ///
310     unittest
311     {
312         auto s = Socket(SocketType.pub);
313         s.bind("inproc://zmqd_bind_example");
314     }
315 
316     /**
317     Stops accepting incoming connections on $(D endpoint).
318 
319     Throws:
320         $(REF ZmqException) if $(ZMQ) reports an error.
321     Corresponds_to:
322         $(ZMQREF zmq_unbind())
323     */
324     void unbind(const char[] endpoint)
325     {
326         if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) {
327             throw new ZmqException;
328         }
329     }
330 
331     // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1.
332     //       IPC does not work on Windows, and unbind() does not work with INPROC.
333     //       See: https://github.com/zeromq/libzmq/issues/949
334     ///
335     version (Posix) unittest
336     {
337         auto s = Socket(SocketType.pub);
338         s.bind("ipc://zmqd_unbind_example");
339         // Do some work...
340         s.unbind("ipc://zmqd_unbind_example");
341     }
342 
343     /**
344     Creates an outgoing connection to $(D endpoint).
345 
346     Throws:
347         $(REF ZmqException) if $(ZMQ) reports an error.
348     Corresponds_to:
349         $(ZMQREF zmq_connect())
350     */
351     void connect(const char[] endpoint)
352     {
353         if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) {
354             throw new ZmqException;
355         }
356     }
357 
358     ///
359     unittest
360     {
361         auto s = Socket(SocketType.sub);
362         s.connect("inproc://zmqd_connect_example");
363     }
364 
365     /**
366     Disconnects the socket from $(D endpoint).
367 
368     Throws:
369         $(REF ZmqException) if $(ZMQ) reports an error.
370     Corresponds_to:
371         $(ZMQREF zmq_disconnect())
372     */
373     void disconnect(const char[] endpoint)
374     {
375         if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) {
376             throw new ZmqException;
377         }
378     }
379 
380     ///
381     unittest
382     {
383         auto s = Socket(SocketType.sub);
384         s.connect("inproc://zmqd_disconnect_example");
385         // Do some work...
386         s.disconnect("inproc://zmqd_disconnect_example");
387     }
388 
389     /**
390     Sends a message frame.
391 
392     $(D _send) blocks until the frame has been queued on the socket.
393     $(D trySend) performs the operation in non-blocking mode, and returns
394     a $(D bool) value that signifies whether the frame was queued on the
395     socket.
396 
397     The $(D more) parameter specifies whether this is a multipart message
398     and there are more frames to follow.
399 
400     The $(D char[]) overload is a convenience function that simply casts
401     the string argument to $(D ubyte[]).
402 
403     Throws:
404         $(REF ZmqException) if $(ZMQ) reports an error.
405     Corresponds_to:
406         $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
407         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
408         $(D more == true)).
409     */
410     void send(const ubyte[] data, bool more = false)
411     {
412         immutable flags = more ? ZMQ_SNDMORE : 0;
413         if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) {
414             throw new ZmqException;
415         }
416     }
417 
418     /// ditto
419     void send(const char[] data, bool more = false)
420     {
421         send(cast(const(ubyte)[]) data, more);
422     }
423 
424     /// ditto
425     bool trySend(const ubyte[] data, bool more = false)
426     {
427         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
428         if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) {
429             import core.stdc.errno;
430             if (errno == EAGAIN) return false;
431             else throw new ZmqException;
432         }
433         return true;
434     }
435 
436     /// ditto
437     bool trySend(const char[] data, bool more = false)
438     {
439         return trySend(cast(const(ubyte)[]) data, more);
440     }
441 
442     ///
443     unittest
444     {
445         auto sck = Socket(SocketType.pub);
446         sck.send(cast(ubyte[]) [11, 226, 92]);
447         sck.send("Hello World!");
448     }
449 
450     /**
451     Sends a message frame.
452 
453     $(D _send) blocks until the frame has been queued on the socket.
454     $(D trySend) performs the operation in non-blocking mode, and returns
455     a $(D bool) value that signifies whether the frame was queued on the
456     socket.
457 
458     The $(D more) parameter specifies whether this is a multipart message
459     and there are more frames to follow.
460 
461     Throws:
462         $(REF ZmqException) if $(ZMQ) reports an error.
463     Corresponds_to:
464         $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
465         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
466         $(D more == true)).
467     */
468     void send(ref Frame msg, bool more = false)
469     {
470         immutable flags = more ? ZMQ_SNDMORE : 0;
471         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
472             throw new ZmqException;
473         }
474     }
475 
476     /// ditto
477     bool trySend(ref Frame msg, bool more = false)
478     {
479         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
480         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
481             import core.stdc.errno;
482             if (errno == EAGAIN) return false;
483             else throw new ZmqException;
484         }
485         return true;
486     }
487 
488     ///
489     unittest
490     {
491         auto sck = Socket(SocketType.pub);
492         auto msg = Frame(12);
493         msg.data.asString()[] = "Hello World!";
494         sck.send(msg);
495     }
496 
497     /**
498     Sends a constant-memory message frame.
499 
500     $(D _sendConst) blocks until the frame has been queued on the socket.
501     $(D trySendConst) performs the operation in non-blocking mode, and returns
502     a $(D bool) value that signifies whether the frame was queued on the
503     socket.
504 
505     The $(D more) parameter specifies whether this is a multipart message
506     and there are more frames to follow.
507 
508     Throws:
509         $(REF ZmqException) if $(ZMQ) reports an error.
510     Corresponds_to:
        $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the
        case of $(D trySendConst), and with the $(D ZMQ_SNDMORE) flag if
        $(D more == true)).
514     */
515     void sendConst(immutable ubyte[] data, bool more = false)
516     {
517         immutable flags = more ? ZMQ_SNDMORE : 0;
518         if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) {
519             throw new ZmqException;
520         }
521     }
522 
523     /// ditto
524     void sendConst(string data, bool more = false)
525     {
526         sendConst(cast(immutable(ubyte)[]) data, more);
527     }
528 
529     /// ditto
530     bool trySendConst(immutable ubyte[] data, bool more = false)
531     {
532         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
533         if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) {
534             import core.stdc.errno;
535             if (errno == EAGAIN) return false;
536             else throw new ZmqException;
537         }
538         return true;
539     }
540 
541     /// ditto
542     bool trySendConst(string data, bool more = false)
543     {
544         return trySend(cast(immutable(ubyte)[]) data, more);
545     }
546 
547     ///
548     unittest
549     {
550         static immutable arr = cast(ubyte[]) [11, 226, 92];
551         auto sck = Socket(SocketType.pub);
552         sck.sendConst(arr);
553         sck.sendConst("Hello World!");
554     }
555 
556     /**
557     Receives a message frame.
558 
559     $(D _receive) blocks until the request can be satisfied, and returns the
560     number of bytes in the frame.
561     $(D tryReceive) performs the operation in non-blocking mode, and returns
562     a $(STDREF typecons,Tuple) which contains the size of the frame along
563     with a $(D bool) value that signifies whether a frame was received.
564     (If the latter is $(D false), the former is always set to zero.)
565 
566     Throws:
567         $(REF ZmqException) if $(ZMQ) reports an error.
568     Corresponds_to:
569         $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
570         of $(D tryReceive)).
571 
572     */
573     size_t receive(ubyte[] data)
574     {
575         immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, 0);
576         if (len >= 0) {
577             import std.conv;
578             return to!size_t(len);
579         } else {
580             throw new ZmqException;
581         }
582     }
583 
584     /// ditto
585     Tuple!(size_t, bool) tryReceive(ubyte[] data)
586     {
587         immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, ZMQ_DONTWAIT);
588         if (len >= 0) {
589             import std.conv;
590             return typeof(return)(to!size_t(len), true);
591         } else {
592             import core.stdc.errno;
593             if (errno == EAGAIN) {
594                 return typeof(return)(0, false);
595             } else {
596                 throw new ZmqException;
597             }
598         }
599     }
600 
601     ///
602     unittest
603     {
604         // Sender
605         auto snd = Socket(SocketType.req);
606         snd.connect("inproc://zmqd_receive_example");
607         snd.send("Hello World!");
608 
609         // Receiver
610         import std.string: representation;
611         auto rcv = Socket(SocketType.rep);
612         rcv.bind("inproc://zmqd_receive_example");
613         char[256] buf;
614         immutable len  = rcv.receive(buf.representation);
615         assert (buf[0 .. len] == "Hello World!");
616     }
617 
618     @system unittest
619     {
620         auto snd = Socket(SocketType.pair);
621         snd.bind("inproc://zmqd_tryReceive_example");
622         auto rcv = Socket(SocketType.pair);
623         rcv.connect("inproc://zmqd_tryReceive_example");
624 
625         ubyte[256] buf;
626         auto r1 = rcv.tryReceive(buf);
627         assert (!r1[1]);
628 
629         import core.thread, core.time, std.string;
630         snd.send("Hello World!");
631         Thread.sleep(100.msecs); // Wait for message to be transferred...
632         auto r2 = rcv.tryReceive(buf);
633         assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation);
634     }
635 
636     /**
637     Receives a message frame.
638 
639     $(D _receive) blocks until the request can be satisfied, and returns the
640     number of bytes in the frame.
641     $(D tryReceive) performs the operation in non-blocking mode, and returns
642     a $(STDREF typecons,Tuple) which contains the size of the frame along
643     with a $(D bool) value that signifies whether a frame was received.
644     (If the latter is $(D false), the former is always set to zero.)
645 
646     Throws:
647         $(REF ZmqException) if $(ZMQ) reports an error.
648     Corresponds_to:
649         $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
650         of $(D tryReceive)).
651 
652     */
653     size_t receive(ref Frame msg)
654     {
655         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0);
656         if (len >= 0) {
657             import std.conv;
658             return to!size_t(len);
659         } else {
660             throw new ZmqException;
661         }
662     }
663 
664     /// ditto
665     Tuple!(size_t, bool) tryReceive(ref Frame msg)
666     {
667         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT);
668         if (len >= 0) {
669             import std.conv;
670             return typeof(return)(to!size_t(len), true);
671         } else {
672             import core.stdc.errno;
673             if (errno == EAGAIN) {
674                 return typeof(return)(0, false);
675             } else {
676                 throw new ZmqException;
677             }
678         }
679     }
680 
681     ///
682     unittest
683     {
684         // Sender
685         auto snd = Socket(SocketType.req);
686         snd.connect("inproc://zmqd_msg_receive_example");
687         snd.send("Hello World!");
688 
689         // Receiver
690         import std.string: representation;
691         auto rcv = Socket(SocketType.rep);
692         rcv.bind("inproc://zmqd_msg_receive_example");
693         auto msg = Frame();
694         rcv.receive(msg);
695         assert (msg.data.asString() == "Hello World!");
696     }
697 
698     @system unittest
699     {
700         auto snd = Socket(SocketType.pair);
701         snd.bind("inproc://zmqd_msg_tryReceive_example");
702         auto rcv = Socket(SocketType.pair);
703         rcv.connect("inproc://zmqd_msg_tryReceive_example");
704 
705         auto msg = Frame();
706         auto r1 = rcv.tryReceive(msg);
707         assert (!r1[1]);
708 
709         import core.thread, core.time, std.string;
710         snd.send("Hello World!");
711         Thread.sleep(100.msecs); // Wait for message to be transferred...
712         auto r2 = rcv.tryReceive(msg);
713         assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation);
714     }
715 
716     /**
717     The socket _type.
718 
719     Throws:
720         $(REF ZmqException) if $(ZMQ) reports an error.
721     Corresponds_to:
722         $(ZMQREF zmq_getsockopt()) with $(D ZMQ_TYPE).
723     */
724     @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }
725 
726     ///
727     unittest
728     {
729         auto sck = Socket(SocketType.xpub);
730         assert (sck.type == SocketType.xpub);
731     }
732 
733     /**
734     Whether there are _more message frames to follow.
735 
736     Throws:
737         $(REF ZmqException) if $(ZMQ) reports an error.
738     Corresponds_to:
739         $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE).
740     */
741     @property bool more() { return !!getOption!int(ZMQ_RCVMORE); }
742 
743     // TODO: Better unittest/example
744     unittest
745     {
746         auto sck = Socket(SocketType.req);
747         assert (!sck.more);
748     }
749 
750     /**
751     Misc. socket options.
752 
753     Each of these has a one-to-one correspondence with an option passed to
754     $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For
755     example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
756     $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.
757 
758     Notes:
759     $(UL
760         $(LI For convenience, the setter for the $(D identity) property
761             accepts strings.  To retrieve a string with the getter, use
762             the $(FREF asString) function.
763             ---
764             sck.identity = "foobar";
765             assert (sck.identity.asString() == "foobar");
766             ---
767             )
768         $(LI The $(D linger), $(D receiveTimeout) and $(D sendTimeout)
769             properties may have the special value $(COREF time,Duration.max),
            which in this context specifies an infinite duration.  This is
771             translated to an option value of -1 in the C API (and it is also
772             the default value for all of them).)
773         $(LI Some options have array type, and these allow the user to supply
774             a buffer in which to store the value, to avoid a GC allocation.
775             The return value is then a slice of this buffer.
776             These are not marked as $(D @property), but are prefixed with
777             "get" (e.g. $(D getIdentity())).  A user-supplied buffer is
778             $(I required) for some options, namely $(D getPlainUsername())
779             and $(D getPlainPassword()), and these do not have $(D @property)
780             versions.  $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85())
781             require buffers which are at least 32 and 41 bytes long,
782             respectively.)
783         $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
784             treated differently from the others; see $(FREF Socket.subscribe)
785             and $(FREF Socket.unsubscribe))
786     )
787 
788     Throws:
789         $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
790         $(STDREF conv,ConvOverflowException) if a given $(D Duration) is
791             longer than the number of milliseconds that will fit in an $(D int)
792             (only applies to properties of $(COREF time,Duration) type).$(BR)
793         $(COREF exception,RangeError) if the $(D dest) buffers passed to
794             $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than
795             32 or 41 bytes long, respectively.
796     Corresponds_to:
797         $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()).
798     */
799     @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
800     /// ditto
801     @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }
802 
803     /// ditto
804     @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
805     /// ditto
806     @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }
807 
808     /// ditto
809     @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); }
810     /// ditto
811     @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); }
812 
813     /// ditto
814     @property ubyte[] identity() { return getIdentity(new ubyte[255]); }
815     /// ditto
816     ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); }
817     /// ditto
818     @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); }
819     /// ditto
820     @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); }
821 
822     /// ditto
823     @property int rate() { return getOption!int(ZMQ_RATE); }
824     /// ditto
825     @property void rate(int value) { setOption(ZMQ_RATE, value); }
826 
827     /// ditto
828     @property Duration recoveryInterval()
829     {
830         return msecs(getOption!int(ZMQ_RECOVERY_IVL));
831     }
832     /// ditto
833     @property void recoveryInterval(Duration value)
834     {
835         import std.conv: to;
836         setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"()));
837     }
838 
839     /// ditto
840     @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); }
841     /// ditto
842     @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }
843 
844     /// ditto
845     @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
846     /// ditto
847     @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }
848 
849     /// ditto
850     @property Duration linger()
851     {
852         const auto value = getOption!int(ZMQ_LINGER);
853         return value == -1 ? Duration.max : value.msecs;
854     }
855     /// ditto
856     @property void linger(Duration value)
857     {
858         import std.conv: to;
859         setOption(ZMQ_LINGER,
860                   value == Duration.max ? -1 : to!int(value.total!"msecs"()));
861     }
862 
863     /// ditto
864     @property Duration reconnectionInterval()
865     {
866         return getOption!int(ZMQ_RECONNECT_IVL).msecs;
867     }
868     /// ditto
869     @property void reconnectionInterval(Duration value)
870     {
871         import std.conv: to;
872         setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"()));
873     }
874 
875     /// ditto
876     @property Duration maxReconnectionInterval()
877     {
878         return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs;
879     }
880     /// ditto
881     @property void maxReconnectionInterval(Duration value)
882     {
883         import std.conv: to;
884         setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"()));
885     }
886 
887     /// ditto
888     @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
889     /// ditto
890     @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }
891 
892     /// ditto
893     @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
894     /// ditto
895     @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }
896 
897     /// ditto
898     @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
899     /// ditto
900     @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }
901 
902     /// ditto
903     @property Duration receiveTimeout()
904     {
905         const value = getOption!int(ZMQ_RCVTIMEO);
906         return value == -1 ? Duration.max : value.msecs;
907     }
908     /// ditto
909     @property void receiveTimeout(Duration value)
910     {
911         import std.conv: to;
912         setOption(ZMQ_RCVTIMEO,
913                   value == Duration.max ? -1 : to!int(value.total!"msecs"()));
914     }
915 
916     /// ditto
917     @property Duration sendTimeout()
918     {
919         const value = getOption!int(ZMQ_SNDTIMEO);
920         return value == -1 ? Duration.max : value.msecs;
921     }
922     /// ditto
923     @property void sendTimeout(Duration value)
924     {
925         import std.conv: to;
926         setOption(ZMQ_SNDTIMEO,
927                   value == Duration.max ? -1 : to!int(value.total!"msecs"()));
928     }
929 
930     /// ditto
931     @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); }
932     /// ditto
933     @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); }
934 
935     /// ditto
936     deprecated("Use !ipv6 instead")
937     @property bool ipv4Only() { return !ipv6; }
938     /// ditto
939     deprecated("Use ipv6 = !value instead")
940     @property void ipv4Only(bool value) { ipv6 = !value; }
941 
942     /// ditto
943     @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); }
944     /// ditto
945     @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); }
946 
947     /// ditto
948     deprecated("Use the 'immediate' property instead")
949     @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); }
950     /// ditto
951     deprecated("Use the 'immediate' property instead")
952     @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); }
953 
954     /// ditto
955     @property FD fd() { return getOption!FD(ZMQ_FD); }
956 
957     /// ditto
958     @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); }
959 
960     /// ditto
961     @property char[] lastEndpoint() @trusted
962     {
963         // This function is not @safe because it calls a @system function
964         // (zmq_getsockopt) and takes the address of a local (len).
965         auto buf = new char[1024];
966         size_t len = buf.length;
967         if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
968             throw new ZmqException;
969         }
970         return buf[0 .. len-1];
971     }
972 
973     /// ditto
974     @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); }
975 
976     /// ditto
977     @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); }
978 
979     /// ditto
980     @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); }
981 
982     /// ditto
983     @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); }
984 
985     /// ditto
986     @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); }
987 
988     /// ditto
989     @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); }
990     /// ditto
991     @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); }
992 
993     /// ditto
994     @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); }
995     /// ditto
996     @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); }
997 
998     /// ditto
999     @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); }
1000     /// ditto
1001     @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); }
1002 
1003     /// ditto
1004     @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); }
1005     /// ditto
1006     @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); }
1007 
1008     /// ditto
1009     @property Security mechanism()
1010     {
1011         import std.conv;
1012         return to!Security(getOption!int(ZMQ_MECHANISM));
1013     }
1014 
1015     /// ditto
1016     @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); }
1017     /// ditto
1018     @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); }
1019 
1020     /// ditto
1021     char[] getPlainUsername(char[] dest)
1022     {
1023         return getCStringOption(ZMQ_PLAIN_USERNAME, dest);
1024     }
1025     /// ditto
1026     @property void plainUsername(const(char)[] value)
1027     {
1028         setArrayOption(ZMQ_PLAIN_USERNAME, value);
1029     }
1030 
1031     /// ditto
1032     char[] getPlainPassword(char[] dest)
1033     {
1034         return getCStringOption(ZMQ_PLAIN_PASSWORD, dest);
1035     }
1036     /// ditto
1037     @property void plainPassword(const(char)[] value)
1038     {
1039         setArrayOption(ZMQ_PLAIN_PASSWORD, value);
1040     }
1041 
1042 version (WithLibsodium) {
1043 
1044     /// ditto
1045     @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); }
1046     /// ditto
1047     @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); }
1048 
1049     /// ditto
1050     @property ubyte[] curvePublicKey()
1051     {
1052         return getCurvePublicKey(new ubyte[keyBufSizeBin]);
1053     }
1054     /// ditto
1055     ubyte[] getCurvePublicKey(ubyte[] dest)
1056     {
1057         return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest);
1058     }
1059     /// ditto
1060     @property char[] curvePublicKeyZ85()
1061     {
1062         return getCurvePublicKeyZ85(new char[keyBufSizeZ85]);
1063     }
1064     /// ditto
1065     char[] getCurvePublicKeyZ85(char[] dest)
1066     {
1067         return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest);
1068     }
1069     /// ditto
1070     @property void curvePublicKey(const(ubyte)[] value)
1071     {
1072         setCurveKey(ZMQ_CURVE_PUBLICKEY, value);
1073     }
1074     /// ditto
1075     @property void curvePublicKeyZ85(const(char)[] value)
1076     {
1077         setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value);
1078     }
1079 
1080     /// ditto
1081     @property ubyte[] curveSecretKey()
1082     {
1083         return getCurveSecretKey(new ubyte[keyBufSizeBin]);
1084     }
1085     /// ditto
1086     ubyte[] getCurveSecretKey(ubyte[] dest)
1087     {
1088         return getCurveKey(ZMQ_CURVE_SECRETKEY, dest);
1089     }
1090     /// ditto
1091     @property char[] curveSecretKeyZ85()
1092     {
1093         return getCurveSecretKeyZ85(new char[keyBufSizeZ85]);
1094     }
1095     /// ditto
1096     char[] getCurveSecretKeyZ85(char[] dest)
1097     {
1098         return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest);
1099     }
1100     /// ditto
1101     @property void curveSecretKey(const(ubyte)[] value)
1102     {
1103         setCurveKey(ZMQ_CURVE_SECRETKEY, value);
1104     }
1105     /// ditto
1106     @property void curveSecretKeyZ85(const(char)[] value)
1107     {
1108         setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value);
1109     }
1110 
1111     /// ditto
1112     @property ubyte[] curveServerKey()
1113     {
1114         return getCurveServerKey(new ubyte[keyBufSizeBin]);
1115     }
1116     /// ditto
1117     ubyte[] getCurveServerKey(ubyte[] dest)
1118     {
1119         return getCurveKey(ZMQ_CURVE_SERVERKEY, dest);
1120     }
1121     /// ditto
1122     @property char[] curveServerKeyZ85()
1123     {
1124         return getCurveServerKeyZ85(new char[keyBufSizeZ85]);
1125     }
1126     /// ditto
1127     char[] getCurveServerKeyZ85(char[] dest)
1128     {
1129         return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest);
1130     }
1131     /// ditto
1132     @property void curveServerKey(const(ubyte)[] value)
1133     {
1134         setCurveKey(ZMQ_CURVE_SERVERKEY, value);
1135     }
1136     /// ditto
1137     @property void curveServerKeyZ85(const(char)[] value)
1138     {
1139         setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value);
1140     }
1141 
1142 } // version (WithLibsodium)
1143 
1144     /// ditto
1145     @property char[] zapDomain() { return getZapDomain(new char[256]); }
1146     /// ditto
1147     char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); }
1148     /// ditto
1149     @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); }
1150 
1151     /// ditto
1152     @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); }
1153 
1154     unittest
1155     {
1156         // We test all the socket options by checking that they have their default value.
1157         auto s = Socket(SocketType.xpub);
1158         const e = "inproc://unittest2";
1159         s.bind(e);
1160         import core.time;
1161         assert(s.type == SocketType.xpub);
1162         assert(s.sendHWM == 1000);
1163         assert(s.receiveHWM == 1000);
1164         assert(s.threadAffinity == 0);
1165         assert(s.identity == null);
1166         assert(s.rate == 100);
1167         assert(s.recoveryInterval == 10.seconds);
1168         assert(s.sendBufferSize == 0);
1169         assert(s.receiveBufferSize == 0);
1170         assert(s.linger == 0.hnsecs);
1171         assert(s.reconnectionInterval == 100.msecs);
1172         assert(s.maxReconnectionInterval == Duration.zero);
1173         assert(s.backlog == 100);
1174         assert(s.maxMsgSize == -1);
1175         assert(s.multicastHops == 1);
1176         assert(s.receiveTimeout == Duration.max);
1177         assert(s.sendTimeout == Duration.max);
1178         assert(!s.ipv6);
1179         assert(!s.immediate);
1180         version(Posix) {
1181             assert(s.fd > 2); // 0, 1 and 2 are the standard streams
1182         }
1183         assert(s.events == PollFlags.pollOut);
1184         assert(s.tcpKeepalive == -1);
1185         assert(s.tcpKeepaliveIdle == -1);
1186         assert(s.tcpKeepaliveCnt == -1);
1187         assert(s.tcpKeepaliveIntvl == -1);
1188         assert(s.mechanism == Security.none);
1189         assert(!s.plainServer);
1190         assert(s.getPlainUsername(new char[8]).length == 0);
1191         assert(s.getPlainPassword(new char[8]).length == 0);
1192         version (WithLibsodium) {
1193             assert(!s.curveServer);
1194         }
1195         assert(s.zapDomain.length == 0);
1196 
1197         // Test setters and getters together
1198         s.sendHWM = 500;
1199         assert(s.sendHWM == 500);
1200         s.receiveHWM = 600;
1201         assert(s.receiveHWM == 600);
1202         s.threadAffinity = 1;
1203         assert(s.threadAffinity == 1);
1204         s.identity = cast(ubyte[]) [ 65, 66, 67 ];
1205         assert(s.identity == [65, 66, 67]);
1206         s.identity = "foo";
1207         assert(s.identity == [102, 111, 111]);
1208         s.rate = 200;
1209         assert(s.rate == 200);
1210         s.recoveryInterval = 5.seconds;
1211         assert(s.recoveryInterval == 5_000.msecs);
1212         s.sendBufferSize = 500;
1213         assert(s.sendBufferSize == 500);
1214         s.receiveBufferSize = 600;
1215         assert(s.receiveBufferSize == 600);
1216         s.linger = Duration.zero;
1217         assert(s.linger == Duration.zero);
1218         s.linger = 100_000.usecs;
1219         assert(s.linger == 100.msecs);
1220         s.linger = Duration.max;
1221         assert(s.linger == Duration.max);
1222         s.reconnectionInterval = 200_000.usecs;
1223         assert(s.reconnectionInterval == 200.msecs);
1224         s.maxReconnectionInterval = 300_000.usecs;
1225         assert(s.maxReconnectionInterval == 300.msecs);
1226         s.backlog = 50;
1227         assert(s.backlog == 50);
1228         s.maxMsgSize = 1000;
1229         assert(s.maxMsgSize == 1000);
1230         s.multicastHops = 2;
1231         assert(s.multicastHops == 2);
1232         s.receiveTimeout = 3.seconds;
1233         assert(s.receiveTimeout == 3_000_000.usecs);
1234         s.receiveTimeout = Duration.max;
1235         assert(s.receiveTimeout == Duration.max);
1236         s.sendTimeout = 2_000_000.usecs;
1237         assert(s.sendTimeout == 2.seconds);
1238         s.sendTimeout = Duration.max;
1239         assert(s.sendTimeout == Duration.max);
1240         s.ipv6 = true;
1241         assert(s.ipv6);
1242         s.immediate = true;
1243         assert(s.immediate);
1244         s.tcpKeepalive = 1;
1245         assert(s.tcpKeepalive == 1);
1246         s.tcpKeepaliveIdle = 0;
1247         assert(s.tcpKeepaliveIdle == 0);
1248         s.tcpKeepaliveCnt = 1;
1249         assert(s.tcpKeepaliveCnt == 1);
1250         s.tcpKeepaliveIntvl = 0;
1251         assert(s.tcpKeepaliveIntvl == 0);
1252         s.plainServer = true;
1253         assert(s.mechanism == Security.plain);
1254         assert(s.plainServer);
1255         version (WithLibsodium) {
1256             assert(!s.curveServer);
1257             s.curveServer = true;
1258             assert(s.mechanism == Security.curve);
1259             assert(!s.plainServer);
1260             assert(s.curveServer);
1261         }
1262         s.plainServer = false;
1263         assert(s.mechanism == Security.none);
1264         assert(!s.plainServer);
1265         version (WithLibsodium) assert(!s.curveServer);
1266         s.plainUsername = "foobar";
1267         assert(s.getPlainUsername(new char[8]) == "foobar");
1268         assert(s.mechanism == Security.plain);
1269         s.plainUsername = null;
1270         assert(s.mechanism == Security.none);
1271         s.plainPassword = "xyz";
1272         assert(s.getPlainPassword(new char[8]) == "xyz");
1273         assert(s.mechanism == Security.plain);
1274         s.plainPassword = null;
1275         assert(s.mechanism == Security.none);
1276         s.zapDomain = "my_zap_domain";
1277         assert(s.zapDomain == "my_zap_domain");
1278 
1279         // Test write-only options
1280         s.conflate = true;
1281     }
1282 
1283     version (WithLibsodium) @system unittest
1284     {
1285         // The CURVE key options require some special setup, so we test them
1286         // separately.
1287         import std.array, std.range;
1288         auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array();
1289         auto z85Key1 = z85Encode(binKey1);
1290         auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array();
1291         auto z85Key2 = z85Encode(binKey2);
1292         auto zeroKey = repeat(cast(ubyte) 0).take(32).array();
1293         assert (z85Key1 != z85Key2);
1294 
1295         auto s = Socket(SocketType.req);
1296         s.curvePublicKey = zeroKey;
1297         s.curveSecretKey = zeroKey;
1298         s.curveServerKey = zeroKey;
1299 
1300         s.curvePublicKey = binKey1;
1301         assert (s.curvePublicKey == binKey1);
1302         assert (s.curvePublicKeyZ85 == z85Key1);
1303         s.curvePublicKeyZ85 = z85Key2;
1304         assert (s.curvePublicKey == binKey2);
1305         assert (s.curvePublicKeyZ85 == z85Key2);
1306         assert (s.curveSecretKey == zeroKey);
1307         assert (s.curveServerKey == zeroKey);
1308         s.curvePublicKey = zeroKey;
1309 
1310         s.curveSecretKey = binKey1;
1311         assert (s.curveSecretKey == binKey1);
1312         assert (s.curveSecretKeyZ85 == z85Key1);
1313         s.curveSecretKeyZ85 = z85Key2;
1314         assert (s.curveSecretKey == binKey2);
1315         assert (s.curveSecretKeyZ85 == z85Key2);
1316         assert (s.curvePublicKey == zeroKey);
1317         assert (s.curveServerKey == zeroKey);
1318         s.curveSecretKey = zeroKey;
1319 
1320         s.curveServerKey = binKey1;
1321         assert (s.curveServerKey == binKey1);
1322         assert (s.curveServerKeyZ85 == z85Key1);
1323         s.curveServerKeyZ85 = z85Key2;
1324         assert (s.curveServerKey == binKey2);
1325         assert (s.curveServerKeyZ85 == z85Key2);
1326         assert (s.curvePublicKey == zeroKey);
1327         assert (s.curveSecretKey == zeroKey);
1328         s.curveServerKey = zeroKey;
1329     }
1330 
1331     unittest
1332     {
1333         // Some options are only applicable to specific socket types.
1334         auto rt = Socket(SocketType.router);
1335         rt.routerMandatory = true;
1336         rt.probeRouter = true;
1337         auto xp = Socket(SocketType.xpub);
1338         xp.xpubVerbose = true;
1339         auto rq = Socket(SocketType.req);
1340         rq.reqCorrelate = true;
1341         rq.reqRelaxed = true;
1342     }
1343 
1344     deprecated unittest
1345     {
1346         // Test deprecated socket options
1347         auto s = Socket(SocketType.req);
1348         assert(s.ipv4Only);
1349         assert(!s.delayAttachOnConnect);
1350 
1351         // Test setters and getters together
1352         s.ipv4Only = false;
1353         assert(!s.ipv4Only);
1354         s.delayAttachOnConnect = true;
1355         assert(s.delayAttachOnConnect);
1356     }
1357 
1358     /**
1359     Establishes a message filter.
1360 
1361     Throws:
1362         $(REF ZmqException) if $(ZMQ) reports an error.
1363     Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1365     */
1366     void subscribe(const ubyte[] filterPrefix)
1367     {
1368         setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
1369     }
1370     /// ditto
1371     void subscribe(const  char[] filterPrefix)
1372     {
1373         setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
1374     }
1375 
1376     ///
1377     unittest
1378     {
1379         // Create a subscriber that accepts all messages that start with
1380         // the prefixes "foo" or "bar".
1381         auto sck = Socket(SocketType.sub);
1382         sck.subscribe("foo");
1383         sck.subscribe("bar");
1384     }
1385 
1386     @system unittest
1387     {
1388         void sleep(int ms) {
1389             import core.thread, core.time;
1390             Thread.sleep(dur!"msecs"(ms));
1391         }
1392         auto pub = Socket(SocketType.pub);
1393         pub.bind("inproc://zmqd_subscribe_unittest");
1394         auto sub = Socket(SocketType.sub);
1395         sub.connect("inproc://zmqd_subscribe_unittest");
1396 
1397         pub.send("Hello");
1398         sleep(100);
1399         sub.subscribe("He");
1400         sub.subscribe(cast(ubyte[])['W', 'o']);
1401         sleep(100);
1402         pub.send("Heeee");
1403         pub.send("World");
1404         sleep(100);
1405         ubyte[5] buf;
1406         sub.receive(buf);
1407         assert(buf.asString() == "Heeee");
1408         sub.receive(buf);
1409         assert(buf.asString() == "World");
1410     }
1411 
1412     /**
1413     Removes a message filter.
1414 
1415     Throws:
1416         $(REF ZmqException) if $(ZMQ) reports an error.
1417     Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_UNSUBSCRIBE).
1419     */
1420     void unsubscribe(const ubyte[] filterPrefix)
1421     {
1422         setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
1423     }
1424     /// ditto
1425     void unsubscribe(const char[] filterPrefix)
1426     {
1427         setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
1428     }
1429 
1430     ///
1431     unittest
1432     {
1433         // Subscribe to messages that start with "foo" or "bar".
1434         auto sck = Socket(SocketType.sub);
1435         sck.subscribe("foo");
1436         sck.subscribe("bar");
1437         // ...
1438         // From now on, only accept messages that start with "bar"
1439         sck.unsubscribe("foo");
1440     }
1441 
1442     /**
1443     Spawns a PAIR socket that publishes socket state changes (events) over
1444     the INPROC transport to the given endpoint.
1445 
1446     Which event types should be published may be selected by bitwise-ORing
    together different $(REF EventType) flags in the $(D events) parameter.
1448 
1449     Throws:
1450         $(REF ZmqException) if $(ZMQ) reports an error.
1451     Corresponds_to:
1452         $(ZMQREF zmq_socket_monitor())
1453     See_also:
1454         $(FREF receiveEvent), which receives and parses event messages.
1455     */
1456     void monitor(const char[] endpoint, EventType events = EventType.all)
1457     {
1458         if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) {
1459             throw new ZmqException;
1460         }
1461     }
1462 
1463     ///
1464     unittest
1465     {
1466         auto sck = Socket(SocketType.pub);
1467         sck.monitor("inproc://zmqd_monitor_unittest",
1468                     EventType.accepted | EventType.closed);
1469     }
1470 
1471     /**
1472     The $(D void*) pointer used by the underlying C API to refer to the socket.
1473 
1474     If the object has not been initialized, this function returns $(D null).
1475     */
1476     @property inout(void)* handle() inout pure nothrow
1477     {
1478         return m_socket;
1479     }
1480 
1481     /**
1482     Whether this $(REF Socket) object has been _initialized, i.e. whether it
1483     refers to a valid $(ZMQ) socket.
1484     */
1485     @property bool initialized() const pure nothrow
1486     {
1487         return m_socket != null;
1488     }
1489 
1490     ///
1491     unittest
1492     {
1493         Socket sck;
1494         assert (!sck.initialized);
1495         sck = Socket(SocketType.sub);
1496         assert (sck.initialized);
1497         sck.close();
1498         assert (!sck.initialized);
1499     }
1500 
1501 private:
1502     // Helper function for ~this() and close()
1503     bool nothrowClose() nothrow
1504     {
1505         if (m_socket != null) {
1506             if (trusted!zmq_close(m_socket) != 0) return false;
1507             m_socket = null;
1508         }
1509         return true;
1510     }
1511 
1512     import std.traits;
1513 
1514     T getOption(T)(int option) @trusted
1515         if (isScalarType!T)
1516     {
1517         T buf;
1518         auto len = T.sizeof;
1519         if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) {
1520             throw new ZmqException;
1521         }
1522         assert(len == T.sizeof);
1523         return buf;
1524     }
1525 
1526     void setOption(T)(int option, T value) @trusted
1527         if (isScalarType!T)
1528     {
1529         if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) {
1530             throw new ZmqException;
1531         }
1532     }
1533 
1534     T[] getArrayOption(T)(int option, T[] buf) @trusted
1535         if (isScalarType!T)
1536     {
1537         static assert (T.sizeof == 1);
1538         auto len = buf.length;
1539         if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) {
1540             throw new ZmqException;
1541         }
1542         return buf[0 .. len];
1543     }
1544 
1545     void setArrayOption()(int option, const void[] value)
1546     {
1547         if (trusted!zmq_setsockopt(m_socket, option, value.ptr, value.length) != 0) {
1548             throw new ZmqException;
1549         }
1550     }
1551 
1552     char[] getCStringOption(int option, char[] buf)
1553     {
1554         auto ret = getArrayOption(option, buf);
1555         assert (ret.length && ret[$-1] == '\0');
1556         return ret[0 .. $-1];
1557     }
1558 
1559 version (WithLibsodium) {
1560     enum : size_t
1561     {
1562         keySizeBin    = 32,
1563         keyBufSizeBin = keySizeBin,
1564         keySizeZ85    = 40,
1565         keyBufSizeZ85 = keySizeZ85 + 1,
1566     }
1567 
1568     ubyte[] getCurveKey(int option, ubyte[] buf)
1569     {
1570         if (buf.length < keyBufSizeBin) {
1571             import core.exception: RangeError;
1572             throw new RangeError;
1573         }
1574         return getArrayOption(option, buf[0 .. keyBufSizeBin]);
1575     }
1576 
1577     char[] getCurveKeyZ85(int option, char[] buf)
1578     {
1579         if (buf.length < keyBufSizeZ85) {
1580             import core.exception: RangeError;
1581             throw new RangeError;
1582         }
1583         return getCStringOption(option, buf[0 .. keyBufSizeZ85]);
1584     }
1585 
1586     void setCurveKey(int option, const ubyte[] value)
1587     {
1588         if (value.length != keySizeBin) throw new Exception("Invalid key size");
1589         setArrayOption(option, value);
1590     }
1591 
1592     void setCurveKeyZ85(int option, const char[] value)
1593     {
1594         if (value.length != keySizeZ85) throw new Exception("Invalid key size");
1595         setArrayOption(option, value);
1596     }
1597 } // version (WithLibsodium)
1598 
    // NOTE(review): presumably the context this socket was created in;
    // its use is not visible in this part of the file -- confirm.
    Context m_context;
    // NOTE(review): presumably the value reported by the `type` property -- confirm.
    SocketType m_type;
    // Raw socket pointer passed to the zmq_* C functions and returned by
    // handle(); null when no socket is open (see nothrowClose/initialized).
    void* m_socket;
1602 }
1603 
unittest
{
    // Round trip over a PAIR/PAIR inproc connection.
    auto binder = Socket(SocketType.pair);
    auto connector = Socket(SocketType.pair);
    binder.bind("inproc://unittest");
    connector.connect("inproc://unittest");
    binder.send("Hello World!");
    // The buffer is sized to hold exactly the 12-byte payload.
    ubyte[12] received;
    const count = connector.receive(received[]);
    assert (count == 12);
    assert (received == "Hello World!");
}
1616 
1617 
// Select the native descriptor type for the current platform.
version (Windows) {
    alias PlatformFD = SOCKET;
} else version (Posix) {
    alias PlatformFD = int;
} else {
    // Fail with a clear message instead of an "undefined identifier" error
    // at the first use of FD.
    static assert (false,
        "zmqd: no native socket descriptor type is defined for this platform");
}

/**
The native socket file descriptor type.

This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems.
*/
alias FD = PlatformFD;
1630 
1631 
1632 /**
1633 Starts the built-in $(ZMQ) _proxy.
1634 
1635 This function never returns normally, but it may throw an exception.  This could
1636 happen if the context associated with either of the specified sockets is
1637 manually destroyed in a different thread.
1638 
1639 Throws:
1640     $(REF ZmqException) if $(ZMQ) reports an error.
1641 Corresponds_to:
1642     $(ZMQREF zmq_proxy())
1643 */
void proxy(ref Socket frontend, ref Socket backend)
{
    // No capture socket in this overload, hence the null third argument.
    const result = trusted!zmq_proxy(frontend.handle, backend.handle, null);
    // The only expected way for the call to come back is with -1, which
    // is then reported as an exception.
    assert (result == -1);
    throw new ZmqException;
}

/// ditto
void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
{
    // Same as above, but with the capture socket's handle passed along.
    const result = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle);
    assert (result == -1);
    throw new ZmqException;
}
1658 
1659 
deprecated("zmqd.poll() has a new signature as of v0.4")
uint poll(zmq_pollitem_t[] items, Duration timeout = Duration.max)
{
    import std.conv: to;
    // The C API takes the item count as an int; to!int is a checked conversion.
    int itemCount = to!int(items.length);
    // Duration.max is passed to the C function as -1.
    int timeoutMs = -1;
    if (timeout != Duration.max) {
        timeoutMs = to!int(timeout.total!"msecs"());
    }
    const signalled = trusted!zmq_poll(items.ptr, itemCount, timeoutMs);
    if (signalled < 0) throw new ZmqException;
    return cast(uint) signalled;
}
1671 
1672 
1673 /**
1674 Input/output multiplexing.
1675 
1676 The $(D timeout) parameter may have the special value
1677 $(COREF time,Duration.max), which in this context specifies an infinite
1678 duration.  This is translated to an argument value of -1 in the C API.
1679 
1680 Returns:
1681     The number of $(REF PollItem) structures with events signalled in
1682     $(REF PollItem.returnedEvents), or 0 if no events have been signalled.
1683 Throws:
1684     $(REF ZmqException) if $(ZMQ) reports an error.
1685 Corresponds_to:
1686     $(ZMQREF zmq_poll())
1687 */
uint poll(PollItem[] items, Duration timeout = Duration.max) @trusted
{
    // The PollItem array is reinterpreted in place as an array of
    // zmq_pollitem_t, which avoids allocating a temporary.  The cast is
    // only valid as long as both types have identical size, which is
    // enforced at compile time.
    static assert (PollItem.sizeof == zmq_pollitem_t.sizeof);

    import std.conv: to;
    auto rawItems = cast(zmq_pollitem_t*) items.ptr;
    int itemCount = to!int(items.length);
    // Duration.max is passed to the C function as -1.
    int timeoutMs = -1;
    if (timeout != Duration.max) {
        timeoutMs = to!int(timeout.total!"msecs"());
    }
    const signalled = zmq_poll(rawItems, itemCount, timeoutMs);
    if (signalled < 0) throw new ZmqException;
    return cast(uint) signalled;
}
1704 
1705 
1706 ///
@system unittest
{
    // A ZMQ PULL socket that will have one message waiting.
    auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
    socket1.bind("inproc://zmqd_poll_example");

    // A plain UDP socket from std.socket.  Let the operating system pick a
    // free port (PORT_ANY) instead of hard-coding one, so that the example
    // does not fail when a fixed port happens to be in use.
    import std.socket;
    auto socket2 = new std.socket.Socket(
        AddressFamily.INET,
        std.socket.SocketType.DGRAM);
    // Release the native socket even if one of the assertions below fails.
    scope (exit) socket2.close();
    socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY,
                                     InternetAddress.PORT_ANY));

    // A ZMQ PUSH socket feeding socket1.
    auto socket3 = zmqd.Socket(zmqd.SocketType.push);
    socket3.connect("inproc://zmqd_poll_example");
    socket3.send("test");

    // Give the message a moment to arrive.
    import core.thread: Thread;
    Thread.sleep(10.msecs);

    auto items = [
        PollItem(socket1, PollFlags.pollIn),
        PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
        PollItem(socket3, PollFlags.pollIn),
    ];

    const n = poll(items, 100.msecs);
    assert (n == 2);
    assert (items[0].returnedEvents == PollFlags.pollIn);
    assert (items[1].returnedEvents == PollFlags.pollOut);
    assert (items[2].returnedEvents == 0);
}
1738 
1739 
1740 /**
1741 $(FREF poll) event flags.
1742 
1743 These are described in the $(ZMQREF zmq_poll()) manual.
1744 */
enum PollFlags
{
    /// Corresponds to $(D ZMQ_POLLIN)
    pollIn  = ZMQ_POLLIN,
    /// Corresponds to $(D ZMQ_POLLOUT)
    pollOut = ZMQ_POLLOUT,
    /// Corresponds to $(D ZMQ_POLLERR)
    pollErr = ZMQ_POLLERR,
}
1751 
1752 
1753 /++
1754 A structure that specifies a socket to be monitored by $(FREF poll) as well
1755 as the events to poll for, and, when $(FREF poll) returns, the events that
1756 occurred.
1757 
1758 Warning:
1759     $(D PollItem) objects do not store $(STDREF socket,Socket) references,
1760     only the corresponding native file descriptors.  This means that the
1761     references have to be stored elsewhere, or the objects may be garbage
1762     collected, invalidating the sockets before or while $(FREF poll) executes.
1763     ---
1764     // Not OK
1765     auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);
1766 
1767     // OK
1768     auto s = new std.socket.Socket(/*...*/);
1769     auto p2 = PollItem(s, PollFlags.pollIn);
1770     ---
1771 Corresponds_to:
1772     $(D $(ZMQAPI zmq_poll,zmq_pollitem_t))
1773 +/
struct PollItem
{
    /// Constructs a $(REF PollItem) that monitors a $(ZMQ) socket.
    this(ref zmqd.Socket socket, PollFlags events) nothrow
    {
        // ZMQ socket case: the handle goes in the first slot and the file
        // descriptor slot is zero.  Returned events start out empty.
        short wanted = cast(short) events;
        m_pollItem = zmq_pollitem_t(socket.handle, 0, wanted, 0);
    }

    import std.socket;
    /**
    Constructs a $(REF PollItem) that monitors a standard socket, given as
    a $(STDREF socket,Socket) object.
    */
    this(std.socket.Socket socket, PollFlags events) @system
    {
        // Only the native handle is kept; delegate to the FD constructor.
        auto nativeHandle = socket.handle;
        this(nativeHandle, events);
    }

    /**
    Constructs a $(REF PollItem) that monitors a standard socket, given as
    a native file descriptor.
    */
    this(FD fd, PollFlags events) pure nothrow
    {
        // Native socket case: the first slot is null and the descriptor is
        // set.  Returned events start out empty.
        short wanted = cast(short) events;
        m_pollItem = zmq_pollitem_t(null, fd, wanted, 0);
    }

    /**
    The events to poll for.

    Corresponds_to:
        $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events))
    */
    @property void requestedEvents(PollFlags events) pure nothrow
    {
        short wanted = cast(short) events;
        m_pollItem.events = wanted;
    }

    /// ditto
    @property PollFlags requestedEvents() const pure nothrow
    {
        return cast(PollFlags) m_pollItem.events;
    }

    /**
    The events that were signalled.

    Corresponds_to:
        $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents))
    */
    @property PollFlags returnedEvents() const pure nothrow
    {
        return cast(PollFlags) m_pollItem.revents;
    }

private:
    // The wrapped C structure; poll() relies on it being the only member.
    zmq_pollitem_t m_pollItem;
}
1832 
1833 
1834 /**
1835 An object that encapsulates a $(ZMQ) message frame.
1836 
1837 This $(D struct) is a wrapper around a $(D zmq_msg_t) object.
1838 A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it
1839 should always be explicitly initialized upon construction using
1840 $(FREF _Frame.opCall).  Alternatively, it may be initialized later with
1841 $(FREF _Frame.rebuild).
1842 ---
1843 Frame msg1;                 // Invalid frame
1844 auto msg2 = Frame();        // Empty frame
1845 auto msg3 = Frame(1024);    // 1K frame
1846 msg1.rebuild(2048);         // msg1 now has size 2K
1847 msg2.rebuild(2048);         // ...and so does msg2
1848 ---
1849 When a $(D Frame) goes out of scope, $(ZMQREF zmq_msg_close()) is
1850 called on the underlying $(D zmq_msg_t).
1851 
1852 A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy)
1853 for this.
1854 */
struct Frame
{
@safe:
    /**
    Initializes an empty $(ZMQ) message frame.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init())
    */
    static Frame opCall()
    {
        Frame f;
        f.init();
        return f;
    }

    ///
    unittest
    {
        auto msg = Frame();
        assert(msg.size == 0);
    }

    /** $(DDOC_ANCHOR Frame.opCall_size)
    Initializes a $(ZMQ) message frame of a specified size.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init_size())
    */
    static Frame opCall(size_t size)
    {
        Frame m;
        m.init(size);
        return m;
    }

    ///
    unittest
    {
        auto msg = Frame(123);
        assert(msg.size == 123);
    }

    /** $(DDOC_ANCHOR Frame.opCall_data)
    Initializes a $(ZMQ) message frame from a supplied buffer.

    Warning:
        Some care must be taken when using this function, as $(ZMQ) expects
        to take full ownership of the supplied buffer.  Client code should
        therefore avoid retaining any references to it, including slices that
        contain, overlap with or are contained in $(D data).
        $(ZMQ) makes no guarantee that the buffer is not modified,
        and it does not specify when the buffer is released.

        An additional complication is caused by the fact that most arrays in D
        are owned by the garbage collector.  This is solved by adding the array
        pointer as a new garbage collector root before passing it to
        $(ZMQREF zmq_msg_init_data()), thus preventing the GC from collecting
        it.  The root is then removed again in the deallocator callback
        function which is called by $(ZMQ) when it no longer requires
        the buffer, thus allowing the GC to collect it.

        If the buffer lives in a GC memory block that is not already marked
        $(D NO_MOVE), the block is marked $(D NO_MOVE) until the deallocator
        callback runs.  A block that was already marked $(D NO_MOVE) by
        someone else is left untouched, both now and when the frame is
        released.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init_data())
    */
    static Frame opCall(ubyte[] data)
    {
        Frame m;
        m.init(data);
        return m;
    }

    ///
    unittest
    {
        auto buf = new ubyte[123];
        auto msg = Frame(buf);
        assert(msg.size == buf.length);
        assert(msg.data.ptr == buf.ptr);
    }

    /**
    Reinitializes the Frame object as an empty message.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed with
    $(D $(LINK2 #Frame.opCall,Frame())).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init())
    */
    void rebuild()
    {
        close();
        init();
    }

    ///
    unittest
    {
        auto msg = Frame(256);
        assert (msg.size == 256);
        msg.rebuild();
        assert (msg.size == 0);
    }

    /**
    Reinitializes the Frame object to a specified size.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed with
    $(D $(LINK2 #Frame.opCall_size,Frame(size))).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()).
    */
    void rebuild(size_t size)
    {
        close();
        init(size);
    }

    ///
    unittest
    {
        auto msg = Frame(256);
        assert (msg.size == 256);
        msg.rebuild(1024);
        assert (msg.size == 1024);
    }

    /**
    Reinitializes the Frame object from a supplied buffer.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed with
    $(D $(LINK2 #Frame.opCall_data,Frame(data))).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()).
    */
    void rebuild(ubyte[] data)
    {
        close();
        init(data);
    }

    ///
    unittest
    {
        auto msg = Frame(256);
        assert (msg.size == 256);
        auto buf = new ubyte[123];
        msg.rebuild(buf);
        assert(msg.size == buf.length);
        assert(msg.data.ptr == buf.ptr);
    }

    @disable this(this);

    /**
    Releases the $(ZMQ) message frame when the $(D Frame) is destroyed.

    This destructor never throws, which means that any errors will go
    undetected.  If this is undesirable, call $(FREF Frame.close) before
    the $(D Frame) is destroyed.

    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    ~this() nothrow
    {
        if (m_initialized) {
            immutable rc = trusted!zmq_msg_close(&m_msg);
            assert(rc == 0, "zmq_msg_close failed: Invalid message frame");
        }
    }

    /**
    Releases the $(ZMQ) message frame.

    Note that the frame will be automatically released when the $(D Frame)
    object is destroyed, so it is often not necessary to call this method
    manually.  Calling it on a frame that is not initialized does nothing.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    void close()
    {
        if (m_initialized) {
            if (trusted!zmq_msg_close(&m_msg) != 0) {
                throw new ZmqException;
            }
            m_initialized = false;
        }
    }

    /**
    Copies frame content to another message frame.

    $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest))
    copies the contents of this $(D Frame) into $(D dest).  $(D dest) must
    be a valid (i.e. initialized) $(D Frame).

    Warning:
        These functions may not do what you think they do.  Please refer
        to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_copy())
    */
    Frame copy()
        in { assert(m_initialized); }
        body
    {
        auto cp = Frame();
        copyTo(cp);
        return cp;
    }

    /// ditto
    void copyTo(ref Frame dest)
        in { assert(m_initialized); }
        body
    {
        if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg1 = Frame(3);
        msg1.data[] = "foo".representation;
        auto msg2 = msg1.copy();
        assert (msg2.data.asString() == "foo");
    }

    /**
    Moves frame content to another message frame.

    $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest))
    moves the contents of this $(D Frame) to $(D dest).  $(D dest) must
    be a valid (i.e. initialized) $(D Frame).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_move())
    */
    Frame move()
        in { assert(m_initialized); }
        body
    {
        auto m = Frame();
        moveTo(m);
        return m;
    }

    /// ditto
    void moveTo(ref Frame dest)
        in { assert(m_initialized); }
        body
    {
        if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg1 = Frame(3);
        msg1.data[] = "foo".representation;
        auto msg2 = msg1.move();
        assert (msg1.size == 0);
        assert (msg2.data.asString() == "foo");
    }

    /**
    The message frame content size in bytes.

    Corresponds_to:
        $(ZMQREF zmq_msg_size())
    */
    @property size_t size() nothrow
        in { assert(m_initialized); }
        body
    {
        return trusted!zmq_msg_size(&m_msg);
    }

    ///
    unittest
    {
        auto msg = Frame(123);
        assert(msg.size == 123);
    }

    /**
    Retrieves the message frame content.

    The returned slice refers directly to the frame's buffer; it is not
    a copy.

    Corresponds_to:
        $(ZMQREF zmq_msg_data())
    */
    @property ubyte[] data() @trusted nothrow
        in { assert(m_initialized); }
        body
    {
        return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size];
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg = Frame(3);
        assert(msg.data.length == 3);
        msg.data[] = "foo".representation; // Slice operator -> array copy.
        assert(msg.data.asString() == "foo");
    }

    /**
    Whether there are more message frames to retrieve.

    Corresponds_to:
        $(ZMQREF zmq_msg_more())
    */
    @property bool more() nothrow
        in { assert(m_initialized); }
        body
    {
        return !!trusted!zmq_msg_more(&m_msg);
    }

    /**
    A pointer to the underlying $(D zmq_msg_t).
    */
    @property inout(zmq_msg_t)* handle() inout pure nothrow
    {
        return &m_msg;
    }

private:
    // Initializes m_msg as an empty frame.  Must not be called on a frame
    // that is already initialized.
    private void init()
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (trusted!zmq_msg_init(&m_msg) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Initializes m_msg as a frame of the given size.  Must not be called
    // on a frame that is already initialized.
    private void init(size_t size)
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (trusted!zmq_msg_init_size(&m_msg, size) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Initializes m_msg so that it uses `data` as its buffer.  Must not be
    // called on a frame that is already initialized.
    //
    // The buffer is registered as a GC root, and its enclosing GC block (if
    // any) is marked NO_MOVE, until ZMQ invokes the deallocator callback.
    // Both steps are rolled back if initialization fails.
    private void init(ubyte[] data) @trusted
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        import core.memory;
        // Deallocator callback handed to ZMQ.  `block` is the hint passed to
        // zmq_msg_init_data() below: it is non-null only if this function
        // was the one that set NO_MOVE on the block, so that a NO_MOVE
        // attribute which was already present beforehand is never cleared.
        static extern(C) void zmqd_Frame_init_data_free(void* dataPtr, void* block)
            @trusted nothrow
        {
            GC.removeRoot(dataPtr);
            if (block !is null) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);
        }

        GC.addRoot(data.ptr);
        scope(failure) GC.removeRoot(data.ptr);

        // `movable` is true only for GC-owned memory that is not yet pinned;
        // only in that case do we own (and later undo) the NO_MOVE attribute.
        auto block = GC.addrOf(data.ptr);
        immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE);
        if (movable) GC.setAttr(block, GC.BlkAttr.NO_MOVE);
        scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);

        if (trusted!zmq_msg_init_data(&m_msg, data.ptr, data.length,
                                      &zmqd_Frame_init_data_free,
                                      movable ? block : null) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Whether m_msg currently holds a frame that must be closed.
    bool m_initialized;
    zmq_msg_t m_msg;
}
2273 
unittest
{
    // Sends two frames of different sizes from s1 to s2 over an inproc PAIR
    // connection and checks that size and content arrive unchanged.
    const url = uniqueUrl("inproc");
    auto s1 = Socket(SocketType.pair);
    auto s2 = Socket(SocketType.pair);
    s1.bind(url);
    s2.connect(url);

    // First frame: 123 bytes, all 'a'.
    auto m1a = Frame(123);
    m1a.data[] = 'a';
    s1.send(m1a);
    auto m2a = Frame();
    s2.receive(m2a);
    assert(m2a.size == 123);
    foreach (e; m2a.data) assert(e == 'a');

    // Second frame: 10 bytes, all 'b'.
    auto m1b = Frame(10);
    m1b.data[] = 'b';
    s1.send(m1b);
    auto m2b = Frame();
    s2.receive(m2b);
    assert(m2b.size == 10);
    foreach (e; m2b.data) assert(e == 'b');
}
2298 
2299 deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame;
2300 
2301 
2302 /**
2303 A global context which is used by default by all sockets, unless they are
2304 explicitly constructed with a different context.
2305 
2306 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
2307 has the following to say) about context creation:
2308 $(QUOTE
2309     You should create and use exactly one context in your process.
2310     [$(LDOTS)] If at runtime a process has two contexts, these are
2311     like separate $(ZMQ) instances. If that's explicitly what you
2312     want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
2313     at the start of your main line code, and one $(D zmq_ctx_destroy())
2314     at the end.)
2315 )
2316 By using $(D defaultContext()), this is exactly what you achieve.  The
2317 context is created the first time the function is called, and is
2318 automatically destroyed when the program ends.
2319 
2320 This function is thread safe.
2321 
2322 Throws:
2323     $(REF ZmqException) if $(ZMQ) reports an error.
2324 See_also:
2325     $(REF Context)
2326 */
Context defaultContext() @trusted
{
    // For future reference: This is the low-lock singleton pattern. See:
    // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
    //
    // `instantiated` is a plain static and therefore thread-local in D, while
    // `ctx` is __gshared, i.e. a single instance shared by all threads.
    static bool instantiated;
    __gshared Context ctx;
    if (!instantiated) {
        // First call on this thread: take the lock once, create the shared
        // context if no other thread has done so yet, and remember (per
        // thread) that the lock need not be taken again.
        synchronized {
            if (!ctx.initialized) {
                ctx = Context();
            }
            instantiated = true;
        }
    }
    // Returned by value; the returned copy refers to the same context.
    return ctx;
}
2343 
@system unittest
{
    // Requests the default context from two threads and checks that both
    // obtained the same, valid underlying handle.
    import core.thread;
    Context c1, c2;
    auto t = new Thread(() { c1 = defaultContext(); });
    t.start();
    c2 = defaultContext();
    t.join();
    assert(c1.handle !is null);
    assert(c1.handle == c2.handle);
}
2355 
2356 
2357 /**
2358 An object that encapsulates a $(ZMQ) context.
2359 
2360 In most programs, it is not necessary to use this type directly,
2361 as $(REF Socket) will use a default global context if not explicitly
2362 provided with one.  See $(FREF defaultContext) for details.
2363 
2364 A default-initialized $(D Context) is not a valid $(ZMQ) context; it
2365 must always be explicitly initialized with $(FREF _Context.opCall):
2366 ---
2367 Context ctx;        // Not a valid context yet
2368 ctx = Context();    // ...but now it is.
2369 ---
2370 $(D Context) objects can be passed around by value, and two copies will
2371 refer to the same context.  The underlying context is managed using
2372 reference counting, so that when the last copy of a $(D Context) goes
2373 out of scope, the context is automatically destroyed.  The reference
2374 counting is performed in a thread safe manner, so that the same context
2375 can be shared between multiple threads.  ($(ZMQ) guarantees the thread
2376 safety of other context operations.)
2377 
2378 See_also:
2379     $(FREF defaultContext)
2380 */
struct Context
{
@safe:
    /**
    Creates a new $(ZMQ) context.

    Returns:
        A $(REF Context) object that encapsulates the new context.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_new())
    */
    static Context opCall() @trusted // because of the cast
    {
        if (auto c = trusted!zmq_ctx_new()) {
            Context ctx;
            // Casting from/to shared is OK since ZMQ contexts are thread safe.
            // Release callback handed to the SharedResource: terminates the
            // context and reports failure by returning (not throwing) an
            // exception; returns null on success.
            static Exception release(shared(void)* ptr) @trusted nothrow
            {
                return zmq_ctx_term(cast(void*) ptr) == 0
                    ? null
                    : new ZmqException;
            }
            ctx.m_resource = SharedResource(cast(shared) c, &release);
            return ctx;
        } else {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        auto ctx = Context();
        assert (ctx.initialized);
    }

    /**
    Detaches from the $(ZMQ) context.

    If this is the last reference to the context, it will be terminated with
    $(ZMQREF zmq_ctx_term()).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    */
    void detach()
    {
        m_resource.detach();
    }

    ///
    unittest
    {
        auto ctx = Context();
        assert (ctx.initialized);
        ctx.detach();
        assert (!ctx.initialized);
    }

    /**
    The number of I/O threads.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_IO_THREADS).
    */
    @property int ioThreads()
    {
        return getOption(ZMQ_IO_THREADS);
    }

    /// ditto
    @property void ioThreads(int value)
    {
        setOption(ZMQ_IO_THREADS, value);
    }

    ///
    unittest
    {
        auto ctx = Context();
        ctx.ioThreads = 3;
        assert (ctx.ioThreads == 3);
    }

    /**
    The maximum number of sockets.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_MAX_SOCKETS).
    */
    @property int maxSockets()
    {
        return getOption(ZMQ_MAX_SOCKETS);
    }

    /// ditto
    @property void maxSockets(int value)
    {
        setOption(ZMQ_MAX_SOCKETS, value);
    }

    ///
    unittest
    {
        auto ctx = Context();
        ctx.maxSockets = 512;
        assert (ctx.maxSockets == 512);
    }

    /**
    The $(D void*) pointer used by the underlying C API to refer to the context.

    If the object has not been initialized, this function returns $(D null).
    */
    @property inout(void)* handle() inout @trusted pure nothrow
    {
        // ZMQ contexts are thread safe, so casting away shared is OK.
        return cast(typeof(return)) m_resource.handle;
    }

    /**
    Whether this $(REF Context) object has been _initialized, i.e. whether it
    refers to a valid $(ZMQ) context.
    */
    @property bool initialized() const pure nothrow
    {
        return m_resource.handle != null;
    }

    ///
    unittest
    {
        Context ctx;
        assert (!ctx.initialized);
        ctx = Context();
        assert (ctx.initialized);
        ctx.detach();
        assert (!ctx.initialized);
    }

private:
    // Reads an integer context option via zmq_ctx_get().  A negative result
    // is treated as an error and turned into a ZmqException.
    int getOption(int option)
    {
        immutable value = trusted!zmq_ctx_get(this.handle, option);
        if (value < 0) {
            throw new ZmqException;
        }
        return value;
    }

    // Writes an integer context option via zmq_ctx_set().  A nonzero return
    // code is turned into a ZmqException.
    void setOption(int option, int value)
    {
        if (trusted!zmq_ctx_set(this.handle, option, value) != 0) {
            throw new ZmqException;
        }
    }

    // Holds the context handle together with the release callback that is
    // set up in opCall().
    SharedResource m_resource;
}
2548 
2549 
2550 /**
2551 Socket event types.
2552 
2553 These are used together with $(FREF Socket.monitor), and are described
2554 in the $(ZMQREF zmq_socket_monitor()) reference.
2555 */
enum EventType
{
    connected       = ZMQ_EVENT_CONNECTED,      /// Corresponds to $(D ZMQ_EVENT_CONNECTED).
    connectDelayed  = ZMQ_EVENT_CONNECT_DELAYED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED).
    connectRetried  = ZMQ_EVENT_CONNECT_RETRIED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED).
    listening       = ZMQ_EVENT_LISTENING,      /// Corresponds to $(D ZMQ_EVENT_LISTENING).
    bindFailed      = ZMQ_EVENT_BIND_FAILED,    /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED).
    accepted        = ZMQ_EVENT_ACCEPTED,       /// Corresponds to $(D ZMQ_EVENT_ACCEPTED).
    acceptFailed    = ZMQ_EVENT_ACCEPT_FAILED,  /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED).
    closed          = ZMQ_EVENT_CLOSED,         /// Corresponds to $(D ZMQ_EVENT_CLOSED).
    closeFailed     = ZMQ_EVENT_CLOSE_FAILED,   /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED).
    disconnected    = ZMQ_EVENT_DISCONNECTED,   /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED).
    // NOTE(review): presumably a mask selecting every event rather than a
    // single event; the Event property switches treat it as unreachable.
    all             = ZMQ_EVENT_ALL             /// Corresponds to $(D ZMQ_EVENT_ALL).
}
2570 
2571 
2572 /**
2573 Receives a message on the given socket and interprets it as a socket
2574 state change event.
2575 
2576 $(D socket) must be a PAIR socket which is connected to an endpoint
2577 created via a $(FREF Socket.monitor) call.  $(D receiveEvent()) receives
2578 one message on the socket, parses its contents according to the
2579 specification in the $(ZMQREF zmq_socket_monitor()) reference,
2580 and returns the event information as an $(REF Event) object.
2581 
2582 Throws:
2583     $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
2584     $(REF InvalidEventException) if the received message could not
2585     be interpreted as an event message.
2586 See_also:
2587     $(FREF Socket.monitor), for monitoring socket state changes.
2588 */
Event receiveEvent(ref Socket socket) @system
{
    // The monitor event message format underwent some changes between ZMQ
    // versions 3.2, 3.3 (unreleased) and 4.0.  Furthermore, the zmq_event_t
    // type was removed as of ZMQ 4.1.  We try to support all versions >= 3.2.
    //
    //   * < 3.3:  one frame holding OldEventStruct (address pointer inline).
    //   * 3.3:    one frame holding NewEventStruct, then an address frame.
    //   * >= 4.0: one frame holding a packed 16-bit event ID directly
    //             followed by a 32-bit value, then an address frame.
    immutable ver = zmqVersion();
    immutable usePackedData = ver.major >= 4;
    immutable useNewLayout = ZMQ_MAKE_VERSION(ver.major, ver.minor, ver.patch)
        >= ZMQ_MAKE_VERSION(3, 3, 0);
    assert (useNewLayout || !usePackedData);

    struct OldEventStruct {
        int event;
        const(char*) addr;
        int value;
    }
    struct NewEventStruct {
        ushort event;
        int value;
    }
    immutable eventFrameSize =
        usePackedData ? (ushort.sizeof + int.sizeof)
                      : (useNewLayout ? NewEventStruct.sizeof : OldEventStruct.sizeof);

    // A frame of any other size cannot be an event for this ZMQ version.
    auto eventFrame = Frame();
    if (socket.receive(eventFrame) != eventFrameSize) {
        throw new InvalidEventException;
    }
    const data = eventFrame.data.ptr;
    try {
        import std.conv: to;
        EventType event;
        int value;
        string addr;
        if (useNewLayout) {
            if (usePackedData) {
                event = to!EventType(*(cast(const(ushort)*) data));
                // The value starts at byte offset ushort.sizeof.  The offset
                // must be added to the byte pointer *before* the cast;
                // adding it after the cast would advance by that many ints
                // and read past the end of the frame.
                value = *(cast(const(int)*) (data + ushort.sizeof));
            } else {
                const eventStruct = cast(const(NewEventStruct)*) data;
                event = to!EventType(eventStruct.event);
                value = eventStruct.value;
            }
            // In the new layout the address arrives as a separate frame.
            auto addrFrame = Frame();
            socket.receive(addrFrame);
            addr = (cast(char[]) addrFrame.data).idup;
        } else {
            const eventStruct = cast(const(OldEventStruct)*) data;
            event = to!EventType(eventStruct.event);
            value = eventStruct.value;
            addr = eventStruct.addr !is null
                ? to!string(eventStruct.addr)
                : null;
        }
        return Event(event, addr, value);
    } catch (Exception e) {
        // Any exception thrown within the try block signifies that there
        // is something wrong with the event message.
        throw new InvalidEventException;
    }
}
2650 
2651 // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1.
2652 //       IPC does not work on Windows, and unbind() does not work with INPROC.
2653 //       See: https://github.com/zeromq/libzmq/issues/949
version (Posix) @system unittest
{
    // Collects monitor events on a separate thread until a `closed` event
    // has been received.
    Event[] events;
    void eventCollector()
    {
        auto coll = Socket(SocketType.pair);
        coll.connect("inproc://zmqd_receiveEvent_unittest_monitor");
        do {
            events ~= receiveEvent(coll);
        } while (events[$-1].type != EventType.closed);
    }
    import core.thread;
    auto collector = new Thread(&eventCollector);
    collector.start();

    // Drives a monitored socket through bind, peer connect, peer disconnect
    // and unbind, pausing between the steps.
    static void eventGenerator()
    {
        auto sck1 = Socket(SocketType.pair);
        sck1.monitor("inproc://zmqd_receiveEvent_unittest_monitor");
        sck1.bind("ipc://zmqd_receiveEvent_unittest");
        import core.time;
        Thread.sleep(100.msecs);
        auto sck2 = Socket(SocketType.pair);
        sck2.connect("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        sck2.disconnect("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        sck1.unbind("ipc://zmqd_receiveEvent_unittest");
    }
    eventGenerator();
    collector.join();
    // Expected sequence on the monitored socket: listening, accepted, closed.
    assert (events.length == 3);
    foreach (ev; events) {
        assert (ev.address == "ipc://zmqd_receiveEvent_unittest");
    }
    assert (events[0].type == EventType.listening);
    assert (events[1].type == EventType.accepted);
    assert (events[2].type == EventType.closed);
    // For a `listening` event only `fd` is a valid property; the others
    // must throw an Error.
    import std.exception;
    assertNotThrown!Error(events[0].fd);
    assertThrown!Error(events[0].errno);
    assertThrown!Error(events[0].interval);
}
2697 
2698 
2699 /**
2700 Information about a socket state change.
2701 
2702 Corresponds_to:
2703     $(ZMQAPI zmq_socket_monitor,$(D zmq_event_t))
2704 See_also:
2705     $(FREF receiveEvent)
2706 */
2707 struct Event
2708 {
2709     /**
2710     The event type.
2711 
2712     Corresponds_to:
2713         $(D zmq_event_t.event)
2714     */
2715     @property EventType type() const pure nothrow
2716     {
2717         return m_type;
2718     }
2719 
2720     /**
2721     The peer address.
2722 
2723     Corresponds_to:
2724         $(D zmq_event_t.data.xyz.addr), where $(D xyz) is the event-specific union.
2725     */
2726     @property string address() const pure nothrow
2727     {
2728         return m_address;
2729     }
2730 
2731     /**
2732     The socket file descriptor.
2733 
2734     This property function may only be called if $(REF Event.type) is one of:
2735     $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disonnected).
2736 
2737     Throws:
2738         $(D Error) if the property is called for a wrong event type.
2739     Corresponds_to:
2740         $(D zmq_event_t.data.xyz.addr), where $(D xyz) is the event-specific union.
2741     */
2742     @property FD fd() const pure nothrow
2743     {
2744         final switch (m_type) {
2745             case EventType.connected     :
2746             case EventType.listening     :
2747             case EventType.accepted      :
2748             case EventType.closed        :
2749             case EventType.disconnected  : return cast(typeof(return)) m_value;
2750             case EventType.connectDelayed:
2751             case EventType.connectRetried:
2752             case EventType.bindFailed    :
2753             case EventType.acceptFailed  :
2754             case EventType.closeFailed   : throw invalidProperty();
2755             case EventType.all           :
2756         }
2757         assert (false);
2758     }
2759 
2760     /**
2761     The $(D errno) code for the error which triggered the event.
2762 
2763     This property function may only be called if $(REF Event.type) is either
2764     $(D bindFailed), $(D acceptFailed) or $(D closeFailed).
2765 
2766     Throws:
2767         $(D Error) if the property is called for a wrong event type.
2768     Corresponds_to:
2769         $(D zmq_event_t.data.xyz.addr), where $(D xyz) is the event-specific union.
2770     */
2771     @property int errno() const pure nothrow
2772     {
2773         final switch (m_type) {
2774             case EventType.bindFailed    :
2775             case EventType.acceptFailed  :
2776             case EventType.closeFailed   : return m_value;
2777             case EventType.connected     :
2778             case EventType.connectDelayed:
2779             case EventType.connectRetried:
2780             case EventType.listening     :
2781             case EventType.accepted      :
2782             case EventType.closed        :
2783             case EventType.disconnected  : throw invalidProperty();
2784             case EventType.all           :
2785         }
2786         assert (false);
2787     }
2788 
2789     /**
2790     The reconnect interval.
2791 
2792     This property function may only be called if $(REF Event.type) is
2793     $(D connectRetried).
2794 
2795     Throws:
2796         $(D Error) if the property is called for a wrong event type.
2797     Corresponds_to:
2798         $(D zmq_event_t.data.connect_retried.interval)
2799     */
2800     @property Duration interval() const pure nothrow
2801     {
2802         final switch (m_type) {
2803             case EventType.connectRetried: return m_value.msecs;
2804             case EventType.connected     :
2805             case EventType.connectDelayed:
2806             case EventType.listening     :
2807             case EventType.bindFailed    :
2808             case EventType.accepted      :
2809             case EventType.acceptFailed  :
2810             case EventType.closed        :
2811             case EventType.closeFailed   :
2812             case EventType.disconnected  : throw invalidProperty();
2813             case EventType.all           :
2814         }
2815         assert (false);
2816     }
2817 
2818 private:
2819     this(EventType type, string address, int value) pure nothrow
2820     {
2821         m_type = type;
2822         m_address = address;
2823         m_value = value;
2824     }
2825 
2826     Error invalidProperty(string name = __FUNCTION__)() const pure nothrow
2827     {
2828         try {
2829             import std.conv: text;
2830             return new Error(text("Property '", name,
2831                                   "' not available for event type '",
2832                                   m_type, "'"));
2833         } catch (Exception e) {
2834             assert(false);
2835         }
2836     }
2837 
2838     EventType m_type;
2839     string m_address;
2840     int m_value;
2841 }
2842 
2843 
2844 version (WithLibsodium) {
2845 
2846 /**
2847 Encodes a binary key as Z85 printable text.
2848 
2849 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1),
2850 which will be used to store the return value plus a terminating zero byte.
2851 If $(D dest) is omitted, a new array will be created.
2852 
2853 Returns:
2854     An array of size $(D 5*data.length/4) which contains the Z85-encoded text,
2855     excluding the terminating zero byte.  This will be a slice of $(D dest) if
2856     it is provided.
2857 Throws:
2858     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
2859     $(REF ZmqException) if $(ZMQ) reports an error (i.e., if data.length is not
2860     a multiple of 4).
2861 Corresponds_to:
2862     $(ZMQREF zmq_z85_encode())
2863 */
char[] z85Encode(ubyte[] data, char[] dest)
// TODO: Make data const when we update to ZMQ 4.1
{
    import core.exception: RangeError;

    // Z85 produces 5 output characters for every 4 input bytes.
    immutable encodedLength = 5 * data.length / 4;

    // One extra element is required for the terminating zero byte.
    immutable requiredLength = encodedLength + 1;
    if (dest.length < requiredLength) {
        throw new RangeError;
    }

    // A null return value signals failure.
    auto written = trusted!zmq_z85_encode(dest.ptr, data.ptr, data.length);
    if (written == null) {
        throw new ZmqException;
    }

    // The terminating zero byte is not part of the returned slice.
    return dest[0 .. encodedLength];
}
2875 
2876 /// ditto
char[] z85Encode(ubyte[] data)
{
    // 5 characters per 4 input bytes, plus room for the terminating zero byte.
    auto buffer = new char[5*data.length/4 + 1];
    return z85Encode(data, buffer);
}
2881 
@system unittest // @system because of assertThrown
{
    import core.exception: RangeError;
    import std.exception: assertThrown;

    // TODO: Make data immutable when we update to ZMQ 4.1
    auto rawKey = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
    immutable expected = "HelloWorld";

    // Overload that allocates its own buffer.
    assert (z85Encode(rawKey) == expected);

    // Overload that takes a buffer: 10 characters plus the terminating zero.
    // The result must be a slice of the buffer that was passed in.
    auto dest = new char[11];
    auto encoded = z85Encode(rawKey, dest);
    assert (encoded == expected);
    assert (encoded.ptr == dest.ptr);

    // A buffer that is one element too short is rejected.
    assertThrown!RangeError(z85Encode(rawKey, new char[10]));

    // Input whose length is not a multiple of 4 is rejected.
    assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5]));
}
2899 
2900 
/**
Decodes a binary key from Z85 printable text.

$(D dest) must be an array whose length is at least $(D 4*text.length/5),
which will be used to store the return value.
If $(D dest) is omitted, a new array will be created.

An empty $(D text) decodes to an empty array; $(ZMQ) is not called at all
in that case.

Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero
byte will be appended to $(D text) if it does not contain one already.  However,
this may trigger a (possibly unwanted) GC allocation.  To avoid this, either
make sure that the last character in $(D text) is $(D '\0'), or use
$(OBJREF assumeSafeAppend) on the array before calling this function.

Returns:
    An array of size $(D 4*text.length/5) which contains the decoded data.
    This will be a slice of $(D dest) if it is provided.
Throws:
    $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
    $(REF ZmqException) if $(ZMQ) reports an error (i.e., if the length of
    the text, not counting a terminating zero byte, is not a multiple of 5).
Corresponds_to:
    $(ZMQREF zmq_z85_decode())
*/
ubyte[] z85Decode(char[] text, ubyte[] dest)
// TODO: Make text const when we update to ZMQ 4.1
{
    import core.exception: RangeError;

    // Z85 turns every 5 characters into 4 bytes.  If text already carries a
    // terminating zero byte, the integer division drops the extra element.
    immutable len = 4 * text.length/5;
    if (dest.length < len) throw new RangeError;

    // Nothing to decode.  Returning here avoids indexing the last element of
    // an empty array below.  It also avoids passing a possibly null dest.ptr
    // to the C function, which returns dest on success and null on failure,
    // so a null dest would be mistaken for an error.
    if (text.length == 0) return dest[0 .. 0];

    // The C function needs a zero-terminated string.
    if (text[$-1] != '\0') text ~= '\0';
    if (trusted!zmq_z85_decode(dest.ptr, text.ptr) == null) {
        throw new ZmqException;
    }
    return dest[0 .. len];
}
2936 
2937 /// ditto
ubyte[] z85Decode(char[] text)
{
    // 4 bytes of output for every 5 characters of input.
    auto buffer = new ubyte[4*text.length/5];
    return z85Decode(text, buffer);
}
2942 
@system unittest // @system because of assertThrown
{
    import core.exception: RangeError;
    import std.exception: assertThrown;

    // TODO: Make text immutable when we update to ZMQ 4.1
    auto encoded = "HelloWorld".dup;
    immutable expected = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];

    // Overload that allocates its own buffer, with and without an explicit
    // terminating zero byte in the input.
    assert (z85Decode(encoded) == expected);
    assert (z85Decode(encoded~'\0') == expected);

    // Overload that takes a buffer: the result must be a slice of it.
    auto dest = new ubyte[8];
    auto decoded = z85Decode(encoded, dest);
    assert (decoded == expected);
    assert (decoded.ptr == dest.ptr);

    // A buffer that is one element too short is rejected.
    assertThrown!RangeError(z85Decode(encoded, new ubyte[7]));

    // Input whose length is not a multiple of 5 is rejected.
    assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup));
}
2961 
2962 
2963 /**
2964 Generates a new CURVE key pair.
2965 
2966 To avoid a memory allocation, preallocated buffers may optionally be supplied
2967 for the two keys.  Each of these must have a length of at least 41 bytes, enough
2968 for a 40-character Z85-encoded key plus a terminating zero byte.  If either
2969 buffer is omitted/$(D null), a new one will be created.
2970 
2971 Returns:
2972     A tuple that contains the two keys.  Each of these will have a length of
2973     40 characters, and will be slices of the input buffers if such have been
2974     provided.
2975 Throws:
2976     $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are
2977         not $(D null) but have a length of less than 41 characters.$(BR)
2978     $(REF ZmqException) if $(ZMQ) reports an error.
2979 Corresponds_to:
2980     $(ZMQREF zmq_curve_keypair())
2981 */
Tuple!(char[], "publicKey", char[], "secretKey")
    curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null)
{
    import core.exception: RangeError;
    import deimos.zmq.utils: zmq_curve_keypair;

    // Each returned key is 40 characters long; the buffers need one
    // additional element for a terminating zero byte.
    enum keyLength = 40;
    enum bufferLength = keyLength + 1;

    // Allocate whichever buffers were not supplied, and reject supplied
    // buffers that are too short.  The public key buffer is checked first.
    if (publicKeyBuf is null) {
        publicKeyBuf = new char[bufferLength];
    } else if (publicKeyBuf.length < bufferLength) {
        throw new RangeError;
    }
    if (secretKeyBuf is null) {
        secretKeyBuf = new char[bufferLength];
    } else if (secretKeyBuf.length < bufferLength) {
        throw new RangeError;
    }

    // A non-zero return value signals failure.
    immutable status = trusted!zmq_curve_keypair(publicKeyBuf.ptr, secretKeyBuf.ptr);
    if (status != 0) {
        throw new ZmqException;
    }

    // The terminating zero bytes are not part of the returned slices.
    return typeof(return)(publicKeyBuf[0 .. keyLength],
                          secretKeyBuf[0 .. keyLength]);
}
2997 
2998 ///
unittest
{
    enum endpoint = "inproc://curveKeyPair_test";

    // The server only needs its own secret key.
    auto serverKeys = curveKeyPair();
    auto server = Socket(SocketType.rep);
    server.curveServer = true;
    server.curveSecretKeyZ85 = serverKeys.secretKey;
    server.bind(endpoint);

    // The client needs its own key pair plus the server's public key.
    auto clientKeys = curveKeyPair();
    auto client = Socket(SocketType.req);
    client.curvePublicKeyZ85 = clientKeys.publicKey;
    client.curveSecretKeyZ85 = clientKeys.secretKey;
    client.curveServerKeyZ85 = serverKeys.publicKey;
    client.connect(endpoint);
    client.send("hello");

    ubyte[5] received;
    assert (server.receive(received) == 5);
    assert (received.asString() == "hello");
}
3019 
@system unittest
{
    // Without arguments, both keys are freshly allocated.
    auto allocated = curveKeyPair();
    assert (allocated.publicKey.length == 40);
    assert (allocated.secretKey.length == 40);

    // With caller-supplied buffers, the keys must be slices of those buffers.
    char[82] storage;
    auto inPlace = curveKeyPair(storage[0 .. 41], storage[41 .. 82]);
    assert (inPlace.publicKey.length == 40);
    assert (inPlace.publicKey.ptr == storage.ptr);
    assert (inPlace.secretKey.length == 40);
    assert (inPlace.secretKey.ptr == storage.ptr + 41);

    // Buffers that are one element too short are rejected, and the storage
    // must be left exactly as it was.
    char[82] snapshot = storage;
    import core.exception, std.exception;
    assertThrown!RangeError(curveKeyPair(storage[0 .. 40], storage[41 .. 82]));
    assertThrown!RangeError(curveKeyPair(storage[0 .. 41], storage[42 .. 82]));
    assert (snapshot[] == storage[]);
}
3039 
3040 } //version (WithLibsodium)
3041 
3042 
3043 /**
3044 Utility function which interprets and validates a byte array as a UTF-8 string.
3045 
3046 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
3047 the message _data contains plain text.  $(D asString()) allows for easy and
3048 safe interpretation of raw _data as characters.  It checks that $(D data) is
3049 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
3050 the same memory region.
3051 
3052 Throws:
3053     $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
3054 See_also:
3055     $(STDREF string,representation), which performs the opposite operation.
3056 */
inout(char)[] asString(inout(ubyte)[] data) pure
{
    import std.utf: validate;

    // Reinterpret the bytes as characters; no copy is made, so the result
    // refers to the same memory as the argument.
    auto chars = cast(inout(char)[]) data;

    // Throws if the bytes do not form valid UTF-8.
    validate(chars);
    return chars;
}
3064 
3065 ///
unittest
{
    enum endpoint = "inproc://zmqd_asString_example";

    auto sender = Socket(SocketType.pair);
    sender.bind(endpoint);
    auto receiver = Socket(SocketType.pair);
    receiver.connect(endpoint);

    // Write the text straight into the frame's data through asString().
    auto frame = Frame(12);
    frame.data.asString()[] = "Hello World!";
    sender.send(frame);

    // Read the raw bytes back and view them as text.
    ubyte[12] received;
    receiver.receive(received);
    assert(received.asString() == "Hello World!");
}
3081 
unittest
{
    import std.exception: assertThrown;
    import std.utf: UTFException;

    // Valid input: same characters, same memory.
    auto raw = cast(ubyte[]) ['f', 'o', 'o'];
    auto viewed = raw.asString();
    assert (viewed == "foo");
    assert (cast(void*) raw.ptr == cast(void*) viewed.ptr);

    // Input that is not valid UTF-8 must be rejected.
    auto invalid = cast(ubyte[]) [100, 252, 1];
    assertThrown!UTFException(asString(invalid));
}
3094 
3095 
3096 /**
3097 A class for exceptions thrown when any of the underlying $(ZMQ) C functions
3098 report an error.
3099 
3100 The exception provides a standard error message obtained with
3101 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ)
3102 function which reported the error.
3103 */
class ZmqException : Exception
{
    /**
    The $(D errno) code that was set by the $(ZMQ) function that reported
    the error.

    Corresponds_to:
        $(ZMQREF zmq_errno())
    */
    immutable int errno;

private:
    // Creates an exception whose message is the zmq_strerror() text for the
    // current value of the C errno variable.
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        import core.stdc.errno, std.conv;

        // The error code is read before anything else is called.
        this.errno = core.stdc.errno.errno;

        string description;
        try {
            auto cMessage = trusted!zmq_strerror(this.errno);
            description = trusted!(to!string)(cMessage);
        } catch (Exception e) {
            /* We never get here */
        }

        // Still, let's assert as much.
        assert(description.length);
        super(description, file, line);
    }
}
3128 
3129 
3130 /**
3131 Exception thrown by $(FREF receiveEvent) on failure to interpret a
3132 received message as an event description.
3133 */
class InvalidEventException : Exception
{
private:
    // Creates the exception with its fixed message; file and line default
    // to the location of the caller.
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        enum description = "The received message is not an event message";
        super(description, file, line);
    }
}
3142 
3143 
3144 // =============================================================================
3145 // Everything below is internal
3146 // =============================================================================
3147 private:
3148 
3149 
// A reference-counted owner of an opaque handle.  Copies share one payload;
// the release function is called when the count drops below 1.
struct SharedResource
{
@safe:
    // Signature of the function that releases the handle.  It reports failure
    // by returning an exception object (null on success) instead of throwing.
    alias Exception function(shared(void)*) nothrow Release;

    // Takes ownership of ptr, starting with a reference count of 1.
    this(shared(void)* ptr, Release release) nothrow
        in { assert(ptr); } body
    {
        m_payload = new shared(Payload)(1, ptr, release);
    }

    // Copying an attached object adds one reference.
    this(this) nothrow
    {
        if (m_payload !is null) incRefCount();
    }

    // A failure reported by the release function is dropped here.
    ~this() nothrow
    {
        nothrowDetach();
    }

    // Detaches from the current payload (which may throw) and then takes
    // over the payload of rhs.
    ref SharedResource opAssign(SharedResource rhs)
    {
        detach();
        // rhs is our own by-value copy.  Clearing its payload pointer hands
        // its reference over to this object without touching the count.
        m_payload = rhs.m_payload;
        rhs.m_payload = null;
        return this;
    }

    // Drops this object's reference.  Throws the exception returned by the
    // release function, if it returned one.
    void detach()
    {
        if (m_payload is null) return;
        auto failure = nothrowDetach();
        if (failure !is null) throw failure;
    }

    // The managed handle, or null if this object is not attached.
    @property inout(shared(void))* handle() inout pure nothrow
    {
        if (m_payload is null) return null;
        return m_payload.handle;
    }

private:
    // Atomically adds one reference.
    void incRefCount() @trusted nothrow
    {
        import core.atomic: atomicOp;
        assert (m_payload !is null && m_payload.refCount > 0);
        atomicOp!"+="(m_payload.refCount, 1);
    }

    // Atomically removes one reference and returns the new count.
    int decRefCount() @trusted nothrow
    {
        import core.atomic: atomicOp;
        assert (m_payload !is null && m_payload.refCount > 0);
        return atomicOp!"-="(m_payload.refCount, 1);
    }

    // Drops this object's reference and returns whatever the release
    // function returned, or null if the handle was not released.
    Exception nothrowDetach() @trusted nothrow
        out { assert (m_payload is null); }
        body
    {
        if (m_payload is null) return null;

        // Whatever happens below, this object ends up detached.
        scope(exit) m_payload = null;

        immutable remaining = decRefCount();
        if (remaining >= 1) return null;

        // That was the last reference, so the handle is released now.
        return m_payload.release(m_payload.handle);
    }

    struct Payload
    {
        int refCount;
        void* handle;
        Release release;
    }
    shared(Payload)* m_payload;

    // Either detached, or attached to a complete payload with live references.
    invariant()
    {
        assert (m_payload is null || (m_payload.refCount > 0 &&
            m_payload.handle !is null && m_payload.release !is null));
    }
}
3237 
@system unittest
{
    import std.exception: assertNotThrown, assertThrown;

    // "Releases" an int by setting it to zero; releasing an int that is
    // already zero is reported as an error.
    static Exception release(shared(void)* p) @trusted nothrow
    {
        auto flag = cast(shared(int)*) p;
        if (*flag == 0) return new Exception("double release");
        *flag = 0;
        return null;
    }

    shared int i = 1;

    {
        // Test constructor and properties.
        auto first = SharedResource(&i, &release);
        assert (i == 1);
        assert (first.handle == &i);

        // Test postblit constructor
        auto second = first;
        assert (i == 1);
        assert (second.handle == &i);

        {
            // Test properties and detach() with default-initialized object.
            SharedResource blank;
            assert (blank.handle == null);
            assertNotThrown(blank.detach());

            // Test assignment, both with and without detachment
            blank = second;
            assert (i == 1);
            assert (blank.handle == &i);

            shared int j = 2;
            auto other = SharedResource(&j, &release);
            assert (other.handle == &j);
            other = second;
            assert (j == 0);
            assert (i == 1);
            assert (other.handle == &i);

            // Test explicit detach()
            shared int k = 3;
            auto failing = SharedResource(&k, &release);
            assertNotThrown(failing.detach());
            assert(k == 0);

            // Test failure to release and assign (release(&k) fails when k == 0)
            failing = SharedResource(&k, &release);
            assertThrown!Exception(failing.detach());
            failing = SharedResource(&k, &release);
            assertThrown!Exception(failing = second);
        }

        // i should not be "released" yet
        assert (i == 1);
        assert (first.handle == &i);
        assert (second.handle == &i);
    }
    // ...but now it should.
    assert (i == 0);
}
3305 
3306 // Thread safety test
@system unittest
{
    enum workerCount = 100;
    enum copiesPerWorker = 1000;

    // "Releases" an int by setting it to zero; releasing an int that is
    // already zero is reported as an error.
    static Exception release(shared(void)* p) @trusted nothrow
    {
        auto flag = cast(shared(int)*) p;
        if (*flag == 0) return new Exception("double release");
        *flag = 0;
        return null;
    }

    shared int raw = 1;
    {
        auto rs = SharedResource(&raw, &release);

        import core.thread;
        auto workers = new ThreadGroup;
        foreach (w; 0 .. workerCount) {
            // Each worker repeatedly copies and destroys references while
            // the original is still alive, so nothing may be released.
            workers.create(() {
                auto mine = rs;
                foreach (c; 0 .. copiesPerWorker) {
                    auto extra = mine;
                    assert (extra.handle == &raw);
                    assert (raw == 1);
                }
            });
        }
        workers.joinAll();

        // All copies are gone, but the original still holds the resource.
        assert (rs.handle == &raw);
        assert (raw == 1);
    }
    // The last reference went out of scope, so the resource is released.
    assert (raw == 0);
}
3343 
3344 
// Builds "<p>://<random UUID>".  The parameter n is not used in the body.
version(unittest) private string uniqueUrl(string p, int n = __LINE__)
{
    import std.uuid;
    immutable id = randomUUID().toString();
    return p ~ "://" ~ id;
}
3350 
3351 
// Calls func with the given arguments from inside a @trusted function and
// returns its result, so that the call can be made from @safe code.  It is
// up to the caller to make sure that the call really is memory-safe.
private auto trusted(alias func, Args...)(auto ref Args args)
    @trusted
{
    return func(args);
}
3356 
3357 
3358 // std.string.toStringz() is unsafe, so we provide our own implementation
3359 // tailored to the string sizes we are likely to encounter here.
3360 // Note that this implementation requires that the string be used immediately
3361 // upon return, and not stored, as the buffer will be reused most of the time.
const(char)* zeroTermString(const char[] s) nothrow
{
    import std.algorithm: max;

    // Scratch buffer that is kept between calls.  Every call overwrites it,
    // so the returned pointer is only good until the next call.
    static char[] scratch;

    // Room for the characters plus the terminating zero byte.
    immutable needed = s.length + 1;
    if (needed > scratch.length) {
        // Never grow to less than 1023 elements.
        scratch.length = max(needed, 1023);
    }

    scratch[0 .. s.length] = s;
    scratch[s.length] = '\0';
    return scratch.ptr;
}
3372 
@system unittest
{
    auto first = zeroTermString("Hello World!");
    assert (first[0 .. 13] == "Hello World!\0");

    auto second = zeroTermString("foo");
    assert (second[0 .. 4] == "foo\0");

    // Both calls return a pointer to the same, reused buffer.
    assert (first == second);
}