
Chapter 20. EPL Reference: Data Flow

20.1. Introduction
20.2. Usage
20.2.1. Overview
20.2.2. Syntax
20.3. Built-In Operators
20.3.1. BeaconSource
20.3.2. EPStatementSource
20.3.3. EventBusSink
20.3.4. EventBusSource
20.3.5. Filter
20.3.6. LogSink
20.3.7. Select
20.4. API
20.4.1. Declaring a Data Flow
20.4.2. Instantiating a Data Flow
20.4.3. Executing a Data Flow
20.4.4. Instantiation Options
20.4.5. Start Captive
20.4.6. Data Flow Punctuation With Markers
20.4.7. Exception Handling
20.5. Examples
20.6. Operator Implementation
20.6.1. Sample Operator Acting as Source
20.6.2. Sample Tokenizer Operator
20.6.3. Sample Aggregator Operator
20.6.4. Passing Operator Parameters

Data flows in EPL have the following purposes:

  1. Support for data flow programming and flow-based programming.

  2. Declarative and runtime manageable integration of input and output adapters that may be provided by EsperIO or by an application.

  3. Remove the need to use an event bus, achieving data flow-only visibility of events and event types for performance gains.

Data flow operators communicate via streams of either underlying event objects or wrapped events. Underlying event objects are POJO, Map, Object-array or DOM/XML. Wrapped events are represented by EventBean instances that associate type information to underlying event objects.

For more information on data flow programming or flow-based programming please consult the Wikipedia FBP Article.

EPL offers a number of useful built-in operators that can be combined in a graph to program a data flow. In addition EsperIO offers prebuilt operators that act as sources or sinks of events. An application can easily create and use its own data flow operators.

Using data flows, an application can provide events to the data flow operators directly, without using the runtime's event bus. Not using an event bus (as represented by the sendEventType methods of EPEventService) can achieve performance gains, as the runtime does not need to match events to statements and does not need to wrap underlying event objects in EventBean instances.

Data flows also allow for finer-grained control over threading, synchronous and asynchronous operation.

Your application declares a data flow using create dataflow dataflow-name. Declaring the data flow causes the EPL compiler to validate the syntax and some aspects of the data flow graph of operators. Declaring the data flow does not actually instantiate or execute a data flow. Resolving event types and instantiating operators (as required) takes place at time of data flow instantiation.

After your application has declared a data flow, it can instantiate the data flow and execute it. A data flow can be instantiated as many times as needed and each data flow instance can only be executed once.

The example EPL below creates a data flow that, upon execution, outputs the text Hello World to console and then ends.

create dataflow HelloWorldDataFlow
  BeaconSource -> helloworld.stream { text: 'hello world' , iterations: 1}
  LogSink(helloworld.stream) {}

The sample data flow above declares a BeaconSource operator parameterized by the "hello world" text and 1 iteration. The -> keyword reads as produces streams. The BeaconSource operator produces a single stream named helloworld.stream. The LogSink operator receives this stream and prints it unformatted.

The next program code snippet declares the data flow to the runtime:

String epl = "create dataflow HelloWorldDataFlow\n" +
  "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
  "LogSink(helloworldStream) {}";

Configuration configuration = new Configuration();
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, compilerArguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);

After declaring a data flow to a runtime, your application can then instantiate and execute the data flow.

The following program code snippet instantiates the data flow:

EPDataFlowInstance instance =
  runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow");

A data flow instance is represented by an EPDataFlowInstance object.

The next code snippet executes the data flow instance:

instance.run();

By using the run method of EPDataFlowInstance the runtime executes the data flow using the same thread (blocking execute) and returns when the data flow completes. A data flow completes when all operators receive final markers.

The hello world data flow simply prints an unformatted Hello World string to console. Please check the built-in operator reference for BeaconSource and LogSink for more options.

The synopsis for declaring a data flow is:

create dataflow name
	[schema_declarations]
	[operator_declarations]
	

After create dataflow follows the data flow name and a mixed list of event type (schema) declarations and operator declarations.

Schema declarations define an event type. Specify any number of create schema clauses as part of the data flow declaration, ending each schema declaration with a comma character. The syntax for create schema is described in Section 5.15, “Declaring an Event Type: Create Schema”.

All event types that are defined as part of a data flow are private to the data flow and not available to other statements. To define event types that are available across data flows and other statements, use a create schema statement, runtime or static configuration.
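
For example, an event type declared with its own create schema statement is visible to data flows and other statements alike. A minimal sketch, with illustrative type and stream names:

create schema GlobalSampleEvent (id string);

create dataflow MyGlobalTypeDataFlow
  BeaconSource -> mystream<GlobalSampleEvent> { iterations: 1, id: 'sample' }
  LogSink(mystream) {}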

Annotations, expression declarations and scripts can also be prepended to the data flow declaration.
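
For example, an annotation and an expression declaration may precede the data flow declaration, as in this minimal sketch (the names are illustrative):

@Name('MyFlowStatement')
expression twice { v => v * 2 }
create dataflow MyDataFlow
  BeaconSource -> mystream { iterations: 1, value: twice(21) }
  LogSink(mystream) {}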

For each operator, declare the operator name, input streams, output streams and operator parameters.

The syntax for declaring a data flow operator is:

operator_name [(input_streams)]  [-> output_streams] {
  [parameter_name : parameter_value_expr] [, ...]
}

The operator name is an identifier that identifies an operator.

If the operator accepts input streams then those may be listed in parentheses after the operator name, see Section 20.2.2.2, “Declaring Input Streams”.

If the operator can produce output streams then specify -> followed by a list of output stream names and types. See Section 20.2.2.3, “Declaring Output Streams”.

Following the input and output stream declaration provide curly brackets ({}) containing operator parameters. See Section 20.2.2.4, “Declaring Operator Parameters”.

An operator that receives no input streams, produces no output streams and has no parameters assigned to it is shown in this EPL example data flow:

create dataflow MyDataFlow
  MyOperatorSimple {}

The next EPL shows a data flow that consists of an operator MyOperator that receives a single input stream myInStream and produces a single output stream myOutStream holding MyEvent events. The EPL configures the operator parameter myParameter with a value of 10:

create dataflow MyDataFlow
  create schema MyEvent as (id string, price double),
  MyOperator(myInStream) -> myOutStream<MyEvent> {
    myParameter : 10
  }

The next sections outline input stream, output stream and parameter assignment in greater detail.

If the operator produces output streams, list the output streams after the -> keyword, separated by commas. We use the term output port to mean the ordinal number of the output stream in the order the output streams are listed.

The sample EPL below declares an operator that produces two output streams my.out.one and my.out.two.

create dataflow MyDataFlow
  MyOperator -> my.out.one, my.out.two {}

Each output stream can be assigned optional type information within less/greater-than characters (<>). Type information is required if the operator cannot deduce the output type from the input type and the operator does not declare explicit output type(s). The event type name can either be an event type defined within the same data flow or an event type defined in the runtime.

This EPL example declares an RFIDSchema event type based on an object-array event representation and associates the output stream rfid.stream with the RFIDSchema type. The stream rfid.stream therefore carries object-array (Object[]) typed objects according to schema RFIDSchema:

create dataflow MyDataFlow
  create objectarray schema RFIDSchema (tagId string, locX double, locY double),
  MyOperator -> rfid.stream<RFIDSchema> {}

The keyword eventbean is reserved: Use eventbean<type-name> to indicate that a stream carries EventBean instances of the given type instead of the underlying event object.

This EPL example declares an RFIDSchema event type based on an object-array event representation and associates the output stream rfid.stream with the event type, such that the stream rfid.stream carries EventBean objects:

create dataflow MyDataFlow
  create objectarray schema RFIDSchema (tagId string, locX double, locY double),
  MyOperator -> rfid.stream<eventbean<RFIDSchema>> {}

Use a question mark (?) to indicate that the type of events is not known in advance.

In the next EPL the stream my.stream carries EventBean instances of any type:

create dataflow MyDataFlow
  MyOperator -> my.stream<eventbean<?>> {}

Operators can receive constants, objects, EPL expressions and complete statements as parameters. All parameters are listed within curly brackets ({}) after input and output stream declarations. Curly brackets are required as a separator even if the operator has no parameters.

The syntax for parameters is:

name : value_expr [,...]

The parameter name is an identifier that is followed by the colon (:) or equals (=) character and a value expression. A value expression can be any expression, system property, JSON notation object or statement. Parameters are separated by the comma character.

The next EPL demonstrates operator parameters that are scalar values:

create dataflow MyDataFlow
  MyOperator {
    stringParam : 'sample',
    secondString : "double-quotes are fine",
    intParam : 10
  }

Operator parameters can be any EPL expression including expressions that use variables. Subqueries, aggregations and the prev and prior functions cannot be applied here.

The EPL shown below lists operator parameters that are expressions:

create dataflow MyDataFlow
  MyOperator {
    intParam : 24*60*60,
    threshold : var_threshold	// a variable defined in the runtime
  }

To obtain the value of a system property, use the special systemProperties property name, which is reserved for access to system properties.

The following EPL sets operator parameters to a value obtained from a system property:

create dataflow MyDataFlow
  MyOperator {
    someSystemProperty : systemProperties('mySystemProperty') 
  }

Any JSON value can also be used as a value. Use square brackets [] for JSON arrays. Use curly brackets {} to hold nested Map or other object values. Provide the special class property to instantiate a given instance by class name. The runtime populates the respective array, Map or Object as specified in the JSON parameter value.

The below EPL demonstrates operator parameters that are JSON values:

create dataflow MyDataFlow
  MyOperator {
    myStringArray: ['a', "b"],
    myMapOrObject: {
      a : 10,
      b : 'xyz',
    },
    myInstance: {
      class: 'com.myorg.myapp.MyImplementation',
      myValue : 'sample'
    }
  }

The special parameter name select is reserved for use with EPL select statements. Please see the Select built-in operator for an example.

The following summarizes the built-in data flow operators, each described in a subsection of Section 20.3:

  1. BeaconSource - Generates events and populates event properties (source).
  2. EPStatementSource - Subscribes to statement results and outputs the statement output events (source).
  3. EventBusSink - Sends events to the event bus (sink).
  4. EventBusSource - Receives events from the event bus (source).
  5. Filter - Filters a stream based on a filter expression.
  6. LogSink - Outputs events to console or log (sink).
  7. Select - Applies an EPL select statement to the events of input streams.

EsperIO provides additional data flow operators that act as sources or sinks of events, for example FileSource and FileSink. Please see the EsperIO documentation and source for more information.

The BeaconSource operator generates events and populates event properties.

The BeaconSource operator does not accept any input streams and has no input ports.

The BeaconSource operator must have a single output stream. When the BeaconSource operator has completed generating events according to the number of iterations provided, or when it is cancelled, it outputs a final marker to the output stream.

All parameters for the BeaconSource operator are optional:

  1. initialDelay - Number of seconds before the operator begins generating events.
  2. interval - Number of seconds between events.
  3. iterations - Number of events to generate; if not provided, the operator generates events until cancelled.

Event properties to be populated can simply be added to the parameters.

If your declaration provides an event type for the output stream then BeaconSource will populate event properties of the underlying events. If no event type is specified, BeaconSource creates an anonymous object-array event type to carry the event properties that are generated and associates this type with its output stream.

Examples are:

create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),	// sample type			
			
  // BeaconSource that produces empty object-array events without delay 
  // or interval until cancelled.
  BeaconSource -> stream.one {}
  
  // BeaconSource that produces one RFIDSchema event populating event properties
  // from a user-defined function "generateTagId" and the provided values.
  BeaconSource -> stream.two<SampleSchema> {
    iterations : 1,
    tagId : generateTagId(),
    locX : 10
  }
  
  // BeaconSource that produces 10 object-array events populating
  // the price property with a random value.
  BeaconSource -> stream.three {
    iterations : 10,
    interval : 10, // every 10 seconds
    initialDelay : 5, // start after 5 seconds
    price : Math.random() * 100
  }

The EPStatementSource operator maintains a subscription to the results of one or more statements. The operator produces the statement output events.

The EPStatementSource operator does not accept any input streams and has no input ports.

The EPStatementSource operator must have a single output stream. It does not generate a final or other marker.

Either the statement name or the statement filter parameter is required:

  1. statementName - Name of the statement to subscribe to.
  2. statementFilter - An instance that selects the statements to subscribe to.
  3. collector - An instance of EPDataFlowIRStreamCollector that transforms output events (optional).

If a statement name is provided, the operator subscribes to output events of the statement if the statement exists or when it gets created at a later point in time.

If a statement filter is provided instead, the operator subscribes to output events of all statements that currently exist and pass the filter's pass method, as well as statements that get created at a later point in time and pass the filter's pass method.

The collector can be specified to transform output events. If no collector is specified the operator submits the underlying events of the insert stream received from the statement. The collector object must implement the interface EPDataFlowIRStreamCollector.

Examples are:

create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),	// sample type			
			
  // Consider only the statement named MySelectStatement when it exists.
  // No transformation.
  EPStatementSource -> stream.one<eventbean<?>> {
    statementName : 'MySelectStatement'
  }
  
  // Consider all statements that match the filter object provided.
  // No transformation.
  EPStatementSource -> stream.two<eventbean<?>> {
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }
  
  // Consider all statements that match the filter object provided.
  // With collector that performs transformation.
  EPStatementSource -> stream.two<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }
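
A collector implementation might look like the following sketch. The collect method and the context accessors shown (getNewEvents, getEmitter) are assumptions based on the EPDataFlowIRStreamCollector contract; consult the API documentation for exact signatures.

public class MyCollector implements EPDataFlowIRStreamCollector {
    public void collect(EPDataFlowIRStreamCollectorContext context) {
        // Forward only insert-stream events, submitting each underlying event object.
        // Accessor names are assumptions; verify against the API.
        EventBean[] newEvents = context.getNewEvents();
        if (newEvents == null) {
            return;
        }
        for (EventBean event : newEvents) {
            context.getEmitter().submit(event.getUnderlying());
        }
    }
}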

The EventBusSource operator receives events from the event bus and produces an output stream of the events received. With the term event bus we mean any event visible to the runtime, either because the application sent the event via any of the sendEventType methods on EPEventService or because statements populated streams as a result of insert into.

The EventBusSource operator does not accept any input streams and has no input ports.

The EventBusSource operator must have a single output stream. It does not generate a final or other marker. The event type declared for the output stream is the event type of events received from the event bus.

All parameters to EventBusSource are optional:

  1. collector - An instance of EPDataFlowEventBeanCollector that transforms output events.
  2. filter - A filter expression that events must match.

The collector can be specified to transform output events. If no collector is specified the operator submits the underlying events of the stream received from the event bus. The collector object must implement the interface EPDataFlowEventBeanCollector.

The filter is an expression that the event bus compiles and efficiently matches even in the presence of a large number of event bus sources. The filter expression must return a boolean-typed value, returning true for those events that the event bus passes to the operator.

Examples are:

create dataflow MyDataFlow

  // Receive all SampleSchema events from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {}
  
  // Receive all SampleSchema events with tag id '001' from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {
    filter : tagId = '001'
  }

  // Receive all SampleSchema events from the event bus.
  // With collector that performs transformation.
  EventBusSource -> stream.two<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
  }

The Select operator is configured with an EPL select statement. It applies events from input streams to the select statement and outputs results either continuously or when the final marker arrives.

The Select operator accepts one or more input streams.

The Select operator requires a single output stream.

The Select operator requires the select parameter; all other parameters are optional:

  1. select - An EPL select statement within parentheses (required).
  2. iterate - Boolean flag indicating whether to output continuously (false, the default) or only when the final marker arrives (true).

Set the optional iterate flag to false (the default) to have the operator output results continuously. Set the iterate flag to true to indicate that the operator outputs results only when the final marker arrives. If iterate is true then output rate limiting clauses are not supported.

The select parameter is required and provides an EPL select statement within parentheses. For each input port the statement should list the input stream name or the alias name in the from clause. Only filter-based streams are allowed in the from clause; patterns and named windows are not supported. Also not allowed are the insert into clause, the irstream keyword and subselects.

The Select operator determines the event type of output events based on the select clause. It is not necessary to declare an event type for the output stream.

Examples are:

create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double),	// sample type			
  BeaconSource -> instream<SampleSchema> {}  // sample stream
  BeaconSource -> secondstream<SampleSchema> {}  // sample stream
  
  // Simple continuous count of events
  Select(instream) -> outstream {
    select: (select count(*) from instream)
  }
  
  // Demonstrate use of alias
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias)
  }
  
  // Output only when the final marker arrives
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias),
    iterate: true
  }

  // Same input port for the two sample streams
  Select( (instream, secondstream) as myalias) -> outstream {
    select: (select count(*) from myalias)
  }

  // A join with multiple input streams,
  // joining the last event per stream forming pairs
  Select(instream, secondstream) -> outstream {
    select: (select a.tagId, b.tagId 
        from instream#lastevent as a, secondstream#lastevent as b)
  }
  
  // A join with multiple input streams and using aliases.
  Select(instream as S1, secondstream as S2) -> outstream {
    select: (select a.tagId, b.tagId 
        from S1#lastevent as a, S2#lastevent as b)
  }

This section outlines the steps to declare, instantiate, execute and cancel or complete data flows.

The com.espertech.esper.common.client.dataflow.core.EPDataFlowService available via getDataFlowService on EPRuntime manages declared data flows.

Use the instantiate method on EPDataFlowService to instantiate a data flow after it has been declared. Pass the deployment id, the data flow name and optional instantiation options to the method. A data flow can be instantiated any number of times.

A data flow instance is represented by an instance of EPDataFlowInstance. Each instance has a state, methods to start, run, join and cancel, and methods to obtain execution statistics.

Various optional arguments including operator parameters can be passed to instantiate via the EPDataFlowInstantiationOptions object as explained in more detail below.

The following code snippet instantiates the data flow:

EPDataFlowInstance instance =
  runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow");

The runtime does not track or otherwise retain data flow instances in memory. It is up to your application to retain data flow instances as needed.

Each data flow instance associates to a state. The start state is EPDataFlowState.INSTANTIATED. The end state is either COMPLETE or CANCELLED.

The following outlines all states:

  1. INSTANTIATED - The instance has been created but has not started executing.
  2. RUNNING - The instance is executing.
  3. COMPLETE - All operators received final markers and execution finished.
  4. CANCELLED - Execution was cancelled.

After your application has instantiated a data flow instance, it can execute the data flow instance using either the start, run or startCaptive methods.

Use the start method to have the runtime allocate a thread for each source operator. Execution is non-blocking. Use the join method to have one or more threads join a data flow instance execution.

Use the run method to have the runtime use the current thread to execute the single source operator. Multiple source operators are not allowed when using run.

Use the startCaptive method to have the runtime return all Runnable instances and emitters, for the purpose of having complete control over execution. The runtime allocates no threads and does not perform any logic for the data flow unless your application employs the Runnable instances and emitters returned by the method.

The next code snippet executes the data flow instance as a blocking call:

instance.run();

By using the run method of EPDataFlowInstance the runtime executes the data flow instance using the same thread (blocking execute) and returns when the data flow instance completes. A data flow instance completes when all operators receive final markers.

The hello world data flow simply prints an unformatted Hello World string to console. The BeaconSource operator generates a final marker when it finishes the 1 iteration. The data flow instance thus transitions to complete after the LogSink operator receives the final marker, and the thread invoking the run method returns.

The next code snippet executes the data flow instance as a non-blocking call:

instance.start();

Use the cancel method to cancel execution of a running data flow instance:

instance.cancel();

Use the join method to join execution of a running data flow instance, causing the joining thread to block until the data flow instance either completes or is cancelled:

instance.join();
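
A typical pattern is a non-blocking start followed by a join, as in this sketch (note that join declares InterruptedException):

instance.start();    // non-blocking; the runtime allocates a thread per source operator
try {
    instance.join(); // block until the instance completes or is cancelled
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}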

The EPDataFlowInstantiationOptions object that can be passed to the instantiate method may be used to customize the operator graph, operator parameters and execution of the data flow instance.

Passing runtime parameters to data flow operators is easiest using the addParameterURI method. The first parameter is the data flow operator name and the operator parameter name separated by the slash character. The second parameter is the value object.

For example, in order to pass the file name to the FileSource operator at runtime, use the following code:

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", filename);
EPDataFlowInstance instance = runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "MyFileReaderDataFlow",options);
instance.run();

The optional operatorProvider member takes an implementation of the EPDataFlowOperatorProvider interface. The runtime invokes this provider to obtain operator instances.

The optional parameterProvider member takes an implementation of the EPDataFlowOperatorParameterProvider interface. The runtime invokes this provider to obtain operator parameter values. The values override the values provided via parameter URI above.

The optional exceptionHandler member takes an implementation of the EPDataFlowExceptionHandler interface. The runtime invokes this handler when exceptions occur.

The optional dataFlowInstanceId can be assigned any string value for the purpose of identifying the data flow instance.

The optional dataFlowInstanceUserObject can be assigned any object value for the purpose of associating a user object to the data flow instance.

Set the operatorStatistics flag to true to obtain statistics for operator execution.

Set the cpuStatistics flag to true to obtain CPU statistics for operator execution.
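
The following sketch combines several of these options. The fluent method names shown are assumptions that mirror the member names described above; verify them against the API.

EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.parameterProvider(new EPDataFlowOperatorParameterProvider() {
    public Object provide(EPDataFlowOperatorParameterProviderContext context) {
        // Supply a value for a hypothetical "file" operator parameter.
        if ("file".equals(context.getParameterName())) {
            return "myfile.csv"; // illustrative value
        }
        return null; // no value supplied by this provider
    }
});
options.dataFlowInstanceId("flow-1");
options.operatorStatistics(true);
options.cpuStatistics(true);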

Use the startCaptive method on an EPDataFlowInstance data flow instance when your application requires full control over threading. This method returns an EPDataFlowInstanceCaptive instance that contains a list of java.lang.Runnable instances that represent each source operator.

The special Emitter operator can occur in a data flow. This emitter can be used to inject events into the data flow without writing a new operator. Emitter takes a single name parameter that provides the name of the emitter and that is returned in a map of emitters by EPDataFlowInstanceCaptive.

The example EPL below creates a data flow that uses an emitter.

create dataflow HelloWorldDataFlow
  create objectarray schema SampleSchema(text string),	// sample type		
	
  Emitter -> helloworld.stream<SampleSchema> { name: 'myemitter' }
  LogSink(helloworld.stream) {}

Your application may obtain the Emitter instance and send events directly into the output stream. This feature is only supported in relationship with startCaptive since the runtime does not allocate any threads or run source operators.

The example code snippet below obtains the emitter instance and sends events directly into the data flow instance:

EPDataFlowInstance instance =
      runtime.getDataFlowService().instantiate(deployment.getDeploymentId(), "HelloWorldDataFlow", options);
EPDataFlowInstanceCaptive captiveStart = instance.startCaptive();
Emitter emitter = captiveStart.getEmitters().get("myemitter");
emitter.submit(new Object[] {"this is some text"});
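
If the data flow also contains source operators, your application must drive the returned Runnable instances itself, for example with its own executor. A sketch, assuming an accessor named getRunnables on EPDataFlowInstanceCaptive:

// Uses java.util.concurrent.ExecutorService and Executors.
ExecutorService executor = Executors.newCachedThreadPool();
for (Runnable runnable : captiveStart.getRunnables()) {
    executor.submit(runnable); // each runnable represents one source operator
}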

When emitting DOM XML events please emit the root element obtained from document.getDocumentElement().

The following example is a rolling top words count implemented as a data flow, over a 30 second time window and providing the top 3 words every 2 seconds:

create dataflow RollingTopWords
  create objectarray schema WordEvent (word string),
  
  Emitter -> wordstream<WordEvent> { name: 'a' } // Produces word stream
  
  Select(wordstream) -> wordcount { // Sliding time window count per word
    select: (select word, count(*) as wordcount 
          from wordstream#time(30) group by word)
  }

  Select(wordcount) -> wordranks { // Rank of words
    select: (select window(*) as rankedWords 
          from wordcount#sort(3, wordcount desc) 
          output snapshot every 2 seconds)
  }
  
  LogSink(wordranks) {}

The next example implements a bargain index computation that separates a mixed trade and quote event stream into a trade and a quote stream, computes a vwap and joins the two streams to compute an index:

create dataflow VWAPSample
  create objectarray schema TradeQuoteType as (type string, ticker string, price double, volume long, askprice double, asksize long),
  
  MyObjectArrayGraphSource -> TradeQuoteStream<TradeQuoteType> {}
  
  Filter(TradeQuoteStream) -> TradeStream {
    filter: type = "trade"
  }
  
  Filter(TradeQuoteStream) -> QuoteStream {
    filter: type = "quote"
  }
  
  Select(TradeStream) -> VwapTrades {
    select: (select ticker, sum(price * volume) / sum(volume) as vwap, 
          min(price) as minprice
          from TradeStream#groupwin(ticker)#length(4) group by ticker)
  }
  
  Select(VwapTrades as T, QuoteStream as Q) -> BargainIndex {
    select: 
      (select case when vwap > askprice then asksize * (Math.exp(vwap - askprice)) else 0.0d end as index
      from T#unique(ticker) as t, Q#lastevent as q
      where t.ticker = q.ticker)
  }
  
  LogSink(BargainIndex) {}

The final example is a word count data flow, in which three custom operators tokenize lines, count words and aggregate the counts. The custom operators in this example are discussed next.

create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}

This section discusses how to implement classes that serve as operators in a data flow. The section employs the example data flow as shown earlier.

This example data flow has operators MyLineFeedSource, MyTokenizerCounter and MyWordCountAggregator that are application-provided operators:

create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}

Each operator requires implementing the following interfaces:

  1. DataFlowOperatorForge - The compile-time forge that validates parameters and generates code that initializes the factory.
  2. DataFlowOperatorFactory - The deployment-time factory that creates operator instances.
  3. DataFlowOperator (or DataFlowSourceOperator for source operators) - The operator instance itself.

The compiler must be able to find the class implementing DataFlowOperatorForge. Add the forge package or forge class to imports:

// Sample code adds the forge class to imports; alternatively add 'package.*' to import the package.
Configuration configuration = new Configuration();
configuration.getCommon().addImport(MyLineFeedSourceForge.class.getName());

Every operator has a forge class that implements the DataFlowOperatorForge interface and is only used at compile time. The compiler provides the operator parameter expressions to the forge instance and invokes the initializeForge method. When it is time to generate code, the compiler invokes the make method.

// The OutputTypes annotation can be used to specify the type of events
// that are output by the operator.
// If provided, it is not necessary to declare output types in the data flow.
// The event representation is object-array.
@OutputTypes(value = {
        @OutputType(name = "line", typeName = "String")
})

// Provide the DataFlowOpProvideSignal annotation to indicate that
// the source operator provides a final marker.
@DataFlowOpProvideSignal
public class MyLineFeedSourceForge implements DataFlowOperatorForge {

    public DataFlowOpForgeInitializeResult initializeForge(DataFlowOpForgeInitializeContext context) throws ExprValidationException {
        return null;
    }

    public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
        return newInstance(MyLineFeedSourceFactory.class);
    }
}

The operator factory class must implement the DataFlowOperatorFactory interface. At deployment time the operator factory initializes using the code generated in the forge make method. Upon instantiating a data flow the factory must return an operator instance.

The implementation for the sample MyLineFeedSourceFactory is:

public class MyLineFeedSourceFactory implements DataFlowOperatorFactory {

    public void initializeFactory(DataFlowOpFactoryInitializeContext context) {
    }

    public DataFlowOperator operator(DataFlowOpInitializeContext context) {
        return new MyLineFeedSource(Collections.emptyIterator());
    }
}

The operator implementation for the sample MyLineFeedSource is:

public class MyLineFeedSource implements DataFlowSourceOperator {

    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    private final Iterator<String> lines;

    public MyLineFeedSource(Iterator<String> lines) {
        this.lines = lines;
    }

    public void open(DataFlowOpOpenContext openContext) {
    }

    public void next() {
        if (lines.hasNext()) {
            dataFlowEmitter.submit(new Object[]{lines.next()});
        } else {
            dataFlowEmitter.submitSignal(new EPDataFlowSignalFinalMarker() {
            });
        }
    }

    public void close(DataFlowOpCloseContext openContext) {
    }
}

The forge instance receives parameter expressions. A forge can declare parameters like so:

// Expose a parameter named "file" that takes any expression as parameter
@DataFlowOpParameter
private ExprNode file;

// Expose a parameter named "adapterInputSource" that will be an instance of some interface
// Interface implementations as parameters are declared as a Map<String, Object>
@DataFlowOpParameter
private Map<String, Object> adapterInputSource;

// Expose a parameter named "propertyNames" that is an array of string constants
@DataFlowOpParameter
private String[] propertyNames;

The forge class can obtain the output event type if needed. It should also validate the expression parameters and throw ExprValidationException if a parameter expression does not return the expected type. The utility class DataFlowParameterValidation provides validate utility methods that return a validated expression. For example:

public DataFlowOpForgeInitializeResult initializeForge(DataFlowOpForgeInitializeContext context) throws ExprValidationException {
  // Obtain the declared output event type
  outputEventType = context.getOutputPorts().get(0).getOptionalDeclaredType() != null
      ? context.getOutputPorts().get(0).getOptionalDeclaredType().getEventType()
      : null;
  if (outputEventType == null) {
    throw new ExprValidationException("No event type provided for output, please provide an event type name");
  }

  // validate the "file" parameter expression expected to return a String-typed value
  file = DataFlowParameterValidation.validate("file", file, String.class, context);
  return null;
}

The forge class passes parameters to the factory. We use SAIFFInitializeBuilder, a builder utility for building the factory. For example:

public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
  return new SAIFFInitializeBuilder(FileSourceFactory.class, this.getClass(), "factory", parent, symbols, classScope)
    .exprnode("file", file)
    .constant("propertyNames", propertyNames)
    .map("adapterInputSource", adapterInputSource)
    .build();
}

The factory class must have setter-methods of the same name that receive the parameters:

private ExprEvaluator file;
private String[] propertyNames;
private Map<String, Object> adapterInputSource;

public void setFile(ExprEvaluator file) {
    this.file = file;
}

public void setPropertyNames(String[] propertyNames) {
    this.propertyNames = propertyNames;
}

public void setAdapterInputSource(Map<String, Object> adapterInputSource) {
    this.adapterInputSource = adapterInputSource;
}

The factory class can resolve parameter values by evaluating expressions and by determining whether parameters were passed as options. The DataFlowParameterResolution class provides convenience methods. For example:

public DataFlowOperator operator(DataFlowOpInitializeContext context) {
  String fileName = DataFlowParameterResolution.resolveWithDefault("file", file, null, String.class, context);
  AdapterInputSource adapterInputSourceInstance = DataFlowParameterResolution.resolveOptionalInstance("adapterInputSource", adapterInputSource, AdapterInputSource.class, context);
  return new MyOperator(fileName, adapterInputSourceInstance);
}