www.espertech.comDocumentation

Chapter 3. The Spring JMS Input and Output Adapter

3.1. Introduction
3.2. Engine Configuration
3.3. Input Adapter
3.3.1. Spring Configuration
3.3.2. JMS Message Unmarshalling
3.4. Output Adapter
3.4.1. Spring Configuration
3.4.2. JMS Message Marshalling

This chapter discusses the input and output adapters for JMS based on the Spring JmsTemplate technology. For more information on Spring, and the latest version of Spring, please visit http://www.springframework.org.

Here are the steps to use the adapters:

In summary the Spring JMS input adapter performs the following functions:

The Spring JMS output adapter can:

The Spring JMS input and output adapters are configured as part of the Esper engine configuration. EsperIO supplies a SpringContextLoader class that loads a Spring configuration file which in turn configures the JMS input and output adapters. List the SpringContextLoader class as an adapter loader in the Esper configuration file as the below example shows. The configuration API can alternatively be used to configure one or more adapter loaders.

<esper-configuration>

  <!-- Sample configuration for an input/output adapter loader -->
  <plugin-loader name="MyLoader" class-name="com.espertech.esperio.SpringContextLoader">
    <!-- SpringApplicationContext translates into Spring ClassPathXmlApplicationContext 
           or FileSystemXmlApplicationContext. Only one app-context of a sort can be used. 
           When both attributes are used classpath and file, classpath prevails -->
    <init-arg name="classpath-app-context" value="spring\jms-spring.xml" />
    <init-arg name="file-app-context" value="spring\jms-spring.xml" />
  </plugin-loader>

</esper-configuration>

The loader loads the Spring configuration file from classpath via the classpath-app-context configuration, or from a file via file-app-context.

The Spring configuration file must list input and output adapters to be initialized by SpringContextLoader upon engine initialization. Please refer to your JMS provider documentation, and the Spring framework documentation on help to configure your specific JMS provider via Spring.

The next XML snippet shows a complete sample configuration for an input adapter. The sample includes the JMS configuration for an Apache ActiveMQ JMS provider.

<!-- Spring Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!--  ActiveMQ destination to use  by default -->
  <bean id="defaultDestination"
        class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!--  Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
  </bean>

  <!-- Provides listener threads -->
  <bean id="listenerContainer" 
              class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="jmsActiveMQFactory"/>
    <property name="destination" ref="defaultDestination"/>
    <property name="messageListener" ref="jmsInputAdapter"/>
  </bean>

  <!-- Default unmarshaller -->
  <bean id="jmsMessageUnmarshaller" 
              class="com.espertech.esperio.jms.JMSDefaultAnyMessageUnmarshaller"/>

  <!-- Input adapter -->
  <bean id="jmsInputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateInputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="jmsMessageUnmarshaller">
      <ref bean="jmsMessageUnmarshaller"/>
    </property>
  </bean>

</beans>

This input adapter attaches to the JMS destination ESPER.QUEUE at an Apache MQ broker available at port tcp://localhost:61616. It configures an un-marshalling class as discussed next.

The Spring configuration file lists all input and output adapters in one file. The SpringContextLoader upon engine initialization starts all input and output adapters.

The next XML snippet shows a complete sample configuration of an output adapter. Please check with your JMS provider for the appropriate Spring class names and settings. Note that the input and output adapter Spring configurations can be in the same file.

<!-- Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!--  ActiveMQ destination to use  by default -->
  <bean id="defaultDestination"
        class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!--  Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
    <property name="receiveTimeout">
      <value>30000</value>
    </property>
  </bean>

  <!--  Marshaller marshals events into map messages -->
  <bean id="jmsMessageMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>
  <bean id="myCustomMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>

  <!--  Output adapter puts it all together -->
  <bean id="jmsOutputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateOutputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="subscriptionMap">
      <map>
        <entry>
          <key><idref local="subscriptionOne"/></key>
          <ref bean="subscriptionOne"/>
        </entry>
        <entry>
          <key><idref local="subscriptionTwo"/></key>
          <ref bean="subscriptionTwo"/>
        </entry>
      </map>
    </property>
    <property name="jmsMessageMarshaller">
      <ref bean="jmsMessageMarshaller"/>
    </property>
  </bean>

  <bean id="subscriptionOne" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOutputStream"/>
  </bean>

  <bean id="subscriptionTwo" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOtherOutputStream"/>
    <property name="jmsMessageMarshaller">
      <ref bean="myCustomMarshaller"/>
    </property>
  </bean>

</beans>

EsperIO provides a marshal implementation in the class JMSDefaultMapMessageMarshaller. This marshaller constructs a JMS MapMessage from any event received by copying event properties into the name-value pairs of the message. The configuration file makes it easy to configure a custom marshaller that adheres to the com.espertech.esperio.jms.JMSMessageMarshaller interface.

Note that this marshaller uses javax.jms.MapMessage name-value pairs and not general javax.jms.Message properties. This means when you'll read the event properties back from the JMS MapMessage, you will have to use the javax.jms.MapMessage.getObject(...) method.

The SpringJMSTemplateOutputAdapter is configured with a list of subscription instances of type JMSSubscription as the sample configuration shows. Each subscription defines an event type name that must be configured and used in the insert-into syntax of a statement.

To connect the Spring JMS output adapter and the EPL statements producing events, use the insert-into syntax to direct events for output. Here is a sample statement that sends events into MyOutputStream:

insert into MyOutputStream select assetId, zone from RFIDEvent

The type MyOutputStream must be known to an engine instance. The output adapter requires the name to be configured with the Engine instance, e.g.:

<esper-configuration>
  <event-type name="MyOutputStream">
    <java-util-map>
      <map-property name="assetId" class="String"/>
      <map-property name="zone" class="int"/>
    </java-util-map>
  </event-type>
</esper-configuration>