1 /** 2 A thin wrapper around the low-level C API of the $(LINK2 http://zeromq.org,$(ZMQ)) 3 messaging framework, for the $(LINK2 http://dlang.org,D programming language). 4 5 Most functions in this module have a one-to-one relationship with functions 6 in the underlying C API. Some adaptations have been made to make the API 7 safer, easier and more pleasant to use, namely: 8 $(UL 9 $(LI 10 Errors are signalled by means of exceptions rather than return 11 codes. In particular, the $(REF ZmqException) class provides 12 a standard textual message for any error condition, but it also 13 provides access to the $(D errno) code set by the C function 14 that reported the error.) 15 $(LI 16 Functions are appropriately marked with $(D @safe), $(D pure) 17 and $(D nothrow), thus easing their use in high-level D code.) 18 $(LI 19 Memory and resources (i.e. contexts, sockets and messages) are 20 automatically managed, thus preventing leaks.) 21 $(LI 22 Context, socket and message options are implemented as properties.) 23 ) 24 The names of functions and types in $(ZMQD) are very similar to those in 25 $(ZMQ), but they follow the D naming conventions. For example, 26 $(D zmq_msg_send()) becomes $(D zmqd.Message.send()) and so on. Thus, 27 the library should feel both familiar to $(ZMQ) users and natural to D 28 users. 29 30 Due to the close correspondence with the C API, this documentation has 31 intentionally been kept sparse. There is really no reason to repeat the 32 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here. 33 Instead, the documentation for each function contains a "Corresponds to" 34 section that links to the appropriate page in the $(ZMQ) reference. Any 35 details given in the present documentation mostly concern the D-specific 36 adaptations that have been made. 37 38 Also note that the examples only use the INPROC and IPC transports. 
The reason for this is that the examples double as unittests, and we want to
avoid firewall troubles and other issues that could arise with the use of
network protocols such as TCP, PGM, etc.  Anyway, they are only short
snippets that demonstrate the syntax; for more comprehensive and realistic
examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all,
$(ZMQ) Guide).

Version:
    0.1 ($(ZMQ) 3.2 compatible)
Authors:
    $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad)
Copyright:
    Copyright (c) 2013, Lars T. Kyllingstad. All rights reserved.
License:
    $(ZMQD) is released under a BSD licence (see LICENCE.txt for details).$(BR)
    Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site)
    for details about $(ZMQ) licensing.
Macros:
    D      = <code>$0</code>
    EM     = <em>$0</em>
    LDOTS  = …
    QUOTE  = <blockquote>$0</blockquote>
    REF    = $(D $(LINK2 #$1,$1))
    STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
    ZMQ    = ∅MQ
    ZMQAPI = $(LINK2 http://api.zeromq.org/3-2:$1,$+)
    ZMQD   = $(ZMQ)D
    ZMQREF = $(D $(ZMQAPI $1,$1))
*/
module zmqd;

import std.typecons;
import deimos.zmq.zmq;


version(Windows) {
    alias SOCKET = size_t;
}


/**
Reports the $(ZMQ) library version.

Returns:
    A $(STDREF typecons,Tuple) with three integer fields that represent the
    three versioning levels: $(D major), $(D minor) and $(D patch).
Corresponds_to:
    $(ZMQREF zmq_version())
*/
Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() @safe nothrow
{
    // The C function fills the three fields in place through out-pointers.
    typeof(return) ver;
    trusted!zmq_version(&ver.major, &ver.minor, &ver.patch);
    return ver;
}


/**
An object that encapsulates a $(ZMQ) context.

In most programs, it is not necessary to use this type directly,
as $(REF Socket) will use a default global context if not explicitly
provided with one.  See $(REF defaultContext) for details.

A default-initialized $(D Context) is not a valid $(ZMQ) context; it
must always be explicitly initialized with $(REF _Context.opCall):
---
Context ctx;        // Not a valid context yet
ctx = Context();    // ...but now it is.
---
$(D Context) objects can be passed around by value, and two copies will
refer to the same context.  The underlying context is managed using
reference counting, so that when the last copy of a $(D Context) goes
out of scope, the context is automatically destroyed.

See_also:
    $(REF defaultContext)
*/
struct Context
{
@safe:
    /**
    Creates a new $(ZMQ) context.

    Returns:
        A $(REF Context) object that encapsulates the new context.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_new())
    */
    static Context opCall()
    {
        auto raw = trusted!zmq_ctx_new();
        if (raw is null) {
            throw new ZmqException;
        }
        // Hand the raw pointer to Resource together with the C function
        // that releases it.
        Context result;
        result.m_resource = Resource(raw, &zmq_ctx_destroy);
        return result;
    }

    ///
    unittest
    {
        auto ctx = Context();
        assert (ctx.initialized);
    }

    /**
    Destroys the $(ZMQ) context.

    It is normally not necessary to do this manually, as the context will
    be destroyed automatically when the last reference to it goes out of
    scope.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_destroy())
    */
    void destroy()
    {
        m_resource.free();
    }

    ///
    unittest
    {
        auto ctx = Context();
        assert (ctx.initialized);
        ctx.destroy();
        assert (!ctx.initialized);
    }

    /**
    The number of I/O threads.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_IO_THREADS).
*/
    @property int ioThreads()
    {
        return getOption(ZMQ_IO_THREADS);
    }

    /// ditto
    @property void ioThreads(int value)
    {
        setOption(ZMQ_IO_THREADS, value);
    }

    ///
    unittest
    {
        auto ctx = Context();
        ctx.ioThreads = 3;
        assert (ctx.ioThreads == 3);
    }

    /**
    The maximum number of sockets.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_MAX_SOCKETS).
    */
    @property int maxSockets()
    {
        return getOption(ZMQ_MAX_SOCKETS);
    }

    /// ditto
    @property void maxSockets(int value)
    {
        setOption(ZMQ_MAX_SOCKETS, value);
    }

    ///
    unittest
    {
        auto ctx = Context();
        ctx.maxSockets = 512;
        assert (ctx.maxSockets == 512);
    }

    /**
    The $(D void*) pointer used by the underlying C API to refer to the context.

    If the object has not been initialized, this function returns $(D null).
    */
    @property inout(void)* handle() inout pure nothrow
    {
        return m_resource.handle;
    }

    /**
    Whether this $(REF Context) object has been _initialized, i.e. whether it
    refers to a valid $(ZMQ) context.
*/
    @property bool initialized() const pure nothrow
    {
        return m_resource.initialized;
    }

    ///
    @trusted unittest // TODO: Remove @trusted for DMD 2.064
    {
        Context ctx;
        assert (!ctx.initialized);
        ctx = Context();
        assert (ctx.initialized);
        ctx.destroy();
        assert (!ctx.initialized);
    }

private:
    // Reads an integer context option; a negative result from the C call
    // is reported as a ZmqException.
    int getOption(int option)
    {
        immutable result = trusted!zmq_ctx_get(m_resource.handle, option);
        if (result < 0) {
            throw new ZmqException;
        }
        return result;
    }

    // Writes an integer context option; a nonzero status from the C call
    // is reported as a ZmqException.
    void setOption(int option, int value)
    {
        immutable rc = trusted!zmq_ctx_set(m_resource.handle, option, value);
        if (rc != 0) {
            throw new ZmqException;
        }
    }

    Resource m_resource;
}


/**
A global context which is used by default by all sockets, unless they are
explicitly constructed with a different context.

The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
has the following to say) about context creation:
$(QUOTE
    You should create and use exactly one context in your process.
    [$(LDOTS)] If at runtime a process has two contexts, these are
    like separate $(ZMQ) instances.  If that's explicitly what you
    want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
    at the start of your main line code, and one $(D zmq_ctx_destroy())
    at the end.)
)
By using $(D defaultContext()), this is exactly what you achieve.  The
context is created the first time the function is called, and is
automatically destroyed when the program ends.

This function is thread safe.

Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
See_also:
    $(REF Context)
*/
Context defaultContext() @trusted
{
    // For future reference: This is the low-lock singleton pattern.
// See:
    // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
    static bool seenByThisThread;       // per-thread flag
    __gshared Context sharedCtx;        // the single process-wide context
    if (!seenByThisThread) {
        synchronized {
            if (!sharedCtx.initialized) {
                sharedCtx = Context();
            }
            seenByThisThread = true;
        }
    }
    return sharedCtx;
}

unittest
{
    auto c1 = defaultContext();
    auto c2 = defaultContext();
    assert(c1.handle !is null);
    assert(c1.handle == c2.handle);
}


/**
The various socket types.

These are described in the $(ZMQREF zmq_socket()) reference.
*/
enum SocketType
{
    req     = ZMQ_REQ,      /// Corresponds to $(D ZMQ_REQ)
    rep     = ZMQ_REP,      /// Corresponds to $(D ZMQ_REP)
    dealer  = ZMQ_DEALER,   /// Corresponds to $(D ZMQ_DEALER)
    router  = ZMQ_ROUTER,   /// Corresponds to $(D ZMQ_ROUTER)
    pub     = ZMQ_PUB,      /// Corresponds to $(D ZMQ_PUB)
    sub     = ZMQ_SUB,      /// Corresponds to $(D ZMQ_SUB)
    xpub    = ZMQ_XPUB,     /// Corresponds to $(D ZMQ_XPUB)
    xsub    = ZMQ_XSUB,     /// Corresponds to $(D ZMQ_XSUB)
    push    = ZMQ_PUSH,     /// Corresponds to $(D ZMQ_PUSH)
    pull    = ZMQ_PULL,     /// Corresponds to $(D ZMQ_PULL)
    pair    = ZMQ_PAIR,     /// Corresponds to $(D ZMQ_PAIR)
}


/**
An object that encapsulates a $(ZMQ) socket.

A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
must always be explicitly initialized with a constructor (see
$(REF _Socket.this)):
---
Socket s;                     // Not a valid socket yet
s = Socket(SocketType.push);  // ...but now it is.
---
$(D Socket) objects can be passed around by value, and two copies will
refer to the same socket.  The underlying socket is managed using
reference counting, so that when the last copy of a $(D Socket) goes
out of scope, the socket is automatically closed.
*/
struct Socket
{
@safe:
    /**
    Creates a new $(ZMQ) socket.
373 374 If $(D context) is not specified, the default context (as returned 375 by $(REF defaultContext)) is used. 376 377 Throws: 378 $(REF ZmqException) if $(ZMQ) reports an error. 379 Corresponds_to: 380 $(ZMQREF zmq_socket()) 381 */ 382 this(SocketType type) 383 { 384 this(defaultContext(), type); 385 } 386 387 /// ditto 388 this(Context context, SocketType type) 389 { 390 if (auto s = trusted!zmq_socket(context.handle, type)) { 391 // TODO: Replace the next line with the one below for DMD 2.064 392 (Context c) @trusted { m_context = c; } (context); 393 // m_context = ctx; 394 m_type = type; 395 m_socket = Resource(s, &zmq_close); 396 } else { 397 throw new ZmqException; 398 } 399 } 400 401 /// With default context: 402 unittest 403 { 404 auto sck = Socket(SocketType.push); 405 assert (sck.initialized); 406 } 407 /// With explicit context: 408 unittest 409 { 410 auto ctx = Context(); 411 auto sck = Socket(ctx, SocketType.push); 412 assert (sck.initialized); 413 } 414 415 /** 416 Closes the $(ZMQ) socket. 417 418 Note that the socket will be automatically closed when the last reference 419 to it goes out of scope, so it is often not necessary to call this 420 method manually. 421 422 Throws: 423 $(REF ZmqException) if $(ZMQ) reports an error. 424 Corresponds_to: 425 $(ZMQREF zmq_close()) 426 */ 427 void close() 428 { 429 m_socket.free(); 430 } 431 432 /// 433 unittest 434 { 435 auto s = Socket(SocketType.pair); 436 assert (s.initialized); 437 s.close(); 438 assert (!s.initialized); 439 } 440 441 /** 442 Starts accepting incoming connections on $(D endpoint). 443 444 Throws: 445 $(REF ZmqException) if $(ZMQ) reports an error. 
446 Corresponds_to: 447 $(ZMQREF zmq_bind()) 448 */ 449 void bind(const char[] endpoint) 450 { 451 if (trusted!zmq_bind(m_socket.handle, zeroTermString(endpoint)) != 0) { 452 throw new ZmqException; 453 } 454 } 455 456 /// 457 unittest 458 { 459 auto s = Socket(SocketType.pub); 460 s.bind("inproc://zmqd_bind_example"); 461 } 462 463 /** 464 Stops accepting incoming connections on $(D endpoint). 465 466 Throws: 467 $(REF ZmqException) if $(ZMQ) reports an error. 468 Corresponds_to: 469 $(ZMQREF zmq_unbind()) 470 */ 471 void unbind(const char[] endpoint) 472 { 473 if (trusted!zmq_unbind(m_socket.handle, zeroTermString(endpoint)) != 0) { 474 throw new ZmqException; 475 } 476 } 477 478 /// 479 unittest 480 { 481 auto s = Socket(SocketType.pub); 482 s.bind("ipc://zmqd_unbind_example"); 483 // Do some work... 484 s.unbind("ipc://zmqd_unbind_example"); 485 } 486 487 /** 488 Creates an outgoing connection to $(D endpoint). 489 490 Throws: 491 $(REF ZmqException) if $(ZMQ) reports an error. 492 Corresponds_to: 493 $(ZMQREF zmq_connect()) 494 */ 495 void connect(const char[] endpoint) 496 { 497 if (trusted!zmq_connect(m_socket.handle, zeroTermString(endpoint)) != 0) { 498 throw new ZmqException; 499 } 500 } 501 502 /// 503 unittest 504 { 505 auto s = Socket(SocketType.sub); 506 s.connect("ipc://zmqd_connect_example"); 507 } 508 509 /** 510 Disconnects the socket from $(D endpoint). 511 512 Throws: 513 $(REF ZmqException) if $(ZMQ) reports an error. 514 Corresponds_to: 515 $(ZMQREF zmq_disconnect()) 516 */ 517 void disconnect(const char[] endpoint) 518 { 519 if (trusted!zmq_disconnect(m_socket.handle, zeroTermString(endpoint)) != 0) { 520 throw new ZmqException; 521 } 522 } 523 524 /// 525 unittest 526 { 527 auto s = Socket(SocketType.sub); 528 s.connect("ipc://zmqd_disconnect_example"); 529 // Do some work... 530 s.disconnect("ipc://zmqd_disconnect_example"); 531 } 532 533 /** 534 Sends a message part. 
535 536 $(D _send) blocks until the message has been queued on the socket. 537 $(D trySend) performs the operation in non-blocking mode, and returns 538 a $(D bool) value that signifies whether the message was queued on the 539 socket. 540 541 The $(D char[]) overload is a convenience function that simply casts 542 the string argument to $(D ubyte[]). 543 544 Throws: 545 $(REF ZmqException) if $(ZMQ) reports an error. 546 Corresponds_to: 547 $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the case 548 of $(D trySend)). 549 */ 550 void send(const ubyte[] data, bool more = false) 551 { 552 immutable flags = more ? ZMQ_SNDMORE : 0; 553 if (trusted!zmq_send(m_socket.handle, data.ptr, data.length, flags) < 0) { 554 throw new ZmqException; 555 } 556 } 557 558 /// ditto 559 void send(const char[] data, bool more = false) @trusted 560 { 561 send(cast(ubyte[]) data, more); 562 } 563 564 /// ditto 565 bool trySend(const ubyte[] data, bool more = false) 566 { 567 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 568 if (trusted!zmq_send(m_socket.handle, data.ptr, data.length, flags) < 0) { 569 import core.stdc.errno; 570 if (errno == EAGAIN) return false; 571 else throw new ZmqException; 572 } 573 return true; 574 } 575 576 /// ditto 577 bool trySend(const char[] data, bool more = false) @trusted 578 { 579 return trySend(cast(ubyte[]) data, more); 580 } 581 582 /// 583 unittest 584 { 585 auto sck = Socket(SocketType.pub); 586 sck.send(cast(ubyte[]) [11, 226, 92]); 587 sck.send("Hello World!"); 588 } 589 590 /** 591 Sends a message part. 592 593 $(D _send) blocks until the message has been queued on the socket. 594 $(D trySend) performs the operation in non-blocking mode, and returns 595 a $(D bool) value that signifies whether the message was queued on the 596 socket. 597 598 Throws: 599 $(REF ZmqException) if $(ZMQ) reports an error. 600 Corresponds_to: 601 $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the case 602 of $(D trySend)). 
*/
    void send(ref Message msg, bool more = false)
    {
        immutable flags = more ? ZMQ_SNDMORE : 0;
        immutable rc = trusted!zmq_msg_send(msg.handle, m_socket.handle, flags);
        if (rc < 0) {
            throw new ZmqException;
        }
    }

    /// ditto
    bool trySend(ref Message msg, bool more = false)
    {
        immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
        if (trusted!zmq_msg_send(msg.handle, m_socket.handle, flags) >= 0) {
            return true;
        }
        // In non-blocking mode EAGAIN only means the message was not queued.
        import core.stdc.errno;
        if (errno == EAGAIN) {
            return false;
        }
        throw new ZmqException;
    }

    ///
    unittest
    {
        auto sck = Socket(SocketType.pub);
        auto msg = Message(12);
        msg.data.asString()[] = "Hello World!";
        sck.send(msg);
    }

    /**
    Receives a message part.

    $(D _receive) blocks until the request can be satisfied.
    $(D tryReceive) performs the operation in non-blocking mode, and returns
    a $(D bool) value that signifies whether a message was received.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
        of $(D tryReceive)).
645 646 */ 647 size_t receive(ubyte[] data) 648 { 649 immutable len = trusted!zmq_recv(m_socket.handle, data.ptr, data.length, 0); 650 if (len >= 0) { 651 import std.conv; 652 return to!size_t(len); 653 } else { 654 throw new ZmqException; 655 } 656 } 657 658 /// ditto 659 Tuple!(size_t, bool) tryReceive(ubyte[] data) 660 { 661 immutable len = trusted!zmq_recv(m_socket.handle, data.ptr, data.length, ZMQ_DONTWAIT); 662 if (len >= 0) { 663 import std.conv; 664 return typeof(return)(to!size_t(len), true); 665 } else { 666 import core.stdc.errno; 667 if (errno == EAGAIN) { 668 return typeof(return)(0, false); 669 } else { 670 throw new ZmqException; 671 } 672 } 673 } 674 675 /// 676 unittest 677 { 678 // Sender 679 auto snd = Socket(SocketType.req); 680 snd.connect("ipc://zmqd_receive_example"); 681 snd.send("Hello World!"); 682 683 // Receiver 684 import std.string: representation; 685 auto rcv = Socket(SocketType.rep); 686 rcv.bind("ipc://zmqd_receive_example"); 687 char[256] buf; 688 immutable len = rcv.receive(buf.representation); 689 assert (buf[0 .. len] == "Hello World!"); 690 } 691 692 @trusted unittest 693 { 694 auto snd = Socket(SocketType.pair); 695 snd.bind("ipc://zmqd_tryReceive_example"); 696 auto rcv = Socket(SocketType.pair); 697 rcv.connect("ipc://zmqd_tryReceive_example"); 698 699 ubyte[256] buf; 700 auto r1 = rcv.tryReceive(buf); 701 assert (!r1[1]); 702 703 import core.thread, core.time, std.string; 704 snd.send("Hello World!"); 705 Thread.sleep(100.msecs); // Wait for message to be transferred... 706 auto r2 = rcv.tryReceive(buf); 707 assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation); 708 } 709 710 /** 711 Receives a message part. 712 713 $(D _receive) blocks until the request can be satisfied. 714 $(D tryReceive) performs the operation in non-blocking mode, and returns 715 a $(D bool) value that signifies whether a message was received. 716 717 Throws: 718 $(REF ZmqException) if $(ZMQ) reports an error. 
719 Corresponds_to: 720 $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 721 of $(D tryReceive)). 722 723 */ 724 size_t receive(ref Message msg) 725 { 726 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket.handle, 0); 727 if (len >= 0) { 728 import std.conv; 729 return to!size_t(len); 730 } else { 731 throw new ZmqException; 732 } 733 } 734 735 /// ditto 736 Tuple!(size_t, bool) tryReceive(ref Message msg) 737 { 738 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket.handle, ZMQ_DONTWAIT); 739 if (len >= 0) { 740 import std.conv; 741 return typeof(return)(to!size_t(len), true); 742 } else { 743 import core.stdc.errno; 744 if (errno == EAGAIN) { 745 return typeof(return)(0, false); 746 } else { 747 throw new ZmqException; 748 } 749 } 750 } 751 752 /// 753 unittest 754 { 755 // Sender 756 auto snd = Socket(SocketType.req); 757 snd.connect("ipc://zmqd_msg_receive_example"); 758 snd.send("Hello World!"); 759 760 // Receiver 761 import std.string: representation; 762 auto rcv = Socket(SocketType.rep); 763 rcv.bind("ipc://zmqd_msg_receive_example"); 764 auto msg = Message(); 765 rcv.receive(msg); 766 assert (msg.data.asString() == "Hello World!"); 767 } 768 769 @trusted unittest 770 { 771 auto snd = Socket(SocketType.pair); 772 snd.bind("ipc://zmqd_msg_tryReceive_example"); 773 auto rcv = Socket(SocketType.pair); 774 rcv.connect("ipc://zmqd_msg_tryReceive_example"); 775 776 auto msg = Message(); 777 auto r1 = rcv.tryReceive(msg); 778 assert (!r1[1]); 779 780 import core.thread, core.time, std.string; 781 snd.send("Hello World!"); 782 Thread.sleep(100.msecs); // Wait for message to be transferred... 783 auto r2 = rcv.tryReceive(msg); 784 assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation); 785 } 786 787 /** 788 The socket _type. 789 790 Throws: 791 $(REF ZmqException) if $(ZMQ) reports an error. 792 Corresponds_to: 793 $(ZMQREF zmq_msg_getsockopt()) with $(D ZMQ_TYPE). 
*/
    @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }

    ///
    unittest
    {
        auto sck = Socket(SocketType.xpub);
        assert (sck.type == SocketType.xpub);
    }

    /**
    Whether there are _more message data parts to follow.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE).
    */
    @property bool more() { return !!getOption!int(ZMQ_RCVMORE); }

    // TODO: Better unittest/example
    unittest
    {
        auto sck = Socket(SocketType.req);
        assert (!sck.more);
    }

    /**
    Misc. socket properties.

    Each of these has a one-to-one correspondence with an option passed to
    $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For
    example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
    $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.

    Notes:
    $(UL
        $(LI For convenience, the setter for the $(D identity) property
            accepts strings.  To retrieve a string with the getter, use
            the $(REF asString) function.
            ---
            sck.identity = "foobar";
            assert (sck.identity.asString() == "foobar");
            ---
            )
        $(LI The $(D fd) property is an $(D int) on POSIX and a $(D SOCKET)
            on Windows.)
        $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
            treated differently from the others; see $(REF Socket.subscribe)
            and $(REF Socket.unsubscribe))
    )

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()).
*/
    @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
    /// ditto
    @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }

    /// ditto
    @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
    /// ditto
    @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }

    /// ditto
    @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); }
    /// ditto
    @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); }

    /// ditto
    @property ubyte[] identity() @trusted
    {
        // Not @safe: calls the @system function zmq_getsockopt and takes
        // the address of a local variable (used).
        auto storage = new ubyte[255];
        size_t used = storage.length;
        immutable rc = zmq_getsockopt(m_socket.handle, ZMQ_IDENTITY, storage.ptr, &used);
        if (rc != 0) {
            throw new ZmqException;
        }
        // The C call has overwritten `used`; return only that many bytes.
        return storage[0 .. used];
    }
    /// ditto
    @property void identity(const ubyte[] value) { setOption(ZMQ_IDENTITY, value); }
    /// ditto
    @property void identity(const  char[] value) { setOption(ZMQ_IDENTITY, value); }

    /// ditto
    @property int rate() { return getOption!int(ZMQ_RATE); }
    /// ditto
    @property void rate(int value) { setOption(ZMQ_RATE, value); }

    /// ditto
    @property int recoveryInterval() { return getOption!int(ZMQ_RECOVERY_IVL); }
    /// ditto
    @property void recoveryInterval(int value) { setOption(ZMQ_RECOVERY_IVL, value); }

    /// ditto
    @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); }
    /// ditto
    @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }

    /// ditto
    @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
    /// ditto
    @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }

    /// ditto
    @property int linger() { return getOption!int(ZMQ_LINGER); }
    /// ditto
    @property void linger(int value) {
setOption(ZMQ_LINGER, value); }

    /// ditto
    @property int reconnectionInterval() { return getOption!int(ZMQ_RECONNECT_IVL); }
    /// ditto
    @property void reconnectionInterval(int value) { setOption(ZMQ_RECONNECT_IVL, value); }

    /// ditto
    @property int maxReconnectionInterval() { return getOption!int(ZMQ_RECONNECT_IVL_MAX); }
    /// ditto
    @property void maxReconnectionInterval(int value) { setOption(ZMQ_RECONNECT_IVL_MAX, value); }

    /// ditto
    @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
    /// ditto
    @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }

    /// ditto
    @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
    /// ditto
    @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }

    /// ditto
    @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
    /// ditto
    @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }

    /// ditto
    @property int receiveTimeout() { return getOption!int(ZMQ_RCVTIMEO); }
    /// ditto
    @property void receiveTimeout(int value) { setOption(ZMQ_RCVTIMEO, value); }

    /// ditto
    @property int sendTimeout() { return getOption!int(ZMQ_SNDTIMEO); }
    /// ditto
    @property void sendTimeout(int value) { setOption(ZMQ_SNDTIMEO, value); }

    // The boolean options are read and written as C ints (0 or 1).
    /// ditto
    @property bool ipv4Only() { return getOption!int(ZMQ_IPV4ONLY) != 0; }
    /// ditto
    @property void ipv4Only(bool value) { setOption(ZMQ_IPV4ONLY, value ? 1 : 0); }

    /// ditto
    @property bool delayAttachOnConnect() { return getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT) != 0; }
    /// ditto
    @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ?
1 : 0); }


    version (Windows) {
        alias FD = SOCKET;
    } else version (Posix) {
        alias FD = int;
    }

    /// ditto
    @property FD fd() { return getOption!FD(ZMQ_FD); }

    /// ditto
    @property int events() { return getOption!int(ZMQ_EVENTS); }

    /// ditto
    @property char[] lastEndpoint() @trusted
    {
        // This function is not @safe because it calls a @system function
        // (zmq_getsockopt) and takes the address of a local (len).
        auto buf = new char[1024];
        size_t len = buf.length;
        if (zmq_getsockopt(m_socket.handle, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
            throw new ZmqException;
        }
        // The last reported byte is dropped (presumably the C string's
        // terminating NUL).  Guard against a reported length of zero so
        // that `len-1` cannot wrap around and trigger a range violation.
        return len > 0 ? buf[0 .. len-1] : buf[0 .. 0];
    }

    // TODO: Some low-level options are missing still, plus setters for
    // ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE.

    unittest
    {
        // We test all the socket options by checking that they have their default value.
        auto s = Socket(SocketType.xpub);
        const e = "inproc://unittest2";
        s.bind(e);
        assert(s.type == SocketType.xpub);
        assert(s.sendHWM == 1000);
        assert(s.receiveHWM == 1000);
        assert(s.threadAffinity == 0);
        assert(s.identity == null);
        assert(s.rate == 100);
        assert(s.recoveryInterval == 10_000);
        assert(s.sendBufferSize == 0);
        assert(s.receiveBufferSize == 0);
        assert(s.linger == -1);
        assert(s.reconnectionInterval == 100);
        assert(s.maxReconnectionInterval == 0);
        assert(s.backlog == 100);
        assert(s.maxMsgSize == -1);
        assert(s.multicastHops == 1);
        assert(s.receiveTimeout == -1);
        assert(s.sendTimeout == -1);
        assert(s.ipv4Only);
        assert(!s.delayAttachOnConnect);
        version(Posix) {
            assert(s.fd > 2); // 0, 1 and 2 are the standard streams
        }
        assert(s.lastEndpoint == e);

        // Test setters and getters together
        s.sendHWM = 500;
        assert(s.sendHWM == 500);
        s.receiveHWM = 600;
        assert(s.receiveHWM == 600);
        s.threadAffinity = 1;
assert(s.threadAffinity == 1);
        s.identity = cast(ubyte[]) [ 65, 66, 67 ];
        assert(s.identity == [65, 66, 67]);
        s.identity = "foo";
        assert(s.identity == [102, 111, 111]);
        s.rate = 200;
        assert(s.rate == 200);
        s.recoveryInterval = 5_000;
        assert(s.recoveryInterval == 5_000);
        s.sendBufferSize = 500;
        assert(s.sendBufferSize == 500);
        s.receiveBufferSize = 600;
        assert(s.receiveBufferSize == 600);
        s.linger = 0;
        assert(s.linger == 0);
        s.linger = 100;
        assert(s.linger == 100);
        s.reconnectionInterval = 200;
        assert(s.reconnectionInterval == 200);
        s.maxReconnectionInterval = 300;
        assert(s.maxReconnectionInterval == 300);
        s.backlog = 50;
        assert(s.backlog == 50);
        s.maxMsgSize = 1000;
        assert(s.maxMsgSize == 1000);
        s.multicastHops = 2;
        assert(s.multicastHops == 2);
        s.receiveTimeout = 3_000;
        assert(s.receiveTimeout == 3_000);
        s.sendTimeout = 2_000;
        assert(s.sendTimeout == 2_000);
        s.ipv4Only = false;
        assert(!s.ipv4Only);
        s.delayAttachOnConnect = true;
        assert(s.delayAttachOnConnect);
    }

    /**
    Establishes a message filter.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_SUBSCRIBE).
    */
    void subscribe(const ubyte[] filterPrefix) { setOption(ZMQ_SUBSCRIBE, filterPrefix); }
    /// ditto
    void subscribe(const  char[] filterPrefix) { setOption(ZMQ_SUBSCRIBE, filterPrefix); }

    ///
    unittest
    {
        // Create a subscriber that accepts all messages that start with
        // the prefixes "foo" or "bar".
auto sck = Socket(SocketType.sub);
        sck.subscribe("foo");
        sck.subscribe("bar");
    }

    @trusted unittest
    {
        void sleep(int ms) {
            import core.thread, core.time;
            Thread.sleep(dur!"msecs"(ms));
        }
        auto pub = Socket(SocketType.pub);
        pub.bind("inproc://zmqd_subscribe_unittest");
        auto sub = Socket(SocketType.sub);
        sub.connect("inproc://zmqd_subscribe_unittest");

        pub.send("Hello");
        sleep(100);
        sub.subscribe("He");
        sub.subscribe(cast(ubyte[])['W', 'o']);
        sleep(100);
        pub.send("Heeee");
        pub.send("World");
        sleep(100);
        ubyte[5] buf;
        sub.receive(buf);
        assert(buf.asString() == "Heeee");
        sub.receive(buf);
        assert(buf.asString() == "World");
    }

    /**
    Removes a message filter.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_UNSUBSCRIBE).
    */
    void unsubscribe(const ubyte[] filterPrefix) { setOption(ZMQ_UNSUBSCRIBE, filterPrefix); }
    /// ditto
    void unsubscribe(const  char[] filterPrefix) { setOption(ZMQ_UNSUBSCRIBE, filterPrefix); }

    ///
    unittest
    {
        // Subscribe to messages that start with "foo" or "bar".
        auto sck = Socket(SocketType.sub);
        sck.subscribe("foo");
        sck.subscribe("bar");
        // ...
        // From now on, only accept messages that start with "bar"
        sck.unsubscribe("foo");
    }

    /**
    The $(D void*) pointer used by the underlying C API to refer to the socket.

    If the object has not been initialized, this function returns $(D null).
    */
    @property inout(void)* handle() inout pure nothrow
    {
        return m_socket.handle;
    }

    /**
    Whether this $(REF Socket) object has been _initialized, i.e. whether it
    refers to a valid $(ZMQ) socket.
*/
    @property bool initialized() const pure nothrow
    {
        return m_socket.initialized;
    }

    ///
    @trusted unittest // TODO: Remove @trusted for DMD 2.064
    {
        Socket socket;
        assert (!socket.initialized);
        socket = Socket(SocketType.sub);
        assert (socket.initialized);
        socket.close();
        assert (!socket.initialized);
    }

private:
    // Reads the socket option identified by `option` into a value of type T.
    // Throws ZmqException if zmq_getsockopt() reports an error.
    T getOption(T)(int option) @trusted
    {
        T result;
        auto size = T.sizeof;
        immutable rc = zmq_getsockopt(m_socket.handle, option, &result, &size);
        if (rc != 0) {
            throw new ZmqException;
        }
        // The size written back by the C call must equal the size of T.
        assert(size == T.sizeof);
        return result;
    }

    // Sets a socket option from a raw byte buffer (pointer + length).
    // Throws ZmqException if zmq_setsockopt() reports an error.
    void setOption()(int option, const void[] value)
    {
        immutable rc = trusted!zmq_setsockopt(
            m_socket.handle, option, value.ptr, value.length);
        if (rc != 0) {
            throw new ZmqException;
        }
    }

    import std.traits;

    // Sets a socket option from a scalar, passing its address and size.
    // Throws ZmqException if zmq_setsockopt() reports an error.
    void setOption(T)(int option, T value) @trusted if (isScalarType!T)
    {
        immutable rc = zmq_setsockopt(
            m_socket.handle, option, &value, value.sizeof);
        if (rc != 0) {
            throw new ZmqException;
        }
    }

    Context m_context;
    SocketType m_type;
    Resource m_socket;
}

unittest
{
    // Round-trip a short text message between two PAIR sockets.
    auto sender = Socket(SocketType.pair);
    auto receiver = Socket(SocketType.pair);
    sender.bind("inproc://unittest");
    receiver.connect("inproc://unittest");
    sender.send("Hello World!");
    ubyte[12] buf;
    const received = receiver.receive(buf[]);
    assert (received == 12);
    assert (buf == "Hello World!");
}


/**
Starts the built-in $(ZMQ) _proxy.
Corresponds_to:
    $(ZMQREF zmq_proxy())
*/
void proxy(ref Socket frontend, ref Socket backend)
    @safe nothrow
{
    // No capture socket: pass null as the third argument.
    trusted!zmq_proxy(frontend.handle, backend.handle, null);
}

/// ditto
void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
    @safe nothrow
{
    trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle);
}


/**
An object that encapsulates a $(ZMQ) message.

This $(D struct) is a wrapper around a $(D zmq_msg_t) object.  Unlike
$(REF Context) and $(REF Socket), it does $(EM not) perform reference
counting, because $(ZMQ) messages have a form of reference counting of
their own.  A $(D Message) cannot be copied by normal assignment; use
$(REF Message.copy) for this.

A default-initialized $(D Message) is not a valid $(ZMQ) message; it
must always be explicitly initialized with $(REF _Message.opCall) or
$(REF _Message.this):
---
Message msg1;               // Invalid message
auto msg2 = Message();      // Empty message
auto msg3 = Message(1024);  // 1K message
---
When a $(D Message) goes out of scope, $(ZMQREF zmq_msg_close()) is
called on the underlying $(D zmq_msg_t).
*/
struct Message
{
@safe:
    /**
    Initialises an empty $(ZMQ) message.

    Returns:
        A $(D Message) that is marked as initialised.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init())
    */
    static Message opCall()
    {
        Message msg;
        immutable rc = trusted!zmq_msg_init(&msg.m_msg);
        if (rc != 0) {
            throw new ZmqException;
        }
        msg.m_initialized = true;
        return msg;
    }

    ///
    unittest
    {
        auto msg = Message();
        assert(msg.size == 0);
    }

    /**
    Initialises a $(ZMQ) message of a specified size.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
        $(ZMQREF zmq_msg_init_size())
    */
    this(size_t size)
    {
        immutable rc = trusted!zmq_msg_init_size(&m_msg, size);
        if (rc != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    ///
    unittest
    {
        auto msg = Message(123);
        assert(msg.size == 123);
    }

    // Messages cannot be copied implicitly.
    @disable this(this);

    /**
    Releases the $(ZMQ) message when the $(D Message) is destroyed.

    This destructor never throws, which means that any errors will go
    undetected.  If this is undesirable, call $(REF Message.close) before
    the $(D Message) is destroyed.

    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    ~this() nothrow
    {
        // Nothing to release for a message that was never initialised
        // or that has already been closed.
        if (!m_initialized) return;
        immutable rc = trusted!zmq_msg_close(&m_msg);
        assert(rc == 0, "zmq_msg_close failed: Invalid message");
    }

    /**
    Releases the $(ZMQ) message.

    Note that the message will be automatically released when the $(D Message)
    object is destroyed, so it is often not necessary to call this method
    manually.  Calling it on a message that is not initialised does nothing.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    void close()
    {
        if (!m_initialized) return;
        immutable rc = trusted!zmq_msg_close(&m_msg);
        if (rc != 0) {
            throw new ZmqException;
        }
        // Only mark the message as released once the C call has succeeded.
        m_initialized = false;
    }

    /**
    Copies message content to another message.

    $(D copy()) returns a new $(D Message) object, while $(D copyTo(dest))
    copies the contents of this $(D Message) into $(D dest).  $(D dest) must
    be a valid (i.e. initialised) $(D Message).

    Warning:
        These functions may not do what you think they do.  Please refer
        to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
        $(ZMQREF zmq_msg_copy())
    */
    Message copy()
    {
        // copyTo() requires an initialised destination, so start from an
        // empty message.
        auto duplicate = Message();
        copyTo(duplicate);
        return duplicate;
    }

    /// ditto
    void copyTo(ref Message dest)
    {
        immutable rc = trusted!zmq_msg_copy(&dest.m_msg, &m_msg);
        if (rc != 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg1 = Message(3);
        msg1.data[] = "foo".representation;
        auto msg2 = msg1.copy();
        assert (msg2.data.asString() == "foo");
    }

    /**
    Moves message content to another message.

    $(D move()) returns a new $(D Message) object, while $(D moveTo(dest))
    moves the contents of this $(D Message) to $(D dest).  $(D dest) must
    be a valid (i.e. initialised) $(D Message).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_move())
    */
    Message move()
    {
        // moveTo() requires an initialised destination, so start from an
        // empty message.
        auto target = Message();
        moveTo(target);
        return target;
    }

    /// ditto
    void moveTo(ref Message dest)
    {
        immutable rc = trusted!zmq_msg_move(&dest.m_msg, &m_msg);
        if (rc != 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg1 = Message(3);
        msg1.data[] = "foo".representation;
        auto msg2 = msg1.move();
        assert (msg1.size == 0);
        assert (msg2.data.asString() == "foo");
    }

    /**
    The message content size in bytes.

    Corresponds_to:
        $(ZMQREF zmq_msg_size())
    */
    @property size_t size() nothrow
    {
        return trusted!zmq_msg_size(&m_msg);
    }

    ///
    unittest
    {
        auto msg = Message(123);
        assert(msg.size == 123);
    }

    /**
    Retrieves the message content.

    Corresponds_to:
        $(ZMQREF zmq_msg_data())
    */
    @property ubyte[] data() @trusted nothrow
    {
        return (cast(ubyte*) zmq_msg_data(&m_msg))[0 ..
size];
    }

    ///
    unittest
    {
        import std.string: representation;
        auto msg = Message(3);
        assert(msg.data.length == 3);
        msg.data[] = "foo".representation; // Slice operator -> array copy.
        assert(msg.data.asString() == "foo");
    }

    /**
    Whether there are more message parts to retrieve.

    Corresponds_to:
        $(ZMQREF zmq_msg_more())
    */
    @property bool more() nothrow
    {
        // The C function returns an integer; any nonzero value means "more".
        return trusted!zmq_msg_more(&m_msg) != 0;
    }

    /**
    A pointer to the underlying $(D zmq_msg_t).
    */
    @property inout(zmq_msg_t)* handle() inout pure nothrow
    {
        return &m_msg;
    }

private:
    bool m_initialized;
    zmq_msg_t m_msg;
}

unittest
{
    const url = uniqueUrl("inproc");
    auto sender = Socket(SocketType.pair);
    auto receiver = Socket(SocketType.pair);
    sender.bind(url);
    receiver.connect(url);

    // First round trip: 123 bytes, all 'a'.
    auto sentA = Message(123);
    sentA.data[] = 'a';
    sender.send(sentA);
    auto receivedA = Message();
    receiver.receive(receivedA);
    assert(receivedA.size == 123);
    foreach (b; receivedA.data) assert(b == 'a');

    // Second round trip: 10 bytes, all 'b'.
    auto sentB = Message(10);
    sentB.data[] = 'b';
    sender.send(sentB);
    auto receivedB = Message();
    receiver.receive(receivedB);
    assert(receivedB.size == 10);
    foreach (b; receivedB.data) assert(b == 'b');
}


/**
Utility function which interprets and validates a byte array as a UTF-8 string.

Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
the message _data contains plain text.  $(D asString()) allows for easy and
safe interpretation of raw _data as characters.  It checks that $(D data) is
a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
the same memory region.

Throws:
    $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
See_also:
    $(STDREF string,representation), which performs the opposite operation.
*/
inout(char)[] asString(inout(ubyte)[] data) @safe pure
{
    import std.utf: validate;
    // Reinterpret the bytes in place; no copy is made.
    auto text = cast(typeof(return)) data;
    validate(text);
    return text;
}

///
unittest
{
    auto s1 = Socket(SocketType.pair);
    s1.bind("ipc://zmqd_asString_example");
    auto s2 = Socket(SocketType.pair);
    s2.connect("ipc://zmqd_asString_example");

    auto msg = Message(12);
    msg.data.asString()[] = "Hello World!";
    s1.send(msg);

    ubyte[12] buf;
    s2.receive(buf);
    assert(buf.asString() == "Hello World!");
}

unittest
{
    // Valid input: same text, same memory.
    auto bytes = cast(ubyte[]) ['f', 'o', 'o'];
    auto text = bytes.asString();
    assert (text == "foo");
    assert (cast(void*) bytes.ptr == cast(void*) text.ptr);

    // Invalid UTF-8 must be rejected.
    import std.exception: assertThrown;
    import std.utf: UTFException;
    auto invalid = cast(ubyte[]) [100, 252, 1];
    assertThrown!UTFException(asString(invalid));
}


/**
A class for exceptions thrown when any of the underlying $(ZMQ) C functions
report an error.

The exception provides a standard error message obtained with
$(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ)
function which reported the error.
*/
class ZmqException : Exception
{
@safe:
    /**
    The $(D errno) code that was set by the $(ZMQ) function that reported
    the error.

    Corresponds_to:
        $(ZMQREF zmq_errno())
    */
    immutable int errno;

private:
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        import core.stdc.errno, std.conv;
        this.errno = core.stdc.errno.errno;
        string msg;
        try {
            msg = trusted!(to!string)(trusted!zmq_strerror(this.errno));
        } catch (Exception e) { /* We never get here */ }
        assert(msg.length);     // Still, let's assert as much.
super(msg, file, line);
    }
}


private:

// A reference-counted holder for a C resource handle and the C function
// that releases it.  Copies share one payload; the release function is
// called when the last copy detaches or when free() is called explicitly.
struct Resource
{
    alias extern(C) int function(void*) nothrow CFreeFunction;

    // Takes ownership of `ptr` (must not be null) with a reference count of 1.
    this(void* ptr, CFreeFunction freeFunc) @safe pure nothrow
        in { assert(ptr !is null); } body
    {
        m_payload = new Payload(1, ptr, freeFunc);
    }

    // A copy shares the payload, so it counts as one more reference.
    this(this) @safe pure nothrow
    {
        if (m_payload is null) return;
        ++(m_payload.refCount);
    }

    // TODO: This function could be @safe, if not for a weird compiler bug.
    // https://d.puremagic.com/issues/show_bug.cgi?id=11505
    ~this() @trusted nothrow
    {
        // The return code is deliberately dropped: destructors must not throw.
        detach();
    }

    // Drops the current payload, then shares the payload of `rhs`.
    // Throws ZmqException, without adopting `rhs`, if releasing the old
    // resource fails.
    ref Resource opAssign(Resource rhs) @safe
    {
        immutable rc = detach();
        if (rc != 0) {
            throw new ZmqException;
        }
        m_payload = rhs.m_payload;
        if (m_payload !is null) {
            ++(m_payload.refCount);
        }
        return this;
    }

    // True if a payload is attached and its handle has not been freed.
    @property bool initialized() const @safe pure nothrow
    {
        if (m_payload is null) return false;
        return m_payload.handle !is null;
    }

    // Releases the underlying resource immediately, regardless of the
    // reference count.  Throws ZmqException if the free function reports
    // an error.  Does nothing when no payload is attached.
    void free() @safe
    {
        if (m_payload is null) return;
        if (m_payload.free() != 0) {
            throw new ZmqException;
        }
    }

    // The raw handle, or null when no payload is attached.
    @property inout(void)* handle() inout @safe pure nothrow
    {
        return (m_payload is null) ? null : m_payload.handle;
    }

private:
    // Gives up this object's reference.  Frees the resource if this was the
    // last reference and returns the free function's result (0 otherwise).
    int detach() @safe nothrow
    {
        if (m_payload is null) return 0;
        int rc = 0;
        if (--(m_payload.refCount) < 1) {
            rc = m_payload.free();
        }
        m_payload = null;
        return rc;
    }

    struct Payload
    {
        int refCount;
        void* handle;
        CFreeFunction freeFunc;

        // Calls freeFunc on the handle once; later calls are no-ops that
        // return 0.  The handle is cleared even if freeFunc reports failure.
        int free() @trusted nothrow
        {
            if (handle is null) return 0;
            immutable rc = freeFunc(handle);
            handle = null;
            freeFunc = null;
            return rc;
        }
    }
    Payload* m_payload;
}

unittest
{
    import std.exception: assertNotThrown,
assertThrown; 1683 static extern(C) int myFree(void* p) nothrow 1684 { 1685 auto v = cast(int*) p; 1686 if (*v == 0) { 1687 return -1; 1688 } else { 1689 *v = 0; 1690 return 0; 1691 } 1692 } 1693 1694 int i = 1; 1695 1696 { 1697 // Test constructor and properties. 1698 auto ra = Resource(&i, &myFree); 1699 assert (i == 1); 1700 assert (ra.initialized); 1701 assert (ra.handle == &i); 1702 1703 // Test postblit constructor 1704 auto rb = ra; 1705 assert (i == 1); 1706 assert (rb.initialized); 1707 assert (rb.handle == &i); 1708 1709 { 1710 // Test properties and free() with default-initialized object. 1711 Resource rc; 1712 assert (!rc.initialized); 1713 assert (rc.handle == null); 1714 assertNotThrown(rc.free()); 1715 1716 // Test assignment, both with and without detachment 1717 rc = rb; 1718 assert (i == 1); 1719 assert (rc.initialized); 1720 assert (rc.handle == &i); 1721 1722 int j = 2; 1723 auto rd = Resource(&j, &myFree); 1724 assert (rd.handle == &j); 1725 rd = rb; 1726 assert (j == 0); 1727 assert (i == 1); 1728 assert (rd.handle == &i); 1729 1730 // Test explicit free() 1731 int k = 3; 1732 auto re = Resource(&k, &myFree); 1733 assertNotThrown(re.free()); 1734 assert(k == 0); 1735 1736 // Test failure to free and assign (myFree(&k) fails when k == 0) 1737 re = Resource(&k, &myFree); 1738 assertThrown!ZmqException(re.free()); // We defined free(k == 0) as an error 1739 re = Resource(&k, &myFree); 1740 assertThrown!ZmqException(re = rb); 1741 } 1742 1743 // i should not be "freed" yet 1744 assert (i == 1); 1745 assert (ra.handle == &i); 1746 assert (rb.handle == &i); 1747 } 1748 // ...but now it should. 
assert (i == 0);
}


// Builds an endpoint URL of the form "<p>://<random UUID>", so that unittests
// do not collide on endpoint names.  The parameter `n` is not used by the
// body; it is kept so that existing callers continue to compile.
version(unittest) private string uniqueUrl(string p, int n = __LINE__)
{
    import std.uuid;
    return p ~ "://" ~ randomUUID().toString();
}


// Calls `func(args)` from inside a @trusted wrapper, so that @safe code in
// this module can invoke the C API.
private auto trusted(alias func, Args...)(Args args) @trusted
{
    return func(args);
}


// std.string.toStringz() is unsafe, so we provide our own implementation
// tailored to the string sizes we are likely to encounter here.
// Note that this implementation requires that the string be used immediately
// upon return, and not stored, as the buffer will be reused most of the time.
//
// The return type is spelled const(char)*: a prefix `const` on a free
// function qualifies the function rather than the returned pointer, which
// compilers either ignore or reject.
const(char)* zeroTermString(const char[] s) @safe nothrow
{
    import std.algorithm: max;
    static char[] buf;
    immutable len = s.length + 1;   // room for the terminating '\0'
    if (buf.length < len) buf.length = max(len, 1023);
    buf[0 .. s.length] = s;
    buf[s.length] = '\0';
    // `&buf[0]` rather than `buf.ptr`: the latter is not permitted in @safe
    // code by newer compilers.  buf is never empty here, since len >= 1.
    return &buf[0];
}

unittest
{
    auto c1 = zeroTermString("Hello World!");
    assert (c1[0 .. 13] == "Hello World!\0");
    auto c2 = zeroTermString("foo");
    assert (c2[0 .. 4] == "foo\0");
    // Both calls return a pointer into the same reused buffer.
    assert (c1 == c2);
}