www.espertech.comDocumentation
This chapter discusses the input and output adapters for AMQP. AMQP input and output utilizes data flow operators.
This adapter only requires Esper runtime as a dependency and does not require Esper compiler.
In order to use the AMQP data flow operators, add esperio-amqp-
version.jar
to your classpath and import the operator package or class using the static or runtime configuration.
The following code snippet uses the runtime configuration API to import the AMQP adapter classes:
Configuration configuration = new Configuration(); configuration.getCommon().addImport(AMQPSource.class.getPackage().getName() + ".*");
The AMQP input and output adapter provides the following data flow operators:
Table 4.1. AMQP Operators
Operator | Description |
---|---|
AMQPSink |
Send messages to an AMQP broker. See Section 4.2, “AMQPSink Operator”. |
AMQPSource |
Receive messages from an AMQP broker. See Section 4.3, “AMQPSource Operator”. |
The AMQPSink operator receives input stream events, transforms events to AMQP messages and sends messages into an AMQP queue.
The AMQPSink operator must have a single input stream.
The AMQPSink operator cannot declare any output streams.
Parameters for the AMQPSink operator are:
Table 4.2. AMQPSink Parameters
Name | Description |
---|---|
host (required) | An expression returning the host name string |
queueName | An expression returning the queue name string |
collector (required) | Transformation class or instance for events to AMQP message |
port | An expression returning the port number integer |
username | An expression returning the user name string |
password | An expression returning the password string (also see systemProperties or data flow instantiation options) |
vhost | An expression returning the vhost string |
exchange | An expression returning the exchange name string |
routingKey | An expression returning the routing key string |
logMessages | An expression returning a boolean indicator whether to log a text for each message (requires debug-level logging) |
waitMSecNextMsg | An expression returning the number of milliseconds wait between messages, a long-typed value |
declareDurable | An expression returning a boolean indicator whether durable, false by default |
declareExclusive | An expression returning a boolean indicator whether exclusive, false by default |
declareAutoDelete | An expression returning a boolean indicator whether auto-delete, true by default |
declareAdditionalArgs | Map of additional arguments passed to AMQP of type Map<String, Object> |
Either the queueName
or the combination of exchange
and routingKey
are required parameters.
The collector is required and must be specified to transform events to AMQP messages. The collector instance must implement the interface ObjectToAMQPCollector
. The adapter provides a default implementation ObjectToAMQPCollectorSerializable
that employs default serialization.
For JSON formatted messages, the adapter provides the ObjectToAMQPCollectorJson
JSON collector. The collector emits string-type messages.
You must specify a JSON event type name e.g. create json schema MyJsonEventType(...)
and outstream<MyJsonEventType>
.
The following example declares a data flow that is triggered by MyMapEventType
events from the event bus (type not declared here) that sends serialized messages to an AMQP queue:
create dataflow AMQPOutgoingDataFlow EventBusSource -> outstream<MyMapEventType> {} AMQPSink(outstream) { host: 'localhost', queueName: 'myqueue', collector: {class: 'ObjectToAMQPCollectorSerializable'} }
The AMQPSource operator receives AMQP messages from a queue, transforms messages and populates a data flow instance with events.
The AMQPSource operator cannot declare any input streams.
The AMQPSource operator must have a single output stream.
Parameters for the AMQPSource operator are listed below, with the required parameters listed first:
Table 4.3. AMQPSource Parameters
Name | Description |
---|---|
host (required) | An expression returning the host name string |
queueName (required) | An expression returning the queue name string |
collector (required) | Transformation class or instance for AMQP message to underlying event transformation |
port | An expression returning the port number integer |
username | An expression returning the user name string |
password | An expression returning the password string (also see systemProperties or data flow instantiation options) |
vhost | An expression returning the vhost string |
exchange | An expression returning the exchange name string |
routingKey | An expression returning the routing key string |
logMessages | An expression returning a boolean indiator whether to log a text for each message (requires debug-level logging) |
waitMSecNextMsg | An expression returning the number of milliseconds wait between messages, a long-typed value |
declareDurable | An expression returning a boolean indicator whether durable, false by default |
declareExclusive | An expression returning a boolean indicator whether exclusive, false by default |
declareAutoDelete | An expression returning a boolean indicator whether auto-delete, true by default |
declareAdditionalArgs | Map of additional arguments passed to AMQP of type Map<String, Object> |
prefetchCount | An expression returning the prefetch count integer, defaults to 100 |
consumeAutoAck | An expression returning a boolean indicator whether to auto-ack, true by default |
The collector is required and must be specified to transform AMQP messages to events. The collector instance must implement the interface AMQPToObjectCollector
.
The adapter provides a default implementation AMQPToObjectCollectorSerializable
that employs default serialization.
For JSON formatted messages, the adapter provides the AMQPToObjectCollectorJson
JSON collector. The collector deserializes the AMQP message bytes into a string using the JVM default encoding
and parses the JSON message for processing as an event. You must specify a JSON event type name e.g. create json schema MyJsonEventType(...)
and outstream<MyJsonEventType>
.
The following example declares a data flow that is receives AMQP messages from a queue, transforms each message and sends each message of type MyMapEventType
into the event bus:
create dataflow AMQPIncomingDataFlow AMQPSource -> outstream<MyMapEventType> { host: 'localhost', queueName: 'myqueue', collector: {class: 'AMQPToObjectCollectorSerializable'}, logMessages: true } EventBusSink(outstream){}