1 /** 2 A thin wrapper around the low-level C API of the $(LINK2 http://zeromq.org,$(ZMQ)) 3 messaging framework, for the $(LINK2 http://dlang.org,D programming language). 4 5 Most functions in this module have a one-to-one relationship with functions 6 in the underlying C API. Some adaptations have been made to make the API 7 safer, easier and more pleasant to use; most importantly: 8 $(UL 9 $(LI 10 Errors are signalled by means of exceptions rather than return 11 codes. In particular, the $(REF ZmqException) class provides 12 a standard textual message for any error condition, but it also 13 provides access to the $(D errno) code set by the C function 14 that reported the error.) 15 $(LI 16 Functions are appropriately marked with $(D @safe), $(D pure) 17 and $(D nothrow), thus facilitating their use in high-level D code.) 18 $(LI 19 Memory and resources (i.e. contexts, sockets and messages) are 20 automatically managed, thus preventing leaks.) 21 $(LI 22 Context, socket and message options are implemented as properties.) 23 ) 24 The names of functions and types in $(ZMQD) are very similar to those in 25 $(ZMQ), but they follow the D naming conventions. Thus, the library should 26 feel both familiar to $(ZMQ) users and natural to D users. A notable 27 deviation from the C API is that message parts are consistently called 28 "frames". For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send()) 29 and so on. (Multipart messages were a late addition to $(ZMQ), and the "msg" 30 function names were well established at that point. The library's 31 developers have admitted that this is somewhat confusing, and the newer 32 CZMQ API consistently uses "frame" in function names.) 33 34 Due to the close correspondence with the C API, this documentation has 35 intentionally been kept sparse. There is really no reason to repeat the 36 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here. 37 Instead, the documentation for each function contains a "Corresponds to" 38 section that links to the appropriate pages in the $(ZMQ) reference. Any 39 details given in the present documentation mostly concern the D-specific 40 adaptations that have been made. 41 42 Also note that the examples generally only use the INPROC transport. The 43 reason for this is that the examples double as unittests, and we want to 44 avoid firewall troubles and other issues that could arise with the use of 45 network protocols such as TCP, PGM, etc., and the IPC protocol is not 46 supported on Windows. Anyway, they are only short 47 snippets that demonstrate the syntax; for more comprehensive and realistic 48 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all, 49 $(ZMQ) Guide). Many of the examples in the Guide have been translated to 50 D, and can be found in the 51 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples)) 52 subdirectory of the $(ZMQD) source repository. 53 54 Version: 55 1.0.0-beta (compatible with $(ZMQ) >= 4.0.0) 56 Authors: 57 $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad) 58 Copyright: 59 Copyright (c) 2013–2014, Lars T. Kyllingstad. All rights reserved. 60 License: 61 $(ZMQD) is released under a BSD licence (see LICENCE.txt for details).$(BR) 62 Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site) 63 for details about $(ZMQ) licensing. 64 Macros: 65 D = <code>$0</code> 66 EM = <em>$0</em> 67 LDOTS = … 68 QUOTE = <blockquote>$0</blockquote> 69 FREF = $(D $(LINK2 #$1,$1())) 70 REF = $(D $(LINK2 #$1,$1)) 71 COREF = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2)) 72 OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1)) 73 STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2)) 74 ZMQ = ∅MQ 75 ZMQAPI = $(LINK2 http://api.zeromq.org/4-0:$1,$+) 76 ZMQD = $(ZMQ)D 77 ZMQREF = $(D $(ZMQAPI $1,$1)) 78 */ 79 module zmqd; 80 81 import core.time; 82 import std.typecons; 83 import deimos.zmq.zmq; 84 85 86 version(Windows) { 87 import std.c.windows.winsock: SOCKET; 88 } 89 90 // libsodium is enabled by default, since that is the case with ZeroMQ itself. 91 version (WithoutLibsodium) { } else version = WithLibsodium; 92 93 // Compatibility check 94 version(unittest) static this() 95 { 96 import std.stdio: stderr; 97 const v = zmqVersion(); 98 if (v.major != ZMQ_VERSION_MAJOR || v.minor != ZMQ_VERSION_MINOR) { 99 stderr.writefln( 100 "Warning: Potential ZeroMQ header/library incompatibility: " 101 ~"The header (binding) is for version %d.%d.%d, " 102 ~"while the library is version %d.%d.%d. Unittests may fail.", 103 ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH, 104 v.major, v.minor, v.patch); 105 } 106 // Known incompatibilities 107 import std.algorithm: min, max; 108 const libVersion = ZMQ_MAKE_VERSION(v.major, v.minor, v.patch); 109 const older = min(ZMQ_VERSION, libVersion); 110 const newer = max(ZMQ_VERSION, libVersion); 111 if (older < ZMQ_MAKE_VERSION(4, 1, 0) && newer >= ZMQ_MAKE_VERSION(4, 1, 0)) { 112 stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions"); 113 } else if (older < ZMQ_MAKE_VERSION(4, 1, 1) && newer >= ZMQ_MAKE_VERSION(4, 1, 1)) { 114 stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions"); 115 } 116 } 117 118 119 @safe: 120 121 122 /** 123 Reports the $(ZMQ) library version. 124 125 Returns: 126 A $(STDREF typecons,Tuple) with three integer fields that represent the 127 three versioning levels: $(D major), $(D minor) and $(D patch). 128 Corresponds_to: 129 $(ZMQREF zmq_version()) 130 */ 131 Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow 132 { 133 typeof(return) v; 134 trusted!zmq_version(&v.major, &v.minor, &v.patch); 135 return v; 136 } 137 138 139 /** 140 The various socket types. 141 142 These are described in the $(ZMQREF zmq_socket()) reference. 143 */ 144 enum SocketType 145 { 146 req = ZMQ_REQ, /// Corresponds to $(D ZMQ_REQ) 147 rep = ZMQ_REP, /// Corresponds to $(D ZMQ_REP) 148 dealer = ZMQ_DEALER, /// Corresponds to $(D ZMQ_DEALER) 149 router = ZMQ_ROUTER, /// Corresponds to $(D ZMQ_ROUTER) 150 pub = ZMQ_PUB, /// Corresponds to $(D ZMQ_PUB) 151 sub = ZMQ_SUB, /// Corresponds to $(D ZMQ_SUB) 152 xpub = ZMQ_XPUB, /// Corresponds to $(D ZMQ_XPUB) 153 xsub = ZMQ_XSUB, /// Corresponds to $(D ZMQ_XSUB) 154 push = ZMQ_PUSH, /// Corresponds to $(D ZMQ_PUSH) 155 pull = ZMQ_PULL, /// Corresponds to $(D ZMQ_PULL) 156 pair = ZMQ_PAIR, /// Corresponds to $(D ZMQ_PAIR) 157 stream = ZMQ_STREAM, /// Corresponds to $(D ZMQ_STREAM) 158 } 159 160 161 /// Security mechanisms. 162 enum Security 163 { 164 /// $(ZMQAPI zmq_null,NULL): No security or confidentiality. 165 none = ZMQ_NULL, 166 167 /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication. 168 plain = ZMQ_PLAIN, 169 170 /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality. 171 curve = ZMQ_CURVE, 172 } 173 174 175 /** 176 An object that encapsulates a $(ZMQ) socket. 177 178 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it 179 must always be explicitly initialized with a constructor (see 180 $(FREF _Socket.this)): 181 --- 182 Socket s; // Not a valid socket yet 183 s = Socket(SocketType.push); // ...but now it is. 184 --- 185 This $(D struct) is noncopyable, which means that a socket is always 186 uniquely managed by a single $(D Socket) object. Functions that will 187 inspect or use the socket, but not take ownership of it, should take 188 a $(D ref Socket) parameter. Use $(STDREF algorithm,move) to move 189 a $(D Socket) to a different location (e.g. into a sink function that 190 takes it by value, or into a new variable). 191 192 The socket is automatically closed when the $(D Socket) object goes out 193 of scope. 194 195 Linger_period: 196 Note that Socket by default sets the socket's linger period to zero. 197 This deviates from the $(ZMQ) default (which is an infinite linger period). 198 */ 199 struct Socket 200 { 201 @safe: 202 /** 203 Creates a new $(ZMQ) socket. 204 205 If $(D context) is not specified, the default context (as returned 206 by $(FREF defaultContext)) is used. 207 208 Throws: 209 $(REF ZmqException) if $(ZMQ) reports an error. 210 Corresponds_to: 211 $(ZMQREF zmq_socket()) 212 */ 213 this(SocketType type) 214 { 215 this(defaultContext(), type); 216 } 217 218 /// ditto 219 this(Context context, SocketType type) 220 { 221 m_context = context; 222 m_type = type; 223 m_socket = trusted!zmq_socket(context.handle, type); 224 if (m_socket == null) { 225 throw new ZmqException; 226 } 227 linger = 0.msecs; 228 } 229 230 /// With default context: 231 unittest 232 { 233 auto sck = Socket(SocketType.push); 234 assert (sck.initialized); 235 } 236 /// With explicit context: 237 unittest 238 { 239 auto ctx = Context(); 240 auto sck = Socket(ctx, SocketType.push); 241 assert (sck.initialized); 242 } 243 244 // Socket objects are noncopyable. 245 @disable this(this); 246 247 unittest // Verify that move semantics work. 248 { 249 import std.algorithm: move; 250 struct SocketOwner 251 { 252 this(Socket s) { owned = trusted!move(s); } 253 Socket owned; 254 } 255 auto socket = Socket(SocketType.req); 256 const socketPtr = socket.handle; 257 assert (socketPtr != null); 258 259 auto owner = SocketOwner(trusted!move(socket)); 260 assert (socket.handle == null); 261 assert (!socket.m_context.initialized); 262 assert (owner.owned.handle == socketPtr); 263 assert (owner.owned.m_context.initialized); 264 } 265 266 // Closes the socket on desctruction. 267 ~this() nothrow { nothrowClose(); } 268 269 /** 270 Closes the $(ZMQ) socket. 271 272 Note that the socket will be closed automatically upon destruction, 273 so it is usually not necessary to call this method manually. 274 275 Throws: 276 $(REF ZmqException) if $(ZMQ) reports an error. 277 Corresponds_to: 278 $(ZMQREF zmq_close()) 279 */ 280 void close() 281 { 282 if (!nothrowClose()) throw new ZmqException; 283 } 284 285 /// 286 unittest 287 { 288 auto s = Socket(SocketType.pair); 289 assert (s.initialized); 290 s.close(); 291 assert (!s.initialized); 292 } 293 294 /** 295 Starts accepting incoming connections on $(D endpoint). 296 297 Throws: 298 $(REF ZmqException) if $(ZMQ) reports an error. 299 Corresponds_to: 300 $(ZMQREF zmq_bind()) 301 */ 302 void bind(const char[] endpoint) 303 { 304 if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) { 305 throw new ZmqException; 306 } 307 } 308 309 /// 310 unittest 311 { 312 auto s = Socket(SocketType.pub); 313 s.bind("inproc://zmqd_bind_example"); 314 } 315 316 /** 317 Stops accepting incoming connections on $(D endpoint). 318 319 Throws: 320 $(REF ZmqException) if $(ZMQ) reports an error. 321 Corresponds_to: 322 $(ZMQREF zmq_unbind()) 323 */ 324 void unbind(const char[] endpoint) 325 { 326 if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) { 327 throw new ZmqException; 328 } 329 } 330 331 // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1. 332 // IPC does not work on Windows, and unbind() does not work with INPROC. 333 // See: https://github.com/zeromq/libzmq/issues/949 334 /// 335 version (Posix) unittest 336 { 337 auto s = Socket(SocketType.pub); 338 s.bind("ipc://zmqd_unbind_example"); 339 // Do some work... 340 s.unbind("ipc://zmqd_unbind_example"); 341 } 342 343 /** 344 Creates an outgoing connection to $(D endpoint). 345 346 Throws: 347 $(REF ZmqException) if $(ZMQ) reports an error. 348 Corresponds_to: 349 $(ZMQREF zmq_connect()) 350 */ 351 void connect(const char[] endpoint) 352 { 353 if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) { 354 throw new ZmqException; 355 } 356 } 357 358 /// 359 unittest 360 { 361 auto s = Socket(SocketType.sub); 362 s.connect("inproc://zmqd_connect_example"); 363 } 364 365 /** 366 Disconnects the socket from $(D endpoint). 367 368 Throws: 369 $(REF ZmqException) if $(ZMQ) reports an error. 370 Corresponds_to: 371 $(ZMQREF zmq_disconnect()) 372 */ 373 void disconnect(const char[] endpoint) 374 { 375 if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) { 376 throw new ZmqException; 377 } 378 } 379 380 /// 381 unittest 382 { 383 auto s = Socket(SocketType.sub); 384 s.connect("inproc://zmqd_disconnect_example"); 385 // Do some work... 386 s.disconnect("inproc://zmqd_disconnect_example"); 387 } 388 389 /** 390 Sends a message frame. 391 392 $(D _send) blocks until the frame has been queued on the socket. 393 $(D trySend) performs the operation in non-blocking mode, and returns 394 a $(D bool) value that signifies whether the frame was queued on the 395 socket. 396 397 The $(D more) parameter specifies whether this is a multipart message 398 and there are more frames to follow. 399 400 The $(D char[]) overload is a convenience function that simply casts 401 the string argument to $(D ubyte[]). 402 403 Throws: 404 $(REF ZmqException) if $(ZMQ) reports an error. 405 Corresponds_to: 406 $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the 407 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 408 $(D more == true)). 409 */ 410 void send(const ubyte[] data, bool more = false) 411 { 412 immutable flags = more ? ZMQ_SNDMORE : 0; 413 if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) { 414 throw new ZmqException; 415 } 416 } 417 418 /// ditto 419 void send(const char[] data, bool more = false) 420 { 421 send(cast(const(ubyte)[]) data, more); 422 } 423 424 /// ditto 425 bool trySend(const ubyte[] data, bool more = false) 426 { 427 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 428 if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) { 429 import core.stdc.errno; 430 if (errno == EAGAIN) return false; 431 else throw new ZmqException; 432 } 433 return true; 434 } 435 436 /// ditto 437 bool trySend(const char[] data, bool more = false) 438 { 439 return trySend(cast(const(ubyte)[]) data, more); 440 } 441 442 /// 443 unittest 444 { 445 auto sck = Socket(SocketType.pub); 446 sck.send(cast(ubyte[]) [11, 226, 92]); 447 sck.send("Hello World!"); 448 } 449 450 /** 451 Sends a message frame. 452 453 $(D _send) blocks until the frame has been queued on the socket. 454 $(D trySend) performs the operation in non-blocking mode, and returns 455 a $(D bool) value that signifies whether the frame was queued on the 456 socket. 457 458 The $(D more) parameter specifies whether this is a multipart message 459 and there are more frames to follow. 460 461 Throws: 462 $(REF ZmqException) if $(ZMQ) reports an error. 463 Corresponds_to: 464 $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the 465 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 466 $(D more == true)). 467 */ 468 void send(ref Frame msg, bool more = false) 469 { 470 immutable flags = more ? ZMQ_SNDMORE : 0; 471 if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) { 472 throw new ZmqException; 473 } 474 } 475 476 /// ditto 477 bool trySend(ref Frame msg, bool more = false) 478 { 479 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 480 if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) { 481 import core.stdc.errno; 482 if (errno == EAGAIN) return false; 483 else throw new ZmqException; 484 } 485 return true; 486 } 487 488 /// 489 unittest 490 { 491 auto sck = Socket(SocketType.pub); 492 auto msg = Frame(12); 493 msg.data.asString()[] = "Hello World!"; 494 sck.send(msg); 495 } 496 497 /** 498 Sends a constant-memory message frame. 499 500 $(D _sendConst) blocks until the frame has been queued on the socket. 501 $(D trySendConst) performs the operation in non-blocking mode, and returns 502 a $(D bool) value that signifies whether the frame was queued on the 503 socket. 504 505 The $(D more) parameter specifies whether this is a multipart message 506 and there are more frames to follow. 507 508 Throws: 509 $(REF ZmqException) if $(ZMQ) reports an error. 510 Corresponds_to: 511 $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the 512 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 513 $(D more == true)). 514 */ 515 void sendConst(immutable ubyte[] data, bool more = false) 516 { 517 immutable flags = more ? ZMQ_SNDMORE : 0; 518 if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) { 519 throw new ZmqException; 520 } 521 } 522 523 /// ditto 524 void sendConst(string data, bool more = false) 525 { 526 sendConst(cast(immutable(ubyte)[]) data, more); 527 } 528 529 /// ditto 530 bool trySendConst(immutable ubyte[] data, bool more = false) 531 { 532 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 533 if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) { 534 import core.stdc.errno; 535 if (errno == EAGAIN) return false; 536 else throw new ZmqException; 537 } 538 return true; 539 } 540 541 /// ditto 542 bool trySendConst(string data, bool more = false) 543 { 544 return trySend(cast(immutable(ubyte)[]) data, more); 545 } 546 547 /// 548 unittest 549 { 550 static immutable arr = cast(ubyte[]) [11, 226, 92]; 551 auto sck = Socket(SocketType.pub); 552 sck.sendConst(arr); 553 sck.sendConst("Hello World!"); 554 } 555 556 /** 557 Receives a message frame. 558 559 $(D _receive) blocks until the request can be satisfied, and returns the 560 number of bytes in the frame. 561 $(D tryReceive) performs the operation in non-blocking mode, and returns 562 a $(STDREF typecons,Tuple) which contains the size of the frame along 563 with a $(D bool) value that signifies whether a frame was received. 564 (If the latter is $(D false), the former is always set to zero.) 565 566 Throws: 567 $(REF ZmqException) if $(ZMQ) reports an error. 568 Corresponds_to: 569 $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 570 of $(D tryReceive)). 571 572 */ 573 size_t receive(ubyte[] data) 574 { 575 immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, 0); 576 if (len >= 0) { 577 import std.conv; 578 return to!size_t(len); 579 } else { 580 throw new ZmqException; 581 } 582 } 583 584 /// ditto 585 Tuple!(size_t, bool) tryReceive(ubyte[] data) 586 { 587 immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, ZMQ_DONTWAIT); 588 if (len >= 0) { 589 import std.conv; 590 return typeof(return)(to!size_t(len), true); 591 } else { 592 import core.stdc.errno; 593 if (errno == EAGAIN) { 594 return typeof(return)(0, false); 595 } else { 596 throw new ZmqException; 597 } 598 } 599 } 600 601 /// 602 unittest 603 { 604 // Sender 605 auto snd = Socket(SocketType.req); 606 snd.connect("inproc://zmqd_receive_example"); 607 snd.send("Hello World!"); 608 609 // Receiver 610 import std.string: representation; 611 auto rcv = Socket(SocketType.rep); 612 rcv.bind("inproc://zmqd_receive_example"); 613 char[256] buf; 614 immutable len = rcv.receive(buf.representation); 615 assert (buf[0 .. len] == "Hello World!"); 616 } 617 618 @system unittest 619 { 620 auto snd = Socket(SocketType.pair); 621 snd.bind("inproc://zmqd_tryReceive_example"); 622 auto rcv = Socket(SocketType.pair); 623 rcv.connect("inproc://zmqd_tryReceive_example"); 624 625 ubyte[256] buf; 626 auto r1 = rcv.tryReceive(buf); 627 assert (!r1[1]); 628 629 import core.thread, core.time, std.string; 630 snd.send("Hello World!"); 631 Thread.sleep(100.msecs); // Wait for message to be transferred... 632 auto r2 = rcv.tryReceive(buf); 633 assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation); 634 } 635 636 /** 637 Receives a message frame. 638 639 $(D _receive) blocks until the request can be satisfied, and returns the 640 number of bytes in the frame. 641 $(D tryReceive) performs the operation in non-blocking mode, and returns 642 a $(STDREF typecons,Tuple) which contains the size of the frame along 643 with a $(D bool) value that signifies whether a frame was received. 644 (If the latter is $(D false), the former is always set to zero.) 645 646 Throws: 647 $(REF ZmqException) if $(ZMQ) reports an error. 648 Corresponds_to: 649 $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 650 of $(D tryReceive)). 651 652 */ 653 size_t receive(ref Frame msg) 654 { 655 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0); 656 if (len >= 0) { 657 import std.conv; 658 return to!size_t(len); 659 } else { 660 throw new ZmqException; 661 } 662 } 663 664 /// ditto 665 Tuple!(size_t, bool) tryReceive(ref Frame msg) 666 { 667 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT); 668 if (len >= 0) { 669 import std.conv; 670 return typeof(return)(to!size_t(len), true); 671 } else { 672 import core.stdc.errno; 673 if (errno == EAGAIN) { 674 return typeof(return)(0, false); 675 } else { 676 throw new ZmqException; 677 } 678 } 679 } 680 681 /// 682 unittest 683 { 684 // Sender 685 auto snd = Socket(SocketType.req); 686 snd.connect("inproc://zmqd_msg_receive_example"); 687 snd.send("Hello World!"); 688 689 // Receiver 690 import std.string: representation; 691 auto rcv = Socket(SocketType.rep); 692 rcv.bind("inproc://zmqd_msg_receive_example"); 693 auto msg = Frame(); 694 rcv.receive(msg); 695 assert (msg.data.asString() == "Hello World!"); 696 } 697 698 @system unittest 699 { 700 auto snd = Socket(SocketType.pair); 701 snd.bind("inproc://zmqd_msg_tryReceive_example"); 702 auto rcv = Socket(SocketType.pair); 703 rcv.connect("inproc://zmqd_msg_tryReceive_example"); 704 705 auto msg = Frame(); 706 auto r1 = rcv.tryReceive(msg); 707 assert (!r1[1]); 708 709 import core.thread, core.time, std.string; 710 snd.send("Hello World!"); 711 Thread.sleep(100.msecs); // Wait for message to be transferred... 712 auto r2 = rcv.tryReceive(msg); 713 assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation); 714 } 715 716 /** 717 The socket _type. 718 719 Throws: 720 $(REF ZmqException) if $(ZMQ) reports an error. 721 Corresponds_to: 722 $(ZMQREF zmq_getsockopt()) with $(D ZMQ_TYPE). 723 */ 724 @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); } 725 726 /// 727 unittest 728 { 729 auto sck = Socket(SocketType.xpub); 730 assert (sck.type == SocketType.xpub); 731 } 732 733 /** 734 Whether there are _more message frames to follow. 735 736 Throws: 737 $(REF ZmqException) if $(ZMQ) reports an error. 738 Corresponds_to: 739 $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE). 740 */ 741 @property bool more() { return !!getOption!int(ZMQ_RCVMORE); } 742 743 // TODO: Better unittest/example 744 unittest 745 { 746 auto sck = Socket(SocketType.req); 747 assert (!sck.more); 748 } 749 750 /** 751 Misc. socket options. 752 753 Each of these has a one-to-one correspondence with an option passed to 754 $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For 755 example, $(D identity) corresponds to $(D ZMQ_IDENTITY), 756 $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc. 757 758 Notes: 759 $(UL 760 $(LI For convenience, the setter for the $(D identity) property 761 accepts strings. To retrieve a string with the getter, use 762 the $(FREF asString) function. 763 --- 764 sck.identity = "foobar"; 765 assert (sck.identity.asString() == "foobar"); 766 --- 767 ) 768 $(LI The $(D linger), $(D receiveTimeout) and $(D sendTimeout) 769 properties may have the special value $(COREF time,Duration.max), 770 which in this context specifies an infinite duration. This is 771 translated to an option value of -1 in the C API (and it is also 772 the default value for all of them).) 773 $(LI Some options have array type, and these allow the user to supply 774 a buffer in which to store the value, to avoid a GC allocation. 775 The return value is then a slice of this buffer. 776 These are not marked as $(D @property), but are prefixed with 777 "get" (e.g. $(D getIdentity())). A user-supplied buffer is 778 $(I required) for some options, namely $(D getPlainUsername()) 779 and $(D getPlainPassword()), and these do not have $(D @property) 780 versions. $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85()) 781 require buffers which are at least 32 and 41 bytes long, 782 respectively.) 783 $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are 784 treated differently from the others; see $(FREF Socket.subscribe) 785 and $(FREF Socket.unsubscribe)) 786 ) 787 788 Throws: 789 $(REF ZmqException) if $(ZMQ) reports an error.$(BR) 790 $(STDREF conv,ConvOverflowException) if a given $(D Duration) is 791 longer than the number of milliseconds that will fit in an $(D int) 792 (only applies to properties of $(COREF time,Duration) type).$(BR) 793 $(COREF exception,RangeError) if the $(D dest) buffers passed to 794 $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than 795 32 or 41 bytes long, respectively. 796 Corresponds_to: 797 $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). 798 */ 799 @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); } 800 /// ditto 801 @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); } 802 803 /// ditto 804 @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); } 805 /// ditto 806 @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); } 807 808 /// ditto 809 @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); } 810 /// ditto 811 @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); } 812 813 /// ditto 814 @property ubyte[] identity() { return getIdentity(new ubyte[255]); } 815 /// ditto 816 ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); } 817 /// ditto 818 @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); } 819 /// ditto 820 @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); } 821 822 /// ditto 823 @property int rate() { return getOption!int(ZMQ_RATE); } 824 /// ditto 825 @property void rate(int value) { setOption(ZMQ_RATE, value); } 826 827 /// ditto 828 @property Duration recoveryInterval() 829 { 830 return msecs(getOption!int(ZMQ_RECOVERY_IVL)); 831 } 832 /// ditto 833 @property void recoveryInterval(Duration value) 834 { 835 import std.conv: to; 836 setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"())); 837 } 838 839 /// ditto 840 @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); } 841 /// ditto 842 @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); } 843 844 /// ditto 845 @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); } 846 /// ditto 847 @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); } 848 849 /// ditto 850 @property Duration linger() 851 { 852 const auto value = getOption!int(ZMQ_LINGER); 853 return value == -1 ? Duration.max : value.msecs; 854 } 855 /// ditto 856 @property void linger(Duration value) 857 { 858 import std.conv: to; 859 setOption(ZMQ_LINGER, 860 value == Duration.max ? -1 : to!int(value.total!"msecs"())); 861 } 862 863 /// ditto 864 @property Duration reconnectionInterval() 865 { 866 return getOption!int(ZMQ_RECONNECT_IVL).msecs; 867 } 868 /// ditto 869 @property void reconnectionInterval(Duration value) 870 { 871 import std.conv: to; 872 setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"())); 873 } 874 875 /// ditto 876 @property Duration maxReconnectionInterval() 877 { 878 return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs; 879 } 880 /// ditto 881 @property void maxReconnectionInterval(Duration value) 882 { 883 import std.conv: to; 884 setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"())); 885 } 886 887 /// ditto 888 @property int backlog() { return getOption!int(ZMQ_BACKLOG); } 889 /// ditto 890 @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); } 891 892 /// ditto 893 @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); } 894 /// ditto 895 @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); } 896 897 /// ditto 898 @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); } 899 /// ditto 900 @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); } 901 902 /// ditto 903 @property Duration receiveTimeout() 904 { 905 const value = getOption!int(ZMQ_RCVTIMEO); 906 return value == -1 ? Duration.max : value.msecs; 907 } 908 /// ditto 909 @property void receiveTimeout(Duration value) 910 { 911 import std.conv: to; 912 setOption(ZMQ_RCVTIMEO, 913 value == Duration.max ? -1 : to!int(value.total!"msecs"())); 914 } 915 916 /// ditto 917 @property Duration sendTimeout() 918 { 919 const value = getOption!int(ZMQ_SNDTIMEO); 920 return value == -1 ? Duration.max : value.msecs; 921 } 922 /// ditto 923 @property void sendTimeout(Duration value) 924 { 925 import std.conv: to; 926 setOption(ZMQ_SNDTIMEO, 927 value == Duration.max ? -1 : to!int(value.total!"msecs"())); 928 } 929 930 /// ditto 931 @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); } 932 /// ditto 933 @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); } 934 935 /// ditto 936 deprecated("Use !ipv6 instead") 937 @property bool ipv4Only() { return !ipv6; } 938 /// ditto 939 deprecated("Use ipv6 = !value instead") 940 @property void ipv4Only(bool value) { ipv6 = !value; } 941 942 /// ditto 943 @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); } 944 /// ditto 945 @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); } 946 947 /// ditto 948 deprecated("Use the 'immediate' property instead") 949 @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); } 950 /// ditto 951 deprecated("Use the 'immediate' property instead") 952 @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); } 953 954 /// ditto 955 @property FD fd() { return getOption!FD(ZMQ_FD); } 956 957 /// ditto 958 @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); } 959 960 /// ditto 961 @property char[] lastEndpoint() @trusted 962 { 963 // This function is not @safe because it calls a @system function 964 // (zmq_getsockopt) and takes the address of a local (len). 965 auto buf = new char[1024]; 966 size_t len = buf.length; 967 if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) { 968 throw new ZmqException; 969 } 970 return buf[0 .. len-1]; 971 } 972 973 /// ditto 974 @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); } 975 976 /// ditto 977 @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); } 978 979 /// ditto 980 @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); } 981 982 /// ditto 983 @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); } 984 985 /// ditto 986 @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); } 987 988 /// ditto 989 @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); } 990 /// ditto 991 @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); } 992 993 /// ditto 994 @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); } 995 /// ditto 996 @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); } 997 998 /// ditto 999 @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); } 1000 /// ditto 1001 @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); } 1002 1003 /// ditto 1004 @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); } 1005 /// ditto 1006 @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); } 1007 1008 /// ditto 1009 @property Security mechanism() 1010 { 1011 import std.conv; 1012 return to!Security(getOption!int(ZMQ_MECHANISM)); 1013 } 1014 1015 /// ditto 1016 @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); } 1017 /// ditto 1018 @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); } 1019 1020 /// ditto 1021 char[] getPlainUsername(char[] dest) 1022 { 1023 return getCStringOption(ZMQ_PLAIN_USERNAME, dest); 1024 } 1025 /// ditto 1026 @property void plainUsername(const(char)[] value) 1027 { 1028 setArrayOption(ZMQ_PLAIN_USERNAME, value); 1029 } 1030 1031 /// ditto 1032 char[] getPlainPassword(char[] dest) 1033 { 1034 return getCStringOption(ZMQ_PLAIN_PASSWORD, dest); 1035 } 1036 /// ditto 1037 @property void plainPassword(const(char)[] value) 1038 { 1039 setArrayOption(ZMQ_PLAIN_PASSWORD, value); 1040 } 1041 1042 version (WithLibsodium) { 1043 1044 /// ditto 1045 @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); } 1046 /// ditto 1047 @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); } 1048 1049 /// ditto 1050 @property ubyte[] curvePublicKey() 1051 { 1052 return getCurvePublicKey(new ubyte[keyBufSizeBin]); 1053 } 1054 /// ditto 1055 ubyte[] getCurvePublicKey(ubyte[] dest) 1056 { 1057 return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest); 1058 } 1059 /// ditto 1060 @property char[] curvePublicKeyZ85() 1061 { 1062 return getCurvePublicKeyZ85(new char[keyBufSizeZ85]); 1063 } 1064 /// ditto 1065 char[] getCurvePublicKeyZ85(char[] dest) 1066 { 1067 return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest); 1068 } 1069 /// ditto 1070 @property void curvePublicKey(const(ubyte)[] value) 1071 { 1072 setCurveKey(ZMQ_CURVE_PUBLICKEY, value); 1073 } 1074 /// ditto 1075 @property void curvePublicKeyZ85(const(char)[] value) 1076 { 1077 setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value); 1078 } 1079 1080 /// ditto 1081 @property ubyte[] curveSecretKey() 1082 { 1083 return getCurveSecretKey(new ubyte[keyBufSizeBin]); 1084 } 1085 /// ditto 1086 ubyte[] getCurveSecretKey(ubyte[] dest) 1087 { 1088 return getCurveKey(ZMQ_CURVE_SECRETKEY, dest); 1089 } 1090 /// ditto 1091 @property char[] curveSecretKeyZ85() 1092 { 1093 return getCurveSecretKeyZ85(new char[keyBufSizeZ85]); 1094 } 1095 /// ditto 1096 char[] getCurveSecretKeyZ85(char[] dest) 1097 { 1098 return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest); 1099 } 1100 /// ditto 1101 @property void curveSecretKey(const(ubyte)[] value) 1102 { 1103 setCurveKey(ZMQ_CURVE_SECRETKEY, value); 1104 } 1105 /// ditto 1106 @property void curveSecretKeyZ85(const(char)[] value) 1107 { 1108 setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value); 1109 } 1110 1111 /// ditto 1112 @property ubyte[] curveServerKey() 1113 { 1114 return getCurveServerKey(new ubyte[keyBufSizeBin]); 1115 } 1116 /// ditto 1117 ubyte[] getCurveServerKey(ubyte[] dest) 1118 { 1119 return getCurveKey(ZMQ_CURVE_SERVERKEY, dest); 1120 } 1121 /// ditto 1122 @property char[] curveServerKeyZ85() 1123 { 1124 return getCurveServerKeyZ85(new char[keyBufSizeZ85]); 1125 } 1126 /// ditto 1127 char[] getCurveServerKeyZ85(char[] dest) 1128 { 1129 return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest); 1130 } 1131 /// ditto 1132 @property void curveServerKey(const(ubyte)[] value) 1133 { 1134 setCurveKey(ZMQ_CURVE_SERVERKEY, value); 1135 } 1136 /// ditto 1137 @property void curveServerKeyZ85(const(char)[] value) 1138 { 1139 setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value); 1140 } 1141 1142 } // version (WithLibsodium) 1143 1144 /// ditto 1145 @property char[] zapDomain() { return getZapDomain(new char[256]); } 1146 /// ditto 1147 char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); } 1148 /// ditto 1149 @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); } 1150 1151 /// ditto 1152 @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); } 1153 1154 unittest 1155 { 1156 // We test all the socket options by checking that they have their default value. 1157 auto s = Socket(SocketType.xpub); 1158 const e = "inproc://unittest2"; 1159 s.bind(e); 1160 import core.time; 1161 assert(s.type == SocketType.xpub); 1162 assert(s.sendHWM == 1000); 1163 assert(s.receiveHWM == 1000); 1164 assert(s.threadAffinity == 0); 1165 assert(s.identity == null); 1166 assert(s.rate == 100); 1167 assert(s.recoveryInterval == 10.seconds); 1168 assert(s.sendBufferSize == 0); 1169 assert(s.receiveBufferSize == 0); 1170 assert(s.linger == 0.hnsecs); 1171 assert(s.reconnectionInterval == 100.msecs); 1172 assert(s.maxReconnectionInterval == Duration.zero); 1173 assert(s.backlog == 100); 1174 assert(s.maxMsgSize == -1); 1175 assert(s.multicastHops == 1); 1176 assert(s.receiveTimeout == Duration.max); 1177 assert(s.sendTimeout == Duration.max); 1178 assert(!s.ipv6); 1179 assert(!s.immediate); 1180 version(Posix) { 1181 assert(s.fd > 2); // 0, 1 and 2 are the standard streams 1182 } 1183 assert(s.events == PollFlags.pollOut); 1184 assert(s.tcpKeepalive == -1); 1185 assert(s.tcpKeepaliveIdle == -1); 1186 assert(s.tcpKeepaliveCnt == -1); 1187 assert(s.tcpKeepaliveIntvl == -1); 1188 assert(s.mechanism == Security.none); 1189 assert(!s.plainServer); 1190 assert(s.getPlainUsername(new char[8]).length == 0); 1191 assert(s.getPlainPassword(new char[8]).length == 0); 1192 version (WithLibsodium) { 1193 assert(!s.curveServer); 1194 } 1195 assert(s.zapDomain.length == 0); 1196 1197 // Test setters and getters together 1198 s.sendHWM = 500; 1199 assert(s.sendHWM == 500); 1200 s.receiveHWM = 600; 1201 assert(s.receiveHWM == 600); 1202 s.threadAffinity = 1; 1203 assert(s.threadAffinity == 1); 1204 s.identity = cast(ubyte[]) [ 65, 66, 67 ]; 1205 assert(s.identity == [65, 66, 67]); 1206 s.identity = "foo"; 1207 assert(s.identity == [102, 111, 111]); 1208 s.rate = 200; 1209 assert(s.rate == 200); 1210 s.recoveryInterval = 5.seconds; 1211 assert(s.recoveryInterval == 5_000.msecs); 1212 s.sendBufferSize = 500; 1213 assert(s.sendBufferSize == 500); 1214 s.receiveBufferSize = 600; 1215 assert(s.receiveBufferSize == 600); 1216 s.linger = Duration.zero; 1217 assert(s.linger == Duration.zero); 1218 s.linger = 100_000.usecs; 1219 assert(s.linger == 100.msecs); 1220 s.linger = Duration.max; 1221 assert(s.linger == Duration.max); 1222 s.reconnectionInterval = 200_000.usecs; 1223 assert(s.reconnectionInterval == 200.msecs); 1224 s.maxReconnectionInterval = 300_000.usecs; 1225 assert(s.maxReconnectionInterval == 300.msecs); 1226 s.backlog = 50; 1227 assert(s.backlog == 50); 1228 s.maxMsgSize = 1000; 1229 assert(s.maxMsgSize == 1000); 1230 s.multicastHops = 2; 1231 assert(s.multicastHops == 2); 1232 s.receiveTimeout = 3.seconds; 1233 assert(s.receiveTimeout == 3_000_000.usecs); 1234 s.receiveTimeout = Duration.max; 1235 assert(s.receiveTimeout == Duration.max); 1236 s.sendTimeout = 2_000_000.usecs; 1237 assert(s.sendTimeout == 2.seconds); 1238 s.sendTimeout = Duration.max; 1239 assert(s.sendTimeout == Duration.max); 1240 s.ipv6 = true; 1241 assert(s.ipv6); 1242 s.immediate = true; 1243 assert(s.immediate); 1244 s.tcpKeepalive = 1; 1245 assert(s.tcpKeepalive == 1); 1246 s.tcpKeepaliveIdle = 0; 1247 assert(s.tcpKeepaliveIdle == 0); 1248 s.tcpKeepaliveCnt = 1; 1249 assert(s.tcpKeepaliveCnt == 1); 1250 s.tcpKeepaliveIntvl = 0; 1251 assert(s.tcpKeepaliveIntvl == 0); 1252 s.plainServer = true; 1253 assert(s.mechanism == Security.plain); 1254 assert(s.plainServer); 1255 version (WithLibsodium) { 1256 assert(!s.curveServer); 1257 s.curveServer = true; 1258 assert(s.mechanism == Security.curve); 1259 assert(!s.plainServer); 1260 assert(s.curveServer); 1261 } 1262 s.plainServer = false; 1263 assert(s.mechanism == Security.none); 1264 assert(!s.plainServer); 1265 version (WithLibsodium) assert(!s.curveServer); 1266 s.plainUsername = "foobar"; 1267 assert(s.getPlainUsername(new char[8]) == "foobar"); 1268 assert(s.mechanism == Security.plain); 1269 s.plainUsername = null; 1270 assert(s.mechanism == Security.none); 1271 s.plainPassword = "xyz"; 1272 assert(s.getPlainPassword(new char[8]) == "xyz"); 1273 assert(s.mechanism == Security.plain); 1274 s.plainPassword = null; 1275 assert(s.mechanism == Security.none); 1276 s.zapDomain = "my_zap_domain"; 1277 assert(s.zapDomain == "my_zap_domain"); 1278 1279 // Test write-only options 1280 s.conflate = true; 1281 } 1282 1283 version (WithLibsodium) @system unittest 1284 { 1285 // The CURVE key options require some special setup, so we test them 1286 // separately. 1287 import std.array, std.range; 1288 auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array(); 1289 auto z85Key1 = z85Encode(binKey1); 1290 auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array(); 1291 auto z85Key2 = z85Encode(binKey2); 1292 auto zeroKey = repeat(cast(ubyte) 0).take(32).array(); 1293 assert (z85Key1 != z85Key2); 1294 1295 auto s = Socket(SocketType.req); 1296 s.curvePublicKey = zeroKey; 1297 s.curveSecretKey = zeroKey; 1298 s.curveServerKey = zeroKey; 1299 1300 s.curvePublicKey = binKey1; 1301 assert (s.curvePublicKey == binKey1); 1302 assert (s.curvePublicKeyZ85 == z85Key1); 1303 s.curvePublicKeyZ85 = z85Key2; 1304 assert (s.curvePublicKey == binKey2); 1305 assert (s.curvePublicKeyZ85 == z85Key2); 1306 assert (s.curveSecretKey == zeroKey); 1307 assert (s.curveServerKey == zeroKey); 1308 s.curvePublicKey = zeroKey; 1309 1310 s.curveSecretKey = binKey1; 1311 assert (s.curveSecretKey == binKey1); 1312 assert (s.curveSecretKeyZ85 == z85Key1); 1313 s.curveSecretKeyZ85 = z85Key2; 1314 assert (s.curveSecretKey == binKey2); 1315 assert (s.curveSecretKeyZ85 == z85Key2); 1316 assert (s.curvePublicKey == zeroKey); 1317 assert (s.curveServerKey == zeroKey); 1318 s.curveSecretKey = zeroKey; 1319 1320 s.curveServerKey = binKey1; 1321 assert (s.curveServerKey == binKey1); 1322 assert (s.curveServerKeyZ85 == z85Key1); 1323 s.curveServerKeyZ85 = z85Key2; 1324 assert (s.curveServerKey == binKey2); 1325 assert (s.curveServerKeyZ85 == z85Key2); 1326 assert (s.curvePublicKey == zeroKey); 1327 assert (s.curveSecretKey == zeroKey); 1328 s.curveServerKey = zeroKey; 1329 } 1330 1331 unittest 1332 { 1333 // Some options are only applicable to specific socket types. 1334 auto rt = Socket(SocketType.router); 1335 rt.routerMandatory = true; 1336 rt.probeRouter = true; 1337 auto xp = Socket(SocketType.xpub); 1338 xp.xpubVerbose = true; 1339 auto rq = Socket(SocketType.req); 1340 rq.reqCorrelate = true; 1341 rq.reqRelaxed = true; 1342 } 1343 1344 deprecated unittest 1345 { 1346 // Test deprecated socket options 1347 auto s = Socket(SocketType.req); 1348 assert(s.ipv4Only); 1349 assert(!s.delayAttachOnConnect); 1350 1351 // Test setters and getters together 1352 s.ipv4Only = false; 1353 assert(!s.ipv4Only); 1354 s.delayAttachOnConnect = true; 1355 assert(s.delayAttachOnConnect); 1356 } 1357 1358 /** 1359 Establishes a message filter. 1360 1361 Throws: 1362 $(REF ZmqException) if $(ZMQ) reports an error. 1363 Corresponds_to: 1364 $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE). 1365 */ 1366 void subscribe(const ubyte[] filterPrefix) 1367 { 1368 setArrayOption(ZMQ_SUBSCRIBE, filterPrefix); 1369 } 1370 /// ditto 1371 void subscribe(const char[] filterPrefix) 1372 { 1373 setArrayOption(ZMQ_SUBSCRIBE, filterPrefix); 1374 } 1375 1376 /// 1377 unittest 1378 { 1379 // Create a subscriber that accepts all messages that start with 1380 // the prefixes "foo" or "bar". 1381 auto sck = Socket(SocketType.sub); 1382 sck.subscribe("foo"); 1383 sck.subscribe("bar"); 1384 } 1385 1386 @system unittest 1387 { 1388 void sleep(int ms) { 1389 import core.thread, core.time; 1390 Thread.sleep(dur!"msecs"(ms)); 1391 } 1392 auto pub = Socket(SocketType.pub); 1393 pub.bind("inproc://zmqd_subscribe_unittest"); 1394 auto sub = Socket(SocketType.sub); 1395 sub.connect("inproc://zmqd_subscribe_unittest"); 1396 1397 pub.send("Hello"); 1398 sleep(100); 1399 sub.subscribe("He"); 1400 sub.subscribe(cast(ubyte[])['W', 'o']); 1401 sleep(100); 1402 pub.send("Heeee"); 1403 pub.send("World"); 1404 sleep(100); 1405 ubyte[5] buf; 1406 sub.receive(buf); 1407 assert(buf.asString() == "Heeee"); 1408 sub.receive(buf); 1409 assert(buf.asString() == "World"); 1410 } 1411 1412 /** 1413 Removes a message filter. 1414 1415 Throws: 1416 $(REF ZmqException) if $(ZMQ) reports an error. 1417 Corresponds_to: 1418 $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE). 1419 */ 1420 void unsubscribe(const ubyte[] filterPrefix) 1421 { 1422 setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix); 1423 } 1424 /// ditto 1425 void unsubscribe(const char[] filterPrefix) 1426 { 1427 setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix); 1428 } 1429 1430 /// 1431 unittest 1432 { 1433 // Subscribe to messages that start with "foo" or "bar". 1434 auto sck = Socket(SocketType.sub); 1435 sck.subscribe("foo"); 1436 sck.subscribe("bar"); 1437 // ... 1438 // From now on, only accept messages that start with "bar" 1439 sck.unsubscribe("foo"); 1440 } 1441 1442 /** 1443 Spawns a PAIR socket that publishes socket state changes (events) over 1444 the INPROC transport to the given endpoint. 1445 1446 Which event types should be published may be selected by bitwise-ORing 1447 together different $(REF EventType) flags in the $(D event) parameter. 1448 1449 Throws: 1450 $(REF ZmqException) if $(ZMQ) reports an error. 1451 Corresponds_to: 1452 $(ZMQREF zmq_socket_monitor()) 1453 See_also: 1454 $(FREF receiveEvent), which receives and parses event messages. 1455 */ 1456 void monitor(const char[] endpoint, EventType events = EventType.all) 1457 { 1458 if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) { 1459 throw new ZmqException; 1460 } 1461 } 1462 1463 /// 1464 unittest 1465 { 1466 auto sck = Socket(SocketType.pub); 1467 sck.monitor("inproc://zmqd_monitor_unittest", 1468 EventType.accepted | EventType.closed); 1469 } 1470 1471 /** 1472 The $(D void*) pointer used by the underlying C API to refer to the socket. 1473 1474 If the object has not been initialized, this function returns $(D null). 1475 */ 1476 @property inout(void)* handle() inout pure nothrow 1477 { 1478 return m_socket; 1479 } 1480 1481 /** 1482 Whether this $(REF Socket) object has been _initialized, i.e. whether it 1483 refers to a valid $(ZMQ) socket. 1484 */ 1485 @property bool initialized() const pure nothrow 1486 { 1487 return m_socket != null; 1488 } 1489 1490 /// 1491 unittest 1492 { 1493 Socket sck; 1494 assert (!sck.initialized); 1495 sck = Socket(SocketType.sub); 1496 assert (sck.initialized); 1497 sck.close(); 1498 assert (!sck.initialized); 1499 } 1500 1501 private: 1502 // Helper function for ~this() and close() 1503 bool nothrowClose() nothrow 1504 { 1505 if (m_socket != null) { 1506 if (trusted!zmq_close(m_socket) != 0) return false; 1507 m_socket = null; 1508 } 1509 return true; 1510 } 1511 1512 import std.traits; 1513 1514 T getOption(T)(int option) @trusted 1515 if (isScalarType!T) 1516 { 1517 T buf; 1518 auto len = T.sizeof; 1519 if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) { 1520 throw new ZmqException; 1521 } 1522 assert(len == T.sizeof); 1523 return buf; 1524 } 1525 1526 void setOption(T)(int option, T value) @trusted 1527 if (isScalarType!T) 1528 { 1529 if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) { 1530 throw new ZmqException; 1531 } 1532 } 1533 1534 T[] getArrayOption(T)(int option, T[] buf) @trusted 1535 if (isScalarType!T) 1536 { 1537 static assert (T.sizeof == 1); 1538 auto len = buf.length; 1539 if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) { 1540 throw new ZmqException; 1541 } 1542 return buf[0 .. len]; 1543 } 1544 1545 void setArrayOption()(int option, const void[] value) 1546 { 1547 if (trusted!zmq_setsockopt(m_socket, option, value.ptr, value.length) != 0) { 1548 throw new ZmqException; 1549 } 1550 } 1551 1552 char[] getCStringOption(int option, char[] buf) 1553 { 1554 auto ret = getArrayOption(option, buf); 1555 assert (ret.length && ret[$-1] == '\0'); 1556 return ret[0 .. $-1]; 1557 } 1558 1559 version (WithLibsodium) { 1560 enum : size_t 1561 { 1562 keySizeBin = 32, 1563 keyBufSizeBin = keySizeBin, 1564 keySizeZ85 = 40, 1565 keyBufSizeZ85 = keySizeZ85 + 1, 1566 } 1567 1568 ubyte[] getCurveKey(int option, ubyte[] buf) 1569 { 1570 if (buf.length < keyBufSizeBin) { 1571 import core.exception: RangeError; 1572 throw new RangeError; 1573 } 1574 return getArrayOption(option, buf[0 .. keyBufSizeBin]); 1575 } 1576 1577 char[] getCurveKeyZ85(int option, char[] buf) 1578 { 1579 if (buf.length < keyBufSizeZ85) { 1580 import core.exception: RangeError; 1581 throw new RangeError; 1582 } 1583 return getCStringOption(option, buf[0 .. keyBufSizeZ85]); 1584 } 1585 1586 void setCurveKey(int option, const ubyte[] value) 1587 { 1588 if (value.length != keySizeBin) throw new Exception("Invalid key size"); 1589 setArrayOption(option, value); 1590 } 1591 1592 void setCurveKeyZ85(int option, const char[] value) 1593 { 1594 if (value.length != keySizeZ85) throw new Exception("Invalid key size"); 1595 setArrayOption(option, value); 1596 } 1597 } // version (WithLibsodium) 1598 1599 Context m_context; 1600 SocketType m_type; 1601 void* m_socket; 1602 } 1603 1604 unittest 1605 { 1606 auto s1 = Socket(SocketType.pair); 1607 auto s2 = Socket(SocketType.pair); 1608 s1.bind("inproc://unittest"); 1609 s2.connect("inproc://unittest"); 1610 s1.send("Hello World!"); 1611 ubyte[12] buf; 1612 const len = s2.receive(buf[]); 1613 assert (len == 12); 1614 assert (buf == "Hello World!"); 1615 } 1616 1617 1618 version (Windows) { 1619 alias PlatformFD = SOCKET; 1620 } else version (Posix) { 1621 alias PlatformFD = int; 1622 } 1623 1624 /** 1625 The native socket file descriptor type. 1626 1627 This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems. 1628 */ 1629 alias FD = PlatformFD; 1630 1631 1632 /** 1633 Starts the built-in $(ZMQ) _proxy. 1634 1635 This function never returns normally, but it may throw an exception. This could 1636 happen if the context associated with either of the specified sockets is 1637 manually destroyed in a different thread. 1638 1639 Throws: 1640 $(REF ZmqException) if $(ZMQ) reports an error. 1641 Corresponds_to: 1642 $(ZMQREF zmq_proxy()) 1643 */ 1644 void proxy(ref Socket frontend, ref Socket backend) 1645 { 1646 const rc = trusted!zmq_proxy(frontend.handle, backend.handle, null); 1647 assert (rc == -1); 1648 throw new ZmqException; 1649 } 1650 1651 /// ditto 1652 void proxy(ref Socket frontend, ref Socket backend, ref Socket capture) 1653 { 1654 const rc = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle); 1655 assert (rc == -1); 1656 throw new ZmqException; 1657 } 1658 1659 1660 deprecated("zmqd.poll() has a new signature as of v0.4") 1661 uint poll(zmq_pollitem_t[] items, Duration timeout = Duration.max) 1662 { 1663 import std.conv: to; 1664 const n = trusted!zmq_poll( 1665 items.ptr, 1666 to!int(items.length), 1667 timeout == Duration.max ? -1 : to!int(timeout.total!"msecs"())); 1668 if (n < 0) throw new ZmqException; 1669 return cast(uint) n; 1670 } 1671 1672 1673 /** 1674 Input/output multiplexing. 1675 1676 The $(D timeout) parameter may have the special value 1677 $(COREF time,Duration.max), which in this context specifies an infinite 1678 duration. This is translated to an argument value of -1 in the C API. 1679 1680 Returns: 1681 The number of $(REF PollItem) structures with events signalled in 1682 $(REF PollItem.returnedEvents), or 0 if no events have been signalled. 1683 Throws: 1684 $(REF ZmqException) if $(ZMQ) reports an error. 1685 Corresponds_to: 1686 $(ZMQREF zmq_poll()) 1687 */ 1688 uint poll(PollItem[] items, Duration timeout = Duration.max) @trusted 1689 { 1690 // Here we use a trick where we pretend the array of PollItems is 1691 // actually an array of zmq_pollitem_t, to avoid an unnecessary 1692 // allocation. For this to work, PollItem must have the exact 1693 // same size as zmq_pollitem_t. 1694 static assert (PollItem.sizeof == zmq_pollitem_t.sizeof); 1695 1696 import std.conv: to; 1697 const n = zmq_poll( 1698 cast(zmq_pollitem_t*) items.ptr, 1699 to!int(items.length), 1700 timeout == Duration.max ? -1 : to!int(timeout.total!"msecs"())); 1701 if (n < 0) throw new ZmqException; 1702 return cast(uint) n; 1703 } 1704 1705 1706 /// 1707 @system unittest 1708 { 1709 auto socket1 = zmqd.Socket(zmqd.SocketType.pull); 1710 socket1.bind("inproc://zmqd_poll_example"); 1711 1712 import std.socket; 1713 auto socket2 = new std.socket.Socket( 1714 AddressFamily.INET, 1715 std.socket.SocketType.DGRAM); 1716 socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678)); 1717 1718 auto socket3 = zmqd.Socket(zmqd.SocketType.push); 1719 socket3.connect("inproc://zmqd_poll_example"); 1720 socket3.send("test"); 1721 1722 import core.thread: Thread; 1723 Thread.sleep(10.msecs); 1724 1725 auto items = [ 1726 PollItem(socket1, PollFlags.pollIn), 1727 PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut), 1728 PollItem(socket3, PollFlags.pollIn), 1729 ]; 1730 1731 const n = poll(items, 100.msecs); 1732 assert (n == 2); 1733 assert (items[0].returnedEvents == PollFlags.pollIn); 1734 assert (items[1].returnedEvents == PollFlags.pollOut); 1735 assert (items[2].returnedEvents == 0); 1736 socket2.close(); 1737 } 1738 1739 1740 /** 1741 $(FREF poll) event flags. 1742 1743 These are described in the $(ZMQREF zmq_poll()) manual. 1744 */ 1745 enum PollFlags 1746 { 1747 pollIn = ZMQ_POLLIN, /// Corresponds to $(D ZMQ_POLLIN) 1748 pollOut = ZMQ_POLLOUT, /// Corresponds to $(D ZMQ_POLLOUT) 1749 pollErr = ZMQ_POLLERR, /// Corresponds to $(D ZMQ_POLLERR) 1750 } 1751 1752 1753 /++ 1754 A structure that specifies a socket to be monitored by $(FREF poll) as well 1755 as the events to poll for, and, when $(FREF poll) returns, the events that 1756 occurred. 1757 1758 Warning: 1759 $(D PollItem) objects do not store $(STDREF socket,Socket) references, 1760 only the corresponding native file descriptors. This means that the 1761 references have to be stored elsewhere, or the objects may be garbage 1762 collected, invalidating the sockets before or while $(FREF poll) executes. 1763 --- 1764 // Not OK 1765 auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn); 1766 1767 // OK 1768 auto s = new std.socket.Socket(/*...*/); 1769 auto p2 = PollItem(s, PollFlags.pollIn); 1770 --- 1771 Corresponds_to: 1772 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t)) 1773 +/ 1774 struct PollItem 1775 { 1776 /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket. 1777 this(ref zmqd.Socket socket, PollFlags events) nothrow 1778 { 1779 m_pollItem = zmq_pollitem_t(socket.handle, 0, cast(short) events, 0); 1780 } 1781 1782 import std.socket; 1783 /** 1784 Constructs a $(REF PollItem) for monitoring a standard socket referenced 1785 by a $(STDREF socket,Socket). 1786 */ 1787 this(std.socket.Socket socket, PollFlags events) @system 1788 { 1789 this(socket.handle, events); 1790 } 1791 1792 /** 1793 Constructs a $(REF PollItem) for monitoring a standard socket referenced 1794 by a native file descriptor. 1795 */ 1796 this(FD fd, PollFlags events) pure nothrow 1797 { 1798 m_pollItem = zmq_pollitem_t(null, fd, cast(short) events, 0); 1799 } 1800 1801 /** 1802 Requested events. 1803 1804 Corresponds_to: 1805 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events)) 1806 */ 1807 @property void requestedEvents(PollFlags events) pure nothrow 1808 { 1809 m_pollItem.events = cast(short) events; 1810 } 1811 1812 /// ditto 1813 @property PollFlags requestedEvents() const pure nothrow 1814 { 1815 return cast(typeof(return)) m_pollItem.events; 1816 } 1817 1818 /** 1819 Returned events. 1820 1821 Corresponds_to: 1822 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents)) 1823 */ 1824 @property PollFlags returnedEvents() const pure nothrow 1825 { 1826 return cast(typeof(return)) m_pollItem.revents; 1827 } 1828 1829 private: 1830 zmq_pollitem_t m_pollItem; 1831 } 1832 1833 1834 /** 1835 An object that encapsulates a $(ZMQ) message frame. 1836 1837 This $(D struct) is a wrapper around a $(D zmq_msg_t) object. 1838 A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it 1839 should always be explicitly initialized upon construction using 1840 $(FREF _Frame.opCall). Alternatively, it may be initialized later with 1841 $(FREF _Frame.rebuild). 1842 --- 1843 Frame msg1; // Invalid frame 1844 auto msg2 = Frame(); // Empty frame 1845 auto msg3 = Frame(1024); // 1K frame 1846 msg1.rebuild(2048); // msg1 now has size 2K 1847 msg2.rebuild(2048); // ...and so does msg2 1848 --- 1849 When a $(D Frame) goes out of scope, $(ZMQREF zmq_msg_close()) is 1850 called on the underlying $(D zmq_msg_t). 1851 1852 A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy) 1853 for this. 1854 */ 1855 struct Frame 1856 { 1857 @safe: 1858 /** 1859 Initializes an empty $(ZMQ) message frame. 1860 1861 Throws: 1862 $(REF ZmqException) if $(ZMQ) reports an error. 1863 Corresponds_to: 1864 $(ZMQREF zmq_msg_init()) 1865 */ 1866 static Frame opCall() 1867 { 1868 Frame f; 1869 f.init(); 1870 return f; 1871 } 1872 1873 /// 1874 unittest 1875 { 1876 auto msg = Frame(); 1877 assert(msg.size == 0); 1878 } 1879 1880 /** $(DDOC_ANCHOR Frame.opCall_size) 1881 Initializes a $(ZMQ) message frame of a specified size. 1882 1883 Throws: 1884 $(REF ZmqException) if $(ZMQ) reports an error. 1885 Corresponds_to: 1886 $(ZMQREF zmq_msg_init_size()) 1887 */ 1888 static Frame opCall(size_t size) 1889 { 1890 Frame m; 1891 m.init(size); 1892 return m; 1893 } 1894 1895 /// 1896 unittest 1897 { 1898 auto msg = Frame(123); 1899 assert(msg.size == 123); 1900 } 1901 1902 /** $(DDOC_ANCHOR Frame.opCall_data) 1903 Initializes a $(ZMQ) message frame from a supplied buffer. 1904 1905 Warning: 1906 Some care must be taken when using this function, as $(ZMQ) expects 1907 to take full ownership of the supplied buffer. Client code should 1908 therefore avoid retaining any references to it, including slices that 1909 contain, overlap with or are contained in $(D data). 1910 $(ZMQ) makes no guarantee that the buffer is not modified, 1911 and it does not specify when the buffer is released. 1912 1913 An additional complication is caused by the fact that most arrays in D 1914 are owned by the garbage collector. This is solved by adding the array 1915 pointer as a new garbage collector root before passing it to 1916 $(ZMQREF zmq_msg_init_data()), thus preventing the GC from collecting 1917 it. The root is then removed again in the deallocator callback 1918 function which is called by $(ZMQ) when it no longer requires 1919 the buffer, thus allowing the GC to collect it. 1920 Throws: 1921 $(REF ZmqException) if $(ZMQ) reports an error. 1922 Corresponds_to: 1923 $(ZMQREF zmq_msg_init_data()) 1924 */ 1925 static Frame opCall(ubyte[] data) 1926 { 1927 Frame m; 1928 m.init(data); 1929 return m; 1930 } 1931 1932 /// 1933 unittest 1934 { 1935 auto buf = new ubyte[123]; 1936 auto msg = Frame(buf); 1937 assert(msg.size == buf.length); 1938 assert(msg.data.ptr == buf.ptr); 1939 } 1940 1941 /** 1942 Reinitializes the Frame object as an empty message. 1943 1944 This function will first call $(FREF Frame.close) to release the 1945 resources associated with the message frame, and then it will 1946 initialize it anew, exactly as if it were constructed with 1947 $(D $(LINK2 #Frame.opCall,Frame())). 1948 1949 Throws: 1950 $(REF ZmqException) if $(ZMQ) reports an error. 1951 Corresponds_to: 1952 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init()) 1953 */ 1954 void rebuild() 1955 { 1956 close(); 1957 init(); 1958 } 1959 1960 /// 1961 unittest 1962 { 1963 auto msg = Frame(256); 1964 assert (msg.size == 256); 1965 msg.rebuild(); 1966 assert (msg.size == 0); 1967 } 1968 1969 /** 1970 Reinitializes the Frame object to a specified size. 1971 1972 This function will first call $(FREF Frame.close) to release the 1973 resources associated with the message frame, and then it will 1974 initialize it anew, exactly as if it were constructed with 1975 $(D $(LINK2 #Frame.opCall_size,Frame(size))). 1976 1977 Throws: 1978 $(REF ZmqException) if $(ZMQ) reports an error. 1979 Corresponds_to: 1980 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()). 1981 */ 1982 void rebuild(size_t size) 1983 { 1984 close(); 1985 init(size); 1986 } 1987 1988 /// 1989 unittest 1990 { 1991 auto msg = Frame(256); 1992 assert (msg.size == 256); 1993 msg.rebuild(1024); 1994 assert (msg.size == 1024); 1995 } 1996 1997 /** 1998 Reinitializes the Frame object from a supplied buffer. 1999 2000 This function will first call $(FREF Frame.close) to release the 2001 resources associated with the message frame, and then it will 2002 initialize it anew, exactly as if it were constructed with 2003 $(D $(LINK2 #Frame.opCall_data,Frame(data))). 2004 2005 Throws: 2006 $(REF ZmqException) if $(ZMQ) reports an error. 2007 Corresponds_to: 2008 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()). 2009 */ 2010 void rebuild(ubyte[] data) 2011 { 2012 close(); 2013 init(data); 2014 } 2015 2016 /// 2017 unittest 2018 { 2019 auto msg = Frame(256); 2020 assert (msg.size == 256); 2021 auto buf = new ubyte[123]; 2022 msg.rebuild(buf); 2023 assert(msg.size == buf.length); 2024 assert(msg.data.ptr == buf.ptr); 2025 } 2026 2027 @disable this(this); 2028 2029 /** 2030 Releases the $(ZMQ) message frame when the $(D Frame) is destroyed. 2031 2032 This destructor never throws, which means that any errors will go 2033 undetected. If this is undesirable, call $(FREF Frame.close) before 2034 the $(D Frame) is destroyed. 2035 2036 Corresponds_to: 2037 $(ZMQREF zmq_msg_close()) 2038 */ 2039 ~this() nothrow 2040 { 2041 if (m_initialized) { 2042 immutable rc = trusted!zmq_msg_close(&m_msg); 2043 assert(rc == 0, "zmq_msg_close failed: Invalid message frame"); 2044 } 2045 } 2046 2047 /** 2048 Releases the $(ZMQ) message frame. 2049 2050 Note that the frame will be automatically released when the $(D Frame) 2051 object is destroyed, so it is often not necessary to call this method 2052 manually. 2053 2054 Throws: 2055 $(REF ZmqException) if $(ZMQ) reports an error. 2056 Corresponds_to: 2057 $(ZMQREF zmq_msg_close()) 2058 */ 2059 void close() 2060 { 2061 if (m_initialized) { 2062 if (trusted!zmq_msg_close(&m_msg) != 0) { 2063 throw new ZmqException; 2064 } 2065 m_initialized = false; 2066 } 2067 } 2068 2069 /** 2070 Copies frame content to another message frame. 2071 2072 $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest)) 2073 copies the contents of this $(D Frame) into $(D dest). $(D dest) must 2074 be a valid (i.e. initialized) $(D Frame). 2075 2076 Warning: 2077 These functions may not do what you think they do. Please refer 2078 to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details. 2079 Throws: 2080 $(REF ZmqException) if $(ZMQ) reports an error. 2081 Corresponds_to: 2082 $(ZMQREF zmq_msg_copy()) 2083 */ 2084 Frame copy() 2085 in { assert(m_initialized); } 2086 body 2087 { 2088 auto cp = Frame(); 2089 copyTo(cp); 2090 return cp; 2091 } 2092 2093 /// ditto 2094 void copyTo(ref Frame dest) 2095 in { assert(m_initialized); } 2096 body 2097 { 2098 if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) { 2099 throw new ZmqException; 2100 } 2101 } 2102 2103 /// 2104 unittest 2105 { 2106 import std.string: representation; 2107 auto msg1 = Frame(3); 2108 msg1.data[] = "foo".representation; 2109 auto msg2 = msg1.copy(); 2110 assert (msg2.data.asString() == "foo"); 2111 } 2112 2113 /** 2114 Moves frame content to another message frame. 2115 2116 $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest)) 2117 moves the contents of this $(D Frame) to $(D dest). $(D dest) must 2118 be a valid (i.e. initialized) $(D Frame). 2119 2120 Throws: 2121 $(REF ZmqException) if $(ZMQ) reports an error. 2122 Corresponds_to: 2123 $(ZMQREF zmq_msg_move()) 2124 */ 2125 Frame move() 2126 in { assert(m_initialized); } 2127 body 2128 { 2129 auto m = Frame(); 2130 moveTo(m); 2131 return m; 2132 } 2133 2134 /// ditto 2135 void moveTo(ref Frame dest) 2136 in { assert(m_initialized); } 2137 body 2138 { 2139 if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) { 2140 throw new ZmqException; 2141 } 2142 } 2143 2144 /// 2145 unittest 2146 { 2147 import std.string: representation; 2148 auto msg1 = Frame(3); 2149 msg1.data[] = "foo".representation; 2150 auto msg2 = msg1.move(); 2151 assert (msg1.size == 0); 2152 assert (msg2.data.asString() == "foo"); 2153 } 2154 2155 /** 2156 The message frame content size in bytes. 2157 2158 Corresponds_to: 2159 $(ZMQREF zmq_msg_size()) 2160 */ 2161 @property size_t size() nothrow 2162 in { assert(m_initialized); } 2163 body 2164 { 2165 return trusted!zmq_msg_size(&m_msg); 2166 } 2167 2168 /// 2169 unittest 2170 { 2171 auto msg = Frame(123); 2172 assert(msg.size == 123); 2173 } 2174 2175 /** 2176 Retrieves the message frame content. 2177 2178 Corresponds_to: 2179 $(ZMQREF zmq_msg_data()) 2180 */ 2181 @property ubyte[] data() @trusted nothrow 2182 in { assert(m_initialized); } 2183 body 2184 { 2185 return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size]; 2186 } 2187 2188 /// 2189 unittest 2190 { 2191 import std.string: representation; 2192 auto msg = Frame(3); 2193 assert(msg.data.length == 3); 2194 msg.data[] = "foo".representation; // Slice operator -> array copy. 2195 assert(msg.data.asString() == "foo"); 2196 } 2197 2198 /** 2199 Whether there are more message frames to retrieve. 2200 2201 Corresponds_to: 2202 $(ZMQREF zmq_msg_more()) 2203 */ 2204 @property bool more() nothrow 2205 in { assert(m_initialized); } 2206 body 2207 { 2208 return !!trusted!zmq_msg_more(&m_msg); 2209 } 2210 2211 /** 2212 A pointer to the underlying $(D zmq_msg_t). 2213 */ 2214 @property inout(zmq_msg_t)* handle() inout pure nothrow 2215 { 2216 return &m_msg; 2217 } 2218 2219 private: 2220 private void init() 2221 in { assert (!m_initialized); } 2222 out { assert (m_initialized); } 2223 body 2224 { 2225 if (trusted!zmq_msg_init(&m_msg) != 0) { 2226 throw new ZmqException; 2227 } 2228 m_initialized = true; 2229 } 2230 2231 private void init(size_t size) 2232 in { assert (!m_initialized); } 2233 out { assert (m_initialized); } 2234 body 2235 { 2236 if (trusted!zmq_msg_init_size(&m_msg, size) != 0) { 2237 throw new ZmqException; 2238 } 2239 m_initialized = true; 2240 } 2241 2242 private void init(ubyte[] data) @trusted 2243 in { assert (!m_initialized); } 2244 out { assert (m_initialized); } 2245 body 2246 { 2247 import core.memory; 2248 static extern(C) zmqd_Frame_init_data_free(void* dataPtr, void* block) 2249 @trusted nothrow 2250 { 2251 GC.removeRoot(dataPtr); 2252 GC.clrAttr(block, GC.BlkAttr.NO_MOVE); 2253 } 2254 2255 GC.addRoot(data.ptr); 2256 scope(failure) GC.removeRoot(data.ptr); 2257 2258 auto block = GC.addrOf(data.ptr); 2259 immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE); 2260 GC.setAttr(block, GC.BlkAttr.NO_MOVE); 2261 scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE); 2262 2263 if (trusted!zmq_msg_init_data(&m_msg, data.ptr, data.length, 2264 &zmqd_Frame_init_data_free, block) != 0) { 2265 throw new ZmqException; 2266 } 2267 m_initialized = true; 2268 } 2269 2270 bool m_initialized; 2271 zmq_msg_t m_msg; 2272 } 2273 2274 unittest 2275 { 2276 const url = uniqueUrl("inproc"); 2277 auto s1 = Socket(SocketType.pair); 2278 auto s2 = Socket(SocketType.pair); 2279 s1.bind(url); 2280 s2.connect(url); 2281 2282 auto m1a = Frame(123); 2283 m1a.data[] = 'a'; 2284 s1.send(m1a); 2285 auto m2a = Frame(); 2286 s2.receive(m2a); 2287 assert(m2a.size == 123); 2288 foreach (e; m2a.data) assert(e == 'a'); 2289 2290 auto m1b = Frame(10); 2291 m1b.data[] = 'b'; 2292 s1.send(m1b); 2293 auto m2b = Frame(); 2294 s2.receive(m2b); 2295 assert(m2b.size == 10); 2296 foreach (e; m2b.data) assert(e == 'b'); 2297 } 2298 2299 deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame; 2300 2301 2302 /** 2303 A global context which is used by default by all sockets, unless they are 2304 explicitly constructed with a different context. 2305 2306 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right, 2307 has the following to say) about context creation: 2308 $(QUOTE 2309 You should create and use exactly one context in your process. 2310 [$(LDOTS)] If at runtime a process has two contexts, these are 2311 like separate $(ZMQ) instances. If that's explicitly what you 2312 want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new()) 2313 at the start of your main line code, and one $(D zmq_ctx_destroy()) 2314 at the end.) 2315 ) 2316 By using $(D defaultContext()), this is exactly what you achieve. The 2317 context is created the first time the function is called, and is 2318 automatically destroyed when the program ends. 2319 2320 This function is thread safe. 2321 2322 Throws: 2323 $(REF ZmqException) if $(ZMQ) reports an error. 2324 See_also: 2325 $(REF Context) 2326 */ 2327 Context defaultContext() @trusted 2328 { 2329 // For future reference: This is the low-lock singleton pattern. See: 2330 // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/ 2331 static bool instantiated; 2332 __gshared Context ctx; 2333 if (!instantiated) { 2334 synchronized { 2335 if (!ctx.initialized) { 2336 ctx = Context(); 2337 } 2338 instantiated = true; 2339 } 2340 } 2341 return ctx; 2342 } 2343 2344 @system unittest 2345 { 2346 import core.thread; 2347 Context c1, c2; 2348 auto t = new Thread(() { c1 = defaultContext(); }); 2349 t.start(); 2350 c2 = defaultContext(); 2351 t.join(); 2352 assert(c1.handle !is null); 2353 assert(c1.handle == c2.handle); 2354 } 2355 2356 2357 /** 2358 An object that encapsulates a $(ZMQ) context. 2359 2360 In most programs, it is not necessary to use this type directly, 2361 as $(REF Socket) will use a default global context if not explicitly 2362 provided with one. See $(FREF defaultContext) for details. 2363 2364 A default-initialized $(D Context) is not a valid $(ZMQ) context; it 2365 must always be explicitly initialized with $(FREF _Context.opCall): 2366 --- 2367 Context ctx; // Not a valid context yet 2368 ctx = Context(); // ...but now it is. 2369 --- 2370 $(D Context) objects can be passed around by value, and two copies will 2371 refer to the same context. The underlying context is managed using 2372 reference counting, so that when the last copy of a $(D Context) goes 2373 out of scope, the context is automatically destroyed. The reference 2374 counting is performed in a thread safe manner, so that the same context 2375 can be shared between multiple threads. ($(ZMQ) guarantees the thread 2376 safety of other context operations.) 2377 2378 See_also: 2379 $(FREF defaultContext) 2380 */ 2381 struct Context 2382 { 2383 @safe: 2384 /** 2385 Creates a new $(ZMQ) context. 2386 2387 Returns: 2388 A $(REF Context) object that encapsulates the new context. 2389 Throws: 2390 $(REF ZmqException) if $(ZMQ) reports an error. 2391 Corresponds_to: 2392 $(ZMQREF zmq_ctx_new()) 2393 */ 2394 static Context opCall() @trusted // because of the cast 2395 { 2396 if (auto c = trusted!zmq_ctx_new()) { 2397 Context ctx; 2398 // Casting from/to shared is OK since ZMQ contexts are thread safe. 2399 static Exception release(shared(void)* ptr) @trusted nothrow 2400 { 2401 return zmq_ctx_term(cast(void*) ptr) == 0 2402 ? null 2403 : new ZmqException; 2404 } 2405 ctx.m_resource = SharedResource(cast(shared) c, &release); 2406 return ctx; 2407 } else { 2408 throw new ZmqException; 2409 } 2410 } 2411 2412 /// 2413 unittest 2414 { 2415 auto ctx = Context(); 2416 assert (ctx.initialized); 2417 } 2418 2419 /** 2420 Detaches from the $(ZMQ) context. 2421 2422 If this is the last reference to the context, it will be destroyed with 2423 $(ZMQREF zmq_ctx_destroy()). 2424 2425 Throws: 2426 $(REF ZmqException) if $(ZMQ) reports an error. 2427 */ 2428 void detach() 2429 { 2430 m_resource.detach(); 2431 } 2432 2433 /// 2434 unittest 2435 { 2436 auto ctx = Context(); 2437 assert (ctx.initialized); 2438 ctx.detach(); 2439 assert (!ctx.initialized); 2440 } 2441 2442 /** 2443 The number of I/O threads. 2444 2445 Throws: 2446 $(REF ZmqException) if $(ZMQ) reports an error. 2447 Corresponds_to: 2448 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2449 $(D ZMQ_IO_THREADS). 2450 */ 2451 @property int ioThreads() 2452 { 2453 return getOption(ZMQ_IO_THREADS); 2454 } 2455 2456 /// ditto 2457 @property void ioThreads(int value) 2458 { 2459 setOption(ZMQ_IO_THREADS, value); 2460 } 2461 2462 /// 2463 unittest 2464 { 2465 auto ctx = Context(); 2466 ctx.ioThreads = 3; 2467 assert (ctx.ioThreads == 3); 2468 } 2469 2470 /** 2471 The maximum number of sockets. 2472 2473 Throws: 2474 $(REF ZmqException) if $(ZMQ) reports an error. 2475 Corresponds_to: 2476 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2477 $(D ZMQ_MAX_SOCKETS). 2478 */ 2479 @property int maxSockets() 2480 { 2481 return getOption(ZMQ_MAX_SOCKETS); 2482 } 2483 2484 /// ditto 2485 @property void maxSockets(int value) 2486 { 2487 setOption(ZMQ_MAX_SOCKETS, value); 2488 } 2489 2490 /// 2491 unittest 2492 { 2493 auto ctx = Context(); 2494 ctx.maxSockets = 512; 2495 assert (ctx.maxSockets == 512); 2496 } 2497 2498 /** 2499 The $(D void*) pointer used by the underlying C API to refer to the context. 2500 2501 If the object has not been initialized, this function returns $(D null). 2502 */ 2503 @property inout(void)* handle() inout @trusted pure nothrow 2504 { 2505 // ZMQ contexts are thread safe, so casting away shared is OK. 2506 return cast(typeof(return)) m_resource.handle; 2507 } 2508 2509 /** 2510 Whether this $(REF Context) object has been _initialized, i.e. whether it 2511 refers to a valid $(ZMQ) context. 2512 */ 2513 @property bool initialized() const pure nothrow 2514 { 2515 return m_resource.handle != null; 2516 } 2517 2518 /// 2519 unittest 2520 { 2521 Context ctx; 2522 assert (!ctx.initialized); 2523 ctx = Context(); 2524 assert (ctx.initialized); 2525 ctx.detach(); 2526 assert (!ctx.initialized); 2527 } 2528 2529 private: 2530 int getOption(int option) 2531 { 2532 immutable value = trusted!zmq_ctx_get(this.handle, option); 2533 if (value < 0) { 2534 throw new ZmqException; 2535 } 2536 return value; 2537 } 2538 2539 void setOption(int option, int value) 2540 { 2541 if (trusted!zmq_ctx_set(this.handle, option, value) != 0) { 2542 throw new ZmqException; 2543 } 2544 } 2545 2546 SharedResource m_resource; 2547 } 2548 2549 2550 /** 2551 Socket event types. 2552 2553 These are used together with $(FREF Socket.monitor), and are described 2554 in the $(ZMQREF zmq_socket_monitor()) reference. 2555 */ 2556 enum EventType 2557 { 2558 connected = ZMQ_EVENT_CONNECTED, /// Corresponds to $(D ZMQ_EVENT_CONNECTED). 2559 connectDelayed = ZMQ_EVENT_CONNECT_DELAYED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED). 2560 connectRetried = ZMQ_EVENT_CONNECT_RETRIED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED). 2561 listening = ZMQ_EVENT_LISTENING, /// Corresponds to $(D ZMQ_EVENT_LISTENING). 2562 bindFailed = ZMQ_EVENT_BIND_FAILED, /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED). 2563 accepted = ZMQ_EVENT_ACCEPTED, /// Corresponds to $(D ZMQ_EVENT_ACCEPTED). 2564 acceptFailed = ZMQ_EVENT_ACCEPT_FAILED, /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED). 2565 closed = ZMQ_EVENT_CLOSED, /// Corresponds to $(D ZMQ_EVENT_CLOSED). 2566 closeFailed = ZMQ_EVENT_CLOSE_FAILED, /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED). 2567 disconnected = ZMQ_EVENT_DISCONNECTED, /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED). 2568 all = ZMQ_EVENT_ALL /// Corresponds to $(D ZMQ_EVENT_ALL). 2569 } 2570 2571 2572 /** 2573 Receives a message on the given socket and interprets it as a socket 2574 state change event. 2575 2576 $(D socket) must be a PAIR socket which is connected to an endpoint 2577 created via a $(FREF Socket.monitor) call. $(D receiveEvent()) receives 2578 one message on the socket, parses its contents according to the 2579 specification in the $(ZMQREF zmq_socket_monitor()) reference, 2580 and returns the event information as an $(REF Event) object. 2581 2582 Throws: 2583 $(REF ZmqException) if $(ZMQ) reports an error.$(BR) 2584 $(REF InvalidEventException) if the received message could not 2585 be interpreted as an event message. 2586 See_also: 2587 $(FREF Socket.monitor), for monitoring socket state changes. 2588 */ 2589 Event receiveEvent(ref Socket socket) @system 2590 { 2591 // The monitor event message format underwent some changes between ZMQ 2592 // versions 3.2, 3.3 (unreleased) and 4.0. Furthermore, the zmq_event_t 2593 // type was removed as of ZMQ 4.1. We try to support all versions >= 3.2. 2594 immutable ver = zmqVersion(); 2595 immutable usePackedData = ver.major >= 4; 2596 immutable useNewLayout = ZMQ_MAKE_VERSION(ver.major, ver.minor, ver.patch) 2597 >= ZMQ_MAKE_VERSION(3, 3, 0); 2598 assert (useNewLayout || !usePackedData); 2599 2600 struct OldEventStruct { 2601 int event; 2602 const(char*) addr; 2603 int value; 2604 } 2605 struct NewEventStruct { 2606 ushort event; 2607 int value; 2608 } 2609 immutable eventFrameSize = 2610 usePackedData ? (ushort.sizeof + int.sizeof) 2611 : (useNewLayout ? NewEventStruct.sizeof : OldEventStruct.sizeof); 2612 2613 auto eventFrame = Frame(); 2614 if (socket.receive(eventFrame) != eventFrameSize) { 2615 throw new InvalidEventException; 2616 } 2617 const data = eventFrame.data.ptr; 2618 try { 2619 import std.conv: to; 2620 EventType event; 2621 int value; 2622 string addr; 2623 if (useNewLayout) { 2624 if (usePackedData) { 2625 event = to!EventType(*(cast(const(ushort)*) data)); 2626 value = *(cast(const(int)*) data + ushort.sizeof); 2627 } else { 2628 const eventStruct = cast(const(NewEventStruct)*) data; 2629 event = to!EventType(eventStruct.event); 2630 value = eventStruct.value; 2631 } 2632 auto addrFrame = Frame(); 2633 socket.receive(addrFrame); 2634 addr = (cast(char[]) addrFrame.data).idup; 2635 } else { 2636 const eventStruct = cast(const(OldEventStruct)*) data; 2637 event = to!EventType(eventStruct.event); 2638 value = eventStruct.value; 2639 addr = eventStruct.addr !is null 2640 ? to!string(eventStruct.addr) 2641 : null; 2642 } 2643 return Event(event, addr, value); 2644 } catch (Exception e) { 2645 // Any exception thrown within the try block signifies that there 2646 // is something wrong with the event message. 2647 throw new InvalidEventException; 2648 } 2649 } 2650 2651 // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1. 2652 // IPC does not work on Windows, and unbind() does not work with INPROC. 2653 // See: https://github.com/zeromq/libzmq/issues/949 2654 version (Posix) @system unittest 2655 { 2656 Event[] events; 2657 void eventCollector() 2658 { 2659 auto coll = Socket(SocketType.pair); 2660 coll.connect("inproc://zmqd_receiveEvent_unittest_monitor"); 2661 do { 2662 events ~= receiveEvent(coll); 2663 } while (events[$-1].type != EventType.closed); 2664 } 2665 import core.thread; 2666 auto collector = new Thread(&eventCollector); 2667 collector.start(); 2668 2669 static void eventGenerator() 2670 { 2671 auto sck1 = Socket(SocketType.pair); 2672 sck1.monitor("inproc://zmqd_receiveEvent_unittest_monitor"); 2673 sck1.bind("ipc://zmqd_receiveEvent_unittest"); 2674 import core.time; 2675 Thread.sleep(100.msecs); 2676 auto sck2 = Socket(SocketType.pair); 2677 sck2.connect("ipc://zmqd_receiveEvent_unittest"); 2678 Thread.sleep(100.msecs); 2679 sck2.disconnect("ipc://zmqd_receiveEvent_unittest"); 2680 Thread.sleep(100.msecs); 2681 sck1.unbind("ipc://zmqd_receiveEvent_unittest"); 2682 } 2683 eventGenerator(); 2684 collector.join(); 2685 assert (events.length == 3); 2686 foreach (ev; events) { 2687 assert (ev.address == "ipc://zmqd_receiveEvent_unittest"); 2688 } 2689 assert (events[0].type == EventType.listening); 2690 assert (events[1].type == EventType.accepted); 2691 assert (events[2].type == EventType.closed); 2692 import std.exception; 2693 assertNotThrown!Error(events[0].fd); 2694 assertThrown!Error(events[0].errno); 2695 assertThrown!Error(events[0].interval); 2696 } 2697 2698 2699 /** 2700 Information about a socket state change. 2701 2702 Corresponds_to: 2703 $(ZMQAPI zmq_socket_monitor,$(D zmq_event_t)) 2704 See_also: 2705 $(FREF receiveEvent) 2706 */ 2707 struct Event 2708 { 2709 /** 2710 The event type. 2711 2712 Corresponds_to: 2713 $(D zmq_event_t.event) 2714 */ 2715 @property EventType type() const pure nothrow 2716 { 2717 return m_type; 2718 } 2719 2720 /** 2721 The peer address. 2722 2723 Corresponds_to: 2724 $(D zmq_event_t.data.xyz.addr), where $(D xyz) is the event-specific union. 2725 */ 2726 @property string address() const pure nothrow 2727 { 2728 return m_address; 2729 } 2730 2731 /** 2732 The socket file descriptor. 2733 2734 This property function may only be called if $(REF Event.type) is one of: 2735 $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disonnected). 2736 2737 Throws: 2738 $(D Error) if the property is called for a wrong event type. 2739 Corresponds_to: 2740 $(D zmq_event_t.data.xyz.addr), where $(D xyz) is the event-specific union. 2741 */ 2742 @property FD fd() const pure nothrow 2743 { 2744 final switch (m_type) { 2745 case EventType.connected : 2746 case EventType.listening : 2747 case EventType.accepted : 2748 case EventType.closed : 2749 case EventType.disconnected : return cast(typeof(return)) m_value; 2750 case EventType.connectDelayed: 2751 case EventType.connectRetried: 2752 case EventType.bindFailed : 2753 case EventType.acceptFailed : 2754 case EventType.closeFailed : throw invalidProperty(); 2755 case EventType.all : 2756 } 2757 assert (false); 2758 } 2759 2760 /** 2761 The $(D errno) code for the error which triggered the event. 2762 2763 This property function may only be called if $(REF Event.type) is either 2764 $(D bindFailed), $(D acceptFailed) or $(D closeFailed). 2765 2766 Throws: 2767 $(D Error) if the property is called for a wrong event type. 2768 Corresponds_to: 2769 $(D zmq_event_t.data.xyz.addr), where $(D xyz) is the event-specific union. 2770 */ 2771 @property int errno() const pure nothrow 2772 { 2773 final switch (m_type) { 2774 case EventType.bindFailed : 2775 case EventType.acceptFailed : 2776 case EventType.closeFailed : return m_value; 2777 case EventType.connected : 2778 case EventType.connectDelayed: 2779 case EventType.connectRetried: 2780 case EventType.listening : 2781 case EventType.accepted : 2782 case EventType.closed : 2783 case EventType.disconnected : throw invalidProperty(); 2784 case EventType.all : 2785 } 2786 assert (false); 2787 } 2788 2789 /** 2790 The reconnect interval. 2791 2792 This property function may only be called if $(REF Event.type) is 2793 $(D connectRetried). 2794 2795 Throws: 2796 $(D Error) if the property is called for a wrong event type. 2797 Corresponds_to: 2798 $(D zmq_event_t.data.connect_retried.interval) 2799 */ 2800 @property Duration interval() const pure nothrow 2801 { 2802 final switch (m_type) { 2803 case EventType.connectRetried: return m_value.msecs; 2804 case EventType.connected : 2805 case EventType.connectDelayed: 2806 case EventType.listening : 2807 case EventType.bindFailed : 2808 case EventType.accepted : 2809 case EventType.acceptFailed : 2810 case EventType.closed : 2811 case EventType.closeFailed : 2812 case EventType.disconnected : throw invalidProperty(); 2813 case EventType.all : 2814 } 2815 assert (false); 2816 } 2817 2818 private: 2819 this(EventType type, string address, int value) pure nothrow 2820 { 2821 m_type = type; 2822 m_address = address; 2823 m_value = value; 2824 } 2825 2826 Error invalidProperty(string name = __FUNCTION__)() const pure nothrow 2827 { 2828 try { 2829 import std.conv: text; 2830 return new Error(text("Property '", name, 2831 "' not available for event type '", 2832 m_type, "'")); 2833 } catch (Exception e) { 2834 assert(false); 2835 } 2836 } 2837 2838 EventType m_type; 2839 string m_address; 2840 int m_value; 2841 } 2842 2843 2844 version (WithLibsodium) { 2845 2846 /** 2847 Encodes a binary key as Z85 printable text. 2848 2849 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1), 2850 which will be used to store the return value plus a terminating zero byte. 2851 If $(D dest) is omitted, a new array will be created. 2852 2853 Returns: 2854 An array of size $(D 5*data.length/4) which contains the Z85-encoded text, 2855 excluding the terminating zero byte. This will be a slice of $(D dest) if 2856 it is provided. 2857 Throws: 2858 $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR) 2859 $(REF ZmqException) if $(ZMQ) reports an error (i.e., if data.length is not 2860 a multiple of 4). 2861 Corresponds_to: 2862 $(ZMQREF zmq_z85_encode()) 2863 */ 2864 char[] z85Encode(ubyte[] data, char[] dest) 2865 // TODO: Make data const when we update to ZMQ 4.1 2866 { 2867 import core.exception: RangeError; 2868 immutable len = 5 * data.length / 4; 2869 if (dest.length < len+1) throw new RangeError; 2870 if (trusted!zmq_z85_encode(dest.ptr, data.ptr, data.length) == null) { 2871 throw new ZmqException; 2872 } 2873 return dest[0 .. len]; 2874 } 2875 2876 /// ditto 2877 char[] z85Encode(ubyte[] data) 2878 { 2879 return z85Encode(data, new char[5*data.length/4 + 1]); 2880 } 2881 2882 @system unittest // @system because of assertThrown 2883 { 2884 // TODO: Make data immutable when we update to ZMQ 4.1 2885 auto data = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b]; 2886 immutable encoded = "HelloWorld"; 2887 assert (z85Encode(data) == encoded); 2888 2889 auto buffer = new char[11]; 2890 auto result = z85Encode(data, buffer); 2891 assert (result == encoded); 2892 assert (buffer.ptr == result.ptr); 2893 2894 import core.exception: RangeError; 2895 import std.exception: assertThrown; 2896 assertThrown!RangeError(z85Encode(data, new char[10])); 2897 assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5])); 2898 } 2899 2900 2901 /** 2902 Decodes a binary key from Z85 printable text. 2903 2904 $(D dest) must be an array whose length is at least $(D 4*data.length/5), 2905 which will be used to store the return value. 2906 If $(D dest) is omitted, a new array will be created. 2907 2908 Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero 2909 byte will be appended to $(D text) if it does not contain one already. However, 2910 this may trigger a (possibly unwanted) GC allocation. To avoid this, either 2911 make sure that the last character in $(D text) is $(D '\0'), or use 2912 $(OBJREF assumeSafeAppend) on the array before calling this function. 2913 2914 Returns: 2915 An array of size $(D 4*data.length/5) which contains the decoded data. 2916 This will be a slice of $(D dest) if it is provided. 2917 Throws: 2918 $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR) 2919 $(REF ZmqException) if $(ZMQ) reports an error (i.e., if data.length is not 2920 a multiple of 5). 2921 Corresponds_to: 2922 $(ZMQREF zmq_z85_decode()) 2923 */ 2924 ubyte[] z85Decode(char[] text, ubyte[] dest) 2925 // TODO: Make text const when we update to ZMQ 4.1 2926 { 2927 import core.exception: RangeError; 2928 immutable len = 4 * text.length/5; 2929 if (dest.length < len) throw new RangeError; 2930 if (text[$-1] != '\0') text ~= '\0'; 2931 if (trusted!zmq_z85_decode(dest.ptr, text.ptr) == null) { 2932 throw new ZmqException; 2933 } 2934 return dest[0 .. len]; 2935 } 2936 2937 /// ditto 2938 ubyte[] z85Decode(char[] text) 2939 { 2940 return z85Decode(text, new ubyte[4*text.length/5]); 2941 } 2942 2943 @system unittest // @system because of assertThrown 2944 { 2945 // TODO: Make data immutable when we update to ZMQ 4.1 2946 auto text = "HelloWorld".dup; 2947 immutable decoded = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b]; 2948 assert (z85Decode(text) == decoded); 2949 assert (z85Decode(text~'\0') == decoded); 2950 2951 auto buffer = new ubyte[8]; 2952 auto result = z85Decode(text, buffer); 2953 assert (result == decoded); 2954 assert (buffer.ptr == result.ptr); 2955 2956 import core.exception: RangeError; 2957 import std.exception: assertThrown; 2958 assertThrown!RangeError(z85Decode(text, new ubyte[7])); 2959 assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup)); 2960 } 2961 2962 2963 /** 2964 Generates a new CURVE key pair. 2965 2966 To avoid a memory allocation, preallocated buffers may optionally be supplied 2967 for the two keys. Each of these must have a length of at least 41 bytes, enough 2968 for a 40-character Z85-encoded key plus a terminating zero byte. If either 2969 buffer is omitted/$(D null), a new one will be created. 2970 2971 Returns: 2972 A tuple that contains the two keys. Each of these will have a length of 2973 40 characters, and will be slices of the input buffers if such have been 2974 provided. 2975 Throws: 2976 $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are 2977 not $(D null) but have a length of less than 41 characters.$(BR) 2978 $(REF ZmqException) if $(ZMQ) reports an error. 2979 Corresponds_to: 2980 $(ZMQREF zmq_curve_keypair()) 2981 */ 2982 Tuple!(char[], "publicKey", char[], "secretKey") 2983 curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null) 2984 { 2985 import core.exception: RangeError; 2986 if (publicKeyBuf is null) publicKeyBuf = new char[41]; 2987 else if (publicKeyBuf.length < 41) throw new RangeError; 2988 if (secretKeyBuf is null) secretKeyBuf = new char[41]; 2989 else if (secretKeyBuf.length < 41) throw new RangeError; 2990 2991 import deimos.zmq.utils: zmq_curve_keypair; 2992 if (trusted!zmq_curve_keypair(publicKeyBuf.ptr, secretKeyBuf.ptr) != 0) { 2993 throw new ZmqException; 2994 } 2995 return typeof(return)(publicKeyBuf[0 .. 40], secretKeyBuf[0 .. 40]); 2996 } 2997 2998 /// 2999 unittest 3000 { 3001 auto server = Socket(SocketType.rep); 3002 auto serverKeys = curveKeyPair(); 3003 server.curveServer = true; 3004 server.curveSecretKeyZ85 = serverKeys.secretKey; 3005 server.bind("inproc://curveKeyPair_test"); 3006 3007 auto client = Socket(SocketType.req); 3008 auto clientKeys = curveKeyPair(); 3009 client.curvePublicKeyZ85 = clientKeys.publicKey; 3010 client.curveSecretKeyZ85 = clientKeys.secretKey; 3011 client.curveServerKeyZ85 = serverKeys.publicKey; 3012 client.connect("inproc://curveKeyPair_test"); 3013 client.send("hello"); 3014 3015 ubyte[5] buf; 3016 assert (server.receive(buf) == 5); 3017 assert (buf.asString() == "hello"); 3018 } 3019 3020 @system unittest 3021 { 3022 auto k1 = curveKeyPair(); 3023 assert (k1.publicKey.length == 40); 3024 assert (k1.secretKey.length == 40); 3025 3026 char[82] buf; 3027 auto k2 = curveKeyPair(buf[0 .. 41], buf[41 .. 82]); 3028 assert (k2.publicKey.length == 40); 3029 assert (k2.publicKey.ptr == buf.ptr); 3030 assert (k2.secretKey.length == 40); 3031 assert (k2.secretKey.ptr == buf.ptr + 41); 3032 3033 char[82] backup = buf; 3034 import core.exception, std.exception; 3035 assertThrown!RangeError(curveKeyPair(buf[0 .. 40], buf[41 .. 82])); 3036 assertThrown!RangeError(curveKeyPair(buf[0 .. 41], buf[42 .. 82])); 3037 assert (backup[] == buf[]); 3038 } 3039 3040 } //version (WithLibsodium) 3041 3042 3043 /** 3044 Utility function which interprets and validates a byte array as a UTF-8 string. 3045 3046 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often, 3047 the message _data contains plain text. $(D asString()) allows for easy and 3048 safe interpretation of raw _data as characters. It checks that $(D data) is 3049 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to 3050 the same memory region. 3051 3052 Throws: 3053 $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string. 3054 See_also: 3055 $(STDREF string,representation), which performs the opposite operation. 3056 */ 3057 inout(char)[] asString(inout(ubyte)[] data) pure 3058 { 3059 auto s = cast(typeof(return)) data; 3060 import std.utf: validate; 3061 validate(s); 3062 return s; 3063 } 3064 3065 /// 3066 unittest 3067 { 3068 auto s1 = Socket(SocketType.pair); 3069 s1.bind("inproc://zmqd_asString_example"); 3070 auto s2 = Socket(SocketType.pair); 3071 s2.connect("inproc://zmqd_asString_example"); 3072 3073 auto msg = Frame(12); 3074 msg.data.asString()[] = "Hello World!"; 3075 s1.send(msg); 3076 3077 ubyte[12] buf; 3078 s2.receive(buf); 3079 assert(buf.asString() == "Hello World!"); 3080 } 3081 3082 unittest 3083 { 3084 auto bytes = cast(ubyte[]) ['f', 'o', 'o']; 3085 auto text = bytes.asString(); 3086 assert (text == "foo"); 3087 assert (cast(void*) bytes.ptr == cast(void*) text.ptr); 3088 3089 import std.exception: assertThrown; 3090 import std.utf: UTFException; 3091 auto b = cast(ubyte[]) [100, 252, 1]; 3092 assertThrown!UTFException(asString(b)); 3093 } 3094 3095 3096 /** 3097 A class for exceptions thrown when any of the underlying $(ZMQ) C functions 3098 report an error. 3099 3100 The exception provides a standard error message obtained with 3101 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ) 3102 function which reported the error. 3103 */ 3104 class ZmqException : Exception 3105 { 3106 /** 3107 The $(D errno) code that was set by the $(ZMQ) function that reported 3108 the error. 3109 3110 Corresponds_to: 3111 $(ZMQREF zmq_errno()) 3112 */ 3113 immutable int errno; 3114 3115 private: 3116 this(string file = __FILE__, int line = __LINE__) nothrow 3117 { 3118 import core.stdc.errno, std.conv; 3119 this.errno = core.stdc.errno.errno; 3120 string msg; 3121 try { 3122 msg = trusted!(to!string)(trusted!zmq_strerror(this.errno)); 3123 } catch (Exception e) { /* We never get here */ } 3124 assert(msg.length); // Still, let's assert as much. 3125 super(msg, file, line); 3126 } 3127 } 3128 3129 3130 /** 3131 Exception thrown by $(FREF receiveEvent) on failure to interpret a 3132 received message as an event description. 3133 */ 3134 class InvalidEventException : Exception 3135 { 3136 private: 3137 this(string file = __FILE__, int line = __LINE__) nothrow 3138 { 3139 super("The received message is not an event message", file, line); 3140 } 3141 } 3142 3143 3144 // ============================================================================= 3145 // Everything below is internal 3146 // ============================================================================= 3147 private: 3148 3149 3150 struct SharedResource 3151 { 3152 @safe: 3153 alias Exception function(shared(void)*) nothrow Release; 3154 3155 this(shared(void)* ptr, Release release) nothrow 3156 in { assert(ptr); } body 3157 { 3158 m_payload = new shared(Payload)(1, ptr, release); 3159 } 3160 3161 this(this) nothrow 3162 { 3163 if (m_payload) { 3164 incRefCount(); 3165 } 3166 } 3167 3168 ~this() nothrow 3169 { 3170 nothrowDetach(); 3171 } 3172 3173 ref SharedResource opAssign(SharedResource rhs) 3174 { 3175 detach(); 3176 m_payload = rhs.m_payload; 3177 rhs.m_payload = null; 3178 return this; 3179 } 3180 3181 void detach() 3182 { 3183 if (m_payload) { 3184 if (auto ex = nothrowDetach()) throw ex; 3185 } 3186 } 3187 3188 @property inout(shared(void))* handle() inout pure nothrow 3189 { 3190 if (m_payload) { 3191 return m_payload.handle; 3192 } else { 3193 return null; 3194 } 3195 } 3196 3197 private: 3198 void incRefCount() @trusted nothrow 3199 { 3200 assert (m_payload !is null && m_payload.refCount > 0); 3201 import core.atomic: atomicOp; 3202 atomicOp!"+="(m_payload.refCount, 1); 3203 } 3204 3205 int decRefCount() @trusted nothrow 3206 { 3207 assert (m_payload !is null && m_payload.refCount > 0); 3208 import core.atomic: atomicOp; 3209 return atomicOp!"-="(m_payload.refCount, 1); 3210 } 3211 3212 Exception nothrowDetach() @trusted nothrow 3213 out { assert (m_payload is null); } 3214 body 3215 { 3216 if (m_payload) { 3217 scope(exit) m_payload = null; 3218 if (decRefCount() < 1) return m_payload.release(m_payload.handle); 3219 } 3220 return null; 3221 } 3222 3223 struct Payload 3224 { 3225 int refCount; 3226 void* handle; 3227 Release release; 3228 } 3229 shared(Payload)* m_payload; 3230 3231 invariant() 3232 { 3233 assert (m_payload is null || (m_payload.refCount > 0 && 3234 m_payload.handle !is null && m_payload.release !is null)); 3235 } 3236 } 3237 3238 @system unittest 3239 { 3240 import std.exception: assertNotThrown, assertThrown; 3241 static Exception myFree(shared(void)* p) @trusted nothrow 3242 { 3243 auto v = cast(shared(int)*) p; 3244 if (*v == 0) { 3245 return new Exception("double release"); 3246 } else { 3247 *v = 0; 3248 return null; 3249 } 3250 } 3251 3252 shared int i = 1; 3253 3254 { 3255 // Test constructor and properties. 3256 auto ra = SharedResource(&i, &myFree); 3257 assert (i == 1); 3258 assert (ra.handle == &i); 3259 3260 // Test postblit constructor 3261 auto rb = ra; 3262 assert (i == 1); 3263 assert (rb.handle == &i); 3264 3265 { 3266 // Test properties and free() with default-initialized object. 3267 SharedResource rc; 3268 assert (rc.handle == null); 3269 assertNotThrown(rc.detach()); 3270 3271 // Test assignment, both with and without detachment 3272 rc = rb; 3273 assert (i == 1); 3274 assert (rc.handle == &i); 3275 3276 shared int j = 2; 3277 auto rd = SharedResource(&j, &myFree); 3278 assert (rd.handle == &j); 3279 rd = rb; 3280 assert (j == 0); 3281 assert (i == 1); 3282 assert (rd.handle == &i); 3283 3284 // Test explicit detach() 3285 shared int k = 3; 3286 auto re = SharedResource(&k, &myFree); 3287 assertNotThrown(re.detach()); 3288 assert(k == 0); 3289 3290 // Test failure to free and assign (myFree(&k) fails when k == 0) 3291 re = SharedResource(&k, &myFree); 3292 assertThrown!Exception(re.detach()); // We defined free(k == 0) as an error 3293 re = SharedResource(&k, &myFree); 3294 assertThrown!Exception(re = rb); 3295 } 3296 3297 // i should not be "freed" yet 3298 assert (i == 1); 3299 assert (ra.handle == &i); 3300 assert (rb.handle == &i); 3301 } 3302 // ...but now it should. 3303 assert (i == 0); 3304 } 3305 3306 // Thread safety test 3307 @system unittest 3308 { 3309 enum threadCount = 100; 3310 enum copyCount = 1000; 3311 static Exception myFree(shared(void)* p) @trusted nothrow 3312 { 3313 auto v = cast(shared(int)*) p; 3314 if (*v == 0) { 3315 return new Exception("double release"); 3316 } else { 3317 *v = 0; 3318 return null; 3319 } 3320 } 3321 shared int raw = 1; 3322 { 3323 auto rs = SharedResource(&raw, &myFree); 3324 3325 import core.thread; 3326 auto group = new ThreadGroup; 3327 foreach (i; 0 .. threadCount) { 3328 group.create(() { 3329 auto a = rs; 3330 foreach (j; 0 .. copyCount) { 3331 auto b = a; 3332 assert (b.handle == &raw); 3333 assert (raw == 1); 3334 } 3335 }); 3336 } 3337 group.joinAll(); 3338 assert (rs.handle == &raw); 3339 assert (raw == 1); 3340 } 3341 assert (raw == 0); 3342 } 3343 3344 3345 version(unittest) private string uniqueUrl(string p, int n = __LINE__) 3346 { 3347 import std.uuid; 3348 return p ~ "://" ~ randomUUID().toString(); 3349 } 3350 3351 3352 private auto trusted(alias func, Args...)(auto ref Args args) @trusted 3353 { 3354 return func(args); 3355 } 3356 3357 3358 // std.string.toStringz() is unsafe, so we provide our own implementation 3359 // tailored to the string sizes we are likely to encounter here. 3360 // Note that this implementation requires that the string be used immediately 3361 // upon return, and not stored, as the buffer will be reused most of the time. 3362 const(char)* zeroTermString(const char[] s) nothrow 3363 { 3364 import std.algorithm: max; 3365 static char[] buf; 3366 immutable len = s.length + 1; 3367 if (buf.length < len) buf.length = max(len, 1023); 3368 buf[0 .. s.length] = s; 3369 buf[s.length] = '\0'; 3370 return buf.ptr; 3371 } 3372 3373 @system unittest 3374 { 3375 auto c1 = zeroTermString("Hello World!"); 3376 assert (c1[0 .. 13] == "Hello World!\0"); 3377 auto c2 = zeroTermString("foo"); 3378 assert (c2[0 .. 4] == "foo\0"); 3379 assert (c1 == c2); 3380 }