1 module dutils.message.subscription;
2 
3 import core.thread : Fiber;
4 import std.uuid : UUID;
5 import std.conv : to;
6 
7 import symmetry.api.rabbitmq;
8 
9 import dutils.data.bson : BSON;
10 import dutils.message.message : Message, ResponseStatus;
11 import dutils.message.client : Client, QueueType;
12 import dutils.message.exceptions : SubscribeQueueException,
13   MessageBodyException, MessageHeaderException;
14 
15 class Subscription {
16   private Client client;
17   private string queueName;
18   private QueueType queueType;
19   private ushort channel;
20   package Fiber fiber;
21   private void delegate(Message) callback;
22   private bool _isClosed = true;
23 
24   this(Client client, string queueName, QueueType queueType, ushort channel,
25       void delegate(Message) callback) {
26     this.client = client;
27     this.queueName = queueName;
28     this.queueType = queueType;
29     this.channel = channel;
30     this.callback = callback;
31     this.fiber = new Fiber(&this.handler);
32     this.fiber.call();
33   }
34 
35   void close() {
36     try {
37       if (!this._isClosed) {
38         die_on_amqp_error(amqp_channel_close(this.client.connection,
39             this.channel, ReplySuccess), "Closing channel");
40       }
41     } catch (Exception exception) {
42     }
43 
44     this._isClosed = true;
45   }
46 
47   @property bool isClosed() {
48     return this._isClosed;
49   }
50 
51   string getQueueName() {
52     return this.queueName;
53   }
54 
55   private void handler() {
56     import std.string : fromStringz;
57 
58     if (!(this.queueName in this.client.declaredQueues)) {
59       this.client.declareQueue(this.queueName, this.queueType);
60     }
61 
62     try {
63       amqp_channel_open(this.client.connection, this.channel);
64       die_on_amqp_error(amqp_get_rpc_reply(this.client.connection), "Opening channel");
65 
66       amqp_basic_consume(this.client.connection, this.channel,
67           amqp_string(this.queueName), cast(amqp_bytes_t) amqp_empty_bytes, 0,
68           0, 0, cast(amqp_table_t) amqp_empty_table);
69       die_on_amqp_error(amqp_get_rpc_reply(this.client.connection), "Consuming");
70     } catch (Exception exception) {
71       throw new SubscribeQueueException(this.queueName, exception);
72     }
73 
74     this._isClosed = false;
75 
76     while (!this.isClosed) {
77       Message message;
78       bool gotException = false;
79       amqp_envelope_t envelope;
80 
81       try {
82         amqp_rpc_reply_t reply;
83 
84         amqp_maybe_release_buffers(this.client.connection);
85 
86         timeval timeout;
87         timeout.tv_usec = 100_000;
88         reply = amqp_consume_message(this.client.connection, &envelope, &timeout, 0);
89 
90         if (reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION
91             && reply.library_error == AMQP_STATUS_TIMEOUT) {
92           Fiber.yield();
93           continue;
94         } else if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
95           if (reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
96             throw new Exception(amqp_error_string2(reply.library_error).fromStringz.to!string);
97           } else if (reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
98             throw new Exception(amqp_error_string2(reply.reply.id).fromStringz.to!string);
99           }
100 
101           throw new Exception("Unknown error from AMQP service");
102         }
103 
104         message = this.envelopeToMessage(&envelope);
105         this.callback(message);
106       } catch (Exception exception) {
107         this.client.logger.error(exception);
108         gotException = exception is null;
109       }
110 
111       // TODO: evaluate if this extra message re-delivery try is working as intended, should there be more retrys?
112       if (gotException == true) {
113         amqp_basic_reject(this.client.connection, this.channel,
114             envelope.delivery_tag, envelope.redelivered == false);
115       } else {
116         amqp_basic_ack(this.client.connection, this.channel, envelope.delivery_tag, false);
117       }
118 
119       amqp_destroy_envelope(&envelope);
120 
121       Fiber.yield();
122     }
123 
124     this.close();
125   }
126 
127   private Message envelopeToMessage(amqp_envelope_t* envelope) {
128     import core.time : msecs;
129 
130     auto message = Message();
131 
132     if (envelope.message.properties._flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
133       message.correlationId = UUID(envelope.message.properties.correlation_id.asString());
134     }
135 
136     if (envelope.message.properties._flags & AMQP_BASIC_TYPE_FLAG) {
137       message.type = envelope.message.properties.type.asString();
138     }
139 
140     if (envelope.message.properties._flags & AMQP_BASIC_REPLY_TO_FLAG) {
141       const value = envelope.message.properties.reply_to.asString();
142       if (value != "") {
143         message.replyToQueueName = value;
144       }
145     }
146 
147     if (envelope.message.properties._flags & AMQP_BASIC_HEADERS_FLAG) {
148       import std.datetime.systime : SysTime;
149 
150       auto headers = envelope.message.properties.headers;
151 
152       for (int index = 0; index < headers.num_entries; index++) {
153         auto entry = headers.entries[index];
154         auto key = entry.key.asString();
155 
156         if (key == "created") {
157           try {
158             message.created = SysTime.fromISOExtString(entry.value.value.bytes.asString());
159           } catch (Exception exception) {
160             throw new MessageHeaderException(key, message.type);
161           }
162         } else if (key == "expires") {
163           try {
164             message.expires = SysTime.fromISOExtString(entry.value.value.bytes.asString());
165           } catch (Exception exception) {
166             throw new MessageHeaderException(key, message.type);
167           }
168         } else if (key == "token") {
169           message.token = entry.value.value.bytes.asString();
170         } else if (key == "responseStatus") {
171           try {
172             message.responseStatus = (cast(int) entry.value.value.u16).to!ResponseStatus;
173           } catch (Exception exception) {
174             throw new MessageHeaderException(key, message.type);
175           }
176         }
177       }
178 
179       const value = envelope.message.properties.user_id.asString();
180       if (value != "") {
181         message.token = value;
182       }
183     }
184 
185     if (envelope.message.body_.len > 0) {
186       try {
187         auto body = cast(immutable(ubyte)[]) fromBytes(envelope.message.body_.bytes,
188             envelope.message.body_.len);
189         message.payload = BSON(BSON.Type.object, body);
190       } catch (Exception exception) {
191         throw new MessageBodyException(message.type);
192       }
193     }
194 
195     return message;
196   }
197 }
198 
199 private char[] fromBytes(void* ptr, ulong len) {
200   return (cast(char*) ptr)[0 .. len].dup;
201 }
202 
203 private string asString(amqp_bytes_t bytes) {
204   return (cast(char*) bytes.bytes)[0 .. bytes.len].idup;
205 }