Client

Undocumented in source.

Constructors

this
this()
Undocumented in source.
this
this(ClientParameters parameters)
Undocumented in source.

Destructor

~this
~this()
Undocumented in source.

Members

Functions

close
void close()
Undocumented in source. Be warned that the author may not have intended to support it.
connect
void connect()
Undocumented in source. Be warned that the author may not have intended to support it.
declareQueue
void declareQueue(string queueName, QueueType type)
Undocumented in source. Be warned that the author may not have intended to support it.
keepUpdating
void keepUpdating()
Undocumented in source. Be warned that the author may not have intended to support it.
publish
void publish(string queueName, Message message)
Undocumented in source. Be warned that the author may not have intended to support it.
publish
void publish(string queueName, Message message)
Undocumented in source. Be warned that the author may not have intended to support it.
publish
void publish(string queueName, Message message, QueueType queueType)
Undocumented in source. Be warned that the author may not have intended to support it.
request
void request(string queueName, Message requestMessage, void delegate(Message) callback)
Undocumented in source. Be warned that the author may not have intended to support it.
request
void request(string queueName, Message requestMessage, void delegate(Message) callback)
Undocumented in source. Be warned that the author may not have intended to support it.
subscribe
Subscription subscribe(string queueName, void delegate(Message) callback)
Undocumented in source. Be warned that the author may not have intended to support it.
subscribe
Subscription subscribe(string queueName, void delegate(Message) callback, QueueType queueType)
Undocumented in source. Be warned that the author may not have intended to support it.
update
void update()
Undocumented in source. Be warned that the author may not have intended to support it.

Properties

isConnected
bool isConnected [@property getter]
Undocumented in source. Be warned that the author may not have intended to support it.
parameters
ClientParameters parameters [@property getter]
Undocumented in source. Be warned that the author may not have intended to support it.

Variables

connection
amqp_connection_state_t connection;
Undocumented in source.
declaredQueues
QueueType[string] declaredQueues;
Undocumented in source.
logger
Log logger;
Undocumented in source.
subscriptions
Subscription[] subscriptions;
Undocumented in source.

Examples

Client#connect/isConnected/close - connect to the AMQP server

import dutils.testing : assertEqual;

auto client = new Client();
assertEqual(client.isConnected, true);

client.close();
assertEqual(client.isConnected, false);

client.connect();
assertEqual(client.isConnected, true);

ConnectException connectionError;
try {
  client = new Client(ClientParameters("badhostname"));
} catch (ConnectException exception) {
  connectionError = exception;
}

assertEqual(connectionError.message,
    "Unable to connect to AMQP 0-9-1 service at badhostname:5672 with username guest");

Client#subscribe/publish - subscribe/publish to a queue on an AMQP server

import dutils.testing : assertEqual;

for (int index; index < 10; index++) {
  auto client = new Client();
  assertEqual(client.isConnected, true);

  auto service = new Client();
  assertEqual(service.isConnected, true);

  struct DummyMessage {
    string story;
  }

  auto dummy = DummyMessage("Lorem ipsum dolor sit amet, consectetur adipisicing elit." ~ randomUUID()
      .toString());
  auto message = Message.from(dummy);

  if (index % 2 == 1) {
    client.publish("testqueue", message);
  }

  Message recievedMessage;
  service.subscribe("testqueue", (Message recievedMessageTemp) {
    recievedMessage = recievedMessageTemp;
  });

  if (index % 2 == 0) {
    client.publish("testqueue", message);
  }

  while (recievedMessage.isEmpty) {
    service.update();
  }

  service.close();
  client.close();

  assertEqual(recievedMessage.type, "DummyMessage");
  assertEqual(recievedMessage.correlationId, message.correlationId);
  assertEqual(recievedMessage.payload["story"].get!string, dummy.story);
}

Client#request - request server/client

import std.conv : to;

import dutils.testing : assertEqual;
import dutils.data.bson : BSON;
import dutils.message.message : ResponseBadType;

auto client = new Client();
auto service = new Client();

service.subscribe("testservice", (Message request) {
  if (request.type == "Ping") {
    @SetResponseStatus(ResponseStatus.OK)
    struct Pong {
      string story;
    }

    auto payload = Pong("Playing ping pong with " ~ request.payload["name"].get!string);
    service.publish(request.replyToQueueName, request.createResponse(payload));
  } else {
    auto payload = ResponseBadType(request.type, ["Ping"]);
    service.publish(request.replyToQueueName, request.createResponse(payload));
  }
});

struct Ping {
  string name;
}

auto payload = Ping("Anna");
auto message = Message.from(payload);
message.token = "this is a placeholder token";

Message response;
client.request("testservice", message, (Message requestResponse) {
  response = requestResponse;
});

auto badMessage = Message();
badMessage.type = "ThisIsABadType";
badMessage.correlationId = randomUUID();

Message errorResponse;
client.request("testservice", badMessage, (Message requestResponse) {
  errorResponse = requestResponse;
});

while (response.isEmpty || errorResponse.isEmpty) {
  service.update();
  client.update();
}

service.close();
client.close();

assertEqual(response.type, "Pong");
assertEqual(response.correlationId, message.correlationId);
assertEqual(response.payload["story"].get!string, "Playing ping pong with Anna");
assertEqual(response.responseStatus, ResponseStatus.OK);
assertEqual(response.token, "this is a placeholder token");

assertEqual(errorResponse.type, "ResponseBadType");
assertEqual(errorResponse.correlationId, badMessage.correlationId);
assertEqual(errorResponse.payload["supportedTypes"][0].get!string, "Ping");
assertEqual(errorResponse.responseStatus, ResponseStatus.BAD_TYPE);

Client#request - request timeout

import std.conv : to;
import core.time : msecs;

import dutils.testing : assertEqual;

auto client = new Client();

struct Ping {
  string name;
}

auto payload = Ping("Anna");
auto message = Message.from(payload);
message.token = "this is a placeholder token";
message.setToExpireAfter(1000.msecs);

Message response;
client.request("testservicetimeout", message, (Message requestResponse) {
  response = requestResponse;
});

while (response.isEmpty) {
  client.update();
}

client.close();

assertEqual(response.type, "ResponseTimeout");
assertEqual(response.correlationId, message.correlationId);
assertEqual(response.responseStatus, ResponseStatus.TIMEOUT);
assertEqual(response.token, "this is a placeholder token");

Client#keepUpdating - should loop until the client is closed

import dutils.data.bson : BSON;

import dutils.testing : assertEqual;

auto client = new Client();

struct Hello {
  string from;
}

auto gotMessage = false;

Subscription subscription;

subscription = client.subscribe("testservice", (Message message) {
  if (message.type == "Hello" && message.payload["from"].get!string == "Pelle") {
    gotMessage = true;
    client.close();
  }
});

auto message = Message.from(Hello("Pelle"));
client.publish("testservice", message);

client.keepUpdating();

assertEqual(gotMessage, true);

Meta