Class QueueingConsumer

java.lang.Object
com.rabbitmq.client.DefaultConsumer
com.espertech.esperio.amqp.QueueingConsumer
All Implemented Interfaces:
com.rabbitmq.client.Consumer

public class QueueingConsumer extends com.rabbitmq.client.DefaultConsumer
Convenience class: an implementation of Consumer with straightforward blocking semantics.

The general pattern for using QueueingConsumer is as follows:

 // Create connection and channel.
 ConnectionFactory factory = new ConnectionFactory();
 Connection conn = factory.newConnection();
 Channel ch1 = conn.createChannel();

 // Declare a server-named queue and bind it to an exchange.
 // exchangeName is assumed to be defined elsewhere and to name an existing exchange.
 String queueName = ch1.queueDeclare().getQueue();
 ch1.queueBind(queueName, exchangeName, queueName);

 // Create the QueueingConsumer and have it consume from the queue
 QueueingConsumer consumer = new QueueingConsumer(ch1);
 ch1.basicConsume(queueName, false, consumer);

 // Process deliveries
 while (/* some condition */) {
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     // process delivery
     ch1.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 }
 

For a more complete example, see LogTail in the test/src/com/rabbitmq/examples directory of the source distribution.

Deprecated. QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers were made on the Connection's thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel, the client would deadlock.

QueueingConsumer provided client code with an easy way to avoid these problems by queueing incoming messages and processing them on a separate, application-managed thread.

The threading behaviour of Connection and Channel has since been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel from holding up Consumers on another, and it also prevents recursive calls from deadlocking the client. As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.
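
The hand-off described in the deprecation note (a dispatch thread enqueues messages, an application-managed thread drains them) can be sketched with the standard library alone. This is an illustration of the idea only, not the source of QueueingConsumer: the class name HandoffSketch, the relay method, and the END marker are hypothetical, and plain strings stand in for deliveries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stdlib-only sketch of the queue-based hand-off; all names here are hypothetical.
class HandoffSketch {
    // Hypothetical end-of-stream marker so the draining thread knows when to stop.
    private static final String END = "\u0000END";

    // Enqueues the messages on a separate "dispatch" thread and drains them
    // on the calling thread, returning them in arrival order.
    static List<String> relay(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Plays the role of the library thread that invokes the consumer callback.
        Thread dispatcher = new Thread(() -> {
            for (String m : messages) {
                queue.add(m); // unbounded queue, so the dispatch thread never blocks
            }
            queue.add(END);
        });
        dispatcher.start();

        // Plays the role of the application thread that asks for the next delivery.
        List<String> processed = new ArrayList<>();
        try {
            while (true) {
                String next = queue.take(); // blocks until a message is available
                if (END.equals(next)) {
                    break;
                }
                processed.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(relay(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

Because slow processing only delays the draining thread, the dispatch thread is never held up by application code, which is the property the note attributes to QueueingConsumer.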
  • Constructor Details

    • QueueingConsumer

      public QueueingConsumer(com.rabbitmq.client.Channel ch)
    • QueueingConsumer

      public QueueingConsumer(com.rabbitmq.client.Channel ch, BlockingQueue<QueueingConsumer.Delivery> q)
  • Method Details

    • handleShutdownSignal

      public void handleShutdownSignal(String consumerTag, com.rabbitmq.client.ShutdownSignalException sig)
      Specified by:
      handleShutdownSignal in interface com.rabbitmq.client.Consumer
      Overrides:
      handleShutdownSignal in class com.rabbitmq.client.DefaultConsumer
    • handleCancel

      public void handleCancel(String consumerTag) throws IOException
      Specified by:
      handleCancel in interface com.rabbitmq.client.Consumer
      Overrides:
      handleCancel in class com.rabbitmq.client.DefaultConsumer
      Throws:
      IOException
    • handleDelivery

      public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException
      Specified by:
      handleDelivery in interface com.rabbitmq.client.Consumer
      Overrides:
      handleDelivery in class com.rabbitmq.client.DefaultConsumer
      Throws:
      IOException
    • nextDelivery

      public QueueingConsumer.Delivery nextDelivery() throws InterruptedException, com.rabbitmq.client.ShutdownSignalException, com.rabbitmq.client.ConsumerCancelledException
      Main application-side API: wait for the next message delivery and return it.
      Returns:
      the next message
      Throws:
      InterruptedException - if an interrupt is received while waiting
      com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
      com.rabbitmq.client.ConsumerCancelledException - if this consumer is cancelled while waiting
    • nextDelivery

      public QueueingConsumer.Delivery nextDelivery(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException, com.rabbitmq.client.ConsumerCancelledException
      Main application-side API: wait up to the given timeout for the next message delivery and return it.
      Parameters:
      timeout - timeout in milliseconds
      Returns:
      the next message or null if timed out
      Throws:
      InterruptedException - if an interrupt is received while waiting
      com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
      com.rabbitmq.client.ConsumerCancelledException - if this consumer is cancelled while waiting
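
The timed variant's contract (return the next message, or null once the timeout elapses) matches that of BlockingQueue.poll with a timeout, which the stdlib-only sketch below uses to show how a caller might handle the null case. Whether QueueingConsumer is implemented this way is an assumption; TimedWaitSketch and nextOrNull are hypothetical names, strings stand in for deliveries, and unlike nextDelivery(long) the helper swallows the interrupt and returns null instead of throwing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stdlib-only sketch of a timed wait; all names here are hypothetical.
class TimedWaitSketch {
    // Waits up to timeoutMillis for an element.
    // Returns null if nothing arrives in time or if the thread is interrupted.
    static String nextOrNull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");

        System.out.println(nextOrNull(queue, 50)); // prints hello
        System.out.println(nextOrNull(queue, 50)); // prints null (queue is empty, wait times out)
    }
}
```

A caller of the timed nextDelivery should therefore check for null before touching the delivery, and can use the timeout as an opportunity to re-evaluate its loop condition.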