www.espertech.comDocumentation
This chapter discusses the input and output adapters for AMQP. AMQP input and output utilizes data flow operators.
In order to use the AMQP data flow operators, add esperio-amqp-
version.jar
to your classpath and import the operator package or class using the static or runtime configuration.
The following code snippet uses the runtime configuration API to import the AMQP adapter classes:
epService.getEPAdministrator().getConfiguration() .addImport(AMQPSource.class.getPackage().getName() + ".*");
The AMQP input and output adapter provides the following data flow operators:
Table 4.1. AMQP Operators
Operator | Description |
---|---|
AMQPSink |
Send messages to an AMQP broker. See Section 4.2, “AMQPSink Operator”. |
AMQPSource |
Receive messages from an AMQP broker. See Section 4.3, “AMQPSource Operator”. |
The AMQPSink operator receives input stream events, transforms events to AMQP messages and sends messages into an AMQP queue.
The AMQPSink operator must have a single input stream.
The AMQPSink operator cannot declare any output streams.
Parameters for the AMQPSink operator are:
Table 4.2. AMQPSink Parameters
Name | Description |
---|---|
host (required) | Host name string |
queueName | Queue name string |
collector (required) | Transformation class or instance for events to AMQP message |
port | Port number integer |
username | User name string |
password | Password string (also see systemProperties or data flow instantiation options) |
vhost | Vhost string |
exchange | Exchange name string |
routingKey | Routing key string |
logMessage | Boolean indicator whether to log a text for each message |
waitMSecNextMsg | Number of milliseconds wait between messages, a long-typed value |
declareDurable | Boolean indicator whether durable, false by default |
declareExclusive | Boolean indicator whether exclusive, false by default |
declareAutoDelete | Boolean indicator whether auto-delete, true by default |
declareAdditionalArgs | Map of additional arguments passed to AMQP of type Map<String, Object> |
Either the queueName
or the combination of exchange
and routingKey
are required parameters.
The collector is required and must be specified to transform events to AMQP messages. The collector instance must implement the interface ObjectToAMQPCollector
. The adapter provides a default implementation ObjectToAMQPCollectorSerializable
that
employs default serialization.
The following example declares a data flow that is triggered by MyMapEventType
events from the event bus (type not declared here) that sends serialized messages to an AMQP queue:
create dataflow AMQPOutgoingDataFlow EventBusSource -> outstream<MyMapEventType> {} AMQPSink(outstream) { host: 'localhost', queueName: 'myqueue', collector: {class: 'ObjectToAMQPCollectorSerializable'} }
The AMQPSource operator receives AMQP messages from a queue, transforms messages and populates a data flow instance with events.
The AMQPSource operator cannot declare any input streams.
The AMQPSource operator must have a single output stream.
Parameters for the AMQPSource operator are listed below, with the required parameters listed first:
Table 4.3. AMQPSource Parameters
Name | Description |
---|---|
host (required) | Host name string |
queueName (required) | Queue name string |
collector (required) | Transformation class or instance for AMQP message to underlying event transformation |
port | Port number integer |
username | User name string |
password | Password string (also see systemProperties or data flow instantiation options) |
vhost | Vhost string |
exchange | Exchange name string |
routingKey | Routing key string |
logMessage | Boolean indiator whether to log a text for each message |
waitMSecNextMsg | Number of milliseconds wait between messages, a long-typed value |
declareDurable | Boolean indicator whether durable, false by default |
declareExclusive | Boolean indicator whether exclusive, false by default |
declareAutoDelete | Boolean indicator whether auto-delete, true by default |
declareAdditionalArgs | Map of additional arguments passed to AMQP of type Map<String, Object> |
prefetchCount | Prefetch count integer, defaults to 100 |
consumeAutoAck | Boolean indicator whether to auto-ack, true by default |
The collector is required and must be specified to transform AMQP messages to events. The collector instance must implement the interface AMQPToObjectCollector
. The adapter provides a default implementation AMQPToObjectCollectorSerializable
that
employs default serialization.
The following example declares a data flow that is receives AMQP messages from a queue, transforms each message and sends each message of type MyMapEventType
into the event bus:
create dataflow AMQPIncomingDataFlow AMQPSource -> outstream<MyMapEventType> { host: 'localhost', queueName: 'myqueue', collector: {class: 'AMQPToObjectCollectorSerializable'}, logMessages: true } EventBusSink(outstream){}