Package com.espertech.esperio.amqp
Class QueueingConsumer
java.lang.Object
com.rabbitmq.client.DefaultConsumer
com.espertech.esperio.amqp.QueueingConsumer
- All Implemented Interfaces: com.rabbitmq.client.Consumer

public class QueueingConsumer extends com.rabbitmq.client.DefaultConsumer
Convenience class: an implementation of Consumer with straightforward blocking semantics.
The general pattern for using QueueingConsumer is as follows:
    // Create connection and channel.
    ConnectionFactory factory = new ConnectionFactory();
    Connection conn = factory.newConnection();
    Channel ch1 = conn.createChannel();

    // Declare a queue and bind it to an exchange.
    String queueName = ch1.queueDeclare().getQueue();
    ch1.queueBind(queueName, exchangeName, queueName);

    // Create the QueueingConsumer and have it consume from the queue
    QueueingConsumer consumer = new QueueingConsumer(ch1);
    ch1.basicConsume(queueName, false, consumer);

    // Process deliveries
    while (/* some condition */) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // process delivery
        ch1.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
For a more complete example, see LogTail in the test/src/com/rabbitmq/examples
directory of the source distribution.
QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers were made on the Connection's thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel, the client would deadlock.
QueueingConsumer provided client code with an easy way to obviate this problem by queueing incoming messages and processing them on a separate, application-managed thread.
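The queueing approach described above can be sketched with only the JDK. This is a minimal illustration, not the source of QueueingConsumer: the class QueueingSketch and its methods onDelivery, next and demo are hypothetical names, and plain strings stand in for deliveries. The callback only enqueues, so the dispatching thread is never held up, while the application thread blocks until a message arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a consumer that buffers deliveries; not the real class. */
final class QueueingSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called on the dispatching thread: only enqueues, so it returns quickly. */
    void onDelivery(String body) {
        queue.add(body);
    }

    /** Called on the application thread: blocks until a message is available. */
    String next() throws InterruptedException {
        return queue.take();
    }

    /** Delivers the given bodies from a separate thread and drains them on the caller's thread. */
    static String demo(String... bodies) {
        QueueingSketch consumer = new QueueingSketch();
        Thread dispatcher = new Thread(() -> {
            for (String body : bodies) {
                consumer.onDelivery(body);
            }
        });
        dispatcher.start();
        StringBuilder seen = new StringBuilder();
        try {
            for (int i = 0; i < bodies.length; i++) {
                seen.append(consumer.next());
            }
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo("a", "b", "c")); // prints "abc"
    }
}
```

Because a single dispatcher thread feeds a FIFO queue, the application thread sees the messages in delivery order.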
The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.
As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.
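The effect of per-Channel dispatching can be modelled with plain JDK executors. The sketch below is an assumption-laden illustration, not the client library's implementation: PerChannelDispatchSketch, channelA and channelB are hypothetical names, and each single-thread executor stands in for one Channel's dispatch thread. The consumer on channel A waits for the consumer on channel B, which can only succeed when the two are dispatched on different threads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of one dispatch thread per channel; not the RabbitMQ client itself. */
final class PerChannelDispatchSketch {

    /** Returns true if a waiting consumer on channel A did not hold up channel B. */
    static boolean independentDispatch() {
        ExecutorService channelA = Executors.newSingleThreadExecutor();
        ExecutorService channelB = Executors.newSingleThreadExecutor();
        CountDownLatch bDone = new CountDownLatch(1);
        try {
            // Consumer on A: waits (up to 2 s) until the consumer on B has run.
            Callable<Boolean> consumerA = () -> bDone.await(2, TimeUnit.SECONDS);
            // Consumer on B: finishes immediately and signals completion.
            Runnable consumerB = bDone::countDown;

            Future<Boolean> resultA = channelA.submit(consumerA);
            channelB.submit(consumerB);
            return resultA.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        } finally {
            channelA.shutdownNow();
            channelB.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(independentDispatch()); // prints "true"
    }
}
```

If both consumers were submitted to the same single-thread executor, consumer A would occupy the only thread, consumer B would never run in time, and the method would return false. That is the stall the older Connection-thread model was prone to.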
Nested Class Summary
Modifier and Type    Class                        Description
static class         QueueingConsumer.Delivery    Encapsulates an arbitrary message - simple "bean" holder structure.
Constructor Summary
Constructor    Description
QueueingConsumer(com.rabbitmq.client.Channel ch)
QueueingConsumer(com.rabbitmq.client.Channel ch, BlockingQueue<QueueingConsumer.Delivery> q)
Method Summary
Modifier and Type    Method    Description
void    handleCancel(String consumerTag)
void    handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
void    handleShutdownSignal(String consumerTag, com.rabbitmq.client.ShutdownSignalException sig)
QueueingConsumer.Delivery    nextDelivery()    Main application-side API: wait for the next message delivery and return it.
QueueingConsumer.Delivery    nextDelivery(long timeout)    Main application-side API: wait for the next message delivery and return it.

Methods inherited from class com.rabbitmq.client.DefaultConsumer
getChannel, getConsumerTag, handleCancelOk, handleConsumeOk, handleRecoverOk
Constructor Details
QueueingConsumer
public QueueingConsumer(com.rabbitmq.client.Channel ch)

QueueingConsumer
public QueueingConsumer(com.rabbitmq.client.Channel ch, BlockingQueue<QueueingConsumer.Delivery> q)
Method Details
handleShutdownSignal
public void handleShutdownSignal(String consumerTag, com.rabbitmq.client.ShutdownSignalException sig)
- Specified by: handleShutdownSignal in interface com.rabbitmq.client.Consumer
- Overrides: handleShutdownSignal in class com.rabbitmq.client.DefaultConsumer

handleCancel
public void handleCancel(String consumerTag) throws IOException
- Specified by: handleCancel in interface com.rabbitmq.client.Consumer
- Overrides: handleCancel in class com.rabbitmq.client.DefaultConsumer
- Throws: IOException

handleDelivery
public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException
- Specified by: handleDelivery in interface com.rabbitmq.client.Consumer
- Overrides: handleDelivery in class com.rabbitmq.client.DefaultConsumer
- Throws: IOException

nextDelivery
public QueueingConsumer.Delivery nextDelivery() throws InterruptedException, com.rabbitmq.client.ShutdownSignalException, com.rabbitmq.client.ConsumerCancelledException
Main application-side API: wait for the next message delivery and return it.
- Returns: the next message
- Throws:
InterruptedException - if an interrupt is received while waiting
com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
com.rabbitmq.client.ConsumerCancelledException - if this consumer is cancelled while waiting

nextDelivery
public QueueingConsumer.Delivery nextDelivery(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException, com.rabbitmq.client.ConsumerCancelledException
Main application-side API: wait for the next message delivery and return it.
- Parameters: timeout - timeout in milliseconds
- Returns: the next message, or null if timed out
- Throws:
InterruptedException - if an interrupt is received while waiting
com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
com.rabbitmq.client.ConsumerCancelledException - if this consumer is cancelled while waiting
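
The null-on-timeout contract of nextDelivery(long timeout) matches that of BlockingQueue.poll with a timeout, so callers need a null check before touching the result. The following JDK-only sketch shows the caller-side handling. It assumes, without confirmation from this page, that the timed variant behaves like a timed poll on the underlying queue. TimedPollSketch and nextOrNull are hypothetical names, and strings stand in for QueueingConsumer.Delivery.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a timed wait that yields null on timeout. */
final class TimedPollSketch {

    /** Waits up to timeoutMillis for the next element; returns null if none arrives in time. */
    static String nextOrNull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and surface the failure to the caller.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");

        String first = nextOrNull(queue, 50);   // element is already queued
        String second = nextOrNull(queue, 50);  // queue is empty, so this times out

        System.out.println(first);              // prints "hello"
        System.out.println(second == null ? "timed out" : second); // prints "timed out"
    }
}
```

In a consume loop, a null result is a natural point to re-check the loop condition (for example a shutdown flag) before waiting again.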