1 module dutils.message.client;
2 
3 import std.uuid : UUID;
4 import std.exception : enforce;
5 
6 import std.datetime.systime : Clock;
7 import symmetry.api.rabbitmq;
8 import util.log : Log, stderrLogger, stdoutLogger, LogLevel, orBelow;
9 
10 import dutils.validation.validate : validate;
11 import dutils.validation.constraints : ValidateRequired, ValidateMinimumLength,
12   ValidateMaximum, ValidateMinimum;
13 import dutils.random : randomUUID;
14 import dutils.message.message : Message, ResponseStatus, SetResponseStatus, ResponseTimeout;
15 import dutils.message.subscription : Subscription;
16 import dutils.message.exceptions : ConnectException, PublishException, DeclareQueueException;
17 
/**
 * Connection settings handed to `Client`.
 *
 * The constraints attached to each field are checked by `validate()` in the
 * `Client` constructor. The defaults are localhost:5672 with guest/guest.
 */
struct ClientParameters {
  /// Host name or address of the AMQP server; must not be empty.
  @ValidateRequired() @ValidateMinimumLength(1)
  string host = "localhost";

  /// TCP port of the AMQP server, constrained to 1-65535.
  @ValidateRequired() @ValidateMinimum!ushort(1) @ValidateMaximum!ushort(65535)
  ushort port = 5672;

  /// User name used for the login; must not be empty.
  @ValidateRequired() @ValidateMinimumLength(1)
  string username = "guest";

  /// Password used for the login; must not be empty.
  @ValidateRequired() @ValidateMinimumLength(1)
  string password = "guest";
}
33 
/**
 * How a queue is declared on the server.
 *
 * `Client.declareQueue` turns the chosen member into the durable or
 * auto-delete flag of the queue declaration.
 */
enum QueueType : int {
  /// Declared with the durable flag set.
  DURABLE = 0,
  /// Declared with the auto-delete flag set.
  AUTO_DELETE = 1
}
38 
/**
 * Bookkeeping flags a client uses to coordinate `close()` with a running
 * `update()`. Both flags start out as `false` (the `bool` initial value).
 */
private struct ClientState {
  /// Set when `close()` is requested while an update pass is running.
  bool closeAfterUpdate;

  /// True for the duration of an update pass.
  bool updateRunning;
}
43 
/**
 * A request that is still waiting for its response.
 *
 * The field order matters: instances are built positionally as
 * `Request(message, callback)`.
 */
private struct Request {
  /// The request as it was published; consulted for expiry and correlation id.
  Message message;

  /// Receives either the response or a generated timeout response.
  void delegate(Message) callback;
}
48 
/**
 * AMQP 0-9-1 client built on the rabbitmq-c bindings.
 *
 * Channel 1 is used for publishing and for declaring queues; every
 * subscription gets its own channel number, starting at 2. Messages are routed
 * through the "amq.direct" exchange with the queue name as routing key.
 *
 * The client is driven by polling: call `update()` regularly (or
 * `keepUpdating()`) so that subscriptions are serviced and expired requests
 * receive their timeout response.
 */
class Client {
  package amqp_connection_state_t connection;
  package Subscription[] subscriptions;
  private ClientParameters _parameters;
  private Request[UUID] requests;
  private Subscription responseSubscription;
  private ushort lastSubscriptionChannel = 1;
  package QueueType[string] declaredQueues;
  package Log logger;
  private ClientState state;

  /// The parameters this client was constructed with.
  @property ClientParameters parameters() {
    return this._parameters;
  }

  /// True while a connection handle is held.
  @property bool isConnected() {
    return this.connection !is null;
  }

  /// Connects using the default `ClientParameters`.
  this() {
    this(ClientParameters());
  }

  /**
   * Validates `parameters` and connects immediately.
   *
   * Throws: `ConnectException` when the connection cannot be established;
   *         whatever `validate` throws when the parameters are invalid.
   */
  this(const ClientParameters parameters) {
    this.logger = Log(stderrLogger, stdoutLogger(LogLevel.info.orBelow));

    validate(parameters);
    this._parameters = parameters;
    this.connect();
  }

  // NOTE(review): close() touches other GC-managed objects (the subscription
  // list); confirm this is safe when the destructor runs during a collection.
  ~this() {
    this.close();
  }

  /**
   * (Re)connects to the server: opens the TCP socket, logs in on vhost "/"
   * and opens channel 1. An existing connection is closed first.
   *
   * Throws: `ConnectException` on any failure. The half-initialised
   *         connection is destroyed before the exception is thrown.
   */
  void connect() {
    this.close();

    amqp_connection_state_t connection;

    try {
      // TODO: add SSL support
      connection = amqp_new_connection();
      enforce(connection !is null, "creating connection");

      auto socket = amqp_tcp_socket_new(connection);
      enforce(socket !is null, "creating tcp socket");

      import std.string : toStringz, fromStringz;
      import std.conv : to;

      auto status = amqp_socket_open(socket, this.parameters.host.toStringz,
          this.parameters.port.to!int);
      enforce(status == 0, "opening socket: " ~ amqp_error_string2(status).fromStringz);

      auto username = this.parameters.username;
      auto password = this.parameters.password;
      die_on_amqp_error(amqp_login(connection, "/", 0, 131072, 0,
          SaslMethod.plain, username.toStringz, password.toStringz), "Logging in");

      amqp_channel_open(connection, 1);
      die_on_amqp_error(amqp_get_rpc_reply(connection), "Opening channel");
    } catch (Exception exception) {
      // Release the connection (and the socket it owns) instead of leaking it.
      if (connection !is null) {
        amqp_destroy_connection(connection);
      }

      throw new ConnectException(this.parameters.host, this.parameters.port,
          this.parameters.username);
    }

    this.connection = connection;
  }

  /**
   * Closes all subscriptions, channel 1 and the connection.
   *
   * Does nothing when not connected. When called while `update()` is running
   * (for example from a subscription callback) the close is deferred until the
   * update pass has finished. Shutdown is best effort: failures are logged and
   * never propagate to the caller.
   */
  void close() {
    if (!this.isConnected) {
      return;
    }

    if (this.state.updateRunning) {
      this.state.closeAfterUpdate = true;
      return;
    }

    try {
      foreach (subscription; this.subscriptions) {
        subscription.close();
      }

      die_on_amqp_error(amqp_channel_close(this.connection, 1, ReplySuccess), "Closing channel");
      die_on_amqp_error(amqp_connection_close(this.connection, ReplySuccess),
          "Closing connection");
    } catch (Exception exception) {
      this.logger.error(exception);
    }

    // Always release the connection, even when the graceful shutdown above
    // failed; otherwise the handle and its socket would leak.
    try {
      die_on_error(amqp_destroy_connection(this.connection), "Ending connection");
    } catch (Exception exception) {
      this.logger.error(exception);
    }

    this.responseSubscription = null;
    this.connection = null;
    // The cache describes declarations made over the old connection; queues may
    // no longer exist after a reconnect, so force them to be declared again.
    this.declaredQueues = null;
    this.state.closeAfterUpdate = false;
  }

  /// Publishes `message` to an auto-delete queue (rvalue overload).
  void publish(string queueName, Message message) {
    this.publish(queueName, message, QueueType.AUTO_DELETE);
  }

  /// Publishes `message` to an auto-delete queue.
  void publish(string queueName, ref Message message) {
    this.publish(queueName, message, QueueType.AUTO_DELETE);
  }

  /**
   * Publishes `message` on channel 1 via "amq.direct" with `queueName` as
   * routing key. Connects first when necessary and declares the queue the
   * first time it is used.
   *
   * Params:
   *   queueName = target queue / routing key
   *   message   = message to send; payload is sent as application/octet-stream
   *   queueType = how to declare the queue if it has not been declared yet
   *
   * Throws: `PublishException` when building or sending the message fails;
   *         `ConnectException` / `DeclareQueueException` from the setup steps.
   */
  void publish(string queueName, ref Message message, QueueType queueType) {
    if (!this.isConnected) {
      this.connect();
    }

    if (!(queueName in this.declaredQueues)) {
      this.declareQueue(queueName, queueType);
    }

    try {
      import std.algorithm : max;
      import std.conv : to;
      import std.string : toStringz;

      amqp_basic_properties_t messageProperties;
      messageProperties._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG
        | AMQP_BASIC_EXPIRATION_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG | AMQP_BASIC_TYPE_FLAG;

      messageProperties.content_type = amqp_string("application/octet-stream");
      messageProperties.delivery_mode = 2; /* persistent delivery mode */
      messageProperties.correlation_id = amqp_string(message.correlationId.toString());
      messageProperties.type = amqp_string(message.type);

      // The expiration is a per-message TTL in milliseconds. Clamp it at zero
      // so that an already expired message does not yield a negative value.
      auto remaining = message.expires - Clock.currTime();
      auto timeToLive = max(remaining.total!"msecs", 0L);
      messageProperties.expiration = amqp_string(timeToLive.to!string);

      if (message.replyToQueueName != "") {
        messageProperties._flags |= AMQP_BASIC_REPLY_TO_FLAG;
        messageProperties.reply_to = amqp_string(message.replyToQueueName);
      }

      /**
       * Example in C for constructing header entries:
       * https://github.com/alanxz/rabbitmq-c/blob/a65c64c0efd883f3e200bd8831ad3ca066ea523c/tests/test_merge_capabilities.c#L163
       */
      messageProperties._flags |= AMQP_BASIC_HEADERS_FLAG;

      // At most four headers: created, expires, token and responseStatus.
      int headersIndex = -1;
      amqp_table_entry_t[4] headerEntries;

      headersIndex++;
      headerEntries[headersIndex].key = amqp_cstring_bytes("created");
      headerEntries[headersIndex].value.kind = AMQP_FIELD_KIND_UTF8;
      headerEntries[headersIndex].value.value.bytes = amqp_cstring_bytes(
          message.created.toISOExtString().toStringz);

      headersIndex++;
      headerEntries[headersIndex].key = amqp_cstring_bytes("expires");
      headerEntries[headersIndex].value.kind = AMQP_FIELD_KIND_UTF8;
      headerEntries[headersIndex].value.value.bytes = amqp_cstring_bytes(
          message.expires.toISOExtString().toStringz);

      if (message.token != "") {
        headersIndex++;
        headerEntries[headersIndex].key = amqp_cstring_bytes("token");
        headerEntries[headersIndex].value.kind = AMQP_FIELD_KIND_UTF8;
        headerEntries[headersIndex].value.value.bytes = amqp_cstring_bytes(message.token.toStringz);
      }

      if (message.responseStatus != ResponseStatus.NOT_APPLICABLE) {
        headersIndex++;
        headerEntries[headersIndex].key = amqp_cstring_bytes("responseStatus");
        headerEntries[headersIndex].value.kind = AMQP_FIELD_KIND_U16;
        headerEntries[headersIndex].value.value.u16 = message.responseStatus.to!ushort;
      }

      messageProperties.headers.num_entries = headersIndex + 1;
      messageProperties.headers.entries = headerEntries.ptr;

      amqp_bytes_t payload;
      auto data = cast(char[]) message.payload.data;
      payload.len = data.length;
      payload.bytes = data.ptr;

      die_on_error(amqp_basic_publish(this.connection, 1, amqp_string("amq.direct"),
          amqp_string(queueName), 0, 0, &messageProperties, payload), "Publishing");

    } catch (Exception exception) {
      throw new PublishException(message.type, queueName);
    }
  }

  /// Subscribes to an auto-delete queue; see the three-argument overload.
  Subscription subscribe(string queueName, void delegate(Message) callback) {
    return this.subscribe(queueName, callback, QueueType.AUTO_DELETE);
  }

  /**
   * Creates a subscription on its own channel and registers it with this
   * client so that `update()` services it. Connects first when necessary.
   *
   * Returns: the new subscription.
   */
  Subscription subscribe(string queueName, void delegate(Message) callback, QueueType queueType) {
    if (!this.isConnected) {
      this.connect();
    }

    auto subscription = new Subscription(this, queueName, queueType,
        ++this.lastSubscriptionChannel, callback);
    this.subscriptions ~= subscription;

    return subscription;
  }

  /**
   * Subscribes to a uniquely named response queue and dispatches incoming
   * responses to the callback of the matching pending request.
   */
  private void createResponseSubscription() {
    auto responseQueueName = "responselistener-" ~ randomUUID().toString();

    this.responseSubscription = this.subscribe(responseQueueName, (Message response) {
      auto pending = response.correlationId in this.requests;
      if (pending is null) {
        // Unknown or already answered request: nothing to dispatch.
        return;
      }

      auto request = *pending;
      // Remove the entry before running the callback, so that a request the
      // callback re-issues with the same correlation id is not deleted below.
      this.requests.remove(response.correlationId);

      try {
        if (request.message.hasExpired == false) {
          request.callback(response);
        } else {
          request.callback(request.message.createResponse(ResponseTimeout()));
        }
      } catch (Exception exception) {
        this.logger.error(exception);
      }
    });
  }

  /**
   * Sends a request (rvalue overload). The parameter is an lvalue here, so the
   * call below resolves to the `ref` overload.
   */
  void request(string queueName, Message requestMessage, void delegate(Message) callback) {
    this.request(queueName, requestMessage, callback);
  }

  /**
   * Publishes `requestMessage` and registers `callback` for its response.
   *
   * The message is modified in place: a correlation id is generated when it
   * has none and `replyToQueueName` is set to this client's response queue.
   * `callback` receives the response, or a `ResponseTimeout` response once the
   * message has expired (delivered from `update()`).
   */
  void request(string queueName, ref Message requestMessage, void delegate(Message) callback) {
    if (this.responseSubscription is null) {
      this.createResponseSubscription();
    }

    if (requestMessage.correlationId.empty()) {
      requestMessage.correlationId = randomUUID();
    }

    requestMessage.replyToQueueName = this.responseSubscription.getQueueName();

    this.requests[requestMessage.correlationId] = Request(requestMessage, callback);

    this.publish(queueName, requestMessage);
  }

  /**
   * Runs one polling pass: services every open subscription, drops closed
   * subscriptions and answers expired requests with a timeout response.
   *
   * A `close()` requested during the pass is carried out when the pass ends,
   * also when the pass is left through an exception.
   */
  void update() {
    import std.algorithm : any, filter;
    import std.array : array;

    this.state.updateRunning = true;
    scope (exit) {
      // Reset the flag even on failure; a stuck flag would defer every later
      // close() forever.
      this.state.updateRunning = false;
      if (this.state.closeAfterUpdate) {
        this.close();
      }
    }

    foreach (subscription; this.subscriptions) {
      if (!subscription.isClosed) {
        subscription.fiber.call();
      }
    }

    // Prune closed subscriptions after the loop rather than while iterating,
    // and only allocate a new array when there is something to drop.
    if (this.subscriptions.any!(subscription => subscription.isClosed)) {
      this.subscriptions = this.subscriptions.filter!(
          subscription => !subscription.isClosed).array;
    }

    // Collect the expired ids first: callbacks may add or remove requests, and
    // an associative array must not be modified while it is being iterated.
    UUID[] expiredIds;
    foreach (id, ref pending; this.requests) {
      if (pending.message.hasExpired == true) {
        expiredIds ~= id;
      }
    }

    foreach (id; expiredIds) {
      auto pending = id in this.requests;
      if (pending is null) {
        continue;
      }

      auto request = *pending;
      // Remove first so a retry issued from the callback is kept.
      this.requests.remove(id);

      try {
        request.callback(request.message.createResponse(ResponseTimeout()));
      } catch (Exception exception) {
        this.logger.error(exception);
      }
    }
  }

  /// Calls `update()` roughly every millisecond until the client is closed.
  void keepUpdating() {
    import core.thread : Thread;
    import core.time : msecs;

    while (this.isConnected) {
      this.update();
      Thread.sleep(1.msecs);
    }
  }

  /**
   * Declares `queueName` on channel 1, binds it to "amq.direct" using the
   * queue name as routing key and records it in `declaredQueues`.
   *
   * Throws: `DeclareQueueException` when declaring or binding fails.
   */
  package void declareQueue(string queueName, QueueType type) {
    try {
      {
        amqp_queue_declare_ok_t* result = amqp_queue_declare(this.connection, 1, amqp_string(queueName), 0,
            type == QueueType.DURABLE, 0, type == QueueType.AUTO_DELETE,
            cast(amqp_table_t) amqp_empty_table);
        die_on_amqp_error(amqp_get_rpc_reply(this.connection), "Declaring queue");
        enforce(result !is null, "No reply while declaring queue");

        auto generatedQueueName = amqp_bytes_malloc_dup(result.queue);
        enforce(generatedQueueName.bytes !is null, "Out of memory while copying queue name");
        // The copy is malloc'ed by rabbitmq-c and must be released explicitly.
        scope (exit) amqp_bytes_free(generatedQueueName);

        amqp_queue_bind(this.connection, 1, generatedQueueName, amqp_string("amq.direct"),
            amqp_string(queueName), cast(amqp_table_t) amqp_empty_table);
        die_on_amqp_error(amqp_get_rpc_reply(this.connection), "Binding queue");
      }

      this.declaredQueues[queueName] = type;
    } catch (Exception exception) {
      throw new DeclareQueueException(queueName);
    }
  }
}
360 
361 /**
362  * Client#connect/isConnected/close - connect to the AMQP server
363  */
unittest {
  import dutils.testing : assertEqual;

  auto client = new Client();
  // Release the live connection even when one of the assertions below fails.
  scope (exit) client.close();

  assertEqual(client.isConnected, true);

  client.close();
  assertEqual(client.isConnected, false);

  client.connect();
  assertEqual(client.isConnected, true);

  // Connecting to an unresolvable host must fail with a ConnectException.
  ConnectException connectionError;
  try {
    client = new Client(ClientParameters("badhostname"));
  } catch (ConnectException exception) {
    connectionError = exception;
  }

  // Fail the test cleanly instead of dereferencing null when no exception
  // was thrown.
  assert(connectionError !is null, "Expected a ConnectException for host badhostname");
  assertEqual(connectionError.message,
      "Unable to connect to AMQP 0-9-1 service at badhostname:5672 with username guest");
}
386 
387 /**
388  * Client#subscribe/publish - subscribe/publish to a queue on an AMQP server
389  */
unittest {
  import dutils.testing : assertEqual;

  // Ten rounds: odd rounds publish before the consumer subscribes, even
  // rounds publish afterwards.
  foreach (round; 0 .. 10) {
    auto publisher = new Client();
    assertEqual(publisher.isConnected, true);

    auto consumer = new Client();
    assertEqual(consumer.isConnected, true);

    struct DummyMessage {
      string story;
    }

    auto dummy = DummyMessage("Lorem ipsum dolor sit amet, consectetur adipisicing elit." ~ randomUUID()
        .toString());
    auto outgoing = Message.from(dummy);

    const publishBeforeSubscribe = (round % 2 == 1);

    if (publishBeforeSubscribe) {
      publisher.publish("testqueue", outgoing);
    }

    Message received;
    consumer.subscribe("testqueue", (Message incoming) { received = incoming; });

    if (!publishBeforeSubscribe) {
      publisher.publish("testqueue", outgoing);
    }

    // Poll the consumer until the message has been delivered.
    while (received.isEmpty) {
      consumer.update();
    }

    consumer.close();
    publisher.close();

    assertEqual(received.type, "DummyMessage");
    assertEqual(received.correlationId, outgoing.correlationId);
    assertEqual(received.payload["story"].get!string, dummy.story);
  }
}
433 
434 /**
435  * Client#request - request server/client
436  */
unittest {
  import std.conv : to;

  import dutils.testing : assertEqual;
  import dutils.data.bson : BSON;
  import dutils.message.message : ResponseBadType;

  struct Ping {
    string name;
  }

  auto requester = new Client();
  auto responder = new Client();

  // The responder answers "Ping" with a Pong and rejects every other type.
  responder.subscribe("testservice", (Message incoming) {
    if (incoming.type != "Ping") {
      auto rejection = ResponseBadType(incoming.type, ["Ping"]);
      responder.publish(incoming.replyToQueueName, incoming.createResponse(rejection));
      return;
    }

    @SetResponseStatus(ResponseStatus.OK)
    struct Pong {
      string story;
    }

    auto pong = Pong("Playing ping pong with " ~ incoming.payload["name"].get!string);
    responder.publish(incoming.replyToQueueName, incoming.createResponse(pong));
  });

  // A well-formed request ...
  auto ping = Ping("Anna");
  auto goodRequest = Message.from(ping);
  goodRequest.token = "this is a placeholder token";

  Message goodReply;
  requester.request("testservice", goodRequest, (Message reply) {
    goodReply = reply;
  });

  // ... and one with a type the responder does not support.
  auto badRequest = Message();
  badRequest.type = "ThisIsABadType";
  badRequest.correlationId = randomUUID();

  Message badReply;
  requester.request("testservice", badRequest, (Message reply) {
    badReply = reply;
  });

  // Drive both clients until both replies have arrived.
  while (goodReply.isEmpty || badReply.isEmpty) {
    responder.update();
    requester.update();
  }

  responder.close();
  requester.close();

  assertEqual(goodReply.type, "Pong");
  assertEqual(goodReply.correlationId, goodRequest.correlationId);
  assertEqual(goodReply.payload["story"].get!string, "Playing ping pong with Anna");
  assertEqual(goodReply.responseStatus, ResponseStatus.OK);
  assertEqual(goodReply.token, "this is a placeholder token");

  assertEqual(badReply.type, "ResponseBadType");
  assertEqual(badReply.correlationId, badRequest.correlationId);
  assertEqual(badReply.payload["supportedTypes"][0].get!string, "Ping");
  assertEqual(badReply.responseStatus, ResponseStatus.BAD_TYPE);
}
503 
504 /**
505  * Client#request - request timeout
506  */
unittest {
  import std.conv : to;
  import core.time : msecs;

  import dutils.testing : assertEqual;

  struct Ping {
    string name;
  }

  auto requester = new Client();

  // Nobody listens on the target queue, so the request can only time out.
  auto ping = Ping("Anna");
  auto outgoing = Message.from(ping);
  outgoing.token = "this is a placeholder token";
  outgoing.setToExpireAfter(1000.msecs);

  Message timeoutReply;
  requester.request("testservicetimeout", outgoing, (Message reply) {
    timeoutReply = reply;
  });

  // update() hands out the timeout response once the request has expired.
  while (timeoutReply.isEmpty) {
    requester.update();
  }

  requester.close();

  assertEqual(timeoutReply.type, "ResponseTimeout");
  assertEqual(timeoutReply.correlationId, outgoing.correlationId);
  assertEqual(timeoutReply.responseStatus, ResponseStatus.TIMEOUT);
  assertEqual(timeoutReply.token, "this is a placeholder token");
}
540 
/**
 * Client#keepUpdating - should loop until the client is closed
 */
unittest {
  import dutils.data.bson : BSON;

  import dutils.testing : assertEqual;

  struct Hello {
    string from;
  }

  auto loopingClient = new Client();
  bool greetingSeen = false;

  // Closing the client from inside the callback is what ends keepUpdating().
  Subscription listener = loopingClient.subscribe("testservice", (Message incoming) {
    const isExpectedGreeting = incoming.type == "Hello"
      && incoming.payload["from"].get!string == "Pelle";

    if (isExpectedGreeting) {
      greetingSeen = true;
      loopingClient.close();
    }
  });

  auto greeting = Message.from(Hello("Pelle"));
  loopingClient.publish("testservice", greeting);

  loopingClient.keepUpdating();

  assertEqual(greetingSeen, true);
}