Chapter 4. The AMQP Input and Output Adapter

4.1. Introduction
4.2. AMQPSink Operator
4.3. AMQPSource Operator

4.1. Introduction

This chapter describes the input and output adapter for AMQP. AMQP input and output are implemented as data flow operators.

To use the AMQP data flow operators, add esperio-amqp-version.jar (where version is the release number) to your classpath and import the operator package or class using either the static or the runtime configuration.

The following code snippet uses the runtime configuration API to import the AMQP adapter classes:

epService.getEPAdministrator().getConfiguration()
  .addImport(AMQPSource.class.getPackage().getName() + ".*");

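The AMQP input and output adapter provides two data flow operators: AMQPSink, which sends data flow events as AMQP messages, and AMQPSource, which receives AMQP messages and emits them as data flow events.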

4.2. AMQPSink Operator

The AMQPSink operator receives input stream events, transforms events to AMQP messages and sends messages into an AMQP queue.

The AMQPSink operator must have a single input stream.

The AMQPSink operator cannot declare any output streams.

Parameters for the AMQPSink operator include host, queueName, exchange, routingKey and collector.

Either queueName or the combination of exchange and routingKey is required.
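The destination rule above can be sketched as a small check. The helper below is hypothetical and not part of the adapter; it assumes that a parameter counts as specified when it is non-null, which the adapter may define differently.

```java
// Hypothetical helper, not part of the adapter API: checks whether a set of
// AMQPSink parameters names a destination according to the rule above.
final class SinkDestinationCheck {

    // Assumption: a parameter is "specified" when it is non-null.
    static boolean hasDestination(String queueName, String exchange, String routingKey) {
        boolean hasQueue = queueName != null;
        boolean hasExchangeAndKey = exchange != null && routingKey != null;
        return hasQueue || hasExchangeAndKey;
    }

    public static void main(String[] args) {
        System.out.println(hasDestination("myqueue", null, null));       // true
        System.out.println(hasDestination(null, "myexchange", "mykey")); // true
        System.out.println(hasDestination(null, "myexchange", null));    // false
    }
}
```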

The collector parameter is required; it specifies the object that transforms events to AMQP messages. The collector instance must implement the interface ObjectToAMQPCollector. The adapter provides a default implementation, ObjectToAMQPCollectorSerializable, that employs default serialization.
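To illustrate what a serializing collector might produce as a message body, the following sketch turns a Map-based event into a byte array using standard Java object serialization. It assumes that "default serialization" refers to java.io serialization and that the event is a Serializable Map; the class name, method name and event properties are illustrative and not part of the adapter API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Illustrative sketch only: shows the event-to-bytes step, not the adapter's own code.
final class OutgoingPayloadSketch {

    // Serializes an event object into the bytes that would form a message body.
    static byte[] toBytes(Serializable event) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Hypothetical Map event; the property names are made up for this sketch.
        HashMap<String, Object> event = new HashMap<>();
        event.put("symbol", "ACME");
        event.put("price", 10.5);

        byte[] body = toBytes(event);
        System.out.println("message body length: " + body.length);
    }
}
```

Every property value in the event must itself be serializable, otherwise writeObject fails.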

The following example declares a data flow that is triggered by MyMapEventType events from the event bus (the event type is not declared here) and sends them as serialized messages to an AMQP queue:

create dataflow AMQPOutgoingDataFlow
   EventBusSource -> outstream<MyMapEventType> {}
   AMQPSink(outstream) {
     host: 'localhost',
     queueName: 'myqueue',
     collector: {class: 'ObjectToAMQPCollectorSerializable'}
   }

4.3. AMQPSource Operator

The AMQPSource operator receives AMQP messages from a queue, transforms messages and populates a data flow instance with events.

The AMQPSource operator cannot declare any input streams.

The AMQPSource operator must have a single output stream.

Parameters for the AMQPSource operator include host, queueName, collector and logMessages.

The collector parameter is required; it specifies the object that transforms AMQP messages to events. The collector instance must implement the interface AMQPToObjectCollector. The adapter provides a default implementation, AMQPToObjectCollectorSerializable, that employs default serialization.
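The reverse step, from message body to event object, might look like the sketch below. It assumes that the message body was produced with standard Java object serialization and carries a Map-based event; the class and method names are illustrative and not part of the adapter API, and sampleBody only stands in for bytes that would otherwise arrive from the queue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: shows the bytes-to-event step, not the adapter's own code.
final class IncomingPayloadSketch {

    // Deserializes a message body back into the event object it carries.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is not on the classpath", e);
        }
    }

    // Builds a sample body in place of bytes received from an AMQP queue.
    static byte[] sampleBody() {
        Map<String, Object> event = new HashMap<>();
        event.put("symbol", "ACME");
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        Object event = fromBytes(sampleBody());
        System.out.println(event); // prints {symbol=ACME}
    }
}
```

Java deserialization of untrusted input is unsafe, so an approach like this fits only queues fed by trusted producers.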

The following example declares a data flow that receives AMQP messages from a queue, transforms each message and sends the resulting MyMapEventType events into the event bus:

create dataflow AMQPIncomingDataFlow
  AMQPSource -> outstream<MyMapEventType> {
    host: 'localhost',
    queueName: 'myqueue',
    collector: {class: 'AMQPToObjectCollectorSerializable'},
    logMessages: true
  }
  EventBusSink(outstream){}