1 /*
2 This Source Code Form is subject to the terms of the Mozilla Public
3 License, v. 2.0. If a copy of the MPL was not distributed with this
4 file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 */
6 
7 /**
8 $(ZMQD) – a thin wrapper around the low-level C API of the
9 $(LINK2 http://zeromq.org,$(ZMQ)) messaging framework.
10 
11 Most functions in this module have a one-to-one relationship with functions
12 in the underlying C API.  Some adaptations have been made to make the API
13 safer, easier and more pleasant to use; most importantly:
14 $(UL
15     $(LI
16         Errors are signalled by means of exceptions rather than return
17         codes.  The $(REF ZmqException) class provides
18         a standard textual message for any error condition, but it also
19         provides access to the $(D errno) code set by the C function
20         that reported the error.)
21     $(LI
22         Functions are marked with $(D @safe), $(D pure) and $(D nothrow)
23         as appropriate, thus facilitating their use in high-level D code.)
24     $(LI
25         Memory and resources (i.e. contexts, sockets and messages) are
26         automatically managed, thus preventing leaks.)
27     $(LI
28         Context, socket and message options are implemented as properties.)
29 )
30 The names of functions and types in $(ZMQD) are very similar to those in
31 $(ZMQ), but they follow the D naming conventions.  Thus, the library should
32 feel both familiar to $(ZMQ) users and natural to D users.  A notable
33 deviation from the C API is that message parts are consistently called
34 "frames".  For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send())
35 and so on.  (Multipart messages were a late addition to $(ZMQ), and the "msg"
36 function names were well established at that point.  The library's
37 developers have admitted that this is somewhat confusing, and the newer
38 CZMQ API consistently uses "frame" in function names.)
39 
40 Due to the close correspondence with the C API, this documentation has
41 intentionally been kept sparse. There is really no reason to repeat the
42 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here.
43 Instead, the documentation for each function contains a "Corresponds to"
44 section that links to the appropriate pages in the $(ZMQ) reference.  Any
45 details given in the present documentation mostly concern the D-specific
46 adaptations that have been made.
47 
48 Also note that the examples generally only use the INPROC transport.  The
49 reason for this is that the examples double as unittests, and we want to
50 avoid firewall troubles and other issues that could arise with the use of
51 network protocols such as TCP, PGM, etc., and the IPC protocol is not
52 supported on Windows.  Anyway, they are only short
53 snippets that demonstrate the syntax; for more comprehensive and realistic
54 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all,
55 $(ZMQ) Guide).  Many of the examples in the Guide have been translated to
56 D, and can be found in the
57 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples))
58 subdirectory of the $(ZMQD) source repository.
59 
60 Version:
61     1.1.0 (compatible with $(ZMQ) >= 4.0.0)
62 Authors:
63     $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad)
64 Copyright:
65     Copyright (c) 2013–2016, Lars T. Kyllingstad. All rights reserved.
66 License:
67     $(ZMQD) is released under the terms of the
68     $(LINK2 https://www.mozilla.org/MPL/2.0/index.txt,Mozilla Public License v. 2.0).$(BR)
69     Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site)
70     for details about $(ZMQ) licensing.
71 Macros:
72     D      = <code>$0</code>
73     EM     = <em>$0</em>
74     LDOTS  = &hellip;
75     QUOTE  = <blockquote>$0</blockquote>
76     FREF   = $(D $(LINK2 #$1,$1()))
77     REF    = $(D $(LINK2 #$1,$1))
78     COREF  = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2))
79     OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1))
80     STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
81     ANCHOR = <span id="$1"></span>
82     ZMQ    = ZeroMQ
83     ZMQAPI = $(LINK2 http://api.zeromq.org/4-0:$1,$+)
84     ZMQD   = $(EM zmqd)
85     ZMQREF = $(D $(ZMQAPI $1,$1))
86 */
87 module zmqd;
88 
89 import core.time;
90 import std.typecons;
91 import deimos.zmq.zmq;
92 
93 
94 version(Windows) {
95     import core.sys.windows.winsock2: SOCKET;
96 }
97 
98 // Include ZMQ >= 4.1 features?
99 private enum ZMQ41 = ZMQ_MAKE_VERSION(4, 1, 0);
100 
101 // Compile with debug=ZMQD_DisableCurveTests to be able to successfully
102 // run unittests even if the local ZMQ library is not compiled with Curve
103 // support.
104 debug (ZMQD_DisableCurveTests) { } else debug = WithCurveTests;
105 
106 // Compatibility check
107 version(unittest) static this()
108 {
109     import std.stdio: stderr;
110     const v = zmqVersion();
111     if (v.major != ZMQ_VERSION_MAJOR || v.minor != ZMQ_VERSION_MINOR) {
112         stderr.writefln(
113             "Warning: Potential ZeroMQ header/library incompatibility: "
114             ~"The header (binding) is for version %d.%d.%d, "
115             ~"while the library is version %d.%d.%d. Unittests may fail.",
116             ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH,
117             v.major, v.minor, v.patch);
118     }
119     // Known incompatibilities
120     import std.algorithm: min, max;
121     const libVersion = ZMQ_MAKE_VERSION(v.major, v.minor, v.patch);
122     const older = min(ZMQ_VERSION, libVersion);
123     const newer = max(ZMQ_VERSION, libVersion);
124     if (older < ZMQ_MAKE_VERSION(4, 1, 0) && newer >= ZMQ_MAKE_VERSION(4, 1, 0)) {
125         stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions");
126     } else if (older < ZMQ_MAKE_VERSION(4, 1, 1) && newer >= ZMQ_MAKE_VERSION(4, 1, 1)) {
127         stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions");
128     }
129 }
130 
131 
132 @safe:
133 
134 
135 /**
136 Reports the $(ZMQ) library version.
137 
138 Returns:
139     A $(STDREF typecons,Tuple) with three integer fields that represent the
140     three versioning levels: $(D major), $(D minor) and $(D patch).
141 Corresponds_to:
142     $(ZMQREF zmq_version())
143 */
144 Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow
145 {
146     typeof(return) v;
147     trusted!zmq_version(&v.major, &v.minor, &v.patch);
148     return v;
149 }
150 
151 
152 static if (ZMQ_VERSION >= ZMQ41) {
153     /**
154     Checks for a $(ZMQ) capability.
155 
156     Corresponds_to:
157         $(ZMQREF zmq_has())
158     */
159     bool zmqHas(const char[] capability) nothrow
160     {
161         return 0 != trusted!zmq_has(zeroTermString(capability));
162     }
163 
164     unittest
165     {
166         assert (!zmqHas("this ain't a capability"));
167     }
168 }
169 
170 
171 /**
172 The various socket types.
173 
174 These are described in the $(ZMQREF zmq_socket()) reference.
175 */
176 enum SocketType
177 {
178     req     = ZMQ_REQ,      /// Corresponds to $(D ZMQ_REQ)
179     rep     = ZMQ_REP,      /// Corresponds to $(D ZMQ_REP)
180     dealer  = ZMQ_DEALER,   /// Corresponds to $(D ZMQ_DEALER)
181     router  = ZMQ_ROUTER,   /// Corresponds to $(D ZMQ_ROUTER)
182     pub     = ZMQ_PUB,      /// Corresponds to $(D ZMQ_PUB)
183     sub     = ZMQ_SUB,      /// Corresponds to $(D ZMQ_SUB)
184     xpub    = ZMQ_XPUB,     /// Corresponds to $(D ZMQ_XPUB)
185     xsub    = ZMQ_XSUB,     /// Corresponds to $(D ZMQ_XSUB)
186     push    = ZMQ_PUSH,     /// Corresponds to $(D ZMQ_PUSH)
187     pull    = ZMQ_PULL,     /// Corresponds to $(D ZMQ_PULL)
188     pair    = ZMQ_PAIR,     /// Corresponds to $(D ZMQ_PAIR)
189     stream  = ZMQ_STREAM,   /// Corresponds to $(D ZMQ_STREAM)
190 }
191 
192 
193 /// _Security mechanisms.
194 enum Security
195 {
196     /// $(ZMQAPI zmq_null,NULL): No security or confidentiality.
197     none  = ZMQ_NULL,
198 
199     /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication.
200     plain = ZMQ_PLAIN,
201 
202     /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality.
203     curve = ZMQ_CURVE,
204 }
205 
206 
207 /**
208 A special $(COREF time,Duration) value used to signify an infinite
209 timeout or time interval in certain contexts.
210 
211 The places where this value may be used are:
212 $(UL
213     $(LI $(LINK2 #Socket.misc_options,certain socket options))
214     $(LI $(FREF poll))
215 )
216 
217 Note:
218 Since $(D core.time.Duration) doesn't reserve such a special value, the
219 actual value of $(D infiniteDuration) is $(COREF time,Duration.max).
220 */
221 enum infiniteDuration = Duration.max;
222 
223 
224 /**
225 An object that encapsulates a $(ZMQ) socket.
226 
227 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
228 must always be explicitly initialized with a constructor (see
229 $(FREF _Socket.this)):
230 ---
231 Socket s;                     // Not a valid socket yet
232 s = Socket(SocketType.push);  // ...but now it is.
233 ---
234 This $(D struct) is noncopyable, which means that a socket is always
235 uniquely managed by a single $(D Socket) object.  Functions that will
236 inspect or use the socket, but not take ownership of it, should take
237 a $(D ref Socket) parameter.  Use $(STDREF algorithm,move) to move
238 a $(D Socket) to a different location (e.g. into a sink function that
239 takes it by value, or into a new variable).
240 
241 The socket is automatically closed when the $(D Socket) object goes out
242 of scope.
243 
244 Linger_period:
245 Note that Socket by default sets the socket's linger period to zero.
246 This deviates from the $(ZMQ) default (which is an infinite linger period).
247 */
248 struct Socket
249 {
250 @safe:
251     /**
252     Creates a new $(ZMQ) socket.
253 
254     If $(D context) is not specified, the default _context (as returned
255     by $(FREF defaultContext)) is used.
256 
257     Throws:
258         $(REF ZmqException) if $(ZMQ) reports an error.
259     Corresponds_to:
260         $(ZMQREF zmq_socket())
261     */
262     this(SocketType type)
263     {
264         this(defaultContext(), type);
265     }
266 
267     /// ditto
268     this(Context context, SocketType type)
269     {
270         m_context = context;
271         m_type = type;
272         m_socket = trusted!zmq_socket(context.handle, type);
273         if (m_socket == null) {
274             throw new ZmqException;
275         }
276         linger = 0.msecs;
277     }
278 
279     /// With default context:
280     unittest
281     {
282         auto sck = Socket(SocketType.push);
283         assert (sck.initialized);
284     }
285     /// With explicit context:
286     unittest
287     {
288         auto ctx = Context();
289         auto sck = Socket(ctx, SocketType.push);
290         assert (sck.initialized);
291     }
292 
293     // Socket objects are noncopyable.
294     @disable this(this);
295 
296     unittest // Verify that move semantics work.
297     {
298         import std.algorithm: move;
299         struct SocketOwner
300         {
301             this(Socket s) { owned = trusted!move(s); }
302             Socket owned;
303         }
304         auto socket = Socket(SocketType.req);
305         const socketPtr = socket.handle;
306         assert (socketPtr != null);
307 
308         auto owner = SocketOwner(trusted!move(socket));
309         assert (socket.handle == null);
310         assert (!socket.m_context.initialized);
311         assert (owner.owned.handle == socketPtr);
312         assert (owner.owned.m_context.initialized);
313     }
314 
315     // Closes the socket on desctruction.
316     ~this() nothrow { nothrowClose(); }
317 
318     /**
319     Closes the $(ZMQ) socket.
320 
321     Note that the socket will be closed automatically upon destruction,
322     so it is usually not necessary to call this method manually.
323 
324     Throws:
325         $(REF ZmqException) if $(ZMQ) reports an error.
326     Corresponds_to:
327         $(ZMQREF zmq_close())
328     */
329     void close()
330     {
331         if (!nothrowClose()) throw new ZmqException;
332     }
333 
334     ///
335     unittest
336     {
337         auto s = Socket(SocketType.pair);
338         assert (s.initialized);
339         s.close();
340         assert (!s.initialized);
341     }
342 
343     /**
344     Starts accepting incoming connections on $(D endpoint).
345 
346     Throws:
347         $(REF ZmqException) if $(ZMQ) reports an error.
348     Corresponds_to:
349         $(ZMQREF zmq_bind())
350     */
351     void bind(const char[] endpoint)
352     {
353         if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) {
354             throw new ZmqException;
355         }
356     }
357 
358     ///
359     unittest
360     {
361         auto s = Socket(SocketType.pub);
362         s.bind("inproc://zmqd_bind_example");
363     }
364 
365     /**
366     Stops accepting incoming connections on $(D endpoint).
367 
368     Throws:
369         $(REF ZmqException) if $(ZMQ) reports an error.
370     Corresponds_to:
371         $(ZMQREF zmq_unbind())
372     */
373     void unbind(const char[] endpoint)
374     {
375         if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) {
376             throw new ZmqException;
377         }
378     }
379 
380     // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1.
381     //       IPC does not work on Windows, and unbind() does not work with INPROC.
382     //       See: https://github.com/zeromq/libzmq/issues/949
383     ///
384     version (Posix) unittest
385     {
386         auto s = Socket(SocketType.pub);
387         s.bind("ipc://zmqd_unbind_example");
388         // Do some work...
389         s.unbind("ipc://zmqd_unbind_example");
390     }
391 
392     /**
393     Creates an outgoing connection to $(D endpoint).
394 
395     Throws:
396         $(REF ZmqException) if $(ZMQ) reports an error.
397     Corresponds_to:
398         $(ZMQREF zmq_connect())
399     */
400     void connect(const char[] endpoint)
401     {
402         if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) {
403             throw new ZmqException;
404         }
405     }
406 
407     ///
408     unittest
409     {
410         auto s = Socket(SocketType.sub);
411         s.connect("inproc://zmqd_connect_example");
412     }
413 
414     /**
415     Disconnects the socket from $(D endpoint).
416 
417     Throws:
418         $(REF ZmqException) if $(ZMQ) reports an error.
419     Corresponds_to:
420         $(ZMQREF zmq_disconnect())
421     */
422     void disconnect(const char[] endpoint)
423     {
424         if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) {
425             throw new ZmqException;
426         }
427     }
428 
429     ///
430     unittest
431     {
432         auto s = Socket(SocketType.sub);
433         s.connect("inproc://zmqd_disconnect_example");
434         // Do some work...
435         s.disconnect("inproc://zmqd_disconnect_example");
436     }
437 
438     /**
439     Sends a message frame.
440 
441     $(D _send) blocks until the frame has been queued on the socket.
442     $(D trySend) performs the operation in non-blocking mode, and returns
443     a $(D bool) value that signifies whether the frame was queued on the
444     socket.
445 
446     The $(D more) parameter specifies whether this is a multipart message
447     and there are _more frames to follow.
448 
449     The $(D char[]) overload is a convenience function that simply casts
450     the string argument to $(D ubyte[]).
451 
452     Throws:
453         $(REF ZmqException) if $(ZMQ) reports an error.
454     Corresponds_to:
455         $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
456         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
457         $(D more == true)).
458     */
459     void send(const ubyte[] data, bool more = false)
460     {
461         immutable flags = more ? ZMQ_SNDMORE : 0;
462         if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) {
463             throw new ZmqException;
464         }
465     }
466 
467     /// ditto
468     void send(const char[] data, bool more = false)
469     {
470         send(cast(const(ubyte)[]) data, more);
471     }
472 
473     /// ditto
474     bool trySend(const ubyte[] data, bool more = false)
475     {
476         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
477         if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) {
478             import core.stdc.errno;
479             if (errno == EAGAIN) return false;
480             else throw new ZmqException;
481         }
482         return true;
483     }
484 
485     /// ditto
486     bool trySend(const char[] data, bool more = false)
487     {
488         return trySend(cast(const(ubyte)[]) data, more);
489     }
490 
491     ///
492     unittest
493     {
494         auto sck = Socket(SocketType.pub);
495         sck.send(cast(ubyte[]) [11, 226, 92]);
496         sck.send("Hello World!");
497     }
498 
499     /**
500     Sends a message frame.
501 
502     $(D _send) blocks until the frame has been queued on the socket.
503     $(D trySend) performs the operation in non-blocking mode, and returns
504     a $(D bool) value that signifies whether the frame was queued on the
505     socket.
506 
507     The $(D more) parameter specifies whether this is a multipart message
508     and there are _more frames to follow.
509 
510     Throws:
511         $(REF ZmqException) if $(ZMQ) reports an error.
512     Corresponds_to:
513         $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
514         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
515         $(D more == true)).
516     */
517     void send(ref Frame msg, bool more = false)
518     {
519         immutable flags = more ? ZMQ_SNDMORE : 0;
520         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
521             throw new ZmqException;
522         }
523     }
524 
525     /// ditto
526     bool trySend(ref Frame msg, bool more = false)
527     {
528         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
529         if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
530             import core.stdc.errno;
531             if (errno == EAGAIN) return false;
532             else throw new ZmqException;
533         }
534         return true;
535     }
536 
537     ///
538     unittest
539     {
540         auto sck = Socket(SocketType.pub);
541         auto msg = Frame(12);
542         msg.data.asString()[] = "Hello World!";
543         sck.send(msg);
544     }
545 
546     /**
547     Sends a constant-memory message frame.
548 
549     $(D _sendConst) blocks until the frame has been queued on the socket.
550     $(D trySendConst) performs the operation in non-blocking mode, and returns
551     a $(D bool) value that signifies whether the frame was queued on the
552     socket.
553 
554     The $(D more) parameter specifies whether this is a multipart message
555     and there are _more frames to follow.
556 
557     Throws:
558         $(REF ZmqException) if $(ZMQ) reports an error.
559     Corresponds_to:
560         $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the
561         case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
562         $(D more == true)).
563     */
564     void sendConst(immutable ubyte[] data, bool more = false)
565     {
566         immutable flags = more ? ZMQ_SNDMORE : 0;
567         if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) {
568             throw new ZmqException;
569         }
570     }
571 
572     /// ditto
573     void sendConst(string data, bool more = false)
574     {
575         sendConst(cast(immutable(ubyte)[]) data, more);
576     }
577 
578     /// ditto
579     bool trySendConst(immutable ubyte[] data, bool more = false)
580     {
581         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
582         if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) {
583             import core.stdc.errno;
584             if (errno == EAGAIN) return false;
585             else throw new ZmqException;
586         }
587         return true;
588     }
589 
590     /// ditto
591     bool trySendConst(string data, bool more = false)
592     {
593         return trySend(cast(immutable(ubyte)[]) data, more);
594     }
595 
596     ///
597     unittest
598     {
599         static immutable arr = cast(ubyte[]) [11, 226, 92];
600         auto sck = Socket(SocketType.pub);
601         sck.sendConst(arr);
602         sck.sendConst("Hello World!");
603     }
604 
605     /**
606     Receives a message frame.
607 
608     $(D _receive) blocks until the request can be satisfied, and returns the
609     number of bytes in the frame.
610     $(D tryReceive) performs the operation in non-blocking mode, and returns
611     a $(STDREF typecons,Tuple) which contains the size of the frame along
612     with a $(D bool) value that signifies whether a frame was received.
613     (If the latter is $(D false), the former is always set to zero.)
614 
615     Throws:
616         $(REF ZmqException) if $(ZMQ) reports an error.
617     Corresponds_to:
618         $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
619         of $(D tryReceive)).
620 
621     */
622     size_t receive(ubyte[] data)
623     {
624         immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, 0);
625         if (len >= 0) {
626             import std.conv;
627             return to!size_t(len);
628         } else {
629             throw new ZmqException;
630         }
631     }
632 
633     /// ditto
634     Tuple!(size_t, bool) tryReceive(ubyte[] data)
635     {
636         immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, ZMQ_DONTWAIT);
637         if (len >= 0) {
638             import std.conv;
639             return typeof(return)(to!size_t(len), true);
640         } else {
641             import core.stdc.errno;
642             if (errno == EAGAIN) {
643                 return typeof(return)(0, false);
644             } else {
645                 throw new ZmqException;
646             }
647         }
648     }
649 
650     ///
651     unittest
652     {
653         // Sender
654         auto snd = Socket(SocketType.req);
655         snd.connect("inproc://zmqd_receive_example");
656         snd.send("Hello World!");
657 
658         // Receiver
659         import std.string: representation;
660         auto rcv = Socket(SocketType.rep);
661         rcv.bind("inproc://zmqd_receive_example");
662         char[256] buf;
663         immutable len  = rcv.receive(buf.representation);
664         assert (buf[0 .. len] == "Hello World!");
665     }
666 
667     @system unittest
668     {
669         auto snd = Socket(SocketType.pair);
670         snd.bind("inproc://zmqd_tryReceive_example");
671         auto rcv = Socket(SocketType.pair);
672         rcv.connect("inproc://zmqd_tryReceive_example");
673 
674         ubyte[256] buf;
675         auto r1 = rcv.tryReceive(buf);
676         assert (!r1[1]);
677 
678         import core.thread, core.time, std.string;
679         snd.send("Hello World!");
680         Thread.sleep(100.msecs); // Wait for message to be transferred...
681         auto r2 = rcv.tryReceive(buf);
682         assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation);
683     }
684 
685     /**
686     Receives a message frame.
687 
688     $(D _receive) blocks until the request can be satisfied, and returns the
689     number of bytes in the frame.
690     $(D tryReceive) performs the operation in non-blocking mode, and returns
691     a $(STDREF typecons,Tuple) which contains the size of the frame along
692     with a $(D bool) value that signifies whether a frame was received.
693     (If the latter is $(D false), the former is always set to zero.)
694 
695     Throws:
696         $(REF ZmqException) if $(ZMQ) reports an error.
697     Corresponds_to:
698         $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
699         of $(D tryReceive)).
700 
701     */
702     size_t receive(ref Frame msg)
703     {
704         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0);
705         if (len >= 0) {
706             import std.conv;
707             return to!size_t(len);
708         } else {
709             throw new ZmqException;
710         }
711     }
712 
713     /// ditto
714     Tuple!(size_t, bool) tryReceive(ref Frame msg)
715     {
716         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT);
717         if (len >= 0) {
718             import std.conv;
719             return typeof(return)(to!size_t(len), true);
720         } else {
721             import core.stdc.errno;
722             if (errno == EAGAIN) {
723                 return typeof(return)(0, false);
724             } else {
725                 throw new ZmqException;
726             }
727         }
728     }
729 
730     ///
731     unittest
732     {
733         // Sender
734         auto snd = Socket(SocketType.req);
735         snd.connect("inproc://zmqd_msg_receive_example");
736         snd.send("Hello World!");
737 
738         // Receiver
739         import std.string: representation;
740         auto rcv = Socket(SocketType.rep);
741         rcv.bind("inproc://zmqd_msg_receive_example");
742         auto msg = Frame();
743         rcv.receive(msg);
744         assert (msg.data.asString() == "Hello World!");
745     }
746 
747     @system unittest
748     {
749         auto snd = Socket(SocketType.pair);
750         snd.bind("inproc://zmqd_msg_tryReceive_example");
751         auto rcv = Socket(SocketType.pair);
752         rcv.connect("inproc://zmqd_msg_tryReceive_example");
753 
754         auto msg = Frame();
755         auto r1 = rcv.tryReceive(msg);
756         assert (!r1[1]);
757 
758         import core.thread, core.time, std.string;
759         snd.send("Hello World!");
760         Thread.sleep(100.msecs); // Wait for message to be transferred...
761         auto r2 = rcv.tryReceive(msg);
762         assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation);
763     }
764 
765     /**
766     Whether there are _more message frames to follow.
767 
768     Throws:
769         $(REF ZmqException) if $(ZMQ) reports an error.
770     Corresponds_to:
771         $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE).
772     */
773     @property bool more() { return !!getOption!int(ZMQ_RCVMORE); }
774 
775     // TODO: Better unittest/example
776     unittest
777     {
778         auto sck = Socket(SocketType.req);
779         assert (!sck.more);
780     }
781 
782     /** $(ANCHOR Socket.misc_options)
783     Miscellaneous socket options.
784 
785     Each of these corresponds to an option passed to
786     $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For
787     example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
788     $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.
789 
790     Notes:
791     $(UL
792         $(LI For convenience, the setter for the $(D identity) property
793             accepts strings.  To retrieve a string with the getter, use
794             the $(FREF asString) function.
795             ---
796             sck.identity = "foobar";
797             assert (sck.identity.asString() == "foobar");
798             ---
799             )
800         $(LI The $(D linger), $(D receiveTimeout), $(D sendTimeout) and
801             $(D handshakeInterval) properties may have the special _value
802             $(REF infiniteDuration).  This  is translated to an option
803             _value of -1 or 0 (depending on which property is being set)
804             in the C API.)
805         $(LI Some options have array _type, and these allow the user to supply
806             a buffer in which to store the _value, to avoid a GC allocation.
807             The return _value is then a slice of this buffer.
808             These are not marked as $(D @property), but are prefixed with
809             "get" (e.g. $(D getIdentity())).  A user-supplied buffer is
810             $(EM required) for some options, namely $(D getPlainUsername())
811             and $(D getPlainPassword()), and these do not have $(D @property)
812             versions.  $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85())
813             require buffers which are at least 32 and 41 bytes long,
814             respectively.)
815         $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
816             treated differently from the others; see $(FREF Socket.subscribe)
817             and $(FREF Socket.unsubscribe))
818     )
819 
820     Throws:
821         $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
822         $(STDREF conv,ConvOverflowException) if a given $(D Duration) is
823             longer than the number of milliseconds that will fit in an $(D int)
824             (only applies to properties of $(COREF time,Duration) _type).$(BR)
825         $(COREF exception,RangeError) if the $(D dest) buffers passed to
826             $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than
827             32 or 41 bytes long, respectively.
828     Corresponds_to:
829         $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()).
830     */
831     @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); }
832     /// ditto
833     @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); }
834 
835     /// ditto
836     @property ubyte[] identity() { return getIdentity(new ubyte[255]); }
837     /// ditto
838     ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); }
839     /// ditto
840     @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); }
841     /// ditto
842     @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); }
843 
844     /// ditto
845     @property int rate() { return getOption!int(ZMQ_RATE); }
846     /// ditto
847     @property void rate(int value) { setOption(ZMQ_RATE, value); }
848 
849     /// ditto
850     @property Duration recoveryInterval()
851     {
852         return msecs(getOption!int(ZMQ_RECOVERY_IVL));
853     }
854     /// ditto
855     @property void recoveryInterval(Duration value)
856     {
857         import std.conv: to;
858         setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"()));
859     }
860 
861     /// ditto
862     @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); }
863     /// ditto
864     @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }
865 
866     /// ditto
867     @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
868     /// ditto
869     @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }
870 
871     /// ditto
872     @property FD fd() { return getOption!FD(ZMQ_FD); }
873 
874     /// ditto
875     @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); }
876 
877     /// ditto
878     @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }
879 
880     /// ditto
881     @property Duration linger()
882     {
883         const auto value = getOption!int(ZMQ_LINGER);
884         return value == -1 ? infiniteDuration : value.msecs;
885     }
886     /// ditto
887     @property void linger(Duration value)
888     {
889         import std.conv: to;
890         setOption(
891             ZMQ_LINGER,
892             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
893     }
894 
895     /// ditto
896     @property Duration reconnectionInterval()
897     {
898         return getOption!int(ZMQ_RECONNECT_IVL).msecs;
899     }
900     /// ditto
901     @property void reconnectionInterval(Duration value)
902     {
903         import std.conv: to;
904         setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"()));
905     }
906 
907     /// ditto
908     @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
909     /// ditto
910     @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }
911 
912     /// ditto
913     @property Duration maxReconnectionInterval()
914     {
915         return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs;
916     }
917     /// ditto
918     @property void maxReconnectionInterval(Duration value)
919     {
920         import std.conv: to;
921         setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"()));
922     }
923 
924     /// ditto
925     @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
926     /// ditto
927     @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }
928 
929     /// ditto
930     @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
931     /// ditto
932     @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }
933 
934     /// ditto
935     @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
936     /// ditto
937     @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }
938 
939     /// ditto
940     @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
941     /// ditto
942     @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }
943 
944     /// ditto
945     @property Duration receiveTimeout()
946     {
947         const value = getOption!int(ZMQ_RCVTIMEO);
948         return value == -1 ? infiniteDuration : value.msecs;
949     }
950     /// ditto
951     @property void receiveTimeout(Duration value)
952     {
953         import std.conv: to;
954         setOption(
955             ZMQ_RCVTIMEO,
956             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
957     }
958 
959     /// ditto
960     @property Duration sendTimeout()
961     {
962         const value = getOption!int(ZMQ_SNDTIMEO);
963         return value == -1 ? infiniteDuration : value.msecs;
964     }
965     /// ditto
966     @property void sendTimeout(Duration value)
967     {
968         import std.conv: to;
969         setOption(
970             ZMQ_SNDTIMEO,
971             value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
972     }
973 
974     /// ditto
975     @property char[] lastEndpoint() @trusted
976     {
977         // This function is not @safe because it calls a @system function
978         // (zmq_getsockopt) and takes the address of a local (len).
979         auto buf = new char[1024];
980         size_t len = buf.length;
981         if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
982             throw new ZmqException;
983         }
984         return buf[0 .. len-1];
985     }
986 
987     /// ditto
988     @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); }
989 
990     /// ditto
991     @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); }
992     /// ditto
993     @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); }
994 
995     /// ditto
996     @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); }
997     /// ditto
998     @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); }
999 
1000     /// ditto
1001     @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); }
1002     /// ditto
1003     @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); }
1004 
1005     /// ditto
1006     @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); }
1007     /// ditto
1008     @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); }
1009 
1010     /// ditto
1011     @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); }
1012     /// ditto
1013     @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); }
1014 
1015     /// ditto
1016     @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); }
1017 
1018     /// ditto
1019     @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); }
1020     /// ditto
1021     @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); }
1022 
1023     /// ditto
1024     @property Security mechanism()
1025     {
1026         import std.conv;
1027         return to!Security(getOption!int(ZMQ_MECHANISM));
1028     }
1029 
1030     /// ditto
1031     @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); }
1032     /// ditto
1033     @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); }
1034 
1035     /// ditto
1036     char[] getPlainUsername(char[] dest)
1037     {
1038         return getCStringOption(ZMQ_PLAIN_USERNAME, dest);
1039     }
1040     /// ditto
1041     @property void plainUsername(const(char)[] value)
1042     {
1043         setArrayOption(ZMQ_PLAIN_USERNAME, value);
1044     }
1045 
1046     /// ditto
1047     char[] getPlainPassword(char[] dest)
1048     {
1049         return getCStringOption(ZMQ_PLAIN_PASSWORD, dest);
1050     }
1051     /// ditto
1052     @property void plainPassword(const(char)[] value)
1053     {
1054         setArrayOption(ZMQ_PLAIN_PASSWORD, value);
1055     }
1056 
1057     /// ditto
1058     @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); }
1059     /// ditto
1060     @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); }
1061 
1062     /// ditto
1063     @property ubyte[] curvePublicKey()
1064     {
1065         return getCurvePublicKey(new ubyte[keyBufSizeBin]);
1066     }
1067     /// ditto
1068     ubyte[] getCurvePublicKey(ubyte[] dest)
1069     {
1070         return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest);
1071     }
1072     /// ditto
1073     @property char[] curvePublicKeyZ85()
1074     {
1075         return getCurvePublicKeyZ85(new char[keyBufSizeZ85]);
1076     }
1077     /// ditto
1078     char[] getCurvePublicKeyZ85(char[] dest)
1079     {
1080         return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest);
1081     }
1082     /// ditto
1083     @property void curvePublicKey(const(ubyte)[] value)
1084     {
1085         setCurveKey(ZMQ_CURVE_PUBLICKEY, value);
1086     }
1087     /// ditto
1088     @property void curvePublicKeyZ85(const(char)[] value)
1089     {
1090         setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value);
1091     }
1092 
1093     /// ditto
1094     @property ubyte[] curveSecretKey()
1095     {
1096         return getCurveSecretKey(new ubyte[keyBufSizeBin]);
1097     }
1098     /// ditto
1099     ubyte[] getCurveSecretKey(ubyte[] dest)
1100     {
1101         return getCurveKey(ZMQ_CURVE_SECRETKEY, dest);
1102     }
1103     /// ditto
1104     @property char[] curveSecretKeyZ85()
1105     {
1106         return getCurveSecretKeyZ85(new char[keyBufSizeZ85]);
1107     }
1108     /// ditto
1109     char[] getCurveSecretKeyZ85(char[] dest)
1110     {
1111         return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest);
1112     }
1113     /// ditto
1114     @property void curveSecretKey(const(ubyte)[] value)
1115     {
1116         setCurveKey(ZMQ_CURVE_SECRETKEY, value);
1117     }
1118     /// ditto
1119     @property void curveSecretKeyZ85(const(char)[] value)
1120     {
1121         setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value);
1122     }
1123 
1124     /// ditto
1125     @property ubyte[] curveServerKey()
1126     {
1127         return getCurveServerKey(new ubyte[keyBufSizeBin]);
1128     }
1129     /// ditto
1130     ubyte[] getCurveServerKey(ubyte[] dest)
1131     {
1132         return getCurveKey(ZMQ_CURVE_SERVERKEY, dest);
1133     }
1134     /// ditto
1135     @property char[] curveServerKeyZ85()
1136     {
1137         return getCurveServerKeyZ85(new char[keyBufSizeZ85]);
1138     }
1139     /// ditto
1140     char[] getCurveServerKeyZ85(char[] dest)
1141     {
1142         return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest);
1143     }
1144     /// ditto
1145     @property void curveServerKey(const(ubyte)[] value)
1146     {
1147         setCurveKey(ZMQ_CURVE_SERVERKEY, value);
1148     }
1149     /// ditto
1150     @property void curveServerKeyZ85(const(char)[] value)
1151     {
1152         setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value);
1153     }
1154 
1155     /// ditto
1156     @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); }
1157 
1158     /// ditto
1159     @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); }
1160 
1161     /// ditto
1162     @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); }
1163 
1164     /// ditto
1165     @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); }
1166 
1167     /// ditto
1168     @property char[] zapDomain() { return getZapDomain(new char[256]); }
1169     /// ditto
1170     char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); }
1171     /// ditto
1172     @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); }
1173 
1174     static if (ZMQ_VERSION >= ZMQ41) {
1175         /// ditto
1176         @property void routerHandover(bool value) { setOption(ZMQ_ROUTER_HANDOVER, value ? 1 : 0); }
1177 
1178         /// ditto
1179         @property void connectionRID(const ubyte[] value) { setArrayOption(ZMQ_CONNECT_RID, value); }
1180 
1181         /// ditto
1182         @property int typeOfService() { return getOption!int(ZMQ_TOS); }
1183         /// ditto
1184         @property void typeOfService(int value) { setOption(ZMQ_TOS, value); }
1185 
1186         /// ditto
1187         @property bool gssapiPlaintext() { return !!getOption!int(ZMQ_GSSAPI_PLAINTEXT); }
1188         /// ditto
1189         @property void gssapiPlaintext(bool value) { setOption(ZMQ_GSSAPI_PLAINTEXT, value ? 1 : 0); }
1190 
1191         /// ditto
1192         @property char[] gssapiPrincipal() { return getGssapiPrincipal(new char[256]); }
1193         /// ditto
1194         char[] getGssapiPrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_PRINCIPAL, dest); }
1195         /// ditto
1196         @property void gssapiPrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_PRINCIPAL, value); }
1197 
1198         /// ditto
1199         @property bool gssapiServer() { return !!getOption!int(ZMQ_GSSAPI_SERVER); }
1200         /// ditto
1201         @property void gssapiServer(bool value) { setOption(ZMQ_GSSAPI_SERVER, value ? 1 : 0); }
1202 
1203         /// ditto
1204         @property char[] gssapiServicePrincipal() { return getGssapiServicePrincipal(new char[256]); }
1205         /// ditto
1206         char[] getGssapiServicePrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, dest); }
1207         /// ditto
1208         @property void gssapiServicePrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, value); }
1209 
1210         /// ditto
1211         @property Duration handshakeInterval()
1212         {
1213             const auto value = getOption!int(ZMQ_HANDSHAKE_IVL);
1214             return value == 0 ? infiniteDuration : msecs(value);
1215         }
1216         /// ditto
1217         @property void handshakeInterval(Duration value)
1218         {
1219             import std.conv: to;
1220             setOption(
1221                 ZMQ_HANDSHAKE_IVL,
1222                 value == infiniteDuration ? 0 : to!int(value.total!"msecs"()));
1223         }
1224     } // static if (ZMQ_VERSION >= ZMQ41)
1225 
1226     // deprecated options
1227     deprecated("Use the 'immediate' property instead")
1228     @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); }
1229 
1230     deprecated("Use the 'immediate' property instead")
1231     @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); }
1232 
1233     deprecated("Use !ipv6 instead")
1234     @property bool ipv4Only() { return !ipv6; }
1235 
1236     deprecated("Use ipv6 = !value instead")
1237     @property void ipv4Only(bool value) { ipv6 = !value; }
1238 
1239     unittest
1240     {
1241         // We test all the socket options by checking that they have their default value.
1242         auto s = Socket(SocketType.xpub);
1243         const e = "inproc://unittest2";
1244         s.bind(e);
1245         import core.time;
1246         assert(s.threadAffinity == 0);
1247         assert(s.identity == null);
1248         assert(s.rate == 100);
1249         assert(s.recoveryInterval == 10.seconds);
1250         assert(s.sendBufferSize == 0);
1251         assert(s.receiveBufferSize == 0);
1252         version(Posix) {
1253             assert(s.fd > 2); // 0, 1 and 2 are the standard streams
1254         }
1255         assert(s.events == PollFlags.pollOut);
1256         assert(s.type == SocketType.xpub);
1257         assert(s.linger == 0.hnsecs);
1258         assert(s.reconnectionInterval == 100.msecs);
1259         assert(s.backlog == 100);
1260         assert(s.maxReconnectionInterval == Duration.zero);
1261         assert(s.maxMsgSize == -1);
1262         assert(s.sendHWM == 1000);
1263         assert(s.receiveHWM == 1000);
1264         assert(s.multicastHops == 1);
1265         assert(s.receiveTimeout == infiniteDuration);
1266         assert(s.sendTimeout == infiniteDuration);
1267         assert(s.tcpKeepalive == -1);
1268         assert(s.tcpKeepaliveCnt == -1);
1269         assert(s.tcpKeepaliveIdle == -1);
1270         assert(s.tcpKeepaliveIntvl == -1);
1271         assert(!s.immediate);
1272         assert(!s.ipv6);
1273         assert(s.mechanism == Security.none);
1274         assert(!s.plainServer);
1275         assert(s.getPlainUsername(new char[8]).length == 0);
1276         assert(s.getPlainPassword(new char[8]).length == 0);
1277         debug (WithCurveTests) {
1278             assert(!s.curveServer);
1279         }
1280         assert(s.zapDomain.length == 0);
1281         static if (ZMQ_VERSION >= ZMQ41) {
1282             assert(s.typeOfService == 0);
1283             if (zmqHas("gssapi")) {
1284                 assert(!s.gssapiPlaintext);
1285                 assert(s.gssapiPrincipal.length == 0);
1286                 assert(!s.gssapiServer);
1287                 assert(s.gssapiServicePrincipal.length == 0);
1288             }
1289             assert(s.handshakeInterval == 30000.msecs);
1290         }
1291 
1292         // Test setters and getters together
1293         s.threadAffinity = 1;
1294         assert(s.threadAffinity == 1);
1295         s.identity = cast(ubyte[]) [ 65, 66, 67 ];
1296         assert(s.identity == [65, 66, 67]);
1297         s.identity = "foo";
1298         assert(s.identity == [102, 111, 111]);
1299         s.rate = 200;
1300         assert(s.rate == 200);
1301         s.recoveryInterval = 5.seconds;
1302         assert(s.recoveryInterval == 5_000.msecs);
1303         s.sendBufferSize = 500;
1304         assert(s.sendBufferSize == 500);
1305         s.receiveBufferSize = 600;
1306         assert(s.receiveBufferSize == 600);
1307         s.linger = Duration.zero;
1308         assert(s.linger == Duration.zero);
1309         s.linger = 100_000.usecs;
1310         assert(s.linger == 100.msecs);
1311         s.linger = infiniteDuration;
1312         assert(s.linger == infiniteDuration);
1313         s.reconnectionInterval = 200_000.usecs;
1314         assert(s.reconnectionInterval == 200.msecs);
1315         s.backlog = 50;
1316         assert(s.backlog == 50);
1317         s.maxReconnectionInterval = 300_000.usecs;
1318         assert(s.maxReconnectionInterval == 300.msecs);
1319         s.maxMsgSize = 1000;
1320         assert(s.maxMsgSize == 1000);
1321         s.sendHWM = 500;
1322         assert(s.sendHWM == 500);
1323         s.receiveHWM = 600;
1324         assert(s.receiveHWM == 600);
1325         s.multicastHops = 2;
1326         assert(s.multicastHops == 2);
1327         s.receiveTimeout = 3.seconds;
1328         assert(s.receiveTimeout == 3_000_000.usecs);
1329         s.receiveTimeout = infiniteDuration;
1330         assert(s.receiveTimeout == infiniteDuration);
1331         s.sendTimeout = 2_000_000.usecs;
1332         assert(s.sendTimeout == 2.seconds);
1333         s.sendTimeout = infiniteDuration;
1334         assert(s.sendTimeout == infiniteDuration);
1335         s.tcpKeepalive = 1;
1336         assert(s.tcpKeepalive == 1);
1337         s.tcpKeepaliveCnt = 1;
1338         assert(s.tcpKeepaliveCnt == 1);
1339         s.tcpKeepaliveIdle = 0;
1340         assert(s.tcpKeepaliveIdle == 0);
1341         s.tcpKeepaliveIntvl = 0;
1342         assert(s.tcpKeepaliveIntvl == 0);
1343         s.immediate = true;
1344         assert(s.immediate);
1345         s.ipv6 = true;
1346         assert(s.ipv6);
1347         s.plainServer = true;
1348         assert(s.mechanism == Security.plain);
1349         assert(s.plainServer);
1350         debug (WithCurveTests) {
1351             assert(!s.curveServer);
1352             s.curveServer = true;
1353             assert(s.mechanism == Security.curve);
1354             assert(!s.plainServer);
1355             assert(s.curveServer);
1356         }
1357         s.plainServer = false;
1358         assert(s.mechanism == Security.none);
1359         assert(!s.plainServer);
1360         debug (WithCurveTests) assert(!s.curveServer);
1361         s.plainUsername = "foobar";
1362         assert(s.getPlainUsername(new char[8]) == "foobar");
1363         assert(s.mechanism == Security.plain);
1364         s.plainUsername = null;
1365         assert(s.mechanism == Security.none);
1366         s.plainPassword = "xyz";
1367         assert(s.getPlainPassword(new char[8]) == "xyz");
1368         assert(s.mechanism == Security.plain);
1369         s.plainPassword = null;
1370         assert(s.mechanism == Security.none);
1371         s.zapDomain = "my_zap_domain";
1372         assert(s.zapDomain == "my_zap_domain");
1373         static if (ZMQ_VERSION >= ZMQ41) {
1374             s.typeOfService = 1;
1375             assert(s.typeOfService == 1);
1376             if (zmqHas("gssapi")) {
1377                 s.gssapiPlaintext = true;
1378                 assert(s.gssapiPlaintext);
1379                 s.gssapiPrincipal = "myPrincipal";
1380                 assert(s.gssapiPrincipal == "myPrincipal");
1381                 s.gssapiServer = true;
1382                 assert(s.gssapiServer);
1383                 s.gssapiServicePrincipal = "myServicePrincipal";
1384                 assert(s.gssapiServicePrincipal == "myServicePrincipal");
1385             }
1386             s.handshakeInterval = 60000.msecs;
1387             assert(s.handshakeInterval == 60000.msecs);
1388         }
1389 
1390         // Test write-only options
1391         s.conflate = true;
1392     }
1393 
1394     debug (WithCurveTests) @system unittest
1395     {
1396         // The CURVE key options require some special setup, so we test them
1397         // separately.
1398         import std.array, std.range;
1399         auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array();
1400         auto z85Key1 = z85Encode(binKey1);
1401         auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array();
1402         auto z85Key2 = z85Encode(binKey2);
1403         auto zeroKey = repeat(cast(ubyte) 0).take(32).array();
1404         assert (z85Key1 != z85Key2);
1405 
1406         auto s = Socket(SocketType.req);
1407         s.curvePublicKey = zeroKey;
1408         s.curveSecretKey = zeroKey;
1409         s.curveServerKey = zeroKey;
1410 
1411         s.curvePublicKey = binKey1;
1412         assert (s.curvePublicKey == binKey1);
1413         assert (s.curvePublicKeyZ85 == z85Key1);
1414         s.curvePublicKeyZ85 = z85Key2;
1415         assert (s.curvePublicKey == binKey2);
1416         assert (s.curvePublicKeyZ85 == z85Key2);
1417         assert (s.curveSecretKey == zeroKey);
1418         assert (s.curveServerKey == zeroKey);
1419         s.curvePublicKey = zeroKey;
1420 
1421         s.curveSecretKey = binKey1;
1422         assert (s.curveSecretKey == binKey1);
1423         assert (s.curveSecretKeyZ85 == z85Key1);
1424         s.curveSecretKeyZ85 = z85Key2;
1425         assert (s.curveSecretKey == binKey2);
1426         assert (s.curveSecretKeyZ85 == z85Key2);
1427         assert (s.curvePublicKey == zeroKey);
1428         assert (s.curveServerKey == zeroKey);
1429         s.curveSecretKey = zeroKey;
1430 
1431         s.curveServerKey = binKey1;
1432         assert (s.curveServerKey == binKey1);
1433         assert (s.curveServerKeyZ85 == z85Key1);
1434         s.curveServerKeyZ85 = z85Key2;
1435         assert (s.curveServerKey == binKey2);
1436         assert (s.curveServerKeyZ85 == z85Key2);
1437         assert (s.curvePublicKey == zeroKey);
1438         assert (s.curveSecretKey == zeroKey);
1439         s.curveServerKey = zeroKey;
1440     }
1441 
1442     unittest
1443     {
1444         // Some options are only applicable to specific socket types.
1445         auto rt = Socket(SocketType.router);
1446         rt.routerMandatory = true;
1447         rt.probeRouter = true;
1448         static if (ZMQ_VERSION >= ZMQ41) {
1449             rt.routerHandover = true;
1450             rt.connectionRID = cast(ubyte[]) [ 10, 20, 30 ];
1451         }
1452         auto xp = Socket(SocketType.xpub);
1453         xp.xpubVerbose = true;
1454         auto rq = Socket(SocketType.req);
1455         rq.reqCorrelate = true;
1456         rq.reqRelaxed = true;
1457     }
1458 
1459     deprecated unittest
1460     {
1461         // Test deprecated socket options
1462         auto s = Socket(SocketType.req);
1463         assert(s.ipv4Only);
1464         assert(!s.delayAttachOnConnect);
1465 
1466         // Test setters and getters together
1467         s.ipv4Only = false;
1468         assert(!s.ipv4Only);
1469         s.delayAttachOnConnect = true;
1470         assert(s.delayAttachOnConnect);
1471     }
1472 
1473     /**
1474     Establishes a message filter.
1475 
1476     Throws:
1477         $(REF ZmqException) if $(ZMQ) reports an error.
1478     Corresponds_to:
1479         $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1480     */
1481     void subscribe(const ubyte[] filterPrefix)
1482     {
1483         setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
1484     }
1485     /// ditto
1486     void subscribe(const  char[] filterPrefix)
1487     {
1488         setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
1489     }
1490 
1491     ///
1492     unittest
1493     {
1494         // Create a subscriber that accepts all messages that start with
1495         // the prefixes "foo" or "bar".
1496         auto sck = Socket(SocketType.sub);
1497         sck.subscribe("foo");
1498         sck.subscribe("bar");
1499     }
1500 
1501     @system unittest
1502     {
1503         void sleep(int ms) {
1504             import core.thread, core.time;
1505             Thread.sleep(dur!"msecs"(ms));
1506         }
1507         auto pub = Socket(SocketType.pub);
1508         pub.bind("inproc://zmqd_subscribe_unittest");
1509         auto sub = Socket(SocketType.sub);
1510         sub.connect("inproc://zmqd_subscribe_unittest");
1511 
1512         pub.send("Hello");
1513         sleep(100);
1514         sub.subscribe("He");
1515         sub.subscribe(cast(ubyte[])['W', 'o']);
1516         sleep(100);
1517         pub.send("Heeee");
1518         pub.send("World");
1519         sleep(100);
1520         ubyte[5] buf;
1521         sub.receive(buf);
1522         assert(buf.asString() == "Heeee");
1523         sub.receive(buf);
1524         assert(buf.asString() == "World");
1525     }
1526 
1527     /**
1528     Removes a message filter.
1529 
1530     Throws:
1531         $(REF ZmqException) if $(ZMQ) reports an error.
1532     Corresponds_to:
1533         $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1534     */
1535     void unsubscribe(const ubyte[] filterPrefix)
1536     {
1537         setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
1538     }
1539     /// ditto
1540     void unsubscribe(const char[] filterPrefix)
1541     {
1542         setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
1543     }
1544 
1545     ///
1546     unittest
1547     {
1548         // Subscribe to messages that start with "foo" or "bar".
1549         auto sck = Socket(SocketType.sub);
1550         sck.subscribe("foo");
1551         sck.subscribe("bar");
1552         // ...
1553         // From now on, only accept messages that start with "bar"
1554         sck.unsubscribe("foo");
1555     }
1556 
1557     /**
1558     Spawns a PAIR socket that publishes socket state changes (_events) over
1559     the INPROC transport to the given _endpoint.
1560 
1561     Which event types should be published may be selected by bitwise-ORing
1562     together different $(REF EventType) flags in the $(D events) parameter.
1563 
1564     Throws:
1565         $(REF ZmqException) if $(ZMQ) reports an error.
1566     Corresponds_to:
1567         $(ZMQREF zmq_socket_monitor())
1568     See_also:
1569         $(FREF receiveEvent), which receives and parses event messages.
1570     */
1571     void monitor(const char[] endpoint, EventType events = EventType.all)
1572     {
1573         if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) {
1574             throw new ZmqException;
1575         }
1576     }
1577 
1578     ///
1579     unittest
1580     {
1581         auto sck = Socket(SocketType.pub);
1582         sck.monitor("inproc://zmqd_monitor_unittest",
1583                     EventType.accepted | EventType.closed);
1584     }
1585 
1586     /**
1587     The $(D void*) pointer used by the underlying C API to refer to the socket.
1588 
1589     If the object has not been initialized, this function returns $(D null).
1590     */
1591     @property inout(void)* handle() inout pure nothrow
1592     {
1593         return m_socket;
1594     }
1595 
1596     /**
1597     Whether this $(REF Socket) object has been _initialized, i.e. whether it
1598     refers to a valid $(ZMQ) socket.
1599     */
1600     @property bool initialized() const pure nothrow
1601     {
1602         return m_socket != null;
1603     }
1604 
1605     ///
1606     unittest
1607     {
1608         Socket sck;
1609         assert (!sck.initialized);
1610         sck = Socket(SocketType.sub);
1611         assert (sck.initialized);
1612         sck.close();
1613         assert (!sck.initialized);
1614     }
1615 
1616 private:
1617     // Helper function for ~this() and close()
1618     bool nothrowClose() nothrow
1619     {
1620         if (m_socket != null) {
1621             if (trusted!zmq_close(m_socket) != 0) return false;
1622             m_socket = null;
1623         }
1624         return true;
1625     }
1626 
1627     import std.traits;
1628 
1629     T getOption(T)(int option) @trusted
1630         if (isScalarType!T)
1631     {
1632         T buf;
1633         auto len = T.sizeof;
1634         if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) {
1635             throw new ZmqException;
1636         }
1637         assert(len == T.sizeof);
1638         return buf;
1639     }
1640 
1641     void setOption(T)(int option, T value) @trusted
1642         if (isScalarType!T)
1643     {
1644         if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) {
1645             throw new ZmqException;
1646         }
1647     }
1648 
1649     T[] getArrayOption(T)(int option, T[] buf) @trusted
1650         if (isScalarType!T)
1651     {
1652         static assert (T.sizeof == 1);
1653         auto len = buf.length;
1654         if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) {
1655             throw new ZmqException;
1656         }
1657         return buf[0 .. len];
1658     }
1659 
1660     void setArrayOption()(int option, const void[] value)
1661     {
1662         if (trusted!zmq_setsockopt(m_socket, option, value.ptr, value.length) != 0) {
1663             throw new ZmqException;
1664         }
1665     }
1666 
1667     char[] getCStringOption(int option, char[] buf)
1668     {
1669         auto ret = getArrayOption(option, buf);
1670         assert (ret.length && ret[$-1] == '\0');
1671         return ret[0 .. $-1];
1672     }
1673 
1674     enum : size_t
1675     {
1676         keySizeBin    = 32,
1677         keyBufSizeBin = keySizeBin,
1678         keySizeZ85    = 40,
1679         keyBufSizeZ85 = keySizeZ85 + 1,
1680     }
1681 
1682     ubyte[] getCurveKey(int option, ubyte[] buf)
1683     {
1684         if (buf.length < keyBufSizeBin) {
1685             import core.exception: RangeError;
1686             throw new RangeError;
1687         }
1688         return getArrayOption(option, buf[0 .. keyBufSizeBin]);
1689     }
1690 
1691     char[] getCurveKeyZ85(int option, char[] buf)
1692     {
1693         if (buf.length < keyBufSizeZ85) {
1694             import core.exception: RangeError;
1695             throw new RangeError;
1696         }
1697         return getCStringOption(option, buf[0 .. keyBufSizeZ85]);
1698     }
1699 
1700     void setCurveKey(int option, const ubyte[] value)
1701     {
1702         if (value.length != keySizeBin) throw new Exception("Invalid key size");
1703         setArrayOption(option, value);
1704     }
1705 
1706     void setCurveKeyZ85(int option, const char[] value)
1707     {
1708         if (value.length != keySizeZ85) throw new Exception("Invalid key size");
1709         setArrayOption(option, value);
1710     }
1711 
1712     Context m_context;
1713     SocketType m_type;
1714     void* m_socket;
1715 }
1716 
1717 unittest
1718 {
1719     auto s1 = Socket(SocketType.pair);
1720     auto s2 = Socket(SocketType.pair);
1721     s1.bind("inproc://unittest");
1722     s2.connect("inproc://unittest");
1723     s1.send("Hello World!");
1724     ubyte[12] buf;
1725     const len = s2.receive(buf[]);
1726     assert (len == 12);
1727     assert (buf == "Hello World!");
1728 }
1729 
1730 
1731 version (Windows) {
1732     alias PlatformFD = SOCKET;
1733 } else version (Posix) {
1734     alias PlatformFD = int;
1735 }
1736 
1737 /**
1738 The native socket file descriptor type.
1739 
1740 This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems.
1741 */
1742 alias FD = PlatformFD;
1743 
1744 
1745 /**
1746 Starts the built-in $(ZMQ) _proxy.
1747 
1748 This function never returns normally, but it may throw an exception.  This could
1749 happen if the context associated with either of the specified sockets is
1750 manually destroyed in a different thread.
1751 
1752 Throws:
1753     $(REF ZmqException) if $(ZMQ) reports an error.
1754 Corresponds_to:
1755     $(ZMQREF zmq_proxy())
1756 See_Also:
1757     $(FREF steerableProxy)
1758 */
1759 void proxy(ref Socket frontend, ref Socket backend)
1760 {
1761     const rc = trusted!zmq_proxy(frontend.handle, backend.handle, null);
1762     assert (rc == -1);
1763     throw new ZmqException;
1764 }
1765 
1766 /// ditto
1767 void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
1768 {
1769     const rc = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle);
1770     assert (rc == -1);
1771     throw new ZmqException;
1772 }
1773 
1774 
1775 static if (ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 0, 5)) {
1776 
1777 /**
1778 Starts the built-in $(ZMQ) proxy with _control flow.
1779 
1780 Note that the order of the two last parameters is reversed compared to
1781 $(ZMQREF zmq_proxy_steerable()).  That is, the $(D control) socket always
1782 comes before the $(D capture) socket.  Furthermore, unlike in $(ZMQ),
1783 $(D control) is mandatory.  (Without the _control socket one can simply
1784 use $(FREF proxy).)
1785 
1786 Versions:
1787     This function is only available when using the bindings for ZeroMQ
1788     4.0.5 or later.
1789 Throws:
1790     $(REF ZmqException) if $(ZMQ) reports an error.
1791 Corresponds_to:
1792     $(ZMQREF zmq_proxy_steerable())
1793 See_Also:
1794     $(FREF proxy)
1795 */
1796 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control)
1797 {
1798     const rc = trusted!zmq_proxy_steerable(
1799         frontend.handle, backend.handle, null, control.handle);
1800     if (rc == -1) throw new ZmqException;
1801 }
1802 
1803 /// ditto
1804 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture)
1805 {
1806     const rc = trusted!zmq_proxy_steerable(
1807         frontend.handle, backend.handle, capture.handle, control.handle);
1808     if (rc == -1) throw new ZmqException;
1809 }
1810 
1811 @system unittest
1812 {
1813     import core.thread;
1814     auto t = new Thread(() {
1815         auto frontend = Socket(SocketType.router);
1816         frontend.bind("inproc://zmqd_steerableProxy_unittest_fe");
1817         auto backend  = Socket(SocketType.dealer);
1818         backend.bind("inproc://zmqd_steerableProxy_unittest_be");
1819         auto controllee = Socket(SocketType.pair);
1820         controllee.bind("inproc://zmqd_steerableProxy_unittest_ctl");
1821         steerableProxy(frontend, backend, controllee);
1822     });
1823     t.start();
1824     auto client = Socket(SocketType.req);
1825     client.connect("inproc://zmqd_steerableProxy_unittest_fe");
1826     auto server  = Socket(SocketType.rep);
1827     server.connect("inproc://zmqd_steerableProxy_unittest_be");
1828     auto controller = Socket(SocketType.pair);
1829     controller.connect("inproc://zmqd_steerableProxy_unittest_ctl");
1830 
1831     auto cf = Frame(1);
1832     cf.data[0] = 86;
1833     client.send(cf);
1834     auto sf = Frame();
1835     server.receive(sf);
1836     assert(sf.size == 1 && sf.data[0] == 86);
1837     sf.data[0] = 87;
1838     server.send(sf);
1839     client.receive(cf);
1840     assert(cf.size == 1 && cf.data[0] == 87);
1841 
1842     controller.send("TERMINATE");
1843     t.join();
1844 }
1845 
1846 @system unittest
1847 {
1848     import core.thread;
1849     auto t = new Thread(() {
1850         auto frontend = Socket(SocketType.pull);
1851         frontend.bind("inproc://zmqd_steerableProxy2_unittest_fe");
1852         auto backend  = Socket(SocketType.push);
1853         backend.bind("inproc://zmqd_steerableProxy2_unittest_be");
1854         auto controllee = Socket(SocketType.pair);
1855         controllee.bind("inproc://zmqd_steerableProxy2_unittest_ctl");
1856         auto capture = Socket(SocketType.push);
1857         capture.bind("inproc://zmqd_steerableProxy2_unittest_cpt");
1858         steerableProxy(frontend, backend, controllee, capture);
1859     });
1860     t.start();
1861     auto client = Socket(SocketType.push);
1862     client.connect("inproc://zmqd_steerableProxy2_unittest_fe");
1863     auto server  = Socket(SocketType.pull);
1864     server.connect("inproc://zmqd_steerableProxy2_unittest_be");
1865     auto controller = Socket(SocketType.pair);
1866     controller.connect("inproc://zmqd_steerableProxy2_unittest_ctl");
1867     auto capturer = Socket(SocketType.pull);
1868     capturer.connect("inproc://zmqd_steerableProxy2_unittest_cpt");
1869 
1870     auto cf = Frame(1);
1871     cf.data[0] = 86;
1872     client.send(cf);
1873     auto sf = Frame();
1874     server.receive(sf);
1875     assert(sf.size == 1 && sf.data[0] == 86);
1876     auto pf = Frame();
1877     capturer.receive(pf);
1878     assert(pf.size == 1 && pf.data[0] == 86);
1879 
1880     controller.send("TERMINATE");
1881     t.join();
1882 }
1883 
1884 } // end static if (version >= 4.0.5)
1885 
1886 
1887 deprecated("zmqd.poll() has a new signature as of v0.4")
1888 uint poll(zmq_pollitem_t[] items, Duration timeout = infiniteDuration)
1889 {
1890     import std.conv: to;
1891     const n = trusted!zmq_poll(
1892         items.ptr,
1893         to!int(items.length),
1894         timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"()));
1895     if (n < 0) throw new ZmqException;
1896     return cast(uint) n;
1897 }
1898 
1899 
1900 /**
1901 Input/output multiplexing.
1902 
1903 The $(D timeout) parameter may have the special value $(REF infiniteDuration)
1904 which means no _timeout.  This is translated to an argument value of -1 in the
1905 C API.
1906 
1907 Returns:
1908     The number of $(REF PollItem) structures with events signalled in
1909     $(REF PollItem.returnedEvents), or 0 if no events have been signalled.
1910 Throws:
1911     $(REF ZmqException) if $(ZMQ) reports an error.
1912 Corresponds_to:
1913     $(ZMQREF zmq_poll())
1914 */
1915 uint poll(PollItem[] items, Duration timeout = infiniteDuration) @trusted
1916 {
1917     // Here we use a trick where we pretend the array of PollItems is
1918     // actually an array of zmq_pollitem_t, to avoid an unnecessary
1919     // allocation.  For this to work, PollItem must have the exact
1920     // same size as zmq_pollitem_t.
1921     static assert (PollItem.sizeof == zmq_pollitem_t.sizeof);
1922 
1923     import std.conv: to;
1924     const n = zmq_poll(
1925         cast(zmq_pollitem_t*) items.ptr,
1926         to!int(items.length),
1927         timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"()));
1928     if (n < 0) throw new ZmqException;
1929     return cast(uint) n;
1930 }
1931 
1932 
1933 ///
1934 @system unittest
1935 {
1936     auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
1937     socket1.bind("inproc://zmqd_poll_example");
1938 
1939     import std.socket;
1940     auto socket2 = new std.socket.Socket(
1941         AddressFamily.INET,
1942         std.socket.SocketType.DGRAM);
1943     socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));
1944 
1945     auto socket3 = zmqd.Socket(zmqd.SocketType.push);
1946     socket3.connect("inproc://zmqd_poll_example");
1947     socket3.send("test");
1948 
1949     import core.thread: Thread;
1950     Thread.sleep(10.msecs);
1951 
1952     auto items = [
1953         PollItem(socket1, PollFlags.pollIn),
1954         PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
1955         PollItem(socket3, PollFlags.pollIn),
1956     ];
1957 
1958     const n = poll(items, 100.msecs);
1959     assert (n == 2);
1960     assert (items[0].returnedEvents == PollFlags.pollIn);
1961     assert (items[1].returnedEvents == PollFlags.pollOut);
1962     assert (items[2].returnedEvents == 0);
1963     socket2.close();
1964 }
1965 
1966 
1967 /**
1968 $(FREF poll) event flags.
1969 
1970 These are described in the $(ZMQREF zmq_poll()) manual.
1971 */
1972 enum PollFlags
1973 {
1974     pollIn = ZMQ_POLLIN,    /// Corresponds to $(D ZMQ_POLLIN)
1975     pollOut = ZMQ_POLLOUT,  /// Corresponds to $(D ZMQ_POLLOUT)
1976     pollErr = ZMQ_POLLERR,  /// Corresponds to $(D ZMQ_POLLERR)
1977 }
1978 
1979 
1980 /++
1981 A structure that specifies a socket to be monitored by $(FREF poll) as well
1982 as the events to poll for, and, when $(FREF poll) returns, the events that
1983 occurred.
1984 
1985 Warning:
1986     $(D PollItem) objects do not store $(STDREF socket,Socket) references,
1987     only the corresponding native file descriptors.  This means that the
1988     references have to be stored elsewhere, or the objects may be garbage
1989     collected, invalidating the sockets before or while $(FREF poll) executes.
1990     ---
1991     // Not OK
1992     auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);
1993 
1994     // OK
1995     auto s = new std.socket.Socket(/*...*/);
1996     auto p2 = PollItem(s, PollFlags.pollIn);
1997     ---
1998 Corresponds_to:
1999     $(D $(ZMQAPI zmq_poll,zmq_pollitem_t))
2000 +/
2001 struct PollItem
2002 {
2003     /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket.
2004     this(ref zmqd.Socket socket, PollFlags events) nothrow
2005     {
2006         m_pollItem = zmq_pollitem_t(socket.handle, 0, cast(short) events, 0);
2007     }
2008 
2009     import std.socket;
2010     /**
2011     Constructs a $(REF PollItem) for monitoring a standard socket referenced
2012     by a $(STDREF socket,Socket).
2013     */
2014     this(std.socket.Socket socket, PollFlags events) @system
2015     {
2016         this(socket.handle, events);
2017     }
2018 
2019     /**
2020     Constructs a $(REF PollItem) for monitoring a standard socket referenced
2021     by a native file descriptor.
2022     */
2023     this(FD fd, PollFlags events) pure nothrow
2024     {
2025         m_pollItem = zmq_pollitem_t(null, fd, cast(short) events, 0);
2026     }
2027 
2028     /**
2029     Requested _events.
2030 
2031     Corresponds_to:
2032         $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events))
2033     */
2034     @property void requestedEvents(PollFlags events) pure nothrow
2035     {
2036         m_pollItem.events = cast(short) events;
2037     }
2038 
2039     /// ditto
2040     @property PollFlags requestedEvents() const pure nothrow
2041     {
2042         return cast(typeof(return)) m_pollItem.events;
2043     }
2044 
2045     /**
2046     Returned events.
2047 
2048     Corresponds_to:
2049         $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents))
2050     */
2051     @property PollFlags returnedEvents() const pure nothrow
2052     {
2053         return cast(typeof(return)) m_pollItem.revents;
2054     }
2055 
2056 private:
2057     zmq_pollitem_t m_pollItem;
2058 }
2059 
2060 
2061 /**
2062 An object that encapsulates a $(ZMQ) message frame.
2063 
2064 This $(D struct) is a wrapper around a $(D zmq_msg_t) object.
2065 A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it
2066 should always be explicitly initialized upon construction using
2067 $(FREF _Frame.opCall).  Alternatively, it may be initialized later with
2068 $(FREF _Frame.rebuild).
2069 ---
2070 Frame msg1;                 // Invalid frame
2071 auto msg2 = Frame();        // Empty frame
2072 auto msg3 = Frame(1024);    // 1K frame
2073 msg1.rebuild(2048);         // msg1 now has size 2K
2074 msg2.rebuild(2048);         // ...and so does msg2
2075 ---
2076 When a $(D Frame) object is destroyed, $(ZMQREF zmq_msg_close()) is
2077 called on the underlying $(D zmq_msg_t).
2078 
2079 A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy)
2080 for this.
2081 */
2082 struct Frame
2083 {
2084 @safe:
2085     /**
2086     Initializes an empty $(ZMQ) message frame.
2087 
2088     Throws:
2089         $(REF ZmqException) if $(ZMQ) reports an error.
2090     Corresponds_to:
2091         $(ZMQREF zmq_msg_init())
2092     */
2093     static Frame opCall()
2094     {
2095         Frame f;
2096         f.init();
2097         return f;
2098     }
2099 
2100     ///
2101     unittest
2102     {
2103         auto msg = Frame();
2104         assert(msg.size == 0);
2105     }
2106 
2107     /** $(ANCHOR Frame.opCall_size)
2108     Initializes a $(ZMQ) message frame of the specified _size.
2109 
2110     Throws:
2111         $(REF ZmqException) if $(ZMQ) reports an error.
2112     Corresponds_to:
2113         $(ZMQREF zmq_msg_init_size())
2114     */
2115     static Frame opCall(size_t size)
2116     {
2117         Frame m;
2118         m.init(size);
2119         return m;
2120     }
2121 
2122     ///
2123     unittest
2124     {
2125         auto msg = Frame(123);
2126         assert(msg.size == 123);
2127     }
2128 
2129     /** $(ANCHOR Frame.opCall_data)
2130     Initializes a $(ZMQ) message frame from a supplied buffer.
2131 
2132     If $(D free) is not specified, $(D data) $(EM must) refer to a slice of
2133     memory which has been allocated by the garbage collector (typically using
2134     operator $(D new)).  Ownership will then be transferred temporarily to
2135     $(ZMQ), and then transferred back to the GC when $(ZMQ) is done using the
2136     buffer.
2137 
2138     For memory which is $(EM not) garbage collected, the argument $(D free)
2139     must be a pointer to a function that will release the memory, which will
2140     be called when $(ZMQ) no longer needs the buffer.  $(D free) must point to
2141     a function with the following signature:
2142     ---
2143     extern(C) void f(void* d, void* h) nothrow;
2144     ---
2145     When it is called, $(D d) will be equal to $(D data.ptr), while $(D h) will
2146     be equal to $(D hint) (which is optional and null by default).
2147 
2148     $(D free) is passed directly to the underlying $(ZMQ) C function, which is
2149     why it needs to be $(D extern(C)).
2150 
2151     Warning:
2152         Some care must be taken when using this function, as there is no telling
2153         when, whether, or in which thread $(ZMQ) relinquishes ownership of the
2154         buffer.  Client code should therefore avoid retaining any references to
2155         it, including slices that contain, overlap with or are contained in
2156         $(D data).  For the "non-GC" version, client code should in general not
2157         retain slices or pointers to any memory which will be released when
2158         $(D free) is called.
2159     See_Also:
2160         $(REF Frame.FreeData)
2161     Throws:
2162         $(REF ZmqException) if $(ZMQ) reports an error.
2163     Corresponds_to:
2164         $(ZMQREF zmq_msg_init_data())
2165     */
2166     static Frame opCall(ubyte[] data) @system
2167     {
2168         Frame m;
2169         m.init(data);
2170         return m;
2171     }
2172 
2173     /// ditto
2174     static Frame opCall(ubyte[] data, FreeData free, void* hint = null) @system
2175     {
2176         Frame m;
2177         m.init(data, free, hint);
2178         return m;
2179     }
2180 
2181     ///
2182     @system unittest
2183     {
2184         // Garbage-collected memory
2185         auto buf = new ubyte[123];
2186         auto msg = Frame(buf);
2187         assert(msg.size == buf.length);
2188         assert(msg.data.ptr == buf.ptr);
2189     }
2190 
2191     ///
2192     @system unittest
2193     {
2194         // Manually managed memory
2195         import core.stdc.stdlib: malloc, free;
2196         static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
2197         auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
2198         auto msg = Frame(buf, &myFree);
2199         assert(msg.size == buf.length);
2200         assert(msg.data.ptr == buf.ptr);
2201     }
2202 
2203     @system unittest
2204     {
2205         // Non-documentation unittest.  Let's see if the data *actually* gets freed.
2206         ubyte buffer = 81;
2207         bool released = false;
2208         static extern(C) void myFree(void* data, void* hint) nothrow
2209         {
2210             auto typedData = cast(ubyte*) data;
2211             auto typedHint = cast(bool*) hint;
2212             assert(*typedData == 81);
2213             assert(!*typedHint);
2214             *typedData = 0;
2215             *typedHint = true;
2216         }
2217         // New scope, so the frame gets released at the end.
2218         {
2219             auto frame = Frame((&buffer)[0 .. 1], &myFree, &released);
2220             assert(frame.size == 1);
2221             assert(frame.data.ptr == &buffer);
2222             assert(buffer == 81);
2223             assert(!released);
2224         }
2225         assert(buffer == 0);
2226         assert(released);
2227     }
2228 
2229     /**
2230     The function pointer type for memory-freeing callback functions passed to
2231     $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[], _FreeData, void*))).
2232     */
2233     @system alias extern(C) void function(void*, void*) nothrow FreeData;
2234 
2235     /**
2236     Reinitializes the Frame object as an empty message.
2237 
2238     This function will first call $(FREF Frame.close) to release the
2239     resources associated with the message frame, and then it will
2240     initialize it anew, exactly as if it were constructed  with
2241     $(D $(LINK2 #Frame.opCall,Frame())).
2242 
2243     Throws:
2244         $(REF ZmqException) if $(ZMQ) reports an error.
2245     Corresponds_to:
2246         $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init())
2247     */
2248     void rebuild()
2249     {
2250         close();
2251         init();
2252     }
2253 
2254     ///
2255     unittest
2256     {
2257         auto msg = Frame(256);
2258         assert (msg.size == 256);
2259         msg.rebuild();
2260         assert (msg.size == 0);
2261     }
2262 
2263     /**
2264     Reinitializes the Frame object to a specified size.
2265 
2266     This function will first call $(FREF Frame.close) to release the
2267     resources associated with the message frame, and then it will
2268     initialize it anew, exactly as if it were constructed  with
2269     $(D $(LINK2 #Frame.opCall_size,Frame(size))).
2270 
2271     Throws:
2272         $(REF ZmqException) if $(ZMQ) reports an error.
2273     Corresponds_to:
2274         $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()).
2275     */
2276     void rebuild(size_t size)
2277     {
2278         close();
2279         init(size);
2280     }
2281 
2282     ///
2283     unittest
2284     {
2285         auto msg = Frame(256);
2286         assert (msg.size == 256);
2287         msg.rebuild(1024);
2288         assert (msg.size == 1024);
2289     }
2290 
2291     /**
2292     Reinitializes the Frame object from a supplied buffer.
2293 
2294     This function will first call $(FREF Frame.close) to release the
2295     resources associated with the message frame, and then it will
2296     initialize it anew, exactly as if it were constructed  with
2297     $(D Frame(data)) or $(D Frame(data, free, hint)).
2298 
2299     Some care must be taken when using these functions.  Please read the
2300     $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[]))) documentation.
2301 
2302     Throws:
2303         $(REF ZmqException) if $(ZMQ) reports an error.
2304     Corresponds_to:
2305         $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()).
2306     */
2307     void rebuild(ubyte[] data) @system
2308     {
2309         close();
2310         init(data);
2311     }
2312 
2313     /// ditto
2314     void rebuild(ubyte[] data, FreeData free, void* hint = null) @system
2315     {
2316         close();
2317         init(data, free, hint);
2318     }
2319 
2320     ///
2321     @system unittest
2322     {
2323         // Garbage-collected memory
2324         auto msg = Frame(256);
2325         assert (msg.size == 256);
2326         auto buf = new ubyte[123];
2327         msg.rebuild(buf);
2328         assert(msg.size == buf.length);
2329         assert(msg.data.ptr == buf.ptr);
2330     }
2331 
2332     ///
2333     @system unittest
2334     {
2335         // Manually managed memory
2336         import core.stdc.stdlib: malloc, free;
2337         static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
2338 
2339         auto msg = Frame(256);
2340         assert (msg.size == 256);
2341         auto buf = (cast(ubyte*) malloc(10))[0 .. 10];
2342         msg.rebuild(buf, &myFree);
2343         assert(msg.size == buf.length);
2344         assert(msg.data.ptr == buf.ptr);
2345     }
2346 
2347     @disable this(this);
2348 
2349     /**
2350     Releases the $(ZMQ) message frame when the $(D Frame) is destroyed.
2351 
2352     This destructor never throws, which means that any errors will go
2353     undetected.  If this is undesirable, call $(FREF Frame.close) before
2354     the $(D Frame) is destroyed.
2355 
2356     Corresponds_to:
2357         $(ZMQREF zmq_msg_close())
2358     */
2359     ~this() nothrow
2360     {
2361         if (m_initialized) {
2362             immutable rc = trusted!zmq_msg_close(&m_msg);
2363             assert(rc == 0, "zmq_msg_close failed: Invalid message frame");
2364         }
2365     }
2366 
2367     /**
2368     Releases the $(ZMQ) message frame.
2369 
2370     Note that the frame will be automatically released when the $(D Frame)
2371     object is destroyed, so it is often not necessary to call this method
2372     manually.
2373 
2374     Throws:
2375         $(REF ZmqException) if $(ZMQ) reports an error.
2376     Corresponds_to:
2377         $(ZMQREF zmq_msg_close())
2378     */
2379     void close()
2380     {
2381         if (m_initialized) {
2382             if (trusted!zmq_msg_close(&m_msg) != 0) {
2383                 throw new ZmqException;
2384             }
2385             m_initialized = false;
2386         }
2387     }
2388 
2389     /**
2390     Copies frame content to another message frame.
2391 
2392     $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest))
2393     copies the contents of this $(D Frame) into $(D dest).  $(D dest) must
2394     be a valid (i.e. initialized) $(D Frame).
2395 
2396     Warning:
2397         These functions may not do what you think they do.  Please refer
2398         to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
2399     Throws:
2400         $(REF ZmqException) if $(ZMQ) reports an error.
2401     Corresponds_to:
2402         $(ZMQREF zmq_msg_copy())
2403     */
2404     Frame copy()
2405         in { assert(m_initialized); }
2406         body
2407     {
2408         auto cp = Frame();
2409         copyTo(cp);
2410         return cp;
2411     }
2412 
2413     /// ditto
2414     void copyTo(ref Frame dest)
2415         in { assert(m_initialized); }
2416         body
2417     {
2418         if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) {
2419             throw new ZmqException;
2420         }
2421     }
2422 
2423     ///
2424     unittest
2425     {
2426         import std.string: representation;
2427         auto msg1 = Frame(3);
2428         msg1.data[] = "foo".representation;
2429         auto msg2 = msg1.copy();
2430         assert (msg2.data.asString() == "foo");
2431     }
2432 
2433     /**
2434     Moves frame content to another message frame.
2435 
2436     $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest))
2437     moves the contents of this $(D Frame) to $(D dest).  $(D dest) must
2438     be a valid (i.e. initialized) $(D Frame).
2439 
2440     Throws:
2441         $(REF ZmqException) if $(ZMQ) reports an error.
2442     Corresponds_to:
2443         $(ZMQREF zmq_msg_move())
2444     */
2445     Frame move()
2446         in { assert(m_initialized); }
2447         body
2448     {
2449         auto m = Frame();
2450         moveTo(m);
2451         return m;
2452     }
2453 
2454     /// ditto
2455     void moveTo(ref Frame dest)
2456         in { assert(m_initialized); }
2457         body
2458     {
2459         if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) {
2460             throw new ZmqException;
2461         }
2462     }
2463 
2464     ///
2465     unittest
2466     {
2467         import std.string: representation;
2468         auto msg1 = Frame(3);
2469         msg1.data[] = "foo".representation;
2470         auto msg2 = msg1.move();
2471         assert (msg1.size == 0);
2472         assert (msg2.data.asString() == "foo");
2473     }
2474 
2475     /**
2476     The message frame content _size in bytes.
2477 
2478     Corresponds_to:
2479         $(ZMQREF zmq_msg_size())
2480     */
2481     @property size_t size() nothrow
2482         in { assert(m_initialized); }
2483         body
2484     {
2485         return trusted!zmq_msg_size(&m_msg);
2486     }
2487 
2488     ///
2489     unittest
2490     {
2491         auto msg = Frame(123);
2492         assert(msg.size == 123);
2493     }
2494 
2495     /**
2496     Retrieves the message frame content.
2497 
2498     Corresponds_to:
2499         $(ZMQREF zmq_msg_data())
2500     */
2501     @property ubyte[] data() @trusted nothrow
2502         in { assert(m_initialized); }
2503         body
2504     {
2505         return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size];
2506     }
2507 
2508     ///
2509     unittest
2510     {
2511         import std.string: representation;
2512         auto msg = Frame(3);
2513         assert(msg.data.length == 3);
2514         msg.data[] = "foo".representation; // Slice operator -> array copy.
2515         assert(msg.data.asString() == "foo");
2516     }
2517 
2518     /**
2519     Whether there are _more message frames to retrieve.
2520 
2521     Corresponds_to:
2522         $(ZMQREF zmq_msg_more())
2523     */
2524     @property bool more() nothrow
2525         in { assert(m_initialized); }
2526         body
2527     {
2528         return !!trusted!zmq_msg_more(&m_msg);
2529     }
2530 
2531     static if (ZMQ_VERSION >= ZMQ41) {
2532         /**
2533         The file descriptor of the socket the message was read from.
2534 
2535         Throws:
2536             $(REF ZmqException) if $(ZMQ) reports an error.
2537         Corresponds_to:
2538             $(ZMQREF zmq_msg_get()) with $(D ZMQ_SRCFD).
2539         */
2540         @property FD sourceFD()
2541         {
2542             return cast(FD) getProperty(ZMQ_SRCFD);
2543         }
2544 
2545         /**
2546         Whether the message MAY share underlying storage with another copy.
2547 
2548         Throws:
2549             $(REF ZmqException) if $(ZMQ) reports an error.
2550         Corresponds_to:
2551             $(ZMQREF zmq_msg_get()) with $(D ZMQ_SHARED).
2552         */
2553         @property bool sharedStorage()
2554         {
2555             return !!getProperty(ZMQ_SHARED);
2556         }
2557 
2558         unittest
2559         {
2560             import std.string: representation;
2561             auto msg1 = Frame(100); // Big message to avoid small-string optimisation
2562             msg1.data[0 .. 3] = "foo".representation;
2563             assert (!msg1.sharedStorage);
2564             auto msg2 = msg1.copy();
2565             assert (msg2.sharedStorage); // Warning: The ZMQ docs only say that this MAY be true
2566             assert (msg2.data[0 .. 3].asString() == "foo");
2567         }
2568 
2569         /**
2570         Gets message _metadata.
2571 
2572         $(D metadataUnsafe()) is faster than $(D metadata()) because it
2573         directly returns the array which comes from $(ZMQREF zmq_msg_gets()),
2574         whereas the latter returns a freshly GC-allocated copy of it.  However,
2575         the array returned by $(D metadataUnsafe()) is owned by the underlying
2576         $(ZMQ) message and gets destroyed along with it, so care must be
2577         taken when using this function.
2578 
2579         Throws:
2580             $(REF ZmqException) if $(ZMQ) reports an error.
2581         Corresponds_to:
2582             $(ZMQREF zmq_msg_gets())
2583         */
2584         char[] metadata(const char[] property) @trusted
2585             in { assert(m_initialized); }
2586             body
2587         {
2588             return metadataUnsafe(property).dup;
2589         }
2590 
2591         /// ditto
2592         const(char)[] metadataUnsafe(const char[] property) @system
2593             in { assert(m_initialized); }
2594             body
2595         {
2596             if (auto value = zmq_msg_gets(handle, zeroTermString(property))) {
2597                 import std.string: fromStringz;
2598                 return fromStringz(value);
2599             } else {
2600                 throw new ZmqException();
2601             }
2602         }
2603     } // static if (ZMQ_VERSION >= ZMQ41)
2604 
2605     /**
2606     A pointer to the underlying $(D zmq_msg_t).
2607     */
2608     @property inout(zmq_msg_t)* handle() inout pure nothrow
2609     {
2610         return &m_msg;
2611     }
2612 
2613 private:
2614     private void init()
2615         in { assert (!m_initialized); }
2616         out { assert (m_initialized); }
2617         body
2618     {
2619         if (trusted!zmq_msg_init(&m_msg) != 0) {
2620             throw new ZmqException;
2621         }
2622         m_initialized = true;
2623     }
2624 
2625     private void init(size_t size)
2626         in { assert (!m_initialized); }
2627         out { assert (m_initialized); }
2628         body
2629     {
2630         if (trusted!zmq_msg_init_size(&m_msg, size) != 0) {
2631             throw new ZmqException;
2632         }
2633         m_initialized = true;
2634     }
2635 
2636     private void init(ubyte[] data) @system
2637         in { assert (!m_initialized); }
2638         out { assert (m_initialized); }
2639         body
2640     {
2641         import core.memory;
2642         static extern(C) void zmqd_Frame_init_gcFree(void* dataPtr, void* block) nothrow
2643         {
2644             GC.removeRoot(dataPtr);
2645             GC.clrAttr(block, GC.BlkAttr.NO_MOVE);
2646         }
2647 
2648         GC.addRoot(data.ptr);
2649         scope(failure) GC.removeRoot(data.ptr);
2650 
2651         auto block = GC.addrOf(data.ptr);
2652         immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE);
2653         GC.setAttr(block, GC.BlkAttr.NO_MOVE);
2654         scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);
2655 
2656         init(data, &zmqd_Frame_init_gcFree, block);
2657     }
2658 
2659     void init(ubyte[] data, FreeData free, void* hint) @system
2660         in { assert (!m_initialized); }
2661         out { assert (m_initialized); }
2662         body
2663     {
2664         if (zmq_msg_init_data(&m_msg, data.ptr, data.length, free, hint) != 0) {
2665             throw new ZmqException;
2666         }
2667         m_initialized = true;
2668     }
2669 
2670     int getProperty(int property) @trusted
2671     {
2672         const value = zmq_msg_get(&m_msg, property);
2673         if (value == -1) {
2674             throw new ZmqException;
2675         }
2676         return value;
2677     }
2678 
2679     bool m_initialized;
2680     zmq_msg_t m_msg;
2681 }
2682 
2683 unittest
2684 {
2685     const url = uniqueUrl("inproc");
2686     auto s1 = Socket(SocketType.pair);
2687     auto s2 = Socket(SocketType.pair);
2688     s1.bind(url);
2689     s2.connect(url);
2690 
2691     auto m1a = Frame(123);
2692     m1a.data[] = 'a';
2693     s1.send(m1a);
2694     auto m2a = Frame();
2695     s2.receive(m2a);
2696     assert(m2a.size == 123);
2697     foreach (e; m2a.data) assert(e == 'a');
2698 
2699     auto m1b = Frame(10);
2700     m1b.data[] = 'b';
2701     s1.send(m1b);
2702     auto m2b = Frame();
2703     s2.receive(m2b);
2704     assert(m2b.size == 10);
2705     foreach (e; m2b.data) assert(e == 'b');
2706 }
2707 
2708 deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame;
2709 
2710 
2711 /**
2712 A global context which is used by default by all sockets, unless they are
2713 explicitly constructed with a different context.
2714 
2715 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
2716 has the following to say) about context creation:
2717 $(QUOTE
2718     You should create and use exactly one context in your process.
2719     [$(LDOTS)] If at runtime a process has two contexts, these are
2720     like separate $(ZMQ) instances. If that's explicitly what you
2721     want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
2722     at the start of your main line code, and one $(D zmq_ctx_destroy())
2723     at the end.)
2724 )
2725 By using $(D defaultContext()), this is exactly what you achieve.  The
2726 context is created the first time the function is called, and is
2727 automatically destroyed when the program ends.
2728 
2729 This function is thread safe.
2730 
2731 Throws:
2732     $(REF ZmqException) if $(ZMQ) reports an error.
2733 See_also:
2734     $(REF Context)
2735 */
2736 Context defaultContext() @trusted
2737 {
2738     // For future reference: This is the low-lock singleton pattern. See:
2739     // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
2740     static bool instantiated;
2741     __gshared Context ctx;
2742     if (!instantiated) {
2743         synchronized {
2744             if (!ctx.initialized) {
2745                 ctx = Context();
2746             }
2747             instantiated = true;
2748         }
2749     }
2750     return ctx;
2751 }
2752 
2753 @system unittest
2754 {
2755     import core.thread;
2756     Context c1, c2;
2757     auto t = new Thread(() { c1 = defaultContext(); });
2758     t.start();
2759     c2 = defaultContext();
2760     t.join();
2761     assert(c1.handle !is null);
2762     assert(c1.handle == c2.handle);
2763 }
2764 
2765 
2766 /**
2767 An object that encapsulates a $(ZMQ) context.
2768 
2769 In most programs, it is not necessary to use this type directly,
2770 as $(REF Socket) will use a default global context if not explicitly
2771 provided with one.  See $(FREF defaultContext) for details.
2772 
2773 A default-initialized $(D Context) is not a valid $(ZMQ) context; it
2774 must always be explicitly initialized with $(FREF _Context.opCall):
2775 ---
2776 Context ctx;        // Not a valid context yet
2777 ctx = Context();    // ...but now it is.
2778 ---
2779 $(D Context) objects can be passed around by value, and two copies will
2780 refer to the same context.  The underlying context is managed using
2781 reference counting, so that when the last copy of a $(D Context) goes
2782 out of scope, the context is automatically destroyed.  The reference
2783 counting is performed in a thread safe manner, so that the same context
2784 can be shared between multiple threads.  ($(ZMQ) guarantees the thread
2785 safety of other context operations.)
2786 
2787 See_also:
2788     $(FREF defaultContext)
2789 */
2790 struct Context
2791 {
2792 @safe:
2793     /**
2794     Creates a new $(ZMQ) context.
2795 
2796     Returns:
2797         A $(REF Context) object that encapsulates the new context.
2798     Throws:
2799         $(REF ZmqException) if $(ZMQ) reports an error.
2800     Corresponds_to:
2801         $(ZMQREF zmq_ctx_new())
2802     */
2803     static Context opCall() @trusted // because of the cast
2804     {
2805         if (auto c = trusted!zmq_ctx_new()) {
2806             Context ctx;
2807             // Casting from/to shared is OK since ZMQ contexts are thread safe.
2808             static Exception release(shared(void)* ptr) @trusted nothrow
2809             {
2810                 return zmq_ctx_term(cast(void*) ptr) == 0
2811                     ? null
2812                     : new ZmqException;
2813             }
2814             ctx.m_resource = SharedResource(cast(shared) c, &release);
2815             return ctx;
2816         } else {
2817             throw new ZmqException;
2818         }
2819     }
2820 
2821     ///
2822     unittest
2823     {
2824         auto ctx = Context();
2825         assert (ctx.initialized);
2826     }
2827 
2828     /**
2829     Detaches from the $(ZMQ) context.
2830 
2831     If this is the last reference to the context, it will be destroyed with
2832     $(ZMQREF zmq_ctx_term()).
2833 
2834     Throws:
2835         $(REF ZmqException) if $(ZMQ) reports an error.
2836     */
2837     void detach()
2838     {
2839         m_resource.detach();
2840     }
2841 
2842     ///
2843     unittest
2844     {
2845         auto ctx = Context();
2846         assert (ctx.initialized);
2847         ctx.detach();
2848         assert (!ctx.initialized);
2849     }
2850 
2851     /**
2852     Forcefully terminates the context.
2853 
2854     By using this function, one effectively circumvents the reference-counting
2855     mechanism for managing the context.  After it returns, all other
2856     $(D Context) objects that used to refer to the same context will be in a
2857     state which is functionally equivalent to the default-initialized state
2858     (i.e., $(REF Context.initialized) is $(D false) and $(REF Context.handle)
2859     is $(D null)).
2860 
2861     Throws:
2862         $(REF ZmqException) if $(ZMQ) reports an error.
2863     Corresponds_to:
2864         $(ZMQREF zmq_ctx_term())
2865     */
2866     void terminate() @system
2867     {
2868         m_resource.forceRelease();
2869     }
2870 
2871     ///
2872     @system unittest
2873     {
2874         auto ctx1 = Context();
2875         auto ctx2 = ctx1;
2876         assert (ctx1.initialized);
2877         assert (ctx2.initialized);
2878         assert (ctx1.handle == ctx2.handle);
2879         ctx2.terminate();
2880         assert (!ctx1.initialized);
2881         assert (!ctx2.initialized);
2882         assert (ctx1.handle == null);
2883         assert (ctx2.handle == null);
2884     }
2885 
2886     @system unittest
2887     {
2888         auto ctx = Context();
2889 
2890         void threadFunc()
2891         {
2892             auto server = Socket(ctx, SocketType.rep);
2893             server.bind("inproc://Context_terminate_unittest");
2894             try {
2895                 server.receive(null);
2896                 server.send("");
2897                 server.receive(null);
2898                 assert (false, "Never get here");
2899             } catch (ZmqException e) {
2900                 assert (e.errno == ETERM);
2901             }
2902         }
2903 
2904         import core.thread: Thread;
2905         auto thread = new Thread(&threadFunc);
2906         thread.start();
2907 
2908         auto client = Socket(ctx, SocketType.req);
2909         client.connect("inproc://Context_terminate_unittest");
2910         client.send("");
2911         client.receive(null);
2912         client.close();
2913 
2914         ctx.terminate();
2915         thread.join();
2916     }
2917 
2918 
2919     /**
2920     The number of I/O threads.
2921 
2922     Throws:
2923         $(REF ZmqException) if $(ZMQ) reports an error.
2924     Corresponds_to:
2925         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2926         $(D ZMQ_IO_THREADS).
2927     */
2928     @property int ioThreads()
2929     {
2930         return getOption(ZMQ_IO_THREADS);
2931     }
2932 
2933     /// ditto
2934     @property void ioThreads(int value)
2935     {
2936         setOption(ZMQ_IO_THREADS, value);
2937     }
2938 
2939     ///
2940     unittest
2941     {
2942         auto ctx = Context();
2943         ctx.ioThreads = 3;
2944         assert (ctx.ioThreads == 3);
2945     }
2946 
2947     /**
2948     The maximum number of sockets.
2949 
2950     Throws:
2951         $(REF ZmqException) if $(ZMQ) reports an error.
2952     Corresponds_to:
2953         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
2954         $(D ZMQ_MAX_SOCKETS).
2955     */
2956     @property int maxSockets()
2957     {
2958         return getOption(ZMQ_MAX_SOCKETS);
2959     }
2960 
2961     /// ditto
2962     @property void maxSockets(int value)
2963     {
2964         setOption(ZMQ_MAX_SOCKETS, value);
2965     }
2966 
2967     ///
2968     unittest
2969     {
2970         auto ctx = Context();
2971         ctx.maxSockets = 512;
2972         assert (ctx.maxSockets == 512);
2973     }
2974 
2975     static if (ZMQ_VERSION >= ZMQ41) {
2976         /**
2977         The largest configurable number of sockets.
2978 
2979         Throws:
2980             $(REF ZmqException) if $(ZMQ) reports an error.
2981         Corresponds_to:
2982             $(ZMQREF zmq_ctx_get()) with $(D ZMQ_SOCKET_LIMIT).
2983         */
2984         @property int socketLimit()
2985         {
2986             return getOption(ZMQ_SOCKET_LIMIT);
2987         }
2988 
2989         ///
2990         unittest
2991         {
2992             auto ctx = Context();
2993             assert (ctx.socketLimit > 0);
2994         }
2995 
2996         /**
2997         IPv6 option.
2998 
2999         Throws:
3000             $(REF ZmqException) if $(ZMQ) reports an error.
3001         Corresponds_to:
3002             $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
3003             $(D ZMQ_IPV6).
3004         */
3005         @property bool ipv6()
3006         {
3007             return !!getOption(ZMQ_IPV6);
3008         }
3009 
3010         /// ditto
3011         @property void ipv6(bool value)
3012         {
3013             setOption(ZMQ_IPV6, value ? 1 : 0);
3014         }
3015 
3016         ///
3017         unittest
3018         {
3019             auto ctx = Context();
3020             ctx.ipv6 = true;
3021             assert (ctx.ipv6 == true);
3022         }
3023     }
3024 
3025     /**
3026     The $(D void*) pointer used by the underlying C API to refer to the context.
3027 
3028     If the object has not been initialized, this function returns $(D null).
3029     */
3030     @property inout(void)* handle() inout @trusted pure nothrow
3031     {
3032         // ZMQ contexts are thread safe, so casting away shared is OK.
3033         return cast(typeof(return)) m_resource.handle;
3034     }
3035 
3036     /**
3037     Whether this $(REF Context) object has been _initialized, i.e. whether it
3038     refers to a valid $(ZMQ) context.
3039     */
3040     @property bool initialized() const pure nothrow
3041     {
3042         return m_resource.handle != null;
3043     }
3044 
3045     ///
3046     unittest
3047     {
3048         Context ctx;
3049         assert (!ctx.initialized);
3050         ctx = Context();
3051         assert (ctx.initialized);
3052         ctx.detach();
3053         assert (!ctx.initialized);
3054     }
3055 
3056 private:
3057     int getOption(int option)
3058     {
3059         immutable value = trusted!zmq_ctx_get(this.handle, option);
3060         if (value < 0) {
3061             throw new ZmqException;
3062         }
3063         return value;
3064     }
3065 
3066     void setOption(int option, int value)
3067     {
3068         if (trusted!zmq_ctx_set(this.handle, option, value) != 0) {
3069             throw new ZmqException;
3070         }
3071     }
3072 
3073     SharedResource m_resource;
3074 }
3075 
3076 
3077 /**
3078 Socket event types.
3079 
3080 These are used together with $(FREF Socket.monitor), and are described
3081 in the $(ZMQREF zmq_socket_monitor()) reference.
3082 */
3083 enum EventType
3084 {
3085     connected       = ZMQ_EVENT_CONNECTED,      /// Corresponds to $(D ZMQ_EVENT_CONNECTED).
3086     connectDelayed  = ZMQ_EVENT_CONNECT_DELAYED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED).
3087     connectRetried  = ZMQ_EVENT_CONNECT_RETRIED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED).
3088     listening       = ZMQ_EVENT_LISTENING,      /// Corresponds to $(D ZMQ_EVENT_LISTENING).
3089     bindFailed      = ZMQ_EVENT_BIND_FAILED,    /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED).
3090     accepted        = ZMQ_EVENT_ACCEPTED,       /// Corresponds to $(D ZMQ_EVENT_ACCEPTED).
3091     acceptFailed    = ZMQ_EVENT_ACCEPT_FAILED,  /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED).
3092     closed          = ZMQ_EVENT_CLOSED,         /// Corresponds to $(D ZMQ_EVENT_CLOSED).
3093     closeFailed     = ZMQ_EVENT_CLOSE_FAILED,   /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED).
3094     disconnected    = ZMQ_EVENT_DISCONNECTED,   /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED).
3095     all             = ZMQ_EVENT_ALL             /// Corresponds to $(D ZMQ_EVENT_ALL).
3096 }
3097 
3098 
3099 /**
3100 Receives a message on the given _socket and interprets it as a _socket
3101 state change event.
3102 
3103 $(D socket) must be a PAIR _socket which is connected to an endpoint
3104 created via a $(FREF Socket.monitor) call.  $(D receiveEvent()) receives
3105 one message on the _socket, parses its contents according to the
3106 specification in the $(ZMQREF zmq_socket_monitor()) reference,
3107 and returns the event information as an $(REF Event) object.
3108 
3109 Throws:
3110     $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
3111     $(REF InvalidEventException) if the received message could not
3112     be interpreted as an event message.
3113 See_also:
3114     $(FREF Socket.monitor), for monitoring _socket state changes.
3115 */
3116 Event receiveEvent(ref Socket socket) @system
3117 {
3118     // The monitor event message format underwent some changes between ZMQ
3119     // versions 3.2, 3.3 (unreleased) and 4.0.  Furthermore, the zmq_event_t
3120     // type was removed as of ZMQ 4.1.  We try to support all versions >= 3.2.
3121     immutable ver = zmqVersion();
3122     immutable usePackedData = ver.major >= 4;
3123     immutable useNewLayout = ZMQ_MAKE_VERSION(ver.major, ver.minor, ver.patch)
3124         >= ZMQ_MAKE_VERSION(3, 3, 0);
3125     assert (useNewLayout || !usePackedData);
3126 
3127     struct OldEventStruct {
3128         int event;
3129         const(char*) addr;
3130         int value;
3131     }
3132     struct NewEventStruct {
3133         ushort event;
3134         int value;
3135     }
3136     immutable eventFrameSize =
3137         usePackedData ? (ushort.sizeof + int.sizeof)
3138                       : (useNewLayout ? NewEventStruct.sizeof : OldEventStruct.sizeof);
3139 
3140     auto eventFrame = Frame();
3141     if (socket.receive(eventFrame) != eventFrameSize) {
3142         throw new InvalidEventException;
3143     }
3144     const data = eventFrame.data.ptr;
3145     try {
3146         import std.conv: to;
3147         EventType event;
3148         int value;
3149         string addr;
3150         if (useNewLayout) {
3151             if (usePackedData) {
3152                 event = to!EventType(*(cast(const(ushort)*) data));
3153                 value = *(cast(const(int)*) data + ushort.sizeof);
3154             } else {
3155                 const eventStruct = cast(const(NewEventStruct)*) data;
3156                 event = to!EventType(eventStruct.event);
3157                 value = eventStruct.value;
3158             }
3159             auto addrFrame = Frame();
3160             socket.receive(addrFrame);
3161             addr = (cast(char[]) addrFrame.data).idup;
3162         } else {
3163             const eventStruct = cast(const(OldEventStruct)*) data;
3164             event = to!EventType(eventStruct.event);
3165             value = eventStruct.value;
3166             addr = eventStruct.addr !is null
3167                 ? to!string(eventStruct.addr)
3168                 : null;
3169         }
3170         return Event(event, addr, value);
3171     } catch (Exception e) {
3172         // Any exception thrown within the try block signifies that there
3173         // is something wrong with the event message.
3174         throw new InvalidEventException;
3175     }
3176 }
3177 
3178 // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1.
3179 //       IPC does not work on Windows, and unbind() does not work with INPROC.
3180 //       See: https://github.com/zeromq/libzmq/issues/949
3181 version (Posix) @system unittest
3182 {
3183     Event[] events;
3184     void eventCollector()
3185     {
3186         auto coll = Socket(SocketType.pair);
3187         coll.connect("inproc://zmqd_receiveEvent_unittest_monitor");
3188         do {
3189             events ~= receiveEvent(coll);
3190         } while (events[$-1].type != EventType.closed);
3191     }
3192     import core.thread;
3193     auto collector = new Thread(&eventCollector);
3194     collector.start();
3195 
3196     static void eventGenerator()
3197     {
3198         auto sck1 = Socket(SocketType.pair);
3199         sck1.monitor("inproc://zmqd_receiveEvent_unittest_monitor");
3200         sck1.bind("ipc://zmqd_receiveEvent_unittest");
3201         import core.time;
3202         Thread.sleep(100.msecs);
3203         auto sck2 = Socket(SocketType.pair);
3204         sck2.connect("ipc://zmqd_receiveEvent_unittest");
3205         Thread.sleep(100.msecs);
3206         sck2.disconnect("ipc://zmqd_receiveEvent_unittest");
3207         Thread.sleep(100.msecs);
3208         sck1.unbind("ipc://zmqd_receiveEvent_unittest");
3209     }
3210     eventGenerator();
3211     collector.join();
3212     assert (events.length == 3);
3213     foreach (ev; events) {
3214         assert (ev.address == "ipc://zmqd_receiveEvent_unittest");
3215     }
3216     assert (events[0].type == EventType.listening);
3217     assert (events[1].type == EventType.accepted);
3218     assert (events[2].type == EventType.closed);
3219     import std.exception;
3220     assertNotThrown!Error(events[0].fd);
3221     assertThrown!Error(events[0].errno);
3222     assertThrown!Error(events[0].interval);
3223 }
3224 
3225 
3226 /**
3227 Information about a socket state change.
3228 
3229 See_also:
3230     $(FREF receiveEvent)
3231 */
3232 struct Event
3233 {
3234     /// The event _type.
3235     @property EventType type() const pure nothrow
3236     {
3237         return m_type;
3238     }
3239 
3240     /// The peer _address.
3241     @property string address() const pure nothrow
3242     {
3243         return m_address;
3244     }
3245 
3246     /**
3247     The socket file descriptor.
3248 
3249     This property function may only be called if $(REF Event.type) is one of:
3250     $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disonnected).
3251 
3252     Throws:
3253         $(D Error) if the property is called for a wrong event type.
3254     */
3255     @property FD fd() const pure nothrow
3256     {
3257         final switch (m_type) {
3258             case EventType.connected     :
3259             case EventType.listening     :
3260             case EventType.accepted      :
3261             case EventType.closed        :
3262             case EventType.disconnected  : return cast(typeof(return)) m_value;
3263             case EventType.connectDelayed:
3264             case EventType.connectRetried:
3265             case EventType.bindFailed    :
3266             case EventType.acceptFailed  :
3267             case EventType.closeFailed   : throw invalidProperty();
3268             case EventType.all           :
3269         }
3270         assert (false);
3271     }
3272 
3273     /**
3274     The $(D _errno) code for the error which triggered the event.
3275 
3276     This property function may only be called if $(REF Event.type) is either
3277     $(D bindFailed), $(D acceptFailed) or $(D closeFailed).
3278 
3279     Throws:
3280         $(D Error) if the property is called for a wrong event type.
3281     */
3282     @property int errno() const pure nothrow
3283     {
3284         final switch (m_type) {
3285             case EventType.bindFailed    :
3286             case EventType.acceptFailed  :
3287             case EventType.closeFailed   : return m_value;
3288             case EventType.connected     :
3289             case EventType.connectDelayed:
3290             case EventType.connectRetried:
3291             case EventType.listening     :
3292             case EventType.accepted      :
3293             case EventType.closed        :
3294             case EventType.disconnected  : throw invalidProperty();
3295             case EventType.all           :
3296         }
3297         assert (false);
3298     }
3299 
3300     /**
3301     The reconnect interval.
3302 
3303     This property function may only be called if $(REF Event.type) is
3304     $(D connectRetried).
3305 
3306     Throws:
3307         $(D Error) if the property is called for a wrong event type.
3308     */
3309     @property Duration interval() const pure nothrow
3310     {
3311         final switch (m_type) {
3312             case EventType.connectRetried: return m_value.msecs;
3313             case EventType.connected     :
3314             case EventType.connectDelayed:
3315             case EventType.listening     :
3316             case EventType.bindFailed    :
3317             case EventType.accepted      :
3318             case EventType.acceptFailed  :
3319             case EventType.closed        :
3320             case EventType.closeFailed   :
3321             case EventType.disconnected  : throw invalidProperty();
3322             case EventType.all           :
3323         }
3324         assert (false);
3325     }
3326 
3327 private:
3328     this(EventType type, string address, int value) pure nothrow
3329     {
3330         m_type = type;
3331         m_address = address;
3332         m_value = value;
3333     }
3334 
3335     Error invalidProperty(string name = __FUNCTION__)() const pure nothrow
3336     {
3337         try {
3338             import std.conv: text;
3339             return new Error(text("Property '", name,
3340                                   "' not available for event type '",
3341                                   m_type, "'"));
3342         } catch (Exception e) {
3343             assert(false);
3344         }
3345     }
3346 
3347     EventType m_type;
3348     string m_address;
3349     int m_value;
3350 }
3351 
3352 
3353 /**
3354 Encodes a binary key as Z85 printable text.
3355 
3356 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1),
3357 which will be used to store the return value plus a terminating zero byte.
3358 If $(D dest) is omitted, a new array will be created.
3359 
3360 Returns:
3361     An array of size $(D 5*data.length/4) which contains the Z85-encoded text,
3362     excluding the terminating zero byte.  This will be a slice of $(D dest) if
3363     it is provided.
3364 Throws:
3365     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
3366     $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length)
3367     is not a multiple of 4).
3368 Corresponds_to:
3369     $(ZMQREF zmq_z85_encode())
3370 */
3371 char[] z85Encode(ubyte[] data, char[] dest)
3372 // TODO: Make data const when we update to ZMQ 4.1
3373 {
3374     import core.exception: RangeError;
3375     immutable len = 5 * data.length / 4;
3376     if (dest.length < len+1) throw new RangeError;
3377     if (trusted!zmq_z85_encode(dest.ptr, data.ptr, data.length) == null) {
3378         throw new ZmqException;
3379     }
3380     return dest[0 .. len];
3381 }
3382 
3383 /// ditto
3384 char[] z85Encode(ubyte[] data)
3385 {
3386     return z85Encode(data, new char[5*data.length/4 + 1]);
3387 }
3388 
3389 debug (WithCurveTests) @system unittest // @system because of assertThrown
3390 {
3391     // TODO: Make data immutable when we update to ZMQ 4.1
3392     auto data = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
3393     immutable encoded = "HelloWorld";
3394     assert (z85Encode(data) == encoded);
3395 
3396     auto buffer = new char[11];
3397     auto result = z85Encode(data, buffer);
3398     assert (result == encoded);
3399     assert (buffer.ptr == result.ptr);
3400 
3401     import core.exception: RangeError;
3402     import std.exception: assertThrown;
3403     assertThrown!RangeError(z85Encode(data, new char[10]));
3404     assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5]));
3405 }
3406 
3407 
3408 /**
3409 Decodes a binary key from Z85 printable _text.
3410 
3411 $(D dest) must be an array whose length is at least $(D 4*data.length/5),
3412 which will be used to store the return value.
3413 If $(D dest) is omitted, a new array will be created.
3414 
3415 Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero
3416 byte will be appended to $(D text) if it does not contain one already.  However,
3417 this may trigger a (possibly unwanted) GC allocation.  To avoid this, either
3418 make sure that the last character in $(D text) is $(D '\0'), or use
3419 $(OBJREF assumeSafeAppend) on the array before calling this function (provided
3420 this is safe).
3421 
3422 Returns:
3423     An array of size $(D 4*data.length/5) which contains the decoded data.
3424     This will be a slice of $(D dest) if it is provided.
3425 Throws:
3426     $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
3427     $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length)
3428     is not a multiple of 5).
3429 Corresponds_to:
3430     $(ZMQREF zmq_z85_decode())
3431 */
3432 ubyte[] z85Decode(char[] text, ubyte[] dest)
3433 // TODO: Make text const when we update to ZMQ 4.1
3434 {
3435     import core.exception: RangeError;
3436     immutable len = 4 * text.length/5;
3437     if (dest.length < len) throw new RangeError;
3438     if (text[$-1] != '\0') text ~= '\0';
3439     if (trusted!zmq_z85_decode(dest.ptr, text.ptr) == null) {
3440         throw new ZmqException;
3441     }
3442     return dest[0 .. len];
3443 }
3444 
3445 /// ditto
3446 ubyte[] z85Decode(char[] text)
3447 {
3448     return z85Decode(text, new ubyte[4*text.length/5]);
3449 }
3450 
3451 debug (WithCurveTests) @system unittest // @system because of assertThrown
3452 {
3453     // TODO: Make data immutable when we update to ZMQ 4.1
3454     auto text = "HelloWorld".dup;
3455     immutable decoded = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
3456     assert (z85Decode(text) == decoded);
3457     assert (z85Decode(text~'\0') == decoded);
3458 
3459     auto buffer = new ubyte[8];
3460     auto result = z85Decode(text, buffer);
3461     assert (result == decoded);
3462     assert (buffer.ptr == result.ptr);
3463 
3464     import core.exception: RangeError;
3465     import std.exception: assertThrown;
3466     assertThrown!RangeError(z85Decode(text, new ubyte[7]));
3467     assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup));
3468 }
3469 
3470 
3471 /**
3472 Generates a new Curve key pair.
3473 
3474 To avoid a memory allocation, preallocated buffers may optionally be supplied
3475 for the two keys.  Each of these must have a length of at least 41 bytes, enough
3476 for a 40-character Z85-encoded key plus a terminating zero byte.  If either
3477 buffer is omitted/$(D null), a new one will be created.
3478 
3479 Returns:
3480     A tuple that contains the two keys.  Each of these will have a length of
3481     40 characters, and will be slices of the input buffers if such have been
3482     provided.
3483 Throws:
3484     $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are
3485         not $(D null) but have a length of less than 41 characters.$(BR)
3486     $(REF ZmqException) if $(ZMQ) reports an error.
3487 Corresponds_to:
3488     $(ZMQREF zmq_curve_keypair())
3489 */
3490 Tuple!(char[], "publicKey", char[], "secretKey")
3491     curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null)
3492 {
3493     import core.exception: RangeError;
3494     if (publicKeyBuf is null)           publicKeyBuf = new char[41];
3495     else if (publicKeyBuf.length < 41)  throw new RangeError;
3496     if (secretKeyBuf is null)           secretKeyBuf = new char[41];
3497     else if (secretKeyBuf.length < 41)  throw new RangeError;
3498 
3499     static if (ZMQ_VERSION < ZMQ_MAKE_VERSION(4, 1, 0)) {
3500         import deimos.zmq.utils: zmq_curve_keypair;
3501     }
3502     if (trusted!zmq_curve_keypair(publicKeyBuf.ptr, secretKeyBuf.ptr) != 0) {
3503         throw new ZmqException;
3504     }
3505     return typeof(return)(publicKeyBuf[0 .. 40], secretKeyBuf[0 .. 40]);
3506 }
3507 
3508 ///
3509 debug (WithCurveTests) unittest
3510 {
3511     auto server = Socket(SocketType.rep);
3512     auto serverKeys = curveKeyPair();
3513     server.curveServer = true;
3514     server.curveSecretKeyZ85 = serverKeys.secretKey;
3515     server.bind("inproc://curveKeyPair_test");
3516 
3517     auto client = Socket(SocketType.req);
3518     auto clientKeys = curveKeyPair();
3519     client.curvePublicKeyZ85 = clientKeys.publicKey;
3520     client.curveSecretKeyZ85 = clientKeys.secretKey;
3521     client.curveServerKeyZ85 = serverKeys.publicKey;
3522     client.connect("inproc://curveKeyPair_test");
3523     client.send("hello");
3524 
3525     ubyte[5] buf;
3526     assert (server.receive(buf) == 5);
3527     assert (buf.asString() == "hello");
3528 }
3529 
3530 debug (WithCurveTests) @system unittest
3531 {
3532     auto k1 = curveKeyPair();
3533     assert (k1.publicKey.length == 40);
3534     assert (k1.secretKey.length == 40);
3535 
3536     char[82] buf;
3537     auto k2 = curveKeyPair(buf[0 .. 41], buf[41 .. 82]);
3538     assert (k2.publicKey.length == 40);
3539     assert (k2.publicKey.ptr == buf.ptr);
3540     assert (k2.secretKey.length == 40);
3541     assert (k2.secretKey.ptr == buf.ptr + 41);
3542 
3543     char[82] backup = buf;
3544     import core.exception, std.exception;
3545     assertThrown!RangeError(curveKeyPair(buf[0 .. 40], buf[41 .. 82]));
3546     assertThrown!RangeError(curveKeyPair(buf[0 .. 41], buf[42 .. 82]));
3547     assert (backup[] == buf[]);
3548 }
3549 
3550 
3551 /**
3552 Utility function which interprets and validates a byte array as a UTF-8 string.
3553 
3554 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
3555 the message _data contains plain text.  $(D asString()) allows for easy and
3556 safe interpretation of raw _data as characters.  It checks that $(D data) is
3557 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
3558 the same memory region.
3559 
3560 Throws:
3561     $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
3562 See_also:
3563     $(STDREF string,representation), which performs the opposite operation.
3564 */
3565 inout(char)[] asString(inout(ubyte)[] data) pure
3566 {
3567     auto s = cast(typeof(return)) data;
3568     import std.utf: validate;
3569     validate(s);
3570     return s;
3571 }
3572 
3573 ///
3574 unittest
3575 {
3576     auto s1 = Socket(SocketType.pair);
3577     s1.bind("inproc://zmqd_asString_example");
3578     auto s2 = Socket(SocketType.pair);
3579     s2.connect("inproc://zmqd_asString_example");
3580 
3581     auto msg = Frame(12);
3582     msg.data.asString()[] = "Hello World!";
3583     s1.send(msg);
3584 
3585     ubyte[12] buf;
3586     s2.receive(buf);
3587     assert(buf.asString() == "Hello World!");
3588 }
3589 
3590 unittest
3591 {
3592     auto bytes = cast(ubyte[]) ['f', 'o', 'o'];
3593     auto text = bytes.asString();
3594     assert (text == "foo");
3595     assert (cast(void*) bytes.ptr == cast(void*) text.ptr);
3596 
3597     import std.exception: assertThrown;
3598     import std.utf: UTFException;
3599     auto b = cast(ubyte[]) [100, 252, 1];
3600     assertThrown!UTFException(asString(b));
3601 }
3602 
3603 
3604 /**
3605 A class for exceptions thrown when any of the underlying $(ZMQ) C functions
3606 report an error.
3607 
3608 The exception provides a standard error message obtained with
3609 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ)
3610 function which reported the error.
3611 */
3612 class ZmqException : Exception
3613 {
3614     /**
3615     The $(D _errno) code that was set by the $(ZMQ) function that reported
3616     the error.
3617 
3618     Corresponds_to:
3619         $(ZMQREF zmq_errno())
3620     */
3621     immutable int errno;
3622 
3623 private:
3624     this(string file = __FILE__, int line = __LINE__) nothrow
3625     {
3626         import core.stdc.errno, std.conv;
3627         this.errno = core.stdc.errno.errno;
3628         string msg;
3629         try {
3630             msg = trusted!(to!string)(trusted!zmq_strerror(this.errno));
3631         } catch (Exception e) { /* We never get here */ }
3632         assert(msg.length);     // Still, let's assert as much.
3633         super(msg, file, line);
3634     }
3635 }
3636 
3637 
3638 /**
3639 Exception thrown by $(FREF receiveEvent) on failure to interpret a
3640 received message as an event description.
3641 */
3642 class InvalidEventException : Exception
3643 {
3644 private:
3645     this(string file = __FILE__, int line = __LINE__) nothrow
3646     {
3647         super("The received message is not an event message", file, line);
3648     }
3649 }
3650 
3651 
3652 // =============================================================================
3653 // Everything below is internal
3654 // =============================================================================
3655 private:
3656 
3657 
3658 struct SharedResource
3659 {
3660 @safe:
3661     alias Exception function(shared(void)*) nothrow Release;
3662 
3663     this(shared(void)* ptr, Release release) nothrow
3664         in { assert(ptr); } body
3665     {
3666         m_payload = new shared(Payload)(1, ptr, release);
3667     }
3668 
3669     this(this) nothrow
3670     {
3671         if (m_payload) {
3672             incRefCount();
3673         }
3674     }
3675 
3676     ~this() nothrow
3677     {
3678         nothrowDetach();
3679     }
3680 
3681     ref SharedResource opAssign(SharedResource rhs)
3682     {
3683         detach();
3684         m_payload = rhs.m_payload;
3685         rhs.m_payload = null;
3686         return this;
3687     }
3688 
3689     void detach()
3690     {
3691         if (auto ex = nothrowDetach()) throw ex;
3692     }
3693 
3694     void forceRelease() @system
3695     {
3696         if (m_payload) {
3697             scope(exit) m_payload = null;
3698             decRefCount();
3699             if (m_payload.handle != null) {
3700                 scope(exit) m_payload.handle = null;
3701                 if (auto ex = m_payload.release(m_payload.handle)) {
3702                     throw ex;
3703                 }
3704             }
3705         }
3706     }
3707 
3708     @property inout(shared(void))* handle() inout pure nothrow
3709     {
3710         if (m_payload) {
3711             return m_payload.handle;
3712         } else {
3713             return null;
3714         }
3715     }
3716 
3717 private:
3718     void incRefCount() @trusted nothrow
3719     {
3720         assert (m_payload !is null && m_payload.refCount > 0);
3721         import core.atomic: atomicOp;
3722         atomicOp!"+="(m_payload.refCount, 1);
3723     }
3724 
3725     int decRefCount() @trusted nothrow
3726     {
3727         assert (m_payload !is null && m_payload.refCount > 0);
3728         import core.atomic: atomicOp;
3729         return atomicOp!"-="(m_payload.refCount, 1);
3730     }
3731 
3732     Exception nothrowDetach() @trusted nothrow
3733         out { assert (m_payload is null); }
3734         body
3735     {
3736         if (m_payload) {
3737             scope(exit) m_payload = null;
3738             if (decRefCount() < 1 && m_payload.handle != null) {
3739                 return m_payload.release(m_payload.handle);
3740             }
3741         }
3742         return null;
3743     }
3744 
3745     struct Payload
3746     {
3747         int refCount;
3748         void* handle;
3749         Release release;
3750     }
3751     shared(Payload)* m_payload;
3752 
3753     invariant()
3754     {
3755         assert (m_payload is null ||
3756             (m_payload.refCount > 0 && m_payload.release !is null));
3757     }
3758 }
3759 
3760 @system unittest
3761 {
3762     import std.exception: assertNotThrown, assertThrown;
3763     static Exception myFree(shared(void)* p) @trusted nothrow
3764     {
3765         auto v = cast(shared(int)*) p;
3766         if (*v == 0) {
3767             return new Exception("double release");
3768         } else {
3769             *v = 0;
3770             return null;
3771         }
3772     }
3773 
3774     shared int i = 1;
3775 
3776     {
3777         // Test constructor and properties.
3778         auto ra = SharedResource(&i, &myFree);
3779         assert (i == 1);
3780         assert (ra.handle == &i);
3781 
3782         // Test postblit constructor
3783         auto rb = ra;
3784         assert (i == 1);
3785         assert (rb.handle == &i);
3786 
3787         {
3788             // Test properties and free() with default-initialized object.
3789             SharedResource rc;
3790             assert (rc.handle == null);
3791             assertNotThrown(rc.detach());
3792 
3793             // Test assignment, both with and without detachment
3794             rc = rb;
3795             assert (i == 1);
3796             assert (rc.handle == &i);
3797 
3798             shared int j = 2;
3799             auto rd = SharedResource(&j, &myFree);
3800             assert (rd.handle == &j);
3801             rd = rb;
3802             assert (j == 0);
3803             assert (i == 1);
3804             assert (rd.handle == &i);
3805 
3806             // Test explicit detach()
3807             shared int k = 3;
3808             auto re = SharedResource(&k, &myFree);
3809             assertNotThrown(re.detach());
3810             assert(k == 0);
3811 
3812             // Test failure to free and assign (myFree(&k) fails when k == 0)
3813             re = SharedResource(&k, &myFree);
3814             assertThrown!Exception(re.detach()); // We defined free(k == 0) as an error
3815             re = SharedResource(&k, &myFree);
3816             assertThrown!Exception(re = rb);
3817         }
3818 
3819         // i should not be "freed" yet
3820         assert (i == 1);
3821         assert (ra.handle == &i);
3822         assert (rb.handle == &i);
3823     }
3824     // ...but now it should.
3825     assert (i == 0);
3826 }
3827 
3828 // Thread safety test
3829 @system unittest
3830 {
3831     enum threadCount = 100;
3832     enum copyCount = 1000;
3833     static Exception myFree(shared(void)* p) @trusted nothrow
3834     {
3835         auto v = cast(shared(int)*) p;
3836         if (*v == 0) {
3837             return new Exception("double release");
3838         } else {
3839             *v = 0;
3840             return null;
3841         }
3842     }
3843     shared int raw = 1;
3844     {
3845         auto rs = SharedResource(&raw, &myFree);
3846 
3847         import core.thread;
3848         auto group = new ThreadGroup;
3849         foreach (i; 0 .. threadCount) {
3850             group.create(() {
3851                 auto a = rs;
3852                 foreach (j; 0 .. copyCount) {
3853                     auto b = a;
3854                     assert (b.handle == &raw);
3855                     assert (raw == 1);
3856                 }
3857             });
3858         }
3859         group.joinAll();
3860         assert (rs.handle == &raw);
3861         assert (raw == 1);
3862     }
3863     assert (raw == 0);
3864 }
3865 
3866 // forceRelease() test
3867 @system unittest
3868 {
3869     import std.exception: assertNotThrown, assertThrown;
3870     static Exception myFree(shared(void)* p) @trusted nothrow
3871     {
3872         auto v = cast(shared(int)*) p;
3873         if (*v == 0) {
3874             return new Exception("double release");
3875         } else {
3876             *v = 0;
3877             return null;
3878         }
3879     }
3880 
3881     shared int i = 1;
3882 
3883     auto ra = SharedResource(&i, &myFree);
3884     auto rb = ra;
3885     assert (ra.handle == &i);
3886     assert (rb.handle == ra.handle);
3887     rb.forceRelease();
3888     assert (rb.handle == null);
3889     assert (ra.handle == null);
3890 }
3891 
3892 
3893 version(unittest) private string uniqueUrl(string p, int n = __LINE__)
3894 {
3895     import std.uuid;
3896     return p ~ "://" ~ randomUUID().toString();
3897 }
3898 
3899 
3900 private auto trusted(alias func, Args...)(auto ref Args args) @trusted
3901 {
3902     return func(args);
3903 }
3904 
3905 
3906 // std.string.toStringz() is unsafe, so we provide our own implementation
3907 // tailored to the string sizes we are likely to encounter here.
3908 // Note that this implementation requires that the string be used immediately
3909 // upon return, and not stored, as the buffer will be reused most of the time.
3910 const(char)* zeroTermString(const char[] s) nothrow
3911 {
3912     import std.algorithm: max;
3913     static char[] buf;
3914     immutable len = s.length + 1;
3915     if (buf.length < len) buf.length = max(len, 1023);
3916     buf[0 .. s.length] = s;
3917     buf[s.length] = '\0';
3918     return buf.ptr;
3919 }
3920 
3921 @system unittest
3922 {
3923     auto c1 = zeroTermString("Hello World!");
3924     assert (c1[0 .. 13] == "Hello World!\0");
3925     auto c2 = zeroTermString("foo");
3926     assert (c2[0 .. 4] == "foo\0");
3927     assert (c1 == c2);
3928 }