esper.codehaus.org and espertech.com Documentation
Data flows in Esper EPL have the following purposes:
Support for data flow programming and flow-based programming.
Declarative and runtime manageable integration of Esper input and output adapters that may be provided by EsperIO or by an application.
Remove the need to use an event bus, achieving dataflow-only visibility of events and event types for performance gains.
Data flow operators communicate via streams of either underlying event objects or wrapped events. Underlying event objects are POJO, Map, Object-array or DOM/XML. Wrapped events are represented by EventBean instances that associate type information to underlying event objects.
For more information on data flow programming or flow-based programming please consult the Wikipedia FBP Article.
Esper offers a number of useful built-in operators that can be combined in a graph to program a data flow. In addition EsperIO offers prebuilt operators that act as sources or sinks of events. An application can easily create and use its own data flow operators.
Using data flows an application can provide events to the data flow operators directly without using an engine's event bus. Not using an event bus (as represented by EPRuntime.sendEvent) can achieve performance gains as the engine does not need to match events to statements and the engine does not need to wrap underlying event objects in EventBean instances.
Data flows also allow for finer-grained control over threading, synchronous and asynchronous operation.
Data flows are new in release 4.6 and may be subject to evolutionary change.
Your application declares a data flow using create dataflow dataflow-name. Declaring the data flow causes the EPL compiler to validate the syntax and some aspects of the data flow graph of operators. Declaring the data flow does not actually instantiate or execute a data flow. Resolving event types and instantiating operators (as required) takes place at time of data flow instantiation.
After your application has declared a data flow, it can instantiate the data flow and execute it. A data flow can be instantiated as many times as needed and each data flow instance can only be executed once.
The example EPL below creates a data flow that, upon execution, outputs the text Hello World to console and then ends.
create dataflow HelloWorldDataFlow
  BeaconSource -> helloworld.stream { text: 'hello world' , iterations: 1}
  LogSink(helloworld.stream) {}
The sample data flow above declares a BeaconSource operator parameterized by the "hello world" text and 1 iteration. The -> keyword reads as produces streams. The BeaconSource operator produces a single stream named helloworld.stream. The LogSink operator receives this stream and prints it unformatted.
The next program code snippet declares the data flow to the engine:
String epl = "create dataflow HelloWorldDataFlow\n" +
  "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
  "LogSink(helloworldStream) {}";
epService.getEPAdministrator().createEPL(epl);
After declaring a data flow to an engine, your application can then instantiate and execute the data flow.
The following program code snippet instantiates the data flow:
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime().instantiate("HelloWorldDataFlow");
A data flow instance is represented by an EPDataFlowInstance object.
The next code snippet executes the data flow instance:
instance.run();
By using the run method of EPDataFlowInstance the engine executes the data flow using the same thread (blocking execute) and returns when the data flow completes. A data flow completes when all operators receive final markers.
The hello world data flow simply prints an unformatted Hello World string to console. Please check the built-in operator reference for BeaconSource and LogSink for more options.
The synopsis for declaring a data flow is:
create dataflow name [schema_declarations] [operator_declarations]
After create dataflow follows the data flow name and a mixed list of event type (schema) declarations and operator declarations.
Schema declarations define an event type. Specify any number of create schema clauses as part of the data flow declaration followed by a comma character to end each schema declaration. The syntax for create schema is described in Section 5.16, “Declaring an Event Type: Create Schema”.
All event types that are defined as part of a data flow are private to the data flow and not available to other EPL statements. To define event types that are available across data flows and other EPL statements, use a create schema EPL statement, runtime or static configuration.
Annotations as well as expression declarations and scripts can also be pre-pended to the data flow declaration.
For each operator, declare the operator name, input streams, output streams and operator parameters.
The syntax for declaring a data flow operator is:
operator_name [(input_streams)] [-> output_streams] { [parameter_name : parameter_value_expr] [, ...] }
The operator name is an identifier that identifies an operator.
If the operator accepts input streams then those may be listed in parentheses after the operator name, see Section 13.2.2.2, “Declaring Input Streams”.
If the operator can produce output streams then specify -> followed by a list of output stream names and types. See Section 13.2.2.3, “Declaring Output Streams”.
Following the input and output stream declaration provide curly brackets ({}) containing operator parameters. See Section 13.2.2.4, “Declaring Operator Parameters”.
An operator that receives no input streams, produces no output streams and has no parameters assigned to it is shown in this EPL example data flow:
create dataflow MyDataFlow
  MyOperatorSimple {}
The next EPL shows a data flow that consists of an operator MyOperator that receives a single input stream myInStream and produces a single output stream myOutStream holding MyEvent events. The EPL configures the operator parameter myParameter with a value of 10:
create dataflow MyDataFlow
  create schema MyEvent as (id string, price double),
  MyOperator(myInStream) -> myOutStream<MyEvent> {
    myParameter : 10
  }
The next sections outline input stream, output stream and parameter assignment in greater detail.
In case the operator receives input streams, list the input stream names within parentheses following the operator name. As part of the input stream declaration you may use the as keyword to assign an alias short name to one or multiple input streams.
The EPL shown next declares myInStream and assigns the alias mis:
create dataflow MyDataFlow
  MyOperator(myInStream as mis) {}
Multiple input streams can be listed separated by comma. We use the term input port to mean the ordinal number of the input stream in the order the input streams are listed.
The EPL below declares two input streams and assigns an alias to each. The engine assigns streamOne to input port 0 (zero) and streamTwo to port 1.
create dataflow MyDataFlow
  MyOperator(streamOne as one, streamTwo as two) {}
You may assign multiple input streams to the same port and alias by placing the stream names in parentheses. All input streams for the same port must have the same event type associated.
The next EPL statement declares an operator that receives input streams streamA and streamB both assigned to port 0 (zero) and alias streamsAB:
create dataflow MyDataFlow
  MyOperator( (streamA, streamB) as streamsAB) {}
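The port numbering described above can be illustrated with a small, self-contained Java sketch. It is a toy model only: the class InputPorts and its assign method are hypothetical names and are not part of the Esper API. Each inner list stands for one entry of the input stream declaration, with a parenthesized group represented by a list of several names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of input port numbering; hypothetical, not part of the Esper API.
final class InputPorts {
    // Each inner list is one declared input (a single stream or a parenthesized group).
    // The position of the inner list in the outer list is the input port number.
    static Map<String, Integer> assign(List<List<String>> declaredInputs) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        for (int port = 0; port < declaredInputs.size(); port++) {
            for (String stream : declaredInputs.get(port)) {
                ports.put(stream, port);
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        // Models: MyOperator(streamOne as one, streamTwo as two)
        System.out.println(assign(List.of(List.of("streamOne"), List.of("streamTwo"))));
        // prints {streamOne=0, streamTwo=1}

        // Models: MyOperator( (streamA, streamB) as streamsAB)
        System.out.println(assign(List.of(List.of("streamA", "streamB"))));
        // prints {streamA=0, streamB=0}
    }
}
```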
Input and output stream names can have the dot-character in their name.
The following is also valid EPL:
create dataflow MyDataFlow
  MyOperator(my.in.stream) -> my.out.stream {}
Reserved keywords may not appear in the stream name.
In case the operator produces output streams, list the output streams after the -> keyword. Multiple output streams can be listed separated by comma. We use the term output port to mean the ordinal number of the output stream in the order the output streams are listed.
The sample EPL below declares an operator that produces two output streams my.out.one and my.out.two.
create dataflow MyDataFlow
  MyOperator -> my.out.one, my.out.two {}
Each output stream can be assigned optional type information within less-than/greater-than characters (<>). Type information is required if the operator cannot deduce the output type from the input type and the operator does not declare explicit output type(s). The event type name can either be an event type defined within the same data flow or an event type defined in the engine.
This EPL example declares an RFIDSchema event type based on an object-array event representation and associates the output stream rfid.stream with the RFIDSchema type. The stream rfid.stream therefore carries object-array (Object[]) typed objects according to schema RFIDSchema:
create dataflow MyDataFlow
  create objectarray schema RFIDSchema (tagId string, locX double, locY double),
  MyOperator -> rfid.stream<RFIDSchema> {}
The keyword eventbean is reserved: Use eventbean<type-name> to indicate that a stream carries EventBean instances of the given type instead of the underlying event object.
This EPL example declares an RFIDSchema event type based on an object-array event representation and associates the output stream rfid.stream with the event type, such that the stream rfid.stream carries EventBean objects:
create dataflow MyDataFlow
  create objectarray schema RFIDSchema (tagId string, locX double, locY double),
  MyOperator -> rfid.stream<eventbean<RFIDSchema>> {}
Use a question mark (?) to indicate that the type of events is not known in advance.
In the next EPL the stream my.stream carries EventBean instances of any type:
create dataflow MyDataFlow
  MyOperator -> my.stream<eventbean<?>> {}
Operators can receive constants, objects, EPL expressions and complete EPL statements as parameters. All parameters are listed within curly brackets ({}) after input and output stream declarations. Curly brackets are required as a separator even if the operator has no parameters.
The syntax for parameters is:
name : value_expr [,...]
The parameter name is an identifier that is followed by the colon (:) or equals (=) character and a value expression. A value expression can be any expression, system property, JSON notation object or EPL statement. Parameters are separated by comma character.
The next EPL demonstrates operator parameters that are scalar values:
create dataflow MyDataFlow
  MyOperator {
    stringParam : 'sample',
    secondString : "double-quotes are fine",
    intParam : 10
  }
Operator parameters can be any EPL expression including expressions that use variables. Subqueries, aggregations and the prev and prior functions cannot be applied here.
The EPL shown below lists operator parameters that are expressions:
create dataflow MyDataFlow
  MyOperator {
    intParam : 24*60*60,
    threshold : var_threshold // a variable defined in the engine
  }
To obtain the value of a system property, the special systemProperties property name is reserved for access to system properties.
The following EPL sets operator parameters to a value obtained from a system property:
create dataflow MyDataFlow
  MyOperator {
    someSystemProperty : systemProperties('mySystemProperty')
  }
Any JSON value can also be used as a value. Use square brackets [] for JSON arrays. Use curly brackets {} to hold nested Map or other object values. Provide the special class property to instantiate a given instance by class name. The engine populates the respective array, Map or Object as specified in the JSON parameter value.
The below EPL demonstrates operator parameters that are JSON values:
create dataflow MyDataFlow
  MyOperator {
    myStringArray: ['a', "b"],
    myMapOrObject: {
      a : 10,
      b : 'xyz',
    },
    myInstance: {
      class: 'com.myorg.myapp.MyImplementation',
      myValue : 'sample'
    }
  }
The special parameter name select is reserved for use with EPL select statements. Please see the Select built-in operator for an example.
The below table summarizes the built-in data flow operators (Esper only) available:
Table 13.1. Esper Built-in Operators
Operator | Description |
---|---|
BeaconSource | Utility source that generates events. See Section 13.3.1, “BeaconSource”. |
Emitter | Special operator for injecting events into a stream. See Section 13.4.5, “Start Captive”. |
EPStatementSource | One or more EPL statements act as event sources. See Section 13.3.2, “EPStatementSource”. |
EventBusSink | The event bus is the sink: Sends events from the data flow into the event bus. See Section 13.3.3, “EventBusSink”. |
EventBusSource | The event bus is the source: Receives events from the event bus into the data flow. See Section 13.3.4, “EventBusSource”. |
Filter | Filters an input stream and produces an output stream containing the events passing the filter criteria. See Section 13.3.5, “Filter”. |
LogSink | Utility sink that outputs events to console or log. See Section 13.3.6, “LogSink”. |
Select | An EPL select statement that executes on the input stream events. See Section 13.3.7, “Select”. |
The below table summarizes the built-in EsperIO data flow operators. Please see the EsperIO documentation and source for more information.
Table 13.2. EsperIO Built-in Operators
Operator | Description |
---|---|
AMQPSource | Attaches to AMQP broker to receive messages to process. |
AMQPSink | Attaches to AMQP broker to send messages. |
FileSource | Reads one or more files and produces events from file data. |
FileSink | Writes one or more files from events received. |
The BeaconSource operator generates events and populates event properties.
The BeaconSource operator does not accept any input streams and has no input ports.
The BeaconSource operator must have a single output stream. When the BeaconSource operator has completed generating events according to the number of iterations provided, or when it is cancelled, it outputs a final marker to the output stream.
Parameters for the BeaconSource operator are all optional parameters:
Table 13.3. BeaconSource Parameters
Name | Description |
---|---|
initialDelay | Specifies the number of seconds delay before producing events. |
interval | Time interval between events. Takes an integer or double-typed value for the number of seconds. The interval is zero when not provided. |
iterations | Number of events produced. Takes an integer value. When not provided the operator produces events until the data flow instance gets cancelled. |
Event properties to be populated can simply be added to the parameters.
If your declaration provides an event type for the output stream then BeaconSource will populate event properties of the underlying events. If no event type is specified, BeaconSource creates an anonymous object-array event type to carry the event properties that are generated and associates this type with its output stream.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type
  // BeaconSource that produces empty object-array events without delay
  // or interval until cancelled.
  BeaconSource -> stream.one {}
  // BeaconSource that produces one SampleSchema event populating event properties
  // from a user-defined function "generateTagId" and the provided values.
  BeaconSource -> stream.two<SampleSchema> {
    iterations : 1,
    tagId : generateTagId(),
    locX : 10
  }
  // BeaconSource that produces 10 object-array events populating
  // the price property with a random value.
  BeaconSource -> stream.three {
    iterations : 10,
    interval : 10, // every 10 seconds
    initialDelay : 5, // start after 5 seconds
    price : Math.random() * 100
  }
The EPStatementSource operator maintains a subscription to the results of one or more EPL statements. The operator produces the statement output events.
The EPStatementSource operator does not accept any input streams and has no input ports.
The EPStatementSource operator must have a single output stream. It does not generate a final or other marker.
Either the statement name or the statement filter parameter is required:
Table 13.4. EPStatementSource Parameters
Name | Description |
---|---|
collector | Optional parameter, used to transform statement output events to submitted events. |
statementName | Name of the statement that produces events. The statement does not need to exist at the time of data flow instantiation. |
statementFilter | Implementation of the EPDataFlowEPStatementFilter that returns true for each statement that produces events. Statements do not need to exist at the time of data flow instantiation. |
If a statement name is provided, the operator subscribes to output events of the statement if the statement exists or when it gets created at a later point in time.
If a statement filter is provided instead, the operator subscribes to output events of all statements that currently exist and pass the filter pass method or that get created at a later point in time and pass the filter pass method.
The collector can be specified to transform output events. If no collector is specified the operator submits the underlying events of the insert stream received from the statement. The collector object must implement the interface EPDataFlowIRStreamCollector.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type
  // Consider only the statement named MySelectStatement when it exists.
  // No transformation.
  EPStatementSource -> stream.one<eventbean<?>> {
    statementName : 'MySelectStatement'
  }
  // Consider all statements that match the filter object provided.
  // No transformation.
  EPStatementSource -> stream.two<eventbean<?>> {
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }
  // Consider all statements that match the filter object provided.
  // With collector that performs transformation.
  EPStatementSource -> stream.two<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }
The EventBusSink operator sends events received from a data flow into the event bus. Any statement that looks for any of the events gets triggered, equivalent to EPRuntime.sendEvent or the insert into clause.
The EventBusSink operator accepts any number of input streams. The operator forwards all events arriving on any input ports to the event bus, equivalent to EPRuntime.sendEvent.
The EventBusSink operator cannot declare any output streams.
Parameters for the EventBusSink operator are all optional parameters:
Table 13.5. EventBusSink Parameters
Name | Description |
---|---|
collector | Optional parameter, used to transform data flow events to event bus events. |
The collector can be specified to transform data flow events to event bus events. If no collector is specified the operator submits the events directly to the event bus. The collector object must implement the interface EPDataFlowEventCollector.
Examples are:
create dataflow MyDataFlow
  BeaconSource -> instream<SampleSchema> {} // produces a sample stream
  // Send SampleSchema events produced by beacon to the event bus.
  EventBusSink(instream) {}
  // Send SampleSchema events produced by beacon to the event bus.
  // With collector that performs transformation.
  EventBusSink(instream) {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    }
  }
The EventBusSource operator receives events from the event bus and produces an output stream of the events received. With the term event bus we mean any event visible to the engine either because the application sent the event via EPRuntime.sendEvent or because statements populated streams as a result of insert into.
The EventBusSource operator does not accept any input streams and has no input ports.
The EventBusSource operator must have a single output stream. It does not generate a final or other marker. The event type declared for the output stream is the event type of events received from the event bus.
All parameters to EventBusSource are optional:
Table 13.6. EventBusSource Parameters
Name | Description |
---|---|
collector | Optional parameter, used to transform event bus events to submitted events. |
filter | Filter expression for event bus matching. |
The collector can be specified to transform output events. If no collector is specified the operator submits the underlying events of the stream received from the event bus. The collector object must implement the interface EPDataFlowEventBeanCollector.
The filter is an expression that the event bus compiles and efficiently matches even in the presence of a large number of event bus sources. The filter expression must return a boolean-typed value, returning true for those events that the event bus passes to the operator.
Examples are:
create dataflow MyDataFlow
  // Receive all SampleSchema events from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {}
  // Receive all SampleSchema events with tag id '001' from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {
    filter : tagId = '001'
  }
  // Receive all SampleSchema events from the event bus.
  // With collector that performs transformation.
  EventBusSource -> stream.two<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
  }
The Filter operator filters an input stream and produces an output stream containing the events passing the filter criteria. If a second output stream is provided, the operator sends events not passing filter criteria to that output stream.
The Filter operator accepts a single input stream.
The Filter operator requires one or two output streams. The event type of the input and output stream(s) must be the same. The first output stream receives the matching events according to the filter expression. If declaring two output streams, the second stream receives non-matching events.
The Filter operator has a single required parameter, filter, which provides the filter criteria expression.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type
  BeaconSource -> samplestream<SampleSchema> {} // sample source
  // Filter all events that have a tag id of '001'
  Filter(samplestream) -> tags_001 {
    filter : tagId = '001'
  }
  // Filter all events that have a tag id of '001',
  // putting all other events into the second stream
  Filter(samplestream) -> tags_001, tags_other {
    filter : tagId = '001'
  }
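The two-output behavior of the Filter operator amounts to partitioning a stream by a predicate. The following self-contained Java sketch illustrates that idea on plain tag id strings. It does not use or reproduce the Esper implementation; the class name FilterSketch and the method split are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustration only: models "matching events to the first output stream,
// non-matching events to the second output stream". Not Esper code.
final class FilterSketch {
    // The true key holds events passing the filter, the false key holds all others.
    static <T> Map<Boolean, List<T>> split(List<T> events, Predicate<T> filter) {
        return events.stream().collect(Collectors.partitioningBy(filter));
    }

    public static void main(String[] args) {
        List<String> tagIds = List.of("001", "002", "001");
        Map<Boolean, List<String>> result = split(tagIds, tagId -> tagId.equals("001"));
        System.out.println("tags_001   = " + result.get(true));
        System.out.println("tags_other = " + result.get(false));
    }
}
```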
The LogSink operator outputs events to console or log file in either a JSON, XML or built-in format (the default).
The LogSink operator accepts any number of input streams. All events arriving on any input ports are logged.
The LogSink operator cannot declare any output streams.
Parameters for the LogSink operator are all optional parameters:
Table 13.8. LogSink Parameters
Name | Description |
---|---|
format | Specify format as a string value: json for JSON-formatted output, xml for XML-formatted output and summary (default) for a built-in format. |
layout | Pattern string according to which output is formatted. Place %df for data flow name, %p for port number, %i for data flow instance id, %t for title, %e for event data. |
log | Boolean true (default) for log output, false for console output. |
linefeed | Boolean true (default) for line feed, false for no line feed. |
title | String title text pre-pended to output. |
Examples are:
create dataflow MyDataFlow
  BeaconSource -> instream {} // produces sample stream to use below
  // Output textual event to log using defaults.
  LogSink(instream) {}
  // Output JSON-formatted to console.
  LogSink(instream) {
    format : 'json',
    layout : '%t [%e]',
    log : false,
    linefeed : true,
    title : 'My Custom Title:'
  }
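To make the layout placeholders more concrete, the sketch below substitutes %df, %p, %i, %t and %e in a pattern string using plain Java. It is an assumption about how such a pattern could be expanded, not the LogSink implementation; LayoutSketch and render are hypothetical names, and the exact text LogSink produces may differ.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical placeholder expansion for a LogSink-style layout string.
final class LayoutSketch {
    // "df" is listed first so that "%df" is matched as one placeholder.
    private static final Pattern PLACEHOLDER = Pattern.compile("%(df|p|i|t|e)");

    // values maps the placeholder key (without '%') to its replacement text.
    static String render(String layout, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(layout);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = values.getOrDefault(m.group(1), "");
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String line = render("%t [%e]", Map.of("t", "My Custom Title:", "e", "{}"));
        System.out.println(line); // prints "My Custom Title: [{}]"
    }
}
```

A single pass over the pattern is used so that replacement text containing a percent sign is not expanded a second time.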
The Select operator is configured with an EPL select statement. It applies events from input streams to the select statement and outputs results either continuously or when the final marker arrives.
The Select operator accepts one or more input streams.
The Select operator requires a single output stream.
The Select operator requires the select parameter; all other parameters are optional:
Table 13.9. Select Operator Parameters
Name | Description |
---|---|
iterate | Boolean indicator whether results should be output continuously or only upon arrival of the final marker. |
select | EPL select statement in parentheses. |
Set the optional iterate flag to false (the default) to have the operator output results continuously. Set the iterate flag to true to indicate that the operator outputs results only when the final marker arrives. If iterate is true then output rate limiting clauses are not supported.
The select parameter is required and provides an EPL select statement within parentheses.
For each input port the statement should list the input stream name or the alias name in the from clause. Only filter-based streams are allowed in the from clause and patterns or named windows are not supported. Also not allowed are the insert into clause, the irstream keyword and subselects.
The Select operator determines the event type of output events based on the select clause. It is not necessary to declare an event type for the output stream.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type
  BeaconSource -> instream<SampleSchema> {} // sample stream
  BeaconSource -> secondstream<SampleSchema> {} // sample stream
  // Simple continuous count of events
  Select(instream) -> outstream {
    select: (select count(*) from instream)
  }
  // Demonstrate use of alias
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias)
  }
  // Output only when the final marker arrives
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias),
    iterate: true
  }
  // Same input port for the two sample streams
  Select( (instream, secondstream) as myalias) -> outstream {
    select: (select count(*) from myalias)
  }
  // A join with multiple input streams,
  // joining the last event per stream forming pairs
  Select(instream, secondstream) -> outstream {
    select: (select a.tagId, b.tagId
      from instream.std:lastevent() as a, secondstream.std:lastevent() as b)
  }
  // A join with multiple input streams and using aliases.
  Select(instream as S1, secondstream as S2) -> outstream {
    select: (select a.tagId, b.tagId
      from S1.std:lastevent() as a, S2.std:lastevent() as b)
  }
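The difference between continuous output and output upon the final marker can be pictured with a toy counter in plain Java. This is a simplified model of a count(*) query under the two iterate settings, written under the assumption that continuous mode emits one result per arriving event; CountSketch and its methods are hypothetical names and not Esper classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "select count(*)" with iterate false (continuous) or true (at final marker).
final class CountSketch {
    private final boolean iterate;
    private long count;
    private final List<Long> output = new ArrayList<>();

    CountSketch(boolean iterate) {
        this.iterate = iterate;
    }

    // Called for every event arriving on the input stream.
    void onEvent(Object event) {
        count++;
        if (!iterate) {
            output.add(count); // continuous mode: emit the running count right away
        }
    }

    // Called once when the final marker arrives.
    void onFinalMarker() {
        if (iterate) {
            output.add(count); // iterate mode: emit a single result at the end
        }
    }

    List<Long> output() {
        return output;
    }

    public static void main(String[] args) {
        for (boolean iterate : new boolean[] {false, true}) {
            CountSketch select = new CountSketch(iterate);
            select.onEvent("e1");
            select.onEvent("e2");
            select.onEvent("e3");
            select.onFinalMarker();
            System.out.println("iterate=" + iterate + " -> " + select.output());
        }
        // prints "iterate=false -> [1, 2, 3]" then "iterate=true -> [3]"
    }
}
```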
This section outlines the steps to declare, instantiate, execute and cancel or complete data flows.
Use the createEPL and related create methods on EPAdministrator, or the deployment admin API, to declare a data flow. The EPStatementObjectModel statement object model can also be used to declare a data flow.
Annotations that are listed at the top of the EPL text are applied to all EPL statements and operators in the data flow. Annotations listed for a specific operator apply to that operator only.
The next program code snippet declares a data flow to the engine:
String epl = "@Name('MyStatementName') create dataflow HelloWorldDataFlow\n" +
  "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
  "LogSink(helloworldStream) {}";
EPStatement stmt = epService.getEPAdministrator().createEPL(epl);
The statement name that can be assigned to the statement is used only for statement management. Your application may stop and/or destroy the statement declaring the data flow thereby making the data flow unavailable for instantiation. Existing instances of the data flow are not affected by a stop or destroy of the statement that declares the data flow (example: stmt.destroy()).
Listeners or the subscriber to the statement declaring a data flow receive no events or other output. The statement declaring a data flow returns no rows when iterated.
The com.espertech.esper.client.dataflow.EPDataFlowRuntime available via getDataFlowRuntime on EPRuntime manages declared data flows.
Use the instantiate method on EPDataFlowRuntime to instantiate a data flow after it has been declared. Pass the data flow name and optional instantiation options to the method. A data flow can be instantiated any number of times.
A data flow instance is represented by an instance of EPDataFlowInstance. Each instance has a state as well as methods to start, run, join and cancel as well as methods to obtain execution statistics.
Various optional arguments including operator parameters can be passed to instantiate via the EPDataFlowInstantiationOptions object as explained in more detail below.
The following code snippet instantiates the data flow:
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime().instantiate("HelloWorldDataFlow");
The engine does not track or otherwise retain data flow instances in memory. It is up to your application to retain data flow instances as needed.
Each data flow instance associates to a state. The start state is EPDataFlowState.INSTANTIATED. The end state is either COMPLETED or CANCELLED.
The following table outlines all states:
Table 13.10. Data Flow Instance States
State | Description |
---|---|
INSTANTIATED | Start state, applies when a data flow instance has been instantiated and has not executed. |
RUNNING | A data flow instance transitions from instantiated to running when any of the start, run or startCaptive methods are invoked. |
COMPLETED | A data flow instance transitions from running to completed when all final markers have been processed by all operators. |
CANCELLED | A data flow instance transitions from running to cancelled when your application invokes the cancel method. |
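The state transitions in the table can be summarized as a small state machine. The Java sketch below is a stand-alone model of those transitions, not the engine's EPDataFlowState handling; the class FlowStateSketch is a hypothetical name, and throwing an exception on an invalid transition is an assumption made for the illustration.

```java
// Stand-alone model of the data flow instance states; not Esper code.
final class FlowStateSketch {
    enum State { INSTANTIATED, RUNNING, COMPLETED, CANCELLED }

    private State state = State.INSTANTIATED;

    State state() {
        return state;
    }

    // Stands in for start, run or startCaptive: an instance executes only once.
    void execute() {
        if (state != State.INSTANTIATED) {
            throw new IllegalStateException("cannot execute in state " + state);
        }
        state = State.RUNNING;
    }

    // Stands in for "all final markers have been processed by all operators".
    void complete() {
        requireRunning();
        state = State.COMPLETED;
    }

    // Stands in for the application invoking cancel.
    void cancel() {
        requireRunning();
        state = State.CANCELLED;
    }

    private void requireRunning() {
        if (state != State.RUNNING) {
            // Assumption for this sketch: invalid transitions are rejected.
            throw new IllegalStateException("not running, state is " + state);
        }
    }

    public static void main(String[] args) {
        FlowStateSketch instance = new FlowStateSketch();
        instance.execute();
        instance.complete();
        System.out.println(instance.state()); // prints "COMPLETED"
    }
}
```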
After your application has instantiated a data flow instance it can execute the data flow instance using either the start, run or startCaptive methods.
Use the start method to have the engine allocate a thread for each source operator. Execution is non-blocking. Use the join method to have one or more threads join a data flow instance execution.
Use the run method to have the engine use the current thread to execute the single source operator. Multiple source operators are not allowed when using run.
Use the startCaptive method to have the engine return all Runnable instances and emitters, for the purpose of having complete control over execution. The engine allocates no threads and does not perform any logic for the data flow unless your application employs the Runnable instances and emitters returned by the method.
The next code snippet executes the data flow instance as a blocking call:
instance.run();
By using the run method of EPDataFlowInstance the engine executes the data flow instance using the same thread (blocking execute) and returns when the data flow instance completes. A data flow instance completes when all operators receive final markers.
The hello world data flow simply prints an unformatted Hello World string to console. The BeaconSource operator generates a final marker when it finishes the 1 iteration. The data flow instance thus transitions to complete after the LogSink operator receives the final marker, and the thread invoking the run method returns.
The next code snippet executes the data flow instance as a non-blocking call:
instance.start();
Use the cancel method to cancel execution of a running data flow instance:
instance.cancel();
Use the join method to join execution of a running data flow instance, causing the joining thread to block until the data flow instance either completes or is cancelled:
instance.join();
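The threading difference between run, start and join can be mimicked with plain Java threads. The sketch below is a rough analogy under stated assumptions: each source operator is modeled as a Runnable, and ExecutionSketch is a hypothetical class, not the engine's implementation of EPDataFlowInstance.

```java
import java.util.ArrayList;
import java.util.List;

// Analogy for run/start/join using plain threads; not Esper code.
final class ExecutionSketch {
    private final List<Runnable> sources;
    private final List<Thread> threads = new ArrayList<>();

    ExecutionSketch(List<Runnable> sources) {
        this.sources = List.copyOf(sources);
    }

    // Like run: the calling thread executes the single source and blocks until it ends.
    void run() {
        if (sources.size() != 1) {
            throw new IllegalStateException("run requires exactly one source operator");
        }
        sources.get(0).run();
    }

    // Like start: one thread per source operator, the call returns immediately.
    void start() {
        for (Runnable source : sources) {
            Thread thread = new Thread(source);
            threads.add(thread);
            thread.start();
        }
    }

    // Like join: the calling thread blocks until all source threads have ended.
    void join() {
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
    }

    public static void main(String[] args) {
        Runnable source = () -> System.out.println("source ran on " + Thread.currentThread().getName());
        new ExecutionSketch(List.of(source)).run(); // blocking, uses the calling thread
        ExecutionSketch nonBlocking = new ExecutionSketch(List.of(source, source));
        nonBlocking.start(); // non-blocking, two threads
        nonBlocking.join();  // wait for both
    }
}
```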
The EPDataFlowInstantiationOptions object that can be passed to the instantiate method may be used to customize the operator graph, operator parameters and execution of the data flow instance.
Passing runtime parameters to data flow operators is easiest using the addParameterURI method. The first parameter is the data flow operator name and the operator parameter name separated by the slash character. The second parameter is the value object.
For example, in order to pass the file name to the FileSource operator at runtime, use the following code:
EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", filename);
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime()
    .instantiate("MyFileReaderDataFlow", options);
instance.run();
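The shape of the parameter URI (operator name, slash, parameter name) can be shown with a short parsing sketch in plain Java. How the engine itself interprets the URI is not covered here; ParameterUriSketch and parse are hypothetical names, and rejecting a URI without both parts is an assumption of this sketch.

```java
// Hypothetical parser for "operatorName/parameterName" strings; not Esper code.
final class ParameterUriSketch {
    final String operatorName;
    final String parameterName;

    private ParameterUriSketch(String operatorName, String parameterName) {
        this.operatorName = operatorName;
        this.parameterName = parameterName;
    }

    // Splits at the first slash; both parts must be non-empty.
    static ParameterUriSketch parse(String uri) {
        int slash = uri.indexOf('/');
        if (slash <= 0 || slash == uri.length() - 1) {
            throw new IllegalArgumentException(
                "expected operatorName/parameterName but got: " + uri);
        }
        return new ParameterUriSketch(uri.substring(0, slash), uri.substring(slash + 1));
    }

    public static void main(String[] args) {
        ParameterUriSketch parsed = parse("FileSource/file");
        System.out.println(parsed.operatorName + " -> " + parsed.parameterName);
        // prints "FileSource -> file"
    }
}
```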
The optional operatorProvider
member takes an implementation of the EPDataFlowOperatorProvider
interface. The engine invokes this provider to obtain operator instances.
The optional parameterProvider
member takes an implementation of the EPDataFlowOperatorParameterProvider
interface. The engine invokes this provider to obtain operator parameter values. The values override the values provided via parameter URI above.
The optional exceptionHandler
member takes an implementation of the EPDataFlowExceptionHandler
interface. The engine invokes this handler when exceptions occur.
The optional dataFlowInstanceId
can be assigned any string value for the purpose of identifying the data flow instance.
The optional dataFlowInstanceUserObject
can be assigned any object value for the purpose of associating a user object to the data flow instance.
Set the operatorStatistics
flag to true to obtain statistics for operator execution.
Set the cpuStatistics
flag to true to obtain CPU statistics for operator execution.
Use the startCaptive method on an EPDataFlowInstance when your application requires full control over threading. This method returns an EPDataFlowInstanceCaptive instance that contains a list of java.lang.Runnable instances, one for each source operator.
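Because the engine allocates no threads in captive mode, the application decides where each source runnable executes. The sketch below shows one possible way to do that with a plain JDK ExecutorService. The class CaptiveRunner is hypothetical. In a real application the list of runnables would come from the EPDataFlowInstanceCaptive instance; here any java.lang.Runnable works.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical application-side helper; NOT part of the Esper API.
class CaptiveRunner {
    // Runs every source runnable on an application-owned pool
    // and waits until all of them have finished.
    static void runAll(List<Runnable> sources, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Runnable source : sources) {
                pending.add(pool.submit(source));
            }
            for (Future<?> future : pending) {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (ExecutionException e) {
                    throw new RuntimeException(e.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

The pool size, the decision to wait for completion and the handling of failures are all application choices in this mode.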
The special Emitter operator can occur in a data flow. This emitter can be used to inject events into the data flow without writing a new operator. Emitter takes a single name parameter that provides the name of the emitter and that is returned in a map of emitters by EPDataFlowInstanceCaptive.
The example EPL below creates a data flow that uses an emitter.
create dataflow HelloWorldDataFlow
  create objectarray schema SampleSchema(text string), // sample type
  Emitter -> helloworld.stream<SampleSchema> { name: 'myemitter' }
  LogSink(helloworld.stream) {}
Your application may obtain the Emitter instance and send events directly into the output stream. This feature is only supported in combination with startCaptive since the engine does not allocate any threads or run source operators.
The example code snippet below obtains the emitter instance and sends events directly into the data flow instance:
EPDataFlowInstance instance =
    epService.getEPRuntime().getDataFlowRuntime().instantiate("HelloWorldDataFlow", options);
EPDataFlowInstanceCaptive captiveStart = instance.startCaptive();
Emitter emitter = captiveStart.getEmitters().get("myemitter");
emitter.submit(new Object[] {"this is some text"});
When emitting DOM XML events please emit the root element obtained from document.getDocumentElement().
When your application executes a data flow instance by means of the start
(non-blocking) or run
(blocking) methods, the data flow instance stays running until either completed or cancelled. While cancellation is always via the cancel
method,
completion occurs when all source operators provide final markers.
The final marker is an object that implements the EPDataFlowSignalFinalMarker
interface. Some operators may also provide or process data window markers which implement the EPDataFlowSignalWindowMarker
interface. All such signals implement the EPDataFlowSignal
interface.
Some source operators such as EventBusSource
and EPStatementSource
do not generate final markers as they act continuously.
All exceptions during the execution of a data flow are logged and reported to the EPDataFlowExceptionHandler
instance if one was provided.
If no exception handler is provided or the provided exception handler re-throws or generates a new runtime exception, the source operator handles the exception and completes (ends). When all source operators complete then the data flow instance transitions to complete.
The following example is a rolling top words count implemented as a data flow, over a 30 second time window and providing the top 3 words every 2 seconds:
create dataflow RollingTopWords
  create objectarray schema WordEvent (word string),
  Emitter -> wordstream<WordEvent> {name:'a'} {} // Produces word stream
  Select(wordstream) -> wordcount { // Sliding time window count per word
    select: (select word, count(*) as wordcount from wordstream.win:time(30) group by word)
  }
  Select(wordcount) -> wordranks { // Rank of words
    select: (select window(*) as rankedWords from wordcount.ext:sort(3, wordcount desc) output snapshot every 2 seconds)
  }
  LogSink(wordranks) {}
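To make the intent of the two Select operators concrete, the following plain-Java sketch computes a similar result by hand: a per-word count over a sliding time window, followed by the top N words by count. It only approximates the EPL semantics and does not use Esper. The class RollingTopWordsSketch, its methods, the millisecond timestamps and the alphabetical tie-breaking are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; NOT part of the Esper API.
class RollingTopWordsSketch {
    private static final class Timed {
        final long time;
        final String word;
        Timed(long time, String word) { this.time = time; this.word = word; }
    }

    private final long windowMillis;
    private final Deque<Timed> window = new ArrayDeque<>();
    private final Map<String, Integer> counts = new HashMap<>();

    RollingTopWordsSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds a word observed at the given time; timestamps must not decrease.
    void add(long timeMillis, String word) {
        expire(timeMillis);
        window.addLast(new Timed(timeMillis, word));
        counts.merge(word, 1, Integer::sum);
    }

    // Removes words that have been in the window for windowMillis or longer.
    private void expire(long now) {
        while (!window.isEmpty() && window.peekFirst().time <= now - windowMillis) {
            Timed old = window.removeFirst();
            counts.computeIfPresent(old.word, (w, c) -> c == 1 ? null : c - 1);
        }
    }

    // Returns up to n words with the highest counts; ties are ordered alphabetically.
    List<String> top(long nowMillis, int n) {
        expire(nowMillis);
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```

Calling top every 2 seconds with a 30000 millisecond window would roughly correspond to the output snapshot clause in the data flow.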
The next example implements a bargain index computation that separates a mixed trade and quote event stream into a trade and a quote stream, computes a vwap and joins the two streams to compute an index:
create dataflow VWAPSample
  create objectarray schema TradeQuoteType as (type string, ticker string, price double, volume long, askprice double, asksize long),
  MyObjectArrayGraphSource -> TradeQuoteStream<TradeQuoteType> {}
  Filter(TradeQuoteStream) -> TradeStream {
    filter: type = "trade"
  }
  Filter(TradeQuoteStream) -> QuoteStream {
    filter: type = "quote"
  }
  Select(TradeStream) -> VwapTrades {
    select: (select ticker, sum(price * volume) / sum(volume) as vwap, min(price) as minprice from TradeStream.std:groupwin(ticker).win:length(4) group by ticker)
  }
  Select(VwapTrades as T, QuoteStream as Q) -> BargainIndex {
    select: (select case when vwap > askprice then asksize * (Math.exp(vwap - askprice)) else 0.0d end as index from T.std:unique(ticker) as t, Q.std:lastevent() as q where t.ticker = q.ticker)
  }
  LogSink(BargainIndex) {}
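The arithmetic inside the two Select operators can be checked by hand. The sketch below recomputes the volume-weighted average price for a set of trades and the bargain index for one quote in plain Java. The class and method names are hypothetical. The per-ticker length window of 4 and the join between the two streams are deliberately left out.

```java
// Hypothetical illustration of the formulas only; NOT part of the Esper API.
class BargainIndexSketch {
    // vwap = sum(price * volume) / sum(volume)
    // Returns NaN when the total volume is zero.
    static double vwap(double[] prices, long[] volumes) {
        double notional = 0.0;
        long totalVolume = 0;
        for (int i = 0; i < prices.length; i++) {
            notional += prices[i] * volumes[i];
            totalVolume += volumes[i];
        }
        return notional / totalVolume;
    }

    // index = asksize * exp(vwap - askprice) when vwap > askprice, otherwise 0
    static double bargainIndex(double vwap, double askPrice, long askSize) {
        return vwap > askPrice ? askSize * Math.exp(vwap - askPrice) : 0.0d;
    }
}
```

For example, trades of 100 units at 10.0 and 300 units at 12.0 give a vwap of (1000 + 3600) / 400 = 11.5. A quote with an ask price at or above 11.5 then yields an index of 0.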
The final example is a word count data flow, in which three custom operators tokenize, word count and aggregate. The custom operators in this example are discussed next.
create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}
This section discusses how to implement classes that serve as operators in a data flow. It uses the word count data flow shown earlier.
This example data flow has operators MyLineFeedSource, MyTokenizerCounter and MyWordCountAggregator that are application-provided operators:
create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}
In order to resolve application operators, add the package or operator class to imports:
// Sample code adds 'package.*' to simply import the package.
epService.getEPAdministrator().getConfiguration()
    .addImport(MyTokenizerCounter.class.getPackage().getName() + ".*");
A class that acts as a source operator must implement the DataFlowSourceOperator interface.
The implementation for the sample MyLineFeedSource
with comments is:
// The OutputTypes annotation can be used to specify the type of events
// that are output by the operator.
// If provided, it is not necessary to declare output types in the data flow.
// The event representation is object-array.
@OutputTypes(value = {
    @OutputType(name = "line", typeName = "String")
})
// Provide the DataFlowOpProvideSignal annotation to indicate that
// the source operator provides a final marker.
@DataFlowOpProvideSignal
public class MyLineFeedSource implements DataFlowSourceOperator {

    // Use the DataFlowContext annotation to indicate the field that receives the emitter.
    // The engine provides the emitter.
    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    // Mark a parameter using the DataFlowOpParameter annotation
    @DataFlowOpParameter
    private String myStringParameter;

    private final Iterator<String> lines;

    public MyLineFeedSource(Iterator<String> lines) {
        this.lines = lines;
    }

    // Invoked by the engine at time of data flow instantiation.
    public DataFlowOpInitializeResult initialize(DataFlowOpInitializateContext context) throws Exception {
        return null; // can return type information here instead
    }

    // Invoked by the engine at time of data flow instance execution.
    public void open(DataFlowOpOpenContext openContext) {
        // attach to input
    }

    // Invoked by the engine in a tight loop.
    // Submits the events which contain lines of text.
    public void next() {
        // read and submit events
        if (lines.hasNext()) {
            dataFlowEmitter.submit(new Object[] {lines.next()});
        }
        else {
            // no more lines: signal completion with a final marker
            dataFlowEmitter.submitSignal(new EPDataFlowSignalFinalMarker() {});
        }
    }

    // Invoked by the engine at time of cancellation or completion.
    public void close(DataFlowOpCloseContext openContext) {
        // detach from input
    }
}
The implementation for the sample MyTokenizerCounter
with comments is:
// Annotate with DataFlowOperator so the engine knows it's a data flow operator
@DataFlowOperator
@OutputTypes({
    @OutputType(name = "line", type = int.class),
    @OutputType(name = "wordCount", type = int.class),
    @OutputType(name = "charCount", type = int.class)
})
public class MyTokenizerCounter {

    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    // Name the method that receives data onInput(...)
    public void onInput(String line) {
        // tokenize on spaces and tabs
        StringTokenizer tokenizer = new StringTokenizer(line, " \t");
        int wordCount = tokenizer.countTokens();
        int charCount = 0;
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            charCount += token.length();
        }
        // submit count of line, words and characters
        dataFlowEmitter.submit(new Object[] {1, wordCount, charCount});
    }
}
The implementation for the sample MyWordCountAggregator
with comments is:
@DataFlowOperator
@OutputTypes(value = {
    @OutputType(name = "stats", type = MyWordCountStats.class)
})
public class MyWordCountAggregator {

    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    private final MyWordCountStats aggregate = new MyWordCountStats();

    // Accumulates the per-line counts submitted by MyTokenizerCounter
    public void onInput(int lines, int words, int chars) {
        aggregate.add(lines, words, chars);
    }

    // Name the method that receives a marker onSignal
    public void onSignal(EPDataFlowSignal signal) {
        // Received punctuation, submit aggregated totals
        dataFlowEmitter.submit(aggregate);
    }
}
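The MyWordCountStats class used by the aggregator is not shown above. A minimal version, assuming it only needs to accumulate the three totals passed to its add method, might look like the following. The field names, getters and toString format are assumptions made for illustration.

```java
// Hypothetical holder for the aggregated totals; the real class is not shown in this section.
class MyWordCountStats {
    private int lines;
    private int words;
    private int chars;

    // Adds the counts of one or more lines to the running totals.
    void add(int lines, int words, int chars) {
        this.lines += lines;
        this.words += words;
        this.chars += chars;
    }

    int getLines() {
        return lines;
    }

    int getWords() {
        return words;
    }

    int getChars() {
        return chars;
    }

    @Override
    public String toString() {
        return "lines=" + lines + " words=" + words + " chars=" + chars;
    }
}
```

Since the aggregator submits the same object each time a signal arrives, a sink that retains events would observe later updates to the totals. An application that needs immutable snapshots could submit a copy instead.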