| Apache Qpid - AMQP Messaging for Java JMS, C++, Python, Ruby, and .NET | Apache Qpid Documentation |
Includes and Namespaces
#include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/SubscriptionManager.h>
using namespace qpid::client; using namespace qpid::framing;
Opening and closing connections and sessions
Connection connection;
try {
connection.open(host, port);
Session session = connection.newSession();
...
connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
return 1;
Declaring and binding queues:
session.queueDeclare(arg::queue="message_queue"); session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key");
Sending a message:
message.getDeliveryProperties().setRoutingKey("routing_key");
message.setData("Hi, Mom!");
session.messageTransfer(arg::content=message, arg::destination="amq.direct");
Sending a message (asynchronous):
#include <qpid/client/AsyncSession.h> async(session).messageTransfer(arg::content=message, arg::destination="amq.direct"); ... session.sync();
Replying to a message:
Message request, response;
...
if (request.getMessageProperties().hasReplyTo()) {
string routingKey = request.getMessageProperties().getReplyTo().getRoutingKey();
string exchange = request.getMessageProperties().getReplyTo().getExchange();
response.getDeliveryProperties().setRoutingKey(routingKey);
messageTransfer(arg::content=response, arg::destination=exchange);
}
A message listener:
class Listener : public MessageListener{
private:
SubscriptionManager& subscriptions;
public:
Listener(SubscriptionManager& subscriptions);
virtual void received(Message& message);
}; void Listener::received(Message& message) {
std::cout << "Message: " << message.getData() << std::endl;
if (endCondition(message)) {
subscriptions.cancel(message.getDestination());
}
}
Using a message listener with a subscription manager:
SubscriptionManager subscriptions(session);
Listener listener(subscriptions); subscriptions.subscribe(listener, "message_queue"); subscriptions.run();
Using a LocalQueue with a subscription manager
SubscriptionManager subscriptions(session);
LocalQueue local_queue;
subscriptions.subscribe(local_queue, string("message_queue")); Message message;
for (int i=0; i<10; i++) {
local_queue.get(message, 10000);
std::cout << message.getData() << std::endl;
}
1.6.1