module dutils.message.subscription;

import core.thread : Fiber;
import std.uuid : UUID;
import std.conv : to;

import symmetry.api.rabbitmq;

import dutils.data.bson : BSON;
import dutils.message.message : Message, ResponseStatus;
import dutils.message.client : Client, QueueType;
import dutils.message.exceptions : SubscribeQueueException,
  MessageBodyException, MessageHeaderException;

/**
 * A consumer bound to a single queue on its own AMQP channel.
 *
 * The consume loop runs inside a `Fiber`: constructing a subscription starts
 * the fiber, which opens the channel, starts consuming and then yields after
 * every poll so the owner can resume it whenever it wants to process more
 * messages.
 */
class Subscription {
  private Client client;
  private string queueName;
  private QueueType queueType;
  private ushort channel;
  /// Fiber running `handler`; resumed from within the package to poll for messages.
  package Fiber fiber;
  private void delegate(Message) callback;
  /// Stays `true` until the channel is open and consuming has been set up.
  private bool _isClosed = true;

  /**
   * Creates the subscription and immediately runs the handler fiber up to its
   * first yield.
   *
   * Params:
   *   client = client providing the connection, declared queues and logger
   *   queueName = name of the queue to consume from
   *   queueType = type used if the queue still has to be declared
   *   channel = AMQP channel number used for this subscription
   *   callback = invoked with every decoded message
   *
   * Throws: SubscribeQueueException if opening the channel or starting the
   *         consumer fails.
   */
  this(Client client, string queueName, QueueType queueType, ushort channel,
      void delegate(Message) callback) {
    this.client = client;
    this.queueName = queueName;
    this.queueType = queueType;
    this.channel = channel;
    this.callback = callback;
    this.fiber = new Fiber(&this.handler);
    this.fiber.call();
  }

  /**
   * Closes the channel if it is open and marks the subscription as closed,
   * which makes the consume loop terminate. Never throws.
   */
  void close() {
    try {
      if (!this._isClosed) {
        die_on_amqp_error(amqp_channel_close(this.client.connection,
            this.channel, ReplySuccess), "Closing channel");
      }
    } catch (Exception exception) {
      // Best effort: a failure to close the channel is deliberately ignored,
      // the subscription is considered closed either way.
    }

    this._isClosed = true;
  }

  /// Returns: `true` when the subscription is not (or no longer) consuming.
  @property bool isClosed() {
    return this._isClosed;
  }

  /// Returns: the name of the queue this subscription consumes from.
  string getQueueName() {
    return this.queueName;
  }

  /**
   * Fiber entry point: declares the queue when needed, opens the channel,
   * starts consuming and then polls for messages until the subscription is
   * closed, yielding after every poll.
   *
   * Throws: SubscribeQueueException if opening the channel or starting the
   *         consumer fails.
   */
  private void handler() {
    import std.string : fromStringz;

    if (!(this.queueName in this.client.declaredQueues)) {
      this.client.declareQueue(this.queueName, this.queueType);
    }

    try {
      amqp_channel_open(this.client.connection, this.channel);
      die_on_amqp_error(amqp_get_rpc_reply(this.client.connection), "Opening channel");

      amqp_basic_consume(this.client.connection, this.channel,
          amqp_string(this.queueName), cast(amqp_bytes_t) amqp_empty_bytes, 0,
          0, 0, cast(amqp_table_t) amqp_empty_table);
      die_on_amqp_error(amqp_get_rpc_reply(this.client.connection), "Consuming");
    } catch (Exception exception) {
      throw new SubscribeQueueException(this.queueName, exception);
    }

    this._isClosed = false;

    while (!this.isClosed) {
      Message message;
      bool gotException = false;
      // Set once a delivery has actually been received; only then is there a
      // delivery tag to ack/reject and an envelope to destroy.
      bool gotEnvelope = false;
      amqp_envelope_t envelope;

      try {
        amqp_rpc_reply_t reply;

        amqp_maybe_release_buffers(this.client.connection);

        // Poll with a 100 ms timeout so the fiber regularly hands control back.
        timeval timeout;
        timeout.tv_usec = 100_000;
        reply = amqp_consume_message(this.client.connection, &envelope, &timeout, 0);

        if (reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION
            && reply.library_error == AMQP_STATUS_TIMEOUT) {
          // Nothing arrived within the timeout.
          Fiber.yield();
          continue;
        } else if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
          if (reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
            throw new Exception(amqp_error_string2(reply.library_error).fromStringz.to!string);
          } else if (reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            throw new Exception(amqp_error_string2(reply.reply.id).fromStringz.to!string);
          }

          throw new Exception("Unknown error from AMQP service");
        }

        gotEnvelope = true;

        message = this.envelopeToMessage(&envelope);
        this.callback(message);
      } catch (Exception exception) {
        this.client.logger.error(exception);
        // A caught exception is never null, so the failure must be recorded
        // unconditionally; otherwise failed messages would be acknowledged.
        gotException = true;
      }

      if (gotEnvelope) {
        // TODO: evaluate if this extra message re-delivery try is working as intended, should there be more retrys?
        if (gotException) {
          // Requeue only on the first delivery; a redelivered message that
          // fails again is rejected without requeue.
          amqp_basic_reject(this.client.connection, this.channel,
              envelope.delivery_tag, envelope.redelivered == false);
        } else {
          amqp_basic_ack(this.client.connection, this.channel, envelope.delivery_tag, false);
        }

        amqp_destroy_envelope(&envelope);
      }

      Fiber.yield();
    }

    this.close();
  }

  /**
   * Converts a received AMQP envelope into a `Message`.
   *
   * Copies correlation id, type and reply-to from the basic properties,
   * reads the `created`, `expires`, `token` and `responseStatus` headers and
   * decodes a non-empty body as a BSON object.
   *
   * Params:
   *   envelope = envelope filled in by `amqp_consume_message`
   *
   * Returns: the decoded message.
   *
   * Throws: MessageHeaderException if a known header cannot be parsed,
   *         MessageBodyException if the body cannot be decoded.
   */
  private Message envelopeToMessage(amqp_envelope_t* envelope) {
    import core.time : msecs;

    auto message = Message();

    if (envelope.message.properties._flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
      message.correlationId = UUID(envelope.message.properties.correlation_id.asString());
    }

    if (envelope.message.properties._flags & AMQP_BASIC_TYPE_FLAG) {
      message.type = envelope.message.properties.type.asString();
    }

    if (envelope.message.properties._flags & AMQP_BASIC_REPLY_TO_FLAG) {
      const value = envelope.message.properties.reply_to.asString();
      if (value != "") {
        message.replyToQueueName = value;
      }
    }

    if (envelope.message.properties._flags & AMQP_BASIC_HEADERS_FLAG) {
      import std.datetime.systime : SysTime;

      auto headers = envelope.message.properties.headers;

      for (int index = 0; index < headers.num_entries; index++) {
        auto entry = headers.entries[index];
        auto key = entry.key.asString();

        if (key == "created") {
          try {
            message.created = SysTime.fromISOExtString(entry.value.value.bytes.asString());
          } catch (Exception exception) {
            throw new MessageHeaderException(key, message.type);
          }
        } else if (key == "expires") {
          try {
            message.expires = SysTime.fromISOExtString(entry.value.value.bytes.asString());
          } catch (Exception exception) {
            throw new MessageHeaderException(key, message.type);
          }
        } else if (key == "token") {
          message.token = entry.value.value.bytes.asString();
        } else if (key == "responseStatus") {
          try {
            message.responseStatus = (cast(int) entry.value.value.u16).to!ResponseStatus;
          } catch (Exception exception) {
            throw new MessageHeaderException(key, message.type);
          }
        }
      }

      // A non-empty user_id property overrides any token taken from the headers.
      const value = envelope.message.properties.user_id.asString();
      if (value != "") {
        message.token = value;
      }
    }

    if (envelope.message.body_.len > 0) {
      try {
        auto body = cast(immutable(ubyte)[]) fromBytes(envelope.message.body_.bytes,
            envelope.message.body_.len);
        message.payload = BSON(BSON.Type.object, body);
      } catch (Exception exception) {
        throw new MessageBodyException(message.type);
      }
    }

    return message;
  }
}

/// Copies `len` bytes starting at `ptr` into a newly allocated mutable array.
private char[] fromBytes(void* ptr, ulong len) {
  return (cast(char*) ptr)[0 .. len].dup;
}

/// Copies the contents of an `amqp_bytes_t` into a newly allocated string.
private string asString(amqp_bytes_t bytes) {
  return (cast(char*) bytes.bytes)[0 .. bytes.len].idup;
}