1 /* 2 This Source Code Form is subject to the terms of the Mozilla Public 3 License, v. 2.0. If a copy of the MPL was not distributed with this 4 file, You can obtain one at http://mozilla.org/MPL/2.0/. 5 */ 6 7 /** 8 $(ZMQD) – a thin wrapper around the low-level C API of the 9 $(LINK2 http://zeromq.org,$(ZMQ)) messaging framework. 10 11 Most functions in this module have a one-to-one relationship with functions 12 in the underlying C API. Some adaptations have been made to make the API 13 safer, easier and more pleasant to use; most importantly: 14 $(UL 15 $(LI 16 Errors are signalled by means of exceptions rather than return 17 codes. The $(REF ZmqException) class provides 18 a standard textual message for any error condition, but it also 19 provides access to the $(D errno) code set by the C function 20 that reported the error.) 21 $(LI 22 Functions are marked with $(D @safe), $(D pure) and $(D nothrow) 23 as appropriate, thus facilitating their use in high-level D code.) 24 $(LI 25 Memory and resources (i.e. contexts, sockets and messages) are 26 automatically managed, thus preventing leaks.) 27 $(LI 28 Context, socket and message options are implemented as properties.) 29 ) 30 The names of functions and types in $(ZMQD) are very similar to those in 31 $(ZMQ), but they follow the D naming conventions. Thus, the library should 32 feel both familiar to $(ZMQ) users and natural to D users. A notable 33 deviation from the C API is that message parts are consistently called 34 "frames". For example, $(D zmq_msg_send()) becomes $(D zmqd.Frame.send()) 35 and so on. (Multipart messages were a late addition to $(ZMQ), and the "msg" 36 function names were well established at that point. The library's 37 developers have admitted that this is somewhat confusing, and the newer 38 CZMQ API consistently uses "frame" in function names.) 39 40 Due to the close correspondence with the C API, this documentation has 41 intentionally been kept sparse. There is really no reason to repeat the 42 contents of the $(ZMQAPI __start,$(ZMQ) reference manual) here. 43 Instead, the documentation for each function contains a "Corresponds to" 44 section that links to the appropriate pages in the $(ZMQ) reference. Any 45 details given in the present documentation mostly concern the D-specific 46 adaptations that have been made. 47 48 Also note that the examples generally only use the INPROC transport. The 49 reason for this is that the examples double as unittests, and we want to 50 avoid firewall troubles and other issues that could arise with the use of 51 network protocols such as TCP, PGM, etc., and the IPC protocol is not 52 supported on Windows. Anyway, they are only short 53 snippets that demonstrate the syntax; for more comprehensive and realistic 54 examples, please refer to the $(LINK2 http://zguide.zeromq.org/page:all, 55 $(ZMQ) Guide). Many of the examples in the Guide have been translated to 56 D, and can be found in the 57 $(LINK2 https://github.com/kyllingstad/zmqd/tree/master/examples,$(D examples)) 58 subdirectory of the $(ZMQD) source repository. 59 60 Version: 61 1.2.0 (compatible with $(ZMQ) >= 4.3.0) 62 Authors: 63 $(LINK2 http://github.com/kyllingstad,Lars T. Kyllingstad) 64 Copyright: 65 Copyright (c) 2013–2019, Lars T. Kyllingstad. All rights reserved. 66 License: 67 $(ZMQD) is released under the terms of the 68 $(LINK2 https://www.mozilla.org/MPL/2.0/index.txt,Mozilla Public License v. 2.0).$(BR) 69 Please refer to the $(LINK2 http://zeromq.org/area:licensing,$(ZMQ) site) 70 for details about $(ZMQ) licensing. 71 Macros: 72 D = <code>$0</code> 73 EM = <em>$0</em> 74 LDOTS = … 75 QUOTE = <blockquote>$0</blockquote> 76 FREF = $(D $(LINK2 #$1,$1())) 77 REF = $(D $(LINK2 #$1,$1)) 78 COREF = $(D $(LINK2 http://dlang.org/phobos/core_$1.html#.$2,core.$1.$2)) 79 OBJREF = $(D $(LINK2 http://dlang.org/phobos/object.html#.$1,$1)) 80 STDREF = $(D $(LINK2 http://dlang.org/phobos/std_$1.html#.$2,std.$1.$2)) 81 ANCHOR = <span id="$1"></span> 82 ZMQ = ZeroMQ 83 ZMQAPI = $(LINK2 http://api.zeromq.org/4-3:$1,$+) 84 ZMQD = $(EM zmqd) 85 ZMQREF = $(D $(ZMQAPI $1,$1)) 86 */ 87 module zmqd; 88 89 import core.time; 90 import std.typecons; 91 import deimos.zmq.zmq; 92 93 94 version(Windows) { 95 import core.sys.windows.winsock2: SOCKET; 96 } 97 98 // Compile with debug=ZMQD_DisableCurveTests to be able to successfully 99 // run unittests even if the local ZMQ library is not compiled with Curve 100 // support. 101 debug (ZMQD_DisableCurveTests) { } else debug = WithCurveTests; 102 103 // Compatibility check 104 version(unittest) static this() 105 { 106 import std.stdio: stderr; 107 const v = zmqVersion(); 108 if (v.major != ZMQ_VERSION_MAJOR || v.minor != ZMQ_VERSION_MINOR) { 109 stderr.writefln( 110 "Warning: Potential ZeroMQ header/library incompatibility: " 111 ~"The header (binding) is for version %d.%d.%d, " 112 ~"while the library is version %d.%d.%d. Unittests may fail.", 113 ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH, 114 v.major, v.minor, v.patch); 115 } 116 // Known incompatibilities 117 import std.algorithm: min, max; 118 const libVersion = ZMQ_MAKE_VERSION(v.major, v.minor, v.patch); 119 const older = min(ZMQ_VERSION, libVersion); 120 const newer = max(ZMQ_VERSION, libVersion); 121 if (older < ZMQ_MAKE_VERSION(4, 1, 0) && newer >= ZMQ_MAKE_VERSION(4, 1, 0)) { 122 stderr.writeln("Note: Version 4.1.0 is known to be ABI incompatible with older versions"); 123 } else if (older < ZMQ_MAKE_VERSION(4, 1, 1) && newer >= ZMQ_MAKE_VERSION(4, 1, 1)) { 124 stderr.writeln("Note: Version 4.1.1 is known to be ABI incompatible with older versions"); 125 } 126 } 127 128 129 @safe: 130 131 T* ptr(T)(T[] arr) { return arr.length == 0 ? null : &arr[0]; } 132 133 /** 134 Reports the $(ZMQ) library version. 135 136 Returns: 137 A $(STDREF typecons,Tuple) with three integer fields that represent the 138 three versioning levels: $(D major), $(D minor) and $(D patch). 139 Corresponds_to: 140 $(ZMQREF zmq_version()) 141 */ 142 Tuple!(int, "major", int, "minor", int, "patch") zmqVersion() nothrow 143 { 144 typeof(return) v; 145 scopedTrusted!zmq_version(&v.major, &v.minor, &v.patch); 146 return v; 147 } 148 149 150 /** 151 Checks for a $(ZMQ) capability. 152 153 Corresponds_to: 154 $(ZMQREF zmq_has()) 155 */ 156 bool zmqHas(const char[] capability) nothrow 157 { 158 return 0 != trusted!zmq_has(zeroTermString(capability)); 159 } 160 161 unittest 162 { 163 assert (!zmqHas("this ain't a capability")); 164 } 165 166 167 /** 168 The various socket types. 169 170 These are described in the $(ZMQREF zmq_socket()) reference. 171 */ 172 enum SocketType 173 { 174 req = ZMQ_REQ, /// Corresponds to $(D ZMQ_REQ) 175 rep = ZMQ_REP, /// Corresponds to $(D ZMQ_REP) 176 dealer = ZMQ_DEALER, /// Corresponds to $(D ZMQ_DEALER) 177 router = ZMQ_ROUTER, /// Corresponds to $(D ZMQ_ROUTER) 178 pub = ZMQ_PUB, /// Corresponds to $(D ZMQ_PUB) 179 sub = ZMQ_SUB, /// Corresponds to $(D ZMQ_SUB) 180 xpub = ZMQ_XPUB, /// Corresponds to $(D ZMQ_XPUB) 181 xsub = ZMQ_XSUB, /// Corresponds to $(D ZMQ_XSUB) 182 push = ZMQ_PUSH, /// Corresponds to $(D ZMQ_PUSH) 183 pull = ZMQ_PULL, /// Corresponds to $(D ZMQ_PULL) 184 pair = ZMQ_PAIR, /// Corresponds to $(D ZMQ_PAIR) 185 stream = ZMQ_STREAM, /// Corresponds to $(D ZMQ_STREAM) 186 } 187 188 189 /// _Security mechanisms. 190 enum Security 191 { 192 /// $(ZMQAPI zmq_null,NULL): No security or confidentiality. 193 none = ZMQ_NULL, 194 195 /// $(ZMQAPI zmq_plain,PLAIN): Clear-text authentication. 196 plain = ZMQ_PLAIN, 197 198 /// $(ZMQAPI zmq_curve,CURVE): Secure authentication and confidentiality. 199 curve = ZMQ_CURVE, 200 } 201 202 203 /** 204 A special $(COREF time,Duration) value used to signify an infinite 205 timeout or time interval in certain contexts. 206 207 The places where this value may be used are: 208 $(UL 209 $(LI $(LINK2 #Socket.misc_options,certain socket options)) 210 $(LI $(FREF poll)) 211 ) 212 213 Note: 214 Since $(D core.time.Duration) doesn't reserve such a special value, the 215 actual value of $(D infiniteDuration) is $(COREF time,Duration.max). 216 */ 217 enum infiniteDuration = Duration.max; 218 219 220 /** 221 An object that encapsulates a $(ZMQ) socket. 222 223 A default-initialized $(D Socket) is not a valid $(ZMQ) socket; it 224 must always be explicitly initialized with a constructor (see 225 $(FREF _Socket.this)): 226 --- 227 Socket s; // Not a valid socket yet 228 s = Socket(SocketType.push); // ...but now it is. 229 --- 230 This $(D struct) is noncopyable, which means that a socket is always 231 uniquely managed by a single $(D Socket) object. Functions that will 232 inspect or use the socket, but not take ownership of it, should take 233 a $(D ref Socket) parameter. Use $(STDREF algorithm,move) to move 234 a $(D Socket) to a different location (e.g. into a sink function that 235 takes it by value, or into a new variable). 236 237 The socket is automatically closed when the $(D Socket) object goes out 238 of scope. 239 240 Linger_period: 241 Note that Socket by default sets the socket's linger period to zero. 242 This deviates from the $(ZMQ) default (which is an infinite linger period). 243 */ 244 struct Socket 245 { 246 @safe: 247 /** 248 Creates a new $(ZMQ) socket. 249 250 If $(D context) is not specified, the default _context (as returned 251 by $(FREF defaultContext)) is used. 252 253 Throws: 254 $(REF ZmqException) if $(ZMQ) reports an error. 255 Corresponds_to: 256 $(ZMQREF zmq_socket()) 257 */ 258 this(SocketType type) 259 { 260 this(defaultContext(), type); 261 } 262 263 /// ditto 264 this(Context context, SocketType type) 265 { 266 m_context = context; 267 m_type = type; 268 m_socket = trusted!zmq_socket(context.handle, type); 269 if (m_socket == null) { 270 throw new ZmqException; 271 } 272 linger = 0.msecs; 273 } 274 275 /// With default context: 276 unittest 277 { 278 auto sck = Socket(SocketType.push); 279 assert (sck.initialized); 280 } 281 /// With explicit context: 282 unittest 283 { 284 auto ctx = Context(); 285 auto sck = Socket(ctx, SocketType.push); 286 assert (sck.initialized); 287 } 288 289 // Socket objects are noncopyable. 290 @disable this(this); 291 292 unittest // Verify that move semantics work. 293 { 294 import std.algorithm: move; 295 struct SocketOwner 296 { 297 this(Socket s) { owned = trusted!move(s); } 298 Socket owned; 299 } 300 auto socket = Socket(SocketType.req); 301 const socketPtr = socket.handle; 302 assert (socketPtr != null); 303 304 auto owner = SocketOwner(trusted!move(socket)); 305 assert (socket.handle == null); 306 assert (!socket.m_context.initialized); 307 assert (owner.owned.handle == socketPtr); 308 assert (owner.owned.m_context.initialized); 309 } 310 311 // Closes the socket on desctruction. 312 ~this() nothrow { nothrowClose(); } 313 314 /** 315 Closes the $(ZMQ) socket. 316 317 Note that the socket will be closed automatically upon destruction, 318 so it is usually not necessary to call this method manually. 319 320 Throws: 321 $(REF ZmqException) if $(ZMQ) reports an error. 322 Corresponds_to: 323 $(ZMQREF zmq_close()) 324 */ 325 void close() 326 { 327 if (!nothrowClose()) throw new ZmqException; 328 } 329 330 /// 331 unittest 332 { 333 auto s = Socket(SocketType.pair); 334 assert (s.initialized); 335 s.close(); 336 assert (!s.initialized); 337 } 338 339 /** 340 Starts accepting incoming connections on $(D endpoint). 341 342 Throws: 343 $(REF ZmqException) if $(ZMQ) reports an error. 344 Corresponds_to: 345 $(ZMQREF zmq_bind()) 346 */ 347 void bind(const char[] endpoint) 348 { 349 if (trusted!zmq_bind(m_socket, zeroTermString(endpoint)) != 0) { 350 throw new ZmqException; 351 } 352 } 353 354 /// 355 unittest 356 { 357 auto s = Socket(SocketType.pub); 358 s.bind("inproc://zmqd_bind_example"); 359 } 360 361 /** 362 Stops accepting incoming connections on $(D endpoint). 363 364 Throws: 365 $(REF ZmqException) if $(ZMQ) reports an error. 366 Corresponds_to: 367 $(ZMQREF zmq_unbind()) 368 */ 369 void unbind(const char[] endpoint) 370 { 371 if (trusted!zmq_unbind(m_socket, zeroTermString(endpoint)) != 0) { 372 throw new ZmqException; 373 } 374 } 375 376 unittest 377 { 378 auto s = Socket(SocketType.pub); 379 s.bind("inproc://zmqd_unbind_example"); 380 // Do some work... 381 s.unbind("inproc://zmqd_unbind_example"); 382 } 383 384 /** 385 Creates an outgoing connection to $(D endpoint). 386 387 Throws: 388 $(REF ZmqException) if $(ZMQ) reports an error. 389 Corresponds_to: 390 $(ZMQREF zmq_connect()) 391 */ 392 void connect(const char[] endpoint) 393 { 394 if (trusted!zmq_connect(m_socket, zeroTermString(endpoint)) != 0) { 395 throw new ZmqException; 396 } 397 } 398 399 /// 400 unittest 401 { 402 auto s = Socket(SocketType.sub); 403 s.connect("inproc://zmqd_connect_example"); 404 } 405 406 /** 407 Disconnects the socket from $(D endpoint). 408 409 Throws: 410 $(REF ZmqException) if $(ZMQ) reports an error. 411 Corresponds_to: 412 $(ZMQREF zmq_disconnect()) 413 */ 414 void disconnect(const char[] endpoint) 415 { 416 if (trusted!zmq_disconnect(m_socket, zeroTermString(endpoint)) != 0) { 417 throw new ZmqException; 418 } 419 } 420 421 /// 422 unittest 423 { 424 auto s = Socket(SocketType.sub); 425 s.connect("inproc://zmqd_disconnect_example"); 426 // Do some work... 427 s.disconnect("inproc://zmqd_disconnect_example"); 428 } 429 430 /** 431 Sends a message frame. 432 433 $(D _send) blocks until the frame has been queued on the socket. 434 $(D trySend) performs the operation in non-blocking mode, and returns 435 a $(D bool) value that signifies whether the frame was queued on the 436 socket. 437 438 The $(D more) parameter specifies whether this is a multipart message 439 and there are _more frames to follow. 440 441 The $(D char[]) overload is a convenience function that simply casts 442 the string argument to $(D ubyte[]). 443 444 Throws: 445 $(REF ZmqException) if $(ZMQ) reports an error. 446 Corresponds_to: 447 $(ZMQREF zmq_send()) (with the $(D ZMQ_DONTWAIT) flag, in the 448 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 449 $(D more == true)). 450 */ 451 void send(const ubyte[] data, bool more = false) 452 { 453 immutable flags = more ? ZMQ_SNDMORE : 0; 454 if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) { 455 throw new ZmqException; 456 } 457 } 458 459 /// ditto 460 void send(const char[] data, bool more = false) 461 { 462 send(cast(const(ubyte)[]) data, more); 463 } 464 465 /// ditto 466 bool trySend(const ubyte[] data, bool more = false) 467 { 468 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 469 if (trusted!zmq_send(m_socket, ptr(data), data.length, flags) < 0) { 470 import core.stdc.errno: EAGAIN, EINTR; 471 import std.algorithm : among; 472 if (trusted!zmq_errno().among(EAGAIN, EINTR)) return false; 473 else throw new ZmqException; 474 } 475 return true; 476 } 477 478 /// ditto 479 bool trySend(const char[] data, bool more = false) 480 { 481 return trySend(cast(const(ubyte)[]) data, more); 482 } 483 484 /// 485 unittest 486 { 487 auto sck = Socket(SocketType.pub); 488 sck.send(cast(ubyte[]) [11, 226, 92]); 489 sck.send("Hello World!"); 490 } 491 492 /** 493 Sends a message frame. 494 495 $(D _send) blocks until the frame has been queued on the socket. 496 $(D trySend) performs the operation in non-blocking mode, and returns 497 a $(D bool) value that signifies whether the frame was queued on the 498 socket. 499 500 The $(D more) parameter specifies whether this is a multipart message 501 and there are _more frames to follow. 502 503 Throws: 504 $(REF ZmqException) if $(ZMQ) reports an error. 505 Corresponds_to: 506 $(ZMQREF zmq_msg_send()) (with the $(D ZMQ_DONTWAIT) flag, in the 507 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 508 $(D more == true)). 509 */ 510 void send(ref Frame msg, bool more = false) 511 { 512 immutable flags = more ? ZMQ_SNDMORE : 0; 513 if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) { 514 throw new ZmqException; 515 } 516 } 517 518 /// ditto 519 bool trySend(ref Frame msg, bool more = false) 520 { 521 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 522 if (trusted!zmq_msg_send(msg.handle, m_socket, flags) < 0) { 523 import core.stdc.errno: EAGAIN, EINTR; 524 import std.algorithm : among; 525 if (trusted!zmq_errno().among(EAGAIN, EINTR)) return false; 526 else throw new ZmqException; 527 } 528 return true; 529 } 530 531 /// 532 unittest 533 { 534 auto sck = Socket(SocketType.pub); 535 auto msg = Frame(12); 536 msg.data.asString()[] = "Hello World!"; 537 sck.send(msg); 538 } 539 540 /** 541 Sends a constant-memory message frame. 542 543 $(D _sendConst) blocks until the frame has been queued on the socket. 544 $(D trySendConst) performs the operation in non-blocking mode, and returns 545 a $(D bool) value that signifies whether the frame was queued on the 546 socket. 547 548 The $(D more) parameter specifies whether this is a multipart message 549 and there are _more frames to follow. 550 551 Throws: 552 $(REF ZmqException) if $(ZMQ) reports an error. 553 Corresponds_to: 554 $(ZMQREF zmq_send_const()) (with the $(D ZMQ_DONTWAIT) flag, in the 555 case of $(D trySend), and with the $(D ZMQ_SNDMORE) flag if 556 $(D more == true)). 557 */ 558 void sendConst(immutable ubyte[] data, bool more = false) 559 { 560 immutable flags = more ? ZMQ_SNDMORE : 0; 561 if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) { 562 throw new ZmqException; 563 } 564 } 565 566 /// ditto 567 void sendConst(string data, bool more = false) 568 { 569 sendConst(cast(immutable(ubyte)[]) data, more); 570 } 571 572 /// ditto 573 bool trySendConst(immutable ubyte[] data, bool more = false) 574 { 575 immutable flags = ZMQ_DONTWAIT | (more ? ZMQ_SNDMORE : 0); 576 if (trusted!zmq_send_const(m_socket, ptr(data), data.length, flags) < 0) { 577 import core.stdc.errno: EAGAIN, EINTR; 578 import std.algorithm : among; 579 if (trusted!zmq_errno().among(EAGAIN, EINTR)) return false; 580 else throw new ZmqException; 581 } 582 return true; 583 } 584 585 /// ditto 586 bool trySendConst(string data, bool more = false) 587 { 588 return trySend(cast(immutable(ubyte)[]) data, more); 589 } 590 591 /// 592 unittest 593 { 594 static immutable arr = cast(ubyte[]) [11, 226, 92]; 595 auto sck = Socket(SocketType.pub); 596 sck.sendConst(arr); 597 sck.sendConst("Hello World!"); 598 } 599 600 /** 601 Receives a message frame. 602 603 $(D _receive) blocks until the request can be satisfied, and returns the 604 number of bytes in the frame. 605 $(D tryReceive) performs the operation in non-blocking mode, and returns 606 a $(STDREF typecons,Tuple) which contains the size of the frame along 607 with a $(D bool) value that signifies whether a frame was received. 608 (If the latter is $(D false), the former is always set to zero.) 609 610 Throws: 611 $(REF ZmqException) if $(ZMQ) reports an error. 612 Corresponds_to: 613 $(ZMQREF zmq_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 614 of $(D tryReceive)). 615 616 */ 617 size_t receive(scope ubyte[] data) 618 { 619 immutable len = scopedTrusted!zmq_recv(m_socket, ptr(data), data.length, 0); 620 if (len >= 0) { 621 import std.conv; 622 return to!size_t(len); 623 } else { 624 throw new ZmqException; 625 } 626 } 627 628 /// ditto 629 Tuple!(size_t, bool) tryReceive(ubyte[] data) 630 { 631 immutable len = trusted!zmq_recv(m_socket, ptr(data), data.length, ZMQ_DONTWAIT); 632 if (len >= 0) { 633 import std.conv; 634 return typeof(return)(to!size_t(len), true); 635 } else { 636 import core.stdc.errno: EAGAIN, EINTR; 637 import std.algorithm : among; 638 if (trusted!zmq_errno().among(EAGAIN, EINTR)) { 639 return typeof(return)(0, false); 640 } else { 641 throw new ZmqException; 642 } 643 } 644 } 645 646 /// 647 unittest 648 { 649 // Sender 650 auto snd = Socket(SocketType.req); 651 snd.connect("inproc://zmqd_receive_example"); 652 snd.send("Hello World!"); 653 654 // Receiver 655 import std.string: representation; 656 auto rcv = Socket(SocketType.rep); 657 rcv.bind("inproc://zmqd_receive_example"); 658 char[256] buf; 659 immutable len = rcv.receive(buf.representation); 660 assert (buf[0 .. len] == "Hello World!"); 661 } 662 663 @system unittest 664 { 665 auto snd = Socket(SocketType.pair); 666 snd.bind("inproc://zmqd_tryReceive_example"); 667 auto rcv = Socket(SocketType.pair); 668 rcv.connect("inproc://zmqd_tryReceive_example"); 669 670 ubyte[256] buf; 671 auto r1 = rcv.tryReceive(buf); 672 assert (!r1[1]); 673 674 import core.thread, core.time, std.string; 675 snd.send("Hello World!"); 676 Thread.sleep(100.msecs); // Wait for message to be transferred... 677 auto r2 = rcv.tryReceive(buf); 678 assert (r2[1] && buf[0 .. r2[0]] == "Hello World!".representation); 679 } 680 681 /** 682 Receives a message frame. 683 684 $(D _receive) blocks until the request can be satisfied, and returns the 685 number of bytes in the frame. 686 $(D tryReceive) performs the operation in non-blocking mode, and returns 687 a $(STDREF typecons,Tuple) which contains the size of the frame along 688 with a $(D bool) value that signifies whether a frame was received. 689 (If the latter is $(D false), the former is always set to zero.) 690 691 Throws: 692 $(REF ZmqException) if $(ZMQ) reports an error. 693 Corresponds_to: 694 $(ZMQREF zmq_msg_recv()) (with the $(D ZMQ_DONTWAIT) flag, in the case 695 of $(D tryReceive)). 696 697 */ 698 size_t receive(ref Frame msg) 699 { 700 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, 0); 701 if (len >= 0) { 702 import std.conv; 703 return to!size_t(len); 704 } else { 705 throw new ZmqException; 706 } 707 } 708 709 /// ditto 710 Tuple!(size_t, bool) tryReceive(ref Frame msg) 711 { 712 immutable len = trusted!zmq_msg_recv(msg.handle, m_socket, ZMQ_DONTWAIT); 713 if (len >= 0) { 714 import std.conv; 715 return typeof(return)(to!size_t(len), true); 716 } else { 717 import core.stdc.errno: EAGAIN, EINTR; 718 import std.algorithm : among; 719 if (trusted!zmq_errno().among(EAGAIN, EINTR)) { 720 return typeof(return)(0, false); 721 } else { 722 throw new ZmqException; 723 } 724 } 725 } 726 727 /// 728 unittest 729 { 730 // Sender 731 auto snd = Socket(SocketType.req); 732 snd.connect("inproc://zmqd_msg_receive_example"); 733 snd.send("Hello World!"); 734 735 // Receiver 736 import std.string: representation; 737 auto rcv = Socket(SocketType.rep); 738 rcv.bind("inproc://zmqd_msg_receive_example"); 739 auto msg = Frame(); 740 rcv.receive(msg); 741 assert (msg.data.asString() == "Hello World!"); 742 } 743 744 @system unittest 745 { 746 auto snd = Socket(SocketType.pair); 747 snd.bind("inproc://zmqd_msg_tryReceive_example"); 748 auto rcv = Socket(SocketType.pair); 749 rcv.connect("inproc://zmqd_msg_tryReceive_example"); 750 751 auto msg = Frame(); 752 auto r1 = rcv.tryReceive(msg); 753 assert (!r1[1]); 754 755 import core.thread, core.time, std.string; 756 snd.send("Hello World!"); 757 Thread.sleep(100.msecs); // Wait for message to be transferred... 758 auto r2 = rcv.tryReceive(msg); 759 assert (r2[1] && msg.data[0 .. r2[0]] == "Hello World!".representation); 760 } 761 762 /** 763 Whether there are _more message frames to follow. 764 765 Throws: 766 $(REF ZmqException) if $(ZMQ) reports an error. 767 Corresponds_to: 768 $(ZMQREF zmq_getsockopt()) with $(D ZMQ_RCVMORE). 769 */ 770 @property bool more() { return !!getOption!int(ZMQ_RCVMORE); } 771 772 // TODO: Better unittest/example 773 unittest 774 { 775 auto sck = Socket(SocketType.req); 776 assert (!sck.more); 777 } 778 779 /** $(ANCHOR Socket.misc_options) 780 Miscellaneous socket options. 781 782 Each of these corresponds to an option passed to 783 $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). For 784 example, $(D identity) corresponds to $(D ZMQ_IDENTITY), 785 $(D receiveBufferSize) corresponds to $(D ZMQ_RCVBUF), etc. 786 787 Notes: 788 $(UL 789 $(LI For convenience, the setter for the $(D identity) property 790 accepts strings. To retrieve a string with the getter, use 791 the $(FREF asString) function. 792 --- 793 sck.identity = "foobar"; 794 assert (sck.identity.asString() == "foobar"); 795 --- 796 ) 797 $(LI The $(D linger), $(D receiveTimeout), $(D sendTimeout) and 798 $(D handshakeInterval) properties may have the special _value 799 $(REF infiniteDuration). This is translated to an option 800 _value of -1 or 0 (depending on which property is being set) 801 in the C API.) 802 $(LI Some options have array _type, and these allow the user to supply 803 a buffer in which to store the _value, to avoid a GC allocation. 804 The return _value is then a slice of this buffer. 805 These are not marked as $(D @property), but are prefixed with 806 "get" (e.g. $(D getIdentity())). A user-supplied buffer is 807 $(EM required) for some options, namely $(D getPlainUsername()) 808 and $(D getPlainPassword()), and these do not have $(D @property) 809 versions. $(D getCurveXxxKey()) and $(D getCurveXxxKeyZ85()) 810 require buffers which are at least 32 and 41 bytes long, 811 respectively.) 812 $(LI The $(D ZMQ_SUBSCRIBE) and $(D ZMQ_UNSUBSCRIBE) options are 813 treated differently from the others; see $(FREF Socket.subscribe) 814 and $(FREF Socket.unsubscribe)) 815 ) 816 817 Throws: 818 $(REF ZmqException) if $(ZMQ) reports an error.$(BR) 819 $(STDREF conv,ConvOverflowException) if a given $(D Duration) is 820 longer than the number of milliseconds that will fit in an $(D int) 821 (only applies to properties of $(COREF time,Duration) _type).$(BR) 822 $(COREF exception,RangeError) if the $(D dest) buffers passed to 823 $(D getCurveXxxKey()) or $(D getCurveXxxKeyZ85()) are less than 824 32 or 41 bytes long, respectively. 825 Corresponds_to: 826 $(ZMQREF zmq_getsockopt()) and $(ZMQREF zmq_setsockopt()). 827 */ 828 @property ulong threadAffinity() { return getOption!ulong(ZMQ_AFFINITY); } 829 /// ditto 830 @property void threadAffinity(ulong value) { setOption(ZMQ_AFFINITY, value); } 831 832 /// ditto 833 @property ubyte[] identity() { return getIdentity(new ubyte[255]); } 834 /// ditto 835 ubyte[] getIdentity(ubyte[] dest) { return getArrayOption(ZMQ_IDENTITY, dest); } 836 /// ditto 837 @property void identity(const ubyte[] value) { setArrayOption(ZMQ_IDENTITY, value); } 838 /// ditto 839 @property void identity(const char[] value) { setArrayOption(ZMQ_IDENTITY, value); } 840 841 /// ditto 842 @property int rate() { return getOption!int(ZMQ_RATE); } 843 /// ditto 844 @property void rate(int value) { setOption(ZMQ_RATE, value); } 845 846 /// ditto 847 @property Duration recoveryInterval() 848 { 849 return msecs(getOption!int(ZMQ_RECOVERY_IVL)); 850 } 851 /// ditto 852 @property void recoveryInterval(Duration value) 853 { 854 import std.conv: to; 855 setOption(ZMQ_RECOVERY_IVL, to!int(value.total!"msecs"())); 856 } 857 858 /// ditto 859 @property int sendBufferSize() { return getOption!int(ZMQ_SNDBUF); } 860 /// ditto 861 @property void sendBufferSize(int value) { setOption(ZMQ_SNDBUF, value); } 862 863 /// ditto 864 @property int receiveBufferSize() { return getOption!int(ZMQ_RCVBUF); } 865 /// ditto 866 @property void receiveBufferSize(int value) { setOption(ZMQ_RCVBUF, value); } 867 868 /// ditto 869 @property FD fd() { return getOption!FD(ZMQ_FD); } 870 871 /// ditto 872 @property PollFlags events() { return getOption!PollFlags(ZMQ_EVENTS); } 873 874 /// ditto 875 @property SocketType type() { return getOption!SocketType(ZMQ_TYPE); } 876 877 /// ditto 878 @property Duration linger() 879 { 880 const auto value = getOption!int(ZMQ_LINGER); 881 return value == -1 ? infiniteDuration : value.msecs; 882 } 883 /// ditto 884 @property void linger(Duration value) 885 { 886 import std.conv: to; 887 setOption( 888 ZMQ_LINGER, 889 value == infiniteDuration ? -1 : to!int(value.total!"msecs"())); 890 } 891 892 /// ditto 893 @property Duration reconnectionInterval() 894 { 895 return getOption!int(ZMQ_RECONNECT_IVL).msecs; 896 } 897 /// ditto 898 @property void reconnectionInterval(Duration value) 899 { 900 import std.conv: to; 901 setOption(ZMQ_RECONNECT_IVL, to!int(value.total!"msecs"())); 902 } 903 904 /// ditto 905 @property int backlog() { return getOption!int(ZMQ_BACKLOG); } 906 /// ditto 907 @property void backlog(int value) { setOption(ZMQ_BACKLOG, value); } 908 909 /// ditto 910 @property Duration maxReconnectionInterval() 911 { 912 return getOption!int(ZMQ_RECONNECT_IVL_MAX).msecs; 913 } 914 /// ditto 915 @property void maxReconnectionInterval(Duration value) 916 { 917 import std.conv: to; 918 setOption(ZMQ_RECONNECT_IVL_MAX, to!int(value.total!"msecs"())); 919 } 920 921 /// ditto 922 @property long maxMsgSize() { return getOption!long(ZMQ_MAXMSGSIZE); } 923 /// ditto 924 @property void maxMsgSize(long value) { setOption(ZMQ_MAXMSGSIZE, value); } 925 926 /// ditto 927 @property int sendHWM() { return getOption!int(ZMQ_SNDHWM); } 928 /// ditto 929 @property void sendHWM(int value) { setOption(ZMQ_SNDHWM, value); } 930 931 /// ditto 932 @property int receiveHWM() { return getOption!int(ZMQ_RCVHWM); } 933 /// ditto 934 @property void receiveHWM(int value) { setOption(ZMQ_RCVHWM, value); } 935 936 /// ditto 937 @property int multicastHops() { return getOption!int(ZMQ_MULTICAST_HOPS); } 938 /// ditto 939 @property void multicastHops(int value) { setOption(ZMQ_MULTICAST_HOPS, value); } 940 941 /// ditto 942 @property Duration receiveTimeout() 943 { 944 const value = getOption!int(ZMQ_RCVTIMEO); 945 return value == -1 ? infiniteDuration : value.msecs; 946 } 947 /// ditto 948 @property void receiveTimeout(Duration value) 949 { 950 import std.conv: to; 951 setOption( 952 ZMQ_RCVTIMEO, 953 value == infiniteDuration ? -1 : to!int(value.total!"msecs"())); 954 } 955 956 /// ditto 957 @property Duration sendTimeout() 958 { 959 const value = getOption!int(ZMQ_SNDTIMEO); 960 return value == -1 ? infiniteDuration : value.msecs; 961 } 962 /// ditto 963 @property void sendTimeout(Duration value) 964 { 965 import std.conv: to; 966 setOption( 967 ZMQ_SNDTIMEO, 968 value == infiniteDuration ? -1 : to!int(value.total!"msecs"())); 969 } 970 971 /// ditto 972 @property char[] lastEndpoint() @trusted 973 { 974 // This function is not @safe because it calls a @system function 975 // (zmq_getsockopt) and takes the address of a local (len). 976 auto buf = new char[1024]; 977 size_t len = buf.length; 978 if (zmq_getsockopt(m_socket, ZMQ_LAST_ENDPOINT, buf.ptr, &len) != 0) { 979 throw new ZmqException; 980 } 981 return buf[0 .. len-1]; 982 } 983 984 /// ditto 985 @property void routerMandatory(bool value) { setOption(ZMQ_ROUTER_MANDATORY, value ? 1 : 0); } 986 987 /// ditto 988 @property int tcpKeepalive() { return getOption!int(ZMQ_TCP_KEEPALIVE); } 989 /// ditto 990 @property void tcpKeepalive(int value) { setOption(ZMQ_TCP_KEEPALIVE, value); } 991 992 /// ditto 993 @property int tcpKeepaliveCnt() { return getOption!int(ZMQ_TCP_KEEPALIVE_CNT); } 994 /// ditto 995 @property void tcpKeepaliveCnt(int value) { setOption(ZMQ_TCP_KEEPALIVE_CNT, value); } 996 997 /// ditto 998 @property int tcpKeepaliveIdle() { return getOption!int(ZMQ_TCP_KEEPALIVE_IDLE); } 999 /// ditto 1000 @property void tcpKeepaliveIdle(int value) { setOption(ZMQ_TCP_KEEPALIVE_IDLE, value); } 1001 1002 /// ditto 1003 @property int tcpKeepaliveIntvl() { return getOption!int(ZMQ_TCP_KEEPALIVE_INTVL); } 1004 /// ditto 1005 @property void tcpKeepaliveIntvl(int value) { setOption(ZMQ_TCP_KEEPALIVE_INTVL, value); } 1006 1007 /// ditto 1008 @property bool immediate() { return !!getOption!int(ZMQ_IMMEDIATE); } 1009 /// ditto 1010 @property void immediate(bool value) { setOption!int(ZMQ_IMMEDIATE, value ? 1 : 0); } 1011 1012 /// ditto 1013 @property void xpubVerbose(bool value) { setOption(ZMQ_XPUB_VERBOSE, value ? 1 : 0); } 1014 1015 /// ditto 1016 @property bool ipv6() { return !!getOption!int(ZMQ_IPV6); } 1017 /// ditto 1018 @property void ipv6(bool value) { setOption(ZMQ_IPV6, value ? 1 : 0); } 1019 1020 /// ditto 1021 @property Security mechanism() 1022 { 1023 import std.conv; 1024 return to!Security(getOption!int(ZMQ_MECHANISM)); 1025 } 1026 1027 /// ditto 1028 @property bool plainServer() { return !!getOption!int(ZMQ_PLAIN_SERVER); } 1029 /// ditto 1030 @property void plainServer(bool value) { setOption(ZMQ_PLAIN_SERVER, value ? 1 : 0); } 1031 1032 /// ditto 1033 char[] getPlainUsername(char[] dest) 1034 { 1035 return getCStringOption(ZMQ_PLAIN_USERNAME, dest); 1036 } 1037 /// ditto 1038 @property void plainUsername(const(char)[] value) 1039 { 1040 setArrayOption(ZMQ_PLAIN_USERNAME, value); 1041 } 1042 1043 /// ditto 1044 char[] getPlainPassword(char[] dest) 1045 { 1046 return getCStringOption(ZMQ_PLAIN_PASSWORD, dest); 1047 } 1048 /// ditto 1049 @property void plainPassword(const(char)[] value) 1050 { 1051 setArrayOption(ZMQ_PLAIN_PASSWORD, value); 1052 } 1053 1054 /// ditto 1055 @property bool curveServer() { return !!getOption!int(ZMQ_CURVE_SERVER); } 1056 /// ditto 1057 @property void curveServer(bool value) { setOption(ZMQ_CURVE_SERVER, value ? 1 : 0); } 1058 1059 /// ditto 1060 @property ubyte[] curvePublicKey() 1061 { 1062 return getCurvePublicKey(new ubyte[keyBufSizeBin]); 1063 } 1064 /// ditto 1065 ubyte[] getCurvePublicKey(ubyte[] dest) 1066 { 1067 return getCurveKey(ZMQ_CURVE_PUBLICKEY, dest); 1068 } 1069 /// ditto 1070 @property char[] curvePublicKeyZ85() 1071 { 1072 return getCurvePublicKeyZ85(new char[keyBufSizeZ85]); 1073 } 1074 /// ditto 1075 char[] getCurvePublicKeyZ85(char[] dest) 1076 { 1077 return getCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, dest); 1078 } 1079 /// ditto 1080 @property void curvePublicKey(const(ubyte)[] value) 1081 { 1082 setCurveKey(ZMQ_CURVE_PUBLICKEY, value); 1083 } 1084 /// ditto 1085 @property void curvePublicKeyZ85(const(char)[] value) 1086 { 1087 setCurveKeyZ85(ZMQ_CURVE_PUBLICKEY, value); 1088 } 1089 1090 /// ditto 1091 @property ubyte[] curveSecretKey() 1092 { 1093 return getCurveSecretKey(new ubyte[keyBufSizeBin]); 1094 } 1095 /// ditto 1096 ubyte[] getCurveSecretKey(ubyte[] dest) 1097 { 1098 return getCurveKey(ZMQ_CURVE_SECRETKEY, dest); 1099 } 1100 /// ditto 1101 @property char[] curveSecretKeyZ85() 1102 { 1103 return getCurveSecretKeyZ85(new char[keyBufSizeZ85]); 1104 } 1105 /// ditto 1106 char[] getCurveSecretKeyZ85(char[] dest) 1107 { 1108 return getCurveKeyZ85(ZMQ_CURVE_SECRETKEY, dest); 1109 } 1110 /// ditto 1111 @property void curveSecretKey(const(ubyte)[] value) 1112 { 1113 setCurveKey(ZMQ_CURVE_SECRETKEY, value); 1114 } 1115 /// ditto 1116 @property void curveSecretKeyZ85(const(char)[] value) 1117 { 1118 setCurveKeyZ85(ZMQ_CURVE_SECRETKEY, value); 1119 } 1120 1121 /// ditto 1122 @property ubyte[] curveServerKey() 1123 { 1124 return getCurveServerKey(new ubyte[keyBufSizeBin]); 1125 } 1126 /// ditto 1127 ubyte[] getCurveServerKey(ubyte[] dest) 1128 { 1129 return getCurveKey(ZMQ_CURVE_SERVERKEY, dest); 1130 } 1131 /// ditto 1132 @property char[] curveServerKeyZ85() 1133 { 1134 return getCurveServerKeyZ85(new char[keyBufSizeZ85]); 1135 } 1136 /// ditto 1137 char[] getCurveServerKeyZ85(char[] dest) 1138 { 1139 return getCurveKeyZ85(ZMQ_CURVE_SERVERKEY, dest); 1140 } 1141 /// ditto 1142 @property void curveServerKey(const(ubyte)[] value) 1143 { 1144 setCurveKey(ZMQ_CURVE_SERVERKEY, value); 1145 } 1146 /// ditto 1147 @property void curveServerKeyZ85(const(char)[] value) 1148 { 1149 setCurveKeyZ85(ZMQ_CURVE_SERVERKEY, value); 1150 } 1151 1152 /// ditto 1153 @property void probeRouter(bool value) { setOption(ZMQ_PROBE_ROUTER, value ? 1 : 0); } 1154 1155 /// ditto 1156 @property void reqCorrelate(bool value) { setOption(ZMQ_REQ_CORRELATE, value ? 1 : 0); } 1157 1158 /// ditto 1159 @property void reqRelaxed(bool value) { setOption(ZMQ_REQ_RELAXED, value ? 1 : 0); } 1160 1161 /// ditto 1162 @property void conflate(bool value) { setOption(ZMQ_CONFLATE, value ? 1 : 0); } 1163 1164 /// ditto 1165 @property char[] zapDomain() { return getZapDomain(new char[256]); } 1166 /// ditto 1167 char[] getZapDomain(char[] dest) { return getCStringOption(ZMQ_ZAP_DOMAIN, dest); } 1168 /// ditto 1169 @property void zapDomain(const char[] value) { setArrayOption(ZMQ_ZAP_DOMAIN, value); } 1170 1171 /// ditto 1172 @property void routerHandover(bool value) { setOption(ZMQ_ROUTER_HANDOVER, value ? 1 : 0); } 1173 1174 /// ditto 1175 @property void connectionRID(const ubyte[] value) { setArrayOption(ZMQ_CONNECT_RID, value); } 1176 1177 /// ditto 1178 @property int typeOfService() { return getOption!int(ZMQ_TOS); } 1179 /// ditto 1180 @property void typeOfService(int value) { setOption(ZMQ_TOS, value); } 1181 1182 /// ditto 1183 @property bool gssapiPlaintext() { return !!getOption!int(ZMQ_GSSAPI_PLAINTEXT); } 1184 /// ditto 1185 @property void gssapiPlaintext(bool value) { setOption(ZMQ_GSSAPI_PLAINTEXT, value ? 1 : 0); } 1186 1187 /// ditto 1188 @property char[] gssapiPrincipal() { return getGssapiPrincipal(new char[256]); } 1189 /// ditto 1190 char[] getGssapiPrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_PRINCIPAL, dest); } 1191 /// ditto 1192 @property void gssapiPrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_PRINCIPAL, value); } 1193 1194 /// ditto 1195 @property bool gssapiServer() { return !!getOption!int(ZMQ_GSSAPI_SERVER); } 1196 /// ditto 1197 @property void gssapiServer(bool value) { setOption(ZMQ_GSSAPI_SERVER, value ? 1 : 0); } 1198 1199 /// ditto 1200 @property char[] gssapiServicePrincipal() { return getGssapiServicePrincipal(new char[256]); } 1201 /// ditto 1202 char[] getGssapiServicePrincipal(char[] dest) { return getCStringOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, dest); } 1203 /// ditto 1204 @property void gssapiServicePrincipal(const char[] value) { setArrayOption(ZMQ_GSSAPI_SERVICE_PRINCIPAL, value); } 1205 1206 /// ditto 1207 @property Duration handshakeInterval() 1208 { 1209 const auto value = getOption!int(ZMQ_HANDSHAKE_IVL); 1210 return value == 0 ? infiniteDuration : msecs(value); 1211 } 1212 /// ditto 1213 @property void handshakeInterval(Duration value) 1214 { 1215 import std.conv: to; 1216 setOption( 1217 ZMQ_HANDSHAKE_IVL, 1218 value == infiniteDuration ? 0 : to!int(value.total!"msecs"())); 1219 } 1220 1221 // deprecated options 1222 deprecated("Use the 'immediate' property instead") 1223 @property bool delayAttachOnConnect() { return !!getOption!int(ZMQ_DELAY_ATTACH_ON_CONNECT); } 1224 1225 deprecated("Use the 'immediate' property instead") 1226 @property void delayAttachOnConnect(bool value) { setOption(ZMQ_DELAY_ATTACH_ON_CONNECT, value ? 1 : 0); } 1227 1228 deprecated("Use !ipv6 instead") 1229 @property bool ipv4Only() { return !ipv6; } 1230 1231 deprecated("Use ipv6 = !value instead") 1232 @property void ipv4Only(bool value) { ipv6 = !value; } 1233 1234 unittest 1235 { 1236 auto s = Socket(SocketType.xpub); 1237 const e = "inproc://unittest2"; 1238 s.bind(e); 1239 import core.time; 1240 s.threadAffinity = 1; 1241 assert(s.threadAffinity == 1); 1242 s.identity = cast(ubyte[]) [ 65, 66, 67 ]; 1243 assert(s.identity == [65, 66, 67]); 1244 s.identity = "foo"; 1245 assert(s.identity == [102, 111, 111]); 1246 s.rate = 200; 1247 assert(s.rate == 200); 1248 s.recoveryInterval = 5.seconds; 1249 assert(s.recoveryInterval == 5_000.msecs); 1250 s.sendBufferSize = 500; 1251 assert(s.sendBufferSize == 500); 1252 s.receiveBufferSize = 600; 1253 assert(s.receiveBufferSize == 600); 1254 version(Posix) { 1255 assert(s.fd > 2); // 0, 1 and 2 are the standard streams 1256 } 1257 assert(s.events == PollFlags.pollOut); 1258 assert(s.type == SocketType.xpub); 1259 s.linger = Duration.zero; 1260 assert(s.linger == Duration.zero); 1261 s.linger = 100_000.usecs; 1262 assert(s.linger == 100.msecs); 1263 s.linger = infiniteDuration; 1264 assert(s.linger == infiniteDuration); 1265 s.reconnectionInterval = 200_000.usecs; 1266 assert(s.reconnectionInterval == 200.msecs); 1267 s.backlog = 50; 1268 assert(s.backlog == 50); 1269 s.maxReconnectionInterval = 300_000.usecs; 1270 assert(s.maxReconnectionInterval == 300.msecs); 1271 s.maxMsgSize = 1000; 1272 assert(s.maxMsgSize == 1000); 1273 s.sendHWM = 500; 1274 assert(s.sendHWM == 500); 1275 s.receiveHWM = 600; 1276 assert(s.receiveHWM == 600); 1277 s.multicastHops = 2; 1278 assert(s.multicastHops == 2); 1279 s.receiveTimeout = 3.seconds; 1280 assert(s.receiveTimeout == 3_000_000.usecs); 1281 s.receiveTimeout = infiniteDuration; 1282 assert(s.receiveTimeout == infiniteDuration); 1283 s.sendTimeout = 2_000_000.usecs; 1284 assert(s.sendTimeout == 2.seconds); 1285 s.sendTimeout = infiniteDuration; 1286 assert(s.sendTimeout == infiniteDuration); 1287 s.tcpKeepalive = 1; 1288 assert(s.tcpKeepalive == 1); 1289 s.tcpKeepaliveCnt = 1; 1290 assert(s.tcpKeepaliveCnt == 1); 1291 s.tcpKeepaliveIdle = 0; 1292 assert(s.tcpKeepaliveIdle == 0); 1293 s.tcpKeepaliveIntvl = 0; 1294 assert(s.tcpKeepaliveIntvl == 0); 1295 s.immediate = true; 1296 assert(s.immediate); 1297 s.ipv6 = true; 1298 assert(s.ipv6); 1299 s.plainServer = true; 1300 assert(s.mechanism == Security.plain); 1301 assert(s.plainServer); 1302 debug (WithCurveTests) { 1303 assert(!s.curveServer); 1304 s.curveServer = true; 1305 assert(s.mechanism == Security.curve); 1306 assert(!s.plainServer); 1307 assert(s.curveServer); 1308 } 1309 s.plainServer = false; 1310 assert(s.mechanism == Security.none); 1311 assert(!s.plainServer); 1312 debug (WithCurveTests) assert(!s.curveServer); 1313 s.plainUsername = "foobar"; 1314 assert(s.getPlainUsername(new char[8]) == "foobar"); 1315 assert(s.mechanism == Security.plain); 1316 s.plainUsername = null; 1317 assert(s.mechanism == Security.none); 1318 s.plainPassword = "xyz"; 1319 assert(s.getPlainPassword(new char[8]) == "xyz"); 1320 assert(s.mechanism == Security.plain); 1321 s.plainPassword = null; 1322 assert(s.mechanism == Security.none); 1323 s.zapDomain = "my_zap_domain"; 1324 assert(s.zapDomain == "my_zap_domain"); 1325 s.typeOfService = 1; 1326 assert(s.typeOfService == 1); 1327 if (zmqHas("gssapi")) { 1328 s.gssapiPlaintext = true; 1329 assert(s.gssapiPlaintext); 1330 s.gssapiPrincipal = "myPrincipal"; 1331 assert(s.gssapiPrincipal == "myPrincipal"); 1332 s.gssapiServer = true; 1333 assert(s.gssapiServer); 1334 s.gssapiServicePrincipal = "myServicePrincipal"; 1335 assert(s.gssapiServicePrincipal == "myServicePrincipal"); 1336 } 1337 s.handshakeInterval = 60000.msecs; 1338 assert(s.handshakeInterval == 60000.msecs); 1339 1340 // Test write-only options 1341 s.conflate = true; 1342 } 1343 1344 debug (WithCurveTests) @system unittest 1345 { 1346 // The CURVE key options require some special setup, so we test them 1347 // separately. 1348 import std.array, std.range; 1349 auto binKey1 = iota(cast(ubyte) 0, cast(ubyte) 32).array(); 1350 auto z85Key1 = z85Encode(binKey1); 1351 auto binKey2 = iota(cast(ubyte) 32, cast(ubyte) 64).array(); 1352 auto z85Key2 = z85Encode(binKey2); 1353 auto zeroKey = repeat(cast(ubyte) 0).take(32).array(); 1354 assert (z85Key1 != z85Key2); 1355 1356 auto s = Socket(SocketType.req); 1357 s.curvePublicKey = zeroKey; 1358 s.curveSecretKey = zeroKey; 1359 s.curveServerKey = zeroKey; 1360 1361 s.curvePublicKey = binKey1; 1362 assert (s.curvePublicKey == binKey1); 1363 assert (s.curvePublicKeyZ85 == z85Key1); 1364 s.curvePublicKeyZ85 = z85Key2; 1365 assert (s.curvePublicKey == binKey2); 1366 assert (s.curvePublicKeyZ85 == z85Key2); 1367 assert (s.curveSecretKey == zeroKey); 1368 assert (s.curveServerKey == zeroKey); 1369 s.curvePublicKey = zeroKey; 1370 1371 s.curveSecretKey = binKey1; 1372 assert (s.curveSecretKey == binKey1); 1373 assert (s.curveSecretKeyZ85 == z85Key1); 1374 s.curveSecretKeyZ85 = z85Key2; 1375 assert (s.curveSecretKey == binKey2); 1376 assert (s.curveSecretKeyZ85 == z85Key2); 1377 assert (s.curvePublicKey == zeroKey); 1378 assert (s.curveServerKey == zeroKey); 1379 s.curveSecretKey = zeroKey; 1380 1381 s.curveServerKey = binKey1; 1382 assert (s.curveServerKey == binKey1); 1383 assert (s.curveServerKeyZ85 == z85Key1); 1384 s.curveServerKeyZ85 = z85Key2; 1385 assert (s.curveServerKey == binKey2); 1386 assert (s.curveServerKeyZ85 == z85Key2); 1387 assert (s.curvePublicKey == zeroKey); 1388 assert (s.curveSecretKey == zeroKey); 1389 s.curveServerKey = zeroKey; 1390 } 1391 1392 unittest 1393 { 1394 // Some options are only applicable to specific socket types. 1395 auto rt = Socket(SocketType.router); 1396 rt.routerMandatory = true; 1397 rt.probeRouter = true; 1398 rt.routerHandover = true; 1399 rt.connectionRID = cast(ubyte[]) [ 10, 20, 30 ]; 1400 auto xp = Socket(SocketType.xpub); 1401 xp.xpubVerbose = true; 1402 auto rq = Socket(SocketType.req); 1403 rq.reqCorrelate = true; 1404 rq.reqRelaxed = true; 1405 } 1406 1407 deprecated unittest 1408 { 1409 // Test deprecated socket options 1410 auto s = Socket(SocketType.req); 1411 assert(s.ipv4Only); 1412 assert(!s.delayAttachOnConnect); 1413 1414 // Test setters and getters together 1415 s.ipv4Only = false; 1416 assert(!s.ipv4Only); 1417 s.delayAttachOnConnect = true; 1418 assert(s.delayAttachOnConnect); 1419 } 1420 1421 /** 1422 Establishes a message filter. 1423 1424 Throws: 1425 $(REF ZmqException) if $(ZMQ) reports an error. 1426 Corresponds_to: 1427 $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE). 1428 */ 1429 void subscribe(const ubyte[] filterPrefix) 1430 { 1431 setArrayOption(ZMQ_SUBSCRIBE, filterPrefix); 1432 } 1433 /// ditto 1434 void subscribe(const char[] filterPrefix) 1435 { 1436 setArrayOption(ZMQ_SUBSCRIBE, filterPrefix); 1437 } 1438 1439 /// 1440 unittest 1441 { 1442 // Create a subscriber that accepts all messages that start with 1443 // the prefixes "foo" or "bar". 1444 auto sck = Socket(SocketType.sub); 1445 sck.subscribe("foo"); 1446 sck.subscribe("bar"); 1447 } 1448 1449 @system unittest 1450 { 1451 void sleep(int ms) { 1452 import core.thread, core.time; 1453 Thread.sleep(dur!"msecs"(ms)); 1454 } 1455 auto pub = Socket(SocketType.pub); 1456 pub.bind("inproc://zmqd_subscribe_unittest"); 1457 auto sub = Socket(SocketType.sub); 1458 sub.connect("inproc://zmqd_subscribe_unittest"); 1459 1460 pub.send("Hello"); 1461 sleep(100); 1462 sub.subscribe("He"); 1463 sub.subscribe(cast(ubyte[])['W', 'o']); 1464 sleep(100); 1465 pub.send("Heeee"); 1466 pub.send("World"); 1467 sleep(100); 1468 ubyte[5] buf; 1469 sub.receive(buf); 1470 assert(buf.asString() == "Heeee"); 1471 sub.receive(buf); 1472 assert(buf.asString() == "World"); 1473 } 1474 1475 /** 1476 Removes a message filter. 1477 1478 Throws: 1479 $(REF ZmqException) if $(ZMQ) reports an error. 1480 Corresponds_to: 1481 $(ZMQREF zmq_msg_setsockopt()) with $(D ZMQ_SUBSCRIBE). 1482 */ 1483 void unsubscribe(const ubyte[] filterPrefix) 1484 { 1485 setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix); 1486 } 1487 /// ditto 1488 void unsubscribe(const char[] filterPrefix) 1489 { 1490 setArrayOption(ZMQ_UNSUBSCRIBE, filterPrefix); 1491 } 1492 1493 /// 1494 unittest 1495 { 1496 // Subscribe to messages that start with "foo" or "bar". 1497 auto sck = Socket(SocketType.sub); 1498 sck.subscribe("foo"); 1499 sck.subscribe("bar"); 1500 // ... 1501 // From now on, only accept messages that start with "bar" 1502 sck.unsubscribe("foo"); 1503 } 1504 1505 /** 1506 Spawns a PAIR socket that publishes socket state changes (_events) over 1507 the INPROC transport to the given _endpoint. 1508 1509 Which event types should be published may be selected by bitwise-ORing 1510 together different $(REF EventType) flags in the $(D events) parameter. 1511 1512 Throws: 1513 $(REF ZmqException) if $(ZMQ) reports an error. 1514 Corresponds_to: 1515 $(ZMQREF zmq_socket_monitor()) 1516 See_also: 1517 $(FREF receiveEvent), which receives and parses event messages. 1518 */ 1519 void monitor(const char[] endpoint, EventType events = EventType.all) 1520 { 1521 if (trusted!zmq_socket_monitor(m_socket, zeroTermString(endpoint), events) < 0) { 1522 throw new ZmqException; 1523 } 1524 } 1525 1526 /// 1527 unittest 1528 { 1529 auto sck = Socket(SocketType.pub); 1530 sck.monitor("inproc://zmqd_monitor_unittest", 1531 EventType.accepted | EventType.closed); 1532 } 1533 1534 /** 1535 The $(D void*) pointer used by the underlying C API to refer to the socket. 1536 1537 If the object has not been initialized, this function returns $(D null). 1538 */ 1539 @property inout(void)* handle() inout pure nothrow 1540 { 1541 return m_socket; 1542 } 1543 1544 /** 1545 Whether this $(REF Socket) object has been _initialized, i.e. whether it 1546 refers to a valid $(ZMQ) socket. 1547 */ 1548 @property bool initialized() const pure nothrow 1549 { 1550 return m_socket != null; 1551 } 1552 1553 /// 1554 unittest 1555 { 1556 Socket sck; 1557 assert (!sck.initialized); 1558 sck = Socket(SocketType.sub); 1559 assert (sck.initialized); 1560 sck.close(); 1561 assert (!sck.initialized); 1562 } 1563 1564 private: 1565 // Helper function for ~this() and close() 1566 bool nothrowClose() nothrow 1567 { 1568 if (m_socket != null) { 1569 if (trusted!zmq_close(m_socket) != 0) return false; 1570 m_socket = null; 1571 } 1572 return true; 1573 } 1574 1575 import std.traits; 1576 1577 T getOption(T)(int option) @trusted 1578 if (isScalarType!T) 1579 { 1580 T buf; 1581 auto len = T.sizeof; 1582 if (zmq_getsockopt(m_socket, option, &buf, &len) != 0) { 1583 throw new ZmqException; 1584 } 1585 assert(len == T.sizeof); 1586 return buf; 1587 } 1588 1589 void setOption(T)(int option, T value) @trusted 1590 if (isScalarType!T) 1591 { 1592 if (zmq_setsockopt(m_socket, option, &value, value.sizeof) != 0) { 1593 throw new ZmqException; 1594 } 1595 } 1596 1597 T[] getArrayOption(T)(int option, T[] buf) @trusted 1598 if (isScalarType!T) 1599 { 1600 static assert (T.sizeof == 1); 1601 auto len = buf.length; 1602 if (zmq_getsockopt(m_socket, option, buf.ptr, &len) != 0) { 1603 throw new ZmqException; 1604 } 1605 return buf[0 .. len]; 1606 } 1607 1608 void setArrayOption()(int option, const void[] value) 1609 { 1610 if (trusted!zmq_setsockopt(m_socket, option, ptr(value), value.length) != 0) { 1611 throw new ZmqException; 1612 } 1613 } 1614 1615 char[] getCStringOption(int option, char[] buf) 1616 { 1617 auto ret = getArrayOption(option, buf); 1618 assert (ret.length && ret[$-1] == '\0'); 1619 return ret[0 .. $-1]; 1620 } 1621 1622 enum : size_t 1623 { 1624 keySizeBin = 32, 1625 keyBufSizeBin = keySizeBin, 1626 keySizeZ85 = 40, 1627 keyBufSizeZ85 = keySizeZ85 + 1, 1628 } 1629 1630 ubyte[] getCurveKey(int option, ubyte[] buf) 1631 { 1632 if (buf.length < keyBufSizeBin) { 1633 import core.exception: RangeError; 1634 throw new RangeError; 1635 } 1636 return getArrayOption(option, buf[0 .. keyBufSizeBin]); 1637 } 1638 1639 char[] getCurveKeyZ85(int option, char[] buf) 1640 { 1641 if (buf.length < keyBufSizeZ85) { 1642 import core.exception: RangeError; 1643 throw new RangeError; 1644 } 1645 return getCStringOption(option, buf[0 .. keyBufSizeZ85]); 1646 } 1647 1648 void setCurveKey(int option, const ubyte[] value) 1649 { 1650 if (value.length != keySizeBin) throw new Exception("Invalid key size"); 1651 setArrayOption(option, value); 1652 } 1653 1654 void setCurveKeyZ85(int option, const char[] value) 1655 { 1656 if (value.length != keySizeZ85) throw new Exception("Invalid key size"); 1657 setArrayOption(option, value); 1658 } 1659 1660 Context m_context; 1661 SocketType m_type; 1662 void* m_socket; 1663 } 1664 1665 unittest 1666 { 1667 auto s1 = Socket(SocketType.pair); 1668 auto s2 = Socket(SocketType.pair); 1669 s1.bind("inproc://unittest"); 1670 s2.connect("inproc://unittest"); 1671 s1.send("Hello World!"); 1672 ubyte[12] buf; 1673 const len = s2.receive(buf[]); 1674 assert (len == 12); 1675 assert (buf == "Hello World!"); 1676 } 1677 1678 1679 version (Windows) { 1680 alias PlatformFD = SOCKET; 1681 } else version (Posix) { 1682 alias PlatformFD = int; 1683 } 1684 1685 /** 1686 The native socket file descriptor type. 1687 1688 This is an alias for $(D SOCKET) on Windows and $(D int) on POSIX systems. 1689 */ 1690 alias FD = PlatformFD; 1691 1692 1693 /** 1694 Starts the built-in $(ZMQ) _proxy. 1695 1696 This function never returns normally, but it may throw an exception. This could 1697 happen if the context associated with either of the specified sockets is 1698 manually destroyed in a different thread. 1699 1700 Throws: 1701 $(REF ZmqException) if $(ZMQ) reports an error. 1702 Corresponds_to: 1703 $(ZMQREF zmq_proxy()) 1704 See_Also: 1705 $(FREF steerableProxy) 1706 */ 1707 void proxy(ref Socket frontend, ref Socket backend) 1708 { 1709 const rc = trusted!zmq_proxy(frontend.handle, backend.handle, null); 1710 assert (rc == -1); 1711 throw new ZmqException; 1712 } 1713 1714 /// ditto 1715 void proxy(ref Socket frontend, ref Socket backend, ref Socket capture) 1716 { 1717 const rc = trusted!zmq_proxy(frontend.handle, backend.handle, capture.handle); 1718 assert (rc == -1); 1719 throw new ZmqException; 1720 } 1721 1722 1723 /** 1724 Starts the built-in $(ZMQ) proxy with _control flow. 1725 1726 Note that the order of the two last parameters is reversed compared to 1727 $(ZMQREF zmq_proxy_steerable()). That is, the $(D control) socket always 1728 comes before the $(D capture) socket. Furthermore, unlike in $(ZMQ), 1729 $(D control) is mandatory. (Without the _control socket one can simply 1730 use $(FREF proxy).) 1731 1732 Throws: 1733 $(REF ZmqException) if $(ZMQ) reports an error. 1734 Corresponds_to: 1735 $(ZMQREF zmq_proxy_steerable()) 1736 See_Also: 1737 $(FREF proxy) 1738 */ 1739 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control) 1740 { 1741 const rc = trusted!zmq_proxy_steerable( 1742 frontend.handle, backend.handle, null, control.handle); 1743 if (rc == -1) throw new ZmqException; 1744 } 1745 1746 /// ditto 1747 void steerableProxy(ref Socket frontend, ref Socket backend, ref Socket control, ref Socket capture) 1748 { 1749 const rc = trusted!zmq_proxy_steerable( 1750 frontend.handle, backend.handle, capture.handle, control.handle); 1751 if (rc == -1) throw new ZmqException; 1752 } 1753 1754 @system unittest 1755 { 1756 import core.thread; 1757 auto t = new Thread(() { 1758 auto frontend = Socket(SocketType.router); 1759 frontend.bind("inproc://zmqd_steerableProxy_unittest_fe"); 1760 auto backend = Socket(SocketType.dealer); 1761 backend.bind("inproc://zmqd_steerableProxy_unittest_be"); 1762 auto controllee = Socket(SocketType.pair); 1763 controllee.bind("inproc://zmqd_steerableProxy_unittest_ctl"); 1764 steerableProxy(frontend, backend, controllee); 1765 }); 1766 t.start(); 1767 auto client = Socket(SocketType.req); 1768 client.connect("inproc://zmqd_steerableProxy_unittest_fe"); 1769 auto server = Socket(SocketType.rep); 1770 server.connect("inproc://zmqd_steerableProxy_unittest_be"); 1771 auto controller = Socket(SocketType.pair); 1772 controller.connect("inproc://zmqd_steerableProxy_unittest_ctl"); 1773 1774 auto cf = Frame(1); 1775 cf.data[0] = 86; 1776 client.send(cf); 1777 auto sf = Frame(); 1778 server.receive(sf); 1779 assert(sf.size == 1 && sf.data[0] == 86); 1780 sf.data[0] = 87; 1781 server.send(sf); 1782 client.receive(cf); 1783 assert(cf.size == 1 && cf.data[0] == 87); 1784 1785 controller.send("TERMINATE"); 1786 t.join(); 1787 } 1788 1789 @system unittest 1790 { 1791 import core.thread; 1792 auto t = new Thread(() { 1793 auto frontend = Socket(SocketType.pull); 1794 frontend.bind("inproc://zmqd_steerableProxy2_unittest_fe"); 1795 auto backend = Socket(SocketType.push); 1796 backend.bind("inproc://zmqd_steerableProxy2_unittest_be"); 1797 auto controllee = Socket(SocketType.pair); 1798 controllee.bind("inproc://zmqd_steerableProxy2_unittest_ctl"); 1799 auto capture = Socket(SocketType.push); 1800 capture.bind("inproc://zmqd_steerableProxy2_unittest_cpt"); 1801 steerableProxy(frontend, backend, controllee, capture); 1802 }); 1803 t.start(); 1804 auto client = Socket(SocketType.push); 1805 client.connect("inproc://zmqd_steerableProxy2_unittest_fe"); 1806 auto server = Socket(SocketType.pull); 1807 server.connect("inproc://zmqd_steerableProxy2_unittest_be"); 1808 auto controller = Socket(SocketType.pair); 1809 controller.connect("inproc://zmqd_steerableProxy2_unittest_ctl"); 1810 auto capturer = Socket(SocketType.pull); 1811 capturer.connect("inproc://zmqd_steerableProxy2_unittest_cpt"); 1812 1813 auto cf = Frame(1); 1814 cf.data[0] = 86; 1815 client.send(cf); 1816 auto sf = Frame(); 1817 server.receive(sf); 1818 assert(sf.size == 1 && sf.data[0] == 86); 1819 auto pf = Frame(); 1820 capturer.receive(pf); 1821 assert(pf.size == 1 && pf.data[0] == 86); 1822 1823 controller.send("TERMINATE"); 1824 t.join(); 1825 } 1826 1827 1828 deprecated("zmqd.poll() has a new signature as of v0.4") 1829 uint poll(zmq_pollitem_t[] items, Duration timeout = infiniteDuration) 1830 { 1831 import std.conv: to; 1832 const n = trusted!zmq_poll( 1833 ptr(items), 1834 to!int(items.length), 1835 timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"())); 1836 if (n < 0) throw new ZmqException; 1837 return cast(uint) n; 1838 } 1839 1840 1841 /** 1842 Input/output multiplexing. 1843 1844 The $(D timeout) parameter may have the special value $(REF infiniteDuration) 1845 which means no _timeout. This is translated to an argument value of -1 in the 1846 C API. 1847 1848 Returns: 1849 The number of $(REF PollItem) structures with events signalled in 1850 $(REF PollItem.returnedEvents), or 0 if no events have been signalled. 1851 Throws: 1852 $(REF ZmqException) if $(ZMQ) reports an error. 1853 Corresponds_to: 1854 $(ZMQREF zmq_poll()) 1855 */ 1856 uint poll(PollItem[] items, Duration timeout = infiniteDuration) @trusted 1857 { 1858 // Here we use a trick where we pretend the array of PollItems is 1859 // actually an array of zmq_pollitem_t, to avoid an unnecessary 1860 // allocation. For this to work, PollItem must have the exact 1861 // same size as zmq_pollitem_t. 1862 static assert (PollItem.sizeof == zmq_pollitem_t.sizeof); 1863 1864 import std.conv: to; 1865 const n = zmq_poll( 1866 cast(zmq_pollitem_t*) items.ptr, 1867 to!int(items.length), 1868 timeout == infiniteDuration ? -1 : to!int(timeout.total!"msecs"())); 1869 if (n < 0) throw new ZmqException; 1870 return cast(uint) n; 1871 } 1872 1873 1874 /// 1875 @system unittest 1876 { 1877 auto socket1 = zmqd.Socket(zmqd.SocketType.pull); 1878 socket1.bind("inproc://zmqd_poll_example"); 1879 1880 import std.socket; 1881 auto socket2 = new std.socket.Socket( 1882 AddressFamily.INET, 1883 std.socket.SocketType.DGRAM); 1884 socket2.bind(new InternetAddress(InternetAddress.ADDR_ANY, 5678)); 1885 1886 auto socket3 = zmqd.Socket(zmqd.SocketType.push); 1887 socket3.connect("inproc://zmqd_poll_example"); 1888 socket3.send("test"); 1889 1890 import core.thread: Thread; 1891 Thread.sleep(10.msecs); 1892 1893 auto items = [ 1894 PollItem(socket1, PollFlags.pollIn), 1895 PollItem(socket2, PollFlags.pollIn | PollFlags.pollOut), 1896 PollItem(socket3, PollFlags.pollIn), 1897 ]; 1898 1899 const n = poll(items, 100.msecs); 1900 assert (n == 2); 1901 assert (items[0].returnedEvents == PollFlags.pollIn); 1902 assert (items[1].returnedEvents == PollFlags.pollOut); 1903 assert (items[2].returnedEvents == 0); 1904 socket2.close(); 1905 } 1906 1907 1908 /** 1909 $(FREF poll) event flags. 1910 1911 These are described in the $(ZMQREF zmq_poll()) manual. 1912 */ 1913 enum PollFlags 1914 { 1915 pollIn = ZMQ_POLLIN, /// Corresponds to $(D ZMQ_POLLIN) 1916 pollOut = ZMQ_POLLOUT, /// Corresponds to $(D ZMQ_POLLOUT) 1917 pollErr = ZMQ_POLLERR, /// Corresponds to $(D ZMQ_POLLERR) 1918 } 1919 1920 1921 /++ 1922 A structure that specifies a socket to be monitored by $(FREF poll) as well 1923 as the events to poll for, and, when $(FREF poll) returns, the events that 1924 occurred. 1925 1926 Warning: 1927 $(D PollItem) objects do not store $(STDREF socket,Socket) references, 1928 only the corresponding native file descriptors. This means that the 1929 references have to be stored elsewhere, or the objects may be garbage 1930 collected, invalidating the sockets before or while $(FREF poll) executes. 1931 --- 1932 // Not OK 1933 auto p1 = PollItem(new std.socket.Socket(/*...*/), PollFlags.pollIn); 1934 1935 // OK 1936 auto s = new std.socket.Socket(/*...*/); 1937 auto p2 = PollItem(s, PollFlags.pollIn); 1938 --- 1939 Corresponds_to: 1940 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t)) 1941 +/ 1942 struct PollItem 1943 { 1944 /// Constructs a $(REF PollItem) for monitoring a $(ZMQ) socket. 1945 this(ref zmqd.Socket socket, PollFlags events) nothrow 1946 { 1947 m_pollItem = zmq_pollitem_t(socket.handle, 0, cast(short) events, 0); 1948 } 1949 1950 import std.socket; 1951 /** 1952 Constructs a $(REF PollItem) for monitoring a standard socket referenced 1953 by a $(STDREF socket,Socket). 1954 */ 1955 this(std.socket.Socket socket, PollFlags events) @system 1956 { 1957 this(socket.handle, events); 1958 } 1959 1960 /** 1961 Constructs a $(REF PollItem) for monitoring a standard socket referenced 1962 by a native file descriptor. 1963 */ 1964 this(FD fd, PollFlags events) pure nothrow 1965 { 1966 m_pollItem = zmq_pollitem_t(null, fd, cast(short) events, 0); 1967 } 1968 1969 /** 1970 Requested _events. 1971 1972 Corresponds_to: 1973 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.events)) 1974 */ 1975 @property void requestedEvents(PollFlags events) pure nothrow 1976 { 1977 m_pollItem.events = cast(short) events; 1978 } 1979 1980 /// ditto 1981 @property PollFlags requestedEvents() const pure nothrow 1982 { 1983 return cast(typeof(return)) m_pollItem.events; 1984 } 1985 1986 /** 1987 Returned events. 1988 1989 Corresponds_to: 1990 $(D $(ZMQAPI zmq_poll,zmq_pollitem_t.revents)) 1991 */ 1992 @property PollFlags returnedEvents() const pure nothrow 1993 { 1994 return cast(typeof(return)) m_pollItem.revents; 1995 } 1996 1997 private: 1998 zmq_pollitem_t m_pollItem; 1999 } 2000 2001 2002 /** 2003 An object that encapsulates a $(ZMQ) message frame. 2004 2005 This $(D struct) is a wrapper around a $(D zmq_msg_t) object. 2006 A default-initialized $(D Frame) is not a valid $(ZMQ) message frame; it 2007 should always be explicitly initialized upon construction using 2008 $(FREF _Frame.opCall). Alternatively, it may be initialized later with 2009 $(FREF _Frame.rebuild). 2010 --- 2011 Frame msg1; // Invalid frame 2012 auto msg2 = Frame(); // Empty frame 2013 auto msg3 = Frame(1024); // 1K frame 2014 msg1.rebuild(2048); // msg1 now has size 2K 2015 msg2.rebuild(2048); // ...and so does msg2 2016 --- 2017 When a $(D Frame) object is destroyed, $(ZMQREF zmq_msg_close()) is 2018 called on the underlying $(D zmq_msg_t). 2019 2020 A $(D Frame) cannot be copied by normal assignment; use $(FREF _Frame.copy) 2021 for this. 2022 */ 2023 struct Frame 2024 { 2025 @safe: 2026 /** 2027 Initializes an empty $(ZMQ) message frame. 2028 2029 Throws: 2030 $(REF ZmqException) if $(ZMQ) reports an error. 2031 Corresponds_to: 2032 $(ZMQREF zmq_msg_init()) 2033 */ 2034 static Frame opCall() 2035 { 2036 Frame f; 2037 f.init(); 2038 return f; 2039 } 2040 2041 /// 2042 unittest 2043 { 2044 auto msg = Frame(); 2045 assert(msg.size == 0); 2046 } 2047 2048 /** $(ANCHOR Frame.opCall_size) 2049 Initializes a $(ZMQ) message frame of the specified _size. 2050 2051 Throws: 2052 $(REF ZmqException) if $(ZMQ) reports an error. 2053 Corresponds_to: 2054 $(ZMQREF zmq_msg_init_size()) 2055 */ 2056 static Frame opCall(size_t size) 2057 { 2058 Frame m; 2059 m.init(size); 2060 return m; 2061 } 2062 2063 /// 2064 unittest 2065 { 2066 auto msg = Frame(123); 2067 assert(msg.size == 123); 2068 } 2069 2070 /** $(ANCHOR Frame.opCall_data) 2071 Initializes a $(ZMQ) message frame from a supplied buffer. 2072 2073 If $(D free) is not specified, $(D data) $(EM must) refer to a slice of 2074 memory which has been allocated by the garbage collector (typically using 2075 operator $(D new)). Ownership will then be transferred temporarily to 2076 $(ZMQ), and then transferred back to the GC when $(ZMQ) is done using the 2077 buffer. 2078 2079 For memory which is $(EM not) garbage collected, the argument $(D free) 2080 must be a pointer to a function that will release the memory, which will 2081 be called when $(ZMQ) no longer needs the buffer. $(D free) must point to 2082 a function with the following signature: 2083 --- 2084 extern(C) void f(void* d, void* h) nothrow; 2085 --- 2086 When it is called, $(D d) will be equal to $(D data.ptr), while $(D h) will 2087 be equal to $(D hint) (which is optional and null by default). 2088 2089 $(D free) is passed directly to the underlying $(ZMQ) C function, which is 2090 why it needs to be $(D extern(C)). 2091 2092 Warning: 2093 Some care must be taken when using this function, as there is no telling 2094 when, whether, or in which thread $(ZMQ) relinquishes ownership of the 2095 buffer. Client code should therefore avoid retaining any references to 2096 it, including slices that contain, overlap with or are contained in 2097 $(D data). For the "non-GC" version, client code should in general not 2098 retain slices or pointers to any memory which will be released when 2099 $(D free) is called. 2100 See_Also: 2101 $(REF Frame.FreeData) 2102 Throws: 2103 $(REF ZmqException) if $(ZMQ) reports an error. 2104 Corresponds_to: 2105 $(ZMQREF zmq_msg_init_data()) 2106 */ 2107 static Frame opCall(ubyte[] data) @system 2108 { 2109 Frame m; 2110 m.init(data); 2111 return m; 2112 } 2113 2114 /// ditto 2115 static Frame opCall(ubyte[] data, FreeData free, void* hint = null) @system 2116 { 2117 Frame m; 2118 m.init(data, free, hint); 2119 return m; 2120 } 2121 2122 /// 2123 @system unittest 2124 { 2125 // Garbage-collected memory 2126 auto buf = new ubyte[123]; 2127 auto msg = Frame(buf); 2128 assert(msg.size == buf.length); 2129 assert(msg.data.ptr == buf.ptr); 2130 } 2131 2132 /// 2133 @system unittest 2134 { 2135 // Manually managed memory 2136 import core.stdc.stdlib: malloc, free; 2137 static extern(C) void myFree(void* data, void* hint) nothrow { free(data); } 2138 auto buf = (cast(ubyte*) malloc(10))[0 .. 10]; 2139 auto msg = Frame(buf, &myFree); 2140 assert(msg.size == buf.length); 2141 assert(msg.data.ptr == buf.ptr); 2142 } 2143 2144 @system unittest 2145 { 2146 // Non-documentation unittest. Let's see if the data *actually* gets freed. 2147 ubyte buffer = 81; 2148 bool released = false; 2149 static extern(C) void myFree(void* data, void* hint) nothrow 2150 { 2151 auto typedData = cast(ubyte*) data; 2152 auto typedHint = cast(bool*) hint; 2153 assert(*typedData == 81); 2154 assert(!*typedHint); 2155 *typedData = 0; 2156 *typedHint = true; 2157 } 2158 // New scope, so the frame gets released at the end. 2159 { 2160 auto frame = Frame((&buffer)[0 .. 1], &myFree, &released); 2161 assert(frame.size == 1); 2162 assert(frame.data.ptr == &buffer); 2163 assert(buffer == 81); 2164 assert(!released); 2165 } 2166 assert(buffer == 0); 2167 assert(released); 2168 } 2169 2170 /** 2171 The function pointer type for memory-freeing callback functions passed to 2172 $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[], _FreeData, void*))). 2173 */ 2174 @system alias extern(C) void function(void*, void*) nothrow FreeData; 2175 2176 /** 2177 Reinitializes the Frame object as an empty message. 2178 2179 This function will first call $(FREF Frame.close) to release the 2180 resources associated with the message frame, and then it will 2181 initialize it anew, exactly as if it were constructed with 2182 $(D $(LINK2 #Frame.opCall,Frame())). 2183 2184 Throws: 2185 $(REF ZmqException) if $(ZMQ) reports an error. 2186 Corresponds_to: 2187 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init()) 2188 */ 2189 void rebuild() 2190 { 2191 close(); 2192 init(); 2193 } 2194 2195 /// 2196 unittest 2197 { 2198 auto msg = Frame(256); 2199 assert (msg.size == 256); 2200 msg.rebuild(); 2201 assert (msg.size == 0); 2202 } 2203 2204 /** 2205 Reinitializes the Frame object to a specified size. 2206 2207 This function will first call $(FREF Frame.close) to release the 2208 resources associated with the message frame, and then it will 2209 initialize it anew, exactly as if it were constructed with 2210 $(D $(LINK2 #Frame.opCall_size,Frame(size))). 2211 2212 Throws: 2213 $(REF ZmqException) if $(ZMQ) reports an error. 2214 Corresponds_to: 2215 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_size()). 2216 */ 2217 void rebuild(size_t size) 2218 { 2219 close(); 2220 init(size); 2221 } 2222 2223 /// 2224 unittest 2225 { 2226 auto msg = Frame(256); 2227 assert (msg.size == 256); 2228 msg.rebuild(1024); 2229 assert (msg.size == 1024); 2230 } 2231 2232 /** 2233 Reinitializes the Frame object from a supplied buffer. 2234 2235 This function will first call $(FREF Frame.close) to release the 2236 resources associated with the message frame, and then it will 2237 initialize it anew, exactly as if it were constructed with 2238 $(D Frame(data)) or $(D Frame(data, free, hint)). 2239 2240 Some care must be taken when using these functions. Please read the 2241 $(D $(LINK2 #Frame.opCall_data,Frame(ubyte[]))) documentation. 2242 2243 Throws: 2244 $(REF ZmqException) if $(ZMQ) reports an error. 2245 Corresponds_to: 2246 $(ZMQREF zmq_msg_close()) followed by $(ZMQREF zmq_msg_init_data()). 2247 */ 2248 void rebuild(ubyte[] data) @system 2249 { 2250 close(); 2251 init(data); 2252 } 2253 2254 /// ditto 2255 void rebuild(ubyte[] data, FreeData free, void* hint = null) @system 2256 { 2257 close(); 2258 init(data, free, hint); 2259 } 2260 2261 /// 2262 @system unittest 2263 { 2264 // Garbage-collected memory 2265 auto msg = Frame(256); 2266 assert (msg.size == 256); 2267 auto buf = new ubyte[123]; 2268 msg.rebuild(buf); 2269 assert(msg.size == buf.length); 2270 assert(msg.data.ptr == buf.ptr); 2271 } 2272 2273 /// 2274 @system unittest 2275 { 2276 // Manually managed memory 2277 import core.stdc.stdlib: malloc, free; 2278 static extern(C) void myFree(void* data, void* hint) nothrow { free(data); } 2279 2280 auto msg = Frame(256); 2281 assert (msg.size == 256); 2282 auto buf = (cast(ubyte*) malloc(10))[0 .. 10]; 2283 msg.rebuild(buf, &myFree); 2284 assert(msg.size == buf.length); 2285 assert(msg.data.ptr == buf.ptr); 2286 } 2287 2288 @disable this(this); 2289 2290 /** 2291 Releases the $(ZMQ) message frame when the $(D Frame) is destroyed. 2292 2293 This destructor never throws, which means that any errors will go 2294 undetected. If this is undesirable, call $(FREF Frame.close) before 2295 the $(D Frame) is destroyed. 2296 2297 Corresponds_to: 2298 $(ZMQREF zmq_msg_close()) 2299 */ 2300 ~this() nothrow 2301 { 2302 if (m_initialized) { 2303 immutable rc = trusted!zmq_msg_close(&m_msg); 2304 assert(rc == 0, "zmq_msg_close failed: Invalid message frame"); 2305 } 2306 } 2307 2308 /** 2309 Releases the $(ZMQ) message frame. 2310 2311 Note that the frame will be automatically released when the $(D Frame) 2312 object is destroyed, so it is often not necessary to call this method 2313 manually. 2314 2315 Throws: 2316 $(REF ZmqException) if $(ZMQ) reports an error. 2317 Corresponds_to: 2318 $(ZMQREF zmq_msg_close()) 2319 */ 2320 void close() 2321 { 2322 if (m_initialized) { 2323 if (trusted!zmq_msg_close(&m_msg) != 0) { 2324 throw new ZmqException; 2325 } 2326 m_initialized = false; 2327 } 2328 } 2329 2330 /** 2331 Copies frame content to another message frame. 2332 2333 $(D copy()) returns a new $(D Frame) object, while $(D copyTo(dest)) 2334 copies the contents of this $(D Frame) into $(D dest). $(D dest) must 2335 be a valid (i.e. initialized) $(D Frame). 2336 2337 Warning: 2338 These functions may not do what you think they do. Please refer 2339 to $(ZMQAPI zmq_msg_copy(),the $(ZMQ) manual) for details. 2340 Throws: 2341 $(REF ZmqException) if $(ZMQ) reports an error. 2342 Corresponds_to: 2343 $(ZMQREF zmq_msg_copy()) 2344 */ 2345 Frame copy() 2346 in(m_initialized) 2347 { 2348 auto cp = Frame(); 2349 copyTo(cp); 2350 return cp; 2351 } 2352 2353 /// ditto 2354 void copyTo(ref Frame dest) 2355 in(m_initialized) 2356 { 2357 if (trusted!zmq_msg_copy(&dest.m_msg, &m_msg) != 0) { 2358 throw new ZmqException; 2359 } 2360 } 2361 2362 /// 2363 unittest 2364 { 2365 import std.string: representation; 2366 auto msg1 = Frame(3); 2367 msg1.data[] = "foo".representation; 2368 auto msg2 = msg1.copy(); 2369 assert (msg2.data.asString() == "foo"); 2370 } 2371 2372 /** 2373 Moves frame content to another message frame. 2374 2375 $(D move()) returns a new $(D Frame) object, while $(D moveTo(dest)) 2376 moves the contents of this $(D Frame) to $(D dest). $(D dest) must 2377 be a valid (i.e. initialized) $(D Frame). 2378 2379 Throws: 2380 $(REF ZmqException) if $(ZMQ) reports an error. 2381 Corresponds_to: 2382 $(ZMQREF zmq_msg_move()) 2383 */ 2384 Frame move() 2385 in(m_initialized) 2386 { 2387 auto m = Frame(); 2388 moveTo(m); 2389 return m; 2390 } 2391 2392 /// ditto 2393 void moveTo(ref Frame dest) 2394 in(m_initialized) 2395 { 2396 if (trusted!zmq_msg_move(&dest.m_msg, &m_msg) != 0) { 2397 throw new ZmqException; 2398 } 2399 } 2400 2401 /// 2402 unittest 2403 { 2404 import std.string: representation; 2405 auto msg1 = Frame(3); 2406 msg1.data[] = "foo".representation; 2407 auto msg2 = msg1.move(); 2408 assert (msg1.size == 0); 2409 assert (msg2.data.asString() == "foo"); 2410 } 2411 2412 /** 2413 The message frame content _size in bytes. 2414 2415 Corresponds_to: 2416 $(ZMQREF zmq_msg_size()) 2417 */ 2418 @property size_t size() nothrow 2419 in(m_initialized) 2420 { 2421 return trusted!zmq_msg_size(&m_msg); 2422 } 2423 2424 /// 2425 unittest 2426 { 2427 auto msg = Frame(123); 2428 assert(msg.size == 123); 2429 } 2430 2431 /** 2432 Retrieves the message frame content. 2433 2434 Corresponds_to: 2435 $(ZMQREF zmq_msg_data()) 2436 */ 2437 @property ubyte[] data() @trusted nothrow 2438 in(m_initialized) 2439 { 2440 return (cast(ubyte*) zmq_msg_data(&m_msg))[0 .. size]; 2441 } 2442 2443 /// 2444 unittest 2445 { 2446 import std.string: representation; 2447 auto msg = Frame(3); 2448 assert(msg.data.length == 3); 2449 msg.data[] = "foo".representation; // Slice operator -> array copy. 2450 assert(msg.data.asString() == "foo"); 2451 } 2452 2453 /** 2454 Whether there are _more message frames to retrieve. 2455 2456 Corresponds_to: 2457 $(ZMQREF zmq_msg_more()) 2458 */ 2459 @property bool more() nothrow 2460 in(m_initialized) 2461 { 2462 return !!trusted!zmq_msg_more(&m_msg); 2463 } 2464 2465 /** 2466 The file descriptor of the socket the message was read from. 2467 2468 Throws: 2469 $(REF ZmqException) if $(ZMQ) reports an error. 2470 Corresponds_to: 2471 $(ZMQREF zmq_msg_get()) with $(D ZMQ_SRCFD). 2472 */ 2473 @property FD sourceFD() 2474 { 2475 return cast(FD) getProperty(ZMQ_SRCFD); 2476 } 2477 2478 /** 2479 Whether the message MAY share underlying storage with another copy. 2480 2481 Throws: 2482 $(REF ZmqException) if $(ZMQ) reports an error. 2483 Corresponds_to: 2484 $(ZMQREF zmq_msg_get()) with $(D ZMQ_SHARED). 2485 */ 2486 @property bool sharedStorage() 2487 { 2488 return !!getProperty(ZMQ_SHARED); 2489 } 2490 2491 unittest 2492 { 2493 import std.string: representation; 2494 auto msg1 = Frame(100); // Big message to avoid small-string optimisation 2495 msg1.data[0 .. 3] = "foo".representation; 2496 assert (!msg1.sharedStorage); 2497 auto msg2 = msg1.copy(); 2498 assert (msg2.sharedStorage); // Warning: The ZMQ docs only say that this MAY be true 2499 assert (msg2.data[0 .. 3].asString() == "foo"); 2500 } 2501 2502 /** 2503 Gets message _metadata. 2504 2505 $(D metadataUnsafe()) is faster than $(D metadata()) because it 2506 directly returns the array which comes from $(ZMQREF zmq_msg_gets()), 2507 whereas the latter returns a freshly GC-allocated copy of it. However, 2508 the array returned by $(D metadataUnsafe()) is owned by the underlying 2509 $(ZMQ) message and gets destroyed along with it, so care must be 2510 taken when using this function. 2511 2512 Throws: 2513 $(REF ZmqException) if $(ZMQ) reports an error. 2514 Corresponds_to: 2515 $(ZMQREF zmq_msg_gets()) 2516 */ 2517 char[] metadata(const char[] property) @trusted 2518 in(m_initialized) 2519 { 2520 return metadataUnsafe(property).dup; 2521 } 2522 2523 /// ditto 2524 const(char)[] metadataUnsafe(const char[] property) @system 2525 in(m_initialized) 2526 { 2527 if (auto value = zmq_msg_gets(handle, zeroTermString(property))) { 2528 import std.string: fromStringz; 2529 return fromStringz(value); 2530 } else { 2531 throw new ZmqException(); 2532 } 2533 } 2534 2535 /** 2536 A pointer to the underlying $(D zmq_msg_t). 2537 */ 2538 @property inout(zmq_msg_t)* handle() inout pure nothrow 2539 { 2540 return &m_msg; 2541 } 2542 2543 private: 2544 private void init() 2545 in(!m_initialized) 2546 out(; m_initialized) 2547 { 2548 if (trusted!zmq_msg_init(&m_msg) != 0) { 2549 throw new ZmqException; 2550 } 2551 m_initialized = true; 2552 } 2553 2554 private void init(size_t size) 2555 in(!m_initialized) 2556 out(; m_initialized) 2557 { 2558 if (trusted!zmq_msg_init_size(&m_msg, size) != 0) { 2559 throw new ZmqException; 2560 } 2561 m_initialized = true; 2562 } 2563 2564 private void init(ubyte[] data) @system 2565 in(!m_initialized) 2566 out(; m_initialized) 2567 { 2568 import core.memory; 2569 static extern(C) void zmqd_Frame_init_gcFree(void* dataPtr, void* block) nothrow 2570 { 2571 GC.removeRoot(dataPtr); 2572 GC.clrAttr(block, GC.BlkAttr.NO_MOVE); 2573 } 2574 2575 GC.addRoot(data.ptr); 2576 scope(failure) GC.removeRoot(data.ptr); 2577 2578 auto block = GC.addrOf(data.ptr); 2579 immutable movable = block && !(GC.getAttr(block) & GC.BlkAttr.NO_MOVE); 2580 GC.setAttr(block, GC.BlkAttr.NO_MOVE); 2581 scope(failure) if (movable) GC.clrAttr(block, GC.BlkAttr.NO_MOVE); 2582 2583 init(data, &zmqd_Frame_init_gcFree, block); 2584 } 2585 2586 void init(ubyte[] data, FreeData free, void* hint) @system 2587 in(!m_initialized) 2588 out(; m_initialized) 2589 { 2590 if (zmq_msg_init_data(&m_msg, data.ptr, data.length, free, hint) != 0) { 2591 throw new ZmqException; 2592 } 2593 m_initialized = true; 2594 } 2595 2596 int getProperty(int property) @trusted 2597 { 2598 const value = zmq_msg_get(&m_msg, property); 2599 if (value == -1) { 2600 throw new ZmqException; 2601 } 2602 return value; 2603 } 2604 2605 bool m_initialized; 2606 zmq_msg_t m_msg; 2607 } 2608 2609 unittest 2610 { 2611 const url = uniqueUrl("inproc"); 2612 auto s1 = Socket(SocketType.pair); 2613 auto s2 = Socket(SocketType.pair); 2614 s1.bind(url); 2615 s2.connect(url); 2616 2617 auto m1a = Frame(123); 2618 m1a.data[] = 'a'; 2619 s1.send(m1a); 2620 auto m2a = Frame(); 2621 s2.receive(m2a); 2622 assert(m2a.size == 123); 2623 foreach (e; m2a.data) assert(e == 'a'); 2624 2625 auto m1b = Frame(10); 2626 m1b.data[] = 'b'; 2627 s1.send(m1b); 2628 auto m2b = Frame(); 2629 s2.receive(m2b); 2630 assert(m2b.size == 10); 2631 foreach (e; m2b.data) assert(e == 'b'); 2632 } 2633 2634 deprecated("zmqd.Message has been renamed to zmqd.Frame") alias Message = Frame; 2635 2636 2637 /** 2638 A global context which is used by default by all sockets, unless they are 2639 explicitly constructed with a different context. 2640 2641 The $(ZMQ) Guide $(LINK2 http://zguide.zeromq.org/page:all#Getting-the-Context-Right, 2642 has the following to say) about context creation: 2643 $(QUOTE 2644 You should create and use exactly one context in your process. 2645 [$(LDOTS)] If at runtime a process has two contexts, these are 2646 like separate $(ZMQ) instances. If that's explicitly what you 2647 want, OK, but otherwise remember: $(EM Do one $(D zmq_ctx_new()) 2648 at the start of your main line code, and one $(D zmq_ctx_destroy()) 2649 at the end.) 2650 ) 2651 By using $(D defaultContext()), this is exactly what you achieve. The 2652 context is created the first time the function is called, and is 2653 automatically destroyed when the program ends. 2654 2655 This function is thread safe. 2656 2657 Throws: 2658 $(REF ZmqException) if $(ZMQ) reports an error. 2659 See_also: 2660 $(REF Context) 2661 */ 2662 Context defaultContext() @trusted 2663 { 2664 // For future reference: This is the low-lock singleton pattern. See: 2665 // http://davesdprogramming.wordpress.com/2013/05/06/low-lock-singletons/ 2666 static bool instantiated; 2667 __gshared Context ctx; 2668 if (!instantiated) { 2669 synchronized { 2670 if (!ctx.initialized) { 2671 ctx = Context(); 2672 } 2673 instantiated = true; 2674 } 2675 } 2676 return ctx; 2677 } 2678 2679 @system unittest 2680 { 2681 import core.thread; 2682 Context c1, c2; 2683 auto t = new Thread(() { c1 = defaultContext(); }); 2684 t.start(); 2685 c2 = defaultContext(); 2686 t.join(); 2687 assert(c1.handle !is null); 2688 assert(c1.handle == c2.handle); 2689 } 2690 2691 2692 /** 2693 An object that encapsulates a $(ZMQ) context. 2694 2695 In most programs, it is not necessary to use this type directly, 2696 as $(REF Socket) will use a default global context if not explicitly 2697 provided with one. See $(FREF defaultContext) for details. 2698 2699 A default-initialized $(D Context) is not a valid $(ZMQ) context; it 2700 must always be explicitly initialized with $(FREF _Context.opCall): 2701 --- 2702 Context ctx; // Not a valid context yet 2703 ctx = Context(); // ...but now it is. 2704 --- 2705 $(D Context) objects can be passed around by value, and two copies will 2706 refer to the same context. The underlying context is managed using 2707 reference counting, so that when the last copy of a $(D Context) goes 2708 out of scope, the context is automatically destroyed. The reference 2709 counting is performed in a thread safe manner, so that the same context 2710 can be shared between multiple threads. ($(ZMQ) guarantees the thread 2711 safety of other context operations.) 2712 2713 See_also: 2714 $(FREF defaultContext) 2715 */ 2716 struct Context 2717 { 2718 @safe: 2719 /** 2720 Creates a new $(ZMQ) context. 2721 2722 Returns: 2723 A $(REF Context) object that encapsulates the new context. 2724 Throws: 2725 $(REF ZmqException) if $(ZMQ) reports an error. 2726 Corresponds_to: 2727 $(ZMQREF zmq_ctx_new()) 2728 */ 2729 static Context opCall() @trusted // because of the cast 2730 { 2731 if (auto c = trusted!zmq_ctx_new()) { 2732 Context ctx; 2733 // Casting from/to shared is OK since ZMQ contexts are thread safe. 2734 static Exception release(shared(void)* ptr) @trusted nothrow 2735 { 2736 return zmq_ctx_term(cast(void*) ptr) == 0 2737 ? null 2738 : new ZmqException; 2739 } 2740 ctx.m_resource = SharedResource(cast(shared) c, &release); 2741 return ctx; 2742 } else { 2743 throw new ZmqException; 2744 } 2745 } 2746 2747 /// 2748 unittest 2749 { 2750 auto ctx = Context(); 2751 assert (ctx.initialized); 2752 } 2753 2754 /** 2755 Detaches from the $(ZMQ) context. 2756 2757 If this is the last reference to the context, it will be destroyed with 2758 $(ZMQREF zmq_ctx_term()). 2759 2760 Throws: 2761 $(REF ZmqException) if $(ZMQ) reports an error. 2762 */ 2763 void detach() 2764 { 2765 m_resource.detach(); 2766 } 2767 2768 /// 2769 unittest 2770 { 2771 auto ctx = Context(); 2772 assert (ctx.initialized); 2773 ctx.detach(); 2774 assert (!ctx.initialized); 2775 } 2776 2777 /** 2778 Forcefully terminates the context. 2779 2780 By using this function, one effectively circumvents the reference-counting 2781 mechanism for managing the context. After it returns, all other 2782 $(D Context) objects that used to refer to the same context will be in a 2783 state which is functionally equivalent to the default-initialized state 2784 (i.e., $(REF Context.initialized) is $(D false) and $(REF Context.handle) 2785 is $(D null)). 2786 2787 Throws: 2788 $(REF ZmqException) if $(ZMQ) reports an error. 2789 Corresponds_to: 2790 $(ZMQREF zmq_ctx_term()) 2791 */ 2792 void terminate() @system 2793 { 2794 m_resource.forceRelease(); 2795 } 2796 2797 /// 2798 @system unittest 2799 { 2800 auto ctx1 = Context(); 2801 auto ctx2 = ctx1; 2802 assert (ctx1.initialized); 2803 assert (ctx2.initialized); 2804 assert (ctx1.handle == ctx2.handle); 2805 ctx2.terminate(); 2806 assert (!ctx1.initialized); 2807 assert (!ctx2.initialized); 2808 assert (ctx1.handle == null); 2809 assert (ctx2.handle == null); 2810 } 2811 2812 @system unittest 2813 { 2814 auto ctx = Context(); 2815 2816 void threadFunc() 2817 { 2818 auto server = Socket(ctx, SocketType.rep); 2819 server.bind("inproc://Context_terminate_unittest"); 2820 try { 2821 server.receive(null); 2822 server.send(""); 2823 server.receive(null); 2824 assert (false, "Never get here"); 2825 } catch (ZmqException e) { 2826 assert (e.errno == ETERM); 2827 } 2828 } 2829 2830 import core.thread: Thread; 2831 auto thread = new Thread(&threadFunc); 2832 thread.start(); 2833 2834 auto client = Socket(ctx, SocketType.req); 2835 client.connect("inproc://Context_terminate_unittest"); 2836 client.send(""); 2837 client.receive(null); 2838 client.close(); 2839 2840 ctx.terminate(); 2841 thread.join(); 2842 } 2843 2844 2845 /** 2846 The number of I/O threads. 2847 2848 Throws: 2849 $(REF ZmqException) if $(ZMQ) reports an error. 2850 Corresponds_to: 2851 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2852 $(D ZMQ_IO_THREADS). 2853 */ 2854 @property int ioThreads() 2855 { 2856 return getOption(ZMQ_IO_THREADS); 2857 } 2858 2859 /// ditto 2860 @property void ioThreads(int value) 2861 { 2862 setOption(ZMQ_IO_THREADS, value); 2863 } 2864 2865 /// 2866 unittest 2867 { 2868 auto ctx = Context(); 2869 ctx.ioThreads = 3; 2870 assert (ctx.ioThreads == 3); 2871 } 2872 2873 /** 2874 The maximum number of sockets. 2875 2876 Throws: 2877 $(REF ZmqException) if $(ZMQ) reports an error. 2878 Corresponds_to: 2879 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2880 $(D ZMQ_MAX_SOCKETS). 2881 */ 2882 @property int maxSockets() 2883 { 2884 return getOption(ZMQ_MAX_SOCKETS); 2885 } 2886 2887 /// ditto 2888 @property void maxSockets(int value) 2889 { 2890 setOption(ZMQ_MAX_SOCKETS, value); 2891 } 2892 2893 /// 2894 unittest 2895 { 2896 auto ctx = Context(); 2897 ctx.maxSockets = 512; 2898 assert (ctx.maxSockets == 512); 2899 } 2900 2901 /** 2902 The largest configurable number of sockets. 2903 2904 Throws: 2905 $(REF ZmqException) if $(ZMQ) reports an error. 2906 Corresponds_to: 2907 $(ZMQREF zmq_ctx_get()) with $(D ZMQ_SOCKET_LIMIT). 2908 */ 2909 @property int socketLimit() 2910 { 2911 return getOption(ZMQ_SOCKET_LIMIT); 2912 } 2913 2914 /// 2915 unittest 2916 { 2917 auto ctx = Context(); 2918 assert (ctx.socketLimit > 0); 2919 } 2920 2921 /** 2922 IPv6 option. 2923 2924 Throws: 2925 $(REF ZmqException) if $(ZMQ) reports an error. 2926 Corresponds_to: 2927 $(ZMQREF zmq_ctx_get()) and $(ZMQREF zmq_ctx_set()) with 2928 $(D ZMQ_IPV6). 2929 */ 2930 @property bool ipv6() 2931 { 2932 return !!getOption(ZMQ_IPV6); 2933 } 2934 2935 /// ditto 2936 @property void ipv6(bool value) 2937 { 2938 setOption(ZMQ_IPV6, value ? 1 : 0); 2939 } 2940 2941 /// 2942 unittest 2943 { 2944 auto ctx = Context(); 2945 ctx.ipv6 = true; 2946 assert (ctx.ipv6 == true); 2947 } 2948 2949 /** 2950 The $(D void*) pointer used by the underlying C API to refer to the context. 2951 2952 If the object has not been initialized, this function returns $(D null). 2953 */ 2954 @property inout(void)* handle() inout @trusted pure nothrow 2955 { 2956 // ZMQ contexts are thread safe, so casting away shared is OK. 2957 return cast(typeof(return)) m_resource.handle; 2958 } 2959 2960 /** 2961 Whether this $(REF Context) object has been _initialized, i.e. whether it 2962 refers to a valid $(ZMQ) context. 2963 */ 2964 @property bool initialized() const pure nothrow 2965 { 2966 return m_resource.handle != null; 2967 } 2968 2969 /// 2970 unittest 2971 { 2972 Context ctx; 2973 assert (!ctx.initialized); 2974 ctx = Context(); 2975 assert (ctx.initialized); 2976 ctx.detach(); 2977 assert (!ctx.initialized); 2978 } 2979 2980 private: 2981 int getOption(int option) 2982 { 2983 immutable value = trusted!zmq_ctx_get(this.handle, option); 2984 if (value < 0) { 2985 throw new ZmqException; 2986 } 2987 return value; 2988 } 2989 2990 void setOption(int option, int value) 2991 { 2992 if (trusted!zmq_ctx_set(this.handle, option, value) != 0) { 2993 throw new ZmqException; 2994 } 2995 } 2996 2997 SharedResource m_resource; 2998 } 2999 3000 3001 /** 3002 Socket event types. 3003 3004 These are used together with $(FREF Socket.monitor), and are described 3005 in the $(ZMQREF zmq_socket_monitor()) reference. 3006 */ 3007 enum EventType 3008 { 3009 connected = ZMQ_EVENT_CONNECTED, /// Corresponds to $(D ZMQ_EVENT_CONNECTED). 3010 connectDelayed = ZMQ_EVENT_CONNECT_DELAYED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_DELAYED). 3011 connectRetried = ZMQ_EVENT_CONNECT_RETRIED,/// Corresponds to $(D ZMQ_EVENT_CONNECT_RETRIED). 3012 listening = ZMQ_EVENT_LISTENING, /// Corresponds to $(D ZMQ_EVENT_LISTENING). 3013 bindFailed = ZMQ_EVENT_BIND_FAILED, /// Corresponds to $(D ZMQ_EVENT_BIND_FAILED). 3014 accepted = ZMQ_EVENT_ACCEPTED, /// Corresponds to $(D ZMQ_EVENT_ACCEPTED). 3015 acceptFailed = ZMQ_EVENT_ACCEPT_FAILED, /// Corresponds to $(D ZMQ_EVENT_ACCEPT_FAILED). 3016 closed = ZMQ_EVENT_CLOSED, /// Corresponds to $(D ZMQ_EVENT_CLOSED). 3017 closeFailed = ZMQ_EVENT_CLOSE_FAILED, /// Corresponds to $(D ZMQ_EVENT_CLOSE_FAILED). 3018 disconnected = ZMQ_EVENT_DISCONNECTED, /// Corresponds to $(D ZMQ_EVENT_DISCONNECTED). 3019 monitorStopped = ZMQ_EVENT_MONITOR_STOPPED,/// Corresponds to $(D ZMQ_EVENT_MONITOR_STOPPED). 3020 all = ZMQ_EVENT_ALL, /// Corresponds to $(D ZMQ_EVENT_ALL). 3021 handshakeFailedNoDetail = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL). 3022 handshakeSucceeded = ZMQ_EVENT_HANDSHAKE_SUCCEEDED, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_SUCCEEDED). 3023 handshakeFailedProtocol = ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL). 3024 handshakeFailedAuth = ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, /// Corresponds to $(D ZMQ_EVENT_HANDSHAKE_FAILED_AUTH). 3025 } 3026 3027 3028 /** 3029 Protocol error codes. 3030 3031 These may be returned from $(FREF Event.protocolError), and are described 3032 in the $(ZMQREF zmq_socket_monitor()) reference. 3033 */ 3034 enum ProtocolError 3035 { 3036 zmtpUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED). 3037 zmtpUnexpectedCommand = ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND). 3038 zmtpInvalidSequence = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE). 3039 zmtpKeyExchange = ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE). 3040 zmtpMalformedCommandUnspecified = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED,/// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED). 3041 zmtpMalformedCommandMessage = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE). 3042 zmtpMalformedCommandHello = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO). 3043 zmtpMalformedCommandInitiate = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE). 3044 zmtpMalformedCommandError = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR). 3045 zmtpMalformedCommandReady = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY). 3046 zmtpMalformedCommandWelcome = ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME). 3047 zmtpInvalidMetadata = ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA). 3048 zmtpCryptographic = ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC). 3049 zmtpMechanismMismatch = ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH). 3050 zapUnspecified = ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED). 3051 zapMalformedReply = ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY). 3052 zapBadRequestID = ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID). 3053 zapBadVersion = ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION). 3054 zapInvalidStatusCode = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE). 3055 zapInvalidMetadata = ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA, /// Corresponds to $(D ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA). 3056 } 3057 3058 3059 /** 3060 Receives a message on the given _socket and interprets it as a _socket 3061 state change event. 3062 3063 $(D socket) must be a PAIR _socket which is connected to an endpoint 3064 created via a $(FREF Socket.monitor) call. $(D receiveEvent()) receives 3065 one message on the _socket, parses its contents according to the 3066 specification in the $(ZMQREF zmq_socket_monitor()) reference, 3067 and returns the event information as an $(REF Event) object. 3068 3069 Throws: 3070 $(REF ZmqException) if $(ZMQ) reports an error.$(BR) 3071 $(REF InvalidEventException) if the received message could not 3072 be interpreted as an event message. 3073 See_also: 3074 $(FREF Socket.monitor), for monitoring _socket state changes. 3075 */ 3076 Event receiveEvent(ref Socket socket) @system 3077 { 3078 enum eventFrameSize = ushort.sizeof + int.sizeof; 3079 auto eventFrame = Frame(); 3080 if (socket.receive(eventFrame) != eventFrameSize) { 3081 throw new InvalidEventException; 3082 } 3083 auto addrFrame = Frame(); 3084 socket.receive(addrFrame); 3085 try { 3086 import std.conv: to; 3087 immutable event = to!EventType(*(cast(const(ushort)*) eventFrame.data.ptr)); 3088 immutable value = *(cast(const(int)*) eventFrame.data.ptr + ushort.sizeof); 3089 immutable addr = (cast(char[]) addrFrame.data).idup; 3090 return Event(event, addr, value); 3091 } catch (Exception e) { 3092 // Any exception thrown within the try block signifies that there 3093 // is something wrong with the event message. 3094 throw new InvalidEventException; 3095 } 3096 } 3097 3098 // Socket monitoring only works for connection-oriented sockets, and 3099 // IPC is not supported on Windows. 3100 version (Posix) @system unittest 3101 { 3102 Event[] events; 3103 void eventCollector() 3104 { 3105 auto coll = Socket(SocketType.pair); 3106 coll.connect("inproc://zmqd_receiveEvent_unittest_monitor"); 3107 do { 3108 events ~= receiveEvent(coll); 3109 } while (events[$-1].type != EventType.closed); 3110 } 3111 import core.thread; 3112 auto collector = new Thread(&eventCollector); 3113 collector.start(); 3114 3115 static void eventGenerator() 3116 { 3117 auto sck1 = Socket(SocketType.pair); 3118 sck1.monitor("inproc://zmqd_receiveEvent_unittest_monitor"); 3119 sck1.bind("ipc://zmqd_receiveEvent_unittest"); 3120 import core.time; 3121 Thread.sleep(100.msecs); 3122 auto sck2 = Socket(SocketType.pair); 3123 sck2.connect("ipc://zmqd_receiveEvent_unittest"); 3124 Thread.sleep(100.msecs); 3125 sck2.disconnect("ipc://zmqd_receiveEvent_unittest"); 3126 Thread.sleep(100.msecs); 3127 sck1.unbind("ipc://zmqd_receiveEvent_unittest"); 3128 } 3129 eventGenerator(); 3130 collector.join(); 3131 assert (events.length == 4); 3132 foreach (ev; events) { 3133 assert (ev.address == "ipc://zmqd_receiveEvent_unittest"); 3134 } 3135 assert (events[0].type == EventType.listening); 3136 assert (events[1].type == EventType.accepted); 3137 assert (events[2].type == EventType.handshakeSucceeded); 3138 assert (events[3].type == EventType.closed); 3139 import std.exception; 3140 assertNotThrown!Error(events[0].fd); 3141 assertThrown!Error(events[0].errno); 3142 assertThrown!Error(events[0].interval); 3143 } 3144 3145 3146 /** 3147 Information about a socket state change. 3148 3149 See_also: 3150 $(FREF receiveEvent) 3151 */ 3152 struct Event 3153 { 3154 /// The event _type. 3155 @property EventType type() const pure nothrow 3156 { 3157 return m_type; 3158 } 3159 3160 /// The peer _address. 3161 @property string address() const pure nothrow 3162 { 3163 return m_address; 3164 } 3165 3166 /** 3167 The socket file descriptor. 3168 3169 This property function may only be called if $(REF Event.type) is one of: 3170 $(D connected), $(D listening), $(D accepted), $(D closed) or $(D disonnected). 3171 3172 Throws: 3173 $(D Error) if the property is called for a wrong event type. 3174 */ 3175 @property FD fd() const pure nothrow 3176 { 3177 final switch (m_type) { 3178 case EventType.connected: 3179 case EventType.listening: 3180 case EventType.accepted: 3181 case EventType.closed: 3182 case EventType.disconnected: 3183 return cast(typeof(return)) m_value; 3184 case EventType.connectDelayed: 3185 case EventType.connectRetried: 3186 case EventType.bindFailed: 3187 case EventType.acceptFailed: 3188 case EventType.closeFailed: 3189 case EventType.monitorStopped: 3190 case EventType.handshakeFailedNoDetail: 3191 case EventType.handshakeSucceeded: 3192 case EventType.handshakeFailedProtocol: 3193 case EventType.handshakeFailedAuth: 3194 throw invalidProperty(); 3195 case EventType.all: 3196 assert (false); 3197 } 3198 } 3199 3200 /** 3201 The $(D _errno) code for the error which triggered the event. 3202 3203 This property function may only be called if $(REF Event.type) is either 3204 $(D bindFailed), $(D acceptFailed), $(D closeFailed) or 3205 $(D handshakeFailedNoDetail). 3206 3207 Throws: 3208 $(D Error) if the property is called for a wrong event type. 3209 */ 3210 @property int errno() const pure nothrow 3211 { 3212 final switch (m_type) { 3213 case EventType.bindFailed: 3214 case EventType.acceptFailed: 3215 case EventType.closeFailed: 3216 case EventType.handshakeFailedNoDetail: 3217 return m_value; 3218 case EventType.connected: 3219 case EventType.connectDelayed: 3220 case EventType.connectRetried: 3221 case EventType.listening: 3222 case EventType.accepted: 3223 case EventType.closed: 3224 case EventType.disconnected: 3225 case EventType.monitorStopped: 3226 case EventType.handshakeSucceeded: 3227 case EventType.handshakeFailedProtocol: 3228 case EventType.handshakeFailedAuth: 3229 throw invalidProperty(); 3230 case EventType.all: 3231 assert (false); 3232 } 3233 } 3234 3235 /** 3236 The reconnect interval. 3237 3238 This property function may only be called if $(REF Event.type) is 3239 $(D connectRetried). 3240 3241 Throws: 3242 $(D Error) if the property is called for a wrong event type. 3243 */ 3244 @property Duration interval() const pure nothrow 3245 { 3246 final switch (m_type) { 3247 case EventType.connectRetried: 3248 return m_value.msecs; 3249 case EventType.connected: 3250 case EventType.connectDelayed: 3251 case EventType.listening: 3252 case EventType.bindFailed: 3253 case EventType.accepted: 3254 case EventType.acceptFailed: 3255 case EventType.closed: 3256 case EventType.closeFailed: 3257 case EventType.disconnected: 3258 case EventType.monitorStopped: 3259 case EventType.handshakeFailedNoDetail: 3260 case EventType.handshakeSucceeded: 3261 case EventType.handshakeFailedProtocol: 3262 case EventType.handshakeFailedAuth: 3263 throw invalidProperty(); 3264 case EventType.all: 3265 assert (false); 3266 } 3267 } 3268 3269 /** 3270 The protocol error code. 3271 3272 This property function may only be called if $(REF Event.type) is 3273 $(D handshakeFailedProtocol). 3274 3275 Throws: 3276 $(D Error) if the property is called for a wrong event type. 3277 */ 3278 @property ProtocolError protocolError() const pure 3279 { 3280 final switch (m_type) { 3281 case EventType.handshakeFailedProtocol: 3282 import std.conv: to; 3283 return to!ProtocolError(m_value); 3284 case EventType.connectRetried: 3285 case EventType.connected: 3286 case EventType.connectDelayed: 3287 case EventType.listening: 3288 case EventType.bindFailed: 3289 case EventType.accepted: 3290 case EventType.acceptFailed: 3291 case EventType.closed: 3292 case EventType.closeFailed: 3293 case EventType.disconnected: 3294 case EventType.monitorStopped: 3295 case EventType.handshakeFailedNoDetail: 3296 case EventType.handshakeSucceeded: 3297 case EventType.handshakeFailedAuth: 3298 throw invalidProperty(); 3299 case EventType.all: 3300 assert (false); 3301 } 3302 } 3303 3304 /** 3305 The status code returned by the ZAP handler. 3306 3307 This property function may only be called if $(REF Event.type) is 3308 $(D handshakeFailedAuth). 3309 3310 Throws: 3311 $(D Error) if the property is called for a wrong event type. 3312 */ 3313 @property int statusCode() const pure 3314 { 3315 final switch (m_type) { 3316 case EventType.handshakeFailedAuth: 3317 return m_value; 3318 case EventType.connectRetried: 3319 case EventType.connected: 3320 case EventType.connectDelayed: 3321 case EventType.listening: 3322 case EventType.bindFailed: 3323 case EventType.accepted: 3324 case EventType.acceptFailed: 3325 case EventType.closed: 3326 case EventType.closeFailed: 3327 case EventType.disconnected: 3328 case EventType.monitorStopped: 3329 case EventType.handshakeFailedNoDetail: 3330 case EventType.handshakeSucceeded: 3331 case EventType.handshakeFailedProtocol: 3332 throw invalidProperty(); 3333 case EventType.all: 3334 assert (false); 3335 } 3336 } 3337 3338 private: 3339 this(EventType type, string address, int value) pure nothrow 3340 { 3341 m_type = type; 3342 m_address = address; 3343 m_value = value; 3344 } 3345 3346 Error invalidProperty(string name = __FUNCTION__)() const pure nothrow 3347 { 3348 try { 3349 import std.conv: text; 3350 return new Error(text("Property '", name, 3351 "' not available for event type '", 3352 m_type, "'")); 3353 } catch (Exception e) { 3354 assert(false); 3355 } 3356 } 3357 3358 EventType m_type; 3359 string m_address; 3360 int m_value; 3361 } 3362 3363 3364 /** 3365 Encodes a binary key as Z85 printable text. 3366 3367 $(D dest) must be an array whose length is at least $(D 5*data.length/4 + 1), 3368 which will be used to store the return value plus a terminating zero byte. 3369 If $(D dest) is omitted, a new array will be created. 3370 3371 Returns: 3372 An array of size $(D 5*data.length/4) which contains the Z85-encoded text, 3373 excluding the terminating zero byte. This will be a slice of $(D dest) if 3374 it is provided. 3375 Throws: 3376 $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR) 3377 $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length) 3378 is not a multiple of 4). 3379 Corresponds_to: 3380 $(ZMQREF zmq_z85_encode()) 3381 */ 3382 char[] z85Encode(ubyte[] data, char[] dest) 3383 // TODO: Make data const when we update to ZMQ 4.1 3384 { 3385 import core.exception: RangeError; 3386 immutable len = 5 * data.length / 4; 3387 if (dest.length < len+1) throw new RangeError; 3388 if (trusted!zmq_z85_encode(ptr(dest), ptr(data), data.length) == null) { 3389 throw new ZmqException; 3390 } 3391 return dest[0 .. len]; 3392 } 3393 3394 /// ditto 3395 char[] z85Encode(ubyte[] data) 3396 { 3397 return z85Encode(data, new char[5*data.length/4 + 1]); 3398 } 3399 3400 debug (WithCurveTests) @system unittest // @system because of assertThrown 3401 { 3402 // TODO: Make data immutable when we update to ZMQ 4.1 3403 auto data = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b]; 3404 immutable encoded = "HelloWorld"; 3405 assert (z85Encode(data) == encoded); 3406 3407 auto buffer = new char[11]; 3408 auto result = z85Encode(data, buffer); 3409 assert (result == encoded); 3410 assert (buffer.ptr == result.ptr); 3411 3412 import core.exception: RangeError; 3413 import std.exception: assertThrown; 3414 assertThrown!RangeError(z85Encode(data, new char[10])); 3415 assertThrown!ZmqException(z85Encode(cast(ubyte[]) [ 1, 2, 3, 4, 5])); 3416 } 3417 3418 3419 /** 3420 Decodes a binary key from Z85 printable _text. 3421 3422 $(D dest) must be an array whose length is at least $(D 4*data.length/5), 3423 which will be used to store the return value. 3424 If $(D dest) is omitted, a new array will be created. 3425 3426 Note that $(ZMQREF zmq_z85_decode()) expects a zero-terminated string, so a zero 3427 byte will be appended to $(D text) if it does not contain one already. However, 3428 this may trigger a (possibly unwanted) GC allocation. To avoid this, either 3429 make sure that the last character in $(D text) is $(D '\0'), or use 3430 $(OBJREF assumeSafeAppend) on the array before calling this function (provided 3431 this is safe). 3432 3433 Returns: 3434 An array of size $(D 4*data.length/5) which contains the decoded data. 3435 This will be a slice of $(D dest) if it is provided. 3436 Throws: 3437 $(COREF exception,RangeError) if $(D dest) is given but is too small.$(BR) 3438 $(REF ZmqException) if $(ZMQ) reports an error (i.e., if $(D data.length) 3439 is not a multiple of 5). 3440 Corresponds_to: 3441 $(ZMQREF zmq_z85_decode()) 3442 */ 3443 ubyte[] z85Decode(char[] text, ubyte[] dest) 3444 // TODO: Make text const when we update to ZMQ 4.1 3445 { 3446 import core.exception: RangeError; 3447 immutable len = 4 * text.length/5; 3448 if (dest.length < len) throw new RangeError; 3449 if (text[$-1] != '\0') text ~= '\0'; 3450 if (trusted!zmq_z85_decode(ptr(dest), ptr(text)) == null) { 3451 throw new ZmqException; 3452 } 3453 return dest[0 .. len]; 3454 } 3455 3456 /// ditto 3457 ubyte[] z85Decode(char[] text) 3458 { 3459 return z85Decode(text, new ubyte[4*text.length/5]); 3460 } 3461 3462 debug (WithCurveTests) @system unittest // @system because of assertThrown 3463 { 3464 // TODO: Make data immutable when we update to ZMQ 4.1 3465 auto text = "HelloWorld".dup; 3466 immutable decoded = cast(ubyte[])[0x86, 0x4f, 0xd2, 0x6f, 0xb5, 0x59, 0xf7, 0x5b]; 3467 assert (z85Decode(text) == decoded); 3468 assert (z85Decode(text~'\0') == decoded); 3469 3470 auto buffer = new ubyte[8]; 3471 auto result = z85Decode(text, buffer); 3472 assert (result == decoded); 3473 assert (buffer.ptr == result.ptr); 3474 3475 import core.exception: RangeError; 3476 import std.exception: assertThrown; 3477 assertThrown!RangeError(z85Decode(text, new ubyte[7])); 3478 assertThrown!ZmqException(z85Decode("SizeNotAMultipleOf5".dup)); 3479 } 3480 3481 3482 /** 3483 Generates a new Curve key pair. 3484 3485 To avoid a memory allocation, preallocated buffers may optionally be supplied 3486 for the two keys. Each of these must have a length of at least 41 bytes, enough 3487 for a 40-character Z85-encoded key plus a terminating zero byte. If either 3488 buffer is omitted/$(D null), a new one will be created. 3489 3490 Returns: 3491 A tuple that contains the two keys. Each of these will have a length of 3492 40 characters, and will be slices of the input buffers if such have been 3493 provided. 3494 Throws: 3495 $(COREF exception,RangeError) if $(D publicKeyBuf) or $(D secretKeyBuf) are 3496 not $(D null) but have a length of less than 41 characters.$(BR) 3497 $(REF ZmqException) if $(ZMQ) reports an error. 3498 Corresponds_to: 3499 $(ZMQREF zmq_curve_keypair()) 3500 */ 3501 Tuple!(char[], "publicKey", char[], "secretKey") 3502 curveKeyPair(char[] publicKeyBuf = null, char[] secretKeyBuf = null) 3503 { 3504 import core.exception: RangeError; 3505 if (publicKeyBuf is null) publicKeyBuf = new char[41]; 3506 else if (publicKeyBuf.length < 41) throw new RangeError; 3507 if (secretKeyBuf is null) secretKeyBuf = new char[41]; 3508 else if (secretKeyBuf.length < 41) throw new RangeError; 3509 3510 if (trusted!zmq_curve_keypair(ptr(publicKeyBuf), ptr(secretKeyBuf)) != 0) { 3511 throw new ZmqException; 3512 } 3513 return typeof(return)(publicKeyBuf[0 .. 40], secretKeyBuf[0 .. 40]); 3514 } 3515 3516 /// 3517 debug (WithCurveTests) unittest 3518 { 3519 auto server = Socket(SocketType.rep); 3520 auto serverKeys = curveKeyPair(); 3521 server.curveServer = true; 3522 server.curveSecretKeyZ85 = serverKeys.secretKey; 3523 server.bind("inproc://curveKeyPair_test"); 3524 3525 auto client = Socket(SocketType.req); 3526 auto clientKeys = curveKeyPair(); 3527 client.curvePublicKeyZ85 = clientKeys.publicKey; 3528 client.curveSecretKeyZ85 = clientKeys.secretKey; 3529 client.curveServerKeyZ85 = serverKeys.publicKey; 3530 client.connect("inproc://curveKeyPair_test"); 3531 client.send("hello"); 3532 3533 ubyte[5] buf; 3534 assert (server.receive(buf) == 5); 3535 assert (buf.asString() == "hello"); 3536 } 3537 3538 debug (WithCurveTests) @system unittest 3539 { 3540 auto k1 = curveKeyPair(); 3541 assert (k1.publicKey.length == 40); 3542 assert (k1.secretKey.length == 40); 3543 3544 char[82] buf; 3545 auto k2 = curveKeyPair(buf[0 .. 41], buf[41 .. 82]); 3546 assert (k2.publicKey.length == 40); 3547 assert (k2.publicKey.ptr == buf.ptr); 3548 assert (k2.secretKey.length == 40); 3549 assert (k2.secretKey.ptr == buf.ptr + 41); 3550 3551 char[82] backup = buf; 3552 import core.exception, std.exception; 3553 assertThrown!RangeError(curveKeyPair(buf[0 .. 40], buf[41 .. 82])); 3554 assertThrown!RangeError(curveKeyPair(buf[0 .. 41], buf[42 .. 82])); 3555 assert (backup[] == buf[]); 3556 } 3557 3558 3559 /** 3560 Utility function which interprets and validates a byte array as a UTF-8 string. 3561 3562 Most of $(ZMQD)'s message API deals in $(D ubyte[]) arrays, but very often, 3563 the message _data contains plain text. $(D asString()) allows for easy and 3564 safe interpretation of raw _data as characters. It checks that $(D data) is 3565 a valid UTF-8 encoded string, and returns a $(D char[]) array that refers to 3566 the same memory region. 3567 3568 Throws: 3569 $(STDREF utf,UTFException) if $(D data) is not a valid UTF-8 string. 3570 See_also: 3571 $(STDREF string,representation), which performs the opposite operation. 3572 */ 3573 inout(char)[] asString(inout(ubyte)[] data) pure 3574 { 3575 auto s = cast(typeof(return)) data; 3576 import std.utf: validate; 3577 validate(s); 3578 return s; 3579 } 3580 3581 /// 3582 unittest 3583 { 3584 auto s1 = Socket(SocketType.pair); 3585 s1.bind("inproc://zmqd_asString_example"); 3586 auto s2 = Socket(SocketType.pair); 3587 s2.connect("inproc://zmqd_asString_example"); 3588 3589 auto msg = Frame(12); 3590 msg.data.asString()[] = "Hello World!"; 3591 s1.send(msg); 3592 3593 ubyte[12] buf; 3594 s2.receive(buf); 3595 assert(buf.asString() == "Hello World!"); 3596 } 3597 3598 unittest 3599 { 3600 auto bytes = cast(ubyte[]) ['f', 'o', 'o']; 3601 auto text = bytes.asString(); 3602 assert (text == "foo"); 3603 assert (cast(void*) ptr(bytes) == cast(void*) ptr(text)); 3604 3605 import std.exception: assertThrown; 3606 import std.utf: UTFException; 3607 auto b = cast(ubyte[]) [100, 252, 1]; 3608 assertThrown!UTFException(asString(b)); 3609 } 3610 3611 3612 /** 3613 A class for exceptions thrown when any of the underlying $(ZMQ) C functions 3614 report an error. 3615 3616 The exception provides a standard error message obtained with 3617 $(ZMQREF zmq_strerror()), as well as the $(D errno) code set by the $(ZMQ) 3618 function which reported the error. 3619 */ 3620 class ZmqException : Exception 3621 { 3622 /** 3623 The $(D _errno) code that was set by the $(ZMQ) function that reported 3624 the error. 3625 3626 Corresponds_to: 3627 $(ZMQREF zmq_errno()) 3628 */ 3629 immutable int errno; 3630 3631 private: 3632 this(string file = __FILE__, int line = __LINE__) nothrow 3633 { 3634 import std.conv; 3635 this.errno = trusted!zmq_errno(); 3636 string msg; 3637 try { 3638 msg = trusted!(to!string)(trusted!zmq_strerror(this.errno)); 3639 } catch (Exception e) { /* We never get here */ } 3640 assert(msg.length); // Still, let's assert as much. 3641 super(msg, file, line); 3642 } 3643 } 3644 3645 3646 /** 3647 Exception thrown by $(FREF receiveEvent) on failure to interpret a 3648 received message as an event description. 3649 */ 3650 class InvalidEventException : Exception 3651 { 3652 private: 3653 this(string file = __FILE__, int line = __LINE__) nothrow 3654 { 3655 super("The received message is not an event message", file, line); 3656 } 3657 } 3658 3659 3660 // ============================================================================= 3661 // Everything below is internal 3662 // ============================================================================= 3663 private: 3664 3665 3666 struct SharedResource 3667 { 3668 @safe: 3669 alias Exception function(shared(void)*) nothrow Release; 3670 3671 this(shared(void)* ptr, Release release) nothrow 3672 in(ptr) 3673 { 3674 m_payload = new shared(Payload)(1, ptr, release); 3675 } 3676 3677 this(this) nothrow 3678 { 3679 if (m_payload) { 3680 incRefCount(); 3681 } 3682 } 3683 3684 ~this() nothrow 3685 { 3686 nothrowDetach(); 3687 } 3688 3689 void opAssign(SharedResource rhs) 3690 { 3691 detach(); 3692 m_payload = rhs.m_payload; 3693 rhs.m_payload = null; 3694 } 3695 3696 void detach() 3697 { 3698 if (auto ex = nothrowDetach()) throw ex; 3699 } 3700 3701 void forceRelease() @system 3702 { 3703 if (m_payload) { 3704 scope(exit) m_payload = null; 3705 decRefCount(); 3706 if (m_payload.handle != null) { 3707 scope(exit) m_payload.handle = null; 3708 if (auto ex = m_payload.release(m_payload.handle)) { 3709 throw ex; 3710 } 3711 } 3712 } 3713 } 3714 3715 @property inout(shared(void))* handle() inout pure nothrow 3716 { 3717 if (m_payload) { 3718 return m_payload.handle; 3719 } else { 3720 return null; 3721 } 3722 } 3723 3724 private: 3725 void incRefCount() @trusted nothrow 3726 { 3727 assert (m_payload !is null && m_payload.refCount > 0); 3728 import core.atomic: atomicOp; 3729 atomicOp!"+="(m_payload.refCount, 1); 3730 } 3731 3732 int decRefCount() @trusted nothrow 3733 { 3734 assert (m_payload !is null && m_payload.refCount > 0); 3735 import core.atomic: atomicOp; 3736 return atomicOp!"-="(m_payload.refCount, 1); 3737 } 3738 3739 Exception nothrowDetach() @trusted nothrow 3740 out(; m_payload is null) 3741 { 3742 if (m_payload) { 3743 scope(exit) m_payload = null; 3744 if (decRefCount() < 1 && m_payload.handle != null) { 3745 return m_payload.release(m_payload.handle); 3746 } 3747 } 3748 return null; 3749 } 3750 3751 struct Payload 3752 { 3753 int refCount; 3754 void* handle; 3755 Release release; 3756 } 3757 shared(Payload)* m_payload; 3758 3759 invariant() 3760 { 3761 assert (m_payload is null || 3762 (m_payload.refCount > 0 && m_payload.release !is null)); 3763 } 3764 } 3765 3766 @system unittest 3767 { 3768 import std.exception: assertNotThrown, assertThrown; 3769 static Exception myFree(shared(void)* p) @trusted nothrow 3770 { 3771 auto v = cast(shared(int)*) p; 3772 if (*v == 0) { 3773 return new Exception("double release"); 3774 } else { 3775 *v = 0; 3776 return null; 3777 } 3778 } 3779 3780 shared int i = 1; 3781 3782 { 3783 // Test constructor and properties. 3784 auto ra = SharedResource(&i, &myFree); 3785 assert (i == 1); 3786 assert (ra.handle == &i); 3787 3788 // Test postblit constructor 3789 auto rb = ra; 3790 assert (i == 1); 3791 assert (rb.handle == &i); 3792 3793 { 3794 // Test properties and free() with default-initialized object. 3795 SharedResource rc; 3796 assert (rc.handle == null); 3797 assertNotThrown(rc.detach()); 3798 3799 // Test assignment, both with and without detachment 3800 rc = rb; 3801 assert (i == 1); 3802 assert (rc.handle == &i); 3803 3804 shared int j = 2; 3805 auto rd = SharedResource(&j, &myFree); 3806 assert (rd.handle == &j); 3807 rd = rb; 3808 assert (j == 0); 3809 assert (i == 1); 3810 assert (rd.handle == &i); 3811 3812 // Test explicit detach() 3813 shared int k = 3; 3814 auto re = SharedResource(&k, &myFree); 3815 assertNotThrown(re.detach()); 3816 assert(k == 0); 3817 3818 // Test failure to free and assign (myFree(&k) fails when k == 0) 3819 re = SharedResource(&k, &myFree); 3820 assertThrown!Exception(re.detach()); // We defined free(k == 0) as an error 3821 re = SharedResource(&k, &myFree); 3822 assertThrown!Exception(re = rb); 3823 } 3824 3825 // i should not be "freed" yet 3826 assert (i == 1); 3827 assert (ra.handle == &i); 3828 assert (rb.handle == &i); 3829 } 3830 // ...but now it should. 3831 assert (i == 0); 3832 } 3833 3834 // Thread safety test 3835 @system unittest 3836 { 3837 enum threadCount = 100; 3838 enum copyCount = 1000; 3839 static Exception myFree(shared(void)* p) @trusted nothrow 3840 { 3841 auto v = cast(shared(int)*) p; 3842 if (*v == 0) { 3843 return new Exception("double release"); 3844 } else { 3845 *v = 0; 3846 return null; 3847 } 3848 } 3849 shared int raw = 1; 3850 { 3851 auto rs = SharedResource(&raw, &myFree); 3852 3853 import core.thread; 3854 auto group = new ThreadGroup; 3855 foreach (i; 0 .. threadCount) { 3856 group.create(() { 3857 auto a = rs; 3858 foreach (j; 0 .. copyCount) { 3859 auto b = a; 3860 assert (b.handle == &raw); 3861 assert (raw == 1); 3862 } 3863 }); 3864 } 3865 group.joinAll(); 3866 assert (rs.handle == &raw); 3867 assert (raw == 1); 3868 } 3869 assert (raw == 0); 3870 } 3871 3872 // forceRelease() test 3873 @system unittest 3874 { 3875 import std.exception: assertNotThrown, assertThrown; 3876 static Exception myFree(shared(void)* p) @trusted nothrow 3877 { 3878 auto v = cast(shared(int)*) p; 3879 if (*v == 0) { 3880 return new Exception("double release"); 3881 } else { 3882 *v = 0; 3883 return null; 3884 } 3885 } 3886 3887 shared int i = 1; 3888 3889 auto ra = SharedResource(&i, &myFree); 3890 auto rb = ra; 3891 assert (ra.handle == &i); 3892 assert (rb.handle == ra.handle); 3893 rb.forceRelease(); 3894 assert (rb.handle == null); 3895 assert (ra.handle == null); 3896 } 3897 3898 3899 version(unittest) private string uniqueUrl(string p, int n = __LINE__) 3900 { 3901 import std.uuid; 3902 return p ~ "://" ~ randomUUID().toString(); 3903 } 3904 3905 3906 private auto trusted(alias func, Args...)(auto ref Args args) @trusted 3907 { 3908 return func(args); 3909 } 3910 3911 private auto scopedTrusted(alias func, Args...)(auto ref scope Args args) @trusted 3912 { 3913 return func(args); 3914 } 3915 3916 3917 // std.string.toStringz() is unsafe, so we provide our own implementation 3918 // tailored to the string sizes we are likely to encounter here. 3919 // Note that this implementation requires that the string be used immediately 3920 // upon return, and not stored, as the buffer will be reused most of the time. 3921 const(char)* zeroTermString(const char[] s) nothrow 3922 { 3923 import std.algorithm: max; 3924 static char[] buf; 3925 immutable len = s.length + 1; 3926 if (buf.length < len) buf.length = max(len, 1023); 3927 buf[0 .. s.length] = s; 3928 buf[s.length] = '\0'; 3929 return ptr(buf); 3930 } 3931 3932 @system unittest 3933 { 3934 auto c1 = zeroTermString("Hello World!"); 3935 assert (c1[0 .. 13] == "Hello World!\0"); 3936 auto c2 = zeroTermString("foo"); 3937 assert (c2[0 .. 4] == "foo\0"); 3938 assert (c1 == c2); 3939 }