This chapter discusses the input and output adapters for JMS based on the Spring JmsTemplate technology. For more information on Spring, and the latest version of Spring, please visit http://www.springframework.org.
Here are the steps to use the adapters:
1. Configure an Esper engine instance to use a SpringContextLoader for loading input and output adapters, and point it to a Spring JmsTemplate configuration file.
2. Create a Spring JmsTemplate configuration file for your JMS provider and add all your input and output adapter entries in the same file.
3. For receiving events from a JMS destination into an engine (input adapter):
   a. List the destination and un-marshalling class in the Spring configuration.
   b. Create EPL statements using the event type name matching the event objects or the Map-event type names received.
4. For sending events to a JMS destination (output adapter):
   a. Use the insert-into syntax, naming the stream to insert into with the same name as listed in the Spring configuration file.
   b. Configure the Map event type of the stream in the engine configuration.
In summary the Spring JMS input adapter performs the following functions:
Initialize from a given Spring configuration file in classpath or from a filename. The Spring configuration file sets all JMS parameters such as JMS connection factory, destination and listener pools.
Attach to a JMS destination and listen to messages using the Spring class org.springframework.jms.core.JmsTemplate
Unmarshal a JMS message and send into the configured engine instance
The Spring JMS output adapter can:
Initialize from a given Spring configuration file in classpath or from a filename, and attach to a JMS destination
Act as a listener to one or more named streams populated via insert-into syntax by EPL statements
Marshal events generated by a stream into a JMS message, and send to the given destination
The Spring JMS input and output adapters are configured as part of the Esper engine configuration. EsperIO supplies a SpringContextLoader class that loads a Spring configuration file, which in turn configures the JMS input and output adapters. List the SpringContextLoader class as an adapter loader in the Esper configuration file, as the example below shows. Alternatively, the configuration API can be used to configure one or more adapter loaders.
<esper-configuration>
  <!-- Sample configuration for an input/output adapter loader -->
  <plugin-loader name="MyLoader" class-name="com.espertech.esperio.jms.SpringContextLoader">
    <!-- SpringApplicationContext translates into Spring ClassPathXmlApplicationContext
         or FileSystemXmlApplicationContext. Only one app-context of a sort can be used.
         When both attributes are used classpath and file, classpath prevails -->
    <init-arg name="classpath-app-context" value="spring\jms-spring.xml" />
    <init-arg name="file-app-context" value="spring\jms-spring.xml" />
  </plugin-loader>
</esper-configuration>
The loader loads the Spring configuration file from classpath via the classpath-app-context configuration, or from a file via file-app-context.
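As a rough sketch of the configuration-API route, the init-arg name/value pairs from the XML sample could be assembled as a java.util.Properties object. The class and method names here (LoaderInitArgs, classpathContext, fileContext) are hypothetical, and the commented-out addPluginLoader call is an assumption about the Esper Configuration class that should be checked against the Esper version in use.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Esper or EsperIO.
final class LoaderInitArgs {

    // Init-arg for loading the Spring context from the classpath.
    static Properties classpathContext(String resource) {
        Properties props = new Properties();
        props.setProperty("classpath-app-context", resource);
        return props;
    }

    // Init-arg for loading the Spring context from the file system.
    static Properties fileContext(String path) {
        Properties props = new Properties();
        props.setProperty("file-app-context", path);
        return props;
    }

    public static void main(String[] args) {
        Properties props = classpathContext("spring/jms-spring.xml");
        // Assumption: the Esper Configuration class accepts these properties, e.g.
        //   configuration.addPluginLoader("MyLoader",
        //       "com.espertech.esperio.jms.SpringContextLoader", props);
        System.out.println(props.getProperty("classpath-app-context"));
    }
}
```

Only one of the two properties would normally be set, since the sample configuration notes that the classpath setting prevails when both are present.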
The Spring configuration file must list the input and output adapters to be initialized by SpringContextLoader upon engine initialization.
Please refer to your JMS provider documentation and the Spring framework documentation for help configuring your specific JMS provider via Spring.
The next XML snippet shows a complete sample configuration for an input adapter. The sample includes the JMS configuration for an Apache ActiveMQ JMS provider.
<!-- Spring Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!-- ActiveMQ destination to use by default -->
  <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!-- Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
  </bean>

  <!-- Provides listener threads -->
  <bean id="listenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="jmsActiveMQFactory"/>
    <property name="destination" ref="defaultDestination"/>
    <property name="messageListener" ref="jmsInputAdapter"/>
  </bean>

  <!-- Default unmarshaller -->
  <bean id="jmsMessageUnmarshaller" class="com.espertech.esperio.jms.JMSDefaultAnyMessageUnmarshaller"/>

  <!-- Input adapter -->
  <bean id="jmsInputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateInputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="jmsMessageUnmarshaller">
      <ref bean="jmsMessageUnmarshaller"/>
    </property>
  </bean>

</beans>
This input adapter attaches to the JMS destination ESPER.QUEUE at an Apache ActiveMQ broker available at tcp://localhost:61616. It configures an un-marshalling class as discussed next.
EsperIO provides the class JMSDefaultAnyMessageUnmarshaller for unmarshalling JMS message instances into events for processing by an engine. The class unmarshals as follows:
If the received Message is of type javax.jms.MapMessage, extract the event type name out of the message and send to the engine via sendEvent(name, Map)
If the received Message is of type javax.jms.ObjectMessage, extract the Serializable out of the message and send to the engine via sendEvent(Object)
Else the un-marshaller outputs a warning and ignores the message
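The three rules above amount to a dispatch on the message type, which the following minimal sketch illustrates. It is not the EsperIO implementation: the nested types Msg, MapMsg, ObjectMsg and OtherMsg are hypothetical stand-ins for javax.jms.Message, MapMessage, ObjectMessage and any other message kind, used so that the sketch compiles without a JMS library, and dispatch only describes the action instead of calling an engine.

```java
import java.io.Serializable;
import java.util.Map;

// Illustrative only: stand-in types replace the javax.jms interfaces.
final class UnmarshalDispatchSketch {

    interface Msg {}

    // Stand-in for a MapMessage that carries an event type name.
    static final class MapMsg implements Msg {
        final String eventTypeName;
        final Map<String, Object> entries;

        MapMsg(String eventTypeName, Map<String, Object> entries) {
            this.eventTypeName = eventTypeName;
            this.entries = entries;
        }
    }

    // Stand-in for an ObjectMessage that carries a Serializable payload.
    static final class ObjectMsg implements Msg {
        final Serializable payload;

        ObjectMsg(Serializable payload) {
            this.payload = payload;
        }
    }

    // Stand-in for any other message kind, such as a text message.
    static final class OtherMsg implements Msg {}

    // Returns a description of the action that the rules above prescribe.
    static String dispatch(Msg message) {
        if (message instanceof MapMsg) {
            MapMsg mapMsg = (MapMsg) message;
            return "sendEvent(" + mapMsg.eventTypeName + ", Map)";
        }
        if (message instanceof ObjectMsg) {
            return "sendEvent(Object)";
        }
        return "warn and ignore";
    }
}
```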
The unmarshaller must be made aware of the event type of events within MapMessage messages. This is achieved by the client application setting a well-defined property on the message: InputAdapter.ESPERIO_MAP_EVENT_TYPE. An example code snippet is:
MapMessage mapMessage = jmsSession.createMapMessage();
mapMessage.setObject(InputAdapter.ESPERIO_MAP_EVENT_TYPE, "MyInputEvent");
The Spring configuration file lists all input and output adapters in one file. Upon engine initialization, the SpringContextLoader starts all input and output adapters.
The next XML snippet shows a complete sample configuration of an output adapter. Please check with your JMS provider for the appropriate Spring class names and settings. Note that the input and output adapter Spring configurations can be in the same file.
<!-- Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!-- ActiveMQ destination to use by default -->
  <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!-- Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
    <property name="receiveTimeout">
      <value>30000</value>
    </property>
  </bean>

  <!-- Marshaller marshals events into map messages -->
  <bean id="jmsMessageMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>
  <bean id="myCustomMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>

  <!-- Output adapter puts it all together -->
  <bean id="jmsOutputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateOutputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="subscriptionMap">
      <map>
        <entry>
          <key><idref local="subscriptionOne"/></key>
          <ref bean="subscriptionOne"/>
        </entry>
        <entry>
          <key><idref local="subscriptionTwo"/></key>
          <ref bean="subscriptionTwo"/>
        </entry>
      </map>
    </property>
    <property name="jmsMessageMarshaller">
      <ref bean="jmsMessageMarshaller"/>
    </property>
  </bean>

  <bean id="subscriptionOne" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOutputStream"/>
  </bean>

  <bean id="subscriptionTwo" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOtherOutputStream"/>
    <property name="jmsMessageMarshaller">
      <ref bean="myCustomMarshaller"/>
    </property>
  </bean>

</beans>
EsperIO provides a marshaller implementation in the class JMSDefaultMapMessageMarshaller. This marshaller constructs a JMS MapMessage from any event received by copying event properties into the name-value pairs of the message. A custom marshaller that implements the com.espertech.esperio.jms.JMSMessageMarshaller interface can be set in the configuration file in its place.
Note that this marshaller uses javax.jms.MapMessage name-value pairs and not general javax.jms.Message properties.
This means that when you read the event properties back from the JMS MapMessage, you have to use the javax.jms.MapMessage.getObject(...) method.
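The difference between body entries and message properties can be illustrated with a small sketch. FakeMapMessage is a hypothetical stand-in for javax.jms.MapMessage, used so that the example runs without a JMS provider; its method names mirror the JMS methods setObject, getObject, setObjectProperty and getObjectProperty. The marshal method only imitates the copying behaviour described above and is not the EsperIO code, and the sample values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MapMessageBodySketch {

    // Stand-in for javax.jms.MapMessage: body entries and message properties are kept apart.
    static final class FakeMapMessage {
        private final Map<String, Object> body = new LinkedHashMap<String, Object>();
        private final Map<String, Object> properties = new LinkedHashMap<String, Object>();

        void setObject(String name, Object value) { body.put(name, value); }
        Object getObject(String name) { return body.get(name); }
        void setObjectProperty(String name, Object value) { properties.put(name, value); }
        Object getObjectProperty(String name) { return properties.get(name); }
    }

    // Copies each event property into the message body as a name-value pair.
    static FakeMapMessage marshal(Map<String, Object> eventProperties) {
        FakeMapMessage message = new FakeMapMessage();
        for (Map.Entry<String, Object> entry : eventProperties.entrySet()) {
            message.setObject(entry.getKey(), entry.getValue());
        }
        return message;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<String, Object>();
        event.put("assetId", "A-17");
        event.put("zone", 3);
        FakeMapMessage message = marshal(event);
        System.out.println(message.getObject("zone"));         // looks up the body entry
        System.out.println(message.getObjectProperty("zone")); // looks up a message property, which was never set
    }
}
```

A consumer that looks for the event properties among the message properties finds nothing, because the values were written to the message body only.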
The SpringJMSTemplateOutputAdapter is configured with one or more subscription instances of type JMSSubscription, which the sample configuration supplies through the subscriptionMap property. Each subscription defines an event type name that must be configured and used in the insert-into syntax of a statement.
To connect the Spring JMS output adapter and the EPL statements producing events, use the insert-into syntax to direct events for output. Here is a sample statement that sends events into MyOutputStream:
insert into MyOutputStream select assetId, zone from RFIDEvent
The type MyOutputStream must be known to an engine instance. The output adapter requires the name to be configured with the engine instance, e.g.:
<esper-configuration>
  <event-type name="MyOutputStream">
    <java-util-map>
      <map-property name="assetId" class="String"/>
      <map-property name="zone" class="int"/>
    </java-util-map>
  </event-type>
</esper-configuration>
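For applications that configure the engine in code, the same Map event type could be described as a map from property name to property class. The sketch below builds only that map; the class name OutputStreamTypeSketch is hypothetical, and the commented-out addEventType call is an assumption about the Esper Configuration class that should be verified against the Esper version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Esper or EsperIO.
final class OutputStreamTypeSketch {

    // Property name to property class, mirroring the map-property elements of the XML sample.
    static Map<String, Object> myOutputStreamType() {
        Map<String, Object> type = new LinkedHashMap<String, Object>();
        type.put("assetId", String.class);
        type.put("zone", int.class);
        return type;
    }

    public static void main(String[] args) {
        Map<String, Object> type = myOutputStreamType();
        // Assumption: the Esper Configuration class registers a Map event type like this:
        //   configuration.addEventType("MyOutputStream", type);
        System.out.println(type.keySet());
    }
}
```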