1 module examples.lbbroker2;
2 
3 import std.stdio;
4 import std.concurrency;
5 import std.range;
6 import std.conv;
7 
8 import core.thread;
9 
10 import zmqd;
11 import zhelpers;
12 
13 enum clientCount = 10;
14 enum workerCount = 3;
15 
//  Load-balancing broker.
//  Binds one ROUTER socket for clients (port 5672) and one for workers
//  (port 5673), spawns the client and worker threads, then shuttles
//  requests and replies between the two sockets until clientCount
//  replies have been forwarded. The process is then terminated with
//  exit(0), which also ends the worker threads (they loop forever).
void main()
{
    //  Client-facing ROUTER socket
    auto clientSide = Socket(SocketType.router);
    clientSide.linger = infiniteDuration;
    clientSide.bind("tcp://*:5672");

    //  Worker-facing ROUTER socket
    auto workerSide = Socket(SocketType.router);
    workerSide.linger = infiniteDuration;
    workerSide.bind("tcp://*:5673");

    //  Launch the client threads
    foreach (const int clientNo; 0 .. clientCount)
    {
        spawn(&clientTask, clientNo);
    }

    //  Launch the worker threads
    foreach (const int workerNo; 0 .. workerCount)
    {
        spawn(&workerTask, workerNo);
    }

    //  Broker loop.
    //  The worker socket is always polled; the client socket is polled
    //  only while at least one worker is idle, so that 0MQ's own queues
    //  hold the requests we cannot dispatch yet. A client request is
    //  handed to the worker at the head of the idle queue together with
    //  the client's identity. A worker reply puts that worker back on
    //  the idle queue and is forwarded to the client named in the reply
    //  envelope.

    //  Identities of workers that are currently idle, oldest first
    string[] idleWorkers;
    int repliesForwarded = 0;
    for (;;)
    {
        //  Rebuilt on every pass so that an entry which is not polled
        //  never carries events over from an earlier iteration.
        auto pollSet = [
            PollItem(workerSide, PollFlags.pollIn),
            PollItem(clientSide, PollFlags.pollIn)
        ];

        //  Include the client socket only when a worker can take the job
        if (idleWorkers.length > 0)
            poll(pollSet);
        else
            poll(pollSet[0 .. 1]);

        //  Activity from a worker
        if (pollSet[0].returnedEvents & PollFlags.pollIn)
        {
            //  Frame 1: worker identity; the worker is idle again
            string readyWorker = workerSide.sRecv;
            idleWorkers ~= readyWorker;

            //  Frame 2: empty delimiter
            string separator = workerSide.sRecv;
            assert(separator.empty);

            //  Frame 3: either READY or the identity of the client
            //  the reply is addressed to
            string thirdFrame = workerSide.sRecv;
            if (thirdFrame != "READY")
            {
                separator = workerSide.sRecv;
                assert(separator.empty);
                auto payload = workerSide.sRecv;

                //  Forward [client id][empty][reply] to the client
                foreach (frame; [thirdFrame, ""])
                    clientSide.send(frame, true);
                clientSide.send(payload);

                //  Stop as soon as every client has had its reply
                repliesForwarded += 1;
                if (repliesForwarded == clientCount)
                    break;
            }
        }

        //  Request from a client
        if (pollSet[1].returnedEvents & PollFlags.pollIn)
        {
            string requester = clientSide.sRecv;
            string separator = clientSide.sRecv;
            assert(separator.empty);
            string payload = clientSide.sRecv;

            //  Take the worker at the head of the idle queue
            string target = idleWorkers.front;
            idleWorkers.popFront();

            //  Send [worker id][empty][client id][empty][request]
            foreach (frame; [target, "", requester, ""])
                workerSide.send(frame, true);
            workerSide.send(payload);
        }
    }

    import core.stdc.stdlib : exit;
    exit(0);
}
107 
108 //  While this example runs in a single process, that is just to make
109 //  it easier to start and stop the example. Each thread has its own
110 //  context and conceptually acts as a separate process.
//  Basic request-reply client using a REQ socket.
112 //  Because s_send and s_recv can't handle 0MQ binary identities, we
113 //  set a printable text identity to allow routing.
//  Client thread body.
//  Creates its own context and a REQ socket whose identity is the
//  decimal text of clientNumber, connects to the broker's client port,
//  sends a single "HELLO" request and prints the reply it receives.
void clientTask(int clientNumber)
{
    auto ownContext = Context();
    auto requester = Socket(ownContext, SocketType.req);

    //  Both options are applied before connecting
    requester.linger = infiniteDuration;
    requester.identity = to!string(clientNumber);
    requester.connect("tcp://localhost:5672");

    //  One request, one reply
    requester.send("HELLO");
    string answer = requester.sRecv;
    writefln("Client [%s]: %s", clientNumber, answer);
}
126 
127 //  worker task
128 //  While this example runs in a single process, that is just to make
129 //  it easier to start and stop the example. Each thread has its own
130 //  context and conceptually acts as a separate process.
131 //  This is the worker task, using a REQ socket to do load-balancing.
132 //  Because s_send and s_recv can't handle 0MQ binary identities, we
133 //  set a printable text identity to allow routing.
134 
//  Worker thread body.
//  Creates its own context and a REQ socket whose identity is the
//  decimal text of workerNumber, connects to the broker's worker port
//  and announces itself with "READY". It then loops forever: receive
//  [client id][empty][request], print the request, and answer with
//  [client id][empty]["OK"].
void workerTask(int workerNumber)
{
    auto ownContext = Context();
    auto responder = Socket(ownContext, SocketType.req);
    responder.identity = to!string(workerNumber);
    responder.connect("tcp://localhost:5673");

    //  Tell the broker this worker is available
    responder.send("READY");
    for (;;)
    {
        //  The envelope here is a single client identity followed by
        //  an empty delimiter frame
        string clientId = responder.sRecv;
        string separator = responder.sRecv;
        assert(separator.empty);

        string payload = responder.sRecv;
        writefln("Worker: [%s], %s", workerNumber, payload);

        //  Reply using the same envelope so the broker can route it back
        responder.send(clientId, true);
        responder.send("", true);
        responder.send("OK");
    }
}