module dutils.message.client;

import std.uuid : UUID;
import std.exception : enforce;

import std.datetime.systime : Clock;
import symmetry.api.rabbitmq;
import util.log : Log, stderrLogger, stdoutLogger, LogLevel, orBelow;

import dutils.validation.validate : validate;
import dutils.validation.constraints : ValidateRequired, ValidateMinimumLength,
  ValidateMaximum, ValidateMinimum;
import dutils.random : randomUUID;
import dutils.message.message : Message, ResponseStatus, SetResponseStatus, ResponseTimeout;
import dutils.message.subscription : Subscription;
import dutils.message.exceptions : ConnectException, PublishException, DeclareQueueException;

/**
 * Connection settings for a Client. The defaults point at a local broker
 * on port 5672 using the guest/guest account. The attributes are checked by
 * `validate` in the Client constructor.
 */
struct ClientParameters {
  @ValidateRequired()
  @ValidateMinimumLength(1)
  string host = "localhost";

  @ValidateRequired()
  @ValidateMinimum!ushort(1) @ValidateMaximum!ushort(65535) ushort port = 5672;

  @ValidateRequired()
  @ValidateMinimumLength(1)
  string username = "guest";

  @ValidateRequired() @ValidateMinimumLength(1)
  string password = "guest";
}

/**
 * How a queue is declared on the server, see Client.declareQueue.
 */
enum QueueType {
  DURABLE,
  AUTO_DELETE
}

/**
 * Bookkeeping that lets close() be requested from inside update().
 */
private struct ClientState {
  // Set when close() was called while update() was running.
  bool closeAfterUpdate = false;
  // True for the duration of update().
  bool updateRunning = false;
}

/**
 * A request that is waiting for its response (or for its expiry).
 */
private struct Request {
  Message message;
  void delegate(Message) callback;
}

/**
 * AMQP client supporting publish, subscribe and request/response on top of
 * the rabbitmq-c bindings. Channel 1 is used for publishing and queue
 * declaration; every subscription gets its own channel number.
 */
class Client {
  package amqp_connection_state_t connection;
  package Subscription[] subscriptions;
  private ClientParameters _parameters;
  // Pending requests keyed by correlation id.
  private Request[UUID] requests;
  // Lazily created by request(); receives all responses for this client.
  private Subscription responseSubscription;
  // Channel 1 is the client's own channel, subscriptions start at 2.
  private ushort lastSubscriptionChannel = 1;
  // NOTE(review): this cache is not cleared by close(), so after a reconnect
  // it may list queues the server has dropped in the meantime - confirm.
  package QueueType[string] declaredQueues;
  package Log logger;
  private ClientState state;

  /// The parameters this client was created with.
  @property ClientParameters parameters() {
    return this._parameters;
  }

  /// True while a connection handle is held.
  @property bool isConnected() {
    return this.connection != null;
  }

  /// Creates a client with default ClientParameters and connects.
  this() {
    this(ClientParameters());
  }

  /**
   * Validates the parameters and connects.
   *
   * Throws: ConnectException if the connection cannot be established;
   *         whatever `validate` throws for invalid parameters.
   */
  this(const ClientParameters parameters) {
    this.logger = Log(stderrLogger, stdoutLogger(LogLevel.info.orBelow));

    validate(parameters);
    this._parameters = parameters;
    this.connect();
  }

  ~this() {
    this.close();
  }

  /**
   * Closes any existing connection, then opens a new one: TCP socket,
   * plain SASL login and channel 1.
   *
   * Throws: ConnectException on any failure. The half-initialised
   *         connection is destroyed before the exception is thrown.
   */
  void connect() {
    this.close();

    amqp_connection_state_t connection;

    try {
      // TODO: add SSL support
      connection = amqp_new_connection();
      enforce(connection !is null, "creating connection");

      auto socket = amqp_tcp_socket_new(connection);
      enforce(socket !is null, "creating tcp socket");

      import std.string : toStringz, fromStringz;
      import std.conv : to;

      auto status = amqp_socket_open(socket, this.parameters.host.toStringz,
          this.parameters.port.to!int);
      enforce(status == 0, "opening socket: " ~ amqp_error_string2(status).fromStringz);

      auto username = this.parameters.username;
      auto password = this.parameters.password;
      // Arguments after the connection follow rabbitmq-c's amqp_login:
      // vhost, channel_max, frame_max, heartbeat, SASL method, credentials.
      die_on_amqp_error(amqp_login(connection, "/", 0, 131072, 0,
          SaslMethod.plain, username.toStringz, password.toStringz), "Logging in");

      amqp_channel_open(connection, 1);
      die_on_amqp_error(amqp_get_rpc_reply(connection), "Opening channel");
    } catch (Exception exception) {
      // Release the partially set up connection (and its socket) so a failed
      // connect attempt does not leak native resources.
      if (connection !is null) {
        amqp_destroy_connection(connection);
      }

      throw new ConnectException(this.parameters.host, this.parameters.port,
          this.parameters.username);
    }

    this.connection = connection;
  }

  /**
   * Closes all subscriptions, channel 1 and the connection. Does nothing
   * when not connected. When called while update() is running (e.g. from a
   * subscription callback) the close is deferred until update() finishes.
   *
   * Closing is best-effort: errors are swallowed, and the connection handle
   * is always destroyed.
   */
  void close() {
    if (!this.isConnected) {
      return;
    }

    if (this.state.updateRunning) {
      this.state.closeAfterUpdate = true;
      return;
    }

    foreach (subscription; this.subscriptions) {
      try {
        subscription.close();
      } catch (Exception exception) {
        // TODO: add error handler
        // One failing subscription must not prevent closing the rest.
      }
    }

    try {
      die_on_amqp_error(amqp_channel_close(this.connection, 1, ReplySuccess), "Closing channel");
      die_on_amqp_error(amqp_connection_close(this.connection, ReplySuccess),
          "Closing connection");
    } catch (Exception exception) {
      // TODO: add error handler
    }

    // Always destroy the handle, even if the graceful close above failed,
    // otherwise the native connection would leak.
    try {
      die_on_error(amqp_destroy_connection(this.connection), "Ending connection");
    } catch (Exception exception) {
      // TODO: add error handler
    }

    this.responseSubscription = null;
    this.connection = null;
    this.state.closeAfterUpdate = false;
  }

  /// Publishes a message to an AUTO_DELETE queue.
  void publish(string queueName, Message message) {
    this.publish(queueName, message, QueueType.AUTO_DELETE);
  }

  /// ditto
  void publish(string queueName, ref Message message) {
    this.publish(queueName, message, QueueType.AUTO_DELETE);
  }

  /**
   * Publishes a message to the "amq.direct" exchange using queueName as the
   * routing key. Connects first if necessary and declares the queue if this
   * client has not declared it yet.
   *
   * Throws: PublishException if publishing fails; ConnectException and
   *         DeclareQueueException from the implicit connect/declare.
   */
  void publish(string queueName, ref Message message, QueueType queueType) {
    if (!this.isConnected) {
      this.connect();
    }

    if (!(queueName in this.declaredQueues)) {
      this.declareQueue(queueName, queueType);
    }

    try {
      import std.conv : to;

      amqp_basic_properties_t messageProperties;
      messageProperties._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG
        | AMQP_BASIC_EXPIRATION_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG | AMQP_BASIC_TYPE_FLAG;

      messageProperties.content_type = amqp_string("application/octet-stream");
      messageProperties.delivery_mode = 2; /* persistent delivery mode */
      messageProperties.correlation_id = amqp_string(message.correlationId.toString());
      messageProperties.type = amqp_string(message.type);

      // The expiration property is a TTL in milliseconds. A message that has
      // already expired would give a negative value, so clamp it to zero.
      auto remainingMsecs = (message.expires - Clock.currTime()).total!"msecs";
      if (remainingMsecs < 0) {
        remainingMsecs = 0;
      }
      messageProperties.expiration = amqp_string(remainingMsecs.to!string);

      if (message.replyToQueueName != "") {
        messageProperties._flags |= AMQP_BASIC_REPLY_TO_FLAG;
        messageProperties.reply_to = amqp_string(message.replyToQueueName);
      }

      /**
       * Example in C for constructing header entries:
       * https://github.com/alanxz/rabbitmq-c/blob/a65c64c0efd883f3e200bd8831ad3ca066ea523c/tests/test_merge_capabilities.c#L163
       */
      import std.string : toStringz;

      messageProperties._flags |= AMQP_BASIC_HEADERS_FLAG;

      // At most four headers: created, expires, token, responseStatus.
      int headerCount = 0;
      amqp_table_entry_t[4] headerEntries;

      headerEntries[headerCount].key = amqp_cstring_bytes("created");
      headerEntries[headerCount].value.kind = AMQP_FIELD_KIND_UTF8;
      headerEntries[headerCount].value.value.bytes = amqp_cstring_bytes(
          message.created.toISOExtString().toStringz);
      headerCount++;

      headerEntries[headerCount].key = amqp_cstring_bytes("expires");
      headerEntries[headerCount].value.kind = AMQP_FIELD_KIND_UTF8;
      headerEntries[headerCount].value.value.bytes = amqp_cstring_bytes(
          message.expires.toISOExtString().toStringz);
      headerCount++;

      if (message.token != "") {
        headerEntries[headerCount].key = amqp_cstring_bytes("token");
        headerEntries[headerCount].value.kind = AMQP_FIELD_KIND_UTF8;
        headerEntries[headerCount].value.value.bytes = amqp_cstring_bytes(message.token.toStringz);
        headerCount++;
      }

      if (message.responseStatus != ResponseStatus.NOT_APPLICABLE) {
        headerEntries[headerCount].key = amqp_cstring_bytes("responseStatus");
        headerEntries[headerCount].value.kind = AMQP_FIELD_KIND_U16;
        headerEntries[headerCount].value.value.u16 = message.responseStatus.to!ushort;
        headerCount++;
      }

      messageProperties.headers.num_entries = headerCount;
      messageProperties.headers.entries = headerEntries.ptr;

      amqp_bytes_t payload;
      auto data = cast(char[]) message.payload.data;
      payload.len = data.length;
      payload.bytes = data.ptr;

      die_on_error(amqp_basic_publish(this.connection, 1, amqp_string("amq.direct"),
          amqp_string(queueName), 0, 0, &messageProperties, payload), "Publishing");

    } catch (Exception exception) {
      throw new PublishException(message.type, queueName);
    }
  }

  /// Subscribes to an AUTO_DELETE queue.
  Subscription subscribe(string queueName, void delegate(Message) callback) {
    return this.subscribe(queueName, callback, QueueType.AUTO_DELETE);
  }

  /**
   * Creates a subscription on its own channel and registers it with this
   * client so that update() drives it. Connects first if necessary.
   */
  Subscription subscribe(string queueName, void delegate(Message) callback, QueueType queueType) {
    if (!this.isConnected) {
      this.connect();
    }

    auto subscription = new Subscription(this, queueName, queueType,
        ++this.lastSubscriptionChannel, callback);
    this.subscriptions ~= subscription;

    return subscription;
  }

  /**
   * Subscribes to a uniquely named queue on which responses to request()
   * calls arrive, and dispatches each response to the matching callback.
   */
  private void createResponseSubscription() {
    auto responseQueueName = "responselistener-" ~ randomUUID().toString();

    this.responseSubscription = this.subscribe(responseQueueName, (Message response) {
      auto entry = response.correlationId in this.requests;
      if (entry is null) {
        return;
      }

      // Copy and unregister before invoking the callback: the callback may
      // issue a follow-up request with the same correlation id, which must
      // not be removed afterwards.
      auto pending = *entry;
      this.requests.remove(response.correlationId);

      try {
        if (pending.message.hasExpired == false) {
          pending.callback(response);
        } else {
          pending.callback(pending.message.createResponse(ResponseTimeout()));
        }
      } catch (Exception exception) {
        this.logger.error(exception);
      }
    });
  }

  /// Overload accepting the request message by value (e.g. an rvalue).
  void request(string queueName, Message requestMessage, void delegate(Message) callback) {
    // requestMessage is an lvalue here, so this resolves to the ref overload.
    this.request(queueName, requestMessage, callback);
  }

  /**
   * Publishes requestMessage and registers callback to receive either the
   * response or, once the message has expired, a ResponseTimeout response
   * (delivered from update()).
   *
   * The message is modified: a correlation id is generated when empty and
   * replyToQueueName is set to this client's response queue.
   *
   * Throws: whatever publish() throws; in that case the request is not left
   *         registered, so no timeout callback fires for it later.
   */
  void request(string queueName, ref Message requestMessage, void delegate(Message) callback) {
    if (this.responseSubscription is null) {
      this.createResponseSubscription();
    }

    if (requestMessage.correlationId.empty()) {
      requestMessage.correlationId = randomUUID();
    }

    requestMessage.replyToQueueName = this.responseSubscription.getQueueName();

    auto correlationId = requestMessage.correlationId;
    this.requests[correlationId] = Request(requestMessage, callback);
    scope (failure) {
      this.requests.remove(correlationId);
    }

    this.publish(queueName, requestMessage);
  }

  /**
   * Runs one iteration of the client: resumes the fiber of every open
   * subscription, drops closed subscriptions, and delivers a
   * ResponseTimeout to every pending request whose message has expired.
   * A close() requested during the iteration is performed at the end.
   */
  void update() {
    this.state.updateRunning = true;
    scope (exit) {
      // Runs on exceptions too, so a throwing callback cannot leave the
      // client permanently in the "update running" state.
      this.state.updateRunning = false;
      if (this.state.closeAfterUpdate) {
        this.close();
      }
    }

    // foreach iterates over a snapshot of the slice, so callbacks may safely
    // add subscriptions while the fibers run.
    foreach (subscription; this.subscriptions) {
      if (!subscription.isClosed) {
        subscription.fiber.call();
      }
    }

    // Rebuild the list without closed subscriptions (this also keeps any
    // that were added by callbacks above).
    Subscription[] openSubscriptions;
    foreach (subscription; this.subscriptions) {
      if (!subscription.isClosed) {
        openSubscriptions ~= subscription;
      }
    }
    this.subscriptions = openSubscriptions;

    // Collect first: removing from (or adding to) an associative array while
    // iterating over it is not allowed.
    Request[] expiredRequests;
    foreach (pending; this.requests) {
      if (pending.message.hasExpired == true) {
        expiredRequests ~= pending;
      }
    }

    foreach (expired; expiredRequests) {
      this.requests.remove(expired.message.correlationId);

      try {
        expired.callback(expired.message.createResponse(ResponseTimeout()));
      } catch (Exception exception) {
        this.logger.error(exception);
      }
    }
  }

  /**
   * Calls update() roughly once per millisecond until the client is closed.
   */
  void keepUpdating() {
    import core.thread : Thread;
    import core.time : msecs;

    while (this.isConnected) {
      this.update();
      Thread.sleep(1.msecs);
    }
  }

  /**
   * Declares the queue on channel 1, binds it to "amq.direct" with the
   * queue name as routing key, and records it in declaredQueues.
   *
   * Throws: DeclareQueueException if declaring or binding fails.
   */
  package void declareQueue(string queueName, QueueType type) {
    try {
      amqp_queue_declare_ok_t* result = amqp_queue_declare(this.connection, 1, amqp_string(queueName), 0,
          type == QueueType.DURABLE, 0, type == QueueType.AUTO_DELETE,
          cast(amqp_table_t) amqp_empty_table);
      die_on_amqp_error(amqp_get_rpc_reply(this.connection), "Declaring queue");

      auto generatedQueueName = amqp_bytes_malloc_dup(result.queue);
      enforce(generatedQueueName.bytes !is null, "Out of memory while copying queue name");
      // The duplicate is malloc'ed by rabbitmq-c and must be released by us.
      scope (exit) {
        amqp_bytes_free(generatedQueueName);
      }

      amqp_queue_bind(this.connection, 1, generatedQueueName, amqp_string("amq.direct"),
          amqp_string(queueName), cast(amqp_table_t) amqp_empty_table);
      die_on_amqp_error(amqp_get_rpc_reply(this.connection), "Binding queue");

      this.declaredQueues[queueName] = type;
    } catch (Exception exception) {
      throw new DeclareQueueException(queueName);
    }
  }
}

/**
 * Client#connect/isConnected/close - connect to the AMQP server
 */
unittest {
  import dutils.testing : assertEqual;

  auto client = new Client();
  assertEqual(client.isConnected, true);

  client.close();
  assertEqual(client.isConnected, false);

  client.connect();
  assertEqual(client.isConnected, true);

  ConnectException connectionError;
  try {
    client = new Client(ClientParameters("badhostname"));
  } catch (ConnectException exception) {
    connectionError = exception;
  }

  assertEqual(connectionError.message,
      "Unable to connect to AMQP 0-9-1 service at badhostname:5672 with username guest");
}

/**
 * Client#subscribe/publish - subscribe/publish to a queue on an AMQP server
 */
unittest {
  import dutils.testing : assertEqual;

  for (int index; index < 10; index++) {
    auto client = new Client();
    assertEqual(client.isConnected, true);

    auto service = new Client();
    assertEqual(service.isConnected, true);

    struct DummyMessage {
      string story;
    }

    auto dummy = DummyMessage("Lorem ipsum dolor sit amet, consectetur adipisicing elit." ~ randomUUID()
        .toString());
    auto message = Message.from(dummy);

    // Odd iterations publish before subscribing, even iterations after.
    if (index % 2 == 1) {
      client.publish("testqueue", message);
    }

    Message receivedMessage;
    service.subscribe("testqueue", (Message receivedMessageTemp) {
      receivedMessage = receivedMessageTemp;
    });

    if (index % 2 == 0) {
      client.publish("testqueue", message);
    }

    while (receivedMessage.isEmpty) {
      service.update();
    }

    service.close();
    client.close();

    assertEqual(receivedMessage.type, "DummyMessage");
    assertEqual(receivedMessage.correlationId, message.correlationId);
    assertEqual(receivedMessage.payload["story"].get!string, dummy.story);
  }
}

/**
 * Client#request - request server/client
 */
unittest {
  import std.conv : to;

  import dutils.testing : assertEqual;
  import dutils.data.bson : BSON;
  import dutils.message.message : ResponseBadType;

  auto client = new Client();
  auto service = new Client();

  service.subscribe("testservice", (Message request) {
    if (request.type == "Ping") {
      @SetResponseStatus(ResponseStatus.OK)
      struct Pong {
        string story;
      }

      auto payload = Pong("Playing ping pong with " ~ request.payload["name"].get!string);
      service.publish(request.replyToQueueName, request.createResponse(payload));
    } else {
      auto payload = ResponseBadType(request.type, ["Ping"]);
      service.publish(request.replyToQueueName, request.createResponse(payload));
    }
  });

  struct Ping {
    string name;
  }

  auto payload = Ping("Anna");
  auto message = Message.from(payload);
  message.token = "this is a placeholder token";

  Message response;
  client.request("testservice", message, (Message requestResponse) {
    response = requestResponse;
  });

  auto badMessage = Message();
  badMessage.type = "ThisIsABadType";
  badMessage.correlationId = randomUUID();

  Message errorResponse;
  client.request("testservice", badMessage, (Message requestResponse) {
    errorResponse = requestResponse;
  });

  while (response.isEmpty || errorResponse.isEmpty) {
    service.update();
    client.update();
  }

  service.close();
  client.close();

  assertEqual(response.type, "Pong");
  assertEqual(response.correlationId, message.correlationId);
  assertEqual(response.payload["story"].get!string, "Playing ping pong with Anna");
  assertEqual(response.responseStatus, ResponseStatus.OK);
  assertEqual(response.token, "this is a placeholder token");

  assertEqual(errorResponse.type, "ResponseBadType");
  assertEqual(errorResponse.correlationId, badMessage.correlationId);
  assertEqual(errorResponse.payload["supportedTypes"][0].get!string, "Ping");
  assertEqual(errorResponse.responseStatus, ResponseStatus.BAD_TYPE);
}

/**
 * Client#request - request timeout
 */
unittest {
  import std.conv : to;
  import core.time : msecs;

  import dutils.testing : assertEqual;

  auto client = new Client();

  struct Ping {
    string name;
  }

  auto payload = Ping("Anna");
  auto message = Message.from(payload);
  message.token = "this is a placeholder token";
  message.setToExpireAfter(1000.msecs);

  Message response;
  client.request("testservicetimeout", message, (Message requestResponse) {
    response = requestResponse;
  });

  while (response.isEmpty) {
    client.update();
  }

  client.close();

  assertEqual(response.type, "ResponseTimeout");
  assertEqual(response.correlationId, message.correlationId);
  assertEqual(response.responseStatus, ResponseStatus.TIMEOUT);
  assertEqual(response.token, "this is a placeholder token");
}

/**
 * Client#keepUpdating - should loop until the client is closed
 */
unittest {
  import dutils.data.bson : BSON;

  import dutils.testing : assertEqual;

  auto client = new Client();

  struct Hello {
    string from;
  }

  auto gotMessage = false;

  Subscription subscription;

  subscription = client.subscribe("testservice", (Message message) {
    if (message.type == "Hello" && message.payload["from"].get!string == "Pelle") {
      gotMessage = true;
      // Called from within update(), so the close is deferred until the
      // current update() iteration has finished.
      client.close();
    }
  });

  auto message = Message.from(Hello("Pelle"));
  client.publish("testservice", message);

  client.keepUpdating();

  assertEqual(gotMessage, true);
}