1 module examples.lbbroker2; 2 3 import std.stdio; 4 import std.concurrency; 5 import std.range; 6 import std.conv; 7 8 import core.thread; 9 10 import zmqd; 11 import zhelpers; 12 13 enum clientCount = 10; 14 enum workerCount = 3; 15 16 void main() 17 { 18 auto frontend = Socket(SocketType.router); 19 frontend.linger = infiniteDuration; 20 frontend.bind("tcp://*:5672"); 21 22 auto backend = Socket(SocketType.router); 23 backend.linger = infiniteDuration; 24 backend.bind("tcp://*:5673"); 25 26 // starts clients 27 foreach (const int i; 0 .. clientCount) 28 spawn(&clientTask, i); 29 30 // start workers 31 foreach (const int i; 0 .. workerCount) 32 spawn(&workerTask, i); 33 34 // main task body 35 // Here is the main loop for the least-recently-used queue. It has two 36 // sockets; a frontend for clients and a backend for workers. It polls 37 // the backend in all cases, and polls the frontend only when there are 38 // one or more workers ready. This is a neat way to use 0MQ's own queues 39 // to hold messages we're not ready to process yet. When we get a client 40 // request, we pop the next available worker and send the request to it, 41 // including the originating client identity. When a worker replies, we 42 // requeue that worker and forward the reply to the original client 43 // using the reply envelope. 44 45 // Queue of available workers 46 string[] availableWorkers; 47 int requestsServed = 0; 48 while (true) 49 { 50 PollItem[] items = [ 51 PollItem(backend, PollFlags.pollIn), 52 PollItem(frontend, PollFlags.pollIn) 53 ]; 54 55 // Poll frontend only if we have available workers 56 poll(availableWorkers.length ? items : items[0..1]); 57 58 // handle worker activity on backend 59 if (items[0].returnedEvents & PollFlags.pollIn) 60 { 61 // worker-id for load balancing 62 string workerId = backend.sRecv; 63 availableWorkers ~= workerId; 64 65 // second frame is empty 66 string delimiter = backend.sRecv; 67 assert(delimiter.empty); 68 69 // third frame is READY or else a client reply id 70 string clientId = backend.sRecv; 71 if (clientId != "READY") 72 { 73 delimiter = backend.sRecv; 74 assert(delimiter.empty); 75 auto reply = backend.sRecv; 76 frontend.send(clientId, true); 77 frontend.send("", true); 78 frontend.send(reply); 79 // exit once all clients are served 80 if (++requestsServed == clientCount) 81 break; 82 } 83 } 84 85 // Here is how we handle a client request: 86 if (items[1].returnedEvents & PollFlags.pollIn) 87 { 88 string clientId = frontend.sRecv; 89 string delimiter = frontend.sRecv; 90 assert(delimiter.empty); 91 string request = frontend.sRecv; 92 93 // Dequeue and drop the next worker identity 94 string worker = availableWorkers.front; 95 availableWorkers.popFront; 96 backend.send(worker, true); 97 backend.send("", true); 98 backend.send(clientId, true); 99 backend.send("", true); 100 backend.send(request); 101 102 } 103 } 104 import core.stdc.stdlib : exit; 105 exit(0); 106 } 107 108 // While this example runs in a single process, that is just to make 109 // it easier to start and stop the example. Each thread has its own 110 // context and conceptually acts as a separate process. 111 // Basic request-reply client using REQ socket 112 // Because s_send and s_recv can't handle 0MQ binary identities, we 113 // set a printable text identity to allow routing. 114 void clientTask(int clientNumber) 115 { 116 Context ctx = Context(); 117 auto socket = Socket(ctx, SocketType.req); 118 socket.identity = clientNumber.to!string; 119 socket.linger = infiniteDuration; 120 socket.connect("tcp://localhost:5672"); 121 122 socket.send("HELLO"); 123 string reply = socket.sRecv; 124 writefln("Client [%s]: %s", clientNumber, reply); 125 } 126 127 // worker task 128 // While this example runs in a single process, that is just to make 129 // it easier to start and stop the example. Each thread has its own 130 // context and conceptually acts as a separate process. 131 // This is the worker task, using a REQ socket to do load-balancing. 132 // Because s_send and s_recv can't handle 0MQ binary identities, we 133 // set a printable text identity to allow routing. 134 135 void workerTask(int workerNumber) 136 { 137 Context ctx = Context(); 138 auto worker = Socket(ctx, SocketType.req); 139 worker.identity = workerNumber.to!string; 140 worker.connect("tcp://localhost:5673"); 141 142 worker.send("READY"); 143 while (true) 144 { 145 import core.time : seconds; 146 147 // Read and save all frames until we get an empty frame 148 // In this example there is only 1, but there could be more 149 string identity = worker.sRecv; 150 string delimiter = worker.sRecv; 151 assert(delimiter.empty); 152 153 string request = worker.sRecv; 154 writefln("Worker: [%s], %s", workerNumber, request); 155 worker.send(identity, true); 156 worker.send("", true); 157 worker.send("OK"); 158 } 159 }