Data flows in EPL have the following purposes:

- Support for data flow programming and flow-based programming.
- Declarative and runtime-manageable integration of input and output adapters that may be provided by EsperIO or by an application.
- Removing the need to use an event bus, achieving dataflow-only visibility of events and event types for performance gains.
Data flow operators communicate via streams of either underlying event objects or wrapped events. Underlying event objects are POJO, Map, Object-array or DOM/XML. Wrapped events are represented by EventBean instances that associate type information to underlying event objects.
For more information on data flow programming or flow-based programming please consult the Wikipedia FBP Article.
EPL offers a number of useful built-in operators that can be combined in a graph to program a data flow. In addition EsperIO offers prebuilt operators that act as sources or sinks of events. An application can easily create and use its own data flow operators.
Using data flows an application can provide events to the data flow operators directly without using a runtime's event bus. Not using an event bus (as represented by the sendEventType methods of EPEventService) can achieve performance gains as the runtime does not need to match events to statements and does not need to wrap underlying event objects in EventBean instances.
Data flows also allow finer-grained control over threading and over synchronous versus asynchronous operation.
Your application declares a data flow using create dataflow dataflow-name. Declaring the data flow causes the EPL compiler to validate the syntax and some aspects of the data flow graph of operators. Declaring the data flow does not actually instantiate or execute a data flow. Resolving event types and instantiating operators (as required) takes place at time of data flow instantiation.
After your application has declared a data flow, it can instantiate the data flow and execute it. A data flow can be instantiated as many times as needed, but each data flow instance can only be executed once.
The example EPL below creates a data flow that, upon execution, outputs the text Hello World to console and then ends.

create dataflow HelloWorldDataFlow
  BeaconSource -> helloworld.stream { text: 'hello world', iterations: 1 }
  LogSink(helloworld.stream) {}
The sample data flow above declares a BeaconSource operator parameterized by the "hello world" text and 1 iteration. The -> keyword reads as produces streams. The BeaconSource operator produces a single stream named helloworld.stream. The LogSink operator receives this stream and prints it unformatted.
The next program code snippet declares the data flow to the runtime:
String epl = "create dataflow HelloWorldDataFlow\n" +
    "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
    "LogSink(helloworldStream) {}";

Configuration configuration = new Configuration();
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, compilerArguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
After declaring a data flow to a runtime, your application can then instantiate and execute the data flow.
The following program code snippet instantiates the data flow:
EPDataFlowInstance instance = runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow");
A data flow instance is represented by an EPDataFlowInstance object.
The next code snippet executes the data flow instance:
instance.run();
By using the run method of EPDataFlowInstance the runtime executes the data flow using the same thread (blocking execute) and returns when the data flow completes. A data flow completes when all operators receive final markers.
The hello world data flow simply prints an unformatted Hello World string to console. Please check the built-in operator reference for BeaconSource and LogSink for more options.
The synopsis for declaring a data flow is:
create dataflow name [schema_declarations] [operator_declarations]
After create dataflow follows the data flow name and a mixed list of event type (schema) declarations and operator declarations.
Schema declarations define an event type. Specify any number of create schema clauses as part of the data flow declaration followed by a comma character to end each schema declaration. The syntax for create schema is described in Section 5.15, “Declaring an Event Type: Create Schema”.
All event types that are defined as part of a data flow are private to the data flow and not available to other statements. To define event types that are available across data flows and other statements, use a create schema statement, runtime or static configuration.
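For example, a schema declared in its own statement, outside any data flow, is visible to data flows and other statements alike (the type name here is illustrative):

// Declared outside of a data flow and therefore visible to
// data flows as well as other statements
create schema GlobalSampleEvent (id string, value double)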
Annotations as well as expression declarations and scripts can also be prepended to the data flow declaration.
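The following sketch prepends an annotation and an expression declaration to a data flow declaration; the names and the use of the declared expression in an operator parameter are illustrative:

@Name('MyAnnotatedDataFlow')
expression secondsPerDay { 24*60*60 }
create dataflow MyDataFlow
  BeaconSource -> mystream { iterations: 1, interval: secondsPerDay() }
  LogSink(mystream) {}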
For each operator, declare the operator name, input streams, output streams and operator parameters.
The syntax for declaring a data flow operator is:
operator_name [(input_streams)] [-> output_streams] { [parameter_name : parameter_value_expr] [, ...] }
The operator name is an identifier that identifies an operator.
If the operator accepts input streams then those may be listed in parentheses after the operator name, see Section 20.2.2.2, “Declaring Input Streams”.
If the operator can produce output streams then specify -> followed by a list of output stream names and types. See Section 20.2.2.3, “Declaring Output Streams”.
Following the input and output stream declaration provide curly brackets ({}) containing operator parameters. See Section 20.2.2.4, “Declaring Operator Parameters”.
An operator that receives no input streams, produces no output streams and has no parameters assigned to it is shown in this EPL example data flow:
create dataflow MyDataFlow
  MyOperatorSimple {}
The next EPL shows a data flow that consists of an operator MyOperator that receives a single input stream myInStream and produces a single output stream myOutStream holding MyEvent events. The EPL configures the operator parameter myParameter with a value of 10:
create dataflow MyDataFlow
  create schema MyEvent as (id string, price double),
  MyOperator(myInStream) -> myOutStream<MyEvent> {
    myParameter : 10
  }
The next sections outline input stream, output stream and parameter assignment in greater detail.
In case the operator receives input streams, list the input stream names within parentheses following the operator name. As part of the input stream declaration you may use the as keyword to assign an alias short name to one or multiple input streams.
The EPL shown next declares myInStream and assigns the alias mis:

create dataflow MyDataFlow
  MyOperator(myInStream as mis) {}
Multiple input streams can be listed, separated by commas. We use the term input port to mean the ordinal number of the input stream in the order the input streams are listed.
The EPL below declares two input streams and assigns an alias to each. The runtime assigns streamOne to input port 0 (zero) and streamTwo to port 1.

create dataflow MyDataFlow
  MyOperator(streamOne as one, streamTwo as two) {}
You may assign multiple input streams to the same port and alias by placing the stream names within parentheses. All input streams for the same port must have the same event type associated.
The next statement declares an operator that receives input streams streamA and streamB, both assigned to port 0 (zero) and alias streamsAB:

create dataflow MyDataFlow
  MyOperator( (streamA, streamB) as streamsAB) {}
Input and output stream names can have the dot-character in their name.
The following is also valid EPL:
create dataflow MyDataFlow
  MyOperator(my.in.stream) -> my.out.stream {}
Reserved keywords may not appear in the stream name.
In case the operator produces output streams, list the output streams after the -> keyword. Multiple output streams can be listed, separated by commas. We use the term output port to mean the ordinal number of the output stream in the order the output streams are listed.
The sample EPL below declares an operator that produces two output streams my.out.one and my.out.two.

create dataflow MyDataFlow
  MyOperator -> my.out.one, my.out.two {}
Each output stream can be assigned optional type information within less-than/greater-than characters (<>). Type information is required if the operator cannot deduce the output type from the input type and the operator does not declare explicit output type(s). The event type name can either be an event type defined within the same data flow or an event type defined in the runtime.
This EPL example declares an RFIDSchema event type based on an object-array event representation and associates the output stream rfid.stream with the RFIDSchema type. The stream rfid.stream therefore carries object-array (Object[]) typed objects according to schema RFIDSchema:

create dataflow MyDataFlow
  create objectarray schema RFIDSchema (tagId string, locX double, locY double),
  MyOperator -> rfid.stream<RFIDSchema> {}
The keyword eventbean is reserved: use eventbean<type-name> to indicate that a stream carries EventBean instances of the given type instead of the underlying event object.
This EPL example declares an RFIDSchema event type based on an object-array event representation and associates the output stream rfid.stream with the event type, such that the stream rfid.stream carries EventBean objects:

create dataflow MyDataFlow
  create objectarray schema RFIDSchema (tagId string, locX double, locY double),
  MyOperator -> rfid.stream<eventbean<RFIDSchema>> {}
Use a question mark (?) to indicate that the type of events is not known in advance.
In the next EPL the stream my.stream carries EventBean instances of any type:

create dataflow MyDataFlow
  MyOperator -> my.stream<eventbean<?>> {}
Operators can receive constants, objects, EPL expressions and complete statements as parameters. All parameters are listed within curly brackets ({}) after the input and output stream declarations. Curly brackets are required as a separator even if the operator has no parameters.
The syntax for parameters is:
name : value_expr [,...]
The parameter name is an identifier that is followed by the colon (:) or equals (=) character and a value expression. A value expression can be any expression, system property, JSON notation object or statement. Parameters are separated by the comma character.
The next EPL demonstrates operator parameters that are scalar values:
create dataflow MyDataFlow
  MyOperator {
    stringParam : 'sample',
    secondString : "double-quotes are fine",
    intParam : 10
  }
Operator parameters can be any EPL expression including expressions that use variables. Subqueries, aggregations and the prev and prior functions cannot be applied here.
The EPL shown below lists operator parameters that are expressions:
create dataflow MyDataFlow
  MyOperator {
    intParam : 24*60*60,
    threshold : var_threshold  // a variable defined in the runtime
  }
To obtain the value of a system property, use the special systemProperties property name, which is reserved for access to system properties.
The following EPL sets operator parameters to a value obtained from a system property:
create dataflow MyDataFlow
  MyOperator {
    someSystemProperty : systemProperties('mySystemProperty')
  }
Any JSON value can also be used as a value. Use square brackets [] for JSON arrays. Use curly brackets {} to hold nested Map or other object values. Provide the special class property to instantiate a given instance by class name. The runtime populates the respective array, Map or Object as specified in the JSON parameter value.
The below EPL demonstrates operator parameters that are JSON values:
create dataflow MyDataFlow
  MyOperator {
    myStringArray: ['a', "b"],
    myMapOrObject: {
      a : 10,
      b : 'xyz'
    },
    myInstance: {
      class: 'com.myorg.myapp.MyImplementation',
      myValue : 'sample'
    }
  }
The special parameter name select is reserved for use with EPL select statements. Please see the Select built-in operator for an example.
The below table summarizes the built-in data flow operators available:
Table 20.1. Built-in Operators
| Operator | Description |
|---|---|
| BeaconSource | Utility source that generates events. See Section 20.3.1, “BeaconSource”. |
| Emitter | Special operator for injecting events into a stream. See Section 20.4.5, “Start Captive”. |
| EPStatementSource | One or more statements act as event sources. See Section 20.3.2, “EPStatementSource”. |
| EventBusSink | The event bus is the sink: sends events from the data flow into the event bus. See Section 20.3.3, “EventBusSink”. |
| EventBusSource | The event bus is the source: receives events from the event bus into the data flow. See Section 20.3.4, “EventBusSource”. |
| Filter | Filters an input stream and produces an output stream containing the events passing the filter criteria. See Section 20.3.5, “Filter”. |
| LogSink | Utility sink that outputs events to console or log. See Section 20.3.6, “LogSink”. |
| Select | An EPL select statement that executes on the input stream events. See Section 20.3.7, “Select”. |
The below table summarizes the built-in EsperIO data flow operators. Please see the EsperIO documentation and source for more information.
Table 20.2. EsperIO Built-in Operators
| Operator | Description |
|---|---|
| AMQPSource | Attaches to an AMQP broker to receive messages to process. |
| AMQPSink | Attaches to an AMQP broker to send messages. |
| FileSource | Reads one or more files and produces events from file data. |
| FileSink | Writes one or more files from events received. |
The BeaconSource operator generates events and populates event properties.
The BeaconSource operator does not accept any input streams and has no input ports.
The BeaconSource operator must have a single output stream. When the BeaconSource operator has completed generating events according to the number of iterations provided, or when it is cancelled, it outputs a final marker to the output stream.
All parameters for the BeaconSource operator are optional:
Table 20.3. BeaconSource Parameters
| Name | Description |
|---|---|
| initialDelay | Specifies the number of seconds to delay before producing events. |
| interval | Time interval between events. Takes an integer or double-typed value for the number of seconds. The interval is zero when not provided. |
| iterations | Number of events produced. Takes an integer value. When not provided the operator produces tuples until the data flow instance gets cancelled. |
Event properties to be populated can simply be added to the parameters.
If your declaration provides an event type for the output stream then BeaconSource will populate event properties of the underlying events. If no event type is specified, BeaconSource creates an anonymous object-array event type to carry the event properties that are generated and associates this type with its output stream.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),  // sample type

  // BeaconSource that produces empty object-array events without delay
  // or interval until cancelled.
  BeaconSource -> stream.one {}

  // BeaconSource that produces one RFIDSchema event populating event properties
  // from a user-defined function "generateTagId" and the provided values.
  BeaconSource -> stream.two<SampleSchema> {
    iterations : 1,
    tagId : generateTagId(),
    locX : 10
  }

  // BeaconSource that produces 10 object-array events populating
  // the price property with a random value.
  BeaconSource -> stream.three {
    iterations : 10,
    interval : 10,      // every 10 seconds
    initialDelay : 5,   // start after 5 seconds
    price : Math.random() * 100
  }
The EPStatementSource operator maintains a subscription to the results of one or more statements. The operator produces the statement output events.
The EPStatementSource operator does not accept any input streams and has no input ports.
The EPStatementSource operator must have a single output stream. It does not generate a final or other marker.
Either the statement name or the statement filter parameter is required:
Table 20.4. EPStatementSource Parameters
| Name | Description |
|---|---|
| collector | Optional parameter, used to transform statement output events to submitted events. |
| statementName | Name of the statement that produces events. The statement does not need to exist at the time of data flow instantiation. |
| statementFilter | Implementation of the EPDataFlowEPStatementFilter interface that returns true for each statement that produces events. Statements do not need to exist at the time of data flow instantiation. |
If a statement name is provided, the operator subscribes to output events of the statement if the statement exists or when it gets created at a later point in time.
If a statement filter is provided instead, the operator subscribes to output events of all statements that currently exist and pass the filter's pass method, or that get created at a later point in time and pass the filter's pass method.
The collector can be specified to transform output events. If no collector is specified the operator submits the underlying events of the insert stream received from the statement. The collector object must implement the interface EPDataFlowIRStreamCollector.
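A minimal collector sketch follows; the collect method receives a context object, and the accessor names used here (getNewEvents, getEmitter) are assumptions to verify against the EPDataFlowIRStreamCollectorContext API:

public class MyCollector implements EPDataFlowIRStreamCollector {
    public void collect(EPDataFlowIRStreamCollectorContext data) {
        // Submit the underlying object of each insert-stream event (accessors assumed)
        if (data.getNewEvents() != null) {
            for (EventBean event : data.getNewEvents()) {
                data.getEmitter().submit(event.getUnderlying());
            }
        }
    }
}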
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),  // sample type

  // Consider only the statement named MySelectStatement when it exists.
  // No transformation.
  EPStatementSource -> stream.one<eventbean<?>> {
    statementName : 'MySelectStatement'
  }

  // Consider all statements that match the filter object provided.
  // No transformation.
  EPStatementSource -> stream.two<eventbean<?>> {
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }

  // Consider all statements that match the filter object provided.
  // With collector that performs transformation.
  EPStatementSource -> stream.two<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }
The EventBusSink operator sends events received from a data flow into the event bus. Any statement that looks for any of the events gets triggered, equivalent to the sendEventType methods on EPEventService or the insert into clause.
The EventBusSink operator accepts any number of input streams. The operator forwards all events arriving on any input ports to the event bus, equivalent to the sendEventType methods on EPEventService.
The EventBusSink operator cannot declare any output streams.
All parameters for the EventBusSink operator are optional:
Table 20.5. EventBusSink Parameters
| Name | Description |
|---|---|
| collector | Optional parameter, used to transform data flow events to event bus events. |
The collector can be specified to transform data flow events to event bus events. If no collector is specified the operator submits the events directly to the event bus. The collector object must implement the interface EPDataFlowEventCollector.
Examples are:
create dataflow MyDataFlow
  BeaconSource -> instream<SampleSchema> {}  // produces a sample stream

  // Send SampleSchema events produced by beacon to the event bus.
  EventBusSink(instream) {}

  // Send SampleSchema events produced by beacon to the event bus.
  // With collector that performs transformation.
  EventBusSink(instream) {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    }
  }
The EventBusSource operator receives events from the event bus and produces an output stream of the events received. With the term event bus we mean any event visible to the runtime, either because the application sent the event via any of the sendEventType methods on EPEventService or because statements populated streams as a result of insert into.
The EventBusSource operator does not accept any input streams and has no input ports.
The EventBusSource operator must have a single output stream. It does not generate a final or other marker. The event type declared for the output stream is the event type of events received from the event bus.
All parameters to EventBusSource are optional:
Table 20.6. EventBusSource Parameters
| Name | Description |
|---|---|
| collector | Optional parameter, used to transform event bus events to submitted events. |
| filter | Filter expression for event bus matching. |
The collector can be specified to transform output events. If no collector is specified the operator submits the underlying events of the stream received from the event bus. The collector object must implement the interface EPDataFlowEventBeanCollector.
The filter is an expression that the event bus compiles and efficiently matches even in the presence of a large number of event bus sources. The filter expression must return a boolean-typed value, returning true for those events that the event bus passes to the operator.
Examples are:
create dataflow MyDataFlow
  // Receive all SampleSchema events from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {}

  // Receive all SampleSchema events with tag id '001' from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {
    filter : tagId = '001'
  }

  // Receive all SampleSchema events from the event bus.
  // With collector that performs transformation.
  EventBusSource -> stream.two<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    }
  }
The Filter operator filters an input stream and produces an output stream containing the events passing the filter criteria. If a second output stream is provided, the operator sends events not passing filter criteria to that output stream.
The Filter operator accepts a single input stream.
The Filter operator requires one or two output streams. The event type of the input and output stream(s) must be the same. The first output stream receives the matching events according to the filter expression. If declaring two output streams, the second stream receives non-matching events.
The Filter operator has a single required parameter:

Table 20.7. Filter Parameters

| Name | Description |
|---|---|
| filter | Filter expression that returns a boolean value; events for which the expression returns true pass the filter. |
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),  // sample type
  BeaconSource -> samplestream<SampleSchema> {}           // sample source

  // Filter all events that have a tag id of '001'
  Filter(samplestream) -> tags_001 {
    filter : tagId = '001'
  }

  // Filter all events that have a tag id of '001',
  // putting all other events into the second stream
  Filter(samplestream) -> tags_001, tags_other {
    filter : tagId = '001'
  }
The LogSink operator outputs events to console or log file in either a JSON, XML or built-in format (the default).
The LogSink operator accepts any number of input streams. All events arriving on any input ports are logged.
The LogSink operator cannot declare any output streams.
All parameters for the LogSink operator are optional:
Table 20.8. LogSink Parameters
| Name | Description |
|---|---|
| format | Specify format as a string value: json for JSON-formatted output, xml for XML-formatted output and summary (default) for a built-in format. |
| layout | Pattern string according to which output is formatted. Place %df for data flow name, %p for port number, %i for data flow instance id, %t for title, %e for event data. |
| log | Boolean true (default) for log output, false for console output. |
| linefeed | Boolean true (default) for line feed, false for no line feed. |
| title | String title text prepended to output. |
Examples are:
create dataflow MyDataFlow
  BeaconSource -> instream {}  // produces sample stream to use below

  // Output textual event to log using defaults.
  LogSink(instream) {}

  // Output JSON-formatted to console.
  LogSink(instream) {
    format : 'json',
    layout : '%t [%e]',
    log : false,
    linefeed : true,
    title : 'My Custom Title:'
  }
The Select operator is configured with an EPL select statement. It applies events from input streams to the select statement and outputs results either continuously or when the final marker arrives.
The Select operator accepts one or more input streams.
The Select operator requires a single output stream.
The Select operator requires the select parameter; all other parameters are optional:
Table 20.9. Select Operator Parameters
| Name | Description |
|---|---|
| iterate | Boolean indicator whether results should be output continuously or only upon arrival of the final marker. |
| select | EPL select statement in parentheses. |
Set the optional iterate flag to false (the default) to have the operator output results continuously. Set the iterate flag to true to indicate that the operator outputs results only when the final marker arrives. If iterate is true then output rate limiting clauses are not supported.
The select parameter is required and provides an EPL select statement within parentheses. For each input port the statement should list the input stream name or the alias name in the from clause. Only filter-based streams are allowed in the from clause; patterns and named windows are not supported. Also not allowed are the insert into clause, the irstream keyword and subselects.

The Select operator determines the event type of output events based on the select clause. It is not necessary to declare an event type for the output stream.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),  // sample type
  BeaconSource -> instream<SampleSchema> {}               // sample stream
  BeaconSource -> secondstream<SampleSchema> {}           // sample stream

  // Simple continuous count of events
  Select(instream) -> outstream {
    select: (select count(*) from instream)
  }

  // Demonstrate use of alias
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias)
  }

  // Output only when the final marker arrives
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias),
    iterate: true
  }

  // Same input port for the two sample streams
  Select( (instream, secondstream) as myalias) -> outstream {
    select: (select count(*) from myalias)
  }

  // A join with multiple input streams,
  // joining the last event per stream forming pairs
  Select(instream, secondstream) -> outstream {
    select: (select a.tagId, b.tagId
      from instream#lastevent as a, secondstream#lastevent as b)
  }

  // A join with multiple input streams and using aliases
  Select(instream as S1, secondstream as S2) -> outstream {
    select: (select a.tagId, b.tagId
      from S1#lastevent as a, S2#lastevent as b)
  }
This section outlines the steps to declare, instantiate, execute and cancel or complete data flows.
Compile a data flow the same as any other statement and deploy the compiled module. The EPStatementObjectModel statement object model can also be used to compile a data flow.
Annotations that are listed at the top of the EPL text are applied to all statements and operators in the data flow. Annotations listed for a specific operator apply to that operator only.
The next program code snippet declares a data flow to the runtime:
String epl = "@Name('MyStatementName') create dataflow HelloWorldDataFlow\n" +
    "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
    "LogSink(helloworldStream) {}";

Configuration configuration = new Configuration();
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, compilerArguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
The statement name assigned to the statement is used only for statement management. Your application may undeploy the statement declaring the data flow, thereby making the data flow unavailable for instantiation. Existing instances of the data flow are not affected by an undeploy of the statement that declares the data flow.
Listeners or the subscriber to the statement declaring a data flow receive no events or other output. The statement declaring a data flow returns no rows when iterated.
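For example, the declaring statement can be undeployed while existing instances keep running; this sketch assumes the deployment contains only the data flow declaration:

// Undeploy the module that declares the data flow; data flow instances
// that were already created are not affected
runtime.getDeploymentService().undeploy(deployment.getDeploymentId());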
The com.espertech.esper.common.client.dataflow.core.EPDataFlowService available via getDataFlowService on EPRuntime manages declared data flows.
Use the instantiate method on EPDataFlowService to instantiate a data flow after it has been declared. Pass the data flow name and optional instantiation options to the method. A data flow can be instantiated any number of times.
A data flow instance is represented by an instance of EPDataFlowInstance. Each instance has a state as well as methods to start, run, join and cancel, and methods to obtain execution statistics.
Various optional arguments including operator parameters can be passed to instantiate via the EPDataFlowInstantiationOptions object as explained in more detail below.
The following code snippet instantiates the data flow:
EPDataFlowInstance instance = runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow");
The runtime does not track or otherwise retain data flow instances in memory. It is up to your application to retain data flow instances as needed.
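For example, an application might keep instances in a map of its own, keyed by an application-chosen name (illustrative):

// Retain instances for later join or cancel; the key scheme is application-defined
Map<String, EPDataFlowInstance> instances = new HashMap<>();
instances.put("hello-1",
    runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow"));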
Each data flow instance associates to a state. The start state is EPDataFlowState.INSTANTIATED. The end state is either COMPLETED or CANCELLED.
The following table outlines all states:
Table 20.10. Data Flow Instance States
| State | Description |
|---|---|
| INSTANTIATED | Start state, applies when a data flow instance has been instantiated and has not executed. |
| RUNNING | A data flow instance transitions from instantiated to running when any of the start, run or startCaptive methods is invoked. |
| COMPLETED | A data flow instance transitions from running to completed when all final markers have been processed by all operators. |
| CANCELLED | A data flow instance transitions from running to cancelled when your application invokes the cancel method. |
After your application has instantiated a data flow instance it can execute the data flow instance using either the start, run or startCaptive methods.
Use the start method to have the runtime allocate a thread for each source operator. Execution is non-blocking. Use the join method to have one or more threads join a data flow instance execution.
Use the run method to have the runtime use the current thread to execute the single source operator. Multiple source operators are not allowed when using run.
Use the startCaptive method to have the runtime return all Runnable instances and emitters, for the purpose of having complete control over execution. The runtime allocates no threads and does not perform any logic for the data flow unless your application employs the Runnable instances and emitters returned by the method.
The next code snippet executes the data flow instance as a blocking call:
instance.run();
By using the run method of EPDataFlowInstance the runtime executes the data flow instance using the same thread (blocking execute) and returns when the data flow instance completes. A data flow instance completes when all operators receive final markers.
The hello world data flow simply prints an unformatted Hello World string to console. The BeaconSource operator generates a final marker when it finishes the 1 iteration. The data flow instance thus transitions to complete after the LogSink operator receives the final marker, and the thread invoking the run method returns.
The next code snippet executes the data flow instance as a non-blocking call:
instance.start();
Use the cancel method to cancel execution of a running data flow instance:
instance.cancel();
Use the join method to join execution of a running data flow instance, causing the joining thread to block until the data flow instance either completes or is cancelled:
instance.join();
The EPDataFlowInstantiationOptions object that can be passed to the instantiate method may be used to customize the operator graph, operator parameters and execution of the data flow instance.
Passing runtime parameters to data flow operators is easiest using the addParameterURI method. The first parameter is the data flow operator name and the operator parameter name separated by the slash character. The second parameter is the value object.
For example, in order to pass the file name to the FileSource operator at runtime, use the following code:

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", filename);
EPDataFlowInstance instance = runtime.getDataFlowService()
    .instantiate(deployment.getDeploymentId(), "MyFileReaderDataFlow", options);
instance.run();
The optional operatorProvider member takes an implementation of the EPDataFlowOperatorProvider interface. The runtime invokes this provider to obtain operator instances.
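The following sketch supplies an operator instance for a specific operator; the provide method signature, the context accessor and the fluent operatorProvider method are assumptions to verify against the API, and MyFileSourceOperator is a hypothetical operator class:

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.operatorProvider(new EPDataFlowOperatorProvider() {
    public Object provide(EPDataFlowOperatorProviderContext context) {
        // Supply an instance for the FileSource operator; returning null
        // lets the runtime fall back to default instantiation
        if ("FileSource".equals(context.getOperatorName())) {
            return new MyFileSourceOperator(); // hypothetical operator class
        }
        return null;
    }
});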
The optional parameterProvider member takes an implementation of the EPDataFlowOperatorParameterProvider interface. The runtime invokes this provider to obtain operator parameter values. The values override the values provided via parameter URI above.
The optional exceptionHandler member takes an implementation of the EPDataFlowExceptionHandler interface. The runtime invokes this handler when exceptions occur.
The optional dataFlowInstanceId can be assigned any string value for the purpose of identifying the data flow instance.
The optional dataFlowInstanceUserObject can be assigned any object value for the purpose of associating a user object to the data flow instance.
Set the operatorStatistics flag to true to obtain statistics for operator execution.

Set the cpuStatistics flag to true to obtain CPU statistics for operator execution.
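A combined sketch of setting these options follows; the setter names mirror the member names above and are assumptions:

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.setDataFlowInstanceId("flow-1");                 // identify this instance
options.setDataFlowInstanceUserObject("my-user-object"); // any application object
options.setOperatorStatistics(true);                     // collect operator statistics
options.setCpuStatistics(true);                          // collect CPU statistics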
Use the startCaptive method on an EPDataFlowInstance data flow instance when your application requires full control over threading. This method returns an EPDataFlowInstanceCaptive instance that contains a list of java.lang.Runnable instances that represent each source operator.
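For example, an application may run each source operator on a thread of its own; getRunnables as the accessor for the list of runnables is an assumption:

EPDataFlowInstanceCaptive captive = instance.startCaptive();
for (Runnable runnable : captive.getRunnables()) {
    new Thread(runnable).start();  // application-managed threading
}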
The special Emitter operator can occur in a data flow. This emitter can be used to inject events into the data flow without writing a new operator. Emitter takes a single name parameter that provides the name of the emitter and that is returned in a map of emitters by EPDataFlowInstanceCaptive.
The example EPL below creates a data flow that uses an emitter:
create dataflow HelloWorldDataFlow
  create objectarray schema SampleSchema(text string),  // sample type
  Emitter -> helloworld.stream<SampleSchema> { name: 'myemitter' }
  LogSink(helloworld.stream) {}
Your application may obtain the Emitter instance and send events directly into the output stream. This feature is only supported in relationship with startCaptive since the runtime does not allocate any threads or run source operators.
The example code snippet below obtains the emitter instance and sends events directly into the data flow instance:
EPDataFlowInstance instance = runtime.getDataFlowService()
    .instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow", options);
EPDataFlowInstanceCaptive captiveStart = instance.startCaptive();
Emitter emitter = captiveStart.getEmitters().get("myemitter");
emitter.submit(new Object[] {"this is some text"});
When emitting DOM XML events please emit the root element obtained from document.getDocumentElement().
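For example, using JAXP to parse an XML string and emit the root element (the XML payload here is illustrative):

// Requires javax.xml.parsers.DocumentBuilderFactory, org.w3c.dom.Document,
// org.xml.sax.InputSource and java.io.StringReader
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
Document document = factory.newDocumentBuilder()
    .parse(new InputSource(new StringReader("<my-event/>")));
emitter.submit(document.getDocumentElement());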
When your application executes a data flow instance by means of the start (non-blocking) or run (blocking) methods, the data flow instance stays running until either completed or cancelled. While cancellation is always via the cancel method, completion occurs when all source operators provide final markers.
The final marker is an object that implements the EPDataFlowSignalFinalMarker interface. Some operators may also provide or process data window markers which implement the EPDataFlowSignalWindowMarker interface. All such signals implement the EPDataFlowSignal interface.
Some source operators such as EventBusSource and EPStatementSource do not generate final markers as they act continuously.
All exceptions during the execution of a data flow are logged and reported to the EPDataFlowExceptionHandler instance if one was provided.
If no exception handler is provided or the provided exception handler re-throws or generates a new runtime exception, the source operator handles the exception and completes (ends). When all source operators complete then the data flow instance transitions to complete.
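A minimal handler sketch that logs without re-throwing follows; the handle method name and the context accessors are assumptions to verify against the EPDataFlowExceptionHandler API:

public class MyExceptionHandler implements EPDataFlowExceptionHandler {
    public void handle(EPDataFlowExceptionContext context) {
        // Log the failure and do not re-throw (accessor names are assumptions)
        System.err.println("Operator " + context.getOperatorName()
            + " of data flow " + context.getDataFlowName()
            + " failed: " + context.getThrowable());
    }
}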
The following example is a rolling top words count implemented as a data flow, over a 30 second time window and providing the top 3 words every 2 seconds:
create dataflow RollingTopWords
  create objectarray schema WordEvent (word string),

  Emitter -> wordstream<WordEvent> { name: 'a' }  // Produces word stream

  // Sliding time window count per word
  Select(wordstream) -> wordcount {
    select: (select word, count(*) as wordcount
      from wordstream#time(30)
      group by word)
  }

  // Rank of words
  Select(wordcount) -> wordranks {
    select: (select window(*) as rankedWords
      from wordcount#sort(3, wordcount desc)
      output snapshot every 2 seconds)
  }

  LogSink(wordranks) {}
The next example implements a bargain index computation that separates a mixed trade and quote event stream into a trade and a quote stream, computes a volume-weighted average price (VWAP) and joins the two streams to compute an index:
create dataflow VWAPSample
  create objectarray schema TradeQuoteType as
    (type string, ticker string, price double, volume long, askprice double, asksize long),

  MyObjectArrayGraphSource -> TradeQuoteStream<TradeQuoteType> {}

  Filter(TradeQuoteStream) -> TradeStream {
    filter: type = "trade"
  }

  Filter(TradeQuoteStream) -> QuoteStream {
    filter: type = "quote"
  }

  Select(TradeStream) -> VwapTrades {
    select: (select ticker, sum(price * volume) / sum(volume) as vwap, min(price) as minprice
      from TradeStream#groupwin(ticker)#length(4)
      group by ticker)
  }

  Select(VwapTrades as T, QuoteStream as Q) -> BargainIndex {
    select: (select case when vwap > askprice then asksize * (Math.exp(vwap - askprice)) else 0.0d end as index
      from T#unique(ticker) as t, Q#lastevent as q
      where t.ticker = q.ticker)
  }

  LogSink(BargainIndex) {}
The final example is a word count data flow, in which three custom operators tokenize, word count and aggregate. The custom operators in this example are discussed next.
create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}
Implementing an operator requires the use of extension and internal APIs that are not considered stable and may change between versions.
This section discusses how to implement classes that serve as operators in a data flow. The section employs the example data flow as shown earlier.
This example data flow has operators MyLineFeedSource, MyTokenizerCounter and MyWordCountAggregator that are application-provided operators:

create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}
Each operator requires implementing the following interfaces:
- Implement the DataFlowOperatorForge interface for the compiler to use.
- Implement the DataFlowOperatorFactory interface for the runtime to instantiate operator instances.
- Implement either the DataFlowOperator interface, the DataFlowOperatorLifecycle interface or the DataFlowSourceOperator interface.
The compiler must be able to find the class implementing DataFlowOperatorForge. Add the forge package or forge class to imports:
// Sample code adds the forge class to imports
// (alternatively add 'package.*' to import the entire package).
Configuration configuration = new Configuration();
configuration.getCommon().addImport(MyLineFeedSourceForge.class.getName());
Every operator has a forge class that implements the DataFlowOperatorForge interface and is only used at compile-time.
The compiler provides the operator parameter expressions to the forge instance and invokes the initializeForge method.
When it is time to compile, the compiler generates code by invoking the make method.
// The OutputTypes annotation can be used to specify the type of events
// that are output by the operator.
// If provided, it is not necessary to declare output types in the data flow.
// The event representation is object-array.
@OutputTypes(value = {
    @OutputType(name = "line", typeName = "String")
})
// Provide the DataFlowOpProvideSignal annotation to indicate that
// the source operator provides a final marker.
@DataFlowOpProvideSignal
public class MyLineFeedSourceForge implements DataFlowOperatorForge {
    public DataFlowOpForgeInitializeResult initializeForge(DataFlowOpForgeInitializeContext context) throws ExprValidationException {
        return null;
    }

    public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
        return newInstance(MyLineFeedSourceFactory.class);
    }
}
The operator factory class must implement the DataFlowOperatorFactory interface.
At deployment time the operator factory initializes using the code generated in the forge make method.
Upon instantiating a data flow the factory must return an operator instance.
The implementation for the sample MyLineFeedSourceFactory is:

public class MyLineFeedSourceFactory implements DataFlowOperatorFactory {
    public void initializeFactory(DataFlowOpFactoryInitializeContext context) {
    }

    public DataFlowOperator operator(DataFlowOpInitializeContext context) {
        return new MyLineFeedSource(Collections.emptyIterator());
    }
}
The operator implementation for the sample MyLineFeedSource is:

public class MyLineFeedSource implements DataFlowSourceOperator {
    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    private final Iterator<String> lines;

    public MyLineFeedSource(Iterator<String> lines) {
        this.lines = lines;
    }

    public void open(DataFlowOpOpenContext openContext) {
    }

    public void next() {
        if (lines.hasNext()) {
            dataFlowEmitter.submit(new Object[]{lines.next()});
        } else {
            dataFlowEmitter.submitSignal(new EPDataFlowSignalFinalMarker() {
            });
        }
    }

    public void close(DataFlowOpCloseContext openContext) {
    }
}
The implementation for the sample MyTokenizerCounter is a forge, factory and operator in one class:

@OutputTypes({
    @OutputType(name = "line", type = int.class),
    @OutputType(name = "wordCount", type = int.class),
    @OutputType(name = "charCount", type = int.class)
})
public class MyTokenizerCounter implements DataFlowOperatorForge, DataFlowOperatorFactory, DataFlowOperator {
    private static final Logger log = LoggerFactory.getLogger(MyTokenizerCounter.class);

    @DataFlowContext
    private EPDataFlowEmitter graphContext;

    public DataFlowOpForgeInitializeResult initializeForge(DataFlowOpForgeInitializeContext context) throws ExprValidationException {
        return null;
    }

    public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
        return newInstance(MyTokenizerCounter.class);
    }

    public void initializeFactory(DataFlowOpFactoryInitializeContext context) {
    }

    public DataFlowOperator operator(DataFlowOpInitializeContext context) {
        return new MyTokenizerCounter();
    }

    public void onInput(String line) {
        StringTokenizer tokenizer = new StringTokenizer(line, " \t");
        int wordCount = tokenizer.countTokens();
        int charCount = 0;
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            charCount += token.length();
        }
        log.debug("Submitting stat words[" + wordCount + "] chars[" + charCount + "] for line '" + line + "'");
        graphContext.submit(new Object[]{1, wordCount, charCount});
    }
}
The implementation for the sample MyWordCountAggregator with comments is:

@OutputTypes(value = {
    @OutputType(name = "stats", type = MyWordCountStats.class)
})
public class MyWordCountAggregator implements DataFlowOperatorForge, DataFlowOperatorFactory, DataFlowOperator {
    private static final Logger log = LoggerFactory.getLogger(MyWordCountAggregator.class);

    @DataFlowContext
    private EPDataFlowEmitter graphContext;

    private final MyWordCountStats aggregate = new MyWordCountStats();

    public DataFlowOpForgeInitializeResult initializeForge(DataFlowOpForgeInitializeContext context) throws ExprValidationException {
        return null;
    }

    public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
        return newInstance(MyWordCountAggregator.class);
    }

    public void initializeFactory(DataFlowOpFactoryInitializeContext context) {
    }

    public DataFlowOperator operator(DataFlowOpInitializeContext context) {
        return new MyWordCountAggregator();
    }

    public void onInput(int lines, int words, int chars) {
        aggregate.add(lines, words, chars);
        log.debug("Aggregated: " + aggregate);
    }

    public void onSignal(EPDataFlowSignal signal) {
        log.debug("Received punctuation, submitting totals: " + aggregate);
        graphContext.submit(aggregate);
    }
}
The forge instance receives parameter expressions. A forge can declare parameters like so:
// Expose a parameter named "file" that takes any expression as parameter
@DataFlowOpParameter
private ExprNode file;

// Expose a parameter named "adapterInputSource" that will be an instance of some interface;
// interface implementations as parameters are declared as a Map<String, Object>
@DataFlowOpParameter
private Map<String, Object> adapterInputSource;

// Expose a parameter named "propertyNames" that is an array of string constants
@DataFlowOpParameter
private String[] propertyNames;
The forge class can obtain the output event type if needed. It should also validate the expression parameters and throw ExprValidationException if the parameter expression does not return the expected type.
The utility class DataFlowParameterValidation has validate utility methods that return a validated expression. For example:
public DataFlowOpForgeInitializeResult initializeForge(DataFlowOpForgeInitializeContext context) throws ExprValidationException {
    // Obtain the declared output event type
    outputEventType = context.getOutputPorts().get(0).getOptionalDeclaredType() != null
        ? context.getOutputPorts().get(0).getOptionalDeclaredType().getEventType()
        : null;
    if (outputEventType == null) {
        throw new ExprValidationException("No event type provided for output, please provide an event type name");
    }

    // Validate the "file" parameter expression expected to return a String-typed value
    file = DataFlowParameterValidation.validate("file", file, String.class, context);
    return null;
}
The forge class passes parameters to the factory. We use SAIFFInitializeBuilder that is a builder utility for building the factory. For example:

public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
    return new SAIFFInitializeBuilder(FileSourceFactory.class, this.getClass(), "factory", parent, symbols, classScope)
        .exprnode("file", file)
        .constant("propertyNames", propertyNames)
        .map("adapterInputSource", adapterInputSource)
        .build();
}
The factory class must have setter-methods of the same name that receive the parameters:
private ExprEvaluator file;
private String[] propertyNames;
private Map<String, Object> adapterInputSource;

public void setFile(ExprEvaluator file) {
    this.file = file;
}

public void setPropertyNames(String[] propertyNames) {
    this.propertyNames = propertyNames;
}

public void setAdapterInputSource(Map<String, Object> adapterInputSource) {
    this.adapterInputSource = adapterInputSource;
}
The factory class can resolve parameter values by evaluating expressions and by determining whether parameters were passed as options. The DataFlowParameterResolution class provides convenience methods. For example:

public DataFlowOperator operator(DataFlowOpInitializeContext context) {
    String fileName = DataFlowParameterResolution.resolveWithDefault("file", file, null, String.class, context);
    AdapterInputSource adapterInputSourceInstance = DataFlowParameterResolution.resolveOptionalInstance("adapterInputSource", adapterInputSource, AdapterInputSource.class, context);
    return new MyOperator(fileName, adapterInputSourceInstance);
}