This chapter describes the input and output adapters for AMQP. AMQP input and output are provided through data flow operators.
The following code snippet uses the runtime configuration API to import the AMQP adapter classes:
epService.getEPAdministrator().getConfiguration()
    .addImport(AMQPSource.class.getPackage().getName() + ".*");
The AMQP input and output adapter provides the following data flow operators:
Table 4.1. AMQP Operators
Operator | Description |
---|---|
AMQPSink | Send messages to an AMQP broker. See Section 4.2, “AMQPSink Operator”. |
AMQPSource | Receive messages from an AMQP broker. See Section 4.3, “AMQPSource Operator”. |
The AMQPSink operator must have a single input stream.
The AMQPSink operator cannot declare any output streams.
Parameters for the AMQPSink operator are:
Either the queueName parameter or the combination of exchange and routingKey is required.
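The destination rule above can be expressed as a small check. This is a minimal sketch only: `AmqpSinkDestinationCheck` and `hasDestination` are hypothetical names and not part of the adapter, "supplied" is assumed to mean non-null, and the adapter's own validation may differ (for example, in whether it accepts both forms together).

```java
// Hypothetical helper for illustration only; it is not part of the AMQP adapter.
final class AmqpSinkDestinationCheck {

    // Returns true when a destination is given in one of the two accepted forms:
    // a queue name, or an exchange together with a routing key.
    // Assumption: "supplied" simply means non-null.
    static boolean hasDestination(String queueName, String exchange, String routingKey) {
        boolean hasQueue = queueName != null;
        boolean hasExchangeAndKey = exchange != null && routingKey != null;
        return hasQueue || hasExchangeAndKey;
    }

    public static void main(String[] args) {
        System.out.println(hasDestination("myqueue", null, null));        // prints true
        System.out.println(hasDestination(null, "myexchange", "my.key")); // prints true
        System.out.println(hasDestination(null, "myexchange", null));     // prints false
    }
}
```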
The collector is required and must be specified to transform events to AMQP messages. The collector instance must implement the interface ObjectToAMQPCollector. The adapter provides a default implementation, ObjectToAMQPCollectorSerializable, that employs default serialization.
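To give a sense of what a serializing collector does with an event, the sketch below turns a Map event into a byte array suitable for use as a message body. It assumes that "default serialization" refers to standard Java object serialization; the class `SerializedPayloadSketch` and its `toBytes` method are hypothetical and do not reproduce the adapter's implementation or the ObjectToAMQPCollector interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical illustration; not the adapter's ObjectToAMQPCollectorSerializable.
final class SerializedPayloadSketch {

    // Serializes the event with standard Java object serialization
    // (an assumption about what "default serialization" means).
    static byte[] toBytes(Serializable event) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // A Map-based event with made-up property names.
        HashMap<String, Object> event = new HashMap<>();
        event.put("symbol", "IBM");
        event.put("price", 101.5);

        byte[] body = toBytes(event);
        System.out.println("payload bytes: " + body.length);
    }
}
```

Under this assumption, every event property value must itself be serializable, otherwise serialization fails when the event is written.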
The following example declares a data flow that is triggered by MyMapEventType events from the event bus (type not declared here) and sends serialized messages to an AMQP queue:
create dataflow AMQPOutgoingDataFlow
  EventBusSource -> outstream<MyMapEventType> {}
  AMQPSink(outstream) {
    host: 'localhost',
    queueName: 'myqueue',
    collector: {class: 'ObjectToAMQPCollectorSerializable'}
  }
The AMQPSource operator cannot declare any input streams.
The AMQPSource operator must have a single output stream.
Parameters for the AMQPSource operator are listed below, with the required parameters listed first:
The collector is required and must be specified to transform AMQP messages to events. The collector instance must implement the interface AMQPToObjectCollector. The adapter provides a default implementation, AMQPToObjectCollectorSerializable, that employs default serialization.
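On the receiving side, a deserializing collector turns a message body back into an event object. The sketch below illustrates that step, again assuming "default serialization" means standard Java object serialization. `DeserializedPayloadSketch`, `fromBytes`, and `sampleBody` are hypothetical names; `sampleBody` only stands in for bytes that a producer would have sent, and none of this reproduces the AMQPToObjectCollector interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical illustration; not the adapter's AMQPToObjectCollectorSerializable.
final class DeserializedPayloadSketch {

    // Reads one object from a message body using standard Java deserialization.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is not on the classpath", e);
        }
    }

    // Demo-only stand-in for a message body produced by a sender.
    static byte[] sampleBody() {
        HashMap<String, Object> event = new HashMap<>();
        event.put("symbol", "IBM");
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        Object event = fromBytes(sampleBody());
        System.out.println(event); // prints {symbol=IBM}
    }
}
```

Java deserialization requires the payload's classes to be available to the receiver and should only be applied to messages from trusted producers.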
The following example declares a data flow that receives AMQP messages from a queue, transforms each message, and sends each resulting event of type MyMapEventType into the event bus:
create dataflow AMQPIncomingDataFlow
  AMQPSource -> outstream<MyMapEventType> {
    host: 'localhost',
    queueName: 'myqueue',
    collector: {class: 'AMQPToObjectCollectorSerializable'},
    logMessages: true
  }
  EventBusSink(outstream){}