This chapter discusses the EsperIO Kafka input adapter.
This input adapter receives events, and event or engine time, from Kafka topics.
The scope of this input adapter is a local reader; it is not meant for coordinated use by multiple servers, which is the scope of Esper Enterprise Edition. Please see Esper Enterprise Edition for information on the horizontal scale-out architecture based on Kafka.
Please add the esperio-kafka-version.jar file to your classpath. Please also add the kafka-clients-version.jar file and the Kafka client dependencies to your classpath.
The EsperIO Kafka input adapter supports the new Kafka consumer only and requires Kafka client version 0.10.1.0 and higher.
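For Maven-based builds, the dependencies could be declared roughly as follows; treat the coordinates and version numbers as placeholders to verify against your distribution:

<dependency>
  <groupId>com.espertech</groupId>
  <artifactId>esperio-kafka</artifactId>
  <version><!-- placeholder: use the version matching your Esper install --></version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.0</version> <!-- 0.10.1.0 or higher, per the requirement above -->
</dependency>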
For use with the Kafka output adapter, and when using the @KafkaOutputDefault annotation, please add the KafkaOutputDefault import.
For example:
configuration.addImport(KafkaOutputDefault.class);
You may configure and start the EsperIO Kafka input adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API.
The following example shows an Esper configuration file with all properties:
<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xmlns="http://www.espertech.com/schema/esper"
                     xsi:noNamespaceSchemaLocation="../../esper/etc/esper-configuration-6-0.xsd">
  <plugin-loader name="KafkaInput" class-name="com.espertech.esperio.kafka.EsperIOKafkaInputAdapterPlugin">
    <!-- Kafka Consumer Properties: Passed-Through to Kafka Consumer. -->
    <init-arg name="bootstrap.servers" value="127.0.0.1:9092"/>
    <init-arg name="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
    <init-arg name="value.deserializer" value="com.mycompany.MyCustomDeserializer"/>
    <init-arg name="group.id" value="my_group_id"/>
    <!-- EsperIO Kafka Input Properties: Define subscriber, topics, processor and timestamp extractor. -->
    <init-arg name="esperio.kafka.input.subscriber" value="com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList"/>
    <init-arg name="esperio.kafka.topics" value="my_topic"/>
    <init-arg name="esperio.kafka.input.processor" value="com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault"/>
    <init-arg name="esperio.kafka.input.timestampextractor" value="com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord"/>
  </plugin-loader>
</esper-configuration>
Alternatively, the equivalent API calls to configure the adapter are:
Properties props = new Properties();

// Kafka Consumer Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.mycompany.MyCustomDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group_id");

// EsperIO Kafka Input Adapter Properties
props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "my_topic");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());

Configuration config = new Configuration();
config.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);
By adding the plug-in loader to the configuration as above, the engine automatically starts the adapter as part of engine initialization.
Alternatively, the adapter can be started and stopped programmatically as follows:
// start adapter
EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();

// destroy the adapter when done
adapter.destroy();
All properties are passed to the Kafka consumer. This allows your application to set additional consumer properties that are not listed here, as described in the Kafka consumer documentation.
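For example, to cap the number of records returned by each poll, you could pass through the corresponding Kafka consumer setting (the value shown is illustrative):

// pass-through of a standard Kafka consumer property; "500" is illustrative
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");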
Required properties are below. ConsumerConfig is part of the Kafka API in org.apache.kafka.clients.consumer.ConsumerConfig.
Table 5.1. Kafka Consumer Required Properties
Name | API Name | Description
---|---|---
bootstrap.servers | ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG | Kafka bootstrap server list.
key.deserializer | ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG | Fully-qualified class name of the Kafka message key de-serializer.
value.deserializer | ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG | Fully-qualified class name of the Kafka message value de-serializer.
group.id | ConsumerConfig.GROUP_ID_CONFIG | Application consumer group id.
The input adapter operation depends on the subscriber and processor.
The subscriber is responsible for calling Kafka consumer subscribe methods, i.e. it calls the Kafka API consumer.subscribe(...).
The processor is responsible for processing Kafka API ConsumerRecords messages, i.e. it implements process(ConsumerRecords records).
Properties that define the subscriber and consumer are below.
EsperIOKafkaConfig is part of the EsperIO Kafka API in com.espertech.esperio.kafka.EsperIOKafkaConfig.
Table 5.2. Kafka Input Adapter Properties
Name | API Name | Description
---|---|---
esperio.kafka.input.subscriber | EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG | Required property. Fully-qualified class name of the subscriber that subscribes to topics and partitions. The class must implement the EsperIOKafkaInputSubscriber interface. You may use com.espertech.esperio.kafka.EsperIOKafkaInputSubscriberByTopicList.
esperio.kafka.topics | EsperIOKafkaConfig.TOPICS_CONFIG | Optional property; only required if the subscriber is EsperIOKafkaInputSubscriberByTopicList. Specifies a comma-separated list of topic names to subscribe to.
esperio.kafka.input.processor | EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG | Required property. Fully-qualified class name of the Kafka consumer records processor that sends events into the engine and may advance engine time. The class must implement the EsperIOKafkaInputProcessor interface. You may use com.espertech.esperio.kafka.EsperIOKafkaInputProcessorDefault.
esperio.kafka.input.timestampextractor | EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG | Optional property. Fully-qualified class name of the Kafka message timestamp extractor that extracts a long-typed timestamp from a consumer record, for use as time. The class must implement the EsperIOKafkaInputTimestampExtractor interface. You may use com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord.
The subscriber is responsible for calling consumer.subscribe(...).
The adapter provides a default implementation by name EsperIOKafkaInputSubscriberByTopicList. Your application may provide its own subscriber by implementing the simple EsperIOKafkaInputSubscriber interface.
The default implementation takes the value of esperio.kafka.topics and subscribes to each topic.
For reference, we provide the code of the default subscriber below (repository or source jar for full code):
public class EsperIOKafkaInputSubscriberByTopicList implements EsperIOKafkaInputSubscriber {
  public void subscribe(EsperIOKafkaInputSubscriberContext context) {
    String topicsCSV = EsperIOKafkaInputAdapter.getRequiredProperty(context.getProperties(), EsperIOKafkaConfig.TOPICS_CONFIG);
    String[] topicNames = topicsCSV.split(",");
    List<String> topics = new ArrayList<>();
    for (String topicName : topicNames) {
      if (topicName.trim().length() > 0) {
        topics.add(topicName.trim());
      }
    }
    context.getConsumer().subscribe(topics);
  }
}
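As a hypothetical alternative, a custom subscriber could assign specific partitions instead of subscribing by topic; the class name, topic name and partition numbers below are assumptions, a sketch rather than a provided implementation:

import java.util.Arrays;
import org.apache.kafka.common.TopicPartition;

public class MyFixedPartitionSubscriber implements EsperIOKafkaInputSubscriber {
  public void subscribe(EsperIOKafkaInputSubscriberContext context) {
    // Assign fixed partitions rather than subscribing by topic name;
    // "my_topic" and partitions 0 and 1 are illustrative only.
    context.getConsumer().assign(Arrays.asList(
        new TopicPartition("my_topic", 0),
        new TopicPartition("my_topic", 1)));
  }
}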
The processor is responsible for processing Kafka API ConsumerRecords.
The adapter provides a default implementation by name EsperIOKafkaInputProcessorDefault. Your application may provide its own processor by implementing the simple EsperIOKafkaInputProcessor interface.
This default processor can be configured with an optional timestamp extractor that obtains a timestamp for each consumer record. If no timestamp extractor is configured, the default processor does not advance time.
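For illustration, a minimal custom timestamp extractor that uses the Kafka record's own timestamp could look like the following sketch; the class name is an assumption, and the extract signature follows the default processor code shown below:

public class MyRecordTimestampExtractor implements EsperIOKafkaInputTimestampExtractor {
  public long extract(ConsumerRecord record) {
    // Use the Kafka record timestamp (milliseconds) as engine time.
    return record.timestamp();
  }
}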
For reference, we provide the (slightly simplified) code of the default processor below (repository or source jar for full code):
public class EsperIOKafkaInputProcessorDefault implements EsperIOKafkaInputProcessor {

  private EPServiceProvider engine;
  private EsperIOKafkaInputTimestampExtractor timestampExtractor;

  public void init(EsperIOKafkaInputProcessorContext context) {
    this.engine = context.getEngine();

    String timestampExtractorClassName = context.getProperties().getProperty(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG);
    if (timestampExtractorClassName != null) {
      timestampExtractor = (EsperIOKafkaInputTimestampExtractor) JavaClassHelper.instantiate(EsperIOKafkaInputTimestampExtractor.class, timestampExtractorClassName);
    }
  }

  public void process(ConsumerRecords<Object, Object> records) {
    for (ConsumerRecord record : records) {
      if (timestampExtractor != null) {
        long timestamp = timestampExtractor.extract(record);
        // advances engine time
        engine.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(timestamp));
      }
      if (record.value() != null) {
        engine.getEPRuntime().sendEvent(record.value());
      }
    }
  }

  public void close() {}
}
The default processor sends each message value as an event into the engine. If a timestamp extractor is provided, the default processor takes the extracted time and sends a time span event to the engine to advance engine time.
You must provide your own processor if any additional event transformation is required, if using epRuntime.sendEvent(Map/ObjectArray/Node), or if the default behavior does not fit for other reasons.
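As a hypothetical example, a custom processor that wraps each message value into a Map event could look like the following sketch; the event type name "MyMapEvent" and the "payload" property are assumptions and must match an event type declared in your engine configuration:

import java.util.HashMap;
import java.util.Map;

public class MyMapEventProcessor implements EsperIOKafkaInputProcessor {

  private EPServiceProvider engine;

  public void init(EsperIOKafkaInputProcessorContext context) {
    this.engine = context.getEngine();
  }

  public void process(ConsumerRecords<Object, Object> records) {
    for (ConsumerRecord record : records) {
      if (record.value() != null) {
        // Wrap the raw message value into a Map-type event;
        // "MyMapEvent" and "payload" are illustrative only.
        Map<String, Object> event = new HashMap<>();
        event.put("payload", record.value());
        engine.getEPRuntime().sendEvent(event, "MyMapEvent");
      }
    }
  }

  public void close() {}
}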
You may configure and start the EsperIO Kafka output adapter either as part of your Esper configuration file in the plugin loader section or via the adapter API.
The following example shows an Esper configuration file with all properties:
<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xmlns="http://www.espertech.com/schema/esper"
                     xsi:noNamespaceSchemaLocation="../../esper/etc/esper-configuration-6-0.xsd">
  <plugin-loader name="KafkaOutput" class-name="com.espertech.esperio.kafka.EsperIOKafkaOutputAdapterPlugin">
    <!-- Kafka Producer Properties: Passed-Through to Kafka Producer. -->
    <init-arg name="bootstrap.servers" value="127.0.0.1:9092"/>
    <init-arg name="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
    <init-arg name="value.serializer" value="com.mycompany.MyCustomSerializer"/>
    <!-- EsperIO Kafka Output Properties: Define a flow controller. -->
    <init-arg name="esperio.kafka.output.flowcontroller" value="com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt"/>
    <init-arg name="esperio.kafka.topics" value="my_topic"/>
  </plugin-loader>
</esper-configuration>
Alternatively, the equivalent API calls to configure the adapter are:
Properties props = new Properties();

// Kafka Producer Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());

// EsperIO Kafka Output Adapter Properties
props.put(EsperIOKafkaConfig.OUTPUT_FLOWCONTROLLER_CONFIG, EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "my_topic");

Configuration config = new Configuration();
config.addPluginLoader("KafkaOutput", EsperIOKafkaOutputAdapterPlugin.class.getName(), props, null);
By adding the plug-in loader to the configuration as above, the engine automatically starts the adapter as part of engine initialization.
Alternatively, the adapter can be started and stopped programmatically as follows:
// start adapter
EsperIOKafkaOutputAdapter adapter = new EsperIOKafkaOutputAdapter(props, "default");
adapter.start();

// destroy the adapter when done
adapter.destroy();
All properties are passed to the Kafka producer. This allows your application to set additional producer properties that are not listed here, as described in the Kafka producer documentation.
Required properties are below. ProducerConfig is part of the Kafka API in org.apache.kafka.clients.producer.ProducerConfig.
Table 5.3. Kafka Producer Required Properties
Name | API Name | Description
---|---|---
bootstrap.servers | ProducerConfig.BOOTSTRAP_SERVERS_CONFIG | Kafka bootstrap server list.
key.serializer | ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG | Fully-qualified class name of the Kafka message key serializer.
value.serializer | ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG | Fully-qualified class name of the Kafka message value serializer.
The output adapter operation depends on the flow controller, which is responsible for attaching listeners to statements that send messages to Kafka topics.
Properties that define the flow controller are below.
EsperIOKafkaConfig is part of the EsperIO Kafka API in com.espertech.esperio.kafka.EsperIOKafkaConfig.
Table 5.4. Kafka Output Adapter Properties
Name | API Name | Description
---|---|---
esperio.kafka.output.flowcontroller | EsperIOKafkaConfig.OUTPUT_FLOWCONTROLLER_CONFIG | Required property. Fully-qualified class name of the flow controller that produces messages. The class must implement the EsperIOKafkaOutputFlowController interface. You may use com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt.
esperio.kafka.topics | EsperIOKafkaConfig.TOPICS_CONFIG | Specifies a comma-separated list of topic names to produce to; for use with the above flow controller and not required otherwise.
The flow controller is responsible for allocating a KafkaProducer and associating statement listeners to the producer, so that listeners send messages to Kafka topics.
The adapter provides a default implementation by name EsperIOKafkaOutputFlowControllerByAnnotatedStmt. Your application may provide its own flow controller by implementing the EsperIOKafkaOutputFlowController interface.
The flow controller takes the value of esperio.kafka.topics and produces a message to each topic for each statement listener output event.
The flow controller attaches a listener to all statements that have the @KafkaOutputDefault annotation. Please ensure that the annotation is part of your imports. The adapter considers all newly-created statements that have the annotation.
Thus please create the EPL as follows:
@KafkaOutputDefault select * from ......
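For example, when creating the statement via the administrative API (the event type name "MyEvent" is illustrative):

epService.getEPAdministrator().createEPL("@KafkaOutputDefault select * from MyEvent");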
The flow controller produces JSON output. It uses the engine JSON renderer, which can be obtained from epService.getEPRuntime().getEventRenderer().getJSONRenderer(statement.getEventType()).
The statement listeners that the flow controller attaches do not provide a key or partition id to the producer. The listeners simply invoke new ProducerRecord(topic, json) for each output event and each topic.
The value serializer must be the string serializer.
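Conceptually, each attached listener behaves like the following sketch, not the actual implementation; the class name and the title string passed to the JSON renderer are assumptions:

public class MyTopicUpdateListener implements UpdateListener {

  private final KafkaProducer<String, String> producer;
  private final String topic;
  private final JSONEventRenderer renderer;

  public MyTopicUpdateListener(KafkaProducer<String, String> producer, String topic, JSONEventRenderer renderer) {
    this.producer = producer;
    this.topic = topic;
    this.renderer = renderer;
  }

  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    if (newEvents == null) {
      return;
    }
    for (EventBean event : newEvents) {
      // Render the output event as JSON and produce it without a key or partition id;
      // the "event" title string is illustrative.
      String json = renderer.render("event", event);
      producer.send(new ProducerRecord<String, String>(topic, json));
    }
  }
}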
For reference, please find the source code of the flow controller in the repository.