www.espertech.com Documentation

EsperIO Reference

Version 6.0.1


Preface
1. Adapter Overview
1.1. The Adapter Interface
2. The File and CSV Input and Output Adapter
2.1. Data Flow Operators
2.1.1. Introduction
2.1.2. FileSink Operator
2.1.3. FileSource Operator
2.2. CSV Input Adapter API
2.2.1. Introduction
2.2.2. Playback of CSV-formatted Events
2.2.3. CSV Playback Options
2.2.4. Simulating Multiple Event Streams
2.2.5. Pausing and Resuming Operation
3. The Spring JMS Input and Output Adapter
3.1. Introduction
3.2. Engine Configuration
3.3. Input Adapter
3.3.1. Spring Configuration
3.3.2. JMS Message Unmarshalling
3.4. Output Adapter
3.4.1. Spring Configuration
3.4.2. JMS Message Marshalling
4. The AMQP Input and Output Adapter
4.1. Introduction
4.2. AMQPSink Operator
4.3. AMQPSource Operator
5. The Kafka Adapter
5.1. Classpath Setup
5.2. Imports Setup
5.3. Input Adapter
5.3.1. Input Adapter Configuration and Start
5.3.2. Kafka Connectivity
5.3.3. Controlling Input Adapter Operation
5.4. Output Adapter
5.4.1. Output Adapter Configuration and Start
5.4.2. Kafka Connectivity
5.4.3. Controlling Output Adapter Operation
6. The HTTP Adapter
6.1. Adapter Overview
6.2. Getting Started
6.2.1. Plugin Loader Configuration
6.2.2. Configuration and Starting via API
6.3. HTTP Input Adapter
6.3.1. HTTP Service
6.3.2. Get Handlers
6.4. HTTP Output Adapter
6.4.1. Triggered HTTP Get
7. The Socket Adapter
7.1. Getting Started
7.1.1. Plugin Loader Configuration
7.1.2. Configuration and Starting via API
7.2. Socket Service
7.2.1. Object Data Format
7.2.2. String CSV Data Format
7.2.3. String CSV Data Format With Property Order
8. The Relational Database Adapter
8.1. Adapter Overview
8.2. Getting Started
8.2.1. Plugin Loader Configuration
8.2.2. Configuration and Starting via API
8.3. JDBC Connections
8.4. Triggered DML Statement Execution
8.5. Triggered Update-Insert Execution
8.6. Executor Configuration
8.7. Reading and Polling Database Tables
8.7.1. Polling and Startup SQL Queries
9. XML and JSON Output
10. Additional Event Representations
10.1. Apache Axiom Events

The file input and output adapter consists of:

  1. File (including CSV) input and output utilizing data flow operators.

  2. The CSV input adapter API.

The FileSource operator reads from files, transforms file data and populates a data flow instance with events.

The FileSource operator cannot declare any input streams.

The FileSource operator must have at least one output stream. You can declare additional output streams to hold beginning-of-file and end-of-file indication.

Parameters for the FileSource operator are listed below, with the required parameters listed first:


The first output stream holds per-line output events. For use with the line format and if declaring two output streams, the second stream holds end-of-file indication. If declaring three output streams, the second stream holds beginning-of-file indication and the third stream holds end-of-file indication.

The line format requires that the output stream's event type is an object-array event type that features a single string-type property that the operator populates with each line of the file.

The file name (or adapterInputSource) may point to a zip file. If the file name ends with the literal zip, the operator opens the zip file and uses the first packaged file. All other parameters, including the format parameter for CSV or line formatting, then apply to the zipped file.
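The "first packaged file" behavior can be pictured with the JDK zip classes. The following is a minimal sketch in plain Java and is not the FileSource implementation; the class FirstZipEntryLines and its methods are hypothetical names, and UTF-8 text content is an assumption.

```java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper for illustration; not part of EsperIO.
final class FirstZipEntryLines {

    // Returns the text lines of the first entry of a zip stream; later entries are ignored.
    static List<String> read(InputStream zipped) {
        try (ZipInputStream zin = new ZipInputStream(zipped)) {
            ZipEntry first = zin.getNextEntry();
            if (first == null) {
                throw new IllegalArgumentException("zip archive has no entries");
            }
            // ZipInputStream signals end-of-stream at the end of the current entry,
            // so the reader stops after the first packaged file.
            BufferedReader reader = new BufferedReader(
                new InputStreamReader(zin, StandardCharsets.UTF_8));
            List<String> lines = new ArrayList<>();
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small in-memory archive from (name, content) pairs; for demonstration only.
    static byte[] zip(String... namesAndContents) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zout = new ZipOutputStream(bytes)) {
            for (int i = 0; i + 1 < namesAndContents.length; i += 2) {
                zout.putNextEntry(new ZipEntry(namesAndContents[i]));
                zout.write(namesAndContents[i + 1].getBytes(StandardCharsets.UTF_8));
                zout.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

In this sketch, an archive built with zip("sensor_events.csv", "...", "other.csv", "...") yields only the lines of sensor_events.csv when passed to read.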

This example defines a data flow that consists of two operators that work together to read a file and send the resulting events into the engine:

create dataflow SensorCSVFlow
  FileSource -> sensorstream<TemperatureEventStream> {
    file: 'sensor_events.csv', 
    propertyNames: ['sensor','temp','updtime'], 
    numLoops: 3
  }
  EventBusSink(sensorstream){}

The data flow above configures the FileSource operator to read the file sensor_events.csv, populate the sensor, temp and updtime properties of the TemperatureEventStream event type (type definition not shown here) and make the output events available within the data flow under the name sensorstream.

The data flow above configures the EventBusSink operator to send the sensorstream events into the engine for processing.

This example shows the EPL and code to read and count lines in text files.

The EPL below defines event types that hold the file line text and that indicate the beginning and end of a file (remove the semicolons if creating each EPL statement individually and not as a module):

// for beginning-of-file events
create objectarray schema MyBOF (filename string); 
// for end of file events
create objectarray schema MyEOF (filename string); 
// for line text events
create objectarray schema MyLine (filename string, line string);  

The next EPL statements count lines per file outputting the final line count only when the end-of-file is reached.

// Initiate a context partition for each file, terminate upon end-of-file
create context FileContext 
  initiated by MyBOF as mybof 
  terminated by MyEOF(filename=mybof.filename);
  
// For each file, count lines 
context FileContext 
  select context.mybof.filename as filename, count(*) as cnt
  from MyLine(filename=context.mybof.filename)
  output snapshot when terminated;

The EPL below defines a data flow that reads text files line-by-line and sends events into the engine for processing.

create dataflow MyEOFEventFileReader
  FileSource -> mylines<MyLine>, mybof<MyBOF>, myeof<MyEOF> { 
    format: 'line', 
    propertyNameLine: 'line',      // store the text in the event property 'line' 
    propertyNameFile: 'filename'   // store the file name in 'filename'
  }
  EventBusSink(mylines, mybof, myeof) {}  // send events into engine

The next sample code instantiates and runs the data flow, passing a file name:

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", "myfile.txt");
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime()
    .instantiate("MyEOFEventFileReader",options);
instance.run();

This chapter discusses the CSV input adapter API. CSV is an abbreviation for comma-separated values. CSV files are simple text files in which each line is a comma-separated list of values. CSV-formatted text can be read from many different input sources via com.espertech.esperio.csv.AdapterInputSource. Please consult the JavaDoc for additional information on AdapterInputSource and the CSV adapter.

The adapter reads events from a CSV input source and sends events to an engine using the class com.espertech.esperio.csv.CSVInputAdapter.

The below code snippet reads the CSV-formatted text file "simulation.csv" expecting the file in the classpath. The AdapterInputSource class can take other input sources.

AdapterInputSource source = new AdapterInputSource("simulation.csv");
(new CSVInputAdapter(epServiceProvider, source, "PriceEvent")).start();

To use the CSVInputAdapter without any options, the event type PriceEvent and its property names and value types must be known to the engine. The next section elaborates on adapter options.

The sample application code below shows all the steps to configure, via API, a Map-based event type and play the CSV file without setting any of the available options.

Map<String, Object> eventProperties = new HashMap<String, Object>();
eventProperties.put("symbol", String.class);
eventProperties.put("price", double.class);
eventProperties.put("volume", Integer.class);

Configuration configuration = new Configuration();
configuration.addEventType("PriceEvent", eventProperties);

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(configuration);

EPStatement stmt = epService.getEPAdministrator().createEPL(
   "select symbol, price, volume from PriceEvent.win:length(100)");

(new CSVInputAdapter(epService, new AdapterInputSource(filename), "PriceEvent")).start();

The contents of a sample CSV file are shown next.

symbol,price,volume
IBM,55.5,1000

The next code snippet outlines using a java.io.Reader as an alternative input source:

String myCSV = "symbol, price, volume" + NEW_LINE + "IBM, 10.2, 10000";
StringReader reader = new StringReader(myCSV);
(new CSVInputAdapter(epService, new AdapterInputSource(reader), "PriceEvent")).start();

In the previous code samples, the PriceEvent properties were defined programmatically with their correct types. It is possible to skip this step and use only a column header record. In that case you must define property types in the header; otherwise a type of String is assumed.

Consider the following:

symbol,double price, int volume
IBM,55.5,1000

symbol,price,volume
IBM,55.5,1000

The first CSV file defines explicit types in the column header while the second file does not. With the second file, a statement such as select sum(volume) from PriceEvent.win:time(1 min) is rejected because volume defaults to type String, unless the type is configured programmatically.
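The header convention described above can be illustrated with a small parser. This is a sketch in plain Java under stated assumptions and is not the adapter's own parsing code: the class CsvHeaderTypes is a hypothetical name, the set of recognized type names is limited to a few examples, and the mapping of type names to primitive classes is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of EsperIO.
final class CsvHeaderTypes {

    // Maps each column name of a header record to a type; columns without a type default to String.
    static Map<String, Class<?>> parse(String headerLine) {
        Map<String, Class<?>> types = new LinkedHashMap<>();
        for (String column : headerLine.split(",")) {
            String[] parts = column.trim().split("\\s+");
            if (parts.length == 1) {
                types.put(parts[0], String.class);       // name only, for example "symbol"
            } else if (parts.length == 2) {
                types.put(parts[1], typeOf(parts[0]));   // type and name, for example "double price"
            } else {
                throw new IllegalArgumentException("unexpected header column: " + column);
            }
        }
        return types;
    }

    // Assumed type names; a real parser may accept more names or resolve them differently.
    private static Class<?> typeOf(String typeName) {
        switch (typeName) {
            case "string": return String.class;
            case "int": return int.class;
            case "long": return long.class;
            case "double": return double.class;
            case "boolean": return boolean.class;
            default: throw new IllegalArgumentException("unsupported type: " + typeName);
        }
    }
}
```

With this sketch, parsing the first header yields a numeric type for volume, while parsing the second header yields String for every column, which is why an aggregation such as sum(volume) cannot be applied.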

Use the CSVInputAdapterSpec class to set playback options. The following options are available:

The next code snippet shows the use of CSVInputAdapterSpec to set playback options.

CSVInputAdapterSpec spec = new CSVInputAdapterSpec(new AdapterInputSource(myURL), "PriceEvent");
spec.setEventsPerSec(1000);
spec.setLooping(true);
  
InputAdapter inputAdapter = new CSVInputAdapter(epService, spec);
inputAdapter.start();	// method blocks unless engine thread option is set

The CSV input adapter can run simulations of events arriving in time-order from different input streams. Use the AdapterCoordinator as a specialized input adapter for coordinating multiple CSV input sources by timestamp.

The sample application code listed below simulates price and trade events arriving in timestamp order. Via the adapter the application reads two CSV-formatted files from a URL that each contain a timestamp column as well as price or trade events. The AdapterCoordinator uses the timestamp column to send events to the engine in the exact ordering prescribed by the timestamp values.

AdapterInputSource sourceOne = new AdapterInputSource(new URL("FILE://prices.csv"));
CSVInputAdapterSpec inputOne = new CSVInputAdapterSpec(sourceOne, "PriceEvent");
inputOne.setTimestampColumn("timestamp");

AdapterInputSource sourceTwo = new AdapterInputSource(new URL("FILE://trades.csv"));
CSVInputAdapterSpec inputTwo = new CSVInputAdapterSpec(sourceTwo, "TradeEvent");
inputTwo.setTimestampColumn("timestamp");

AdapterCoordinator coordinator = new AdapterCoordinatorImpl(epService, true);
coordinator.coordinate(new CSVInputAdapter(inputOne));
coordinator.coordinate(new CSVInputAdapter(inputTwo));
coordinator.start();

The AdapterCoordinatorImpl constructor takes two parameters: the engine instance and a boolean flag. If the flag is true, the adapter uses the engine timer thread; if the flag is false, the adapter uses the application thread.

You may not set an event rate per second when using a timestamp column and time-order.
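The time ordering that the AdapterCoordinator performs can be pictured as a merge of several sources by timestamp. The following is a minimal sketch in plain Java and is not the EsperIO implementation: the class TimestampOrderedMerge and its nested types are hypothetical names, and each source is assumed to be already sorted by its timestamp column.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of merging event sources by timestamp; not part of EsperIO.
final class TimestampOrderedMerge {

    static final class TimedEvent {
        final long timestamp;
        final String payload;

        TimedEvent(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Tracks the next unread event of one source.
    private static final class Cursor {
        TimedEvent head;
        final Iterator<TimedEvent> rest;

        Cursor(TimedEvent head, Iterator<TimedEvent> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    // Merges sources that are each sorted by timestamp into one timestamp-ordered list.
    // The relative order of events with equal timestamps is not defined in this sketch.
    static List<TimedEvent> merge(List<List<TimedEvent>> sources) {
        PriorityQueue<Cursor> queue =
            new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.head.timestamp));
        for (List<TimedEvent> source : sources) {
            Iterator<TimedEvent> it = source.iterator();
            if (it.hasNext()) {
                queue.add(new Cursor(it.next(), it));
            }
        }
        List<TimedEvent> ordered = new ArrayList<>();
        while (!queue.isEmpty()) {
            Cursor earliest = queue.poll();
            ordered.add(earliest.head);
            if (earliest.rest.hasNext()) {
                earliest.head = earliest.rest.next();
                queue.add(earliest);   // re-insert with the source's next event
            }
        }
        return ordered;
    }
}
```

The sketch keeps only one pending event per source in the queue, so memory use does not grow with the length of the input files.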

This chapter discusses the input and output adapters for JMS based on the Spring JmsTemplate technology. For more information on Spring, and the latest version of Spring, please visit http://www.springframework.org.

Here are the steps to use the adapters:

In summary the Spring JMS input adapter performs the following functions:

The Spring JMS output adapter can:

The Spring configuration file must list the input and output adapters to be initialized by SpringContextLoader upon engine initialization. Please refer to your JMS provider documentation and the Spring framework documentation for help with configuring your specific JMS provider via Spring.

The next XML snippet shows a complete sample configuration for an input adapter. The sample includes the JMS configuration for an Apache ActiveMQ JMS provider.

<!-- Spring Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!--  ActiveMQ destination to use  by default -->
  <bean id="defaultDestination"
        class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!--  Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
  </bean>

  <!-- Provides listener threads -->
  <bean id="listenerContainer" 
              class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="jmsActiveMQFactory"/>
    <property name="destination" ref="defaultDestination"/>
    <property name="messageListener" ref="jmsInputAdapter"/>
  </bean>

  <!-- Default unmarshaller -->
  <bean id="jmsMessageUnmarshaller" 
              class="com.espertech.esperio.jms.JMSDefaultAnyMessageUnmarshaller"/>

  <!-- Input adapter -->
  <bean id="jmsInputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateInputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="jmsMessageUnmarshaller">
      <ref bean="jmsMessageUnmarshaller"/>
    </property>
  </bean>

</beans>

This input adapter attaches to the JMS destination ESPER.QUEUE at an Apache ActiveMQ broker available at tcp://localhost:61616. It configures an unmarshalling class as discussed next.

The Spring configuration file lists all input and output adapters in one file. The SpringContextLoader upon engine initialization starts all input and output adapters.

The next XML snippet shows a complete sample configuration of an output adapter. Please check with your JMS provider for the appropriate Spring class names and settings. Note that the input and output adapter Spring configurations can be in the same file.

<!-- Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!--  ActiveMQ destination to use  by default -->
  <bean id="defaultDestination"
        class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!--  Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
    <property name="receiveTimeout">
      <value>30000</value>
    </property>
  </bean>

  <!--  Marshaller marshals events into map messages -->
  <bean id="jmsMessageMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>
  <bean id="myCustomMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>

  <!--  Output adapter puts it all together -->
  <bean id="jmsOutputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateOutputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="subscriptionMap">
      <map>
        <entry>
          <key><idref local="subscriptionOne"/></key>
          <ref bean="subscriptionOne"/>
        </entry>
        <entry>
          <key><idref local="subscriptionTwo"/></key>
          <ref bean="subscriptionTwo"/>
        </entry>
      </map>
    </property>
    <property name="jmsMessageMarshaller">
      <ref bean="jmsMessageMarshaller"/>
    </property>
  </bean>

  <bean id="subscriptionOne" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOutputStream"/>
  </bean>

  <bean id="subscriptionTwo" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOtherOutputStream"/>
    <property name="jmsMessageMarshaller">
      <ref bean="myCustomMarshaller"/>
    </property>
  </bean>

</beans>

EsperIO provides a marshaller implementation in the class JMSDefaultMapMessageMarshaller. This marshaller constructs a JMS MapMessage from any event received by copying event properties into the name-value pairs of the message. The configuration file makes it easy to configure a custom marshaller that adheres to the com.espertech.esperio.jms.JMSMessageMarshaller interface.

Note that this marshaller uses javax.jms.MapMessage name-value pairs and not general javax.jms.Message properties. This means that when you read the event properties back from the JMS MapMessage, you must use the javax.jms.MapMessage.getObject(...) method.

The SpringJMSTemplateOutputAdapter is configured with a list of subscription instances of type JMSSubscription as the sample configuration shows. Each subscription defines an event type name that must be configured and used in the insert-into syntax of a statement.

To connect the Spring JMS output adapter and the EPL statements producing events, use the insert-into syntax to direct events for output. Here is a sample statement that sends events into MyOutputStream:

insert into MyOutputStream select assetId, zone from RFIDEvent

The type MyOutputStream must be known to an engine instance. The output adapter requires the name to be configured with the engine instance, e.g.:

<esper-configuration>
  <event-type name="MyOutputStream">
    <java-util-map>
      <map-property name="assetId" class="String"/>
      <map-property name="zone" class="int"/>
    </java-util-map>
  </event-type>
</esper-configuration>

This chapter discusses the input and output adapters for AMQP. AMQP input and output utilizes data flow operators.

The AMQPSink operator receives input stream events, transforms events to AMQP messages and sends messages into an AMQP queue.

The AMQPSink operator must have a single input stream.

The AMQPSink operator cannot declare any output streams.

Parameters for the AMQPSink operator are:


Either the queueName or the combination of exchange and routingKey are required parameters.

The collector is required and must be specified to transform events to AMQP messages. The collector instance must implement the interface ObjectToAMQPCollector. The adapter provides a default implementation ObjectToAMQPCollectorSerializable that employs default serialization.

The following example declares a data flow that is triggered by MyMapEventType events from the event bus (type not declared here) that sends serialized messages to an AMQP queue:

create dataflow AMQPOutgoingDataFlow
   EventBusSource -> outstream<MyMapEventType> {}
   AMQPSink(outstream) {
     host: 'localhost',
     queueName: 'myqueue',
     collector: {class: 'ObjectToAMQPCollectorSerializable'}
   }

The AMQPSource operator receives AMQP messages from a queue, transforms messages and populates a data flow instance with events.

The AMQPSource operator cannot declare any input streams.

The AMQPSource operator must have a single output stream.

Parameters for the AMQPSource operator are listed below, with the required parameters listed first:


The collector is required and must be specified to transform AMQP messages to events. The collector instance must implement the interface AMQPToObjectCollector. The adapter provides a default implementation AMQPToObjectCollectorSerializable that employs default serialization.

The following example declares a data flow that receives AMQP messages from a queue, transforms each message and sends each message of type MyMapEventType into the event bus:

create dataflow AMQPIncomingDataFlow
  AMQPSource -> outstream<MyMapEventType> {
    host: 'localhost',
    queueName: 'myqueue',
    collector: {class: 'AMQPToObjectCollectorSerializable'},
    logMessages: true
  }
  EventBusSink(outstream){}

This chapter discusses the EsperIO Kafka input adapter.

This input adapter is for receiving events and event or engine time from Kafka topics.

The scope of this input adapter is a local reader and is not meant for coordinated use by multiple servers, which is the scope of Esper Enterprise Edition. Please see Esper Enterprise Edition for information on the horizontal scale-out architecture based on Kafka (the scope of this input adapter is NOT horizontal scale-out).

You may configure and start the EsperIO Kafka input adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API.

The following example shows an Esper configuration file with all properties:

<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.espertech.com/schema/esper"
  xsi:noNamespaceSchemaLocation="../../esper/etc/esper-configuration-6-0.xsd">
  
  <plugin-loader name="KafkaInput" class-name="com.espertech.esperio.kafka.EsperIOKafkaInputAdapterPlugin">
    <!--
      Kafka Consumer Properties: Passed-Through to Kafka Consumer.
    -->
    <init-arg name="bootstrap.servers" value="127.0.0.1:9092"/>
    <init-arg name="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
    <init-arg name="value.deserializer" value="com.mycompany.MyCustomDeserializer"/>
    <init-arg name="group.id" value="my_group_id"/>

    <!--
      EsperIO Kafka Input Properties: Define subscription, topics, processor and timestamp extractor.
    -->
    <init-arg name="esperio.kafka.input.subscriber" value="com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList"/>
    <init-arg name="esperio.kafka.topics" value="my_topic"/>
    <init-arg name="esperio.kafka.input.processor" value="com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault"/>
    <init-arg name="esperio.kafka.input.timestampextractor" value="com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord"/>
  </plugin-loader>
</esper-configuration>

Alternatively the equivalent API calls to configure the adapter are:

Properties props = new Properties();

// Kafka Consumer Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.mycompany.MyCustomDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group_id");

// EsperIO Kafka Input Adapter Properties
props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "my_topic");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());

Configuration config = new Configuration();
config.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);

By adding the plug-in loader to the configuration as above the engine automatically starts the adapter as part of engine initialization.

Alternatively, the adapter can be started and stopped programmatically as follows:

// start adapter
EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();

// destroy the adapter when done
adapter.destroy();

The input adapter operation depends on the subscriber and processor.

The subscriber is responsible for calling Kafka consumer subscribe methods, i.e. calls Kafka API consumer.subscribe(...).

The processor is responsible for processing Kafka API ConsumerRecords messages, i.e. implements process(ConsumerRecords records).

Properties that define the subscriber and processor are listed below. The API names refer to constants of com.espertech.esperio.kafka.EsperIOKafkaConfig, which is part of the EsperIO Kafka API.

Table 5.2. Kafka Input Adapter Properties

Name: esperio.kafka.input.subscriber
API Name: EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG
Description: Required property. Fully-qualified class name of the subscriber that subscribes to topics and partitions. The class must implement the interface EsperIOKafkaInputSubscriber. You may use com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList and provide a topic list in esperio.kafka.topics.

Name: esperio.kafka.topics
API Name: EsperIOKafkaConfig.TOPICS_CONFIG
Description: Optional property; required only if the subscriber is EsperIOKafkaInputSubscriberByTopicList. Specifies a comma-separated list of topic names to subscribe to.

Name: esperio.kafka.input.processor
API Name: EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG
Description: Required property. Fully-qualified class name of the Kafka consumer records processor that sends events into the engine and may advance engine time. The class must implement the interface EsperIOKafkaInputProcessor. You may use com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault for default event and time processing.

Name: esperio.kafka.input.timestampextractor
API Name: EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG
Description: Optional property. Fully-qualified class name of the Kafka message timestamp extractor that extracts a long-typed timestamp from a consumer record, for use as time. The class must implement the interface EsperIOKafkaInputTimestampExtractor. You may use com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord, which returns the timestamp that is part of each consumer record.

The processor is responsible for processing Kafka API ConsumerRecords.

The adapter provides a default implementation by name EsperIOKafkaInputProcessorDefault. Your application may provide its own processor by implementing the simple EsperIOKafkaInputProcessor interface.

This default processor can be configured with an optional timestamp extractor that obtains a timestamp for each consumer record. If no timestamp extractor is configured, the default processor does not advance time.

For reference, we provide the (slightly simplified) code of the default processor below (see the repository or source jar for the full code):

public class EsperIOKafkaInputProcessorDefault implements EsperIOKafkaInputProcessor {

  private EPServiceProvider engine;
  private EsperIOKafkaInputTimestampExtractor timestampExtractor;

  public void init(EsperIOKafkaInputProcessorContext context) {
    this.engine = context.getEngine();

    String timestampExtractorClassName = context.getProperties().getProperty(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG);
    if (timestampExtractorClassName != null) {
      timestampExtractor = (EsperIOKafkaInputTimestampExtractor) JavaClassHelper.instantiate(EsperIOKafkaInputTimestampExtractor.class, timestampExtractorClassName);
    }
  }

  public void process(ConsumerRecords<Object, Object> records) {
    for (ConsumerRecord record : records) {

      if (timestampExtractor != null) {
        long timestamp = timestampExtractor.extract(record);
        // advances engine time
        engine.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(timestamp));
      }

      if (record.value() != null) {
        engine.getEPRuntime().sendEvent(record.value());
      }
    }
  }

  public void close() {}
}

The default processor takes the message value and sends it as an event into the engine. The default processor takes the extracted time, if a timestamp extractor is provided, and sends a time span event to the engine to advance engine time.

You must provide your own processor if any additional event transformation is required, if events must be sent using the EPRuntime sendEvent methods for Map, object-array or XML Node events, or if the default behavior does not fit for other reasons.

You may configure and start the EsperIO Kafka output adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API.

The following example shows an Esper configuration file with all properties:

<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.espertech.com/schema/esper"
  xsi:noNamespaceSchemaLocation="../../esper/etc/esper-configuration-6-0.xsd">
  
  <plugin-loader name="KafkaOutput" class-name="com.espertech.esperio.kafka.EsperIOKafkaOutputAdapterPlugin">
    <!--
      Kafka Producer Properties: Passed-Through to Kafka Producer.
    -->
    <init-arg name="bootstrap.servers" value="127.0.0.1:9092"/>
    <init-arg name="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
    <init-arg name="value.serializer" value="com.mycompany.MyCustomSerializer"/>

    <!--
      EsperIO Kafka Output Properties: Define a flow controller.
    -->
    <init-arg name="esperio.kafka.output.flowcontroller" value="com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt"/>
    <init-arg name="esperio.kafka.topics" value="my_topic"/>
  </plugin-loader>
</esper-configuration>

Alternatively the equivalent API calls to configure the adapter are:

Properties props = new Properties();

// Kafka Producer Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.mycompany.MyCustomSerializer.class.getName());

// EsperIO Kafka Output Adapter Properties
props.put(EsperIOKafkaConfig.OUTPUT_FLOWCONTROLLER_CONFIG, EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "my_topic");

Configuration config = new Configuration();
config.addPluginLoader("KafkaOutput", EsperIOKafkaOutputAdapterPlugin.class.getName(), props, null);

By adding the plug-in loader to the configuration as above the engine automatically starts the adapter as part of engine initialization.

Alternatively, the adapter can be started and stopped programmatically as follows:

// start adapter
EsperIOKafkaOutputAdapter adapter = new EsperIOKafkaOutputAdapter(props, "default");
adapter.start();

// destroy the adapter when done
adapter.destroy();

The output adapter operation depends on the flow controller, which is responsible for attaching listeners to statements that send messages to Kafka topics.

Properties that define the flow controller are below. The API names refer to constants of com.espertech.esperio.kafka.EsperIOKafkaConfig, which is part of the EsperIO Kafka API.


This chapter discusses the EsperIO HTTP adapter.

You may configure the EsperIO HTTP adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API. Add the esperio-http-version.jar file to your classpath.

For input adapter operation, add the httpcore-version.jar to your classpath. If using Java NIO add the httpcore-nio-version.jar to your classpath in addition.

For output adapter operation, add the httpclient-version.jar to your classpath.

A sample adapter configuration file is provided in esperio-http-sample-config.xml in the etc folder of the distribution. A configuration file must be valid according to schema esperio-http-configuration-6-0.xsd.

One or more handlers for HTTP Get operations can be installed for a service and are used to receive events.

Define a get element in the adapter configuration file (or use the GetRequest class) for every handler to register for a service.

The synopsis is as follows:

<get service="[service]" pattern="[pattern]"/>

The service attribute value is required and provides the name of the HTTP service to register the Get operation handler for.

A value for the pattern attribute is required and may be either * for all URIs, *[uri] for all URIs ending with the given URI, or [uri]* for all URIs starting with the given URI.

A sample Get-handler configuration follows:

<get service="myservice" pattern="*"/>
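The three pattern forms can be illustrated with a minimal sketch. This is not the adapter's own matching code: the class name GetPatternSketch is hypothetical, and treating a pattern without a wildcard as an exact match is an assumption.

```java
// Illustrative sketch only: mirrors the three documented pattern forms.
// GetPatternSketch is a hypothetical name, not part of the EsperIO API.
final class GetPatternSketch {
    private GetPatternSketch() {}

    /** Returns true if the request URI matches the handler pattern. */
    static boolean matches(String pattern, String uri) {
        if (pattern.equals("*")) {
            return true; // "*" accepts every URI
        }
        if (pattern.startsWith("*")) {
            // "*[uri]": the request URI must end with the given URI
            return uri.endsWith(pattern.substring(1));
        }
        if (pattern.endsWith("*")) {
            // "[uri]*": the request URI must start with the given URI
            return uri.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        // Assumption: a pattern without a wildcard is an exact match
        return pattern.equals(uri);
    }
}
```

For instance, matches("/root*", "/root/sendevent") is true under this sketch, while matches("/root*", "/other/sendevent") is false.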

When posting events to the engine, the Get request URI must contain a stream parameter that carries the name of the stream (event type) to insert into. Each event property to be populated in the input event must be part of the Get request parameter values.

For example, the URI http://localhost:8079/sendevent?stream=MyFirewallEvent&name=Joe&changed=true entered into a browser sends an input event of type MyFirewallEvent setting the name property of the event to "Joe" and the changed property of the event to true.
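A client can assemble such a URI programmatically. The following is a minimal sketch using only the JDK; the class name SendEventUriSketch and its method are hypothetical, and URL-encoding of parameter names and values is an assumption about what a well-behaved client should do.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

// Hypothetical helper, not part of the EsperIO API: builds the Get request
// URI with the required stream parameter followed by the event properties.
final class SendEventUriSketch {
    private SendEventUriSketch() {}

    static String buildUri(String baseUri, String stream, Map<String, String> properties) {
        StringBuilder sb = new StringBuilder(baseUri);
        sb.append("?stream=").append(encode(stream));
        for (Map.Entry<String, String> e : properties.entrySet()) {
            sb.append('&').append(encode(e.getKey()))
              .append('=').append(encode(e.getValue()));
        }
        return sb.toString();
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is always available on a compliant JVM
            throw new IllegalStateException(ex);
        }
    }
}
```

Passing an insertion-ordered map such as a LinkedHashMap keeps the parameters in the order they were added. The resulting string can be handed to any HTTP client that issues a Get request.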

Note that if your target type is a Java object event, your event class must provide setter-methods according to JavaBean conventions. The event class should also provide a default constructor taking no parameters. If your event class does not have a default constructor, your application may configure a factory method via ConfigurationEventTypeLegacy.

This facility instructs the adapter to perform an HTTP Get request when a triggering event occurs, passing event properties as URI parameters.

Define a request element in the adapter configuration file (or use the Request class) for every HTTP Get to execute.

The synopsis is as follows:

<request stream="[stream]" uri="[uri_with_placeholders]"/>

A value for the stream attribute is required and provides the name of the stream that triggers the HTTP Get. The adapter expects a stream by this name to exist at adapter start time.

The uri attribute value is required. You may place event property placeholders inside the URI to format the URI as needed. Placeholders are of the format ${property_name}.

A sample request configuration follows:

<request stream="TriggerFirewallStream" uri="http://myremotehost:80/root/event"/>

Assuming the TriggerFirewallStream has the event properties name and ipaddress, a sample Get request URI is as follows:

http://myremotehost:80/root/event?stream=TriggerFirewallStream&name=Joe&ipaddress=120.1.0.0

You may parameterize the URI via placeholders by placing ${property_name} and the special placeholder ${stream} into the URI string.

The next example configuration defines URI parameters via placeholder:

<request stream="TriggerFirewallStream" uri="http://myremotehost:80/root/${stream}?violation&amp;name=${name}&amp;violationip=${ipaddress}"/>

The URI generated by the adapter:

http://myremotehost:80/root/TriggerFirewallStream?violation&name=Joe&violationip=120.1.0.0
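The placeholder expansion can be sketched in a few lines of Java. This is an illustration rather than the adapter's implementation: the class name UriPlaceholderSketch is hypothetical, and expanding an unknown property to an empty string is an assumption. The template is the URI as it looks after XML parsing, so it contains a plain & character.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the EsperIO API.
final class UriPlaceholderSketch {
    // Matches ${name} and captures the name
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private UriPlaceholderSketch() {}

    static String expand(String template, String stream, Map<String, ?> event) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            // ${stream} is the special placeholder for the stream name
            Object value = name.equals("stream") ? stream : event.get(name);
            // Assumption: a missing property expands to an empty string
            String text = value == null ? "" : value.toString();
            m.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With the stream name TriggerFirewallStream and an event carrying name=Joe and ipaddress=120.1.0.0, the sketch produces the generated URI shown above.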

This chapter discusses the EsperIO Socket adapter.

The EsperIO Socket input adapter can be used to send events into an Esper engine instance via socket client, either as Java objects or as CSV name-value pair strings.

You may configure the EsperIO Socket adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API. Add the esperio-socket-version.jar file to your classpath. There are no other dependent jar files required.

A sample adapter configuration file is provided in esperio-socket-sample-config.xml in the etc folder of the distribution. A configuration file must be valid according to schema esperio-socket-configuration-6-0.xsd.

Add a socket configuration for each unique port that you want to expose a socket receive service for use by socket client connections.

The synopsis is as follows:

<esperio-socket-configuration>
  <socket name="[name]" port="[port]" data="[csv|object|property_ordered_csv]" 
    [hostname="hostname"] [backlog="backlog"] [unescape="true|false"]/>
</esperio-socket-configuration>

The required name attribute provides the name of the socket service for use in logging.

The required port attribute provides the port on which the socket service accepts client connections.

The required data attribute specifies whether the data arriving through the socket is formatted as a Java binary object stream (object) or as CSV string values (csv or property_ordered_csv).

The optional hostname attribute can provide the host name passed to the server socket (ServerSocket).

The optional backlog attribute can provide the backlog number of connections passed to the server socket. This number defaults to 2 when a host name is passed but no backlog is provided.

The optional unescape attribute is false by default. When false, the adapter does not unescape values. When true, the adapter unescapes all values according to Java escape rules.

If configuring via the adapter API or Spring, use the com.espertech.esperio.socket.config.SocketConfig class.

When sending events as CSV strings, the format of the string should be:

stream=[type],[name]=[value] [,...] (newline)

The CSV string must end with a newline character, so that each event is one line. Each CSV element must be in the [name]=[value] format. Your CSV must contain a value for stream, which must denote a configured event type. The adapter parses each string value and populates an instance of the target type.

This next example XML configures a socket accepting client connections that provide events as CSV-formatted strings with name-value pairs:
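To make the line format concrete, here is a minimal sketch that splits one event line into its name-value pairs. It is not the adapter's parser: the class name CsvEventLineSketch is hypothetical, and the sketch assumes values contain no commas and need no unescaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the EsperIO API.
final class CsvEventLineSketch {
    private CsvEventLineSketch() {}

    /** Splits "stream=[type],[name]=[value],..." into ordered name-value pairs. */
    static Map<String, String> parse(String line) {
        Map<String, String> pairs = new LinkedHashMap<>();
        // trim() drops the trailing newline that terminates each event
        for (String element : line.trim().split(",")) {
            int eq = element.indexOf('=');
            if (eq < 1) {
                throw new IllegalArgumentException("Expected [name]=[value] but got: " + element);
            }
            pairs.put(element.substring(0, eq).trim(), element.substring(eq + 1));
        }
        if (!pairs.containsKey("stream")) {
            throw new IllegalArgumentException("Missing required 'stream' element");
        }
        return pairs;
    }
}
```

The stream entry names the event type; every other entry corresponds to one event property to populate.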

<esperio-socket-configuration>
  <socket name="csvStreamSocket" port="8079" data="csv"/>
</esperio-socket-configuration>

A piece of client code that sends an event of type MyEvent may look as follows:

// connect first
String newline = System.getProperty("line.separator");
int port = 8079; // port of the csvStreamSocket service configured above
Socket requestSocket = new Socket("localhost", port);
BufferedWriter wr = new BufferedWriter(
    new OutputStreamWriter(requestSocket.getOutputStream()));

// send a few events
wr.write("stream=MyEvent,price=20.d,upcCode=A0001");
wr.write(newline);
wr.flush();

// close when done
wr.close();
requestSocket.close();

Note that if your target type is a Java object event, your event class must provide setter-methods according to JavaBean conventions. The event class should also provide a default constructor taking no parameters. If your event class does not have a default constructor, your application may configure a factory method via ConfigurationEventTypeLegacy.
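As an illustration of these conventions, the sketch below shows a possible MyEvent class with a default constructor and setters for the price and upcCode properties used in the client example, plus a small reflective helper showing why the setters matter. The class layout and the helper BeanPopulateSketch are assumptions for illustration, not the adapter's code.

```java
import java.lang.reflect.Method;

// Hypothetical event class following JavaBean conventions
class MyEvent {
    private double price;
    private String upcCode;

    public MyEvent() {} // default constructor taking no parameters

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public String getUpcCode() { return upcCode; }
    public void setUpcCode(String upcCode) { this.upcCode = upcCode; }
}

// Hypothetical helper: locates the setter for a property name and invokes it.
// Only double and String setters are handled in this sketch.
final class BeanPopulateSketch {
    private BeanPopulateSketch() {}

    static void set(Object bean, String property, String text) throws Exception {
        String setter = "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
        for (Method m : bean.getClass().getMethods()) {
            if (m.getName().equals(setter) && m.getParameterCount() == 1) {
                Class<?> type = m.getParameterTypes()[0];
                Object value = type == double.class ? (Object) Double.valueOf(text) : text;
                m.invoke(bean, value);
                return;
            }
        }
        throw new IllegalArgumentException("No setter for property " + property);
    }
}
```

A property without a matching public setter cannot be populated this way, which is why the event class needs one setter per incoming property.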

This chapter discusses the EsperIO adapter for relational databases.

You may configure the EsperIO DB adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API. Add the esperio-db-version.jar file to your classpath as well as the JDBC driver. There are no other dependent jar files required by the adapter.

A sample adapter configuration file is provided in esperio-db-sample-config.xml in the etc folder of the distribution. A configuration file must be valid according to schema esperio-db-configuration-6-0.xsd.

This facility allows running a SQL DML (Data Manipulation) query, i.e. an Update, Insert, Delete query or a stored procedure when an event in a triggering stream occurs.

Define a dml element in the adapter configuration file (or use the DMLQuery class) for every query to execute.

The synopsis is as follows:

<dml connection="[connection]" stream="[stream]"  
    [name="name"] [executor-name="executor"] [retry="count"] [retry-interval-sec="sec"]>
  <sql>[sql]</sql>
  <bindings>
    <parameter pos="[position]" property="[property_name]"/>
    [...parameters]
  </bindings>
</dml>

The connection attribute value is required and provides the name of the configured JDBC connection.

A value for the stream attribute is required and provides the name of the stream that triggers the DML. The adapter expects a stream by this name to exist at adapter start time.

The name attribute is optional and is only used for logging errors.

The executor-name attribute is optional. If specified, it must be the name of an executor configuration. If specified, the adapter will use the executor service (queue and thread pool) for performing all DML work. If not specified, the adapter performs the DML work in the same thread.

The retry attribute is optional. If specified, the adapter will retry a given number of times in case an error is encountered. If retry-interval-sec is specified, the adapter waits the given number of seconds between retries.

The sql element is required and provides the SQL DML or stored procedure call to execute, with parameters as question mark (?).

The bindings element is required and provides the bindings for expression parameters.

The parameter element should occur as often as there are parameters in the SQL query. The pos attribute starts at 1 and counts up for each parameter. The property attribute provides the name of the event property of the stream to use as the parameter value.
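The relationship between bindings and SQL parameters can be sketched with plain JDBC. This is a simplified illustration, not the adapter's code: the class DmlBindingSketch and its nested Binding type are hypothetical, and the triggering event is represented as a Map for brevity.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the EsperIO API.
final class DmlBindingSketch {
    /** Mirrors one parameter element: 1-based position plus event property name. */
    static final class Binding {
        final int pos;
        final String property;
        Binding(int pos, String property) { this.pos = pos; this.property = property; }
    }

    private DmlBindingSketch() {}

    /** Resolves event property values into SQL parameter order. */
    static Object[] resolve(List<Binding> bindings, Map<String, Object> event) {
        Object[] params = new Object[bindings.size()];
        for (Binding b : bindings) {
            if (b.pos < 1 || b.pos > params.length) {
                throw new IllegalArgumentException("Position out of range: " + b.pos);
            }
            params[b.pos - 1] = event.get(b.property); // positions start at 1
        }
        return params;
    }

    /** Applies the resolved values to the question-mark parameters of a statement. */
    static void bind(PreparedStatement stmt, Object[] params) throws SQLException {
        for (int i = 0; i < params.length; i++) {
            stmt.setObject(i + 1, params[i]); // JDBC parameter indexes are 1-based
        }
    }
}
```

The bindings may be declared in any order, since the position alone decides which question mark receives the value.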

A sample DML configuration follows:

<dml connection="db1" stream="InsertToDBStream" 
      name="MyInsertQuery" executor-name="queue1" retry="3">
  <sql>insert into MyEventStore(key1, value1, value2) values (?, ?, ?)</sql>
  <bindings>
    <parameter pos="1" property="eventProperty1"/>
    <parameter pos="2" property="eventProperty2"/>
    <parameter pos="3" property="eventProperty3"/>
  </bindings>
</dml>
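The retry and retry-interval-sec attributes describe a conventional retry loop. The sketch below shows one possible reading, assuming the retry count is the number of additional attempts after the first failure; the class name RetrySketch is hypothetical and the adapter's actual behavior may differ in detail.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the EsperIO API.
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Runs the work once and, on failure, retries up to the given number of times,
     * sleeping intervalMillis between attempts. Rethrows the last failure.
     */
    static <T> T withRetry(Callable<T> work, int retries, long intervalMillis) throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return work.call();
            } catch (Exception ex) {
                last = ex;
                if (attempt < retries && intervalMillis > 0) {
                    Thread.sleep(intervalMillis); // wait before the next attempt
                }
            }
        }
        throw last; // all attempts failed
    }
}
```

In a DML context the Callable would wrap the statement execution, for example a call to PreparedStatement.executeUpdate().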

This facility allows running an SQL Update that is followed by an Insert if the Update did not update any rows.

Define an upsert element in the adapter configuration file (or use the UpsertQuery class) for every update-insert to execute.

The synopsis is as follows:

<upsert connection="[connection]" stream="[stream]" table-name="[table]"
      [name="name"] [executor-name="executor"] [retry="count"] [retry-interval-sec="sec"]>
  <keys>
    <column property="[property_name]" column="[column_name]" type="[sql_type]"/>
    [...column]
  </keys>
  <values>
    <column property="[property_name]" column="[column_name]" type="[sql_type]"/>
    [...column]
  </values>
</upsert>

The connection attribute value is required and provides the name of the configured JDBC connection.

A value for the stream attribute is required and provides the name of the stream that triggers the Update-Insert. The adapter expects a stream by this name to exist at adapter start time.

The table-name attribute value is required and provides the database table name.

The name attribute is optional and is only used for logging errors.

The executor-name attribute is optional. If specified, it must be the name of an executor configuration. If specified, the adapter will use the executor service (queue and thread pool) for performing all work. If not specified, the adapter performs the work in the same thread.

The retry attribute is optional. If specified, the adapter will retry a given number of times in case an error is encountered. If retry-interval-sec is specified, the adapter waits the given number of seconds between retries.

The keys element is required and provides the key columns of the table and the values element provides the list of value columns of the table.

The column element should occur once for each key column and each value column in the table. The property attribute provides the name of the event property, the column attribute provides the database table column name, and the type attribute is any of the java.sql.Types names (case ignored).
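A case-insensitive lookup of a java.sql.Types name could be done via reflection, as in this minimal sketch. The class name SqlTypeNameSketch is hypothetical, and the adapter may resolve type names differently.

```java
import java.sql.Types;
import java.util.Locale;

// Hypothetical helper, not part of the EsperIO API.
final class SqlTypeNameSketch {
    private SqlTypeNameSketch() {}

    /** Resolves a name such as "varchar" to the matching java.sql.Types constant. */
    static int resolve(String typeName) {
        try {
            // The constants in java.sql.Types are upper-case public static int fields
            return Types.class.getField(typeName.trim().toUpperCase(Locale.ROOT)).getInt(null);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Not a java.sql.Types name: " + typeName, ex);
        }
    }
}
```

Under this sketch, "varchar", "VARCHAR" and "VarChar" all resolve to Types.VARCHAR.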

A sample Update-Insert configuration follows:

<upsert connection="db1" stream="UpdateInsertDBTableTrigger" name="UpdateInsertSample" 
    table-name="MyKeyedTable" executor-name="queue1" retry="3">
  <keys>
    <column property="eventProperty1" column="keyColumn1" type="varchar"/>
    <column property="eventProperty2" column="keyColumn2" type="varchar"/>
  </keys>
  <values>
    <column property="eventProperty3" column="valueColumn1" type="varchar"/>
    <column property="eventProperty4" column="valueColumn2" type="integer"/>
  </values>
</upsert>
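The update-then-insert logic can be sketched with plain JDBC. The SQL text generated here is an assumption about what such statements could look like for the sample configuration, not the SQL the adapter actually issues; the class name UpsertSqlSketch is hypothetical.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the EsperIO API.
final class UpsertSqlSketch {
    private UpsertSqlSketch() {}

    /** Update binds the value columns first, then the key columns. */
    static String updateSql(String table, List<String> keys, List<String> values) {
        return "update " + table + " set " + join(values, "=?", ", ")
            + " where " + join(keys, "=?", " and ");
    }

    /** Insert binds the key columns first, then the value columns. */
    static String insertSql(String table, List<String> keys, List<String> values) {
        List<String> all = new ArrayList<>(keys);
        all.addAll(values);
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < all.size(); i++) {
            marks.append(i == 0 ? "?" : ", ?");
        }
        return "insert into " + table + "(" + join(all, "", ", ") + ") values (" + marks + ")";
    }

    /** Runs the insert only if the update did not change any row. */
    static void upsert(PreparedStatement update, PreparedStatement insert) throws SQLException {
        if (update.executeUpdate() == 0) {
            insert.executeUpdate();
        }
    }

    private static String join(List<String> cols, String suffix, String sep) {
        StringBuilder sb = new StringBuilder();
        for (String c : cols) {
            if (sb.length() > 0) {
                sb.append(sep);
            }
            sb.append(c).append(suffix);
        }
        return sb.toString();
    }
}
```

Both statements must have their parameters bound from the triggering event before upsert is called; note the differing parameter order between the two statements.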

This section provides sample statements and documentation pointers for using Esper EPL to read from database tables. If only reading from and not writing to a database, no configuration or EsperIO jar file is required.

Please consult the Esper SQL access documentation for more information.

The plug-in event representation based on Apache Axiom can process XML documents by means of the Streaming API for XML (StAX) and the concept of "pull parsing", which can improve performance when extracting data from XML documents.

The instructions below have been tested with Apache Axiom version 1.2.5. Please visit http://ws.apache.org/commons/axiom/ for more information. Apache Axiom requires additional jar files that are not part of the EsperIO distribution and must be downloaded separately.

There are 3 steps to follow:

To enable Apache Axiom event processing, use the code snippet shown next, or configure via configuration XML:

Configuration config = new Configuration();
config.addPlugInEventRepresentation(new URI("type://xml/apacheaxiom/OMNode"),
    AxiomEventRepresentation.class.getName(), null);

Your application may register Axiom event types in advance. Here is sample code for adding event types based on Axiom:

ConfigurationEventTypeAxiom desc = new ConfigurationEventTypeAxiom();
desc.setRootElementName("measurement");
desc.addXPathProperty("measurement", "/sensor/measurement", XPathConstants.NUMBER);
URI[] resolveURIs = new URI[] {new URI("type://xml/apacheaxiom/OMNode/SensorEvent")};
config.addPlugInEventType("SensorEvent", resolveURIs, desc);

The operation above is available at configuration time and also at runtime via ConfigurationOperations. After registering an event type name as above, your application can create EPL statements.

To send Axiom OMDocument or OMElement events into the engine, your application code must obtain an EventSender to process Axiom OMElement events:

URI[] resolveURIs = new URI[] {new URI("type://xml/apacheaxiom/OMNode/SensorEvent")};
EventSender sender = epService.getEPRuntime().getEventSender(resolveURIs);

String xml = "<measurement><temperature>98.6</temperature></measurement>";
InputStream s = new ByteArrayInputStream(xml.getBytes());
OMElement omElement = new StAXOMBuilder(s).getDocumentElement();

sender.sendEvent(omElement);

Configuring an Axiom event type via XML is easy. An Esper configuration XML can be found in the file esper-axiom-sample-configuration.xml in the etc folder of the EsperIO distribution.

The configuration XML for the ConfigurationEventTypeAxiom class adheres to the schema esperio-axiom-configuration-6-0.xsd also in the etc folder of the EsperIO distribution.