
Chapter 5. The Kafka Adapter

5.1. Classpath Setup
5.2. Imports Setup
5.3. Input Adapter
5.3.1. Input Adapter Configuration and Start
5.3.2. Kafka Connectivity
5.3.3. Controlling Input Adapter Operation
5.4. Output Adapter
5.4.1. Output Adapter Configuration and Start
5.4.2. Kafka Connectivity
5.4.3. Controlling Output Adapter Operation

This chapter discusses the EsperIO Kafka input and output adapters.

The input adapter receives events, and optionally event time or engine time, from Kafka topics.

This input adapter is scoped as a local reader. It is not meant for coordinated use by multiple servers and does NOT provide horizontal scale-out, which is the scope of Esper Enterprise Edition. Please see Esper Enterprise Edition for information on the horizontal scale-out architecture based on Kafka.

Please add the esperio-kafka-version.jar file to your classpath.

Please also add kafka-clients-version.jar and the Kafka client dependencies to your classpath.

The EsperIO Kafka input adapter supports the new Kafka consumer only and requires Kafka client version 0.10.1.0 or higher.

For use with the Kafka output adapter, and when using the KafkaOutputDefault annotation, please add the KafkaOutputDefault import. For example:

configuration.addImport(KafkaOutputDefault.class);

You may configure and start the EsperIO Kafka input adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API.

The following example shows an Esper configuration file with all properties:

<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.espertech.com/schema/esper"
  xsi:noNamespaceSchemaLocation="../../esper/etc/esper-configuration-7-0.xsd">
  
  <plugin-loader name="KafkaInput" class-name="com.espertech.esperio.kafka.EsperIOKafkaInputAdapterPlugin">
    <!--
      Kafka Consumer Properties: Passed-Through to Kafka Consumer.
    -->
    <init-arg name="bootstrap.servers" value="127.0.0.1:9092"/>
    <init-arg name="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
    <init-arg name="value.deserializer" value="com.mycompany.MyCustomDeserializer"/>
    <init-arg name="group.id" value="my_group_id"/>

    <!--
      EsperIO Kafka Input Properties: Define subscription, topics, processor and timestamp extractor.
    -->
    <init-arg name="esperio.kafka.input.subscriber" value="com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList"/>
    <init-arg name="esperio.kafka.topics" value="my_topic"/>
    <init-arg name="esperio.kafka.input.processor" value="com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault"/>
    <init-arg name="esperio.kafka.input.timestampextractor" value="com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord"/>
  </plugin-loader>
</esper-configuration>

Alternatively the equivalent API calls to configure the adapter are:

Properties props = new Properties();

// Kafka Consumer Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.mycompany.MyCustomDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group_id");

// EsperIO Kafka Input Adapter Properties
props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "my_topic");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());

Configuration config = new Configuration();
config.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);

When the plug-in loader is added to the configuration as above, the engine automatically starts the adapter as part of engine initialization.

Alternatively, the adapter can be started and stopped programmatically as follows:

// start adapter
EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();

// destroy the adapter when done
adapter.destroy();
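
Every setting shown above is a plain string key and value, so the same settings could also be kept as properties-format text and loaded with java.util.Properties before being handed to the plug-in loader or the adapter constructor. The following is a minimal sketch under that assumption. The class name KafkaInputSettings and the inline settings text are hypothetical and not part of EsperIO; in practice the text would more likely be read from a file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of EsperIO: loads adapter settings from properties-format text.
final class KafkaInputSettings {

    // Same keys and values as the XML example; com.mycompany.MyCustomDeserializer is a placeholder.
    static final String SETTINGS = String.join("\n",
        "bootstrap.servers=127.0.0.1:9092",
        "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer=com.mycompany.MyCustomDeserializer",
        "group.id=my_group_id",
        "esperio.kafka.input.subscriber=com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList",
        "esperio.kafka.topics=my_topic",
        "esperio.kafka.input.processor=com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault",
        "esperio.kafka.input.timestampextractor=com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord");

    // Parses properties-format text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader does not normally fail; rethrow unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

The resulting Properties object could then be passed to config.addPluginLoader(...) or to the EsperIOKafkaInputAdapter constructor in the same way as the props object in the examples above.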

The input adapter operation depends on the subscriber and processor.

The subscriber is responsible for calling Kafka consumer subscribe methods, i.e. calls Kafka API consumer.subscribe(...).

The processor is responsible for processing Kafka API ConsumerRecords messages, i.e. implements process(ConsumerRecords records).

Properties that define the subscriber and processor are listed below. EsperIOKafkaConfig is part of the EsperIO Kafka API, in the package com.espertech.esperio.kafka.

Table 5.2. Kafka Input Adapter Properties

Name | API Name | Description

esperio.kafka.input.subscriber | EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG | Required property. Fully-qualified class name of the subscriber that subscribes to topics and partitions. The class must implement the interface EsperIOKafkaInputSubscriber. You may use com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList and provide a topic list in esperio.kafka.topics.

esperio.kafka.topics | EsperIOKafkaConfig.TOPICS_CONFIG | Optional property; required only if the subscriber is EsperIOKafkaInputSubscriberByTopicList. Specifies a comma-separated list of topic names to subscribe to.

esperio.kafka.input.processor | EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG | Required property. Fully-qualified class name of the Kafka consumer records processor that sends events into the engine and may advance engine time. The class must implement the interface EsperIOKafkaInputProcessor. You may use com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault for default event and time processing.

esperio.kafka.input.timestampextractor | EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG | Optional property. Fully-qualified class name of the Kafka message timestamp extractor that extracts a long-typed timestamp from a consumer record, for use as time. The class must implement the interface EsperIOKafkaInputTimestampExtractor. You may use com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord, which returns the timestamp that is part of each consumer record.
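
The required and optional rules in this table can be checked before the adapter is started. The following is a minimal sketch of such a check that uses only java.util.Properties and the property names listed above. The class KafkaInputPropsCheck and its methods are hypothetical and not part of the EsperIO API, and the adapter itself may validate its properties differently. Trimming whitespace around topic names is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check, not part of EsperIO.
final class KafkaInputPropsCheck {

    static final String SUBSCRIBER = "esperio.kafka.input.subscriber";
    static final String TOPICS = "esperio.kafka.topics";
    static final String PROCESSOR = "esperio.kafka.input.processor";
    static final String TOPIC_LIST_SUBSCRIBER =
        "com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList";

    // Returns a list of problems; an empty list means the rules in the table are satisfied.
    static List<String> validate(Properties props) {
        List<String> problems = new ArrayList<>();
        String subscriber = props.getProperty(SUBSCRIBER);
        if (isBlank(subscriber)) {
            problems.add("missing required property " + SUBSCRIBER);
        }
        if (isBlank(props.getProperty(PROCESSOR))) {
            problems.add("missing required property " + PROCESSOR);
        }
        // Topics are only required when the topic-list subscriber is used.
        if (TOPIC_LIST_SUBSCRIBER.equals(subscriber) && parseTopics(props).isEmpty()) {
            problems.add(TOPICS + " is required by " + TOPIC_LIST_SUBSCRIBER);
        }
        return problems;
    }

    // Splits the comma-separated topic list, dropping empty entries.
    static List<String> parseTopics(Properties props) {
        List<String> topics = new ArrayList<>();
        String csv = props.getProperty(TOPICS);
        if (csv == null) {
            return topics;
        }
        for (String name : csv.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        return topics;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

The timestamp extractor is deliberately not checked, since the table marks it as optional.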


The processor is responsible for processing Kafka API ConsumerRecords.

The adapter provides a default implementation named EsperIOKafkaInputProcessorDefault. Your application may provide its own processor by implementing the simple EsperIOKafkaInputProcessor interface.

This default processor can be configured with an optional timestamp extractor that obtains a timestamp for each consumer record. If no timestamp extractor is configured, the default processor does not advance time.

For reference, we provide the (slightly simplified) code of the default processor below (see the repository or source jar for the full code):

public class EsperIOKafkaInputProcessorDefault implements EsperIOKafkaInputProcessor {

  private EPServiceProvider engine;
  private EsperIOKafkaInputTimestampExtractor timestampExtractor;

  public void init(EsperIOKafkaInputProcessorContext context) {
    this.engine = context.getEngine();

    String timestampExtractorClassName = context.getProperties().getProperty(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG);
    if (timestampExtractorClassName != null) {
      timestampExtractor = (EsperIOKafkaInputTimestampExtractor) JavaClassHelper.instantiate(EsperIOKafkaInputTimestampExtractor.class, timestampExtractorClassName);
    }
  }

  public void process(ConsumerRecords<Object, Object> records) {
    for (ConsumerRecord<Object, Object> record : records) {

      if (timestampExtractor != null) {
        long timestamp = timestampExtractor.extract(record);
        // advances engine time
        engine.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(timestamp));
      }

      if (record.value() != null) {
        engine.getEPRuntime().sendEvent(record.value());
      }
    }
  }

  public void close() {}
}

The default processor takes the message value and sends it as an event into the engine. The default processor takes the extracted time, if a timestamp extractor is provided, and sends a time span event to the engine to advance engine time.
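
The order of operations matters here: for each record, time is advanced first and the event is sent second, and a record with a null value only advances time. The sketch below models that behavior with stand-in types so that it runs without Kafka or Esper on the classpath. DefaultProcessorModel, Rec and Sink are hypothetical names for illustration only; they are not EsperIO or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the default processor's per-record logic.
final class DefaultProcessorModel {

    // Stand-in for a Kafka consumer record: a value plus a timestamp.
    static final class Rec {
        final Object value;
        final long timestamp;

        Rec(Object value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the engine runtime: logs what it receives, in order.
    static final class Sink {
        final List<String> log = new ArrayList<>();

        void advanceTime(long timestamp) {
            log.add("time:" + timestamp);
        }

        void sendEvent(Object event) {
            log.add("event:" + event);
        }
    }

    // Mirrors the loop in process(...): optional time advance, then the event if non-null.
    static void process(List<Rec> records, boolean extractTimestamps, Sink sink) {
        for (Rec record : records) {
            if (extractTimestamps) {
                sink.advanceTime(record.timestamp);
            }
            if (record.value != null) {
                sink.sendEvent(record.value);
            }
        }
    }
}
```

With extractTimestamps set to false the model never advances time, which corresponds to configuring the default processor without a timestamp extractor.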

You must provide your own processor if any additional event transformation is required, if events must be sent as Map, Object-array or Node events via epRuntime.sendEvent(...), or if the default behavior does not fit for other reasons.

You may configure and start the EsperIO Kafka output adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API.

The following example shows an Esper configuration file with all properties:

<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.espertech.com/schema/esper"
  xsi:noNamespaceSchemaLocation="../../esper/etc/esper-configuration-7-0.xsd">
  
  <plugin-loader name="KafkaOutput" class-name="com.espertech.esperio.kafka.EsperIOKafkaOutputAdapterPlugin">
    <!--
      Kafka Producer Properties: Passed-Through to Kafka Producer.
    -->
    <init-arg name="bootstrap.servers" value="127.0.0.1:9092"/>
    <init-arg name="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
    <init-arg name="value.serializer" value="com.mycompany.MyCustomSerializer"/>

    <!--
      EsperIO Kafka Output Properties: Define a flow controller.
    -->
    <init-arg name="esperio.kafka.output.flowcontroller" value="com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt"/>
    <init-arg name="esperio.kafka.topics" value="my_topic"/>
  </plugin-loader>
</esper-configuration>

Alternatively the equivalent API calls to configure the adapter are:

Properties props = new Properties();

// Kafka Producer Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.mycompany.MyCustomSerializer.class.getName());

// EsperIO Kafka Output Adapter Properties
props.put(EsperIOKafkaConfig.OUTPUT_FLOWCONTROLLER_CONFIG, EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "my_topic");

Configuration config = new Configuration();
config.addPluginLoader("KafkaOutput", EsperIOKafkaOutputAdapterPlugin.class.getName(), props, null);

When the plug-in loader is added to the configuration as above, the engine automatically starts the adapter as part of engine initialization.

Alternatively, the adapter can be started and stopped programmatically as follows:

// start adapter
EsperIOKafkaOutputAdapter adapter = new EsperIOKafkaOutputAdapter(props, "default");
adapter.start();

// destroy the adapter when done
adapter.destroy();

The output adapter operation depends on the flow controller, which is responsible for attaching listeners to statements that send messages to Kafka topics.

Properties that define the flow controller are listed below. EsperIOKafkaConfig is part of the EsperIO Kafka API, in the package com.espertech.esperio.kafka.