1 /**
2 A thin wrapper around the low-level C API of the $(LINK2 http://zeromq.org,$(ZMQ))
3 messaging framework, for the $(LINK2 http://dlang.org,D programming language).
4 
5 Most functions in this module have a one-to-one relationship with functions
6 in the underlying C API.  Some adaptations have been made to make the API
7 safer, easier and more pleasant to use, namely:
8 $(UL
9     $(LI
10         Errors are signalled by means of exceptions rather than return
11         codes.  In particular, the $(REF ZmqException) class provides
12         a standard textual message for any error condition, but it also
13         provides access to the $(D errno) code set by the C function
14         that reported the error.)
15     $(LI
16         Functions are appropriately marked with $(D @safe), $(D pure)
17         and $(D nothrow), thus easing their use in high-level D code.)
18     $(LI
19         Memory and resources (i.e. contexts, sockets and messages) are
20         automatically managed, thus preventing leaks.)
21     $(LI
22         Context, socket and message options are implemented as properties.)
23 )
24 The names of functions and types in $(ZMQD) are very similar to those in
25 $(ZMQ), but they follow the D naming conventions.  For example,
26 $(D zmq_msg_send()) becomes $(D zmqd.Message.send()) and so on.  Thus,
27 the library should feel both familiar to $(ZMQ) users and natural to D
28 users.
29 
30 Due to the close correspondence with the C API, this documentation has
31 intentionally been kept sparse. There is really no reason to repeat the
32 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here.
33 Instead, the documentation for each function contains a "Corresponds to"
34 section that links to the appropriate page in the $(ZMQ) reference.  Any
35 details given in the present documentation mostly concern the D-specific
36 adaptations that have been made.
37 
38 Also note that the examples only use the INPROC and IPC transports.  The
39 reason for this is that the examples double as unittests, and we want to
40 avoid firewall troubles and other issues that could arise with the use of
41 network protocols such as TCP, PGM, etc.  Anyway, they are only short
42 snippets that demonstrate the syntax; for more comprehensive and realistic
43 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all,
44 $(ZMQ) Guide).
45 
46 Version:
47     0.1 ($(ZMQ) 3.2 compatible)
48 Authors:
49     $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad)
50 Copyright:
51     Copyright (c) 2013, Lars T. Kyllingstad. All rights reserved.
52 License:
53     $(ZMQD) is released under a BSD licence (see LICENCE.txt for details).$(BR)
54     Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site)
55     for details about $(ZMQ) licensing.
56 Macros:
57     D      = <code>$0</code>
58     EM     = <em>$0</em>
59     LDOTS  = &hellip;
60     QUOTE  = <blockquote>$0</blockquote>
61     REF    = $(D $(LINK2 #$1,$1))
62     STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
63     ZMQ    = &#x2205;MQ
64     ZMQAPI = $(LINK2 http://api.zeromq.org/3-2:$1,$+)
65     ZMQD   = $(ZMQ)D
66     ZMQREF = $(D $(ZMQAPI $1,$1))
67 */
68 module zmqd;
69 
70 import std.typecons;
71 import deimos.zmq.zmq;
72 
73 
74 version(Windows) {
75     alias SOCKET = size_t;
76 }
77 
78 
79 /**
80 Reports the $(ZMQ) library version.
81 
82 Returns:
83     A $(STDREF typecons,Tuple) with three integer fields that represent the
84     three versioning levels: $(D major), $(D minor) and $(D patch).
85 Corresponds_to:
86     $(ZMQREF zmq_version())
87 */
88 Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() @safe nothrow
89 {
90     typeof(return) v;
91     trusted!zmq_version(&v.major, &v.minor, &v.patch);
92     return v;
93 }
94 
95 
96 /**
97 An object that encapsulates a $(ZMQ) context.
98 
99 In most programs, it is not necessary to use this type directly,
100 as $(REF Socket) will use a default global context if not explicitly
101 provided with one.  See $(REF defaultContext) for details.
102 
103 A default-initialized $(D Context) is not a valid $(ZMQ) context; it
104 must always be explicitly initialized with $(REF _Context.opCall):
105 ---
106 Context ctx;        // Not a valid context yet
107 ctx = Context();    // ...but now it is.
108 ---
109 $(D Context) objects can be passed around by value, and two copies will
110 refer to the same context.  The underlying context is managed using
111 reference counting, so that when the last copy of a $(D Context) goes
112 out of scope, the context is automatically destroyed.
113 
114 See_also:
115     $(REF defaultContext)
116 */
117 struct Context
118 {
119 @safe:
120     /**
121     Creates a new $(ZMQ) context.
122 
123     Returns:
124         A $(REF Context) object that encapsulates the new context.
125     Throws:
126         $(REF ZmqException) if $(ZMQ) reports an error.
127     Corresponds_to:
128         $(ZMQREF zmq_ctx_new())
129     */
130     static Context opCall()
131     {
132         if (auto c = trusted!zmq_ctx_new()) {
133             Context ctx;
134             ctx.m_resource = Resource(c, &zmq_ctx_destroy);
135             return ctx;
136         } else {
137             throw new ZmqException;
138         }
139     }
140 
141     ///
142     unittest
143     {
144         auto ctx = Context();
145         assert (ctx.initialized);
146     }
147 
148     /**
149     Destroys the $(ZMQ) context.
150 
151     It is normally not necessary to do this manually, as the context will
152     be destroyed automatically when the last reference to it goes out of
153     scope.
154 
155     Throws:
156         $(REF ZmqException) if $(ZMQ) reports an error.
157     Corresponds_to:
158         $(ZMQREF zmq_ctx_destroy())
159     */
160     void destroy()
161     {
162         m_resource.free();
163     }
164 
165     ///
166     unittest
167     {
168         auto ctx = Context();
169         assert (ctx.initialized);
170         ctx.destroy();
171         assert (!ctx.initialized);
172     }
173 
174     /**
175     The number of I/O threads.
176 
177     Throws:
178         $(REF ZmqException) if $(ZMQ) reports an error.
179     Corresponds_to:
180         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
181         $(D ZMQ_IO_THREADS).
182     */
183     @property int ioThreads()
184     {
185         return getOption(ZMQ_IO_THREADS);
186     }
187 
188     /// ditto
189     @property void ioThreads(int value)
190     {
191         setOption(ZMQ_IO_THREADS, value);
192     }
193 
194     ///
195     unittest
196     {
197         auto ctx = Context();
198         ctx.ioThreads = 3;
199         assert (ctx.ioThreads == 3);
200     }
201 
202     /**
203     The maximum number of sockets.
204 
205     Throws:
206         $(REF ZmqException) if $(ZMQ) reports an error.
207     Corresponds_to:
208         $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
209         $(D ZMQ_MAX_SOCKETS).
210     */
211     @property int maxSockets()
212     {
213         return getOption(ZMQ_MAX_SOCKETS);
214     }
215 
216     /// ditto
217     @property void maxSockets(int value)
218     {
219         setOption(ZMQ_MAX_SOCKETS, value);
220     }
221 
222     ///
223     unittest
224     {
225         auto ctx = Context();
226         ctx.maxSockets = 512;
227         assert (ctx.maxSockets == 512);
228     }
229 
230     /**
231     The $(D void*) pointer used by the underlying C API to refer to the context.
232 
233     If the object has not been initialized, this function returns $(D null).
234     */
235     @property inout(void)* handle() inout pure nothrow
236     {
237         return m_resource.handle;
238     }
239 
240     /**
241     Whether this $(REF Context) object has been _initialized, i.e. whether it
242     refers to a valid $(ZMQ) context.
243     */
244     @property bool initialized() const pure nothrow
245     {
246         return m_resource.initialized;
247     }
248 
249     ///
250     @trusted unittest // TODO: Remove @trusted for DMD 2.064
251     {
252         Context ctx;
253         assert (!ctx.initialized);
254         ctx = Context();
255         assert (ctx.initialized);
256         ctx.destroy();
257         assert (!ctx.initialized);
258     }
259 
260 private:
261     int getOption(int option)
262     {
263         immutable value = trusted!zmq_ctx_get(m_resource.handle, option);
264         if (value < 0) {
265             throw new ZmqException;
266         }
267         return value;
268     }
269 
270     void setOption(int option, int value)
271     {
272         if (trusted!zmq_ctx_set(m_resource.handle, option, value) != 0) {
273             throw new ZmqException;
274         }
275     }
276 
277     Resource m_resource;
278 }
279 
280 
281 /**
282 A global context which is used by default by all sockets, unless they are
283 explicitly constructed with a different context.
284 
285 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
286 has the following to say) about context creation:
287 $(QUOTE
288     You should create and use exactly one context in your process.
289     [$(LDOTS)] If at runtime a process has two contexts, these are
290     like separate $(ZMQ) instances. If that's explicitly what you
291     want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
292     at the start of your main line code, and one $(D zmq_ctx_destroy())
293     at the end.)
294 )
295 By using $(D defaultContext()), this is exactly what you achieve.  The
296 context is created the first time the function is called, and is
297 automatically destroyed when the program ends.
298 
299 This function is thread safe.
300 
301 Throws:
302     $(REF ZmqException) if $(ZMQ) reports an error.
303 See_also:
304     $(REF Context)
305 */
306 Context defaultContext() @trusted
307 {
308     // For future reference: This is the low-lock singleton pattern. See:
309     // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
310     static bool instantiated;
311     __gshared Context ctx;
312     if (!instantiated) {
313         synchronized {
314             if (!ctx.initialized) {
315                 ctx = Context();
316             }
317             instantiated = true;
318         }
319     }
320     return ctx;
321 }
322 
323 unittest
324 {
325     auto c1 = defaultContext();
326     auto c2 = defaultContext();
327     assert(c1.handle !is null);
328     assert(c1.handle == c2.handle);
329 }
330 
331 
332 /**
333 The various socket types.
334 
335 These are described in the $(ZMQREF zmq_socket()) reference.
336 */
337 enum SocketType
338 {
339     req     = ZMQ_REQ,      /// Corresponds to $(D ZMQ_REQ)
340     rep     = ZMQ_REP,      /// Corresponds to $(D ZMQ_REP)
341     dealer  = ZMQ_DEALER,   /// Corresponds to $(D ZMQ_DEALER)
342     router  = ZMQ_ROUTER,   /// Corresponds to $(D ZMQ_ROUTER)
343     pub     = ZMQ_PUB,      /// Corresponds to $(D ZMQ_PUB)
344     sub     = ZMQ_SUB,      /// Corresponds to $(D ZMQ_SUB)
345     xpub    = ZMQ_XPUB,     /// Corresponds to $(D ZMQ_XPUB)
346     xsub    = ZMQ_XSUB,     /// Corresponds to $(D ZMQ_XSUB)
347     push    = ZMQ_PUSH,     /// Corresponds to $(D ZMQ_PUSH)
348     pull    = ZMQ_PULL,     /// Corresponds to $(D ZMQ_PULL)
349     pair    = ZMQ_PAIR,     /// Corresponds to $(D ZMQ_PAIR)
350 }
351 
352 
353 /**
354 An object that encapsulates a $(ZMQ) socket.
355 
356 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
357 must always be explicitly initialized with a constructor (see
358 $(REF _Socket.this)):
359 ---
360 Socket s;                     // Not a valid socket yet
361 s = Socket(SocketType.push);  // ...but now it is.
362 ---
363 $(D Socket) objects can be passed around by value, and two copies will
364 refer to the same socket.  The underlying socket is managed using
365 reference counting, so that when the last copy of a $(D Socket) goes
366 out of scope, the socket is automatically closed.
367 */
368 struct Socket
369 {
370 @safe:
371     /**
372     Creates a new $(ZMQ) socket.
373 
374     If $(D context) is not specified, the default context (as returned
375     by $(REF defaultContext)) is used.
376 
377     Throws:
378         $(REF ZmqException) if $(ZMQ) reports an error.
379     Corresponds_to:
380         $(ZMQREF zmq_socket())
381     */
382     this(SocketType type)
383     {
384         this(defaultContext(), type);
385     }
386 
387     /// ditto
388     this(Context context, SocketType type)
389     {
390         if (auto s = trusted!zmq_socket(context.handle, type)) {
391             // TODO: Replace the next line with the one below for DMD 2.064
392             (Context c) @trusted { m_context = c; } (context);
393             // m_context = ctx;
394             m_type = type;
395             m_socket = Resource(s, &zmq_close);
396         } else {
397             throw new ZmqException;
398         }
399     }
400 
401     /// With default context:
402     unittest
403     {
404         auto sck = Socket(SocketType.push);
405         assert (sck.initialized);
406     }
407     /// With explicit context:
408     unittest
409     {
410         auto ctx = Context();
411         auto sck = Socket(ctx, SocketType.push);
412         assert (sck.initialized);
413     }
414 
415     /**
416     Closes the $(ZMQ) socket.
417 
418     Note that the socket will be automatically closed when the last reference
419     to it goes out of scope, so it is often not necessary to call this
420     method manually.
421 
422     Throws:
423         $(REF ZmqException) if $(ZMQ) reports an error.
424     Corresponds_to:
425         $(ZMQREF zmq_close())
426     */
427     void close()
428     {
429         m_socket.free();
430     }
431 
432     ///
433     unittest
434     {
435         auto s = Socket(SocketType.pair);
436         assert (s.initialized);
437         s.close();
438         assert (!s.initialized);
439     }
440 
441     /**
442     Starts accepting incoming connections on $(D endpoint).
443 
444     Throws:
445         $(REF ZmqException) if $(ZMQ) reports an error.
446     Corresponds_to:
447         $(ZMQREF zmq_bind())
448     */
449     void bind(const char[] endpoint)
450     {
451         if (trusted!zmq_bind(m_socket.handle, zeroTermString(endpoint)) != 0) {
452             throw new ZmqException;
453         }
454     }
455 
456     ///
457     unittest
458     {
459         auto s = Socket(SocketType.pub);
460         s.bind("inproc://zmqd_bind_example");
461     }
462 
463     /**
464     Stops accepting incoming connections on $(D endpoint).
465 
466     Throws:
467         $(REF ZmqException) if $(ZMQ) reports an error.
468     Corresponds_to:
469         $(ZMQREF zmq_unbind())
470     */
471     void unbind(const char[] endpoint)
472     {
473         if (trusted!zmq_unbind(m_socket.handle, zeroTermString(endpoint)) != 0) {
474             throw new ZmqException;
475         }
476     }
477 
478     ///
479     unittest
480     {
481         auto s = Socket(SocketType.pub);
482         s.bind("ipc://zmqd_unbind_example");
483         // Do some work...
484         s.unbind("ipc://zmqd_unbind_example");
485     }
486 
487     /**
488     Creates an outgoing connection to $(D endpoint).
489 
490     Throws:
491         $(REF ZmqException) if $(ZMQ) reports an error.
492     Corresponds_to:
493         $(ZMQREF zmq_connect())
494     */
495     void connect(const char[] endpoint)
496     {
497         if (trusted!zmq_connect(m_socket.handle, zeroTermString(endpoint)) != 0) {
498             throw new ZmqException;
499         }
500     }
501 
502     ///
503     unittest
504     {
505         auto s = Socket(SocketType.sub);
506         s.connect("ipc://zmqd_connect_example");
507     }
508 
509     /**
510     Disconnects the socket from $(D endpoint).
511 
512     Throws:
513         $(REF ZmqException) if $(ZMQ) reports an error.
514     Corresponds_to:
515         $(ZMQREF zmq_disconnect())
516     */
517     void disconnect(const char[] endpoint)
518     {
519         if (trusted!zmq_disconnect(m_socket.handle, zeroTermString(endpoint)) != 0) {
520             throw new ZmqException;
521         }
522     }
523 
524     ///
525     unittest
526     {
527         auto s = Socket(SocketType.sub);
528         s.connect("ipc://zmqd_disconnect_example");
529         // Do some work...
530         s.disconnect("ipc://zmqd_disconnect_example");
531     }
532 
533     /**
534     Sends a message part.
535 
536     $(D _send) blocks until the message has been queued on the socket.
537     $(D trySend) performs the operation in non-blocking mode, and returns
538     a $(D bool) value that signifies whether the message was queued on the
539     socket.
540 
541     The $(D char[]) overload is a convenience function that simply casts
542     the string argument to $(D ubyte[]).
543 
544     Throws:
545         $(REF ZmqException) if $(ZMQ) reports an error.
546     Corresponds_to:
547         $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the case
548         of $(D trySend)).
549     */
550     void send(const ubyte[] data, bool more = false)
551     {
552         immutable flags = more ? ZMQ_SNDMORE : 0;
553         if (trusted!zmq_send(m_socket.handle, data.ptr, data.length, flags) < 0) {
554             throw new ZmqException;
555         }
556     }
557 
558     /// ditto
559     void send(const char[] data, bool more = false) @trusted
560     {
561         send(cast(ubyte[]) data, more);
562     }
563 
564     /// ditto
565     bool trySend(const ubyte[] data, bool more = false)
566     {
567         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
568         if (trusted!zmq_send(m_socket.handle, data.ptr, data.length, flags) < 0) {
569             import core.stdc.errno;
570             if (errno == EAGAIN) return false;
571             else throw new ZmqException;
572         }
573         return true;
574     }
575 
576     /// ditto
577     bool trySend(const char[] data, bool more = false) @trusted
578     {
579         return trySend(cast(ubyte[]) data, more);
580     }
581 
582     ///
583     unittest
584     {
585         auto sck = Socket(SocketType.pub);
586         sck.send(cast(ubyte[]) [11, 226, 92]);
587         sck.send("Hello World!");
588     }
589 
590     /**
591     Sends a message part.
592 
593     $(D _send) blocks until the message has been queued on the socket.
594     $(D trySend) performs the operation in non-blocking mode, and returns
595     a $(D bool) value that signifies whether the message was queued on the
596     socket.
597 
598     Throws:
599         $(REF ZmqException) if $(ZMQ) reports an error.
600     Corresponds_to:
601         $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the case
602         of $(D trySend)).
603     */
604     void send(ref Message msg, bool more = false)
605     {
606         immutable flags = more ? ZMQ_SNDMORE : 0;
607         if (trusted!zmq_msg_send(msg.handle, m_socket.handle, flags) < 0) {
608             throw new ZmqException;
609         }
610     }
611 
612     /// ditto
613     bool trySend(ref Message msg, bool more = false)
614     {
615         immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
616         if (trusted!zmq_msg_send(msg.handle, m_socket.handle, flags) < 0) {
617             import core.stdc.errno;
618             if (errno == EAGAIN) return false;
619             else throw new ZmqException;
620         }
621         return true;
622     }
623 
624     ///
625     unittest
626     {
627         auto sck = Socket(SocketType.pub);
628         auto msg = Message(12);
629         msg.data.asString()[] = "Hello World!";
630         sck.send(msg);
631     }
632 
633     /**
634     Receives a message part.
635 
636     $(D _receive) blocks until the request can be satisfied.
637     $(D tryReceive) performs the operation in non-blocking mode, and returns
638     a $(D bool) value that signifies whether a message was received.
639 
640     Throws:
641         $(REF ZmqException) if $(ZMQ) reports an error.
642     Corresponds_to:
643         $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
644         of $(D tryReceive)).
645 
646     */
647     size_t receive(ubyte[] data)
648     {
649         immutable len = trusted!zmq_recv(m_socket.handle, data.ptr, data.length, 0);
650         if (len >= 0) {
651             import std.conv;
652             return to!size_t(len);
653         } else {
654             throw new ZmqException;
655         }
656     }
657 
658     /// ditto
659     Tuple!(size_t, bool) tryReceive(ubyte[] data)
660     {
661         immutable len = trusted!zmq_recv(m_socket.handle, data.ptr, data.length, ZMQ_DONTWAIT);
662         if (len >= 0) {
663             import std.conv;
664             return typeof(return)(to!size_t(len), true);
665         } else {
666             import core.stdc.errno;
667             if (errno == EAGAIN) {
668                 return typeof(return)(0, false);
669             } else {
670                 throw new ZmqException;
671             }
672         }
673     }
674 
675     ///
676     unittest
677     {
678         // Sender
679         auto snd = Socket(SocketType.req);
680         snd.connect("ipc://zmqd_receive_example");
681         snd.send("Hello World!");
682 
683         // Receiver
684         import std.string: representation;
685         auto rcv = Socket(SocketType.rep);
686         rcv.bind("ipc://zmqd_receive_example");
687         char[256] buf;
688         immutable len  = rcv.receive(buf.representation);
689         assert (buf[0 .. len] == "Hello World!");
690     }
691 
692     @trusted unittest
693     {
694         auto snd = Socket(SocketType.pair);
695         snd.bind("ipc://zmqd_tryReceive_example");
696         auto rcv = Socket(SocketType.pair);
697         rcv.connect("ipc://zmqd_tryReceive_example");
698 
699         ubyte[256] buf;
700         auto r1 = rcv.tryReceive(buf);
701         assert (!r1[1]);
702 
703         import core.thread, core.time, std.string;
704         snd.send("Hello World!");
705         Thread.sleep(100.msecs); // Wait for message to be transferred...
706         auto r2 = rcv.tryReceive(buf);
707         assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation);
708     }
709 
710     /**
711     Receives a message part.
712 
713     $(D _receive) blocks until the request can be satisfied.
714     $(D tryReceive) performs the operation in non-blocking mode, and returns
715     a $(D bool) value that signifies whether a message was received.
716 
717     Throws:
718         $(REF ZmqException) if $(ZMQ) reports an error.
719     Corresponds_to:
720         $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
721         of $(D tryReceive)).
722 
723     */
724     size_t receive(ref Message msg)
725     {
726         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket.handle, 0);
727         if (len >= 0) {
728             import std.conv;
729             return to!size_t(len);
730         } else {
731             throw new ZmqException;
732         }
733     }
734 
735     /// ditto
736     Tuple!(size_t, bool) tryReceive(ref Message msg)
737     {
738         immutable len = trusted!zmq_msg_recv(msg.handle, m_socket.handle, ZMQ_DONTWAIT);
739         if (len >= 0) {
740             import std.conv;
741             return typeof(return)(to!size_t(len), true);
742         } else {
743             import core.stdc.errno;
744             if (errno == EAGAIN) {
745                 return typeof(return)(0, false);
746             } else {
747                 throw new ZmqException;
748             }
749         }
750     }
751 
752     ///
753     unittest
754     {
755         // Sender
756         auto snd = Socket(SocketType.req);
757         snd.connect("ipc://zmqd_msg_receive_example");
758         snd.send("Hello World!");
759 
760         // Receiver
761         import std.string: representation;
762         auto rcv = Socket(SocketType.rep);
763         rcv.bind("ipc://zmqd_msg_receive_example");
764         auto msg = Message();
765         rcv.receive(msg);
766         assert (msg.data.asString() == "Hello World!");
767     }
768 
769     @trusted unittest
770     {
771         auto snd = Socket(SocketType.pair);
772         snd.bind("ipc://zmqd_msg_tryReceive_example");
773         auto rcv = Socket(SocketType.pair);
774         rcv.connect("ipc://zmqd_msg_tryReceive_example");
775 
776         auto msg = Message();
777         auto r1 = rcv.tryReceive(msg);
778         assert (!r1[1]);
779 
780         import core.thread, core.time, std.string;
781         snd.send("Hello World!");
782         Thread.sleep(100.msecs); // Wait for message to be transferred...
783         auto r2 = rcv.tryReceive(msg);
784         assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation);
785     }
786 
787     /**
788     The socket _type.
789 
790     Throws:
791         $(REF ZmqException) if $(ZMQ) reports an error.
792     Corresponds_to:
793         $(ZMQREF zmq_msg_getsockopt()) with $(D ZMQ_TYPE).
794     */
795     @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }
796 
797     ///
798     unittest
799     {
800         auto sck = Socket(SocketType.xpub);
801         assert (sck.type == SocketType.xpub);
802     }
803 
804     /**
805     Whether there are _more message data parts to follow.
806 
807     Throws:
808         $(REF ZmqException) if $(ZMQ) reports an error.
809     Corresponds_to:
810         $(ZMQREF zmq_msg_getsockopt()) with $(D ZMQ_RCVMORE).
811     */
812     @property bool more() { return !!getOption!int(ZMQ_RCVMORE); }
813 
814     // TODO: Better unittest/example
815     unittest
816     {
817         auto sck = Socket(SocketType.req);
818         assert (!sck.more);
819     }
820 
821     /**
822     Misc. socket properties.
823 
824     Each of these has a one-to-one correspondence with an option passed to
825     $(ZMQREF zmq_msg_getsockopt()) and $(ZMQREF zmq_msg_setsockopt()). For
826     example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
827     $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.
828 
829     Notes:
830     $(UL
831         $(LI For convenience, the setter for the $(D identity) property
832             accepts strings.  To retrieve a string with the getter, use
833             the $(REF asString) function.
834             ---
835             sck.identity = "foobar";
836             assert (sck.identity.asString() == "foobar");
837             ---
838             )
839         $(LI The $(D fd) property is an $(D int) on POSIX and a $(D SOCKET)
840             on Windows.)
841         $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
842             treated differently from the others; see $(REF Socket.subscribe)
843             and $(REF Socket.unsubscribe))
844     )
845 
846     Throws:
847         $(REF ZmqException) if $(ZMQ) reports an error.
848     Corresponds_to:
849         $(ZMQREF zmq_msg_getsockopt()) and $(ZMQREF zmq_msg_setsockopt()).
850     */
851     @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
852     /// ditto
853     @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }
854 
855     /// ditto
856     @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
857     /// ditto
858     @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }
859 
860     /// ditto
861     @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); }
862     /// ditto
863     @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); }
864 
865     /// ditto
866     @property ubyte[] identity() @trusted
867     {
868         // This function is not @safe because it calls a @system function
869         // (zmq_getsockopt) and takes the address of a local (len).
870         auto buf = new ubyte[255];
871         size_t len = buf.length;
872         if (zmq_getsockopt(m_socket.handle, ZMQ_IDENTITY, buf.ptr, &len) != 0) {
873             throw new ZmqException;
874         }
875         return buf[0 .. len];
876     }
877     /// ditto
878     @property void identity(const ubyte[] value) { setOption(ZMQ_IDENTITY, value); }
879     /// ditto
880     @property void identity(const  char[] value) { setOption(ZMQ_IDENTITY, value); }
881 
882     /// ditto
883     @property int rate() { return getOption!int(ZMQ_RATE); }
884     /// ditto
885     @property void rate(int value) { setOption(ZMQ_RATE, value); }
886 
887     /// ditto
888     @property int recoveryInterval() { return getOption!int(ZMQ_RECOVERY_IVL); }
889     /// ditto
890     @property void recoveryInterval(int value) { setOption(ZMQ_RECOVERY_IVL, value); }
891 
892     /// ditto
893     @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); }
894     /// ditto
895     @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }
896 
897     /// ditto
898     @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
899     /// ditto
900     @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }
901 
902     /// ditto
903     @property int linger() { return getOption!int(ZMQ_LINGER); }
904     /// ditto
905     @property void linger(int value) { setOption(ZMQ_LINGER, value); }
906 
907     /// ditto
908     @property int reconnectionInterval() { return getOption!int(ZMQ_RECONNECT_IVL); }
909     /// ditto
910     @property void reconnectionInterval(int value) { setOption(ZMQ_RECONNECT_IVL, value); }
911 
912     /// ditto
913     @property int maxReconnectionInterval() { return getOption!int(ZMQ_RECONNECT_IVL_MAX); }
914     /// ditto
915     @property void maxReconnectionInterval(int value) { setOption(ZMQ_RECONNECT_IVL_MAX, value); }
916 
917     /// ditto
918     @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
919     /// ditto
920     @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }
921 
922     /// ditto
923     @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
924     /// ditto
925     @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }
926 
927     /// ditto
928     @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
929     /// ditto
930     @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }
931 
932     /// ditto
933     @property int receiveTimeout() { return getOption!int(ZMQ_RCVTIMEO); }
934     /// ditto
935     @property void receiveTimeout(int value) { setOption(ZMQ_RCVTIMEO, value); }
936 
937     /// ditto
938     @property int sendTimeout() { return getOption!int(ZMQ_SNDTIMEO); }
939     /// ditto
940     @property void sendTimeout(int value) { setOption(ZMQ_SNDTIMEO, value); }
941 
942     /// ditto
943     @property bool ipv4Only() { return !!getOption!int(ZMQ_IPV4ONLY); }
944     /// ditto
945     @property void ipv4Only(bool value) { setOption(ZMQ_IPV4ONLY, value ? 1 : 0); }
946 
947     /// ditto
948     @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); }
949     /// ditto
950     @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); }
951 
952 
953     version (Windows) {
954         alias FD = SOCKET;
955     } else version (Posix) {
956         alias FD = int;
957     }
958 
959     /// ditto
960     @property FD fd() { return getOption!FD(ZMQ_FD); }
961 
962     /// ditto
963     @property int events() { return getOption!int(ZMQ_EVENTS); }
964 
965     /// ditto
966     @property char[] lastEndpoint() @trusted
967     {
968         // This function is not @safe because it calls a @system function
969         // (zmq_getsockopt) and takes the address of a local (len).
970         auto buf = new char[1024];
971         size_t len = buf.length;
972         if (zmq_getsockopt(m_socket.handle, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
973             throw new ZmqException;
974         }
975         return buf[0 .. len-1];
976     }
977 
978     // TODO: Some low-level options are missing still, plus setters for
979     // ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE.
980 
981     unittest
982     {
983         // We test all the socket options by checking that they have their default value.
984         auto s = Socket(SocketType.xpub);
985         const e = "inproc://unittest2";
986         s.bind(e);
987         assert(s.type == SocketType.xpub);
988         assert(s.sendHWM == 1000);
989         assert(s.receiveHWM == 1000);
990         assert(s.threadAffinity == 0);
991         assert(s.identity == null);
992         assert(s.rate == 100);
993         assert(s.recoveryInterval == 10_000);
994         assert(s.sendBufferSize == 0);
995         assert(s.receiveBufferSize == 0);
996         assert(s.linger == -1);
997         assert(s.reconnectionInterval == 100);
998         assert(s.maxReconnectionInterval == 0);
999         assert(s.backlog == 100);
1000         assert(s.maxMsgSize == -1);
1001         assert(s.multicastHops == 1);
1002         assert(s.receiveTimeout == -1);
1003         assert(s.sendTimeout == -1);
1004         assert(s.ipv4Only);
1005         assert(!s.delayAttachOnConnect);
1006         version(Posix) {
1007             assert(s.fd > 2); // 0, 1 and 2 are the standard streams
1008         }
1009         assert(s.lastEndpoint == e);
1010 
1011         // Test setters and getters together
1012         s.sendHWM = 500;
1013         assert(s.sendHWM == 500);
1014         s.receiveHWM = 600;
1015         assert(s.receiveHWM == 600);
1016         s.threadAffinity = 1;
1017         assert(s.threadAffinity == 1);
1018         s.identity = cast(ubyte[]) [ 65, 66, 67 ];
1019         assert(s.identity == [65, 66, 67]);
1020         s.identity = "foo";
1021         assert(s.identity == [102, 111, 111]);
1022         s.rate = 200;
1023         assert(s.rate == 200);
1024         s.recoveryInterval = 5_000;
1025         assert(s.recoveryInterval == 5_000);
1026         s.sendBufferSize = 500;
1027         assert(s.sendBufferSize == 500);
1028         s.receiveBufferSize = 600;
1029         assert(s.receiveBufferSize == 600);
1030         s.linger = 0;
1031         assert(s.linger == 0);
1032         s.linger = 100;
1033         assert(s.linger == 100);
1034         s.reconnectionInterval = 200;
1035         assert(s.reconnectionInterval == 200);
1036         s.maxReconnectionInterval = 300;
1037         assert(s.maxReconnectionInterval == 300);
1038         s.backlog = 50;
1039         assert(s.backlog == 50);
1040         s.maxMsgSize = 1000;
1041         assert(s.maxMsgSize == 1000);
1042         s.multicastHops = 2;
1043         assert(s.multicastHops == 2);
1044         s.receiveTimeout = 3_000;
1045         assert(s.receiveTimeout == 3_000);
1046         s.sendTimeout = 2_000;
1047         assert(s.sendTimeout == 2_000);
1048         s.ipv4Only = false;
1049         assert(!s.ipv4Only);
1050         s.delayAttachOnConnect = true;
1051         assert(s.delayAttachOnConnect);
1052     }
1053 
1054     /**
1055     Establishes a message filter.
1056 
1057     Throws:
1058         $(REF ZmqException) if $(ZMQ) reports an error.
1059     Corresponds_to:
1060         $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1061     */
1062     void subscribe(const ubyte[] filterPrefix) { setOption(ZMQ_SUBSCRIBE, filterPrefix); }
1063     /// ditto
1064     void subscribe(const  char[] filterPrefix) { setOption(ZMQ_SUBSCRIBE, filterPrefix); }
1065 
1066     ///
1067     unittest
1068     {
1069         // Create a subscriber that accepts all messages that start with
1070         // the prefixes "foo" or "bar".
1071         auto sck = Socket(SocketType.sub);
1072         sck.subscribe("foo");
1073         sck.subscribe("bar");
1074     }
1075 
1076     @trusted unittest
1077     {
1078         void sleep(int ms) {
1079             import core.thread, core.time;
1080             Thread.sleep(dur!"msecs"(ms));
1081         }
1082         auto pub = Socket(SocketType.pub);
1083         pub.bind("inproc://zmqd_subscribe_unittest");
1084         auto sub = Socket(SocketType.sub);
1085         sub.connect("inproc://zmqd_subscribe_unittest");
1086 
1087         pub.send("Hello");
1088         sleep(100);
1089         sub.subscribe("He");
1090         sub.subscribe(cast(ubyte[])['W', 'o']);
1091         sleep(100);
1092         pub.send("Heeee");
1093         pub.send("World");
1094         sleep(100);
1095         ubyte[5] buf;
1096         sub.receive(buf);
1097         assert(buf.asString() == "Heeee");
1098         sub.receive(buf);
1099         assert(buf.asString() == "World");
1100     }
1101 
1102     /**
1103     Removes a message filter.
1104 
1105     Throws:
1106         $(REF ZmqException) if $(ZMQ) reports an error.
1107     Corresponds_to:
1108         $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE).
1109     */
1110     void unsubscribe(const ubyte[] filterPrefix) { setOption(ZMQ_UNSUBSCRIBE, filterPrefix); }
1111     /// ditto
1112     void unsubscribe(const  char[] filterPrefix) { setOption(ZMQ_UNSUBSCRIBE, filterPrefix); }
1113 
1114     ///
1115     unittest
1116     {
1117         // Subscribe to messages that start with "foo" or "bar".
1118         auto sck = Socket(SocketType.sub);
1119         sck.subscribe("foo");
1120         sck.subscribe("bar");
1121         // ...
1122         // From now on, only accept messages that start with "bar"
1123         sck.unsubscribe("foo");
1124     }
1125 
1126     /**
1127     The $(D void*) pointer used by the underlying C API to refer to the socket.
1128 
1129     If the object has not been initialized, this function returns $(D null).
1130     */
1131     @property inout(void)* handle() inout pure nothrow
1132     {
1133         return m_socket.handle;
1134     }
1135 
1136     /**
1137     Whether this $(REF Socket) object has been _initialized, i.e. whether it
1138     refers to a valid $(ZMQ) socket.
1139     */
1140     @property bool initialized() const pure nothrow
1141     {
1142         return m_socket.initialized;
1143     }
1144 
1145     ///
1146     @trusted unittest // TODO: Remove @trusted for DMD 2.064
1147     {
1148         Socket sck;
1149         assert (!sck.initialized);
1150         sck = Socket(SocketType.sub);
1151         assert (sck.initialized);
1152         sck.close();
1153         assert (!sck.initialized);
1154     }
1155 
1156 private:
1157     T getOption(T)(int option) @trusted
1158     {
1159         T buf;
1160         auto len = T.sizeof;
1161         if (zmq_getsockopt(m_socket.handle, option, &buf, &len) != 0) {
1162             throw new ZmqException;
1163         }
1164         assert(len == T.sizeof);
1165         return buf;
1166     }
1167     void setOption()(int option, const void[] value)
1168     {
1169         if (trusted!zmq_setsockopt(m_socket.handle, option, value.ptr, value.length) != 0) {
1170             throw new ZmqException;
1171         }
1172     }
1173 
1174     import std.traits;
1175     void setOption(T)(int option, T value) @trusted if (isScalarType!T)
1176     {
1177         if (zmq_setsockopt(m_socket.handle, option, &value, value.sizeof) != 0) {
1178             throw new ZmqException;
1179         }
1180     }
1181 
1182     Context m_context;
1183     SocketType m_type;
1184     Resource m_socket;
1185 }
1186 
1187 unittest
1188 {
1189     auto s1 = Socket(SocketType.pair);
1190     auto s2 = Socket(SocketType.pair);
1191     s1.bind("inproc://unittest");
1192     s2.connect("inproc://unittest");
1193     s1.send("Hello World!");
1194     ubyte[12] buf;
1195     const len = s2.receive(buf[]);
1196     assert (len == 12);
1197     assert (buf == "Hello World!");
1198 }
1199 
1200 
1201 /**
1202 Starts the built-in $(ZMQ) _proxy.
1203 
1204 Corresponds_to:
1205     $(ZMQREF zmq_proxy())
1206 */
1207 void proxy(ref Socket frontend, ref Socket backend) @safe nothrow
1208 {
1209     trusted!zmq_proxy(frontend.handle, backend.handle, null);
1210 }
1211 
1212 /// ditto
1213 void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
1214     @safe nothrow
1215 {
1216     trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle);
1217 }
1218 
1219 
1220 /**
1221 An object that encapsulates a $(ZMQ) message.
1222 
1223 This $(D struct) is a wrapper around a $(D zmq_msg_t) object.  Unlike
1224 $(REF Context) and $(REF Socket), it does $(EM not) perform reference
1225 counting, because $(ZMQ) messages have a form of reference counting of
1226 their own.  A $(D Message) cannot be copied by normal assignment; use
1227 $(REF Message.copy) for this.
1228 
1229 A default-initialized $(D Message) is not a valid $(ZMQ) message; it
1230 must always be explicitly initialized with $(REF _Message.opCall) or
1231 $(REF _Message.this):
1232 ---
1233 Message msg1;               // Invalid message
1234 auto msg2 = Message();      // Empty message
1235 auto msg3 = Message(1024);  // 1K message
1236 ---
1237 When a $(D Message) goes out of scope, $(ZMQREF zmq_msg_close()) is
1238 called on the underlying $(D zmq_msg_t).
1239 */
1240 struct Message
1241 {
1242 @safe:
1243     /**
1244     Initialises an empty $(ZMQ) message.
1245 
1246     Throws:
1247         $(REF ZmqException) if $(ZMQ) reports an error.
1248     Corresponds_to:
1249         $(ZMQREF zmq_msg_init())
1250     */
1251     static Message opCall()
1252     {
1253         Message m;
1254         if (trusted!zmq_msg_init(&m.m_msg) != 0) {
1255             throw new ZmqException;
1256         }
1257         m.m_initialized = true;
1258         return m;
1259     }
1260 
1261     ///
1262     unittest
1263     {
1264         auto msg = Message();
1265         assert(msg.size == 0);
1266     }
1267 
1268     /**
1269     Initialises a $(ZMQ) message of a specified size.
1270 
1271     Throws:
1272         $(REF ZmqException) if $(ZMQ) reports an error.
1273     Corresponds_to:
1274         $(ZMQREF zmq_msg_init_size())
1275     */
1276     this(size_t size)
1277     {
1278         if (trusted!zmq_msg_init_size(&m_msg, size) != 0) {
1279             throw new ZmqException;
1280         }
1281         m_initialized = true;
1282     }
1283 
1284     ///
1285     unittest
1286     {
1287         auto msg = Message(123);
1288         assert(msg.size == 123);
1289     }
1290 
1291     @disable this(this);
1292 
1293     /**
1294     Releases the $(ZMQ) message when the $(D Message) is destroyed.
1295 
1296     This destructor never throws, which means that any errors will go
1297     undetected.  If this is undesirable, call $(REF Message.close) before
1298     the $(D Message) is destroyed.
1299 
1300     Corresponds_to:
1301         $(ZMQREF zmq_msg_close())
1302     */
1303     ~this() nothrow
1304     {
1305         if (m_initialized) {
1306             immutable rc = trusted!zmq_msg_close(&m_msg);
1307             assert(rc == 0, "zmq_msg_close failed: Invalid message");
1308         }
1309     }
1310 
1311     /**
1312     Releases the $(ZMQ) message.
1313 
1314     Note that the message will be automatically released when the $(D Message)
1315     object is destroyed, so it is often not necessary to call this method
1316     manually.
1317 
1318     Throws:
1319         $(REF ZmqException) if $(ZMQ) reports an error.
1320     Corresponds_to:
1321         $(ZMQREF zmq_msg_close())
1322     */
1323     void close()
1324     {
1325         if (m_initialized) {
1326             if (trusted!zmq_msg_close(&m_msg) != 0) {
1327                 throw new ZmqException;
1328             }
1329             m_initialized = false;
1330         }
1331     }
1332 
1333     /**
1334     Copies message content to another message.
1335 
1336     $(D copy()) returns a new $(D Message) object, while $(D copyTo(dest))
1337     copies the contents of this $(D Message) into $(D dest).  $(D dest) must
1338     be a valid (i.e. initialised) $(D Message).
1339 
1340     Warning:
1341         These functions may not do what you think they do.  Please refer
1342         to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
1343     Throws:
1344         $(REF ZmqException) if $(ZMQ) reports an error.
1345     Corresponds_to:
1346         $(ZMQREF zmq_msg_copy())
1347     */
1348     Message copy()
1349     {
1350         auto cp = Message();
1351         copyTo(cp);
1352         return cp;
1353     }
1354 
1355     /// ditto
1356     void copyTo(ref Message dest)
1357     {
1358         if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) {
1359             throw new ZmqException;
1360         }
1361     }
1362 
1363     ///
1364     unittest
1365     {
1366         import std.string: representation;
1367         auto msg1 = Message(3);
1368         msg1.data[] = "foo".representation;
1369         auto msg2 = msg1.copy();
1370         assert (msg2.data.asString() == "foo");
1371     }
1372 
1373     /**
1374     Moves message content to another message.
1375 
1376     $(D move()) returns a new $(D Message) object, while $(D moveTo(dest))
1377     moves the contents of this $(D Message) to $(D dest).  $(D dest) must
1378     be a valid (i.e. initialised) $(D Message).
1379 
1380     Throws:
1381         $(REF ZmqException) if $(ZMQ) reports an error.
1382     Corresponds_to:
1383         $(ZMQREF zmq_msg_move())
1384     */
1385     Message move()
1386     {
1387         auto m = Message();
1388         moveTo(m);
1389         return m;
1390     }
1391 
1392     /// ditto
1393     void moveTo(ref Message dest)
1394     {
1395         if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) {
1396             throw new ZmqException;
1397         }
1398     }
1399 
1400     ///
1401     unittest
1402     {
1403         import std.string: representation;
1404         auto msg1 = Message(3);
1405         msg1.data[] = "foo".representation;
1406         auto msg2 = msg1.move();
1407         assert (msg1.size == 0);
1408         assert (msg2.data.asString() == "foo");
1409     }
1410 
1411     /**
1412     The message content size in bytes.
1413 
1414     Corresponds_to:
1415         $(ZMQREF zmq_msg_size())
1416     */
1417     @property size_t size() nothrow
1418     {
1419         return trusted!zmq_msg_size(&m_msg);
1420     }
1421 
1422     ///
1423     unittest
1424     {
1425         auto msg = Message(123);
1426         assert(msg.size == 123);
1427     }
1428 
1429     /**
1430     Retrieves the message content.
1431 
1432     Corresponds_to:
1433         $(ZMQREF zmq_msg_data())
1434     */
1435     @property ubyte[] data() @trusted nothrow
1436     {
1437         return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size];
1438     }
1439 
1440     ///
1441     unittest
1442     {
1443         import std.string: representation;
1444         auto msg = Message(3);
1445         assert(msg.data.length == 3);
1446         msg.data[] = "foo".representation; // Slice operator -> array copy.
1447         assert(msg.data.asString() == "foo");
1448     }
1449 
1450     /**
1451     Whether there are more message parts to retrieve.
1452 
1453     Corresponds_to:
1454         $(ZMQREF zmq_msg_more())
1455     */
1456     @property bool more() nothrow
1457     {
1458         return !!trusted!zmq_msg_more(&m_msg);
1459     }
1460 
1461     /**
1462     A pointer to the underlying $(D zmq_msg_t).
1463     */
1464     @property inout(zmq_msg_t)* handle() inout pure nothrow
1465     {
1466         return &m_msg;
1467     }
1468 
1469 private:
1470     bool m_initialized;
1471     zmq_msg_t m_msg;
1472 }
1473 
1474 unittest
1475 {
1476     const url = uniqueUrl("inproc");
1477     auto s1 = Socket(SocketType.pair);
1478     auto s2 = Socket(SocketType.pair);
1479     s1.bind(url);
1480     s2.connect(url);
1481 
1482     auto m1a = Message(123);
1483     m1a.data[] = 'a';
1484     s1.send(m1a);
1485     auto m2a = Message();
1486     s2.receive(m2a);
1487     assert(m2a.size == 123);
1488     foreach (e; m2a.data) assert(e == 'a');
1489 
1490     auto m1b = Message(10);
1491     m1b.data[] = 'b';
1492     s1.send(m1b);
1493     auto m2b = Message();
1494     s2.receive(m2b);
1495     assert(m2b.size == 10);
1496     foreach (e; m2b.data) assert(e == 'b');
1497 }
1498 
1499 
1500 /**
1501 Utility function which interprets and validates a byte array as a UTF-8 string.
1502 
1503 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
1504 the message _data contains plain text.  $(D asString()) allows for easy and
1505 safe interpretation of raw _data as characters.  It checks that $(D data) is
1506 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
1507 the same memory region.
1508 
1509 Throws:
1510     $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
1511 See_also:
1512     $(STDREF string,representation), which performs the opposite operation.
1513 */
1514 inout(char)[] asString(inout(ubyte)[] data) @safe pure
1515 {
1516     auto s = cast(typeof(return)) data;
1517     import std.utf: validate;
1518     validate(s);
1519     return s;
1520 }
1521 
1522 ///
1523 unittest
1524 {
1525     auto s1 = Socket(SocketType.pair);
1526     s1.bind("ipc://zmqd_asString_example");
1527     auto s2 = Socket(SocketType.pair);
1528     s2.connect("ipc://zmqd_asString_example");
1529 
1530     auto msg = Message(12);
1531     msg.data.asString()[] = "Hello World!";
1532     s1.send(msg);
1533 
1534     ubyte[12] buf;
1535     s2.receive(buf);
1536     assert(buf.asString() == "Hello World!");
1537 }
1538 
1539 unittest
1540 {
1541     auto bytes = cast(ubyte[]) ['f', 'o', 'o'];
1542     auto text = bytes.asString();
1543     assert (text == "foo");
1544     assert (cast(void*) bytes.ptr == cast(void*) text.ptr);
1545 
1546     import std.exception: assertThrown;
1547     import std.utf: UTFException;
1548     auto b = cast(ubyte[]) [100, 252, 1];
1549     assertThrown!UTFException(asString(b));
1550 }
1551 
1552 
1553 /**
1554 A class for exceptions thrown when any of the underlying $(ZMQ) C functions
1555 report an error.
1556 
1557 The exception provides a standard error message obtained with
1558 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ)
1559 function which reported the error.
1560 */
1561 class ZmqException : Exception
1562 {
1563 @safe:
1564     /**
1565     The $(D errno) code that was set by the $(ZMQ) function that reported
1566     the error.
1567 
1568     Corresponds_to:
1569         $(ZMQREF zmq_errno())
1570     */
1571     immutable int errno;
1572 
1573 private:
1574     this(string file = __FILE__, int line = __LINE__) nothrow
1575     {
1576         import core.stdc.errno, std.conv;
1577         this.errno = core.stdc.errno.errno;
1578         string msg;
1579         try {
1580             msg = trusted!(to!string)(trusted!zmq_strerror(this.errno));
1581         } catch (Exception e) { /* We never get here */ }
1582         assert(msg.length);     // Still, let's assert as much.
1583         super(msg, file, line);
1584     }
1585 }
1586 
1587 
1588 private:
1589 
1590 struct Resource
1591 {
1592     alias extern(C) int function(void*) nothrow CFreeFunction;
1593 
1594     this(void* ptr, CFreeFunction freeFunc) @safe pure nothrow
1595         in { assert(ptr !is null); } body
1596     {
1597         m_payload = new Payload(1, ptr, freeFunc);
1598     }
1599 
1600     this(this) @safe pure nothrow
1601     {
1602         if (m_payload !is null) {
1603             ++(m_payload.refCount);
1604         }
1605     }
1606 
1607     // TODO: This function could be @safe, if not for a weird compiler bug.
1608     // https://d.puremagic.com/issues/show_bug.cgi?id=11505
1609     ~this() @trusted nothrow
1610     {
1611         detach();
1612     }
1613 
1614     ref Resource opAssign(Resource rhs) @safe
1615     {
1616         if (detach() != 0) {
1617             throw new ZmqException;
1618         }
1619         m_payload = rhs.m_payload;
1620         if (m_payload !is null) {
1621             ++(m_payload.refCount);
1622         }
1623         return this;
1624     }
1625 
1626     @property bool initialized() const @safe pure nothrow
1627     {
1628         return (m_payload !is null) && (m_payload.handle !is null);
1629     }
1630 
1631     void free() @safe
1632     {
1633         if (m_payload !is null && m_payload.free() != 0) {
1634             throw new ZmqException;
1635         }
1636     }
1637 
1638     @property inout(void)* handle() inout @safe pure nothrow
1639     {
1640         if (m_payload !is null) {
1641             return m_payload.handle;
1642         } else {
1643             return null;
1644         }
1645     }
1646 
1647 private:
1648     int detach() @safe nothrow
1649     {
1650         int rc = 0;
1651         if (m_payload !is null) {
1652             if (--(m_payload.refCount) < 1) {
1653                 rc = m_payload.free();
1654             }
1655             m_payload = null;
1656         }
1657         return rc;
1658     }
1659 
1660     struct Payload
1661     {
1662         int refCount;
1663         void* handle;
1664         CFreeFunction freeFunc;
1665 
1666         int free() @trusted nothrow
1667         {
1668             int rc = 0;
1669             if (handle !is null) {
1670                 rc = freeFunc(handle);
1671                 handle = null;
1672                 freeFunc = null;
1673             }
1674             return rc;
1675         }
1676     }
1677     Payload* m_payload;
1678 }
1679 
1680 unittest
1681 {
1682     import std.exception: assertNotThrown, assertThrown;
1683     static extern(C) int myFree(void* p) nothrow
1684     {
1685         auto v = cast(int*) p;
1686         if (*v == 0) {
1687             return -1;
1688         } else {
1689             *v = 0;
1690             return 0;
1691         }
1692     }
1693 
1694     int i = 1;
1695 
1696     {
1697         // Test constructor and properties.
1698         auto ra = Resource(&i, &myFree);
1699         assert (i == 1);
1700         assert (ra.initialized);
1701         assert (ra.handle == &i);
1702 
1703         // Test postblit constructor
1704         auto rb = ra;
1705         assert (i == 1);
1706         assert (rb.initialized);
1707         assert (rb.handle == &i);
1708 
1709         {
1710             // Test properties and free() with default-initialized object.
1711             Resource rc;
1712             assert (!rc.initialized);
1713             assert (rc.handle == null);
1714             assertNotThrown(rc.free());
1715 
1716             // Test assignment, both with and without detachment
1717             rc = rb;
1718             assert (i == 1);
1719             assert (rc.initialized);
1720             assert (rc.handle == &i);
1721 
1722             int j = 2;
1723             auto rd = Resource(&j, &myFree);
1724             assert (rd.handle == &j);
1725             rd = rb;
1726             assert (j == 0);
1727             assert (i == 1);
1728             assert (rd.handle == &i);
1729 
1730             // Test explicit free()
1731             int k = 3;
1732             auto re = Resource(&k, &myFree);
1733             assertNotThrown(re.free());
1734             assert(k == 0);
1735 
1736             // Test failure to free and assign (myFree(&k) fails when k == 0)
1737             re = Resource(&k, &myFree);
1738             assertThrown!ZmqException(re.free()); // We defined free(k == 0) as an error
1739             re = Resource(&k, &myFree);
1740             assertThrown!ZmqException(re = rb);
1741         }
1742 
1743         // i should not be "freed" yet
1744         assert (i == 1);
1745         assert (ra.handle == &i);
1746         assert (rb.handle == &i);
1747     }
1748     // ...but now it should.
1749     assert (i == 0);
1750 }
1751 
1752 
1753 version(unittest) private string uniqueUrl(string p, int n = __LINE__)
1754 {
1755     import std.uuid;
1756     return p ~ "://" ~ randomUUID().toString();
1757 }
1758 
1759 
1760 private auto trusted(alias func, Args...)(Args args) @trusted
1761 {
1762     return func(args);
1763 }
1764 
1765 
1766 // std.string.toStringz() is unsafe, so we provide our own implementation
1767 // tailored to the string sizes we are likely to encounter here.
1768 // Note that this implementation requires that the string be used immediately
1769 // upon return, and not stored, as the buffer will be reused most of the time.
1770 const char* zeroTermString(const char[] s) @safe nothrow
1771 {
1772     import std.algorithm: max;
1773     static char[] buf;
1774     immutable len = s.length + 1;
1775     if (buf.length < len) buf.length = max(len, 1023);
1776     buf[0 .. s.length] = s;
1777     buf[s.length] = '\0';
1778     return buf.ptr;
1779 }
1780 
1781 unittest
1782 {
1783     auto c1 = zeroTermString("Hello World!");
1784     assert (c1[0 .. 13] == "Hello World!\0");
1785     auto c2 = zeroTermString("foo");
1786     assert (c2[0 .. 4] == "foo\0");
1787     assert (c1 == c2);
1788 }