1 /* 2 This Source Code Form is subject to the terms of the Mozilla Public 3 License, v. 2.0. If a copy of the MPL was not distributed with this 4 file, You can obtain one at http://mozilla.org/MPL/2.0/. 5 */ 6 7 /** 8 $(ZMQD) – a thin wrapper around the low-level C API of the 9 $(LINK2 http://zeromq.org,$(ZMQ)) messaging framework. 10 11 Most functions in this module have a one-to-one relationship with functions 12 in the underlying C API. Some adaptations have been made to make the API 13 safer, easier and more pleasant to use; most importantly: 14 $(UL 15 $(LI 16 Errors are signalled by means of exceptions rather than return 17 codes. The $(REF ZmqException) class provides 18 a standard textual message for any error condition, but it also 19 provides access to the $(D errno) code set by the C function 20 that reported the error.) 21 $(LI 22 Functions are marked with $(D @safe), $(D pure) and $(D nothrow) 23 as appropriate, thus facilitating their use in high-level D code.) 24 $(LI 25 Memory and resources (i.e. contexts, sockets and messages) are 26 automatically managed, thus preventing leaks.) 27 $(LI 28 Context, socket and message options are implemented as properties.) 29 ) 30 The names of functions and types in $(ZMQD) are very similar to those in 31 $(ZMQ), but they follow the D naming conventions. Thus, the library should 32 feel both familiar to $(ZMQ) users and natural to D users. A notable 33 deviation from the C API is that message parts are consistently called 34 "frames". For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send()) 35 and so on. (Multipart messages were a late addition to $(ZMQ), and the "msg" 36 function names were well established at that point. The library's 37 developers have admitted that this is somewhat confusing, and the newer 38 CZMQ API consistently uses "frame" in function names.) 39 40 Due to the close correspondence with the C API, this documentation has 41 intentionally been kept sparse. There is really no reason to repeat the 42 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here. 43 Instead, the documentation for each function contains a "Corresponds to" 44 section that links to the appropriate pages in the $(ZMQ) reference. Any 45 details given in the present documentation mostly concern the D-specific 46 adaptations that have been made. 47 48 Also note that the examples generally only use the INPROC transport. The 49 reason for this is that the examples double as unittests, and we want to 50 avoid firewall troubles and other issues that could arise with the use of 51 network protocols such as TCP, PGM, etc., and the IPC protocol is not 52 supported on Windows. Anyway, they are only short 53 snippets that demonstrate the syntax; for more comprehensive and realistic 54 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all, 55 $(ZMQ) Guide). Many of the examples in the Guide have been translated to 56 D, and can be found in the 57 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples)) 58 subdirectory of the $(ZMQD) source repository. 59 60 Version: 61 1.1.0 (compatible with $(ZMQ) >= 4.0.0) 62 Authors: 63 $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad) 64 Copyright: 65 Copyright (c) 2013–2016, Lars T. Kyllingstad. All rights reserved. 66 License: 67 $(ZMQD) is released under the terms of the 68 $(LINK2 https://www.mozilla.org/MPL/2.0/index.txt,Mozilla Public License v. 2.0).$(BR) 69 Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site) 70 for details about $(ZMQ) licensing. 71 Macros: 72 D = <code>$0</code> 73 EM = <em>$0</em> 74 LDOTS = … 75 QUOTE = <blockquote>$0</blockquote> 76 FREF = $(D $(LINK2 #$1,$1())) 77 REF = $(D $(LINK2 #$1,$1)) 78 COREF = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2)) 79 OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1)) 80 STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2)) 81 ANCHOR = <span id="$1"></span> 82 ZMQ = ZeroMQ 83 ZMQAPI = $(LINK2 http://api.zeromq.org/4-0:$1,$+) 84 ZMQD = $(EM zmqd) 85 ZMQREF = $(D $(ZMQAPI $1,$1)) 86 */ 87 module zmqd; 88 89 import core.time; 90 import std.typecons; 91 import deimos.zmq.zmq; 92 93 94 version(Windows) { 95 import core.sys.windows.winsock2: SOCKET; 96 } 97 98 // Include ZMQ >= 4.1 features? 99 private enum ZMQ41 = ZMQ_MAKE_VERSION(4, 1, 0); 100 101 // Compile with debug=ZMQD_DisableCurveTests to be able to successfully 102 // run unittests even if the local ZMQ library is not compiled with Curve 103 // support. 104 debug (ZMQD_DisableCurveTests) { } else debug = WithCurveTests; 105 106 // Compatibility check 107 version(unittest) static this() 108 { 109 import std.stdio: stderr; 110 const v = zmqVersion(); 111 if (v.major != ZMQ_VERSION_MAJOR || v.minor != ZMQ_VERSION_MINOR) { 112 stderr.writefln( 113 "Warning: Potential ZeroMQ header/library incompatibility: " 114 ~"The header (binding) is for version %d.%d.%d, " 115 ~"while the library is version %d.%d.%d. Unittests may fail.", 116 ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH, 117 v.major, v.minor, v.patch); 118 } 119 // Known incompatibilities 120 import std.algorithm: min, max; 121 const libVersion = ZMQ_MAKE_VERSION(v.major, v.minor, v.patch); 122 const older = min(ZMQ_VERSION, libVersion); 123 const newer = max(ZMQ_VERSION, libVersion); 124 if (older < ZMQ_MAKE_VERSION(4, 1, 0) && newer >= ZMQ_MAKE_VERSION(4, 1, 0)) { 125 stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions"); 126 } else if (older < ZMQ_MAKE_VERSION(4, 1, 1) && newer >= ZMQ_MAKE_VERSION(4, 1, 1)) { 127 stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions"); 128 } 129 } 130 131 132 @safe: 133 134 135 /** 136 Reports the $(ZMQ) library version. 137 138 Returns: 139 A $(STDREF typecons,Tuple) with three integer fields that represent the 140 three versioning levels: $(D major), $(D minor) and $(D patch). 141 Corresponds_to: 142 $(ZMQREF zmq_version()) 143 */ 144 Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow 145 { 146 typeof(return) v; 147 trusted!zmq_version(&v.major, &v.minor, &v.patch); 148 return v; 149 } 150 151 152 static if (ZMQ_VERSION >= ZMQ41) { 153 /** 154 Checks for a $(ZMQ) capability. 155 156 Corresponds_to: 157 $(ZMQREF zmq_has()) 158 */ 159 bool zmqHas(const char[] capability) nothrow 160 { 161 return 0 != trusted!zmq_has(zeroTermString(capability)); 162 } 163 164 unittest 165 { 166 assert (!zmqHas("this ain't a capability")); 167 } 168 } 169 170 171 /** 172 The various socket types. 173 174 These are described in the $(ZMQREF zmq_socket()) reference. 175 */ 176 enum SocketType 177 { 178 req = ZMQ_REQ, /// Corresponds to $(D ZMQ_REQ) 179 rep = ZMQ_REP, /// Corresponds to $(D ZMQ_REP) 180 dealer = ZMQ_DEALER, /// Corresponds to $(D ZMQ_DEALER) 181 router = ZMQ_ROUTER, /// Corresponds to $(D ZMQ_ROUTER) 182 pub = ZMQ_PUB, /// Corresponds to $(D ZMQ_PUB) 183 sub = ZMQ_SUB, /// Corresponds to $(D ZMQ_SUB) 184 xpub = ZMQ_XPUB, /// Corresponds to $(D ZMQ_XPUB) 185 xsub = ZMQ_XSUB, /// Corresponds to $(D ZMQ_XSUB) 186 push = ZMQ_PUSH, /// Corresponds to $(D ZMQ_PUSH) 187 pull = ZMQ_PULL, /// Corresponds to $(D ZMQ_PULL) 188 pair = ZMQ_PAIR, /// Corresponds to $(D ZMQ_PAIR) 189 stream = ZMQ_STREAM, /// Corresponds to $(D ZMQ_STREAM) 190 } 191 192 193 /// _Security mechanisms. 194 enum Security 195 { 196 /// $(ZMQAPI zmq_null,NULL): No security or confidentiality. 197 none = ZMQ_NULL, 198 199 /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication. 200 plain = ZMQ_PLAIN, 201 202 /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality. 203 curve = ZMQ_CURVE, 204 } 205 206 207 /** 208 A special $(COREF time,Duration) value used to signify an infinite 209 timeout or time interval in certain contexts. 210 211 The places where this value may be used are: 212 $(UL 213 $(LI $(LINK2 #Socket.misc_options,certain socket options)) 214 $(LI $(FREF poll)) 215 ) 216 217 Note: 218 Since $(D core.time.Duration) doesn't reserve such a special value, the 219 actual value of $(D infiniteDuration) is $(COREF time,Duration.max). 220 */ 221 enum infiniteDuration = Duration.max; 222 223 224 /** 225 An object that encapsulates a $(ZMQ) socket. 226 227 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it 228 must always be explicitly initialized with a constructor (see 229 $(FREF _Socket.this)): 230 --- 231 Socket s; // Not a valid socket yet 232 s = Socket(SocketType.push); // ...but now it is. 233 --- 234 This $(D struct) is noncopyable, which means that a socket is always 235 uniquely managed by a single $(D Socket) object. Functions that will 236 inspect or use the socket, but not take ownership of it, should take 237 a $(D ref Socket) parameter. Use $(STDREF algorithm,move) to move 238 a $(D Socket) to a different location (e.g. into a sink function that 239 takes it by value, or into a new variable). 240 241 The socket is automatically closed when the $(D Socket) object goes out 242 of scope. 243 244 Linger_period: 245 Note that Socket by default sets the socket's linger period to zero. 246 This deviates from the $(ZMQ) default (which is an infinite linger period). 247 */ 248 struct Socket 249 { 250 @safe: 251 /** 252 Creates a new $(ZMQ) socket. 253 254 If $(D context) is not specified, the default _context (as returned 255 by $(FREF defaultContext)) is used. 256 257 Throws: 258 $(REF ZmqException) if $(ZMQ) reports an error. 259 Corresponds_to: 260 $(ZMQREF zmq_socket()) 261 */ 262 this(SocketType type) 263 { 264 this(defaultContext(), type); 265 } 266 267 /// ditto 268 this(Context context, SocketType type) 269 { 270 m_context = context; 271 m_type = type; 272 m_socket = trusted!zmq_socket(context.handle, type); 273 if (m_socket == null) { 274 throw new ZmqException; 275 } 276 linger = 0.msecs; 277 } 278 279 /// With default context: 280 unittest 281 { 282 auto sck = Socket(SocketType.push); 283 assert (sck.initialized); 284 } 285 /// With explicit context: 286 unittest 287 { 288 auto ctx = Context(); 289 auto sck = Socket(ctx, SocketType.push); 290 assert (sck.initialized); 291 } 292 293 // Socket objects are noncopyable. 294 @disable this(this); 295 296 unittest // Verify that move semantics work. 297 { 298 import std.algorithm: move; 299 struct SocketOwner 300 { 301 this(Socket s) { owned = trusted!move(s); } 302 Socket owned; 303 } 304 auto socket = Socket(SocketType.req); 305 const socketPtr = socket.handle; 306 assert (socketPtr != null); 307 308 auto owner = SocketOwner(trusted!move(socket)); 309 assert (socket.handle == null); 310 assert (!socket.m_context.initialized); 311 assert (owner.owned.handle == socketPtr); 312 assert (owner.owned.m_context.initialized); 313 } 314 315 // Closes the socket on desctruction. 316 ~this() nothrow { nothrowClose(); } 317 318 /** 319 Closes the $(ZMQ) socket. 320 321 Note that the socket will be closed automatically upon destruction, 322 so it is usually not necessary to call this method manually. 323 324 Throws: 325 $(REF ZmqException) if $(ZMQ) reports an error. 326 Corresponds_to: 327 $(ZMQREF zmq_close()) 328 */ 329 void close() 330 { 331 if (!nothrowClose()) throw new ZmqException; 332 } 333 334 /// 335 unittest 336 { 337 auto s = Socket(SocketType.pair); 338 assert (s.initialized); 339 s.close(); 340 assert (!s.initialized); 341 } 342 343 /** 344 Starts accepting incoming connections on $(D endpoint). 345 346 Throws: 347 $(REF ZmqException) if $(ZMQ) reports an error. 348 Corresponds_to: 349 $(ZMQREF zmq_bind()) 350 */ 351 void bind(const char[] endpoint) 352 { 353 if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) { 354 throw new ZmqException; 355 } 356 } 357 358 /// 359 unittest 360 { 361 auto s = Socket(SocketType.pub); 362 s.bind("inproc://zmqd_bind_example"); 363 } 364 365 /** 366 Stops accepting incoming connections on $(D endpoint). 367 368 Throws: 369 $(REF ZmqException) if $(ZMQ) reports an error. 370 Corresponds_to: 371 $(ZMQREF zmq_unbind()) 372 */ 373 void unbind(const char[] endpoint) 374 { 375 if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) { 376 throw new ZmqException; 377 } 378 } 379 380 // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1. 381 // IPC does not work on Windows, and unbind() does not work with INPROC. 382 // See: https://github.com/zeromq/libzmq/issues/949 383 /// 384 version (Posix) unittest 385 { 386 auto s = Socket(SocketType.pub); 387 s.bind("ipc://zmqd_unbind_example"); 388 // Do some work... 389 s.unbind("ipc://zmqd_unbind_example"); 390 } 391 392 /** 393 Creates an outgoing connection to $(D endpoint). 394 395 Throws: 396 $(REF ZmqException) if $(ZMQ) reports an error. 397 Corresponds_to: 398 $(ZMQREF zmq_connect()) 399 */ 400 void connect(const char[] endpoint) 401 { 402 if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) { 403 throw new ZmqException; 404 } 405 } 406 407 /// 408 unittest 409 { 410 auto s = Socket(SocketType.sub); 411 s.connect("inproc://zmqd_connect_example"); 412 } 413 414 /** 415 Disconnects the socket from $(D endpoint). 416 417 Throws: 418 $(REF ZmqException) if $(ZMQ) reports an error. 419 Corresponds_to: 420 $(ZMQREF zmq_disconnect()) 421 */ 422 void disconnect(const char[] endpoint) 423 { 424 if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) { 425 throw new ZmqException; 426 } 427 } 428 429 /// 430 unittest 431 { 432 auto s = Socket(SocketType.sub); 433 s.connect("inproc://zmqd_disconnect_example"); 434 // Do some work... 435 s.disconnect("inproc://zmqd_disconnect_example"); 436 } 437 438 /** 439 Sends a message frame. 440 441 $(D _send) blocks until the frame has been queued on the socket. 442 $(D trySend) performs the operation in non-blocking mode, and returns 443 a $(D bool) value that signifies whether the frame was queued on the 444 socket. 445 446 The $(D more) parameter specifies whether this is a multipart message 447 and there are _more frames to follow. 448 449 The $(D char[]) overload is a convenience function that simply casts 450 the string argument to $(D ubyte[]). 451 452 Throws: 453 $(REF ZmqException) if $(ZMQ) reports an error. 454 Corresponds_to: 455 $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the 456 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 457 $(D more == true)). 458 */ 459 void send(const ubyte[] data, bool more = false) 460 { 461 immutable flags = more ? ZMQ_SNDMORE : 0; 462 if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) { 463 throw new ZmqException; 464 } 465 } 466 467 /// ditto 468 void send(const char[] data, bool more = false) 469 { 470 send(cast(const(ubyte)[]) data, more); 471 } 472 473 /// ditto 474 bool trySend(const ubyte[] data, bool more = false) 475 { 476 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 477 if (trusted!zmq_send(m_socket, data.ptr, data.length, flags) < 0) { 478 import core.stdc.errno; 479 if (errno == EAGAIN) return false; 480 else throw new ZmqException; 481 } 482 return true; 483 } 484 485 /// ditto 486 bool trySend(const char[] data, bool more = false) 487 { 488 return trySend(cast(const(ubyte)[]) data, more); 489 } 490 491 /// 492 unittest 493 { 494 auto sck = Socket(SocketType.pub); 495 sck.send(cast(ubyte[]) [11, 226, 92]); 496 sck.send("Hello World!"); 497 } 498 499 /** 500 Sends a message frame. 501 502 $(D _send) blocks until the frame has been queued on the socket. 503 $(D trySend) performs the operation in non-blocking mode, and returns 504 a $(D bool) value that signifies whether the frame was queued on the 505 socket. 506 507 The $(D more) parameter specifies whether this is a multipart message 508 and there are _more frames to follow. 509 510 Throws: 511 $(REF ZmqException) if $(ZMQ) reports an error. 512 Corresponds_to: 513 $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the 514 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 515 $(D more == true)). 516 */ 517 void send(ref Frame msg, bool more = false) 518 { 519 immutable flags = more ? ZMQ_SNDMORE : 0; 520 if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) { 521 throw new ZmqException; 522 } 523 } 524 525 /// ditto 526 bool trySend(ref Frame msg, bool more = false) 527 { 528 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 529 if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) { 530 import core.stdc.errno; 531 if (errno == EAGAIN) return false; 532 else throw new ZmqException; 533 } 534 return true; 535 } 536 537 /// 538 unittest 539 { 540 auto sck = Socket(SocketType.pub); 541 auto msg = Frame(12); 542 msg.data.asString()[] = "Hello World!"; 543 sck.send(msg); 544 } 545 546 /** 547 Sends a constant-memory message frame. 548 549 $(D _sendConst) blocks until the frame has been queued on the socket. 550 $(D trySendConst) performs the operation in non-blocking mode, and returns 551 a $(D bool) value that signifies whether the frame was queued on the 552 socket. 553 554 The $(D more) parameter specifies whether this is a multipart message 555 and there are _more frames to follow. 556 557 Throws: 558 $(REF ZmqException) if $(ZMQ) reports an error. 559 Corresponds_to: 560 $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the 561 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 562 $(D more == true)). 563 */ 564 void sendConst(immutable ubyte[] data, bool more = false) 565 { 566 immutable flags = more ? ZMQ_SNDMORE : 0; 567 if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) { 568 throw new ZmqException; 569 } 570 } 571 572 /// ditto 573 void sendConst(string data, bool more = false) 574 { 575 sendConst(cast(immutable(ubyte)[]) data, more); 576 } 577 578 /// ditto 579 bool trySendConst(immutable ubyte[] data, bool more = false) 580 { 581 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 582 if (trusted!zmq_send_const(m_socket, data.ptr, data.length, flags) < 0) { 583 import core.stdc.errno; 584 if (errno == EAGAIN) return false; 585 else throw new ZmqException; 586 } 587 return true; 588 } 589 590 /// ditto 591 bool trySendConst(string data, bool more = false) 592 { 593 return trySend(cast(immutable(ubyte)[]) data, more); 594 } 595 596 /// 597 unittest 598 { 599 static immutable arr = cast(ubyte[]) [11, 226, 92]; 600 auto sck = Socket(SocketType.pub); 601 sck.sendConst(arr); 602 sck.sendConst("Hello World!"); 603 } 604 605 /** 606 Receives a message frame. 607 608 $(D _receive) blocks until the request can be satisfied, and returns the 609 number of bytes in the frame. 610 $(D tryReceive) performs the operation in non-blocking mode, and returns 611 a $(STDREF typecons,Tuple) which contains the size of the frame along 612 with a $(D bool) value that signifies whether a frame was received. 613 (If the latter is $(D false), the former is always set to zero.) 614 615 Throws: 616 $(REF ZmqException) if $(ZMQ) reports an error. 617 Corresponds_to: 618 $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 619 of $(D tryReceive)). 620 621 */ 622 size_t receive(ubyte[] data) 623 { 624 immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, 0); 625 if (len >= 0) { 626 import std.conv; 627 return to!size_t(len); 628 } else { 629 throw new ZmqException; 630 } 631 } 632 633 /// ditto 634 Tuple!(size_t, bool) tryReceive(ubyte[] data) 635 { 636 immutable len = trusted!zmq_recv(m_socket, data.ptr, data.length, ZMQ_DONTWAIT); 637 if (len >= 0) { 638 import std.conv; 639 return typeof(return)(to!size_t(len), true); 640 } else { 641 import core.stdc.errno; 642 if (errno == EAGAIN) { 643 return typeof(return)(0, false); 644 } else { 645 throw new ZmqException; 646 } 647 } 648 } 649 650 /// 651 unittest 652 { 653 // Sender 654 auto snd = Socket(SocketType.req); 655 snd.connect("inproc://zmqd_receive_example"); 656 snd.send("Hello World!"); 657 658 // Receiver 659 import std.string: representation; 660 auto rcv = Socket(SocketType.rep); 661 rcv.bind("inproc://zmqd_receive_example"); 662 char[256] buf; 663 immutable len = rcv.receive(buf.representation); 664 assert (buf[0 .. len] == "Hello World!"); 665 } 666 667 @system unittest 668 { 669 auto snd = Socket(SocketType.pair); 670 snd.bind("inproc://zmqd_tryReceive_example"); 671 auto rcv = Socket(SocketType.pair); 672 rcv.connect("inproc://zmqd_tryReceive_example"); 673 674 ubyte[256] buf; 675 auto r1 = rcv.tryReceive(buf); 676 assert (!r1[1]); 677 678 import core.thread, core.time, std.string; 679 snd.send("Hello World!"); 680 Thread.sleep(100.msecs); // Wait for message to be transferred... 681 auto r2 = rcv.tryReceive(buf); 682 assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation); 683 } 684 685 /** 686 Receives a message frame. 687 688 $(D _receive) blocks until the request can be satisfied, and returns the 689 number of bytes in the frame. 690 $(D tryReceive) performs the operation in non-blocking mode, and returns 691 a $(STDREF typecons,Tuple) which contains the size of the frame along 692 with a $(D bool) value that signifies whether a frame was received. 693 (If the latter is $(D false), the former is always set to zero.) 694 695 Throws: 696 $(REF ZmqException) if $(ZMQ) reports an error. 697 Corresponds_to: 698 $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 699 of $(D tryReceive)). 700 701 */ 702 size_t receive(ref Frame msg) 703 { 704 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0); 705 if (len >= 0) { 706 import std.conv; 707 return to!size_t(len); 708 } else { 709 throw new ZmqException; 710 } 711 } 712 713 /// ditto 714 Tuple!(size_t, bool) tryReceive(ref Frame msg) 715 { 716 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT); 717 if (len >= 0) { 718 import std.conv; 719 return typeof(return)(to!size_t(len), true); 720 } else { 721 import core.stdc.errno; 722 if (errno == EAGAIN) { 723 return typeof(return)(0, false); 724 } else { 725 throw new ZmqException; 726 } 727 } 728 } 729 730 /// 731 unittest 732 { 733 // Sender 734 auto snd = Socket(SocketType.req); 735 snd.connect("inproc://zmqd_msg_receive_example"); 736 snd.send("Hello World!"); 737 738 // Receiver 739 import std.string: representation; 740 auto rcv = Socket(SocketType.rep); 741 rcv.bind("inproc://zmqd_msg_receive_example"); 742 auto msg = Frame(); 743 rcv.receive(msg); 744 assert (msg.data.asString() == "Hello World!"); 745 } 746 747 @system unittest 748 { 749 auto snd = Socket(SocketType.pair); 750 snd.bind("inproc://zmqd_msg_tryReceive_example"); 751 auto rcv = Socket(SocketType.pair); 752 rcv.connect("inproc://zmqd_msg_tryReceive_example"); 753 754 auto msg = Frame(); 755 auto r1 = rcv.tryReceive(msg); 756 assert (!r1[1]); 757 758 import core.thread, core.time, std.string; 759 snd.send("Hello World!"); 760 Thread.sleep(100.msecs); // Wait for message to be transferred... 761 auto r2 = rcv.tryReceive(msg); 762 assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation); 763 } 764 765 /** 766 Whether there are _more message frames to follow. 767 768 Throws: 769 $(REF ZmqException) if $(ZMQ) reports an error. 770 Corresponds_to: 771 $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE). 772 */ 773 @property bool more() { return !!getOption!int(ZMQ_RCVMORE); } 774 775 // TODO: Better unittest/example 776 unittest 777 { 778 auto sck = Socket(SocketType.req); 779 assert (!sck.more); 780 } 781 782 /** $(ANCHOR Socket.misc_options) 783 Miscellaneous socket options. 784 785 Each of these corresponds to an option passed to 786 $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For 787 example, $(D identity) corresponds to $(D ZMQ_IDENTITY), 788 $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc. 789 790 Notes: 791 $(UL 792 $(LI For convenience, the setter for the $(D identity) property 793 accepts strings. To retrieve a string with the getter, use 794 the $(FREF asString) function. 795 --- 796 sck.identity = "foobar"; 797 assert (sck.identity.asString() == "foobar"); 798 --- 799 ) 800 $(LI The $(D linger), $(D receiveTimeout), $(D sendTimeout) and 801 $(D handshakeInterval) properties may have the special _value 802 $(REF infiniteDuration). This is translated to an option 803 _value of -1 or 0 (depending on which property is being set) 804 in the C API.) 805 $(LI Some options have array _type, and these allow the user to supply 806 a buffer in which to store the _value, to avoid a GC allocation. 807 The return _value is then a slice of this buffer. 808 These are not marked as $(D @property), but are prefixed with 809 "get" (e.g. $(D getIdentity())). A user-supplied buffer is 810 $(EM required) for some options, namely $(D getPlainUsername()) 811 and $(D getPlainPassword()), and these do not have $(D @property) 812 versions. $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85()) 813 require buffers which are at least 32 and 41 bytes long, 814 respectively.) 815 $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are 816 treated differently from the others; see $(FREF Socket.subscribe) 817 and $(FREF Socket.unsubscribe)) 818 ) 819 820 Throws: 821 $(REF ZmqException) if $(ZMQ) reports an error.$(BR) 822 $(STDREF conv,ConvOverflowException) if a given $(D Duration) is 823 longer than the number of milliseconds that will fit in an $(D int) 824 (only applies to properties of $(COREF time,Duration) _type).$(BR) 825 $(COREF exception,RangeError) if the $(D dest) buffers passed to 826 $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than 827 32 or 41 bytes long, respectively. 828 Corresponds_to: 829 $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). 830 */ 831 @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); } 832 /// ditto 833 @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); } 834 835 /// ditto 836 @property ubyte[] identity() { return getIdentity(new ubyte[255]); } 837 /// ditto 838 ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); } 839 /// ditto 840 @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); } 841 /// ditto 842 @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); } 843 844 /// ditto 845 @property int rate() { return getOption!int(ZMQ_RATE); } 846 /// ditto 847 @property void rate(int value) { setOption(ZMQ_RATE, value); } 848 849 /// ditto 850 @property Duration recoveryInterval() 851 { 852 return msecs(getOption!int(ZMQ_RECOVERY_IVL)); 853 } 854 /// ditto 855 @property void recoveryInterval(Duration value) 856 { 857 import std.conv: to; 858 setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"())); 859 } 860 861 /// ditto 862 @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); } 863 /// ditto 864 @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); } 865 866 /// ditto 867 @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); } 868 /// ditto 869 @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); } 870 871 /// ditto 872 @property FD fd() { return getOption!FD(ZMQ_FD); } 873 874 /// ditto 875 @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); } 876 877 /// ditto 878 @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); } 879 880 /// ditto 881 @property Duration linger() 882 { 883 const auto value = getOption!int(ZMQ_LINGER); 884 return value == -1 ? infiniteDuration : value.msecs; 885 } 886 /// ditto 887 @property void linger(Duration value) 888 { 889 import std.conv: to; 890 setOption( 891 ZMQ_LINGER, 892 value == infiniteDuration ? -1 : to!int(value.total!"msecs"())); 893 } 894 895 /// ditto 896 @property Duration reconnectionInterval() 897 { 898 return getOption!int(ZMQ_RECONNECT_IVL).msecs; 899 } 900 /// ditto 901 @property void reconnectionInterval(Duration value) 902 { 903 import std.conv: to; 904 setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"())); 905 } 906 907 /// ditto 908 @property int backlog() { return getOption!int(ZMQ_BACKLOG); } 909 /// ditto 910 @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); } 911 912 /// ditto 913 @property Duration maxReconnectionInterval() 914 { 915 return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs; 916 } 917 /// ditto 918 @property void maxReconnectionInterval(Duration value) 919 { 920 import std.conv: to; 921 setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"())); 922 } 923 924 /// ditto 925 @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); } 926 /// ditto 927 @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); } 928 929 /// ditto 930 @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); } 931 /// ditto 932 @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); } 933 934 /// ditto 935 @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); } 936 /// ditto 937 @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); } 938 939 /// ditto 940 @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); } 941 /// ditto 942 @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); } 943 944 /// ditto 945 @property Duration receiveTimeout() 946 { 947 const value = getOption!int(ZMQ_RCVTIMEO); 948 return value == -1 ? infiniteDuration : value.msecs; 949 } 950 /// ditto 951 @property void receiveTimeout(Duration value) 952 { 953 import std.conv: to; 954 setOption( 955 ZMQ_RCVTIMEO, 956 value == infiniteDuration ? -1 : to!int(value.total!"msecs"())); 957 } 958 959 /// ditto 960 @property Duration sendTimeout() 961 { 962 const value = getOption!int(ZMQ_SNDTIMEO); 963 return value == -1 ? infiniteDuration : value.msecs; 964 } 965 /// ditto 966 @property void sendTimeout(Duration value) 967 { 968 import std.conv: to; 969 setOption( 970 ZMQ_SNDTIMEO, 971 value == infiniteDuration ? -1 : to!int(value.total!"msecs"())); 972 } 973 974 /// ditto 975 @property char[] lastEndpoint() @trusted 976 { 977 // This function is not @safe because it calls a @system function 978 // (zmq_getsockopt) and takes the address of a local (len). 979 auto buf = new char[1024]; 980 size_t len = buf.length; 981 if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) { 982 throw new ZmqException; 983 } 984 return buf[0 .. len-1]; 985 } 986 987 /// ditto 988 @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); } 989 990 /// ditto 991 @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); } 992 /// ditto 993 @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); } 994 995 /// ditto 996 @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); } 997 /// ditto 998 @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); } 999 1000 /// ditto 1001 @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); } 1002 /// ditto 1003 @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); } 1004 1005 /// ditto 1006 @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); } 1007 /// ditto 1008 @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); } 1009 1010 /// ditto 1011 @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); } 1012 /// ditto 1013 @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); } 1014 1015 /// ditto 1016 @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); } 1017 1018 /// ditto 1019 @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); } 1020 /// ditto 1021 @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); } 1022 1023 /// ditto 1024 @property Security mechanism() 1025 { 1026 import std.conv; 1027 return to!Security(getOption!int(ZMQ_MECHANISM)); 1028 } 1029 1030 /// ditto 1031 @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); } 1032 /// ditto 1033 @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); } 1034 1035 /// ditto 1036 char[] getPlainUsername(char[] dest) 1037 { 1038 return getCStringOption(ZMQ_PLAIN_USERNAME, dest); 1039 } 1040 /// ditto 1041 @property void plainUsername(const(char)[] value) 1042 { 1043 setArrayOption(ZMQ_PLAIN_USERNAME, value); 1044 } 1045 1046 /// ditto 1047 char[] getPlainPassword(char[] dest) 1048 { 1049 return getCStringOption(ZMQ_PLAIN_PASSWORD, dest); 1050 } 1051 /// ditto 1052 @property void plainPassword(const(char)[] value) 1053 { 1054 setArrayOption(ZMQ_PLAIN_PASSWORD, value); 1055 } 1056 1057 /// ditto 1058 @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); } 1059 /// ditto 1060 @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); } 1061 1062 /// ditto 1063 @property ubyte[] curvePublicKey() 1064 { 1065 return getCurvePublicKey(new ubyte[keyBufSizeBin]); 1066 } 1067 /// ditto 1068 ubyte[] getCurvePublicKey(ubyte[] dest) 1069 { 1070 return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest); 1071 } 1072 /// ditto 1073 @property char[] curvePublicKeyZ85() 1074 { 1075 return getCurvePublicKeyZ85(new char[keyBufSizeZ85]); 1076 } 1077 /// ditto 1078 char[] getCurvePublicKeyZ85(char[] dest) 1079 { 1080 return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest); 1081 } 1082 /// ditto 1083 @property void curvePublicKey(const(ubyte)[] value) 1084 { 1085 setCurveKey(ZMQ_CURVE_PUBLICKEY, value); 1086 } 1087 /// ditto 1088 @property void curvePublicKeyZ85(const(char)[] value) 1089 { 1090 setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value); 1091 } 1092 1093 /// ditto 1094 @property ubyte[] curveSecretKey() 1095 { 1096 return getCurveSecretKey(new ubyte[keyBufSizeBin]); 1097 } 1098 /// ditto 1099 ubyte[] getCurveSecretKey(ubyte[] dest) 1100 { 1101 return getCurveKey(ZMQ_CURVE_SECRETKEY, dest); 1102 } 1103 /// ditto 1104 @property char[] curveSecretKeyZ85() 1105 { 1106 return getCurveSecretKeyZ85(new char[keyBufSizeZ85]); 1107 } 1108 /// ditto 1109 char[] getCurveSecretKeyZ85(char[] dest) 1110 { 1111 return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest); 1112 } 1113 /// ditto 1114 @property void curveSecretKey(const(ubyte)[] value) 1115 { 1116 setCurveKey(ZMQ_CURVE_SECRETKEY, value); 1117 } 1118 /// ditto 1119 @property void curveSecretKeyZ85(const(char)[] value) 1120 { 1121 setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value); 1122 } 1123 1124 /// ditto 1125 @property ubyte[] curveServerKey() 1126 { 1127 return getCurveServerKey(new ubyte[keyBufSizeBin]); 1128 } 1129 /// ditto 1130 ubyte[] getCurveServerKey(ubyte[] dest) 1131 { 1132 return getCurveKey(ZMQ_CURVE_SERVERKEY, dest); 1133 } 1134 /// ditto 1135 @property char[] curveServerKeyZ85() 1136 { 1137 return getCurveServerKeyZ85(new char[keyBufSizeZ85]); 1138 } 1139 /// ditto 1140 char[] getCurveServerKeyZ85(char[] dest) 1141 { 1142 return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest); 1143 } 1144 /// ditto 1145 @property void curveServerKey(const(ubyte)[] value) 1146 { 1147 setCurveKey(ZMQ_CURVE_SERVERKEY, value); 1148 } 1149 /// ditto 1150 @property void curveServerKeyZ85(const(char)[] value) 1151 { 1152 setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value); 1153 } 1154 1155 /// ditto 1156 @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); } 1157 1158 /// ditto 1159 @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); } 1160 1161 /// ditto 1162 @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); } 1163 1164 /// ditto 1165 @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); } 1166 1167 /// ditto 1168 @property char[] zapDomain() { return getZapDomain(new char[256]); } 1169 /// ditto 1170 char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); } 1171 /// ditto 1172 @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); } 1173 1174 static if (ZMQ_VERSION >= ZMQ41) { 1175 /// ditto 1176 @property void routerHandover(bool value) { setOption(ZMQ_ROUTER_HANDOVER, value ? 1 : 0); } 1177 1178 /// ditto 1179 @property void connectionRID(const ubyte[] value) { setArrayOption(ZMQ_CONNECT_RID, value); } 1180 1181 /// ditto 1182 @property int typeOfService() { return getOption!int(ZMQ_TOS); } 1183 /// ditto 1184 @property void typeOfService(int value) { setOption(ZMQ_TOS, value); } 1185 1186 /// ditto 1187 @property bool gssapiPlaintext() { return !!getOption!int(ZMQ_GSSAPI_PLAINTEXT); } 1188 /// ditto 1189 @property void gssapiPlaintext(bool value) { setOption(ZMQ_GSSAPI_PLAINTEXT, value ? 1 : 0); } 1190 1191 /// ditto 1192 @property char[] gssapiPrincipal() { return getGssapiPrincipal(new char[256]); } 1193 /// ditto 1194 char[] getGssapiPrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_PRINCIPAL, dest); } 1195 /// ditto 1196 @property void gssapiPrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_PRINCIPAL, value); } 1197 1198 /// ditto 1199 @property bool gssapiServer() { return !!getOption!int(ZMQ_GSSAPI_SERVER); } 1200 /// ditto 1201 @property void gssapiServer(bool value) { setOption(ZMQ_GSSAPI_SERVER, value ? 1 : 0); } 1202 1203 /// ditto 1204 @property char[] gssapiServicePrincipal() { return getGssapiServicePrincipal(new char[256]); } 1205 /// ditto 1206 char[] getGssapiServicePrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, dest); } 1207 /// ditto 1208 @property void gssapiServicePrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, value); } 1209 1210 /// ditto 1211 @property Duration handshakeInterval() 1212 { 1213 const auto value = getOption!int(ZMQ_HANDSHAKE_IVL); 1214 return value == 0 ? infiniteDuration : msecs(value); 1215 } 1216 /// ditto 1217 @property void handshakeInterval(Duration value) 1218 { 1219 import std.conv: to; 1220 setOption( 1221 ZMQ_HANDSHAKE_IVL, 1222 value == infiniteDuration ? 0 : to!int(value.total!"msecs"())); 1223 } 1224 } // static if (ZMQ_VERSION >= ZMQ41) 1225 1226 // deprecated options 1227 deprecated("Use the 'immediate' property instead") 1228 @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); } 1229 1230 deprecated("Use the 'immediate' property instead") 1231 @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); } 1232 1233 deprecated("Use !ipv6 instead") 1234 @property bool ipv4Only() { return !ipv6; } 1235 1236 deprecated("Use ipv6 = !value instead") 1237 @property void ipv4Only(bool value) { ipv6 = !value; } 1238 1239 unittest 1240 { 1241 // We test all the socket options by checking that they have their default value. 1242 auto s = Socket(SocketType.xpub); 1243 const e = "inproc://unittest2"; 1244 s.bind(e); 1245 import core.time; 1246 assert(s.threadAffinity == 0); 1247 assert(s.identity == null); 1248 assert(s.rate == 100); 1249 assert(s.recoveryInterval == 10.seconds); 1250 assert(s.sendBufferSize == 0); 1251 assert(s.receiveBufferSize == 0); 1252 version(Posix) { 1253 assert(s.fd > 2); // 0, 1 and 2 are the standard streams 1254 } 1255 assert(s.events == PollFlags.pollOut); 1256 assert(s.type == SocketType.xpub); 1257 assert(s.linger == 0.hnsecs); 1258 assert(s.reconnectionInterval == 100.msecs); 1259 assert(s.backlog == 100); 1260 assert(s.maxReconnectionInterval == Duration.zero); 1261 assert(s.maxMsgSize == -1); 1262 assert(s.sendHWM == 1000); 1263 assert(s.receiveHWM == 1000); 1264 assert(s.multicastHops == 1); 1265 assert(s.receiveTimeout == infiniteDuration); 1266 assert(s.sendTimeout == infiniteDuration); 1267 assert(s.tcpKeepalive == -1); 1268 assert(s.tcpKeepaliveCnt == -1); 1269 assert(s.tcpKeepaliveIdle == -1); 1270 assert(s.tcpKeepaliveIntvl == -1); 1271 assert(!s.immediate); 1272 assert(!s.ipv6); 1273 assert(s.mechanism == Security.none); 1274 assert(!s.plainServer); 1275 assert(s.getPlainUsername(new char[8]).length == 0); 1276 assert(s.getPlainPassword(new char[8]).length == 0); 1277 debug (WithCurveTests) { 1278 assert(!s.curveServer); 1279 } 1280 assert(s.zapDomain.length == 0); 1281 static if (ZMQ_VERSION >= ZMQ41) { 1282 assert(s.typeOfService == 0); 1283 if (zmqHas("gssapi")) { 1284 assert(!s.gssapiPlaintext); 1285 assert(s.gssapiPrincipal.length == 0); 1286 assert(!s.gssapiServer); 1287 assert(s.gssapiServicePrincipal.length == 0); 1288 } 1289 assert(s.handshakeInterval == 30000.msecs); 1290 } 1291 1292 // Test setters and getters together 1293 s.threadAffinity = 1; 1294 assert(s.threadAffinity == 1); 1295 s.identity = cast(ubyte[]) [ 65, 66, 67 ]; 1296 assert(s.identity == [65, 66, 67]); 1297 s.identity = "foo"; 1298 assert(s.identity == [102, 111, 111]); 1299 s.rate = 200; 1300 assert(s.rate == 200); 1301 s.recoveryInterval = 5.seconds; 1302 assert(s.recoveryInterval == 5_000.msecs); 1303 s.sendBufferSize = 500; 1304 assert(s.sendBufferSize == 500); 1305 s.receiveBufferSize = 600; 1306 assert(s.receiveBufferSize == 600); 1307 s.linger = Duration.zero; 1308 assert(s.linger == Duration.zero); 1309 s.linger = 100_000.usecs; 1310 assert(s.linger == 100.msecs); 1311 s.linger = infiniteDuration; 1312 assert(s.linger == infiniteDuration); 1313 s.reconnectionInterval = 200_000.usecs; 1314 assert(s.reconnectionInterval == 200.msecs); 1315 s.backlog = 50; 1316 assert(s.backlog == 50); 1317 s.maxReconnectionInterval = 300_000.usecs; 1318 assert(s.maxReconnectionInterval == 300.msecs); 1319 s.maxMsgSize = 1000; 1320 assert(s.maxMsgSize == 1000); 1321 s.sendHWM = 500; 1322 assert(s.sendHWM == 500); 1323 s.receiveHWM = 600; 1324 assert(s.receiveHWM == 600); 1325 s.multicastHops = 2; 1326 assert(s.multicastHops == 2); 1327 s.receiveTimeout = 3.seconds; 1328 assert(s.receiveTimeout == 3_000_000.usecs); 1329 s.receiveTimeout = infiniteDuration; 1330 assert(s.receiveTimeout == infiniteDuration); 1331 s.sendTimeout = 2_000_000.usecs; 1332 assert(s.sendTimeout == 2.seconds); 1333 s.sendTimeout = infiniteDuration; 1334 assert(s.sendTimeout == infiniteDuration); 1335 s.tcpKeepalive = 1; 1336 assert(s.tcpKeepalive == 1); 1337 s.tcpKeepaliveCnt = 1; 1338 assert(s.tcpKeepaliveCnt == 1); 1339 s.tcpKeepaliveIdle = 0; 1340 assert(s.tcpKeepaliveIdle == 0); 1341 s.tcpKeepaliveIntvl = 0; 1342 assert(s.tcpKeepaliveIntvl == 0); 1343 s.immediate = true; 1344 assert(s.immediate); 1345 s.ipv6 = true; 1346 assert(s.ipv6); 1347 s.plainServer = true; 1348 assert(s.mechanism == Security.plain); 1349 assert(s.plainServer); 1350 debug (WithCurveTests) { 1351 assert(!s.curveServer); 1352 s.curveServer = true; 1353 assert(s.mechanism == Security.curve); 1354 assert(!s.plainServer); 1355 assert(s.curveServer); 1356 } 1357 s.plainServer = false; 1358 assert(s.mechanism == Security.none); 1359 assert(!s.plainServer); 1360 debug (WithCurveTests) assert(!s.curveServer); 1361 s.plainUsername = "foobar"; 1362 assert(s.getPlainUsername(new char[8]) == "foobar"); 1363 assert(s.mechanism == Security.plain); 1364 s.plainUsername = null; 1365 assert(s.mechanism == Security.none); 1366 s.plainPassword = "xyz"; 1367 assert(s.getPlainPassword(new char[8]) == "xyz"); 1368 assert(s.mechanism == Security.plain); 1369 s.plainPassword = null; 1370 assert(s.mechanism == Security.none); 1371 s.zapDomain = "my_zap_domain"; 1372 assert(s.zapDomain == "my_zap_domain"); 1373 static if (ZMQ_VERSION >= ZMQ41) { 1374 s.typeOfService = 1; 1375 assert(s.typeOfService == 1); 1376 if (zmqHas("gssapi")) { 1377 s.gssapiPlaintext = true; 1378 assert(s.gssapiPlaintext); 1379 s.gssapiPrincipal = "myPrincipal"; 1380 assert(s.gssapiPrincipal == "myPrincipal"); 1381 s.gssapiServer = true; 1382 assert(s.gssapiServer); 1383 s.gssapiServicePrincipal = "myServicePrincipal"; 1384 assert(s.gssapiServicePrincipal == "myServicePrincipal"); 1385 } 1386 s.handshakeInterval = 60000.msecs; 1387 assert(s.handshakeInterval == 60000.msecs); 1388 } 1389 1390 // Test write-only options 1391 s.conflate = true; 1392 } 1393 1394 debug (WithCurveTests) @system unittest 1395 { 1396 // The CURVE key options require some special setup, so we test them 1397 // separately. 1398 import std.array, std.range; 1399 auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array(); 1400 auto z85Key1 = z85Encode(binKey1); 1401 auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array(); 1402 auto z85Key2 = z85Encode(binKey2); 1403 auto zeroKey = repeat(cast(ubyte) 0).take(32).array(); 1404 assert (z85Key1 != z85Key2); 1405 1406 auto s = Socket(SocketType.req); 1407 s.curvePublicKey = zeroKey; 1408 s.curveSecretKey = zeroKey; 1409 s.curveServerKey = zeroKey; 1410 1411 s.curvePublicKey = binKey1; 1412 assert (s.curvePublicKey == binKey1); 1413 assert (s.curvePublicKeyZ85 == z85Key1); 1414 s.curvePublicKeyZ85 = z85Key2; 1415 assert (s.curvePublicKey == binKey2); 1416 assert (s.curvePublicKeyZ85 == z85Key2); 1417 assert (s.curveSecretKey == zeroKey); 1418 assert (s.curveServerKey == zeroKey); 1419 s.curvePublicKey = zeroKey; 1420 1421 s.curveSecretKey = binKey1; 1422 assert (s.curveSecretKey == binKey1); 1423 assert (s.curveSecretKeyZ85 == z85Key1); 1424 s.curveSecretKeyZ85 = z85Key2; 1425 assert (s.curveSecretKey == binKey2); 1426 assert (s.curveSecretKeyZ85 == z85Key2); 1427 assert (s.curvePublicKey == zeroKey); 1428 assert (s.curveServerKey == zeroKey); 1429 s.curveSecretKey = zeroKey; 1430 1431 s.curveServerKey = binKey1; 1432 assert (s.curveServerKey == binKey1); 1433 assert (s.curveServerKeyZ85 == z85Key1); 1434 s.curveServerKeyZ85 = z85Key2; 1435 assert (s.curveServerKey == binKey2); 1436 assert (s.curveServerKeyZ85 == z85Key2); 1437 assert (s.curvePublicKey == zeroKey); 1438 assert (s.curveSecretKey == zeroKey); 1439 s.curveServerKey = zeroKey; 1440 } 1441 1442 unittest 1443 { 1444 // Some options are only applicable to specific socket types. 1445 auto rt = Socket(SocketType.router); 1446 rt.routerMandatory = true; 1447 rt.probeRouter = true; 1448 static if (ZMQ_VERSION >= ZMQ41) { 1449 rt.routerHandover = true; 1450 rt.connectionRID = cast(ubyte[]) [ 10, 20, 30 ]; 1451 } 1452 auto xp = Socket(SocketType.xpub); 1453 xp.xpubVerbose = true; 1454 auto rq = Socket(SocketType.req); 1455 rq.reqCorrelate = true; 1456 rq.reqRelaxed = true; 1457 } 1458 1459 deprecated unittest 1460 { 1461 // Test deprecated socket options 1462 auto s = Socket(SocketType.req); 1463 assert(s.ipv4Only); 1464 assert(!s.delayAttachOnConnect); 1465 1466 // Test setters and getters together 1467 s.ipv4Only = false; 1468 assert(!s.ipv4Only); 1469 s.delayAttachOnConnect = true; 1470 assert(s.delayAttachOnConnect); 1471 } 1472 1473 /** 1474 Establishes a message filter. 1475 1476 Throws: 1477 $(REF ZmqException) if $(ZMQ) reports an error. 1478 Corresponds_to: 1479 $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE). 1480 */ 1481 void subscribe(const ubyte[] filterPrefix) 1482 { 1483 setArrayOption(ZMQ_SUBSCRIBE, filterPrefix); 1484 } 1485 /// ditto 1486 void subscribe(const char[] filterPrefix) 1487 { 1488 setArrayOption(ZMQ_SUBSCRIBE, filterPrefix); 1489 } 1490 1491 /// 1492 unittest 1493 { 1494 // Create a subscriber that accepts all messages that start with 1495 // the prefixes "foo" or "bar". 1496 auto sck = Socket(SocketType.sub); 1497 sck.subscribe("foo"); 1498 sck.subscribe("bar"); 1499 } 1500 1501 @system unittest 1502 { 1503 void sleep(int ms) { 1504 import core.thread, core.time; 1505 Thread.sleep(dur!"msecs"(ms)); 1506 } 1507 auto pub = Socket(SocketType.pub); 1508 pub.bind("inproc://zmqd_subscribe_unittest"); 1509 auto sub = Socket(SocketType.sub); 1510 sub.connect("inproc://zmqd_subscribe_unittest"); 1511 1512 pub.send("Hello"); 1513 sleep(100); 1514 sub.subscribe("He"); 1515 sub.subscribe(cast(ubyte[])['W', 'o']); 1516 sleep(100); 1517 pub.send("Heeee"); 1518 pub.send("World"); 1519 sleep(100); 1520 ubyte[5] buf; 1521 sub.receive(buf); 1522 assert(buf.asString() == "Heeee"); 1523 sub.receive(buf); 1524 assert(buf.asString() == "World"); 1525 } 1526 1527 /** 1528 Removes a message filter. 1529 1530 Throws: 1531 $(REF ZmqException) if $(ZMQ) reports an error. 1532 Corresponds_to: 1533 $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE). 1534 */ 1535 void unsubscribe(const ubyte[] filterPrefix) 1536 { 1537 setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix); 1538 } 1539 /// ditto 1540 void unsubscribe(const char[] filterPrefix) 1541 { 1542 setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix); 1543 } 1544 1545 /// 1546 unittest 1547 { 1548 // Subscribe to messages that start with "foo" or "bar". 1549 auto sck = Socket(SocketType.sub); 1550 sck.subscribe("foo"); 1551 sck.subscribe("bar"); 1552 // ... 1553 // From now on, only accept messages that start with "bar" 1554 sck.unsubscribe("foo"); 1555 } 1556 1557 /** 1558 Spawns a PAIR socket that publishes socket state changes (_events) over 1559 the INPROC transport to the given _endpoint. 1560 1561 Which event types should be published may be selected by bitwise-ORing 1562 together different $(REF EventType) flags in the $(D events) parameter. 1563 1564 Throws: 1565 $(REF ZmqException) if $(ZMQ) reports an error. 1566 Corresponds_to: 1567 $(ZMQREF zmq_socket_monitor()) 1568 See_also: 1569 $(FREF receiveEvent), which receives and parses event messages. 1570 */ 1571 void monitor(const char[] endpoint, EventType events = EventType.all) 1572 { 1573 if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) { 1574 throw new ZmqException; 1575 } 1576 } 1577 1578 /// 1579 unittest 1580 { 1581 auto sck = Socket(SocketType.pub); 1582 sck.monitor("inproc://zmqd_monitor_unittest", 1583 EventType.accepted | EventType.closed); 1584 } 1585 1586 /** 1587 The $(D void*) pointer used by the underlying C API to refer to the socket. 1588 1589 If the object has not been initialized, this function returns $(D null). 1590 */ 1591 @property inout(void)* handle() inout pure nothrow 1592 { 1593 return m_socket; 1594 } 1595 1596 /** 1597 Whether this $(REF Socket) object has been _initialized, i.e. whether it 1598 refers to a valid $(ZMQ) socket. 1599 */ 1600 @property bool initialized() const pure nothrow 1601 { 1602 return m_socket != null; 1603 } 1604 1605 /// 1606 unittest 1607 { 1608 Socket sck; 1609 assert (!sck.initialized); 1610 sck = Socket(SocketType.sub); 1611 assert (sck.initialized); 1612 sck.close(); 1613 assert (!sck.initialized); 1614 } 1615 1616 private: 1617 // Helper function for ~this() and close() 1618 bool nothrowClose() nothrow 1619 { 1620 if (m_socket != null) { 1621 if (trusted!zmq_close(m_socket) != 0) return false; 1622 m_socket = null; 1623 } 1624 return true; 1625 } 1626 1627 import std.traits; 1628 1629 T getOption(T)(int option) @trusted 1630 if (isScalarType!T) 1631 { 1632 T buf; 1633 auto len = T.sizeof; 1634 if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) { 1635 throw new ZmqException; 1636 } 1637 assert(len == T.sizeof); 1638 return buf; 1639 } 1640 1641 void setOption(T)(int option, T value) @trusted 1642 if (isScalarType!T) 1643 { 1644 if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) { 1645 throw new ZmqException; 1646 } 1647 } 1648 1649 T[] getArrayOption(T)(int option, T[] buf) @trusted 1650 if (isScalarType!T) 1651 { 1652 static assert (T.sizeof == 1); 1653 auto len = buf.length; 1654 if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) { 1655 throw new ZmqException; 1656 } 1657 return buf[0 .. len]; 1658 } 1659 1660 void setArrayOption()(int option, const void[] value) 1661 { 1662 if (trusted!zmq_setsockopt(m_socket, option, value.ptr, value.length) != 0) { 1663 throw new ZmqException; 1664 } 1665 } 1666 1667 char[] getCStringOption(int option, char[] buf) 1668 { 1669 auto ret = getArrayOption(option, buf); 1670 assert (ret.length && ret[$-1] == '\0'); 1671 return ret[0 .. $-1]; 1672 } 1673 1674 enum : size_t 1675 { 1676 keySizeBin = 32, 1677 keyBufSizeBin = keySizeBin, 1678 keySizeZ85 = 40, 1679 keyBufSizeZ85 = keySizeZ85 + 1, 1680 } 1681 1682 ubyte[] getCurveKey(int option, ubyte[] buf) 1683 { 1684 if (buf.length < keyBufSizeBin) { 1685 import core.exception: RangeError; 1686 throw new RangeError; 1687 } 1688 return getArrayOption(option, buf[0 .. keyBufSizeBin]); 1689 } 1690 1691 char[] getCurveKeyZ85(int option, char[] buf) 1692 { 1693 if (buf.length < keyBufSizeZ85) { 1694 import core.exception: RangeError; 1695 throw new RangeError; 1696 } 1697 return getCStringOption(option, buf[0 .. keyBufSizeZ85]); 1698 } 1699 1700 void setCurveKey(int option, const ubyte[] value) 1701 { 1702 if (value.length != keySizeBin) throw new Exception("Invalid key size"); 1703 setArrayOption(option, value); 1704 } 1705 1706 void setCurveKeyZ85(int option, const char[] value) 1707 { 1708 if (value.length != keySizeZ85) throw new Exception("Invalid key size"); 1709 setArrayOption(option, value); 1710 } 1711 1712 Context m_context; 1713 SocketType m_type; 1714 void* m_socket; 1715 } 1716 1717 unittest 1718 { 1719 auto s1 = Socket(SocketType.pair); 1720 auto s2 = Socket(SocketType.pair); 1721 s1.bind("inproc://unittest"); 1722 s2.connect("inproc://unittest"); 1723 s1.send("Hello World!"); 1724 ubyte[12] buf; 1725 const len = s2.receive(buf[]); 1726 assert (len == 12); 1727 assert (buf == "Hello World!"); 1728 } 1729 1730 1731 version (Windows) { 1732 alias PlatformFD = SOCKET; 1733 } else version (Posix) { 1734 alias PlatformFD = int; 1735 } 1736 1737 /** 1738 The native socket file descriptor type. 1739 1740 This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems. 1741 */ 1742 alias FD = PlatformFD; 1743 1744 1745 /** 1746 Starts the built-in $(ZMQ) _proxy. 1747 1748 This function never returns normally, but it may throw an exception. This could 1749 happen if the context associated with either of the specified sockets is 1750 manually destroyed in a different thread. 1751 1752 Throws: 1753 $(REF ZmqException) if $(ZMQ) reports an error. 1754 Corresponds_to: 1755 $(ZMQREF zmq_proxy()) 1756 See_Also: 1757 $(FREF steerableProxy) 1758 */ 1759 void proxy(ref Socket frontend, ref Socket backend) 1760 { 1761 const rc = trusted!zmq_proxy(frontend.handle, backend.handle, null); 1762 assert (rc == -1); 1763 throw new ZmqException; 1764 } 1765 1766 /// ditto 1767 void proxy(ref Socket frontend, ref Socket backend, ref Socket capture) 1768 { 1769 const rc = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle); 1770 assert (rc == -1); 1771 throw new ZmqException; 1772 } 1773 1774 1775 static if (ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 0, 5)) { 1776 1777 /** 1778 Starts the built-in $(ZMQ) proxy with _control flow. 1779 1780 Note that the order of the two last parameters is reversed compared to 1781 $(ZMQREF zmq_proxy_steerable()). That is, the $(D control) socket always 1782 comes before the $(D capture) socket. Furthermore, unlike in $(ZMQ), 1783 $(D control) is mandatory. (Without the _control socket one can simply 1784 use $(FREF proxy).) 1785 1786 Versions: 1787 This function is only available when using the bindings for ZeroMQ 1788 4.0.5 or later. 1789 Throws: 1790 $(REF ZmqException) if $(ZMQ) reports an error. 1791 Corresponds_to: 1792 $(ZMQREF zmq_proxy_steerable()) 1793 See_Also: 1794 $(FREF proxy) 1795 */ 1796 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control) 1797 { 1798 const rc = trusted!zmq_proxy_steerable( 1799 frontend.handle, backend.handle, null, control.handle); 1800 if (rc == -1) throw new ZmqException; 1801 } 1802 1803 /// ditto 1804 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture) 1805 { 1806 const rc = trusted!zmq_proxy_steerable( 1807 frontend.handle, backend.handle, capture.handle, control.handle); 1808 if (rc == -1) throw new ZmqException; 1809 } 1810 1811 @system unittest 1812 { 1813 import core.thread; 1814 auto t = new Thread(() { 1815 auto frontend = Socket(SocketType.router); 1816 frontend.bind("inproc://zmqd_steerableProxy_unittest_fe"); 1817 auto backend = Socket(SocketType.dealer); 1818 backend.bind("inproc://zmqd_steerableProxy_unittest_be"); 1819 auto controllee = Socket(SocketType.pair); 1820 controllee.bind("inproc://zmqd_steerableProxy_unittest_ctl"); 1821 steerableProxy(frontend, backend, controllee); 1822 }); 1823 t.start(); 1824 auto client = Socket(SocketType.req); 1825 client.connect("inproc://zmqd_steerableProxy_unittest_fe"); 1826 auto server = Socket(SocketType.rep); 1827 server.connect("inproc://zmqd_steerableProxy_unittest_be"); 1828 auto controller = Socket(SocketType.pair); 1829 controller.connect("inproc://zmqd_steerableProxy_unittest_ctl"); 1830 1831 auto cf = Frame(1); 1832 cf.data[0] = 86; 1833 client.send(cf); 1834 auto sf = Frame(); 1835 server.receive(sf); 1836 assert(sf.size == 1 && sf.data[0] == 86); 1837 sf.data[0] = 87; 1838 server.send(sf); 1839 client.receive(cf); 1840 assert(cf.size == 1 && cf.data[0] == 87); 1841 1842 controller.send("TERMINATE"); 1843 t.join(); 1844 } 1845 1846 @system unittest 1847 { 1848 import core.thread; 1849 auto t = new Thread(() { 1850 auto frontend = Socket(SocketType.pull); 1851 frontend.bind("inproc://zmqd_steerableProxy2_unittest_fe"); 1852 auto backend = Socket(SocketType.push); 1853 backend.bind("inproc://zmqd_steerableProxy2_unittest_be"); 1854 auto controllee = Socket(SocketType.pair); 1855 controllee.bind("inproc://zmqd_steerableProxy2_unittest_ctl"); 1856 auto capture = Socket(SocketType.push); 1857 capture.bind("inproc://zmqd_steerableProxy2_unittest_cpt"); 1858 steerableProxy(frontend, backend, controllee, capture); 1859 }); 1860 t.start(); 1861 auto client = Socket(SocketType.push); 1862 client.connect("inproc://zmqd_steerableProxy2_unittest_fe"); 1863 auto server = Socket(SocketType.pull); 1864 server.connect("inproc://zmqd_steerableProxy2_unittest_be"); 1865 auto controller = Socket(SocketType.pair); 1866 controller.connect("inproc://zmqd_steerableProxy2_unittest_ctl"); 1867 auto capturer = Socket(SocketType.pull); 1868 capturer.connect("inproc://zmqd_steerableProxy2_unittest_cpt"); 1869 1870 auto cf = Frame(1); 1871 cf.data[0] = 86; 1872 client.send(cf); 1873 auto sf = Frame(); 1874 server.receive(sf); 1875 assert(sf.size == 1 && sf.data[0] == 86); 1876 auto pf = Frame(); 1877 capturer.receive(pf); 1878 assert(pf.size == 1 && pf.data[0] == 86); 1879 1880 controller.send("TERMINATE"); 1881 t.join(); 1882 } 1883 1884 } // end static if (version >= 4.0.5) 1885 1886 1887 deprecated("zmqd.poll() has a new signature as of v0.4") 1888 uint poll(zmq_pollitem_t[] items, Duration timeout = infiniteDuration) 1889 { 1890 import std.conv: to; 1891 const n = trusted!zmq_poll( 1892 items.ptr, 1893 to!int(items.length), 1894 timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"())); 1895 if (n < 0) throw new ZmqException; 1896 return cast(uint) n; 1897 } 1898 1899 1900 /** 1901 Input/output multiplexing. 1902 1903 The $(D timeout) parameter may have the special value $(REF infiniteDuration) 1904 which means no _timeout. This is translated to an argument value of -1 in the 1905 C API. 1906 1907 Returns: 1908 The number of $(REF PollItem) structures with events signalled in 1909 $(REF PollItem.returnedEvents), or 0 if no events have been signalled. 1910 Throws: 1911 $(REF ZmqException) if $(ZMQ) reports an error. 1912 Corresponds_to: 1913 $(ZMQREF zmq_poll()) 1914 */ 1915 uint poll(PollItem[] items, Duration timeout = infiniteDuration) @trusted 1916 { 1917 // Here we use a trick where we pretend the array of PollItems is 1918 // actually an array of zmq_pollitem_t, to avoid an unnecessary 1919 // allocation. For this to work, PollItem must have the exact 1920 // same size as zmq_pollitem_t. 1921 static assert (PollItem.sizeof == zmq_pollitem_t.sizeof); 1922 1923 import std.conv: to; 1924 const n = zmq_poll( 1925 cast(zmq_pollitem_t*) items.ptr, 1926 to!int(items.length), 1927 timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"())); 1928 if (n < 0) throw new ZmqException; 1929 return cast(uint) n; 1930 } 1931 1932 1933 /// 1934 @system unittest 1935 { 1936 auto socket1 = zmqd.Socket(zmqd.SocketType.pull); 1937 socket1.bind("inproc://zmqd_poll_example"); 1938 1939 import std.socket; 1940 auto socket2 = new std.socket.Socket( 1941 AddressFamily.INET, 1942 std.socket.SocketType.DGRAM); 1943 socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678)); 1944 1945 auto socket3 = zmqd.Socket(zmqd.SocketType.push); 1946 socket3.connect("inproc://zmqd_poll_example"); 1947 socket3.send("test"); 1948 1949 import core.thread: Thread; 1950 Thread.sleep(10.msecs); 1951 1952 auto items = [ 1953 PollItem(socket1, PollFlags.pollIn), 1954 PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut), 1955 PollItem(socket3, PollFlags.pollIn), 1956 ]; 1957 1958 const n = poll(items, 100.msecs); 1959 assert (n == 2); 1960 assert (items[0].returnedEvents == PollFlags.pollIn); 1961 assert (items[1].returnedEvents == PollFlags.pollOut); 1962 assert (items[2].returnedEvents == 0); 1963 socket2.close(); 1964 } 1965 1966 1967 /** 1968 $(FREF poll) event flags. 1969 1970 These are described in the $(ZMQREF zmq_poll()) manual. 1971 */ 1972 enum PollFlags 1973 { 1974 pollIn = ZMQ_POLLIN, /// Corresponds to $(D ZMQ_POLLIN) 1975 pollOut = ZMQ_POLLOUT, /// Corresponds to $(D ZMQ_POLLOUT) 1976 pollErr = ZMQ_POLLERR, /// Corresponds to $(D ZMQ_POLLERR) 1977 } 1978 1979 1980 /++ 1981 A structure that specifies a socket to be monitored by $(FREF poll) as well 1982 as the events to poll for, and, when $(FREF poll) returns, the events that 1983 occurred. 1984 1985 Warning: 1986 $(D PollItem) objects do not store $(STDREF socket,Socket) references, 1987 only the corresponding native file descriptors. This means that the 1988 references have to be stored elsewhere, or the objects may be garbage 1989 collected, invalidating the sockets before or while $(FREF poll) executes. 1990 --- 1991 // Not OK 1992 auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn); 1993 1994 // OK 1995 auto s = new std.socket.Socket(/*...*/); 1996 auto p2 = PollItem(s, PollFlags.pollIn); 1997 --- 1998 Corresponds_to: 1999 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t)) 2000 +/ 2001 struct PollItem 2002 { 2003 /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket. 2004 this(ref zmqd.Socket socket, PollFlags events) nothrow 2005 { 2006 m_pollItem = zmq_pollitem_t(socket.handle, 0, cast(short) events, 0); 2007 } 2008 2009 import std.socket; 2010 /** 2011 Constructs a $(REF PollItem) for monitoring a standard socket referenced 2012 by a $(STDREF socket,Socket). 2013 */ 2014 this(std.socket.Socket socket, PollFlags events) @system 2015 { 2016 this(socket.handle, events); 2017 } 2018 2019 /** 2020 Constructs a $(REF PollItem) for monitoring a standard socket referenced 2021 by a native file descriptor. 2022 */ 2023 this(FD fd, PollFlags events) pure nothrow 2024 { 2025 m_pollItem = zmq_pollitem_t(null, fd, cast(short) events, 0); 2026 } 2027 2028 /** 2029 Requested _events. 2030 2031 Corresponds_to: 2032 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events)) 2033 */ 2034 @property void requestedEvents(PollFlags events) pure nothrow 2035 { 2036 m_pollItem.events = cast(short) events; 2037 } 2038 2039 /// ditto 2040 @property PollFlags requestedEvents() const pure nothrow 2041 { 2042 return cast(typeof(return)) m_pollItem.events; 2043 } 2044 2045 /** 2046 Returned events. 2047 2048 Corresponds_to: 2049 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents)) 2050 */ 2051 @property PollFlags returnedEvents() const pure nothrow 2052 { 2053 return cast(typeof(return)) m_pollItem.revents; 2054 } 2055 2056 private: 2057 zmq_pollitem_t m_pollItem; 2058 } 2059 2060 2061 /** 2062 An object that encapsulates a $(ZMQ) message frame. 2063 2064 This $(D struct) is a wrapper around a $(D zmq_msg_t) object. 2065 A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it 2066 should always be explicitly initialized upon construction using 2067 $(FREF _Frame.opCall). Alternatively, it may be initialized later with 2068 $(FREF _Frame.rebuild). 2069 --- 2070 Frame msg1; // Invalid frame 2071 auto msg2 = Frame(); // Empty frame 2072 auto msg3 = Frame(1024); // 1K frame 2073 msg1.rebuild(2048); // msg1 now has size 2K 2074 msg2.rebuild(2048); // ...and so does msg2 2075 --- 2076 When a $(D Frame) object is destroyed, $(ZMQREF zmq_msg_close()) is 2077 called on the underlying $(D zmq_msg_t). 2078 2079 A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy) 2080 for this. 2081 */ 2082 struct Frame 2083 { 2084 @safe: 2085 /** 2086 Initializes an empty $(ZMQ) message frame. 2087 2088 Throws: 2089 $(REF ZmqException) if $(ZMQ) reports an error. 2090 Corresponds_to: 2091 $(ZMQREF zmq_msg_init()) 2092 */ 2093 static Frame opCall() 2094 { 2095 Frame f; 2096 f.init(); 2097 return f; 2098 } 2099 2100 /// 2101 unittest 2102 { 2103 auto msg = Frame(); 2104 assert(msg.size == 0); 2105 } 2106 2107 /** $(ANCHOR Frame.opCall_size) 2108 Initializes a $(ZMQ) message frame of the specified _size. 2109 2110 Throws: 2111 $(REF ZmqException) if $(ZMQ) reports an error. 2112 Corresponds_to: 2113 $(ZMQREF zmq_msg_init_size()) 2114 */ 2115 static Frame opCall(size_t size) 2116 { 2117 Frame m; 2118 m.init(size); 2119 return m; 2120 } 2121 2122 /// 2123 unittest 2124 { 2125 auto msg = Frame(123); 2126 assert(msg.size == 123); 2127 } 2128 2129 /** $(ANCHOR Frame.opCall_data) 2130 Initializes a $(ZMQ) message frame from a supplied buffer. 2131 2132 If $(D free) is not specified, $(D data) $(EM must) refer to a slice of 2133 memory which has been allocated by the garbage collector (typically using 2134 operator $(D new)). Ownership will then be transferred temporarily to 2135 $(ZMQ), and then transferred back to the GC when $(ZMQ) is done using the 2136 buffer. 2137 2138 For memory which is $(EM not) garbage collected, the argument $(D free) 2139 must be a pointer to a function that will release the memory, which will 2140 be called when $(ZMQ) no longer needs the buffer. $(D free) must point to 2141 a function with the following signature: 2142 --- 2143 extern(C) void f(void* d, void* h) nothrow; 2144 --- 2145 When it is called, $(D d) will be equal to $(D data.ptr), while $(D h) will 2146 be equal to $(D hint) (which is optional and null by default). 2147 2148 $(D free) is passed directly to the underlying $(ZMQ) C function, which is 2149 why it needs to be $(D extern(C)). 2150 2151 Warning: 2152 Some care must be taken when using this function, as there is no telling 2153 when, whether, or in which thread $(ZMQ) relinquishes ownership of the 2154 buffer. Client code should therefore avoid retaining any references to 2155 it, including slices that contain, overlap with or are contained in 2156 $(D data). For the "non-GC" version, client code should in general not 2157 retain slices or pointers to any memory which will be released when 2158 $(D free) is called. 2159 See_Also: 2160 $(REF Frame.FreeData) 2161 Throws: 2162 $(REF ZmqException) if $(ZMQ) reports an error. 2163 Corresponds_to: 2164 $(ZMQREF zmq_msg_init_data()) 2165 */ 2166 static Frame opCall(ubyte[] data) @system 2167 { 2168 Frame m; 2169 m.init(data); 2170 return m; 2171 } 2172 2173 /// ditto 2174 static Frame opCall(ubyte[] data, FreeData free, void* hint = null) @system 2175 { 2176 Frame m; 2177 m.init(data, free, hint); 2178 return m; 2179 } 2180 2181 /// 2182 @system unittest 2183 { 2184 // Garbage-collected memory 2185 auto buf = new ubyte[123]; 2186 auto msg = Frame(buf); 2187 assert(msg.size == buf.length); 2188 assert(msg.data.ptr == buf.ptr); 2189 } 2190 2191 /// 2192 @system unittest 2193 { 2194 // Manually managed memory 2195 import core.stdc.stdlib: malloc, free; 2196 static extern(C) void myFree(void* data, void* hint) nothrow { free(data); } 2197 auto buf = (cast(ubyte*) malloc(10))[0 .. 10]; 2198 auto msg = Frame(buf, &myFree); 2199 assert(msg.size == buf.length); 2200 assert(msg.data.ptr == buf.ptr); 2201 } 2202 2203 @system unittest 2204 { 2205 // Non-documentation unittest. Let's see if the data *actually* gets freed. 2206 ubyte buffer = 81; 2207 bool released = false; 2208 static extern(C) void myFree(void* data, void* hint) nothrow 2209 { 2210 auto typedData = cast(ubyte*) data; 2211 auto typedHint = cast(bool*) hint; 2212 assert(*typedData == 81); 2213 assert(!*typedHint); 2214 *typedData = 0; 2215 *typedHint = true; 2216 } 2217 // New scope, so the frame gets released at the end. 2218 { 2219 auto frame = Frame((&buffer)[0 .. 1], &myFree, &released); 2220 assert(frame.size == 1); 2221 assert(frame.data.ptr == &buffer); 2222 assert(buffer == 81); 2223 assert(!released); 2224 } 2225 assert(buffer == 0); 2226 assert(released); 2227 } 2228 2229 /** 2230 The function pointer type for memory-freeing callback functions passed to 2231 $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[], _FreeData, void*))). 2232 */ 2233 @system alias extern(C) void function(void*, void*) nothrow FreeData; 2234 2235 /** 2236 Reinitializes the Frame object as an empty message. 2237 2238 This function will first call $(FREF Frame.close) to release the 2239 resources associated with the message frame, and then it will 2240 initialize it anew, exactly as if it were constructed with 2241 $(D $(LINK2 #Frame.opCall,Frame())). 2242 2243 Throws: 2244 $(REF ZmqException) if $(ZMQ) reports an error. 2245 Corresponds_to: 2246 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init()) 2247 */ 2248 void rebuild() 2249 { 2250 close(); 2251 init(); 2252 } 2253 2254 /// 2255 unittest 2256 { 2257 auto msg = Frame(256); 2258 assert (msg.size == 256); 2259 msg.rebuild(); 2260 assert (msg.size == 0); 2261 } 2262 2263 /** 2264 Reinitializes the Frame object to a specified size. 2265 2266 This function will first call $(FREF Frame.close) to release the 2267 resources associated with the message frame, and then it will 2268 initialize it anew, exactly as if it were constructed with 2269 $(D $(LINK2 #Frame.opCall_size,Frame(size))). 2270 2271 Throws: 2272 $(REF ZmqException) if $(ZMQ) reports an error. 2273 Corresponds_to: 2274 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()). 2275 */ 2276 void rebuild(size_t size) 2277 { 2278 close(); 2279 init(size); 2280 } 2281 2282 /// 2283 unittest 2284 { 2285 auto msg = Frame(256); 2286 assert (msg.size == 256); 2287 msg.rebuild(1024); 2288 assert (msg.size == 1024); 2289 } 2290 2291 /** 2292 Reinitializes the Frame object from a supplied buffer. 2293 2294 This function will first call $(FREF Frame.close) to release the 2295 resources associated with the message frame, and then it will 2296 initialize it anew, exactly as if it were constructed with 2297 $(D Frame(data)) or $(D Frame(data, free, hint)). 2298 2299 Some care must be taken when using these functions. Please read the 2300 $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[]))) documentation. 2301 2302 Throws: 2303 $(REF ZmqException) if $(ZMQ) reports an error. 2304 Corresponds_to: 2305 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()). 2306 */ 2307 void rebuild(ubyte[] data) @system 2308 { 2309 close(); 2310 init(data); 2311 } 2312 2313 /// ditto 2314 void rebuild(ubyte[] data, FreeData free, void* hint = null) @system 2315 { 2316 close(); 2317 init(data, free, hint); 2318 } 2319 2320 /// 2321 @system unittest 2322 { 2323 // Garbage-collected memory 2324 auto msg = Frame(256); 2325 assert (msg.size == 256); 2326 auto buf = new ubyte[123]; 2327 msg.rebuild(buf); 2328 assert(msg.size == buf.length); 2329 assert(msg.data.ptr == buf.ptr); 2330 } 2331 2332 /// 2333 @system unittest 2334 { 2335 // Manually managed memory 2336 import core.stdc.stdlib: malloc, free; 2337 static extern(C) void myFree(void* data, void* hint) nothrow { free(data); } 2338 2339 auto msg = Frame(256); 2340 assert (msg.size == 256); 2341 auto buf = (cast(ubyte*) malloc(10))[0 .. 10]; 2342 msg.rebuild(buf, &myFree); 2343 assert(msg.size == buf.length); 2344 assert(msg.data.ptr == buf.ptr); 2345 } 2346 2347 @disable this(this); 2348 2349 /** 2350 Releases the $(ZMQ) message frame when the $(D Frame) is destroyed. 2351 2352 This destructor never throws, which means that any errors will go 2353 undetected. If this is undesirable, call $(FREF Frame.close) before 2354 the $(D Frame) is destroyed. 2355 2356 Corresponds_to: 2357 $(ZMQREF zmq_msg_close()) 2358 */ 2359 ~this() nothrow 2360 { 2361 if (m_initialized) { 2362 immutable rc = trusted!zmq_msg_close(&m_msg); 2363 assert(rc == 0, "zmq_msg_close failed: Invalid message frame"); 2364 } 2365 } 2366 2367 /** 2368 Releases the $(ZMQ) message frame. 2369 2370 Note that the frame will be automatically released when the $(D Frame) 2371 object is destroyed, so it is often not necessary to call this method 2372 manually. 2373 2374 Throws: 2375 $(REF ZmqException) if $(ZMQ) reports an error. 2376 Corresponds_to: 2377 $(ZMQREF zmq_msg_close()) 2378 */ 2379 void close() 2380 { 2381 if (m_initialized) { 2382 if (trusted!zmq_msg_close(&m_msg) != 0) { 2383 throw new ZmqException; 2384 } 2385 m_initialized = false; 2386 } 2387 } 2388 2389 /** 2390 Copies frame content to another message frame. 2391 2392 $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest)) 2393 copies the contents of this $(D Frame) into $(D dest). $(D dest) must 2394 be a valid (i.e. initialized) $(D Frame). 2395 2396 Warning: 2397 These functions may not do what you think they do. Please refer 2398 to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details. 2399 Throws: 2400 $(REF ZmqException) if $(ZMQ) reports an error. 2401 Corresponds_to: 2402 $(ZMQREF zmq_msg_copy()) 2403 */ 2404 Frame copy() 2405 in { assert(m_initialized); } 2406 body 2407 { 2408 auto cp = Frame(); 2409 copyTo(cp); 2410 return cp; 2411 } 2412 2413 /// ditto 2414 void copyTo(ref Frame dest) 2415 in { assert(m_initialized); } 2416 body 2417 { 2418 if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) { 2419 throw new ZmqException; 2420 } 2421 } 2422 2423 /// 2424 unittest 2425 { 2426 import std.string: representation; 2427 auto msg1 = Frame(3); 2428 msg1.data[] = "foo".representation; 2429 auto msg2 = msg1.copy(); 2430 assert (msg2.data.asString() == "foo"); 2431 } 2432 2433 /** 2434 Moves frame content to another message frame. 2435 2436 $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest)) 2437 moves the contents of this $(D Frame) to $(D dest). $(D dest) must 2438 be a valid (i.e. initialized) $(D Frame). 2439 2440 Throws: 2441 $(REF ZmqException) if $(ZMQ) reports an error. 2442 Corresponds_to: 2443 $(ZMQREF zmq_msg_move()) 2444 */ 2445 Frame move() 2446 in { assert(m_initialized); } 2447 body 2448 { 2449 auto m = Frame(); 2450 moveTo(m); 2451 return m; 2452 } 2453 2454 /// ditto 2455 void moveTo(ref Frame dest) 2456 in { assert(m_initialized); } 2457 body 2458 { 2459 if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) { 2460 throw new ZmqException; 2461 } 2462 } 2463 2464 /// 2465 unittest 2466 { 2467 import std.string: representation; 2468 auto msg1 = Frame(3); 2469 msg1.data[] = "foo".representation; 2470 auto msg2 = msg1.move(); 2471 assert (msg1.size == 0); 2472 assert (msg2.data.asString() == "foo"); 2473 } 2474 2475 /** 2476 The message frame content _size in bytes. 2477 2478 Corresponds_to: 2479 $(ZMQREF zmq_msg_size()) 2480 */ 2481 @property size_t size() nothrow 2482 in { assert(m_initialized); } 2483 body 2484 { 2485 return trusted!zmq_msg_size(&m_msg); 2486 } 2487 2488 /// 2489 unittest 2490 { 2491 auto msg = Frame(123); 2492 assert(msg.size == 123); 2493 } 2494 2495 /** 2496 Retrieves the message frame content. 2497 2498 Corresponds_to: 2499 $(ZMQREF zmq_msg_data()) 2500 */ 2501 @property ubyte[] data() @trusted nothrow 2502 in { assert(m_initialized); } 2503 body 2504 { 2505 return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size]; 2506 } 2507 2508 /// 2509 unittest 2510 { 2511 import std.string: representation; 2512 auto msg = Frame(3); 2513 assert(msg.data.length == 3); 2514 msg.data[] = "foo".representation; // Slice operator -> array copy. 2515 assert(msg.data.asString() == "foo"); 2516 } 2517 2518 /** 2519 Whether there are _more message frames to retrieve. 2520 2521 Corresponds_to: 2522 $(ZMQREF zmq_msg_more()) 2523 */ 2524 @property bool more() nothrow 2525 in { assert(m_initialized); } 2526 body 2527 { 2528 return !!trusted!zmq_msg_more(&m_msg); 2529 } 2530 2531 static if (ZMQ_VERSION >= ZMQ41) { 2532 /** 2533 The file descriptor of the socket the message was read from. 2534 2535 Throws: 2536 $(REF ZmqException) if $(ZMQ) reports an error. 2537 Corresponds_to: 2538 $(ZMQREF zmq_msg_get()) with $(D ZMQ_SRCFD). 2539 */ 2540 @property FD sourceFD() 2541 { 2542 return cast(FD) getProperty(ZMQ_SRCFD); 2543 } 2544 2545 /** 2546 Whether the message MAY share underlying storage with another copy. 2547 2548 Throws: 2549 $(REF ZmqException) if $(ZMQ) reports an error. 2550 Corresponds_to: 2551 $(ZMQREF zmq_msg_get()) with $(D ZMQ_SHARED). 2552 */ 2553 @property bool sharedStorage() 2554 { 2555 return !!getProperty(ZMQ_SHARED); 2556 } 2557 2558 unittest 2559 { 2560 import std.string: representation; 2561 auto msg1 = Frame(100); // Big message to avoid small-string optimisation 2562 msg1.data[0 .. 3] = "foo".representation; 2563 assert (!msg1.sharedStorage); 2564 auto msg2 = msg1.copy(); 2565 assert (msg2.sharedStorage); // Warning: The ZMQ docs only say that this MAY be true 2566 assert (msg2.data[0 .. 3].asString() == "foo"); 2567 } 2568 2569 /** 2570 Gets message _metadata. 2571 2572 $(D metadataUnsafe()) is faster than $(D metadata()) because it 2573 directly returns the array which comes from $(ZMQREF zmq_msg_gets()), 2574 whereas the latter returns a freshly GC-allocated copy of it. However, 2575 the array returned by $(D metadataUnsafe()) is owned by the underlying 2576 $(ZMQ) message and gets destroyed along with it, so care must be 2577 taken when using this function. 2578 2579 Throws: 2580 $(REF ZmqException) if $(ZMQ) reports an error. 2581 Corresponds_to: 2582 $(ZMQREF zmq_msg_gets()) 2583 */ 2584 char[] metadata(const char[] property) @trusted 2585 in { assert(m_initialized); } 2586 body 2587 { 2588 return metadataUnsafe(property).dup; 2589 } 2590 2591 /// ditto 2592 const(char)[] metadataUnsafe(const char[] property) @system 2593 in { assert(m_initialized); } 2594 body 2595 { 2596 if (auto value = zmq_msg_gets(handle, zeroTermString(property))) { 2597 import std.string: fromStringz; 2598 return fromStringz(value); 2599 } else { 2600 throw new ZmqException(); 2601 } 2602 } 2603 } // static if (ZMQ_VERSION >= ZMQ41) 2604 2605 /** 2606 A pointer to the underlying $(D zmq_msg_t). 2607 */ 2608 @property inout(zmq_msg_t)* handle() inout pure nothrow 2609 { 2610 return &m_msg; 2611 } 2612 2613 private: 2614 private void init() 2615 in { assert (!m_initialized); } 2616 out { assert (m_initialized); } 2617 body 2618 { 2619 if (trusted!zmq_msg_init(&m_msg) != 0) { 2620 throw new ZmqException; 2621 } 2622 m_initialized = true; 2623 } 2624 2625 private void init(size_t size) 2626 in { assert (!m_initialized); } 2627 out { assert (m_initialized); } 2628 body 2629 { 2630 if (trusted!zmq_msg_init_size(&m_msg, size) != 0) { 2631 throw new ZmqException; 2632 } 2633 m_initialized = true; 2634 } 2635 2636 private void init(ubyte[] data) @system 2637 in { assert (!m_initialized); } 2638 out { assert (m_initialized); } 2639 body 2640 { 2641 import core.memory; 2642 static extern(C) void zmqd_Frame_init_gcFree(void* dataPtr, void* block) nothrow 2643 { 2644 GC.removeRoot(dataPtr); 2645 GC.clrAttr(block, GC.BlkAttr.NO_MOVE); 2646 } 2647 2648 GC.addRoot(data.ptr); 2649 scope(failure) GC.removeRoot(data.ptr); 2650 2651 auto block = GC.addrOf(data.ptr); 2652 immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE); 2653 GC.setAttr(block, GC.BlkAttr.NO_MOVE); 2654 scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE); 2655 2656 init(data, &zmqd_Frame_init_gcFree, block); 2657 } 2658 2659 void init(ubyte[] data, FreeData free, void* hint) @system 2660 in { assert (!m_initialized); } 2661 out { assert (m_initialized); } 2662 body 2663 { 2664 if (zmq_msg_init_data(&m_msg, data.ptr, data.length, free, hint) != 0) { 2665 throw new ZmqException; 2666 } 2667 m_initialized = true; 2668 } 2669 2670 int getProperty(int property) @trusted 2671 { 2672 const value = zmq_msg_get(&m_msg, property); 2673 if (value == -1) { 2674 throw new ZmqException; 2675 } 2676 return value; 2677 } 2678 2679 bool m_initialized; 2680 zmq_msg_t m_msg; 2681 } 2682 2683 unittest 2684 { 2685 const url = uniqueUrl("inproc"); 2686 auto s1 = Socket(SocketType.pair); 2687 auto s2 = Socket(SocketType.pair); 2688 s1.bind(url); 2689 s2.connect(url); 2690 2691 auto m1a = Frame(123); 2692 m1a.data[] = 'a'; 2693 s1.send(m1a); 2694 auto m2a = Frame(); 2695 s2.receive(m2a); 2696 assert(m2a.size == 123); 2697 foreach (e; m2a.data) assert(e == 'a'); 2698 2699 auto m1b = Frame(10); 2700 m1b.data[] = 'b'; 2701 s1.send(m1b); 2702 auto m2b = Frame(); 2703 s2.receive(m2b); 2704 assert(m2b.size == 10); 2705 foreach (e; m2b.data) assert(e == 'b'); 2706 } 2707 2708 deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame; 2709 2710 2711 /** 2712 A global context which is used by default by all sockets, unless they are 2713 explicitly constructed with a different context. 2714 2715 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right, 2716 has the following to say) about context creation: 2717 $(QUOTE 2718 You should create and use exactly one context in your process. 2719 [$(LDOTS)] If at runtime a process has two contexts, these are 2720 like separate $(ZMQ) instances. If that's explicitly what you 2721 want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new()) 2722 at the start of your main line code, and one $(D zmq_ctx_destroy()) 2723 at the end.) 2724 ) 2725 By using $(D defaultContext()), this is exactly what you achieve. The 2726 context is created the first time the function is called, and is 2727 automatically destroyed when the program ends. 2728 2729 This function is thread safe. 2730 2731 Throws: 2732 $(REF ZmqException) if $(ZMQ) reports an error. 2733 See_also: 2734 $(REF Context) 2735 */ 2736 Context defaultContext() @trusted 2737 { 2738 // For future reference: This is the low-lock singleton pattern. See: 2739 // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/ 2740 static bool instantiated; 2741 __gshared Context ctx; 2742 if (!instantiated) { 2743 synchronized { 2744 if (!ctx.initialized) { 2745 ctx = Context(); 2746 } 2747 instantiated = true; 2748 } 2749 } 2750 return ctx; 2751 } 2752 2753 @system unittest 2754 { 2755 import core.thread; 2756 Context c1, c2; 2757 auto t = new Thread(() { c1 = defaultContext(); }); 2758 t.start(); 2759 c2 = defaultContext(); 2760 t.join(); 2761 assert(c1.handle !is null); 2762 assert(c1.handle == c2.handle); 2763 } 2764 2765 2766 /** 2767 An object that encapsulates a $(ZMQ) context. 2768 2769 In most programs, it is not necessary to use this type directly, 2770 as $(REF Socket) will use a default global context if not explicitly 2771 provided with one. See $(FREF defaultContext) for details. 2772 2773 A default-initialized $(D Context) is not a valid $(ZMQ) context; it 2774 must always be explicitly initialized with $(FREF _Context.opCall): 2775 --- 2776 Context ctx; // Not a valid context yet 2777 ctx = Context(); // ...but now it is. 2778 --- 2779 $(D Context) objects can be passed around by value, and two copies will 2780 refer to the same context. The underlying context is managed using 2781 reference counting, so that when the last copy of a $(D Context) goes 2782 out of scope, the context is automatically destroyed. The reference 2783 counting is performed in a thread safe manner, so that the same context 2784 can be shared between multiple threads. ($(ZMQ) guarantees the thread 2785 safety of other context operations.) 2786 2787 See_also: 2788 $(FREF defaultContext) 2789 */ 2790 struct Context 2791 { 2792 @safe: 2793 /** 2794 Creates a new $(ZMQ) context. 2795 2796 Returns: 2797 A $(REF Context) object that encapsulates the new context. 2798 Throws: 2799 $(REF ZmqException) if $(ZMQ) reports an error. 2800 Corresponds_to: 2801 $(ZMQREF zmq_ctx_new()) 2802 */ 2803 static Context opCall() @trusted // because of the cast 2804 { 2805 if (auto c = trusted!zmq_ctx_new()) { 2806 Context ctx; 2807 // Casting from/to shared is OK since ZMQ contexts are thread safe. 2808 static Exception release(shared(void)* ptr) @trusted nothrow 2809 { 2810 return zmq_ctx_term(cast(void*) ptr) == 0 2811 ? null 2812 : new ZmqException; 2813 } 2814 ctx.m_resource = SharedResource(cast(shared) c, &release); 2815 return ctx; 2816 } else { 2817 throw new ZmqException; 2818 } 2819 } 2820 2821 /// 2822 unittest 2823 { 2824 auto ctx = Context(); 2825 assert (ctx.initialized); 2826 } 2827 2828 /** 2829 Detaches from the $(ZMQ) context. 2830 2831 If this is the last reference to the context, it will be destroyed with 2832 $(ZMQREF zmq_ctx_term()). 2833 2834 Throws: 2835 $(REF ZmqException) if $(ZMQ) reports an error. 2836 */ 2837 void detach() 2838 { 2839 m_resource.detach(); 2840 } 2841 2842 /// 2843 unittest 2844 { 2845 auto ctx = Context(); 2846 assert (ctx.initialized); 2847 ctx.detach(); 2848 assert (!ctx.initialized); 2849 } 2850 2851 /** 2852 Forcefully terminates the context. 2853 2854 By using this function, one effectively circumvents the reference-counting 2855 mechanism for managing the context. After it returns, all other 2856 $(D Context) objects that used to refer to the same context will be in a 2857 state which is functionally equivalent to the default-initialized state 2858 (i.e., $(REF Context.initialized) is $(D false) and $(REF Context.handle) 2859 is $(D null)). 2860 2861 Throws: 2862 $(REF ZmqException) if $(ZMQ) reports an error. 2863 Corresponds_to: 2864 $(ZMQREF zmq_ctx_term()) 2865 */ 2866 void terminate() @system 2867 { 2868 m_resource.forceRelease(); 2869 } 2870 2871 /// 2872 @system unittest 2873 { 2874 auto ctx1 = Context(); 2875 auto ctx2 = ctx1; 2876 assert (ctx1.initialized); 2877 assert (ctx2.initialized); 2878 assert (ctx1.handle == ctx2.handle); 2879 ctx2.terminate(); 2880 assert (!ctx1.initialized); 2881 assert (!ctx2.initialized); 2882 assert (ctx1.handle == null); 2883 assert (ctx2.handle == null); 2884 } 2885 2886 @system unittest 2887 { 2888 auto ctx = Context(); 2889 2890 void threadFunc() 2891 { 2892 auto server = Socket(ctx, SocketType.rep); 2893 server.bind("inproc://Context_terminate_unittest"); 2894 try { 2895 server.receive(null); 2896 server.send(""); 2897 server.receive(null); 2898 assert (false, "Never get here"); 2899 } catch (ZmqException e) { 2900 assert (e.errno == ETERM); 2901 } 2902 } 2903 2904 import core.thread: Thread; 2905 auto thread = new Thread(&threadFunc); 2906 thread.start(); 2907 2908 auto client = Socket(ctx, SocketType.req); 2909 client.connect("inproc://Context_terminate_unittest"); 2910 client.send(""); 2911 client.receive(null); 2912 client.close(); 2913 2914 ctx.terminate(); 2915 thread.join(); 2916 } 2917 2918 2919 /** 2920 The number of I/O threads. 2921 2922 Throws: 2923 $(REF ZmqException) if $(ZMQ) reports an error. 2924 Corresponds_to: 2925 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2926 $(D ZMQ_IO_THREADS). 2927 */ 2928 @property int ioThreads() 2929 { 2930 return getOption(ZMQ_IO_THREADS); 2931 } 2932 2933 /// ditto 2934 @property void ioThreads(int value) 2935 { 2936 setOption(ZMQ_IO_THREADS, value); 2937 } 2938 2939 /// 2940 unittest 2941 { 2942 auto ctx = Context(); 2943 ctx.ioThreads = 3; 2944 assert (ctx.ioThreads == 3); 2945 } 2946 2947 /** 2948 The maximum number of sockets. 2949 2950 Throws: 2951 $(REF ZmqException) if $(ZMQ) reports an error. 2952 Corresponds_to: 2953 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2954 $(D ZMQ_MAX_SOCKETS). 2955 */ 2956 @property int maxSockets() 2957 { 2958 return getOption(ZMQ_MAX_SOCKETS); 2959 } 2960 2961 /// ditto 2962 @property void maxSockets(int value) 2963 { 2964 setOption(ZMQ_MAX_SOCKETS, value); 2965 } 2966 2967 /// 2968 unittest 2969 { 2970 auto ctx = Context(); 2971 ctx.maxSockets = 512; 2972 assert (ctx.maxSockets == 512); 2973 } 2974 2975 static if (ZMQ_VERSION >= ZMQ41) { 2976 /** 2977 The largest configurable number of sockets. 2978 2979 Throws: 2980 $(REF ZmqException) if $(ZMQ) reports an error. 2981 Corresponds_to: 2982 $(ZMQREF zmq_ctx_get()) with $(D ZMQ_SOCKET_LIMIT). 2983 */ 2984 @property int socketLimit() 2985 { 2986 return getOption(ZMQ_SOCKET_LIMIT); 2987 } 2988 2989 /// 2990 unittest 2991 { 2992 auto ctx = Context(); 2993 assert (ctx.socketLimit > 0); 2994 } 2995 2996 /** 2997 IPv6 option. 2998 2999 Throws: 3000 $(REF ZmqException) if $(ZMQ) reports an error. 3001 Corresponds_to: 3002 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 3003 $(D ZMQ_IPV6). 3004 */ 3005 @property bool ipv6() 3006 { 3007 return !!getOption(ZMQ_IPV6); 3008 } 3009 3010 /// ditto 3011 @property void ipv6(bool value) 3012 { 3013 setOption(ZMQ_IPV6, value ? 1 : 0); 3014 } 3015 3016 /// 3017 unittest 3018 { 3019 auto ctx = Context(); 3020 ctx.ipv6 = true; 3021 assert (ctx.ipv6 == true); 3022 } 3023 } 3024 3025 /** 3026 The $(D void*) pointer used by the underlying C API to refer to the context. 3027 3028 If the object has not been initialized, this function returns $(D null). 3029 */ 3030 @property inout(void)* handle() inout @trusted pure nothrow 3031 { 3032 // ZMQ contexts are thread safe, so casting away shared is OK. 3033 return cast(typeof(return)) m_resource.handle; 3034 } 3035 3036 /** 3037 Whether this $(REF Context) object has been _initialized, i.e. whether it 3038 refers to a valid $(ZMQ) context. 3039 */ 3040 @property bool initialized() const pure nothrow 3041 { 3042 return m_resource.handle != null; 3043 } 3044 3045 /// 3046 unittest 3047 { 3048 Context ctx; 3049 assert (!ctx.initialized); 3050 ctx = Context(); 3051 assert (ctx.initialized); 3052 ctx.detach(); 3053 assert (!ctx.initialized); 3054 } 3055 3056 private: 3057 int getOption(int option) 3058 { 3059 immutable value = trusted!zmq_ctx_get(this.handle, option); 3060 if (value < 0) { 3061 throw new ZmqException; 3062 } 3063 return value; 3064 } 3065 3066 void setOption(int option, int value) 3067 { 3068 if (trusted!zmq_ctx_set(this.handle, option, value) != 0) { 3069 throw new ZmqException; 3070 } 3071 } 3072 3073 SharedResource m_resource; 3074 } 3075 3076 3077 /** 3078 Socket event types. 3079 3080 These are used together with $(FREF Socket.monitor), and are described 3081 in the $(ZMQREF zmq_socket_monitor()) reference. 3082 */ 3083 enum EventType 3084 { 3085 connected = ZMQ_EVENT_CONNECTED, /// Corresponds to $(D ZMQ_EVENT_CONNECTED). 3086 connectDelayed = ZMQ_EVENT_CONNECT_DELAYED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED). 3087 connectRetried = ZMQ_EVENT_CONNECT_RETRIED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED). 3088 listening = ZMQ_EVENT_LISTENING, /// Corresponds to $(D ZMQ_EVENT_LISTENING). 3089 bindFailed = ZMQ_EVENT_BIND_FAILED, /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED). 3090 accepted = ZMQ_EVENT_ACCEPTED, /// Corresponds to $(D ZMQ_EVENT_ACCEPTED). 3091 acceptFailed = ZMQ_EVENT_ACCEPT_FAILED, /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED). 3092 closed = ZMQ_EVENT_CLOSED, /// Corresponds to $(D ZMQ_EVENT_CLOSED). 3093 closeFailed = ZMQ_EVENT_CLOSE_FAILED, /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED). 3094 disconnected = ZMQ_EVENT_DISCONNECTED, /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED). 3095 all = ZMQ_EVENT_ALL /// Corresponds to $(D ZMQ_EVENT_ALL). 3096 } 3097 3098 3099 /** 3100 Receives a message on the given _socket and interprets it as a _socket 3101 state change event. 3102 3103 $(D socket) must be a PAIR _socket which is connected to an endpoint 3104 created via a $(FREF Socket.monitor) call. $(D receiveEvent()) receives 3105 one message on the _socket, parses its contents according to the 3106 specification in the $(ZMQREF zmq_socket_monitor()) reference, 3107 and returns the event information as an $(REF Event) object. 3108 3109 Throws: 3110 $(REF ZmqException) if $(ZMQ) reports an error.$(BR) 3111 $(REF InvalidEventException) if the received message could not 3112 be interpreted as an event message. 3113 See_also: 3114 $(FREF Socket.monitor), for monitoring _socket state changes. 3115 */ 3116 Event receiveEvent(ref Socket socket) @system 3117 { 3118 // The monitor event message format underwent some changes between ZMQ 3119 // versions 3.2, 3.3 (unreleased) and 4.0. Furthermore, the zmq_event_t 3120 // type was removed as of ZMQ 4.1. We try to support all versions >= 3.2. 3121 immutable ver = zmqVersion(); 3122 immutable usePackedData = ver.major >= 4; 3123 immutable useNewLayout = ZMQ_MAKE_VERSION(ver.major, ver.minor, ver.patch) 3124 >= ZMQ_MAKE_VERSION(3, 3, 0); 3125 assert (useNewLayout || !usePackedData); 3126 3127 struct OldEventStruct { 3128 int event; 3129 const(char*) addr; 3130 int value; 3131 } 3132 struct NewEventStruct { 3133 ushort event; 3134 int value; 3135 } 3136 immutable eventFrameSize = 3137 usePackedData ? (ushort.sizeof + int.sizeof) 3138 : (useNewLayout ? NewEventStruct.sizeof : OldEventStruct.sizeof); 3139 3140 auto eventFrame = Frame(); 3141 if (socket.receive(eventFrame) != eventFrameSize) { 3142 throw new InvalidEventException; 3143 } 3144 const data = eventFrame.data.ptr; 3145 try { 3146 import std.conv: to; 3147 EventType event; 3148 int value; 3149 string addr; 3150 if (useNewLayout) { 3151 if (usePackedData) { 3152 event = to!EventType(*(cast(const(ushort)*) data)); 3153 value = *(cast(const(int)*) data + ushort.sizeof); 3154 } else { 3155 const eventStruct = cast(const(NewEventStruct)*) data; 3156 event = to!EventType(eventStruct.event); 3157 value = eventStruct.value; 3158 } 3159 auto addrFrame = Frame(); 3160 socket.receive(addrFrame); 3161 addr = (cast(char[]) addrFrame.data).idup; 3162 } else { 3163 const eventStruct = cast(const(OldEventStruct)*) data; 3164 event = to!EventType(eventStruct.event); 3165 value = eventStruct.value; 3166 addr = eventStruct.addr !is null 3167 ? to!string(eventStruct.addr) 3168 : null; 3169 } 3170 return Event(event, addr, value); 3171 } catch (Exception e) { 3172 // Any exception thrown within the try block signifies that there 3173 // is something wrong with the event message. 3174 throw new InvalidEventException; 3175 } 3176 } 3177 3178 // TODO: Remove version(Posix) and change to INPROC when updating to ZMQ 4.1. 3179 // IPC does not work on Windows, and unbind() does not work with INPROC. 3180 // See: https://github.com/zeromq/libzmq/issues/949 3181 version (Posix) @system unittest 3182 { 3183 Event[] events; 3184 void eventCollector() 3185 { 3186 auto coll = Socket(SocketType.pair); 3187 coll.connect("inproc://zmqd_receiveEvent_unittest_monitor"); 3188 do { 3189 events ~= receiveEvent(coll); 3190 } while (events[$-1].type != EventType.closed); 3191 } 3192 import core.thread; 3193 auto collector = new Thread(&eventCollector); 3194 collector.start(); 3195 3196 static void eventGenerator() 3197 { 3198 auto sck1 = Socket(SocketType.pair); 3199 sck1.monitor("inproc://zmqd_receiveEvent_unittest_monitor"); 3200 sck1.bind("ipc://zmqd_receiveEvent_unittest"); 3201 import core.time; 3202 Thread.sleep(100.msecs); 3203 auto sck2 = Socket(SocketType.pair); 3204 sck2.connect("ipc://zmqd_receiveEvent_unittest"); 3205 Thread.sleep(100.msecs); 3206 sck2.disconnect("ipc://zmqd_receiveEvent_unittest"); 3207 Thread.sleep(100.msecs); 3208 sck1.unbind("ipc://zmqd_receiveEvent_unittest"); 3209 } 3210 eventGenerator(); 3211 collector.join(); 3212 assert (events.length == 3); 3213 foreach (ev; events) { 3214 assert (ev.address == "ipc://zmqd_receiveEvent_unittest"); 3215 } 3216 assert (events[0].type == EventType.listening); 3217 assert (events[1].type == EventType.accepted); 3218 assert (events[2].type == EventType.closed); 3219 import std.exception; 3220 assertNotThrown!Error(events[0].fd); 3221 assertThrown!Error(events[0].errno); 3222 assertThrown!Error(events[0].interval); 3223 } 3224 3225 3226 /** 3227 Information about a socket state change. 3228 3229 See_also: 3230 $(FREF receiveEvent) 3231 */ 3232 struct Event 3233 { 3234 /// The event _type. 3235 @property EventType type() const pure nothrow 3236 { 3237 return m_type; 3238 } 3239 3240 /// The peer _address. 3241 @property string address() const pure nothrow 3242 { 3243 return m_address; 3244 } 3245 3246 /** 3247 The socket file descriptor. 3248 3249 This property function may only be called if $(REF Event.type) is one of: 3250 $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disonnected). 3251 3252 Throws: 3253 $(D Error) if the property is called for a wrong event type. 3254 */ 3255 @property FD fd() const pure nothrow 3256 { 3257 final switch (m_type) { 3258 case EventType.connected : 3259 case EventType.listening : 3260 case EventType.accepted : 3261 case EventType.closed : 3262 case EventType.disconnected : return cast(typeof(return)) m_value; 3263 case EventType.connectDelayed: 3264 case EventType.connectRetried: 3265 case EventType.bindFailed : 3266 case EventType.acceptFailed : 3267 case EventType.closeFailed : throw invalidProperty(); 3268 case EventType.all : 3269 } 3270 assert (false); 3271 } 3272 3273 /** 3274 The $(D _errno) code for the error which triggered the event. 3275 3276 This property function may only be called if $(REF Event.type) is either 3277 $(D bindFailed), $(D acceptFailed) or $(D closeFailed). 3278 3279 Throws: 3280 $(D Error) if the property is called for a wrong event type. 3281 */ 3282 @property int errno() const pure nothrow 3283 { 3284 final switch (m_type) { 3285 case EventType.bindFailed : 3286 case EventType.acceptFailed : 3287 case EventType.closeFailed : return m_value; 3288 case EventType.connected : 3289 case EventType.connectDelayed: 3290 case EventType.connectRetried: 3291 case EventType.listening : 3292 case EventType.accepted : 3293 case EventType.closed : 3294 case EventType.disconnected : throw invalidProperty(); 3295 case EventType.all : 3296 } 3297 assert (false); 3298 } 3299 3300 /** 3301 The reconnect interval. 3302 3303 This property function may only be called if $(REF Event.type) is 3304 $(D connectRetried). 3305 3306 Throws: 3307 $(D Error) if the property is called for a wrong event type. 3308 */ 3309 @property Duration interval() const pure nothrow 3310 { 3311 final switch (m_type) { 3312 case EventType.connectRetried: return m_value.msecs; 3313 case EventType.connected : 3314 case EventType.connectDelayed: 3315 case EventType.listening : 3316 case EventType.bindFailed : 3317 case EventType.accepted : 3318 case EventType.acceptFailed : 3319 case EventType.closed : 3320 case EventType.closeFailed : 3321 case EventType.disconnected : throw invalidProperty(); 3322 case EventType.all : 3323 } 3324 assert (false); 3325 } 3326 3327 private: 3328 this(EventType type, string address, int value) pure nothrow 3329 { 3330 m_type = type; 3331 m_address = address; 3332 m_value = value; 3333 } 3334 3335 Error invalidProperty(string name = __FUNCTION__)() const pure nothrow 3336 { 3337 try { 3338 import std.conv: text; 3339 return new Error(text("Property '", name, 3340 "' not available for event type '", 3341 m_type, "'")); 3342 } catch (Exception e) { 3343 assert(false); 3344 } 3345 } 3346 3347 EventType m_type; 3348 string m_address; 3349 int m_value; 3350 } 3351 3352 3353 /** 3354 Encodes a binary key as Z85 printable text. 3355 3356 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1), 3357 which will be used to store the return value plus a terminating zero byte. 3358 If $(D dest) is omitted, a new array will be created. 3359 3360 Returns: 3361 An array of size $(D 5*data.length/4) which contains the Z85-encoded text, 3362 excluding the terminating zero byte. This will be a slice of $(D dest) if 3363 it is provided. 3364 Throws: 3365 $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR) 3366 $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length) 3367 is not a multiple of 4). 3368 Corresponds_to: 3369 $(ZMQREF zmq_z85_encode()) 3370 */ 3371 char[] z85Encode(ubyte[] data, char[] dest) 3372 // TODO: Make data const when we update to ZMQ 4.1 3373 { 3374 import core.exception: RangeError; 3375 immutable len = 5 * data.length / 4; 3376 if (dest.length < len+1) throw new RangeError; 3377 if (trusted!zmq_z85_encode(dest.ptr, data.ptr, data.length) == null) { 3378 throw new ZmqException; 3379 } 3380 return dest[0 .. len]; 3381 } 3382 3383 /// ditto 3384 char[] z85Encode(ubyte[] data) 3385 { 3386 return z85Encode(data, new char[5*data.length/4 + 1]); 3387 } 3388 3389 debug (WithCurveTests) @system unittest // @system because of assertThrown 3390 { 3391 // TODO: Make data immutable when we update to ZMQ 4.1 3392 auto data = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b]; 3393 immutable encoded = "HelloWorld"; 3394 assert (z85Encode(data) == encoded); 3395 3396 auto buffer = new char[11]; 3397 auto result = z85Encode(data, buffer); 3398 assert (result == encoded); 3399 assert (buffer.ptr == result.ptr); 3400 3401 import core.exception: RangeError; 3402 import std.exception: assertThrown; 3403 assertThrown!RangeError(z85Encode(data, new char[10])); 3404 assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5])); 3405 } 3406 3407 3408 /** 3409 Decodes a binary key from Z85 printable _text. 3410 3411 $(D dest) must be an array whose length is at least $(D 4*data.length/5), 3412 which will be used to store the return value. 3413 If $(D dest) is omitted, a new array will be created. 3414 3415 Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero 3416 byte will be appended to $(D text) if it does not contain one already. However, 3417 this may trigger a (possibly unwanted) GC allocation. To avoid this, either 3418 make sure that the last character in $(D text) is $(D '\0'), or use 3419 $(OBJREF assumeSafeAppend) on the array before calling this function (provided 3420 this is safe). 3421 3422 Returns: 3423 An array of size $(D 4*data.length/5) which contains the decoded data. 3424 This will be a slice of $(D dest) if it is provided. 3425 Throws: 3426 $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR) 3427 $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length) 3428 is not a multiple of 5). 3429 Corresponds_to: 3430 $(ZMQREF zmq_z85_decode()) 3431 */ 3432 ubyte[] z85Decode(char[] text, ubyte[] dest) 3433 // TODO: Make text const when we update to ZMQ 4.1 3434 { 3435 import core.exception: RangeError; 3436 immutable len = 4 * text.length/5; 3437 if (dest.length < len) throw new RangeError; 3438 if (text[$-1] != '\0') text ~= '\0'; 3439 if (trusted!zmq_z85_decode(dest.ptr, text.ptr) == null) { 3440 throw new ZmqException; 3441 } 3442 return dest[0 .. len]; 3443 } 3444 3445 /// ditto 3446 ubyte[] z85Decode(char[] text) 3447 { 3448 return z85Decode(text, new ubyte[4*text.length/5]); 3449 } 3450 3451 debug (WithCurveTests) @system unittest // @system because of assertThrown 3452 { 3453 // TODO: Make data immutable when we update to ZMQ 4.1 3454 auto text = "HelloWorld".dup; 3455 immutable decoded = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b]; 3456 assert (z85Decode(text) == decoded); 3457 assert (z85Decode(text~'\0') == decoded); 3458 3459 auto buffer = new ubyte[8]; 3460 auto result = z85Decode(text, buffer); 3461 assert (result == decoded); 3462 assert (buffer.ptr == result.ptr); 3463 3464 import core.exception: RangeError; 3465 import std.exception: assertThrown; 3466 assertThrown!RangeError(z85Decode(text, new ubyte[7])); 3467 assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup)); 3468 } 3469 3470 3471 /** 3472 Generates a new Curve key pair. 3473 3474 To avoid a memory allocation, preallocated buffers may optionally be supplied 3475 for the two keys. Each of these must have a length of at least 41 bytes, enough 3476 for a 40-character Z85-encoded key plus a terminating zero byte. If either 3477 buffer is omitted/$(D null), a new one will be created. 3478 3479 Returns: 3480 A tuple that contains the two keys. Each of these will have a length of 3481 40 characters, and will be slices of the input buffers if such have been 3482 provided. 3483 Throws: 3484 $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are 3485 not $(D null) but have a length of less than 41 characters.$(BR) 3486 $(REF ZmqException) if $(ZMQ) reports an error. 3487 Corresponds_to: 3488 $(ZMQREF zmq_curve_keypair()) 3489 */ 3490 Tuple!(char[], "publicKey", char[], "secretKey") 3491 curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null) 3492 { 3493 import core.exception: RangeError; 3494 if (publicKeyBuf is null) publicKeyBuf = new char[41]; 3495 else if (publicKeyBuf.length < 41) throw new RangeError; 3496 if (secretKeyBuf is null) secretKeyBuf = new char[41]; 3497 else if (secretKeyBuf.length < 41) throw new RangeError; 3498 3499 static if (ZMQ_VERSION < ZMQ_MAKE_VERSION(4, 1, 0)) { 3500 import deimos.zmq.utils: zmq_curve_keypair; 3501 } 3502 if (trusted!zmq_curve_keypair(publicKeyBuf.ptr, secretKeyBuf.ptr) != 0) { 3503 throw new ZmqException; 3504 } 3505 return typeof(return)(publicKeyBuf[0 .. 40], secretKeyBuf[0 .. 40]); 3506 } 3507 3508 /// 3509 debug (WithCurveTests) unittest 3510 { 3511 auto server = Socket(SocketType.rep); 3512 auto serverKeys = curveKeyPair(); 3513 server.curveServer = true; 3514 server.curveSecretKeyZ85 = serverKeys.secretKey; 3515 server.bind("inproc://curveKeyPair_test"); 3516 3517 auto client = Socket(SocketType.req); 3518 auto clientKeys = curveKeyPair(); 3519 client.curvePublicKeyZ85 = clientKeys.publicKey; 3520 client.curveSecretKeyZ85 = clientKeys.secretKey; 3521 client.curveServerKeyZ85 = serverKeys.publicKey; 3522 client.connect("inproc://curveKeyPair_test"); 3523 client.send("hello"); 3524 3525 ubyte[5] buf; 3526 assert (server.receive(buf) == 5); 3527 assert (buf.asString() == "hello"); 3528 } 3529 3530 debug (WithCurveTests) @system unittest 3531 { 3532 auto k1 = curveKeyPair(); 3533 assert (k1.publicKey.length == 40); 3534 assert (k1.secretKey.length == 40); 3535 3536 char[82] buf; 3537 auto k2 = curveKeyPair(buf[0 .. 41], buf[41 .. 82]); 3538 assert (k2.publicKey.length == 40); 3539 assert (k2.publicKey.ptr == buf.ptr); 3540 assert (k2.secretKey.length == 40); 3541 assert (k2.secretKey.ptr == buf.ptr + 41); 3542 3543 char[82] backup = buf; 3544 import core.exception, std.exception; 3545 assertThrown!RangeError(curveKeyPair(buf[0 .. 40], buf[41 .. 82])); 3546 assertThrown!RangeError(curveKeyPair(buf[0 .. 41], buf[42 .. 82])); 3547 assert (backup[] == buf[]); 3548 } 3549 3550 3551 /** 3552 Utility function which interprets and validates a byte array as a UTF-8 string. 3553 3554 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often, 3555 the message _data contains plain text. $(D asString()) allows for easy and 3556 safe interpretation of raw _data as characters. It checks that $(D data) is 3557 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to 3558 the same memory region. 3559 3560 Throws: 3561 $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string. 3562 See_also: 3563 $(STDREF string,representation), which performs the opposite operation. 3564 */ 3565 inout(char)[] asString(inout(ubyte)[] data) pure 3566 { 3567 auto s = cast(typeof(return)) data; 3568 import std.utf: validate; 3569 validate(s); 3570 return s; 3571 } 3572 3573 /// 3574 unittest 3575 { 3576 auto s1 = Socket(SocketType.pair); 3577 s1.bind("inproc://zmqd_asString_example"); 3578 auto s2 = Socket(SocketType.pair); 3579 s2.connect("inproc://zmqd_asString_example"); 3580 3581 auto msg = Frame(12); 3582 msg.data.asString()[] = "Hello World!"; 3583 s1.send(msg); 3584 3585 ubyte[12] buf; 3586 s2.receive(buf); 3587 assert(buf.asString() == "Hello World!"); 3588 } 3589 3590 unittest 3591 { 3592 auto bytes = cast(ubyte[]) ['f', 'o', 'o']; 3593 auto text = bytes.asString(); 3594 assert (text == "foo"); 3595 assert (cast(void*) bytes.ptr == cast(void*) text.ptr); 3596 3597 import std.exception: assertThrown; 3598 import std.utf: UTFException; 3599 auto b = cast(ubyte[]) [100, 252, 1]; 3600 assertThrown!UTFException(asString(b)); 3601 } 3602 3603 3604 /** 3605 A class for exceptions thrown when any of the underlying $(ZMQ) C functions 3606 report an error. 3607 3608 The exception provides a standard error message obtained with 3609 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ) 3610 function which reported the error. 3611 */ 3612 class ZmqException : Exception 3613 { 3614 /** 3615 The $(D _errno) code that was set by the $(ZMQ) function that reported 3616 the error. 3617 3618 Corresponds_to: 3619 $(ZMQREF zmq_errno()) 3620 */ 3621 immutable int errno; 3622 3623 private: 3624 this(string file = __FILE__, int line = __LINE__) nothrow 3625 { 3626 import core.stdc.errno, std.conv; 3627 this.errno = core.stdc.errno.errno; 3628 string msg; 3629 try { 3630 msg = trusted!(to!string)(trusted!zmq_strerror(this.errno)); 3631 } catch (Exception e) { /* We never get here */ } 3632 assert(msg.length); // Still, let's assert as much. 3633 super(msg, file, line); 3634 } 3635 } 3636 3637 3638 /** 3639 Exception thrown by $(FREF receiveEvent) on failure to interpret a 3640 received message as an event description. 3641 */ 3642 class InvalidEventException : Exception 3643 { 3644 private: 3645 this(string file = __FILE__, int line = __LINE__) nothrow 3646 { 3647 super("The received message is not an event message", file, line); 3648 } 3649 } 3650 3651 3652 // ============================================================================= 3653 // Everything below is internal 3654 // ============================================================================= 3655 private: 3656 3657 3658 struct SharedResource 3659 { 3660 @safe: 3661 alias Exception function(shared(void)*) nothrow Release; 3662 3663 this(shared(void)* ptr, Release release) nothrow 3664 in { assert(ptr); } body 3665 { 3666 m_payload = new shared(Payload)(1, ptr, release); 3667 } 3668 3669 this(this) nothrow 3670 { 3671 if (m_payload) { 3672 incRefCount(); 3673 } 3674 } 3675 3676 ~this() nothrow 3677 { 3678 nothrowDetach(); 3679 } 3680 3681 ref SharedResource opAssign(SharedResource rhs) 3682 { 3683 detach(); 3684 m_payload = rhs.m_payload; 3685 rhs.m_payload = null; 3686 return this; 3687 } 3688 3689 void detach() 3690 { 3691 if (auto ex = nothrowDetach()) throw ex; 3692 } 3693 3694 void forceRelease() @system 3695 { 3696 if (m_payload) { 3697 scope(exit) m_payload = null; 3698 decRefCount(); 3699 if (m_payload.handle != null) { 3700 scope(exit) m_payload.handle = null; 3701 if (auto ex = m_payload.release(m_payload.handle)) { 3702 throw ex; 3703 } 3704 } 3705 } 3706 } 3707 3708 @property inout(shared(void))* handle() inout pure nothrow 3709 { 3710 if (m_payload) { 3711 return m_payload.handle; 3712 } else { 3713 return null; 3714 } 3715 } 3716 3717 private: 3718 void incRefCount() @trusted nothrow 3719 { 3720 assert (m_payload !is null && m_payload.refCount > 0); 3721 import core.atomic: atomicOp; 3722 atomicOp!"+="(m_payload.refCount, 1); 3723 } 3724 3725 int decRefCount() @trusted nothrow 3726 { 3727 assert (m_payload !is null && m_payload.refCount > 0); 3728 import core.atomic: atomicOp; 3729 return atomicOp!"-="(m_payload.refCount, 1); 3730 } 3731 3732 Exception nothrowDetach() @trusted nothrow 3733 out { assert (m_payload is null); } 3734 body 3735 { 3736 if (m_payload) { 3737 scope(exit) m_payload = null; 3738 if (decRefCount() < 1 && m_payload.handle != null) { 3739 return m_payload.release(m_payload.handle); 3740 } 3741 } 3742 return null; 3743 } 3744 3745 struct Payload 3746 { 3747 int refCount; 3748 void* handle; 3749 Release release; 3750 } 3751 shared(Payload)* m_payload; 3752 3753 invariant() 3754 { 3755 assert (m_payload is null || 3756 (m_payload.refCount > 0 && m_payload.release !is null)); 3757 } 3758 } 3759 3760 @system unittest 3761 { 3762 import std.exception: assertNotThrown, assertThrown; 3763 static Exception myFree(shared(void)* p) @trusted nothrow 3764 { 3765 auto v = cast(shared(int)*) p; 3766 if (*v == 0) { 3767 return new Exception("double release"); 3768 } else { 3769 *v = 0; 3770 return null; 3771 } 3772 } 3773 3774 shared int i = 1; 3775 3776 { 3777 // Test constructor and properties. 3778 auto ra = SharedResource(&i, &myFree); 3779 assert (i == 1); 3780 assert (ra.handle == &i); 3781 3782 // Test postblit constructor 3783 auto rb = ra; 3784 assert (i == 1); 3785 assert (rb.handle == &i); 3786 3787 { 3788 // Test properties and free() with default-initialized object. 3789 SharedResource rc; 3790 assert (rc.handle == null); 3791 assertNotThrown(rc.detach()); 3792 3793 // Test assignment, both with and without detachment 3794 rc = rb; 3795 assert (i == 1); 3796 assert (rc.handle == &i); 3797 3798 shared int j = 2; 3799 auto rd = SharedResource(&j, &myFree); 3800 assert (rd.handle == &j); 3801 rd = rb; 3802 assert (j == 0); 3803 assert (i == 1); 3804 assert (rd.handle == &i); 3805 3806 // Test explicit detach() 3807 shared int k = 3; 3808 auto re = SharedResource(&k, &myFree); 3809 assertNotThrown(re.detach()); 3810 assert(k == 0); 3811 3812 // Test failure to free and assign (myFree(&k) fails when k == 0) 3813 re = SharedResource(&k, &myFree); 3814 assertThrown!Exception(re.detach()); // We defined free(k == 0) as an error 3815 re = SharedResource(&k, &myFree); 3816 assertThrown!Exception(re = rb); 3817 } 3818 3819 // i should not be "freed" yet 3820 assert (i == 1); 3821 assert (ra.handle == &i); 3822 assert (rb.handle == &i); 3823 } 3824 // ...but now it should. 3825 assert (i == 0); 3826 } 3827 3828 // Thread safety test 3829 @system unittest 3830 { 3831 enum threadCount = 100; 3832 enum copyCount = 1000; 3833 static Exception myFree(shared(void)* p) @trusted nothrow 3834 { 3835 auto v = cast(shared(int)*) p; 3836 if (*v == 0) { 3837 return new Exception("double release"); 3838 } else { 3839 *v = 0; 3840 return null; 3841 } 3842 } 3843 shared int raw = 1; 3844 { 3845 auto rs = SharedResource(&raw, &myFree); 3846 3847 import core.thread; 3848 auto group = new ThreadGroup; 3849 foreach (i; 0 .. threadCount) { 3850 group.create(() { 3851 auto a = rs; 3852 foreach (j; 0 .. copyCount) { 3853 auto b = a; 3854 assert (b.handle == &raw); 3855 assert (raw == 1); 3856 } 3857 }); 3858 } 3859 group.joinAll(); 3860 assert (rs.handle == &raw); 3861 assert (raw == 1); 3862 } 3863 assert (raw == 0); 3864 } 3865 3866 // forceRelease() test 3867 @system unittest 3868 { 3869 import std.exception: assertNotThrown, assertThrown; 3870 static Exception myFree(shared(void)* p) @trusted nothrow 3871 { 3872 auto v = cast(shared(int)*) p; 3873 if (*v == 0) { 3874 return new Exception("double release"); 3875 } else { 3876 *v = 0; 3877 return null; 3878 } 3879 } 3880 3881 shared int i = 1; 3882 3883 auto ra = SharedResource(&i, &myFree); 3884 auto rb = ra; 3885 assert (ra.handle == &i); 3886 assert (rb.handle == ra.handle); 3887 rb.forceRelease(); 3888 assert (rb.handle == null); 3889 assert (ra.handle == null); 3890 } 3891 3892 3893 version(unittest) private string uniqueUrl(string p, int n = __LINE__) 3894 { 3895 import std.uuid; 3896 return p ~ "://" ~ randomUUID().toString(); 3897 } 3898 3899 3900 private auto trusted(alias func, Args...)(auto ref Args args) @trusted 3901 { 3902 return func(args); 3903 } 3904 3905 3906 // std.string.toStringz() is unsafe, so we provide our own implementation 3907 // tailored to the string sizes we are likely to encounter here. 3908 // Note that this implementation requires that the string be used immediately 3909 // upon return, and not stored, as the buffer will be reused most of the time. 3910 const(char)* zeroTermString(const char[] s) nothrow 3911 { 3912 import std.algorithm: max; 3913 static char[] buf; 3914 immutable len = s.length + 1; 3915 if (buf.length < len) buf.length = max(len, 1023); 3916 buf[0 .. s.length] = s; 3917 buf[s.length] = '\0'; 3918 return buf.ptr; 3919 } 3920 3921 @system unittest 3922 { 3923 auto c1 = zeroTermString("Hello World!"); 3924 assert (c1[0 .. 13] == "Hello World!\0"); 3925 auto c2 = zeroTermString("foo"); 3926 assert (c2[0 .. 4] == "foo\0"); 3927 assert (c1 == c2); 3928 }