1 /* 2 This Source Code Form is subject to the terms of the Mozilla Public 3 License, v. 2.0. If a copy of the MPL was not distributed with this 4 file, You can obtain one at http://mozilla.org/MPL/2.0/. 5 */ 6 7 /** 8 $(ZMQD) – a thin wrapper around the low-level C API of the 9 $(LINK2 http://zeromq.org,$(ZMQ)) messaging framework. 10 11 Most functions in this module have a one-to-one relationship with functions 12 in the underlying C API. Some adaptations have been made to make the API 13 safer, easier and more pleasant to use; most importantly: 14 $(UL 15 $(LI 16 Errors are signalled by means of exceptions rather than return 17 codes. The $(REF ZmqException) class provides 18 a standard textual message for any error condition, but it also 19 provides access to the $(D errno) code set by the C function 20 that reported the error.) 21 $(LI 22 Functions are marked with $(D @safe), $(D pure) and $(D nothrow) 23 as appropriate, thus facilitating their use in high-level D code.) 24 $(LI 25 Memory and resources (i.e. contexts, sockets and messages) are 26 automatically managed, thus preventing leaks.) 27 $(LI 28 Context, socket and message options are implemented as properties.) 29 ) 30 The names of functions and types in $(ZMQD) are very similar to those in 31 $(ZMQ), but they follow the D naming conventions. Thus, the library should 32 feel both familiar to $(ZMQ) users and natural to D users. A notable 33 deviation from the C API is that message parts are consistently called 34 "frames". For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send()) 35 and so on. (Multipart messages were a late addition to $(ZMQ), and the "msg" 36 function names were well established at that point. The library's 37 developers have admitted that this is somewhat confusing, and the newer 38 CZMQ API consistently uses "frame" in function names.) 39 40 Due to the close correspondence with the C API, this documentation has 41 intentionally been kept sparse. 
There is really no reason to repeat the 42 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here. 43 Instead, the documentation for each function contains a "Corresponds to" 44 section that links to the appropriate pages in the $(ZMQ) reference. Any 45 details given in the present documentation mostly concern the D-specific 46 adaptations that have been made. 47 48 Also note that the examples generally only use the INPROC transport. The 49 reason for this is that the examples double as unittests, and we want to 50 avoid firewall troubles and other issues that could arise with the use of 51 network protocols such as TCP, PGM, etc., and the IPC protocol is not 52 supported on Windows. Anyway, they are only short 53 snippets that demonstrate the syntax; for more comprehensive and realistic 54 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all, 55 $(ZMQ) Guide). Many of the examples in the Guide have been translated to 56 D, and can be found in the 57 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples)) 58 subdirectory of the $(ZMQD) source repository. 59 60 Version: 61 1.2.0 (compatible with $(ZMQ) >= 4.3.0) 62 Authors: 63 $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad) 64 Copyright: 65 Copyright (c) 2013–2019, Lars T. Kyllingstad. All rights reserved. 66 License: 67 $(ZMQD) is released under the terms of the 68 $(LINK2 https://www.mozilla.org/MPL/2.0/index.txt,Mozilla Public License v. 2.0).$(BR) 69 Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site) 70 for details about $(ZMQ) licensing. 
Macros:
    D      = <code>$0</code>
    EM     = <em>$0</em>
    LDOTS  = …
    QUOTE  = <blockquote>$0</blockquote>
    FREF   = $(D $(LINK2 #$1,$1()))
    REF    = $(D $(LINK2 #$1,$1))
    COREF  = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2))
    OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1))
    STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2))
    ANCHOR = <span id="$1"></span>
    ZMQ    = ZeroMQ
    ZMQAPI = $(LINK2 http://api.zeromq.org/4-3:$1,$+)
    ZMQD   = $(EM zmqd)
    ZMQREF = $(D $(ZMQAPI $1,$1))
*/
module zmqd;

import core.time;
import std.typecons;
import deimos.zmq.zmq;


version(Windows) {
    import core.sys.windows.winsock2: SOCKET;
}

// Compile with debug=ZMQD_DisableCurveTests to be able to successfully
// run unittests even if the local ZMQ library is not compiled with Curve
// support.
debug (ZMQD_DisableCurveTests) { } else debug = WithCurveTests;

// Compatibility check: in unittest builds, compare the version the binding
// was written for against the version reported by the loaded library and
// print a warning/note on stderr when they differ in a relevant way.
version(unittest) static this()
{
    import std.algorithm: max, min;
    import std.stdio: stderr;

    const lib = zmqVersion();
    const sameRelease =
        lib.major == ZMQ_VERSION_MAJOR && lib.minor == ZMQ_VERSION_MINOR;
    if (!sameRelease) {
        stderr.writefln(
            "Warning: Potential ZeroMQ header/library incompatibility: "
            ~"The header (binding) is for version %d.%d.%d, "
            ~"while the library is version %d.%d.%d. Unittests may fail.",
            ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH,
            lib.major, lib.minor, lib.patch);
    }

    // Known incompatibilities: report the first ABI break that lies between
    // the binding version and the library version.
    const libVersion = ZMQ_MAKE_VERSION(lib.major, lib.minor, lib.patch);
    const older = min(ZMQ_VERSION, libVersion);
    const newer = max(ZMQ_VERSION, libVersion);
    const abiBreak410 = ZMQ_MAKE_VERSION(4, 1, 0);
    const abiBreak411 = ZMQ_MAKE_VERSION(4, 1, 1);
    if (older < abiBreak410 && abiBreak410 <= newer) {
        stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions");
    } else if (older < abiBreak411 && abiBreak411 <= newer) {
        stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions");
    }
}


@safe:

// Returns a pointer to the first element of `arr`, or null for an empty array.
T* ptr(T)(T[] arr)
{
    if (arr.length == 0) return null;
    return &arr[0];
}

/**
Reports the $(ZMQ) library version.

Returns:
    A $(STDREF typecons,Tuple) with three integer fields that represent the
    three versioning levels: $(D major), $(D minor) and $(D patch).
Corresponds_to:
    $(ZMQREF zmq_version())
*/
Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow
{
    typeof(return) result;
    scopedTrusted!zmq_version(&result.major, &result.minor, &result.patch);
    return result;
}


/**
Checks for a $(ZMQ) capability.

Corresponds_to:
    $(ZMQREF zmq_has())
*/
bool zmqHas(const char[] capability) nothrow
{
    return trusted!zmq_has(zeroTermString(capability)) != 0;
}

unittest
{
    assert (!zmqHas("this ain't a capability"));
}


/**
The various socket types.

These are described in the $(ZMQREF zmq_socket()) reference.
*/
enum SocketType
{
    /// Corresponds to $(D ZMQ_REQ)
    req = ZMQ_REQ,
    /// Corresponds to $(D ZMQ_REP)
    rep = ZMQ_REP,
    /// Corresponds to $(D ZMQ_DEALER)
    dealer = ZMQ_DEALER,
    /// Corresponds to $(D ZMQ_ROUTER)
    router = ZMQ_ROUTER,
    /// Corresponds to $(D ZMQ_PUB)
    pub = ZMQ_PUB,
    /// Corresponds to $(D ZMQ_SUB)
    sub = ZMQ_SUB,
    /// Corresponds to $(D ZMQ_XPUB)
    xpub = ZMQ_XPUB,
    /// Corresponds to $(D ZMQ_XSUB)
    xsub = ZMQ_XSUB,
    /// Corresponds to $(D ZMQ_PUSH)
    push = ZMQ_PUSH,
    /// Corresponds to $(D ZMQ_PULL)
    pull = ZMQ_PULL,
    /// Corresponds to $(D ZMQ_PAIR)
    pair = ZMQ_PAIR,
    /// Corresponds to $(D ZMQ_STREAM)
    stream = ZMQ_STREAM,
}


/// _Security mechanisms.
enum Security
{
    none = ZMQ_NULL,    /// $(ZMQAPI zmq_null,NULL): No security or confidentiality.
    plain = ZMQ_PLAIN,  /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication.
    curve = ZMQ_CURVE,  /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality.
}


/**
A special $(COREF time,Duration) value used to signify an infinite
timeout or time interval in certain contexts.

The places where this value may be used are:
$(UL
    $(LI $(LINK2 #Socket.misc_options,certain socket options))
    $(LI $(FREF poll))
)

Note:
    Since $(D core.time.Duration) doesn't reserve such a special value, the
    actual value of $(D infiniteDuration) is $(COREF time,Duration.max).
*/
enum infiniteDuration = Duration.max;


/**
An object that encapsulates a $(ZMQ) socket.

A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it
must always be explicitly initialized with a constructor (see
$(FREF _Socket.this)):
---
Socket s;                     // Not a valid socket yet
s = Socket(SocketType.push);  // ...but now it is.
---
This $(D struct) is noncopyable, which means that a socket is always
uniquely managed by a single $(D Socket) object. Functions that will
inspect or use the socket, but not take ownership of it, should take
a $(D ref Socket) parameter. Use $(STDREF algorithm,move) to move
a $(D Socket) to a different location (e.g. into a sink function that
takes it by value, or into a new variable).

The socket is automatically closed when the $(D Socket) object goes out
of scope.

Linger_period:
    Note that Socket by default sets the socket's linger period to zero.
    This deviates from the $(ZMQ) default (which is an infinite linger period).
*/
struct Socket
{
@safe:
    /**
    Creates a new $(ZMQ) socket.

    If $(D context) is not specified, the default _context (as returned
    by $(FREF defaultContext)) is used.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_socket())
    */
    this(SocketType type)
    {
        this(defaultContext(), type);
    }

    /// ditto
    this(Context context, SocketType type)
    {
        m_context = context;
        m_type = type;
        m_socket = trusted!zmq_socket(context.handle, type);
        if (m_socket == null) throw new ZmqException;

        // Override the ZeroMQ default: new sockets get a zero linger period.
        linger = msecs(0);
    }

    /// With default context:
    unittest
    {
        auto pusher = Socket(SocketType.push);
        assert (pusher.initialized);
    }
    /// With explicit context:
    unittest
    {
        auto context = Context();
        auto pusher = Socket(context, SocketType.push);
        assert (pusher.initialized);
    }

    // Socket objects are noncopyable.
    @disable this(this);

    unittest // Verify that move semantics work.
    {
        import std.algorithm: move;
        struct Holder
        {
            Socket owned;
            this(Socket s) { owned = trusted!move(s); }
        }
        auto original = Socket(SocketType.req);
        const rawHandle = original.handle;
        assert (rawHandle != null);

        auto holder = Holder(trusted!move(original));
        // The moved-from socket must be left empty...
        assert (original.handle == null);
        assert (!original.m_context.initialized);
        // ...and the new owner must hold the very same handle.
        assert (holder.owned.handle == rawHandle);
        assert (holder.owned.m_context.initialized);
    }

    // Closes the socket on destruction.
    ~this() nothrow
    {
        nothrowClose();
    }

    /**
    Closes the $(ZMQ) socket.

    Note that the socket will be closed automatically upon destruction,
    so it is usually not necessary to call this method manually.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_close())
    */
    void close()
    {
        const closedOk = nothrowClose();
        if (!closedOk) throw new ZmqException;
    }

    ///
    unittest
    {
        auto pairSocket = Socket(SocketType.pair);
        assert (pairSocket.initialized);
        pairSocket.close();
        assert (!pairSocket.initialized);
    }

    /**
    Starts accepting incoming connections on $(D endpoint).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_bind())
    */
    void bind(const char[] endpoint)
    {
        const rc = trusted!zmq_bind(m_socket, zeroTermString(endpoint));
        if (rc != 0) throw new ZmqException;
    }

    ///
    unittest
    {
        auto publisher = Socket(SocketType.pub);
        publisher.bind("inproc://zmqd_bind_example");
    }

    /**
    Stops accepting incoming connections on $(D endpoint).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
        $(ZMQREF zmq_unbind())
    */
    void unbind(const char[] endpoint)
    {
        const rc = trusted!zmq_unbind(m_socket, zeroTermString(endpoint));
        if (rc != 0) throw new ZmqException;
    }

    unittest
    {
        auto publisher = Socket(SocketType.pub);
        publisher.bind("inproc://zmqd_unbind_example");
        // Do some work...
        publisher.unbind("inproc://zmqd_unbind_example");
    }

    /**
    Creates an outgoing connection to $(D endpoint).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_connect())
    */
    void connect(const char[] endpoint)
    {
        const rc = trusted!zmq_connect(m_socket, zeroTermString(endpoint));
        if (rc != 0) throw new ZmqException;
    }

    ///
    unittest
    {
        auto subscriber = Socket(SocketType.sub);
        subscriber.connect("inproc://zmqd_connect_example");
    }

    /**
    Disconnects the socket from $(D endpoint).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_disconnect())
    */
    void disconnect(const char[] endpoint)
    {
        const rc = trusted!zmq_disconnect(m_socket, zeroTermString(endpoint));
        if (rc != 0) throw new ZmqException;
    }

    ///
    unittest
    {
        auto subscriber = Socket(SocketType.sub);
        subscriber.connect("inproc://zmqd_disconnect_example");
        // Do some work...
        subscriber.disconnect("inproc://zmqd_disconnect_example");
    }

    /**
    Sends a message frame.

    $(D _send) blocks until the frame has been queued on the socket.
    $(D trySend) performs the operation in non-blocking mode, and returns
    a $(D bool) value that signifies whether the frame was queued on the
    socket.

    The $(D more) parameter specifies whether this is a multipart message
    and there are _more frames to follow.

    The $(D char[]) overload is a convenience function that simply casts
    the string argument to $(D ubyte[]).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
        $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
        case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
        $(D more == true)).
    */
    void send(const ubyte[] data, bool more = false)
    {
        immutable flags = more ? ZMQ_SNDMORE : 0;
        if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) {
            throw new ZmqException;
        }
    }

    /// ditto
    void send(const char[] data, bool more = false)
    {
        send(cast(const(ubyte)[]) data, more);
    }

    /// ditto
    bool trySend(const ubyte[] data, bool more = false)
    {
        immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
        if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) {
            // EAGAIN means "would block": report it as `false`, not an error.
            import core.stdc.errno: EAGAIN;
            if (trusted!zmq_errno() == EAGAIN) return false;
            else throw new ZmqException;
        }
        return true;
    }

    /// ditto
    bool trySend(const char[] data, bool more = false)
    {
        return trySend(cast(const(ubyte)[]) data, more);
    }

    ///
    unittest
    {
        auto sck = Socket(SocketType.pub);
        sck.send(cast(ubyte[]) [11, 226, 92]);
        sck.send("Hello World!");
    }

    /**
    Sends a message frame.

    $(D _send) blocks until the frame has been queued on the socket.
    $(D trySend) performs the operation in non-blocking mode, and returns
    a $(D bool) value that signifies whether the frame was queued on the
    socket.

    The $(D more) parameter specifies whether this is a multipart message
    and there are _more frames to follow.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the
        case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if
        $(D more == true)).
    */
    void send(ref Frame msg, bool more = false)
    {
        immutable flags = more ? ZMQ_SNDMORE : 0;
        if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
            throw new ZmqException;
        }
    }

    /// ditto
    bool trySend(ref Frame msg, bool more = false)
    {
        immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
        if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) {
            // EAGAIN means "would block": report it as `false`, not an error.
            import core.stdc.errno: EAGAIN;
            if (trusted!zmq_errno() == EAGAIN) return false;
            else throw new ZmqException;
        }
        return true;
    }

    ///
    unittest
    {
        auto sck = Socket(SocketType.pub);
        auto msg = Frame(12);
        msg.data.asString()[] = "Hello World!";
        sck.send(msg);
    }

    /**
    Sends a constant-memory message frame.

    $(D _sendConst) blocks until the frame has been queued on the socket.
    $(D trySendConst) performs the operation in non-blocking mode, and returns
    a $(D bool) value that signifies whether the frame was queued on the
    socket.

    The $(D more) parameter specifies whether this is a multipart message
    and there are _more frames to follow.

    The $(D string) overloads simply cast the argument to
    $(D immutable(ubyte)[]) and forward to the corresponding
    $(D immutable ubyte[]) overload.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the
        case of $(D trySendConst), and with the $(D ZMQ_SNDMORE) flag if
        $(D more == true)).
    */
    void sendConst(immutable ubyte[] data, bool more = false)
    {
        immutable flags = more ? ZMQ_SNDMORE : 0;
        if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) {
            throw new ZmqException;
        }
    }

    /// ditto
    void sendConst(string data, bool more = false)
    {
        sendConst(cast(immutable(ubyte)[]) data, more);
    }

    /// ditto
    bool trySendConst(immutable ubyte[] data, bool more = false)
    {
        immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0);
        if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) {
            // EAGAIN means "would block": report it as `false`, not an error.
            import core.stdc.errno: EAGAIN;
            if (trusted!zmq_errno() == EAGAIN) return false;
            else throw new ZmqException;
        }
        return true;
    }

    /// ditto
    bool trySendConst(string data, bool more = false)
    {
        // Forward to the constant-memory overload (zmq_send_const). This used
        // to call trySend(), which goes through zmq_send instead.
        return trySendConst(cast(immutable(ubyte)[]) data, more);
    }

    ///
    unittest
    {
        static immutable arr = cast(ubyte[]) [11, 226, 92];
        auto sck = Socket(SocketType.pub);
        sck.sendConst(arr);
        sck.sendConst("Hello World!");
    }

    /**
    Receives a message frame.

    $(D _receive) blocks until the request can be satisfied, and returns the
    number of bytes in the frame.
    $(D tryReceive) performs the operation in non-blocking mode, and returns
    a $(STDREF typecons,Tuple) which contains the size of the frame along
    with a $(D bool) value that signifies whether a frame was received.
    (If the latter is $(D false), the former is always set to zero.)

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
        of $(D tryReceive)).
*/
    size_t receive(scope ubyte[] data)
    {
        import std.conv: to;
        immutable received = scopedTrusted!zmq_recv(m_socket, ptr(data), data.length, 0);
        if (received < 0) throw new ZmqException;
        return to!size_t(received);
    }

    /// ditto
    Tuple!(size_t, bool) tryReceive(ubyte[] data)
    {
        immutable received = trusted!zmq_recv(m_socket, ptr(data), data.length, ZMQ_DONTWAIT);
        if (received >= 0) {
            import std.conv: to;
            return typeof(return)(to!size_t(received), true);
        }

        // A negative result with EAGAIN means nothing was available;
        // any other errno is reported as an exception.
        import core.stdc.errno: EAGAIN;
        if (trusted!zmq_errno() != EAGAIN) throw new ZmqException;
        return typeof(return)(0, false);
    }

    ///
    unittest
    {
        // Sender
        auto sender = Socket(SocketType.req);
        sender.connect("inproc://zmqd_receive_example");
        sender.send("Hello World!");

        // Receiver
        import std.string: representation;
        auto receiver = Socket(SocketType.rep);
        receiver.bind("inproc://zmqd_receive_example");
        char[256] buffer;
        immutable count = receiver.receive(buffer.representation);
        assert (buffer[0 .. count] == "Hello World!");
    }

    @system unittest
    {
        auto sender = Socket(SocketType.pair);
        sender.bind("inproc://zmqd_tryReceive_example");
        auto receiver = Socket(SocketType.pair);
        receiver.connect("inproc://zmqd_tryReceive_example");

        ubyte[256] buffer;
        auto first = receiver.tryReceive(buffer);
        assert (!first[1]);

        import core.thread, core.time, std.string;
        sender.send("Hello World!");
        Thread.sleep(100.msecs); // Wait for message to be transferred...
        auto second = receiver.tryReceive(buffer);
        assert (second[1] && buffer[0 .. second[0]] == "Hello World!".representation);
    }

    /**
    Receives a message frame.

    $(D _receive) blocks until the request can be satisfied, and returns the
    number of bytes in the frame.
$(D tryReceive) performs the operation in non-blocking mode, and returns
    a $(STDREF typecons,Tuple) which contains the size of the frame along
    with a $(D bool) value that signifies whether a frame was received.
    (If the latter is $(D false), the former is always set to zero.)

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case
        of $(D tryReceive)).

    */
    size_t receive(ref Frame msg)
    {
        import std.conv: to;
        immutable received = trusted!zmq_msg_recv(msg.handle, m_socket, 0);
        if (received < 0) throw new ZmqException;
        return to!size_t(received);
    }

    /// ditto
    Tuple!(size_t, bool) tryReceive(ref Frame msg)
    {
        immutable received = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT);
        if (received >= 0) {
            import std.conv: to;
            return typeof(return)(to!size_t(received), true);
        }

        // A negative result with EAGAIN means nothing was available;
        // any other errno is reported as an exception.
        import core.stdc.errno: EAGAIN;
        if (trusted!zmq_errno() != EAGAIN) throw new ZmqException;
        return typeof(return)(0, false);
    }

    ///
    unittest
    {
        // Sender
        auto sender = Socket(SocketType.req);
        sender.connect("inproc://zmqd_msg_receive_example");
        sender.send("Hello World!");

        // Receiver
        import std.string: representation;
        auto receiver = Socket(SocketType.rep);
        receiver.bind("inproc://zmqd_msg_receive_example");
        auto frame = Frame();
        receiver.receive(frame);
        assert (frame.data.asString() == "Hello World!");
    }

    @system unittest
    {
        auto sender = Socket(SocketType.pair);
        sender.bind("inproc://zmqd_msg_tryReceive_example");
        auto receiver = Socket(SocketType.pair);
        receiver.connect("inproc://zmqd_msg_tryReceive_example");

        auto frame = Frame();
        auto first = receiver.tryReceive(frame);
        assert (!first[1]);

        import core.thread, core.time, std.string;
        sender.send("Hello World!");
        Thread.sleep(100.msecs); // Wait for message to be transferred...
        auto second = receiver.tryReceive(frame);
        assert (second[1] && frame.data[0 .. second[0]] == "Hello World!".representation);
    }

    /**
    Whether there are _more message frames to follow.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE).
    */
    @property bool more()
    {
        return getOption!int(ZMQ_RCVMORE) != 0;
    }

    // TODO: Better unittest/example
    unittest
    {
        auto requester = Socket(SocketType.req);
        assert (!requester.more);
    }

    /** $(ANCHOR Socket.misc_options)
    Miscellaneous socket options.

    Each of these corresponds to an option passed to
    $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For
    example, $(D identity) corresponds to $(D ZMQ_IDENTITY),
    $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc.

    Notes:
    $(UL
        $(LI For convenience, the setter for the $(D identity) property
            accepts strings. To retrieve a string with the getter, use
            the $(FREF asString) function.
            ---
            sck.identity = "foobar";
            assert (sck.identity.asString() == "foobar");
            ---
            )
        $(LI The $(D linger), $(D receiveTimeout), $(D sendTimeout) and
            $(D handshakeInterval) properties may have the special _value
            $(REF infiniteDuration). This is translated to an option
            _value of -1 or 0 (depending on which property is being set)
            in the C API.)
        $(LI Some options have array _type, and these allow the user to supply
            a buffer in which to store the _value, to avoid a GC allocation.
            The return _value is then a slice of this buffer.
            These are not marked as $(D @property), but are prefixed with
            "get" (e.g. $(D getIdentity())). A user-supplied buffer is
            $(EM required) for some options, namely $(D getPlainUsername())
            and $(D getPlainPassword()), and these do not have $(D @property)
            versions.
$(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85())
            require buffers which are at least 32 and 41 bytes long,
            respectively.)
        $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are
            treated differently from the others; see $(FREF Socket.subscribe)
            and $(FREF Socket.unsubscribe))
    )

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
        $(STDREF conv,ConvOverflowException) if a given $(D Duration) is
        longer than the number of milliseconds that will fit in an $(D int)
        (only applies to properties of $(COREF time,Duration) _type).$(BR)
        $(COREF exception,RangeError) if the $(D dest) buffers passed to
        $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than
        32 or 41 bytes long, respectively.
    Corresponds_to:
        $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()).
    */
    @property ulong threadAffinity()
    {
        return getOption!ulong(ZMQ_AFFINITY);
    }
    /// ditto
    @property void threadAffinity(ulong value)
    {
        setOption(ZMQ_AFFINITY, value);
    }

    /// ditto
    @property ubyte[] identity()
    {
        // Allocate a 255-byte buffer and let getIdentity() slice it.
        auto buffer = new ubyte[255];
        return getIdentity(buffer);
    }
    /// ditto
    ubyte[] getIdentity(ubyte[] dest)
    {
        return getArrayOption(ZMQ_IDENTITY, dest);
    }
    /// ditto
    @property void identity(const ubyte[] value)
    {
        setArrayOption(ZMQ_IDENTITY, value);
    }
    /// ditto
    @property void identity(const char[] value)
    {
        setArrayOption(ZMQ_IDENTITY, value);
    }

    /// ditto
    @property int rate()
    {
        return getOption!int(ZMQ_RATE);
    }
    /// ditto
    @property void rate(int value)
    {
        setOption(ZMQ_RATE, value);
    }

    /// ditto
    @property Duration recoveryInterval()
    {
        return getOption!int(ZMQ_RECOVERY_IVL).msecs;
    }
    /// ditto
    @property void recoveryInterval(Duration value)
    {
        import std.conv: to;
        setOption(ZMQ_RECOVERY_IVL, value.total!"msecs"().to!int);
    }

    /// ditto
    @property int sendBufferSize()
    {
        return getOption!int(ZMQ_SNDBUF);
    }
    /// ditto
@property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); }

    /// ditto
    @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); }
    /// ditto
    @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); }

    /// ditto
    @property FD fd() { return getOption!FD(ZMQ_FD); }

    /// ditto
    @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); }

    /// ditto
    @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); }

    /// ditto
    @property Duration linger()
    {
        // An option value of -1 maps to infiniteDuration.
        const value = getOption!int(ZMQ_LINGER);
        return value == -1 ? infiniteDuration : value.msecs;
    }
    /// ditto
    @property void linger(Duration value)
    {
        import std.conv: to;
        setOption(
            ZMQ_LINGER,
            value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
    }

    /// ditto
    @property Duration reconnectionInterval()
    {
        return getOption!int(ZMQ_RECONNECT_IVL).msecs;
    }
    /// ditto
    @property void reconnectionInterval(Duration value)
    {
        import std.conv: to;
        setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"()));
    }

    /// ditto
    @property int backlog() { return getOption!int(ZMQ_BACKLOG); }
    /// ditto
    @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); }

    /// ditto
    @property Duration maxReconnectionInterval()
    {
        return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs;
    }
    /// ditto
    @property void maxReconnectionInterval(Duration value)
    {
        import std.conv: to;
        setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"()));
    }

    /// ditto
    @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); }
    /// ditto
    @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); }

    /// ditto
    @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); }
    /// ditto
    @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); }

    /// ditto
    @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); }
    /// ditto
    @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); }

    /// ditto
    @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); }
    /// ditto
    @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); }

    /// ditto
    @property Duration receiveTimeout()
    {
        // An option value of -1 maps to infiniteDuration.
        const value = getOption!int(ZMQ_RCVTIMEO);
        return value == -1 ? infiniteDuration : value.msecs;
    }
    /// ditto
    @property void receiveTimeout(Duration value)
    {
        import std.conv: to;
        setOption(
            ZMQ_RCVTIMEO,
            value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
    }

    /// ditto
    @property Duration sendTimeout()
    {
        // An option value of -1 maps to infiniteDuration.
        const value = getOption!int(ZMQ_SNDTIMEO);
        return value == -1 ? infiniteDuration : value.msecs;
    }
    /// ditto
    @property void sendTimeout(Duration value)
    {
        import std.conv: to;
        setOption(
            ZMQ_SNDTIMEO,
            value == infiniteDuration ? -1 : to!int(value.total!"msecs"()));
    }

    /// ditto
    @property char[] lastEndpoint() @trusted
    {
        // This function is not @safe because it calls a @system function
        // (zmq_getsockopt) and takes the address of a local (len).
        auto buf = new char[1024];
        size_t len = buf.length;
        if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) {
            throw new ZmqException;
        }
        // The last reported byte is dropped (presumably the C string
        // terminator -- confirm against the ZMQ_LAST_ENDPOINT docs). Guard
        // the subtraction so a reported length of 0 cannot wrap around and
        // produce an out-of-range slice inside this @trusted function.
        return buf[0 .. (len > 0 ? len - 1 : 0)];
    }

    /// ditto
    @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ?
1 : 0); }

    /// ditto
    @property int tcpKeepalive()
    {
        return getOption!int(ZMQ_TCP_KEEPALIVE);
    }
    /// ditto
    @property void tcpKeepalive(int value)
    {
        setOption(ZMQ_TCP_KEEPALIVE, value);
    }

    /// ditto
    @property int tcpKeepaliveCnt()
    {
        return getOption!int(ZMQ_TCP_KEEPALIVE_CNT);
    }
    /// ditto
    @property void tcpKeepaliveCnt(int value)
    {
        setOption(ZMQ_TCP_KEEPALIVE_CNT, value);
    }

    /// ditto
    @property int tcpKeepaliveIdle()
    {
        return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE);
    }
    /// ditto
    @property void tcpKeepaliveIdle(int value)
    {
        setOption(ZMQ_TCP_KEEPALIVE_IDLE, value);
    }

    /// ditto
    @property int tcpKeepaliveIntvl()
    {
        return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL);
    }
    /// ditto
    @property void tcpKeepaliveIntvl(int value)
    {
        setOption(ZMQ_TCP_KEEPALIVE_INTVL, value);
    }

    /// ditto
    @property bool immediate()
    {
        return getOption!int(ZMQ_IMMEDIATE) != 0;
    }
    /// ditto
    @property void immediate(bool value)
    {
        setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0);
    }

    /// ditto
    @property void xpubVerbose(bool value)
    {
        setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0);
    }

    /// ditto
    @property bool ipv6()
    {
        return getOption!int(ZMQ_IPV6) != 0;
    }
    /// ditto
    @property void ipv6(bool value)
    {
        setOption(ZMQ_IPV6, value ? 1 : 0);
    }

    /// ditto
    @property Security mechanism()
    {
        import std.conv: to;
        int raw = getOption!int(ZMQ_MECHANISM);
        return to!Security(raw);
    }

    /// ditto
    @property bool plainServer()
    {
        return getOption!int(ZMQ_PLAIN_SERVER) != 0;
    }
    /// ditto
    @property void plainServer(bool value)
    {
        setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0);
    }

    /// ditto
    char[] getPlainUsername(char[] dest) { return getCStringOption(ZMQ_PLAIN_USERNAME, dest); }
    /// ditto
    @property void plainUsername(const(char)[] value) { setArrayOption(ZMQ_PLAIN_USERNAME, value); }

    /// ditto
    char[] getPlainPassword(char[] dest) { return getCStringOption(ZMQ_PLAIN_PASSWORD, dest); }
    /// ditto
    @property void plainPassword(const(char)[] value) { setArrayOption(ZMQ_PLAIN_PASSWORD, value); }

    /// ditto
    @property bool curveServer()
    {
        return getOption!int(ZMQ_CURVE_SERVER) != 0;
    }
    /// ditto
    @property void curveServer(bool value)
    {
        setOption(ZMQ_CURVE_SERVER, value ? 1 : 0);
    }

    /// ditto
    @property ubyte[] curvePublicKey()
    {
        auto buffer = new ubyte[keyBufSizeBin];
        return getCurvePublicKey(buffer);
    }
    /// ditto
    ubyte[] getCurvePublicKey(ubyte[] dest) { return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest); }
    /// ditto
    @property char[] curvePublicKeyZ85()
    {
        auto buffer = new char[keyBufSizeZ85];
        return getCurvePublicKeyZ85(buffer);
    }
    /// ditto
    char[] getCurvePublicKeyZ85(char[] dest) { return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest); }
    /// ditto
    @property void curvePublicKey(const(ubyte)[] value) { setCurveKey(ZMQ_CURVE_PUBLICKEY, value); }
    /// ditto
    @property void curvePublicKeyZ85(const(char)[] value) { setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value); }

    /// ditto
    @property ubyte[] curveSecretKey()
    {
        auto buffer = new ubyte[keyBufSizeBin];
        return getCurveSecretKey(buffer);
    }
    /// ditto
    ubyte[] getCurveSecretKey(ubyte[] dest) { return getCurveKey(ZMQ_CURVE_SECRETKEY, dest); }
    /// ditto
    @property char[] curveSecretKeyZ85()
    {
        auto buffer = new char[keyBufSizeZ85];
        return getCurveSecretKeyZ85(buffer);
    }
    /// ditto
    char[] getCurveSecretKeyZ85(char[] dest) { return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest); }
    /// ditto
    @property void curveSecretKey(const(ubyte)[] value) { setCurveKey(ZMQ_CURVE_SECRETKEY, value); }
    /// ditto
    @property void curveSecretKeyZ85(const(char)[] value) { setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value); }

    /// ditto
    @property ubyte[] curveServerKey()
    {
        auto buffer = new ubyte[keyBufSizeBin];
        return getCurveServerKey(buffer);
    }
    /// ditto
    ubyte[] getCurveServerKey(ubyte[] dest) { return getCurveKey(ZMQ_CURVE_SERVERKEY, dest); }
    /// ditto
    @property char[] curveServerKeyZ85()
    {
        auto buffer = new char[keyBufSizeZ85];
        return getCurveServerKeyZ85(buffer);
    }
    /// ditto
    char[] getCurveServerKeyZ85(char[] dest) { return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest); }
    /// ditto
    @property void curveServerKey(const(ubyte)[] value) { setCurveKey(ZMQ_CURVE_SERVERKEY, value); }
    /// ditto
    @property void curveServerKeyZ85(const(char)[] value) { setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value); }

    /// ditto
    @property void probeRouter(bool value)
    {
        setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0);
    }

    /// ditto
    @property void reqCorrelate(bool value)
    {
        setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0);
    }

    /// ditto
    @property void reqRelaxed(bool value)
    {
        setOption(ZMQ_REQ_RELAXED, value ? 1 : 0);
    }

    /// ditto
    @property void conflate(bool value)
    {
        setOption(ZMQ_CONFLATE, value ? 1 : 0);
    }

    /// ditto
    @property char[] zapDomain()
    {
        auto buffer = new char[256];
        return getZapDomain(buffer);
    }
    /// ditto
    char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); }
    /// ditto
    @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); }

    /// ditto
    @property void routerHandover(bool value)
    {
        setOption(ZMQ_ROUTER_HANDOVER, value ? 1 : 0);
    }

    /// ditto
    @property void connectionRID(const ubyte[] value)
    {
        setArrayOption(ZMQ_CONNECT_RID, value);
    }

    /// ditto
    @property int typeOfService()
    {
        return getOption!int(ZMQ_TOS);
    }
    /// ditto
    @property void typeOfService(int value)
    {
        setOption(ZMQ_TOS, value);
    }

    /// ditto
    @property bool gssapiPlaintext()
    {
        return getOption!int(ZMQ_GSSAPI_PLAINTEXT) != 0;
    }
    /// ditto
    @property void gssapiPlaintext(bool value)
    {
        setOption(ZMQ_GSSAPI_PLAINTEXT, value ? 1 : 0);
    }

    /// ditto
    @property char[] gssapiPrincipal()
    {
        auto buffer = new char[256];
        return getGssapiPrincipal(buffer);
    }
    /// ditto
    char[] getGssapiPrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_PRINCIPAL, dest); }
    /// ditto
    @property void gssapiPrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_PRINCIPAL, value); }

    /// ditto
    @property bool gssapiServer()
    {
        return getOption!int(ZMQ_GSSAPI_SERVER) != 0;
    }
    /// ditto
    @property void gssapiServer(bool value)
    {
        setOption(ZMQ_GSSAPI_SERVER, value ? 1 : 0);
    }

    /// ditto
    @property char[] gssapiServicePrincipal()
    {
        auto buffer = new char[256];
        return getGssapiServicePrincipal(buffer);
    }
    /// ditto
    char[] getGssapiServicePrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, dest); }
    /// ditto
    @property void gssapiServicePrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, value); }

    /// ditto
    @property Duration handshakeInterval()
    {
        // For this option, a value of 0 maps to infiniteDuration.
        const ms = getOption!int(ZMQ_HANDSHAKE_IVL);
        if (ms == 0) return infiniteDuration;
        return ms.msecs;
    }
    /// ditto
    @property void handshakeInterval(Duration value)
    {
        import std.conv: to;
        setOption(
            ZMQ_HANDSHAKE_IVL,
            value == infiniteDuration ?
0 : to!int(value.total!"msecs"())); 1214 } 1215 1216 // deprecated options 1217 deprecated("Use the 'immediate' property instead") 1218 @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); } 1219 1220 deprecated("Use the 'immediate' property instead") 1221 @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); } 1222 1223 deprecated("Use !ipv6 instead") 1224 @property bool ipv4Only() { return !ipv6; } 1225 1226 deprecated("Use ipv6 = !value instead") 1227 @property void ipv4Only(bool value) { ipv6 = !value; } 1228 1229 unittest 1230 { 1231 auto s = Socket(SocketType.xpub); 1232 const e = "inproc://unittest2"; 1233 s.bind(e); 1234 import core.time; 1235 s.threadAffinity = 1; 1236 assert(s.threadAffinity == 1); 1237 s.identity = cast(ubyte[]) [ 65, 66, 67 ]; 1238 assert(s.identity == [65, 66, 67]); 1239 s.identity = "foo"; 1240 assert(s.identity == [102, 111, 111]); 1241 s.rate = 200; 1242 assert(s.rate == 200); 1243 s.recoveryInterval = 5.seconds; 1244 assert(s.recoveryInterval == 5_000.msecs); 1245 s.sendBufferSize = 500; 1246 assert(s.sendBufferSize == 500); 1247 s.receiveBufferSize = 600; 1248 assert(s.receiveBufferSize == 600); 1249 version(Posix) { 1250 assert(s.fd > 2); // 0, 1 and 2 are the standard streams 1251 } 1252 assert(s.events == PollFlags.pollOut); 1253 assert(s.type == SocketType.xpub); 1254 s.linger = Duration.zero; 1255 assert(s.linger == Duration.zero); 1256 s.linger = 100_000.usecs; 1257 assert(s.linger == 100.msecs); 1258 s.linger = infiniteDuration; 1259 assert(s.linger == infiniteDuration); 1260 s.reconnectionInterval = 200_000.usecs; 1261 assert(s.reconnectionInterval == 200.msecs); 1262 s.backlog = 50; 1263 assert(s.backlog == 50); 1264 s.maxReconnectionInterval = 300_000.usecs; 1265 assert(s.maxReconnectionInterval == 300.msecs); 1266 s.maxMsgSize = 1000; 1267 assert(s.maxMsgSize == 1000); 1268 s.sendHWM = 500; 1269 assert(s.sendHWM == 500); 1270 
s.receiveHWM = 600; 1271 assert(s.receiveHWM == 600); 1272 s.multicastHops = 2; 1273 assert(s.multicastHops == 2); 1274 s.receiveTimeout = 3.seconds; 1275 assert(s.receiveTimeout == 3_000_000.usecs); 1276 s.receiveTimeout = infiniteDuration; 1277 assert(s.receiveTimeout == infiniteDuration); 1278 s.sendTimeout = 2_000_000.usecs; 1279 assert(s.sendTimeout == 2.seconds); 1280 s.sendTimeout = infiniteDuration; 1281 assert(s.sendTimeout == infiniteDuration); 1282 s.tcpKeepalive = 1; 1283 assert(s.tcpKeepalive == 1); 1284 s.tcpKeepaliveCnt = 1; 1285 assert(s.tcpKeepaliveCnt == 1); 1286 s.tcpKeepaliveIdle = 0; 1287 assert(s.tcpKeepaliveIdle == 0); 1288 s.tcpKeepaliveIntvl = 0; 1289 assert(s.tcpKeepaliveIntvl == 0); 1290 s.immediate = true; 1291 assert(s.immediate); 1292 s.ipv6 = true; 1293 assert(s.ipv6); 1294 s.plainServer = true; 1295 assert(s.mechanism == Security.plain); 1296 assert(s.plainServer); 1297 debug (WithCurveTests) { 1298 assert(!s.curveServer); 1299 s.curveServer = true; 1300 assert(s.mechanism == Security.curve); 1301 assert(!s.plainServer); 1302 assert(s.curveServer); 1303 } 1304 s.plainServer = false; 1305 assert(s.mechanism == Security.none); 1306 assert(!s.plainServer); 1307 debug (WithCurveTests) assert(!s.curveServer); 1308 s.plainUsername = "foobar"; 1309 assert(s.getPlainUsername(new char[8]) == "foobar"); 1310 assert(s.mechanism == Security.plain); 1311 s.plainUsername = null; 1312 assert(s.mechanism == Security.none); 1313 s.plainPassword = "xyz"; 1314 assert(s.getPlainPassword(new char[8]) == "xyz"); 1315 assert(s.mechanism == Security.plain); 1316 s.plainPassword = null; 1317 assert(s.mechanism == Security.none); 1318 s.zapDomain = "my_zap_domain"; 1319 assert(s.zapDomain == "my_zap_domain"); 1320 s.typeOfService = 1; 1321 assert(s.typeOfService == 1); 1322 if (zmqHas("gssapi")) { 1323 s.gssapiPlaintext = true; 1324 assert(s.gssapiPlaintext); 1325 s.gssapiPrincipal = "myPrincipal"; 1326 assert(s.gssapiPrincipal == "myPrincipal"); 1327 
s.gssapiServer = true; 1328 assert(s.gssapiServer); 1329 s.gssapiServicePrincipal = "myServicePrincipal"; 1330 assert(s.gssapiServicePrincipal == "myServicePrincipal"); 1331 } 1332 s.handshakeInterval = 60000.msecs; 1333 assert(s.handshakeInterval == 60000.msecs); 1334 1335 // Test write-only options 1336 s.conflate = true; 1337 } 1338 1339 debug (WithCurveTests) @system unittest 1340 { 1341 // The CURVE key options require some special setup, so we test them 1342 // separately. 1343 import std.array, std.range; 1344 auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array(); 1345 auto z85Key1 = z85Encode(binKey1); 1346 auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array(); 1347 auto z85Key2 = z85Encode(binKey2); 1348 auto zeroKey = repeat(cast(ubyte) 0).take(32).array(); 1349 assert (z85Key1 != z85Key2); 1350 1351 auto s = Socket(SocketType.req); 1352 s.curvePublicKey = zeroKey; 1353 s.curveSecretKey = zeroKey; 1354 s.curveServerKey = zeroKey; 1355 1356 s.curvePublicKey = binKey1; 1357 assert (s.curvePublicKey == binKey1); 1358 assert (s.curvePublicKeyZ85 == z85Key1); 1359 s.curvePublicKeyZ85 = z85Key2; 1360 assert (s.curvePublicKey == binKey2); 1361 assert (s.curvePublicKeyZ85 == z85Key2); 1362 assert (s.curveSecretKey == zeroKey); 1363 assert (s.curveServerKey == zeroKey); 1364 s.curvePublicKey = zeroKey; 1365 1366 s.curveSecretKey = binKey1; 1367 assert (s.curveSecretKey == binKey1); 1368 assert (s.curveSecretKeyZ85 == z85Key1); 1369 s.curveSecretKeyZ85 = z85Key2; 1370 assert (s.curveSecretKey == binKey2); 1371 assert (s.curveSecretKeyZ85 == z85Key2); 1372 assert (s.curvePublicKey == zeroKey); 1373 assert (s.curveServerKey == zeroKey); 1374 s.curveSecretKey = zeroKey; 1375 1376 s.curveServerKey = binKey1; 1377 assert (s.curveServerKey == binKey1); 1378 assert (s.curveServerKeyZ85 == z85Key1); 1379 s.curveServerKeyZ85 = z85Key2; 1380 assert (s.curveServerKey == binKey2); 1381 assert (s.curveServerKeyZ85 == z85Key2); 1382 assert (s.curvePublicKey == 
zeroKey);
        assert (s.curveSecretKey == zeroKey);
        s.curveServerKey = zeroKey;
    }

    unittest
    {
        // Some options are only applicable to specific socket types.
        auto rt = Socket(SocketType.router);
        rt.routerMandatory = true;
        rt.probeRouter = true;
        rt.routerHandover = true;
        rt.connectionRID = cast(ubyte[]) [ 10, 20, 30 ];
        auto xp = Socket(SocketType.xpub);
        xp.xpubVerbose = true;
        auto rq = Socket(SocketType.req);
        rq.reqCorrelate = true;
        rq.reqRelaxed = true;
    }

    deprecated unittest
    {
        // Test deprecated socket options
        auto s = Socket(SocketType.req);
        assert(s.ipv4Only);
        assert(!s.delayAttachOnConnect);

        // Test setters and getters together
        s.ipv4Only = false;
        assert(!s.ipv4Only);
        s.delayAttachOnConnect = true;
        assert(s.delayAttachOnConnect);
    }

    /**
    Establishes a message filter.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_SUBSCRIBE).
    */
    void subscribe(const ubyte[] filterPrefix)
    {
        setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
    }
    /// ditto
    void subscribe(const char[] filterPrefix)
    {
        setArrayOption(ZMQ_SUBSCRIBE, filterPrefix);
    }

    ///
    unittest
    {
        // Create a subscriber that accepts all messages that start with
        // the prefixes "foo" or "bar".
        auto sck = Socket(SocketType.sub);
        sck.subscribe("foo");
        sck.subscribe("bar");
    }

    @system unittest
    {
        void sleep(int ms) {
            import core.thread, core.time;
            Thread.sleep(dur!"msecs"(ms));
        }
        auto pub = Socket(SocketType.pub);
        pub.bind("inproc://zmqd_subscribe_unittest");
        auto sub = Socket(SocketType.sub);
        sub.connect("inproc://zmqd_subscribe_unittest");

        // "Hello" is published before any filter is installed; the asserts
        // at the end expect that it is never delivered to the subscriber.
        pub.send("Hello");
        sleep(100);
        // Exercise both overloads: a string prefix and a ubyte[] prefix.
        sub.subscribe("He");
        sub.subscribe(cast(ubyte[])['W', 'o']);
        sleep(100);
        pub.send("Heeee");
        pub.send("World");
        sleep(100);
        ubyte[5] buf;
        sub.receive(buf);
        assert(buf.asString() == "Heeee");
        sub.receive(buf);
        assert(buf.asString() == "World");
    }

    /**
    Removes a message filter.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_setsockopt()) with $(D ZMQ_UNSUBSCRIBE).
    */
    void unsubscribe(const ubyte[] filterPrefix)
    {
        setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
    }
    /// ditto
    void unsubscribe(const char[] filterPrefix)
    {
        setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix);
    }

    ///
    unittest
    {
        // Subscribe to messages that start with "foo" or "bar".
        auto sck = Socket(SocketType.sub);
        sck.subscribe("foo");
        sck.subscribe("bar");
        // ...
        // From now on, only accept messages that start with "bar"
        sck.unsubscribe("foo");
    }

    /**
    Spawns a PAIR socket that publishes socket state changes (_events) over
    the INPROC transport to the given _endpoint.

    Which event types should be published may be selected by bitwise-ORing
    together different $(REF EventType) flags in the $(D events) parameter.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
        $(ZMQREF zmq_socket_monitor())
    See_also:
        $(FREF receiveEvent), which receives and parses event messages.
    */
    void monitor(const char[] endpoint, EventType events = EventType.all)
    {
        // zeroTermString() is used because the C function takes the endpoint
        // as a C string; a negative return value signals failure.
        if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) {
            throw new ZmqException;
        }
    }

    ///
    unittest
    {
        auto sck = Socket(SocketType.pub);
        sck.monitor("inproc://zmqd_monitor_unittest",
                    EventType.accepted | EventType.closed);
    }

    /**
    The $(D void*) pointer used by the underlying C API to refer to the socket.

    If the object has not been initialized, this function returns $(D null).
    */
    @property inout(void)* handle() inout pure nothrow
    {
        return m_socket;
    }

    /**
    Whether this $(REF Socket) object has been _initialized, i.e. whether it
    refers to a valid $(ZMQ) socket.
    */
    @property bool initialized() const pure nothrow
    {
        return m_socket != null;
    }

    ///
    unittest
    {
        Socket sck;
        assert (!sck.initialized);
        sck = Socket(SocketType.sub);
        assert (sck.initialized);
        sck.close();
        assert (!sck.initialized);
    }

private:
    // Helper function for ~this() and close()
    // Returns false if zmq_close() reports failure, in which case m_socket
    // is left unchanged; returns true otherwise, including when there was
    // no socket to close.
    bool nothrowClose() nothrow
    {
        if (m_socket != null) {
            if (trusted!zmq_close(m_socket) != 0) return false;
            m_socket = null;
        }
        return true;
    }

    import std.traits;

    // Reads a scalar socket option with zmq_getsockopt().
    // Throws ZmqException if the call returns non-zero, and asserts that the
    // length reported back by the C function equals T.sizeof.
    T getOption(T)(int option) @trusted
        if (isScalarType!T)
    {
        T buf;
        auto len = T.sizeof;
        if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) {
            throw new ZmqException;
        }
        assert(len == T.sizeof);
        return buf;
    }

    // Writes a scalar socket option with zmq_setsockopt().
    // Throws ZmqException if the call returns non-zero.
    void setOption(T)(int option, T value) @trusted
        if (isScalarType!T)
    {
        if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) {
            throw new ZmqException;
        }
    }

    // Reads an array-valued socket option into the caller-supplied buffer
    // and returns the slice of `buf` covering the length reported by
    // zmq_getsockopt(). Only single-byte element types are accepted, so the
    // element count and the byte count coincide.
    T[] getArrayOption(T)(int option, T[] buf) @trusted
        if (isScalarType!T)
    {
        static assert (T.sizeof == 1);
        auto len = buf.length;
        if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) {
            throw new ZmqException;
        }
        return buf[0 .. len];
    }

    // Writes an array-valued socket option; `value` is passed to
    // zmq_setsockopt() as a raw pointer/length pair.
    // NOTE(review): the empty template parameter list is presumably there to
    // enable attribute inference -- confirm.
    void setArrayOption()(int option, const void[] value)
    {
        if (trusted!zmq_setsockopt(m_socket, option, ptr(value), value.length) != 0) {
            throw new ZmqException;
        }
    }

    // Reads a string-valued socket option. The value is asserted to be
    // non-empty and to end in a NUL character; the returned slice of `buf`
    // excludes that terminator.
    char[] getCStringOption(int option, char[] buf)
    {
        auto ret = getArrayOption(option, buf);
        assert (ret.length && ret[$-1] == '\0');
        return ret[0 .. $-1];
    }

    // CURVE key sizes. The Z85 buffer size is one larger than the key size:
    // getCurveKeyZ85() reads through getCStringOption(), which expects (and
    // strips) a trailing NUL.
    enum : size_t
    {
        keySizeBin    = 32,
        keyBufSizeBin = keySizeBin,
        keySizeZ85    = 40,
        keyBufSizeZ85 = keySizeZ85 + 1,
    }

    // Reads a binary CURVE key into the first keyBufSizeBin bytes of `buf`.
    // Throws RangeError if `buf` is shorter than that.
    ubyte[] getCurveKey(int option, ubyte[] buf)
    {
        if (buf.length < keyBufSizeBin) {
            import core.exception: RangeError;
            throw new RangeError;
        }
        return getArrayOption(option, buf[0 .. keyBufSizeBin]);
    }

    // Reads a Z85-encoded CURVE key into the first keyBufSizeZ85 chars of
    // `buf`. Throws RangeError if `buf` is shorter than that.
    char[] getCurveKeyZ85(int option, char[] buf)
    {
        if (buf.length < keyBufSizeZ85) {
            import core.exception: RangeError;
            throw new RangeError;
        }
        return getCStringOption(option, buf[0 .. keyBufSizeZ85]);
    }

    // Sets a binary CURVE key; the length must be exactly keySizeBin.
    void setCurveKey(int option, const ubyte[] value)
    {
        if (value.length != keySizeBin) throw new Exception("Invalid key size");
        setArrayOption(option, value);
    }

    // Sets a Z85-encoded CURVE key; the length must be exactly keySizeZ85.
    void setCurveKeyZ85(int option, const char[] value)
    {
        if (value.length != keySizeZ85) throw new Exception("Invalid key size");
        setArrayOption(option, value);
    }

    Context m_context;
    SocketType m_type;
    void* m_socket;     // null when the object is uninitialized or closed
}

unittest
{
    // Basic send/receive round trip over a pair of INPROC sockets.
    auto s1 = Socket(SocketType.pair);
    auto s2 = Socket(SocketType.pair);
    s1.bind("inproc://unittest");
    s2.connect("inproc://unittest");
    s1.send("Hello World!");
    ubyte[12] buf;
    const len = s2.receive(buf[]);
    assert (len == 12);
    assert (buf == "Hello World!");
}


version (Windows) {
    alias PlatformFD = SOCKET;
} else version (Posix) {
    alias PlatformFD = int;
}

/**
The native socket file descriptor type.

This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems.
*/
alias FD = PlatformFD;


/**
Starts the built-in $(ZMQ) _proxy.

This function never returns normally, but it may throw an exception.  This could
happen if the context associated with either of the specified sockets is
manually destroyed in a different thread.

Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
    $(ZMQREF zmq_proxy())
See_Also:
    $(FREF steerableProxy)
*/
void proxy(ref Socket frontend, ref Socket backend)
{
    // No capture socket in this overload, hence the null third argument.
    // The only outcome expected from the C call is the error code -1,
    // which is reported to the caller as an exception.
    immutable status = trusted!zmq_proxy(frontend.handle, backend.handle, null);
    assert (status == -1);
    throw new ZmqException();
}

/// ditto
void proxy(ref Socket frontend, ref Socket backend, ref Socket capture)
{
    // Same as above, with the capture socket's handle passed as the third
    // argument.
    immutable status = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle);
    assert (status == -1);
    throw new ZmqException();
}


/**
Starts the built-in $(ZMQ) proxy with _control flow.

Note that the order of the two last parameters is reversed compared to
$(ZMQREF zmq_proxy_steerable()).  That is, the $(D control) socket always
comes before the $(D capture) socket.  Furthermore, unlike in $(ZMQ),
$(D control) is mandatory.  (Without the _control socket one can simply
use $(FREF proxy).)

Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
    $(ZMQREF zmq_proxy_steerable())
See_Also:
    $(FREF proxy)
*/
void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control)
{
    // Argument order of the C call is (frontend, backend, capture, control);
    // there is no capture socket here, so null is passed in its place.
    const rc = trusted!zmq_proxy_steerable(
        frontend.handle, backend.handle, null, control.handle);
    if (rc == -1) throw new ZmqException;
}

/// ditto
void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture)
{
    // Note the swap: capture goes third and control fourth in the C call.
    const rc = trusted!zmq_proxy_steerable(
        frontend.handle, backend.handle, capture.handle, control.handle);
    if (rc == -1) throw new ZmqException;
}

@system unittest
{
    // Runs a ROUTER/DEALER proxy in a separate thread and pushes one
    // request/reply pair through it.
    import core.thread;
    auto t = new Thread(() {
        auto frontend = Socket(SocketType.router);
        frontend.bind("inproc://zmqd_steerableProxy_unittest_fe");
        auto backend  = Socket(SocketType.dealer);
        backend.bind("inproc://zmqd_steerableProxy_unittest_be");
        auto controllee = Socket(SocketType.pair);
        controllee.bind("inproc://zmqd_steerableProxy_unittest_ctl");
        steerableProxy(frontend, backend, controllee);
    });
    t.start();
    auto client = Socket(SocketType.req);
    client.connect("inproc://zmqd_steerableProxy_unittest_fe");
    auto server = Socket(SocketType.rep);
    server.connect("inproc://zmqd_steerableProxy_unittest_be");
    auto controller = Socket(SocketType.pair);
    controller.connect("inproc://zmqd_steerableProxy_unittest_ctl");

    // 86 and 87 are just recognizable one-byte payloads.
    auto cf = Frame(1);
    cf.data[0] = 86;
    client.send(cf);
    auto sf = Frame();
    server.receive(sf);
    assert(sf.size == 1 && sf.data[0] == 86);
    sf.data[0] = 87;
    server.send(sf);
    client.receive(cf);
    assert(cf.size == 1 && cf.data[0] == 87);

    // Presumably "TERMINATE" on the control socket makes the proxy return,
    // which lets the thread finish so that join() does not block forever.
    controller.send("TERMINATE");
    t.join();
}

@system unittest
{
    // Same idea with a PULL/PUSH proxy and a capture socket: the frame sent
    // by the client must show up both at the server and at the capturer.
    import core.thread;
    auto t = new Thread(() {
        auto frontend = Socket(SocketType.pull);
        frontend.bind("inproc://zmqd_steerableProxy2_unittest_fe");
        auto backend  = Socket(SocketType.push);
        backend.bind("inproc://zmqd_steerableProxy2_unittest_be");
        auto controllee = Socket(SocketType.pair);
        controllee.bind("inproc://zmqd_steerableProxy2_unittest_ctl");
        auto capture = Socket(SocketType.push);
        capture.bind("inproc://zmqd_steerableProxy2_unittest_cpt");
        steerableProxy(frontend, backend, controllee, capture);
    });
    t.start();
    auto client = Socket(SocketType.push);
    client.connect("inproc://zmqd_steerableProxy2_unittest_fe");
    auto server = Socket(SocketType.pull);
    server.connect("inproc://zmqd_steerableProxy2_unittest_be");
    auto controller = Socket(SocketType.pair);
    controller.connect("inproc://zmqd_steerableProxy2_unittest_ctl");
    auto capturer = Socket(SocketType.pull);
    capturer.connect("inproc://zmqd_steerableProxy2_unittest_cpt");

    auto cf = Frame(1);
    cf.data[0] = 86;
    client.send(cf);
    auto sf = Frame();
    server.receive(sf);
    assert(sf.size == 1 && sf.data[0] == 86);
    auto pf = Frame();
    capturer.receive(pf);
    assert(pf.size == 1 && pf.data[0] == 86);

    controller.send("TERMINATE");
    t.join();
}


// Old overload that operates directly on the C API's zmq_pollitem_t.
// infiniteDuration is translated to a timeout argument of -1; any other
// duration is passed as a whole number of milliseconds.
deprecated("zmqd.poll() has a new signature as of v0.4")
uint poll(zmq_pollitem_t[] items, Duration timeout = infiniteDuration)
{
    import std.conv: to;
    const n = trusted!zmq_poll(
        ptr(items),
        to!int(items.length),
        timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"()));
    if (n < 0) throw new ZmqException;
    return cast(uint) n;
}


/**
Input/output multiplexing.

The $(D timeout) parameter may have the special value $(REF infiniteDuration)
which means no _timeout.  This is translated to an argument value of -1 in the
C API.
Returns:
    The number of $(REF PollItem) structures with events signalled in
    $(REF PollItem.returnedEvents), or 0 if no events have been signalled.
Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
    $(ZMQREF zmq_poll())
*/
uint poll(PollItem[] items, Duration timeout = infiniteDuration) @trusted
{
    import std.conv: to;

    // The PollItem array is handed to the C API as though it were an array
    // of zmq_pollitem_t, which spares us an allocation.  That reinterpretation
    // is only sound while the two types have exactly the same size.
    static assert (PollItem.sizeof == zmq_pollitem_t.sizeof);

    // The arguments are computed in the order in which they are passed.
    auto rawItems = cast(zmq_pollitem_t*) items.ptr;
    const itemCount = to!int(items.length);
    const timeoutMs = (timeout == infiniteDuration)
        ? -1
        : to!int(timeout.total!"msecs"());

    const signalled = zmq_poll(rawItems, itemCount, timeoutMs);
    if (signalled < 0) throw new ZmqException;
    return cast(uint) signalled;
}


///
@system unittest
{
    auto socket1 = zmqd.Socket(zmqd.SocketType.pull);
    socket1.bind("inproc://zmqd_poll_example");

    import std.socket;
    auto socket2 = new std.socket.Socket(
        AddressFamily.INET,
        std.socket.SocketType.DGRAM);
    socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678));

    auto socket3 = zmqd.Socket(zmqd.SocketType.push);
    socket3.connect("inproc://zmqd_poll_example");
    socket3.send("test");

    import core.thread: Thread;
    Thread.sleep(10.msecs);

    auto items = [
        PollItem(socket1, PollFlags.pollIn),
        PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut),
        PollItem(socket3, PollFlags.pollIn),
    ];

    const n = poll(items, 100.msecs);
    assert (n == 2);
    assert (items[0].returnedEvents == PollFlags.pollIn);
    assert (items[1].returnedEvents == PollFlags.pollOut);
    assert (items[2].returnedEvents == 0);
    socket2.close();
}


/**
$(FREF poll) event flags.

These are described in the $(ZMQREF zmq_poll()) manual.
*/
enum PollFlags
{
    pollIn = ZMQ_POLLIN,    /// Corresponds to $(D ZMQ_POLLIN)
    pollOut = ZMQ_POLLOUT,  /// Corresponds to $(D ZMQ_POLLOUT)
    pollErr = ZMQ_POLLERR,  /// Corresponds to $(D ZMQ_POLLERR)
}


/++
A structure that specifies a socket to be monitored by $(FREF poll) as well
as the events to poll for, and, when $(FREF poll) returns, the events that
occurred.

Warning:
    $(D PollItem) objects do not store $(STDREF socket,Socket) references,
    only the corresponding native file descriptors.  This means that the
    references have to be stored elsewhere, or the objects may be garbage
    collected, invalidating the sockets before or while $(FREF poll) executes.
    ---
    // Not OK
    auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn);

    // OK
    auto s = new std.socket.Socket(/*...*/);
    auto p2 = PollItem(s, PollFlags.pollIn);
    ---
Corresponds_to:
    $(D $(ZMQAPI zmq_poll,zmq_pollitem_t))
+/
struct PollItem
{
    /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket.
    this(ref zmqd.Socket socket, PollFlags events) nothrow
    {
        // NOTE(review): the positional initializer is presumably
        // (socket, fd, events, revents) -- confirm against the binding.
        m_pollItem = zmq_pollitem_t(socket.handle, 0, cast(short) events, 0);
    }

    import std.socket;
    /**
    Constructs a $(REF PollItem) for monitoring a standard socket referenced
    by a $(STDREF socket,Socket).
    */
    this(std.socket.Socket socket, PollFlags events) @system
    {
        // Only the native handle is kept; see the warning on the struct.
        this(socket.handle, events);
    }

    /**
    Constructs a $(REF PollItem) for monitoring a standard socket referenced
    by a native file descriptor.
    */
    this(FD fd, PollFlags events) pure nothrow
    {
        m_pollItem = zmq_pollitem_t(null, fd, cast(short) events, 0);
    }

    /**
    Requested _events.

    Corresponds_to:
        $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events))
    */
    @property void requestedEvents(PollFlags events) pure nothrow
    {
        m_pollItem.events = cast(short) events;
    }

    /// ditto
    @property PollFlags requestedEvents() const pure nothrow
    {
        return cast(typeof(return)) m_pollItem.events;
    }

    /**
    Returned events.

    Corresponds_to:
        $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents))
    */
    @property PollFlags returnedEvents() const pure nothrow
    {
        return cast(typeof(return)) m_pollItem.revents;
    }

private:
    // This must remain the only field: poll() reinterprets a PollItem[] as
    // an array of zmq_pollitem_t and statically asserts equal sizes.
    zmq_pollitem_t m_pollItem;
}


/**
An object that encapsulates a $(ZMQ) message frame.

This $(D struct) is a wrapper around a $(D zmq_msg_t) object.
A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it
should always be explicitly initialized upon construction using
$(FREF _Frame.opCall).  Alternatively, it may be initialized later with
$(FREF _Frame.rebuild).
---
Frame msg1;                 // Invalid frame
auto msg2 = Frame();        // Empty frame
auto msg3 = Frame(1024);    // 1K frame
msg1.rebuild(2048);         // msg1 now has size 2K
msg2.rebuild(2048);         // ...and so does msg2
---
When a $(D Frame) object is destroyed, $(ZMQREF zmq_msg_close()) is
called on the underlying $(D zmq_msg_t).

A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy)
for this.
*/
struct Frame
{
@safe:
    /**
    Initializes an empty $(ZMQ) message frame.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
2026 Corresponds_to: 2027 $(ZMQREF zmq_msg_init()) 2028 */ 2029 static Frame opCall() 2030 { 2031 Frame f; 2032 f.init(); 2033 return f; 2034 } 2035 2036 /// 2037 unittest 2038 { 2039 auto msg = Frame(); 2040 assert(msg.size == 0); 2041 } 2042 2043 /** $(ANCHOR Frame.opCall_size) 2044 Initializes a $(ZMQ) message frame of the specified _size. 2045 2046 Throws: 2047 $(REF ZmqException) if $(ZMQ) reports an error. 2048 Corresponds_to: 2049 $(ZMQREF zmq_msg_init_size()) 2050 */ 2051 static Frame opCall(size_t size) 2052 { 2053 Frame m; 2054 m.init(size); 2055 return m; 2056 } 2057 2058 /// 2059 unittest 2060 { 2061 auto msg = Frame(123); 2062 assert(msg.size == 123); 2063 } 2064 2065 /** $(ANCHOR Frame.opCall_data) 2066 Initializes a $(ZMQ) message frame from a supplied buffer. 2067 2068 If $(D free) is not specified, $(D data) $(EM must) refer to a slice of 2069 memory which has been allocated by the garbage collector (typically using 2070 operator $(D new)). Ownership will then be transferred temporarily to 2071 $(ZMQ), and then transferred back to the GC when $(ZMQ) is done using the 2072 buffer. 2073 2074 For memory which is $(EM not) garbage collected, the argument $(D free) 2075 must be a pointer to a function that will release the memory, which will 2076 be called when $(ZMQ) no longer needs the buffer. $(D free) must point to 2077 a function with the following signature: 2078 --- 2079 extern(C) void f(void* d, void* h) nothrow; 2080 --- 2081 When it is called, $(D d) will be equal to $(D data.ptr), while $(D h) will 2082 be equal to $(D hint) (which is optional and null by default). 2083 2084 $(D free) is passed directly to the underlying $(ZMQ) C function, which is 2085 why it needs to be $(D extern(C)). 2086 2087 Warning: 2088 Some care must be taken when using this function, as there is no telling 2089 when, whether, or in which thread $(ZMQ) relinquishes ownership of the 2090 buffer. 
Client code should therefore avoid retaining any references to
        it, including slices that contain, overlap with or are contained in
        $(D data).  For the "non-GC" version, client code should in general not
        retain slices or pointers to any memory which will be released when
        $(D free) is called.
    See_Also:
        $(REF Frame.FreeData)
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_init_data())
    */
    static Frame opCall(ubyte[] data) @system
    {
        Frame frame;
        frame.init(data);
        return frame;
    }

    /// ditto
    static Frame opCall(ubyte[] data, FreeData free, void* hint = null) @system
    {
        Frame frame;
        frame.init(data, free, hint);
        return frame;
    }

    ///
    @system unittest
    {
        // Garbage-collected memory
        auto gcBuffer = new ubyte[123];
        auto frame = Frame(gcBuffer);
        assert(frame.size == gcBuffer.length);
        assert(frame.data.ptr == gcBuffer.ptr);
    }

    ///
    @system unittest
    {
        // Manually managed memory
        import core.stdc.stdlib: malloc, free;
        static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }
        auto rawBuffer = (cast(ubyte*) malloc(10))[0 .. 10];
        auto frame = Frame(rawBuffer, &myFree);
        assert(frame.size == rawBuffer.length);
        assert(frame.data.ptr == rawBuffer.ptr);
    }

    @system unittest
    {
        // Non-documentation unittest.  Let's see if the data *actually* gets freed.
        ubyte buffer = 81;
        bool released = false;
        static extern(C) void myFree(void* data, void* hint) nothrow
        {
            auto bytePtr = cast(ubyte*) data;
            auto flagPtr = cast(bool*) hint;
            assert(*bytePtr == 81);
            assert(!*flagPtr);
            *bytePtr = 0;
            *flagPtr = true;
        }
        // New scope, so the frame gets released at the end.
        {
            auto frame = Frame((&buffer)[0 .. 1], &myFree, &released);
            assert(frame.size == 1);
            assert(frame.data.ptr == &buffer);
            assert(buffer == 81);
            assert(!released);
        }
        assert(buffer == 0);
        assert(released);
    }

    /**
    The function pointer type for memory-freeing callback functions passed to
    $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[], _FreeData, void*))).
    */
    @system alias extern(C) void function(void*, void*) nothrow FreeData;

    /**
    Reinitializes the Frame object as an empty message.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed with
    $(D $(LINK2 #Frame.opCall,Frame())).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init())
    */
    void rebuild()
    {
        // Release whatever the frame currently holds, then start afresh.
        close();
        init();
    }

    ///
    unittest
    {
        auto frame = Frame(256);
        assert (frame.size == 256);
        frame.rebuild();
        assert (frame.size == 0);
    }

    /**
    Reinitializes the Frame object to a specified size.

    This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed with
    $(D $(LINK2 #Frame.opCall_size,Frame(size))).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()).
    */
    void rebuild(size_t size)
    {
        // Release whatever the frame currently holds, then start afresh.
        close();
        init(size);
    }

    ///
    unittest
    {
        auto frame = Frame(256);
        assert (frame.size == 256);
        frame.rebuild(1024);
        assert (frame.size == 1024);
    }

    /**
    Reinitializes the Frame object from a supplied buffer.

This function will first call $(FREF Frame.close) to release the
    resources associated with the message frame, and then it will
    initialize it anew, exactly as if it were constructed with
    $(D Frame(data)) or $(D Frame(data, free, hint)).

    Some care must be taken when using these functions.  Please read the
    $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[]))) documentation.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()).
    */
    void rebuild(ubyte[] data) @system
    {
        // Release whatever the frame currently holds, then wrap the buffer.
        close();
        init(data);
    }

    /// ditto
    void rebuild(ubyte[] data, FreeData free, void* hint = null) @system
    {
        // Release whatever the frame currently holds, then wrap the buffer.
        close();
        init(data, free, hint);
    }

    ///
    @system unittest
    {
        // Garbage-collected memory
        auto frame = Frame(256);
        assert (frame.size == 256);
        auto gcBuffer = new ubyte[123];
        frame.rebuild(gcBuffer);
        assert(frame.size == gcBuffer.length);
        assert(frame.data.ptr == gcBuffer.ptr);
    }

    ///
    @system unittest
    {
        // Manually managed memory
        import core.stdc.stdlib: malloc, free;
        static extern(C) void myFree(void* data, void* hint) nothrow { free(data); }

        auto frame = Frame(256);
        assert (frame.size == 256);
        auto rawBuffer = (cast(ubyte*) malloc(10))[0 .. 10];
        frame.rebuild(rawBuffer, &myFree);
        assert(frame.size == rawBuffer.length);
        assert(frame.data.ptr == rawBuffer.ptr);
    }

    @disable this(this);

    /**
    Releases the $(ZMQ) message frame when the $(D Frame) is destroyed.

    This destructor never throws, which means that any errors will go
    undetected.  If this is undesirable, call $(FREF Frame.close) before
    the $(D Frame) is destroyed.

Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    ~this() nothrow
    {
        // A frame that was never initialized, or that has already been
        // closed, holds nothing to release.
        if (!m_initialized) return;

        immutable rc = trusted!zmq_msg_close(&m_msg);
        assert(rc == 0, "zmq_msg_close failed: Invalid message frame");
    }

    /**
    Releases the $(ZMQ) message frame.

    Note that the frame will be automatically released when the $(D Frame)
    object is destroyed, so it is often not necessary to call this method
    manually.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_close())
    */
    void close()
    {
        if (!m_initialized) return;

        immutable rc = trusted!zmq_msg_close(&m_msg);
        if (rc != 0) throw new ZmqException;

        // Only mark the frame as released once ZMQ has confirmed it.
        m_initialized = false;
    }

    /**
    Copies frame content to another message frame.

    $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest))
    copies the contents of this $(D Frame) into $(D dest).  $(D dest) must
    be a valid (i.e. initialized) $(D Frame).

    Warning:
        These functions may not do what you think they do.  Please refer
        to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_copy())
    */
    Frame copy()
        in { assert(m_initialized); }
        body
    {
        auto duplicate = Frame();
        copyTo(duplicate);
        return duplicate;
    }

    /// ditto
    void copyTo(ref Frame dest)
        in { assert(m_initialized); }
        body
    {
        immutable rc = trusted!zmq_msg_copy(&dest.m_msg, &m_msg);
        if (rc != 0) throw new ZmqException;
    }

    ///
    unittest
    {
        import std.string: representation;
        auto source = Frame(3);
        source.data[] = "foo".representation;
        auto duplicate = source.copy();
        assert (duplicate.data.asString() == "foo");
    }

    /**
    Moves frame content to another message frame.

$(D move()) returns a new $(D Frame) object, while $(D moveTo(dest))
    moves the contents of this $(D Frame) to $(D dest).  $(D dest) must
    be a valid (i.e. initialized) $(D Frame).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_move())
    */
    Frame move()
        in { assert(m_initialized); }
        body
    {
        auto target = Frame();
        moveTo(target);
        return target;
    }

    /// ditto
    void moveTo(ref Frame dest)
        in { assert(m_initialized); }
        body
    {
        immutable rc = trusted!zmq_msg_move(&dest.m_msg, &m_msg);
        if (rc != 0) throw new ZmqException;
    }

    ///
    unittest
    {
        import std.string: representation;
        auto source = Frame(3);
        source.data[] = "foo".representation;
        auto target = source.move();
        assert (source.size == 0);
        assert (target.data.asString() == "foo");
    }

    /**
    The message frame content _size in bytes.

    Corresponds_to:
        $(ZMQREF zmq_msg_size())
    */
    @property size_t size() nothrow
        in { assert(m_initialized); }
        body
    {
        immutable byteCount = trusted!zmq_msg_size(&m_msg);
        return byteCount;
    }

    ///
    unittest
    {
        enum requestedSize = 123;
        auto frame = Frame(requestedSize);
        assert(frame.size == requestedSize);
    }

    /**
    Retrieves the message frame content.

    Corresponds_to:
        $(ZMQREF zmq_msg_data())
    */
    @property ubyte[] data() @trusted nothrow
        in { assert(m_initialized); }
        body
    {
        // Pair the raw content pointer with the frame size to form a slice.
        auto contentStart = cast(ubyte*) zmq_msg_data(&m_msg);
        return contentStart[0 .. size];
    }

    ///
    unittest
    {
        import std.string: representation;
        auto frame = Frame(3);
        assert(frame.data.length == 3);
        frame.data[] = "foo".representation; // Slice operator -> array copy.
        assert(frame.data.asString() == "foo");
    }

    /**
    Whether there are _more message frames to retrieve.

Corresponds_to:
        $(ZMQREF zmq_msg_more())
    */
    @property bool more() nothrow
        in { assert(m_initialized); }
        body
    {
        return trusted!zmq_msg_more(&m_msg) != 0;
    }

    /**
    The file descriptor of the socket the message was read from.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_get()) with $(D ZMQ_SRCFD).
    */
    @property FD sourceFD()
    {
        immutable rawDescriptor = getProperty(ZMQ_SRCFD);
        return cast(FD) rawDescriptor;
    }

    /**
    Whether the message MAY share underlying storage with another copy.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_msg_get()) with $(D ZMQ_SHARED).
    */
    @property bool sharedStorage()
    {
        return getProperty(ZMQ_SHARED) != 0;
    }

    unittest
    {
        import std.string: representation;
        auto source = Frame(100); // Big message to avoid small-string optimisation
        source.data[0 .. 3] = "foo".representation;
        assert (!source.sharedStorage);
        auto duplicate = source.copy();
        assert (duplicate.sharedStorage); // Warning: The ZMQ docs only say that this MAY be true
        assert (duplicate.data[0 .. 3].asString() == "foo");
    }

    /**
    Gets message _metadata.

    $(D metadataUnsafe()) is faster than $(D metadata()) because it
    directly returns the array which comes from $(ZMQREF zmq_msg_gets()),
    whereas the latter returns a freshly GC-allocated copy of it.  However,
    the array returned by $(D metadataUnsafe()) is owned by the underlying
    $(ZMQ) message and gets destroyed along with it, so care must be
    taken when using this function.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
        $(ZMQREF zmq_msg_gets())
    */
    char[] metadata(const char[] property) @trusted
        in { assert(m_initialized); }
        body
    {
        return metadataUnsafe(property).dup;
    }

    /// ditto
    const(char)[] metadataUnsafe(const char[] property) @system
        in { assert(m_initialized); }
        body
    {
        if (auto value = zmq_msg_gets(handle, zeroTermString(property))) {
            import std.string: fromStringz;
            return fromStringz(value);
        } else {
            throw new ZmqException();
        }
    }

    /**
    A pointer to the underlying $(D zmq_msg_t).
    */
    @property inout(zmq_msg_t)* handle() inout pure nothrow
    {
        return &m_msg;
    }

private:
    // Initializes m_msg as an empty frame (zmq_msg_init).
    private void init()
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (trusted!zmq_msg_init(&m_msg) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Initializes m_msg as a frame of the given size (zmq_msg_init_size).
    private void init(size_t size)
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (trusted!zmq_msg_init_size(&m_msg, size) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Initializes m_msg from a GC-allocated buffer.  The buffer is registered
    // as a GC root and its block is flagged NO_MOVE for as long as ZMQ holds
    // on to it; both are undone by the release callback, or immediately if
    // initialization fails.
    private void init(ubyte[] data) @system
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        import core.memory;

        // Release callback handed to ZMQ.  `block` is non-null only when this
        // function was the one that set NO_MOVE, so that an attribute which
        // was already present before the frame was created is left untouched
        // (mirroring the `movable` guard on the failure path below).
        static extern(C) void zmqd_Frame_init_gcFree(void* dataPtr, void* block) nothrow
        {
            GC.removeRoot(dataPtr);
            if (block !is null) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);
        }

        GC.addRoot(data.ptr);
        scope(failure) GC.removeRoot(data.ptr);

        auto block = GC.addrOf(data.ptr);
        // True only if the block exists and was not already pinned.
        immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE);
        GC.setAttr(block, GC.BlkAttr.NO_MOVE);
        scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE);

        init(data, &zmqd_Frame_init_gcFree, movable ? block : null);
    }

    // Initializes m_msg from a caller-supplied buffer; `free` and `hint` are
    // passed straight through to zmq_msg_init_data.
    void init(ubyte[] data, FreeData free, void* hint) @system
        in { assert (!m_initialized); }
        out { assert (m_initialized); }
        body
    {
        if (zmq_msg_init_data(&m_msg, data.ptr, data.length, free, hint) != 0) {
            throw new ZmqException;
        }
        m_initialized = true;
    }

    // Reads an integer frame property via zmq_msg_get; -1 signals an error.
    int getProperty(int property) @trusted
    {
        const value = zmq_msg_get(&m_msg, property);
        if (value == -1) {
            throw new ZmqException;
        }
        return value;
    }

    bool m_initialized;
    zmq_msg_t m_msg;
}

unittest
{
    const url = uniqueUrl("inproc");
    auto s1 = Socket(SocketType.pair);
    auto s2 = Socket(SocketType.pair);
    s1.bind(url);
    s2.connect(url);

    auto m1a = Frame(123);
    m1a.data[] = 'a';
    s1.send(m1a);
    auto m2a = Frame();
    s2.receive(m2a);
    assert(m2a.size == 123);
    foreach (e; m2a.data) assert(e == 'a');

    auto m1b = Frame(10);
    m1b.data[] = 'b';
    s1.send(m1b);
    auto m2b = Frame();
    s2.receive(m2b);
    assert(m2b.size == 10);
    foreach (e; m2b.data) assert(e == 'b');
}

deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame;


/**
A global context which is used by default by all sockets, unless they are
explicitly constructed with a different context.

The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right,
has the following to say) about context creation:
$(QUOTE
    You should create and use exactly one context in your process.
    [$(LDOTS)] If at runtime a process has two contexts, these are
    like separate $(ZMQ) instances. If that's explicitly what you
    want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new())
    at the start of your main line code, and one $(D zmq_ctx_destroy())
    at the end.)
)
By using $(D defaultContext()), this is exactly what you achieve.
The context is created the first time the function is called, and is
automatically destroyed when the program ends.

This function is thread safe.

Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.
See_also:
    $(REF Context)
*/
Context defaultContext() @trusted
{
    // For future reference: This is the low-lock singleton pattern. See:
    // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/
    static bool instantiated;
    __gshared Context ctx;

    // Fast path: the flag has already been set on a previous call.
    if (instantiated) return ctx;

    synchronized {
        if (!ctx.initialized) ctx = Context();
        instantiated = true;
    }
    return ctx;
}

@system unittest
{
    import core.thread;
    Context fromWorker, fromMain;
    auto worker = new Thread(() { fromWorker = defaultContext(); });
    worker.start();
    fromMain = defaultContext();
    worker.join();
    assert(fromWorker.handle !is null);
    assert(fromWorker.handle == fromMain.handle);
}


/**
An object that encapsulates a $(ZMQ) context.

In most programs, it is not necessary to use this type directly,
as $(REF Socket) will use a default global context if not explicitly
provided with one.  See $(FREF defaultContext) for details.

A default-initialized $(D Context) is not a valid $(ZMQ) context; it
must always be explicitly initialized with $(FREF _Context.opCall):
---
Context ctx;        // Not a valid context yet
ctx = Context();    // ...but now it is.
---
$(D Context) objects can be passed around by value, and two copies will
refer to the same context.  The underlying context is managed using
reference counting, so that when the last copy of a $(D Context) goes
out of scope, the context is automatically destroyed.  The reference
counting is performed in a thread safe manner, so that the same context
can be shared between multiple threads.
($(ZMQ) guarantees the thread
safety of other context operations.)

See_also:
    $(FREF defaultContext)
*/
struct Context
{
@safe:
    /**
    Creates a new $(ZMQ) context.

    Returns:
        A $(REF Context) object that encapsulates the new context.
    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_new())
    */
    static Context opCall() @trusted // because of the cast
    {
        // Casting from/to shared is OK since ZMQ contexts are thread safe.
        static Exception release(shared(void)* ptr) @trusted nothrow
        {
            if (zmq_ctx_term(cast(void*) ptr) == 0) return null;
            return new ZmqException;
        }

        auto rawContext = trusted!zmq_ctx_new();
        if (!rawContext) throw new ZmqException;

        Context ctx;
        ctx.m_resource = SharedResource(cast(shared) rawContext, &release);
        return ctx;
    }

    ///
    unittest
    {
        auto context = Context();
        assert (context.initialized);
    }

    /**
    Detaches from the $(ZMQ) context.

    If this is the last reference to the context, it will be destroyed with
    $(ZMQREF zmq_ctx_term()).

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    */
    void detach()
    {
        m_resource.detach();
    }

    ///
    unittest
    {
        auto context = Context();
        assert (context.initialized);
        context.detach();
        assert (!context.initialized);
    }

    /**
    Forcefully terminates the context.

    By using this function, one effectively circumvents the reference-counting
    mechanism for managing the context.  After it returns, all other
    $(D Context) objects that used to refer to the same context will be in a
    state which is functionally equivalent to the default-initialized state
    (i.e., $(REF Context.initialized) is $(D false) and $(REF Context.handle)
    is $(D null)).

Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_term())
    */
    void terminate() @system
    {
        m_resource.forceRelease();
    }

    ///
    @system unittest
    {
        auto first = Context();
        auto second = first;
        assert (first.initialized);
        assert (second.initialized);
        assert (first.handle == second.handle);
        second.terminate();
        assert (!first.initialized);
        assert (!second.initialized);
        assert (first.handle == null);
        assert (second.handle == null);
    }

    @system unittest
    {
        auto ctx = Context();

        // Serves one request, then blocks in receive() until the context
        // is terminated from the main thread.
        void threadFunc()
        {
            auto server = Socket(ctx, SocketType.rep);
            server.bind("inproc://Context_terminate_unittest");
            try {
                server.receive(null);
                server.send("");
                server.receive(null);
                assert (false, "Never get here");
            } catch (ZmqException e) {
                assert (e.errno == ETERM);
            }
        }

        import core.thread: Thread;
        auto serverThread = new Thread(&threadFunc);
        serverThread.start();

        auto client = Socket(ctx, SocketType.req);
        client.connect("inproc://Context_terminate_unittest");
        client.send("");
        client.receive(null);
        client.close();

        ctx.terminate();
        serverThread.join();
    }


    /**
    The number of I/O threads.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_IO_THREADS).
    */
    @property int ioThreads()
    {
        immutable threadCount = getOption(ZMQ_IO_THREADS);
        return threadCount;
    }

    /// ditto
    @property void ioThreads(int value)
    {
        setOption(ZMQ_IO_THREADS, value);
    }

    ///
    unittest
    {
        auto context = Context();
        context.ioThreads = 3;
        assert (context.ioThreads == 3);
    }

    /**
    The maximum number of sockets.

Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_MAX_SOCKETS).
    */
    @property int maxSockets()
    {
        immutable socketCount = getOption(ZMQ_MAX_SOCKETS);
        return socketCount;
    }

    /// ditto
    @property void maxSockets(int value)
    {
        setOption(ZMQ_MAX_SOCKETS, value);
    }

    ///
    unittest
    {
        auto context = Context();
        context.maxSockets = 512;
        assert (context.maxSockets == 512);
    }

    /**
    The largest configurable number of sockets.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) with $(D ZMQ_SOCKET_LIMIT).
    */
    @property int socketLimit()
    {
        immutable limit = getOption(ZMQ_SOCKET_LIMIT);
        return limit;
    }

    ///
    unittest
    {
        auto context = Context();
        assert (context.socketLimit > 0);
    }

    /**
    IPv6 option.

    Throws:
        $(REF ZmqException) if $(ZMQ) reports an error.
    Corresponds_to:
        $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with
        $(D ZMQ_IPV6).
    */
    @property bool ipv6()
    {
        return getOption(ZMQ_IPV6) != 0;
    }

    /// ditto
    @property void ipv6(bool value)
    {
        immutable int flag = value ? 1 : 0;
        setOption(ZMQ_IPV6, flag);
    }

    ///
    unittest
    {
        auto context = Context();
        context.ipv6 = true;
        assert (context.ipv6 == true);
    }

    /**
    The $(D void*) pointer used by the underlying C API to refer to the context.

    If the object has not been initialized, this function returns $(D null).
    */
    @property inout(void)* handle() inout @trusted pure nothrow
    {
        // ZMQ contexts are thread safe, so casting away shared is OK.
        return cast(typeof(return)) m_resource.handle;
    }

    /**
    Whether this $(REF Context) object has been _initialized, i.e. whether it
    refers to a valid $(ZMQ) context.
*/
    @property bool initialized() const pure nothrow
    {
        return m_resource.handle != null;
    }

    ///
    unittest
    {
        Context context;
        assert (!context.initialized);
        context = Context();
        assert (context.initialized);
        context.detach();
        assert (!context.initialized);
    }

private:
    // Reads a context option via zmq_ctx_get; a negative result is an error.
    int getOption(int option)
    {
        immutable value = trusted!zmq_ctx_get(this.handle, option);
        if (value < 0) throw new ZmqException;
        return value;
    }

    // Writes a context option via zmq_ctx_set; a nonzero result is an error.
    void setOption(int option, int value)
    {
        immutable rc = trusted!zmq_ctx_set(this.handle, option, value);
        if (rc != 0) throw new ZmqException;
    }

    SharedResource m_resource;
}


/**
Socket event types.

These are used together with $(FREF Socket.monitor), and are described
in the $(ZMQREF zmq_socket_monitor()) reference.
*/
enum EventType
{
    connected = ZMQ_EVENT_CONNECTED, /// Corresponds to $(D ZMQ_EVENT_CONNECTED).
    connectDelayed = ZMQ_EVENT_CONNECT_DELAYED, /// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED).
    connectRetried = ZMQ_EVENT_CONNECT_RETRIED, /// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED).
    listening = ZMQ_EVENT_LISTENING, /// Corresponds to $(D ZMQ_EVENT_LISTENING).
    bindFailed = ZMQ_EVENT_BIND_FAILED, /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED).
    accepted = ZMQ_EVENT_ACCEPTED, /// Corresponds to $(D ZMQ_EVENT_ACCEPTED).
    acceptFailed = ZMQ_EVENT_ACCEPT_FAILED, /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED).
    closed = ZMQ_EVENT_CLOSED, /// Corresponds to $(D ZMQ_EVENT_CLOSED).
    closeFailed = ZMQ_EVENT_CLOSE_FAILED, /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED).
    disconnected = ZMQ_EVENT_DISCONNECTED, /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED).
    monitorStopped = ZMQ_EVENT_MONITOR_STOPPED, /// Corresponds to $(D ZMQ_EVENT_MONITOR_STOPPED).
    all = ZMQ_EVENT_ALL, /// Corresponds to $(D ZMQ_EVENT_ALL).
    handshakeFailedNoDetail = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL).
    handshakeSucceeded = ZMQ_EVENT_HANDSHAKE_SUCCEEDED, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_SUCCEEDED).
    handshakeFailedProtocol = ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL).
    handshakeFailedAuth = ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_AUTH).
}


/**
Protocol error codes.

These may be returned from $(FREF Event.protocolError), and are described
in the $(ZMQREF zmq_socket_monitor()) reference.
*/
enum ProtocolError
{
    zmtpUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED).
    zmtpUnexpectedCommand = ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND).
    zmtpInvalidSequence = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE).
    zmtpKeyExchange = ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE).
    zmtpMalformedCommandUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED).
    zmtpMalformedCommandMessage = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE).
    zmtpMalformedCommandHello = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO).
    zmtpMalformedCommandInitiate = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE).
    zmtpMalformedCommandError = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR).
    zmtpMalformedCommandReady = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY).
    zmtpMalformedCommandWelcome = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME).
    zmtpInvalidMetadata = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA).
    zmtpCryptographic = ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC).
    zmtpMechanismMismatch = ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH).
    zapUnspecified = ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED).
    zapMalformedReply = ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY).
    zapBadRequestID = ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID).
    zapBadVersion = ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION).
    zapInvalidStatusCode = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE).
    zapInvalidMetadata = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA).
}


/**
Receives a message on the given _socket and interprets it as a _socket
state change event.

$(D socket) must be a PAIR _socket which is connected to an endpoint
created via a $(FREF Socket.monitor) call.
$(D receiveEvent()) receives
one message on the _socket, parses its contents according to the
specification in the $(ZMQREF zmq_socket_monitor()) reference,
and returns the event information as an $(REF Event) object.

Throws:
    $(REF ZmqException) if $(ZMQ) reports an error.$(BR)
    $(REF InvalidEventException) if the received message could not
    be interpreted as an event message.
See_also:
    $(FREF Socket.monitor), for monitoring _socket state changes.
*/
Event receiveEvent(ref Socket socket) @system
{
    // The first frame must be exactly a 16-bit event ID followed by a
    // 32-bit event value.
    enum eventFrameSize = ushort.sizeof + int.sizeof;
    auto eventFrame = Frame();
    if (socket.receive(eventFrame) != eventFrameSize) {
        throw new InvalidEventException;
    }
    auto addrFrame = Frame();
    socket.receive(addrFrame);
    try {
        import std.conv: to;
        const payload = eventFrame.data;
        immutable event = to!EventType(*(cast(const(ushort)*) payload.ptr));

        // The value starts ushort.sizeof *bytes* into the frame.  A cast binds
        // tighter than `+`, so adding the offset to an already-cast int
        // pointer would skip two whole ints and read past the end of the
        // frame.  Copying the bytes through a bounds-checked slice avoids
        // that, and also avoids an unaligned int load.
        int value;
        (cast(ubyte*) &value)[0 .. int.sizeof] = payload[ushort.sizeof .. eventFrameSize];

        immutable addr = (cast(char[]) addrFrame.data).idup;
        return Event(event, addr, value);
    } catch (Exception e) {
        // Any exception thrown within the try block signifies that there
        // is something wrong with the event message.
        throw new InvalidEventException;
    }
}

// Socket monitoring only works for connection-oriented sockets, and
// IPC is not supported on Windows.
version (Posix) @system unittest
{
    Event[] events;

    // Collects monitor events until the monitored socket reports "closed".
    void eventCollector()
    {
        auto monitorSocket = Socket(SocketType.pair);
        monitorSocket.connect("inproc://zmqd_receiveEvent_unittest_monitor");
        do {
            events ~= receiveEvent(monitorSocket);
        } while (events[$-1].type != EventType.closed);
    }
    import core.thread;
    auto collector = new Thread(&eventCollector);
    collector.start();

    // Drives a bind/connect/disconnect/unbind cycle on a monitored socket.
    static void eventGenerator()
    {
        auto monitored = Socket(SocketType.pair);
        monitored.monitor("inproc://zmqd_receiveEvent_unittest_monitor");
        monitored.bind("ipc://zmqd_receiveEvent_unittest");
        import core.time;
        Thread.sleep(100.msecs);
        auto peer = Socket(SocketType.pair);
        peer.connect("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        peer.disconnect("ipc://zmqd_receiveEvent_unittest");
        Thread.sleep(100.msecs);
        monitored.unbind("ipc://zmqd_receiveEvent_unittest");
    }
    eventGenerator();
    collector.join();
    assert (events.length == 4);
    foreach (received; events) {
        assert (received.address == "ipc://zmqd_receiveEvent_unittest");
    }
    assert (events[0].type == EventType.listening);
    assert (events[1].type == EventType.accepted);
    assert (events[2].type == EventType.handshakeSucceeded);
    assert (events[3].type == EventType.closed);
    import std.exception;
    assertNotThrown!Error(events[0].fd);
    assertThrown!Error(events[0].errno);
    assertThrown!Error(events[0].interval);
}


/**
Information about a socket state change.

See_also:
    $(FREF receiveEvent)
*/
struct Event
{
    /// The event _type.
    @property EventType type() const pure nothrow
    {
        return m_type;
    }

    /// The peer _address.
    @property string address() const pure nothrow
    {
        return m_address;
    }

    /**
    The socket file descriptor.

This property function may only be called if $(REF Event.type) is one of:
    $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disconnected).

    Throws:
        $(D Error) if the property is called for a wrong event type.
    */
    @property FD fd() const pure nothrow
    {
        final switch (m_type) {
            // Event types whose value is not a file descriptor.
            case EventType.connectDelayed:
            case EventType.connectRetried:
            case EventType.bindFailed:
            case EventType.acceptFailed:
            case EventType.closeFailed:
            case EventType.monitorStopped:
            case EventType.handshakeFailedNoDetail:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();

            // Event types whose value is the socket file descriptor.
            case EventType.connected:
            case EventType.listening:
            case EventType.accepted:
            case EventType.closed:
            case EventType.disconnected:
                return cast(typeof(return)) m_value;

            case EventType.all:
                assert (false);
        }
    }

    /**
    The $(D _errno) code for the error which triggered the event.

    This property function may only be called if $(REF Event.type) is either
    $(D bindFailed), $(D acceptFailed), $(D closeFailed) or
    $(D handshakeFailedNoDetail).

    Throws:
        $(D Error) if the property is called for a wrong event type.
*/
    @property int errno() const pure nothrow
    {
        final switch (m_type) {
            // Event types whose value is not an errno code.
            case EventType.connected:
            case EventType.connectDelayed:
            case EventType.connectRetried:
            case EventType.listening:
            case EventType.accepted:
            case EventType.closed:
            case EventType.disconnected:
            case EventType.monitorStopped:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();

            // Event types whose value is the errno code.
            case EventType.bindFailed:
            case EventType.acceptFailed:
            case EventType.closeFailed:
            case EventType.handshakeFailedNoDetail:
                return m_value;

            case EventType.all:
                assert (false);
        }
    }

    /**
    The reconnect interval.

    This property function may only be called if $(REF Event.type) is
    $(D connectRetried).

    Throws:
        $(D Error) if the property is called for a wrong event type.
    */
    @property Duration interval() const pure nothrow
    {
        final switch (m_type) {
            // Event types that carry no reconnect interval.
            case EventType.connected:
            case EventType.connectDelayed:
            case EventType.listening:
            case EventType.bindFailed:
            case EventType.accepted:
            case EventType.acceptFailed:
            case EventType.closed:
            case EventType.closeFailed:
            case EventType.disconnected:
            case EventType.monitorStopped:
            case EventType.handshakeFailedNoDetail:
            case EventType.handshakeSucceeded:
            case EventType.handshakeFailedProtocol:
            case EventType.handshakeFailedAuth:
                throw invalidProperty();

            // The value is converted to a Duration in milliseconds.
            case EventType.connectRetried:
                return m_value.msecs;

            case EventType.all:
                assert (false);
        }
    }

    /**
    The protocol error code.

    This property function may only be called if $(REF Event.type) is
    $(D handshakeFailedProtocol).

    Throws:
        $(D Error) if the property is called for a wrong event type.
/**
The protocol error code.

This property function may only be called if $(REF Event.type) is
$(D handshakeFailedProtocol).

Throws:
    $(D Error) if the property is called for a wrong event type.
*/
@property ProtocolError protocolError() const pure
{
    import std.conv: to;

    final switch (m_type) {
        // Every event type other than handshakeFailedProtocol.
        case EventType.connectRetried,
             EventType.connected,
             EventType.connectDelayed,
             EventType.listening,
             EventType.bindFailed,
             EventType.accepted,
             EventType.acceptFailed,
             EventType.closed,
             EventType.closeFailed,
             EventType.disconnected,
             EventType.monitorStopped,
             EventType.handshakeFailedNoDetail,
             EventType.handshakeSucceeded,
             EventType.handshakeFailedAuth:
            throw invalidProperty();

        // The stored integer is converted to the enum with std.conv.to,
        // which is why this property is not marked nothrow.
        case EventType.handshakeFailedProtocol:
            return m_value.to!ProtocolError;

        // Listed only to satisfy the final switch.
        case EventType.all:
            assert (false);
    }
}
/**
The status code returned by the ZAP handler.

This property function may only be called if $(REF Event.type) is
$(D handshakeFailedAuth).

Throws:
    $(D Error) if the property is called for a wrong event type.
*/
@property int statusCode() const pure
{
    final switch (m_type) {
        // Every event type other than handshakeFailedAuth.
        case EventType.connectRetried,
             EventType.connected,
             EventType.connectDelayed,
             EventType.listening,
             EventType.bindFailed,
             EventType.accepted,
             EventType.acceptFailed,
             EventType.closed,
             EventType.closeFailed,
             EventType.disconnected,
             EventType.monitorStopped,
             EventType.handshakeFailedNoDetail,
             EventType.handshakeSucceeded,
             EventType.handshakeFailedProtocol:
            throw invalidProperty();

        // The stored value is returned unchanged.
        case EventType.handshakeFailedAuth:
            return m_value;

        // Listed only to satisfy the final switch.
        case EventType.all:
            assert (false);
    }
}

private:
// Stores the event type, the endpoint address and the raw integer value
// that the public properties interpret according to the event type.
this(EventType type, string address, int value) pure nothrow
{
    m_value = value;
    m_address = address;
    m_type = type;
}

// Builds the Error thrown when a property is queried for an event type it
// does not apply to.  `name` defaults to __FUNCTION__ so that the message
// names a function without the caller having to pass it explicitly.
Error invalidProperty(string name = __FUNCTION__)() const pure nothrow
{
    import std.conv: text;

    string message;
    try {
        message = text("Property '", name,
                       "' not available for event type '",
                       m_type, "'");
    } catch (Exception e) {
        // Formatting the message is not expected to fail.
        assert(false);
    }
    return new Error(message);
}

EventType m_type;   // kind of event
string m_address;   // endpoint address passed to the constructor
int m_value;        // raw value; its meaning depends on m_type
} // closes the enclosing struct, whose header lies above this section
/**
Encodes a binary key as Z85 printable text.

$(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1),
which will be used to store the return value plus a terminating zero byte.
If $(D dest) is omitted, a new array will be created.

Returns:
    An array of size $(D 5*data.length/4) which contains the Z85-encoded text,
    excluding the terminating zero byte.  This will be a slice of $(D dest) if
    it is provided.
Throws:
    $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
    $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length)
    is not a multiple of 4).
Corresponds_to:
    $(ZMQREF zmq_z85_encode())
*/
char[] z85Encode(ubyte[] data, char[] dest)
    // TODO: Make data const when we update to ZMQ 4.1
{
    import core.exception: RangeError;

    // Every 4 input bytes become 5 output characters.
    immutable encodedLength = 5 * data.length / 4;

    // One extra element is needed for the terminating zero byte.
    if (dest.length < encodedLength + 1) {
        throw new RangeError;
    }

    auto written = trusted!zmq_z85_encode(ptr(dest), ptr(data), data.length);
    if (written == null) {
        throw new ZmqException;
    }
    return dest[0 .. encodedLength];
}

/// ditto
char[] z85Encode(ubyte[] data)
{
    // Encoded text plus the terminating zero byte.
    auto buffer = new char[5*data.length/4 + 1];
    return z85Encode(data, buffer);
}

debug (WithCurveTests) @system unittest // @system because of assertThrown
{
    import core.exception: RangeError;
    import std.exception: assertThrown;

    // TODO: Make data immutable when we update to ZMQ 4.1
    auto data = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
    immutable encoded = "HelloWorld";

    // Allocating overload.
    assert (z85Encode(data) == encoded);

    // Caller-supplied buffer: the result must be a slice of that buffer.
    auto buffer = new char[11];
    auto result = z85Encode(data, buffer);
    assert (result == encoded);
    assert (buffer.ptr == result.ptr);

    // Buffer too small, and input whose length is not a multiple of 4.
    assertThrown!RangeError(z85Encode(data, new char[10]));
    assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5]));
}
/**
Decodes a binary key from Z85 printable _text.

$(D dest) must be an array whose length is at least $(D 4*text.length/5),
which will be used to store the return value.
If $(D dest) is omitted, a new array will be created.

Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero
byte will be appended to $(D text) if it does not contain one already.  However,
this may trigger a (possibly unwanted) GC allocation.  To avoid this, either
make sure that the last character in $(D text) is $(D '\0'), or use
$(OBJREF assumeSafeAppend) on the array before calling this function (provided
this is safe).

An empty $(D text) is handled like any other input: it is zero-terminated and
passed on to $(ZMQ), which decides whether to accept it.

Returns:
    An array of size $(D 4*text.length/5) which contains the decoded data.
    (The integer division makes this the same whether or not $(D text)
    includes a terminating zero byte.)
    This will be a slice of $(D dest) if it is provided.
Throws:
    $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR)
    $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D text.length)
    is not a multiple of 5).
Corresponds_to:
    $(ZMQREF zmq_z85_decode())
*/
ubyte[] z85Decode(char[] text, ubyte[] dest)
    // TODO: Make text const when we update to ZMQ 4.1
{
    import core.exception: RangeError;
    immutable len = 4 * text.length/5;
    if (dest.length < len) throw new RangeError;
    // An empty array has no last element, so text[$-1] must not be evaluated
    // for it; an empty text is simply treated as "not yet terminated".
    if (text.length == 0 || text[$-1] != '\0') text ~= '\0';
    if (trusted!zmq_z85_decode(ptr(dest), ptr(text)) == null) {
        throw new ZmqException;
    }
    return dest[0 .. len];
}

/// ditto
ubyte[] z85Decode(char[] text)
{
    return z85Decode(text, new ubyte[4*text.length/5]);
}

debug (WithCurveTests) @system unittest // @system because of assertThrown
{
    // TODO: Make data immutable when we update to ZMQ 4.1
    auto text = "HelloWorld".dup;
    immutable decoded = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b];
    assert (z85Decode(text) == decoded);
    assert (z85Decode(text~'\0') == decoded);

    auto buffer = new ubyte[8];
    auto result = z85Decode(text, buffer);
    assert (result == decoded);
    assert (buffer.ptr == result.ptr);

    import core.exception: RangeError;
    import std.exception: assertThrown;
    assertThrown!RangeError(z85Decode(text, new ubyte[7]));
    assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup));

    // Empty input must not index past the end of the array.  Whether ZMQ
    // accepts an empty string may depend on the library version, so either
    // a normal return or a ZmqException is fine here; a RangeError is not.
    try {
        z85Decode("".dup);
    } catch (ZmqException e) {
        // Rejected by ZMQ: acceptable.
    }
}
/**
Generates a new Curve key pair.

To avoid a memory allocation, preallocated buffers may optionally be supplied
for the two keys.  Each of these must have a length of at least 41 bytes, enough
for a 40-character Z85-encoded key plus a terminating zero byte.  If either
buffer is omitted/$(D null), a new one will be created.

Returns:
    A tuple that contains the two keys.  Each of these will have a length of
    40 characters, and will be slices of the input buffers if such have been
    provided.
Throws:
    $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are
    not $(D null) but have a length of less than 41 characters.$(BR)
    $(REF ZmqException) if $(ZMQ) reports an error.
Corresponds_to:
    $(ZMQREF zmq_curve_keypair())
*/
Tuple!(char[], "publicKey", char[], "secretKey")
    curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null)
{
    import core.exception: RangeError;

    enum keyLength = 40;                // characters in a Z85-encoded key
    enum bufferLength = keyLength + 1;  // plus the terminating zero byte

    // The public key buffer is checked first, then the secret key buffer.
    if (publicKeyBuf is null) {
        publicKeyBuf = new char[bufferLength];
    } else if (publicKeyBuf.length < bufferLength) {
        throw new RangeError;
    }

    if (secretKeyBuf is null) {
        secretKeyBuf = new char[bufferLength];
    } else if (secretKeyBuf.length < bufferLength) {
        throw new RangeError;
    }

    immutable status =
        trusted!zmq_curve_keypair(ptr(publicKeyBuf), ptr(secretKeyBuf));
    if (status != 0) {
        throw new ZmqException;
    }

    // Strip the terminating zero byte from both keys.
    return typeof(return)(publicKeyBuf[0 .. keyLength],
                          secretKeyBuf[0 .. keyLength]);
}

///
debug (WithCurveTests) unittest
{
    // Server side: enable Curve and install the server's secret key.
    auto serverKeys = curveKeyPair();
    auto server = Socket(SocketType.rep);
    server.curveServer = true;
    server.curveSecretKeyZ85 = serverKeys.secretKey;
    server.bind("inproc://curveKeyPair_test");

    // Client side: own key pair plus the server's public key.
    auto clientKeys = curveKeyPair();
    auto client = Socket(SocketType.req);
    client.curvePublicKeyZ85 = clientKeys.publicKey;
    client.curveSecretKeyZ85 = clientKeys.secretKey;
    client.curveServerKeyZ85 = serverKeys.publicKey;
    client.connect("inproc://curveKeyPair_test");
    client.send("hello");

    ubyte[5] buf;
    assert (server.receive(buf) == 5);
    assert (buf.asString() == "hello");
}

debug (WithCurveTests) @system unittest
{
    import core.exception, std.exception;

    // Keys in freshly allocated buffers.
    auto allocated = curveKeyPair();
    assert (allocated.publicKey.length == 40);
    assert (allocated.secretKey.length == 40);

    // Keys in caller-supplied buffers: the results must alias those buffers.
    char[82] buf;
    auto inPlace = curveKeyPair(buf[0 .. 41], buf[41 .. 82]);
    assert (inPlace.publicKey.length == 40);
    assert (inPlace.publicKey.ptr == buf.ptr);
    assert (inPlace.secretKey.length == 40);
    assert (inPlace.secretKey.ptr == buf.ptr + 41);

    // Undersized buffers must be rejected without touching their contents.
    char[82] backup = buf;
    assertThrown!RangeError(curveKeyPair(buf[0 .. 40], buf[41 .. 82]));
    assertThrown!RangeError(curveKeyPair(buf[0 .. 41], buf[42 .. 82]));
    assert (backup[] == buf[]);
}
/**
Utility function which interprets and validates a byte array as a UTF-8 string.

Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often,
the message _data contains plain text.  $(D asString()) allows for easy and
safe interpretation of raw _data as characters.  It checks that $(D data) is
a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to
the same memory region.

Throws:
    $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string.
See_also:
    $(STDREF string,representation), which performs the opposite operation.
*/
inout(char)[] asString(inout(ubyte)[] data) pure
{
    import std.utf: validate;

    // Reinterpret the bytes in place; no copy is made.
    auto text = cast(inout(char)[]) data;
    validate(text);
    return text;
}

///
unittest
{
    auto s1 = Socket(SocketType.pair);
    auto s2 = Socket(SocketType.pair);
    s1.bind("inproc://zmqd_asString_example");
    s2.connect("inproc://zmqd_asString_example");

    // Write text directly into the frame's payload.
    auto msg = Frame(12);
    msg.data.asString()[] = "Hello World!";
    s1.send(msg);

    ubyte[12] buf;
    s2.receive(buf);
    assert(buf.asString() == "Hello World!");
}

unittest
{
    import std.exception: assertThrown;
    import std.utf: UTFException;

    // The returned string must refer to the same memory as the input.
    auto bytes = cast(ubyte[]) ['f', 'o', 'o'];
    auto text = bytes.asString();
    assert (text == "foo");
    assert (cast(void*) ptr(bytes) == cast(void*) ptr(text));

    // Invalid UTF-8 must be rejected.
    auto b = cast(ubyte[]) [100, 252, 1];
    assertThrown!UTFException(asString(b));
}


/**
A class for exceptions thrown when any of the underlying $(ZMQ) C functions
report an error.

The exception provides a standard error message obtained with
$(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ)
function which reported the error.
*/
class ZmqException : Exception
{
    /**
    The $(D _errno) code that was set by the $(ZMQ) function that reported
    the error.

    Corresponds_to:
        $(ZMQREF zmq_errno())
    */
    immutable int errno;

private:
    // Captures the current ZMQ error code and uses the corresponding
    // zmq_strerror() text as the exception message.
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        import std.conv: to;

        this.errno = trusted!zmq_errno();

        string description;
        try {
            auto cText = trusted!zmq_strerror(this.errno);
            description = trusted!(to!string)(cText);
        } catch (Exception e) { /* We never get here */ }

        assert(description.length); // Still, let's assert as much.
        super(description, file, line);
    }
}
/**
Exception thrown by $(FREF receiveEvent) on failure to interpret a
received message as an event description.
*/
class InvalidEventException : Exception
{
private:
    this(string file = __FILE__, int line = __LINE__) nothrow
    {
        super("The received message is not an event message", file, line);
    }
}


// =============================================================================
// Everything below is internal
// =============================================================================
private:


// A reference-counted wrapper around an opaque shared handle.  Copies share a
// single payload; the release function is called when the last copy detaches,
// or immediately when forceRelease() is called.
struct SharedResource
{
@safe:
    // Releases the handle; returns an exception describing the failure, or
    // null on success.
    alias Exception function(shared(void)*) nothrow Release;

    // Takes ownership of `ptr`, starting with a reference count of one.
    this(shared(void)* ptr, Release release) nothrow
        in { assert(ptr); } body
    {
        m_payload = new shared(Payload)(1, ptr, release);
    }

    // Copying adds a reference, unless this object is empty.
    this(this) nothrow
    {
        if (m_payload !is null) incRefCount();
    }

    // Failures reported by the release function are dropped here, since a
    // destructor cannot propagate them.
    ~this() nothrow
    {
        nothrowDetach();
    }

    // Drops the current reference (which may throw), then takes over the
    // reference held by `rhs` so that the total count is unchanged.
    void opAssign(SharedResource rhs)
    {
        detach();
        m_payload = rhs.m_payload;
        rhs.m_payload = null;
    }

    // Drops this object's reference, releasing the handle if it was the last
    // one.  Throws whatever exception the release function reports.
    void detach()
    {
        auto failure = nothrowDetach();
        if (failure !is null) throw failure;
    }

    // Releases the handle right away, regardless of how many references
    // remain.  The shared payload's handle is set to null, so all other
    // copies observe a null handle afterwards.
    void forceRelease() @system
    {
        if (m_payload !is null) {
            scope(exit) m_payload = null;
            decRefCount();
            if (m_payload.handle != null) {
                scope(exit) m_payload.handle = null;
                auto failure = m_payload.release(m_payload.handle);
                if (failure !is null) throw failure;
            }
        }
    }

    // The wrapped handle, or null if this object is empty.
    @property inout(shared(void))* handle() inout pure nothrow
    {
        return (m_payload !is null) ? m_payload.handle : null;
    }

private:
    // Atomically adds one reference.
    void incRefCount() @trusted nothrow
    {
        assert (m_payload !is null && m_payload.refCount > 0);
        import core.atomic: atomicOp;
        atomicOp!"+="(m_payload.refCount, 1);
    }

    // Atomically removes one reference and returns the new count.
    int decRefCount() @trusted nothrow
    {
        assert (m_payload !is null && m_payload.refCount > 0);
        import core.atomic: atomicOp;
        return atomicOp!"-="(m_payload.refCount, 1);
    }

    // Drops this object's reference without throwing.  If that was the last
    // reference and the handle is still set, the release function is called
    // and its result (null on success) is returned.
    Exception nothrowDetach() @trusted nothrow
        out { assert (m_payload is null); }
        body
    {
        if (m_payload !is null) {
            scope(exit) m_payload = null;
            immutable remaining = decRefCount();
            if (remaining < 1 && m_payload.handle != null) {
                return m_payload.release(m_payload.handle);
            }
        }
        return null;
    }

    // State shared between all copies.  The field order matters because the
    // constructor initialises the struct positionally.
    struct Payload
    {
        int refCount;
        void* handle;
        Release release;
    }
    shared(Payload)* m_payload;

    invariant()
    {
        // Either empty, or holding at least one reference and a release
        // function.
        assert (m_payload is null ||
            (m_payload.refCount > 0 && m_payload.release !is null));
    }
}
@system unittest
{
    import std.exception: assertNotThrown, assertThrown;

    // "Releases" an int by zeroing it; releasing a zero is reported as an error.
    static Exception myFree(shared(void)* p) @trusted nothrow
    {
        auto target = cast(shared(int)*) p;
        if (*target != 0) {
            *target = 0;
            return null;
        }
        return new Exception("double release");
    }

    shared int i = 1;

    {
        // Test constructor and properties.
        auto ra = SharedResource(&i, &myFree);
        assert (i == 1);
        assert (ra.handle == &i);

        // Test postblit constructor
        auto rb = ra;
        assert (i == 1);
        assert (rb.handle == &i);

        {
            // Test properties and detach() with default-initialized object.
            SharedResource rc;
            assert (rc.handle == null);
            assertNotThrown(rc.detach());

            // Test assignment, both with and without detachment
            rc = rb;
            assert (i == 1);
            assert (rc.handle == &i);

            shared int j = 2;
            auto rd = SharedResource(&j, &myFree);
            assert (rd.handle == &j);
            rd = rb;
            assert (j == 0);    // the old resource was released...
            assert (i == 1);    // ...while the new one stays alive
            assert (rd.handle == &i);

            // Test explicit detach()
            shared int k = 3;
            auto re = SharedResource(&k, &myFree);
            assertNotThrown(re.detach());
            assert(k == 0);

            // Test failure to free and assign (myFree(&k) fails when k == 0)
            re = SharedResource(&k, &myFree);
            assertThrown!Exception(re.detach()); // We defined free(k == 0) as an error
            re = SharedResource(&k, &myFree);
            assertThrown!Exception(re = rb);
        }

        // i should not be "freed" yet
        assert (i == 1);
        assert (ra.handle == &i);
        assert (rb.handle == &i);
    }
    // ...but now it should.
    assert (i == 0);
}

// Thread safety test
@system unittest
{
    enum threadCount = 100;
    enum copyCount = 1000;

    // "Releases" an int by zeroing it; releasing a zero is reported as an error.
    static Exception myFree(shared(void)* p) @trusted nothrow
    {
        auto target = cast(shared(int)*) p;
        if (*target != 0) {
            *target = 0;
            return null;
        }
        return new Exception("double release");
    }

    shared int raw = 1;
    {
        auto rs = SharedResource(&raw, &myFree);

        import core.thread;
        auto group = new ThreadGroup;
        foreach (i; 0 .. threadCount) {
            group.create(() {
                // Each thread repeatedly copies and destroys the resource.
                auto a = rs;
                foreach (j; 0 .. copyCount) {
                    auto b = a;
                    assert (b.handle == &raw);
                    assert (raw == 1);
                }
            });
        }
        group.joinAll();

        // All copies are gone, but the original reference keeps it alive.
        assert (rs.handle == &raw);
        assert (raw == 1);
    }
    assert (raw == 0);
}
// forceRelease() test
@system unittest
{
    import std.exception: assertNotThrown, assertThrown;

    // "Releases" an int by zeroing it; releasing a zero is reported as an error.
    static Exception myFree(shared(void)* p) @trusted nothrow
    {
        auto target = cast(shared(int)*) p;
        if (*target != 0) {
            *target = 0;
            return null;
        }
        return new Exception("double release");
    }

    shared int i = 1;

    auto ra = SharedResource(&i, &myFree);
    auto rb = ra;
    assert (ra.handle == &i);
    assert (rb.handle == ra.handle);

    // Forcing the release through one copy must empty the other one too.
    rb.forceRelease();
    assert (rb.handle == null);
    assert (ra.handle == null);
}


// Returns "<p>://<random UUID>", giving each test its own endpoint.
// The `n` parameter is not used in the body.
version(unittest) private string uniqueUrl(string p, int n = __LINE__)
{
    import std.uuid;
    immutable id = randomUUID().toString();
    return p ~ "://" ~ id;
}


// Calls `func(args)` from a @trusted context, so that it can be invoked
// from @safe code.
private auto trusted(alias func, Args...)(auto ref Args args) @trusted
{
    return func(args);
}

// Same as trusted(), but with the arguments declared `scope`.
private auto scopedTrusted(alias func, Args...)(auto ref scope Args args) @trusted
{
    return func(args);
}


// A replacement for std.string.toStringz(), which is unsafe.  The string is
// copied into a static buffer, sized for the strings we are likely to
// encounter here, and a pointer to that buffer is returned.
// The result must therefore be used immediately and never stored, since the
// next call will usually overwrite the same buffer.
const(char)* zeroTermString(const char[] s) nothrow
{
    import std.algorithm: max;

    static char[] buffer;

    // Room for the characters plus the terminating zero byte.
    immutable required = s.length + 1;
    if (buffer.length < required) {
        buffer.length = max(required, 1023);
    }

    buffer[0 .. s.length] = s[];
    buffer[s.length] = '\0';
    return ptr(buffer);
}

@system unittest
{
    auto c1 = zeroTermString("Hello World!");
    assert (c1[0 .. 13] == "Hello World!\0");
    auto c2 = zeroTermString("foo");
    assert (c2[0 .. 4] == "foo\0");
    // Both calls return the same (reused) buffer.
    assert (c1 == c2);
}