Chapter 15. Extension and Plug-in

Chapter 15. Extension and Plug-in

15.1. Overview

This chapter summarizes integration and describes in detail each of the extension APIs that allow integrating external data and/or extend engine functionality.

For information on input and output adapters that connect to an event transport and perform event transformation for incoming and outgoing on-the-wire event data, for use with streaming data, please see the EsperIO reference documentation. Consider using the Plug-in Loader API for creating a new adapter that starts or stops as part of the CEP engine initialization and destroy lifecycle, see Section 12.16, “Plug-in Loader”.

To join data that resides in a relational database and that is accessible via JDBC driver and SQL statement the engine offers a syntax for using SQL within EPL, see Section 4.13, “Accessing Relational Data via SQL”. A relational database input and output adapter for streaming input from and output to a relational database also exists (EsperIO).

To join data that resides in a non-relational store the engine offers a two means: FIrst, the virtual data window, as described below, for transparently integrating the external store as a named window. The second mechanism is a special join syntax based on static method invocation, see Section 4.14, “Accessing Non-Relational Data via Method Invocation”.

15.2. Virtual Data Window

Use a virtual data window if you have a (large) external data store that you want to access as a named window. The access is transparent: There is no need to use special syntax or join syntax. All regular queries including subqueries, joins, on-merge, on-select, on-insert, on-delete, on-update and fire-and-forget are supported with virtual data windows.

There is no need to keep any data or events in memory with virtual data windows. The only requirement for virtual data windows is that all data rows returned are EventBean instances.

When implementing a virtual data window it is not necessary to send any events into the engine or to use insert-into. The event content is simply assumed to exist and accessible to the engine via the API implementation you provide.

The distribution ships with a sample virtual data window in the examples folder under the name virtualdw. The code snippets below are extracts from the example.

We use the term store here to mean a source set of data that is managed by the virtual data window. We use the term store row or just row to mean a single data item provided by the store. We use the term lookup to mean a read operation against the store returning zero, one or many rows.

Virtual data windows allow high-performance low-latency lookup by exposing all relevant EPL query access path information. This makes it possible for the virtual data window to choose the desired access method into its store.

The following steps are required to develop and use a virtual data window:

  1. Implement the interface com.espertech.esper.client.hook.VirtualDataWindowFactory.

  2. Implement the interface com.espertech.esper.client.hook.VirtualDataWindow.

  3. Implement the interface com.espertech.esper.client.hook.VirtualDataWindowLookup.

  4. Register the factory class in the engine configuration.

Once you have completed above steps, the virtual data window is ready to use in EPL statements.

The following restrictions apply to virtual data windows:

  1. The virtual data window may not be enumerated via the GetEnumerator or GetSafeEnumerator methods.

  2. A statement consuming a stream from a virtual data window does not get up-to-date state at statement creation time. For example, the EPL select count(*) from MySampleWindow always starts at count zero and not the count of events that are currently held in store.

  3. When a virtual data window itself originates new events, use insert-into to insert into the named window.

From a threading perspective, virtual data window implementation classes must be thread-safe if objects are shared between multiple named windows. If no objects are shared between multiple different named windows, thereby each object is only used for the same named window and other named windows receive a separate instance, it is no necessary that the implementation classes are thread-safe.

15.2.1. How to Use

Your application must first register the virtual data window factory as part of engine configuration:

Configuration config = new Configuration();
config.AddPlugInVirtualDataWindow<SampleVirtualDataWindowFactory>("sample", "samplevdw");
config.AddPlugInVirtualDataWindow("sample2", "samplevdw2", 
    typeof(SampleVirtualDataWindowFactory2).FullName);

Your application may then create a named window backed by a virtual data window.

For example, assume that the SampleEvent event type is declared as follows:

create schema SampleEvent as (key1 string, key2 string, value1 int, value2 double)

The next EPL statement creates a named window MySampleWindow that provides SampleEvent events and is backed by a virtual data window provided by SampleVirtualDataWindowFactory as configured above:

create window MySampleWindow.sample:samplevdw() as SampleEvent

You may then access the named window, same as any other named window, for example by subquery, join, on-action, fire-and-forget query or by consuming its insert and remove stream. While this example uses Map-type events, the example code is the same for POJO or other events.

Your application may obtain a reference to the virtual data window from the engine context.

This code snippet looks up the virtual data window by the named window name:

try {
  return (VirtualDataWindow) epService.Context.Lookup("/virtualdw/MySampleWindow");
}
catch (NamingException e) {
  throw new ApplicationException("Failed to look up virtual data window, is it created yet?");
}

15.2.1.1. Query Access Path

When you application registers a subquery, join or on-action query or executes a fire-and-forget query against a virtual data window the engine interacts with the virtual data window. The interaction is a two-step process.

At time of EPL statement creation (once), the engine analyzes the EPL where-clause, if present. It then compiles a list of hash-index and binary tree (btree, i.e. sorted) index properties. It passes the property names that are queried as well as the operators (i.e. =, >, range etc.) to the virtual data window. The virtual data window returns a lookup strategy object to the engine.

At time of EPL statement execution (repeatedly as triggered) , the engine uses that lookup strategy object to execute a lookup. It passes to the lookup all actual key values (hash, btree including ranges) to make fast and efficient lookup achievable.

To explain in detail, assume that your application creates an EPL statement with a subquery as follows:

select (select * from MySampleWindow where key1 = 'A1') from OtherEvent

At the time of creation of the EPL query above the engine analyzes the EPL query. It determines that the subquery queries a virtual data window. It determines from the where-clause that the lookup uses property key1 and hash-equals semantics. The engine then provides this information as part of VirtualDataWindowLookupContext passed to the getLookup method. Your application may inspect hash and btree properties and may determine the appropriate store access method to use.

The hash and btree property lookup information is for informational purposes, to enable fast and performant queries that returns the smallest number of rows possible. Your implementation classes may use some or none of the information provided and may also instead return some or perhaps even all rows, as is practical to your implementation. The where-clause still remains in effect and gets evaluated on all rows that are returned by the lookup strategy.

Following the above example, the sub-query executes once when a OtherEvent event arrives. At time of execution the engine delivers the string value A1 to the VirtualDataWindowLookup lookup implementation provided by your application. The lookup object queries the store and returns store rows as EventBean instances.

As a second example, consider an EPL join statement as follows:

select * from MySampleWindow, MyTriggerEvent where key1 = trigger1 and key2 = trigger2

The engine analyzes the query and passes to the virtual data window the information that the lookup occurs on properties key1 and key2 under hash-equals semantics. When a MyTriggerEvent arrives, it passes the actual value of the trigger1 and trigger2 properties of the current MyTriggerEvent to the lookup.

As a last example, consider an EPL fire-and-forget statement as follows:

select * from MySampleWindow key1 = 'A2' and value1 between 0 and 1000

The engine analyzes the query and passes to the virtual data window the lookup information. The lookup occurs on property key1 under hash-equals semantics and on property value1 under btree-open-range semantics. When you application executes the fire-and-forget query the engine passes A2 and the range endpoints 0 and 1000 to the lookup.

For more information, please consult the JavaDoc API documentation for class com.espertech.esper.client.hook.VirtualDataWindow, VirtualDataWindowLookupContext or VirtualDataWindowLookupFieldDesc.

15.2.2. Implementing the Factory

A virtual data window factory class is responsible for the following functions:

  • Accept the factory context object as a parameter and return the VirtualDataWindow implementation.

The sample code shown here can be found among the examples in the distribution under virtualdw:

public class SampleVirtualDataWindowFactory : IVirtualDataWindowFactory {
  public IVirtualDataWindow Create(VirtualDataWindowContext context) {
    return new SampleVirtualDataWindow(context);
  }
}

Your factory class must implement the create method which receives a VirtualDataWindowContext object. This method is called once for each EPL that creates a virtual data window (see example create window above).

The VirtualDataWindowContext provides to your application:

String namedWindowName;	// Name of named window being created.
Object[] parameters;  // Any optional parameters provided as part of create-window.
EventType eventType;  // The event type of events.
EventBeanFactory eventFactory;  // A factory for creating EventBean instances from store rows.
VirtualDataWindowOutStream outputStream;  // For stream output to consuming statements.
StatementContext statementContext;  // Other EPL statement information.

15.2.3. Implementing the Virtual Data Window

A virtual data window implementation is responsible for the following functions:

  • Accept the lookup context object as a parameter and return the VirtualDataWindowLookup implementation.

  • Optionally, post insert and remove stream data.

The sample code shown here can be found among the examples in the distribution under virtualdw.

The implementation class must implement the VirtualDataWindow interface like so:

public class SampleVirtualDataWindow : IVirtualDataWindow {

  private readonly VirtualDataWindowContext context;
  
  public SampleVirtualDataWindow(VirtualDataWindowContext context) {
    this.context = context;
  } ...

When the engine compiles an EPL statement and detects a virtual data window, the engine invokes the getLookup method indicating hash and btree access path information by passing a VirtualDataWindowLookupContext context. The lookup method must return a VirtualDataWindowLookup implementation that the EPL statement uses for all lookups until the EPL statement is stopped or destroyed.

The sample implementation does not use the hash and btree access path information and simply returns a lookup object:

public VirtualDataWindowLookup GetLookup(VirtualDataWindowLookupContext desc) {

  // Place any code that interrogates the hash-index and btree-index fields here.

  // Return the lookup strategy.
  return new SampleVirtualDataWindowLookup(context);
}

If your virtual data window returns null instead of a lookup object, the EPL query creation fails and throws an EPStatementException.

The engine calls the update method when data changes because of on-merge, on-delete, on-update or insert-into. For example, if you have an on-merge statement that is triggered and that updates the virtual data window, the newData parameter receives the new (updated) event and the oldData parameters receives the event prior to the update. Your code may use these events to update the store or delete from the store, if needed.

If your application plans to consume data from the virtual data window, for example via select * from MySampleWindow, then the code must implement the update method to forward insert and remove stream events, as shown below, to receive the events in consuming statements. To post insert and remove stream data, use the VirtualDataWindowOutStream provided by the context object as follows.

public void update(Object sender, UpdateEventArgs e) {
  // This sample simply posts into the insert and remove stream what is received.
  context.OutputStream.Update(sender, e);
}

Your application should not use VirtualDataWindowOutStream to post new events that originate from the store. The object is intended for use with on-action EPL statements. Use insert-into instead for any new events that originate from the store.

15.2.4. Implementing the Lookup

A lookup implementation is responsible for the following functions:

  • Accept the lookup values as a parameter and return a set of EventBean instances.

The sample code shown here can be found among the examples in the distribution under virtualdw.

The implementation class must implement the VirtualDataWindowLookup interface:

public class SampleVirtualDataWindowLookup : IVirtualDataWindowLookup {

  private readonly VirtualDataWindowContext context;
  
  public SampleVirtualDataWindowLookup(VirtualDataWindowContext context) {
    this.context = context;
  } ...

When an EPL query fires, the engine invokes the lookup and provides the actual lookup values. The lookup values are provided in the same exact order as the access path information that the engine provided when obtaining the lookup.

Each store row must be wrapped as an EventBean instance. The context object provides an EventBeanFactory implementation returned by getEventFactory() that can be used to wrap rows.

The sample implementation does not use the lookup values and simply returns a hardcoded sample event:

public ICollection<EventBean> Lookup(Object[] lookupValues) {
  // Add code to interogate lookup values here.

  // Create sample event.
  // This example uses Map events; Other underlying events such as POCO are exactly the same code.
  IDictionary<String, Object> eventData = new Dictionary<String, Object>();
  eventData["key1" = "sample1";
  eventData["key2" = "sample2";
  eventData["value1" = 100;
  eventData["value2"] = 1.5d;
  EventBean @event = context.EventFactory.Wrap(eventData);
  return new EventBean[]{ @event };
}

The lookupValues object array represents all actual joined property values or expression results if you where-clause criteria are expressions. The code may use these keys to for efficient store access.

When a key value is a range, the key value is an instance of VirtualDataWindowKeyRange.

15.3. Single-Row Functions

Single-row functions return a single value. They are not expected to aggregate rows but instead should be stateless functions. These functions can appear in any expressions and can be passed any number of parameters.

The following steps are required to develop and use a custom single-row function with Esper.

  1. Implement a class providing one or more public static methods accepting the number and type of parameters as required.

  2. Register the single-row function class and method name with the engine by supplying a function name, via the engine configuration file or the configuration API.

You may not override a built-in function with a single-row function provided by you. The single-row function you register must have a different name then any of the built-in functions.

An example single-row function can also be found in the examples under the runtime configuration example.

15.3.1. Implementing a Single-Row Function

Single-row function classes have no further requirement then provide a public static method.

The following sample single-row function simply computes a percentage value based on two number values.

This sample class provides a public static method by name computePercent to return a percentage value:

public class MyUtilityClass {
  public static double computePercent(double amount, double total) {
    return amount / total * 100;
  }
}

15.3.2. Configuring the Single-Row Function Name

The class name of the class, the method name and the function name of the new single-row function must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the runtime and static configuration API:

<esper-configuration
  <plugin-singlerow-function name="percent" 
    function-class="mycompany.MyUtilityClass" function-method="computePercent" />
</esper-configuration>

Note that the function name and method name need not be the same.

The new single-row function is now ready to use in a statement:

select percent(50,100) from MyEvent

If the single-row function returns an object that provides further functions, you may chain function calls.

The following demonstrates a chained single-row function. The example assumes that a single-row function by name calculator returns an object that provides the add function which accepts two parameters:

select calculator().add(5, amount) from MyEvent

15.4. Derived-value and Data Window View

Views in Esper are used to derive information from an event stream, and to represent data windows onto an event stream. This chapter describes how to plug-in a new, custom view.

The following steps are required to develop and use a custom view with Esper.

  1. Implement a view factory class. View factories are classes that accept and check view parameters and instantiate the appropriate view class.

  2. Implement a view class. A view class commonly represents a data window or derives new information from a stream.

  3. Configure the view factory class supplying a view namespace and name in the engine configuration file.

The example view factory and view class that are used in this chapter can be found in the examples source folder in the OHLC (open-high-low-close) example. The class names are OHLCBarPlugInViewFactory and OHLCBarPlugInView.

Views can make use of the following engine services available via StatementServiceContext:

  • The SchedulingService interface allows views to schedule timer callbacks to a view

  • The EventAdapterService interface allows views to create new event types and event instances of a given type.

  • The StatementStopService interface allows view to register a callback that the engine invokes to indicate that the view's statement has been stopped

Section 15.4.3, “View Contract” outlines the requirements for correct behavior of a your custom view within the engine.

Note that custom views may use engine services and APIs that can be subject to change between major releases. The engine services discussed above and view APIs are considered part of the engine internal public API and are stable. Any changes to such APIs are disclosed through the release change logs and history. Please also consider contributing your custom view to the Esper project team by submitting the view code through the mailing list or via a JIRA issue.

15.4.1. Implementing a View Factory

A view factory class is responsible for the following functions:

  • Accept zero, one or more view parameters. View parameters are themselves expressions. The view factory must validate and evaluate these expressions.

  • Instantiate the actual view class.

  • Provide information about the event type of events posted by the view.

View factory classes simply subclass com.espertech.esper.view.ViewFactorySupport:

public class OHLCBarPlugInViewFactory : ViewFactorySupport { ...

Your view factory class must implement the SetViewParameters method to accept and parse view parameters. The next code snippet shows an implementation of this method. The code checks the number of parameters and retains the parameters passed to the method:

public class OHLCBarPlugInViewFactory : ViewFactorySupport {
    private ViewFactoryContext viewFactoryContext;
    private IList<ExprNode> viewParameters;
    private ExprNode timestampExpression;
    private ExprNode valueExpression;

    public void SetViewParameters(ViewFactoryContext viewFactoryContext, IList<ExprNode> viewParameters) {
        this.viewFactoryContext = viewFactoryContext;
        if (viewParameters.size() != 2) {
            throw new ViewParameterException(
                "View requires a two parameters: " +
                "the expression returning timestamps and the expression supplying OHLC data points");
        }
        this.viewParameters = viewParameters;
    }
  ...

After the engine supplied view parameters to the factory, the engine will ask the view to attach to its parent view and validate any parameter expressions against the parent view's event type. If the view will be generating events of a different type then the events generated by the parent view, then the view factory can create the new event type in this method:

public void Attach(EventType parentEventType, 
		StatementContext statementContext, 
		ViewFactory optionalParentFactory, 
		IList<ViewFactory> parentViewFactories) {
		
    ExprNode[] validatedNodes = ViewFactorySupport.Validate("OHLC view", 
	      parentEventType, statementContext, viewParameters, false);

    timestampExpression = validatedNodes[0];
    valueExpression = validatedNodes[1];

    if ((timestampExpression.ExprEvaluator.ReturnType != typeof(long)) && 
        (timestampExpression.ExprEvaluator.ReturnType != typeof(long?))) {
        throw new ViewParameterException(
            "View requires long-typed timestamp values in parameter 1");
    }
    if ((valueExpression.ExprEvaluator.ReturnType != typeof(double)) && 
        (valueExpression.ExprEvaluator.ReturnType != typeof(double?))) {
        throw new ViewParameterException(
            "View requires double-typed values for in parameter 2");
    }
}

Finally, the engine asks the view factory to create a view instance, and asks for the type of event generated by the view:

public View MakeView(StatementContext statementContext) {
    return new OHLCBarPlugInView(statementContext, timestampExpression, valueExpression);
}

public EventType EventType {
	get { return OHLCBarPlugInView.GetEventType(viewFactoryContext.EventAdapterService) }
}

15.4.2. Implementing a View

A view class is responsible for:

  • The Parent property informs the view of the parent view's event type

  • The Update method receives insert streams and remove stream events from its parent view

  • The GetEnumerator method supplies an (optional) iterator to allow an application to pull or request results from an EPStatement

  • The CloneView method must make a configured copy of the view to enable the view to work in a grouping context together with a std:groupwin parent view

View classes simply subclass com.espertech.esper.view.ViewSupport:

public class MyTrendSpotterView : ViewSupport { ...

Your view's Update method will be processing incoming (insert stream) and outgoing (remove stream) events posted by the parent view (if any), as well as providing incoming and outgoing events to child views. The convention required of your update method implementation is that the view releases any insert stream events (EventBean object references) which the view generates as reference-equal remove stream events (EventBean object references) at a later time.

The view implementation must call the UpdateChildren method to post outgoing insert and remove stream events. Similar to the Update method, the UpdateChildren method takes insert and remove stream events as parameters.

A sample Update method implementation is provided in the OHLC example.

15.4.3. View Contract

The Update method must adhere to the following conventions, to prevent memory leaks and to enable correct behavior within the engine:

  • A view implementation that posts events to the insert stream must post unique EventBean object references as insert stream events, and cannot post the same EventBean object reference multiple times. The underlying event to the EventBean object reference can be the same object reference, however the EventBean object reference posted by the view into the insert stream must be a new instance for each insert stream event.

  • If the custom view posts a continuous insert stream, then the views must also post a continuous remove stream (second parameter to the UpdateChildren method). If the view does not post remove stream events, it assumes unbound keep-all semantics.

  • EventBean events posted as remove stream events must be the same object reference as the EventBean events posted as insert stream by the view. Thus remove stream events posted by the view (the EventBean instances, does not affect the underlying representation) must be reference-equal to insert stream events posted by the view as part of an earlier invocation of the update method, or the same invocation of the update method.

  • EventBean events represent a unique observation. The values of the observation can be the same, thus the underlying representation of an EventBean event can be reused, however event property values must be kept immutable and not be subject to change.

  • Array elements of the insert and remove stream events must not carry null values. Array size must match the number of EventBean instances posted. It is recommended to use a null value for no insert or remove stream events rather then an empty zero-size array.

The engine can provide a callback to the view indicating when a statement using the view is stopped. The callback is available to the view via the com.espertech.esper.view.StatementStopCallback interface. Your view code must subscribe to the stop callback in order for the engine to invoke the callback:

statementContext.StatementStopService.AddSubscriber(this);

Please refer to the sample views for a code sample on how to implement iterator and CloneView methods.

In terms of multiple threads accessing view state, there is no need for your custom view factory or view implementation to perform any synchronization to protect internal state. The iterator of the custom view implementation does also not need to be thread-safe. The engine ensures the custom view executes in the context of a single thread at a time. If your view uses shared external state, such external state must be still considered for synchronization when using multiple threads.

15.4.4. Configuring View Namespace and Name

The view factory class name as well as the view namespace and name for the new view must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-view namespace="custom" name="ohlc" 
      factory-class="com.espertech.esper.example.ohlc.OHLCBarPlugInViewFactory" /> 
</esper-configuration>

The new view is now ready to use in a statement:

select * from StockTick.custom:ohlc(timestamp, price)

Note that the view must implement additional interfaces if it acts as a data window view, or works in a grouping context, as discussed in detail below.

15.4.5. Requirement for Data Window Views

Your custom view may represent an expiry policy and may retain events and thus act as a data window view. In order to allow the engine to validate that your view can be used with named windows, which allow only data window views, this section documents any additional requirement that your classes must fulfill.

Your view factory class must implement the com.espertech.esper.view.DataWindowViewFactory interface. This marker interface (no methods required) indicates that your view factory provides only data window views.

Your view class must implement the com.espertech.esper.view.DataWindowView interface. This marker interface indicates that your view is a data window view and therefore eligible to be used in any construct that requires a data window view.

15.4.6. Requirement for Grouped Views

Grouped views are views that operate under the std:groupwin view. When operating under one or more std:groupwin views, the engine instantiates a single view instance when the statement starts, and a new view instance per group criteria dynamically as new group criteria become known.

The next statement shows EPL for using a view instance per grouping criteria:

select * from StockTick.std:groupwin(symbol).custom:trendspotter(price)

Your view must implement the com.espertech.esper.view.CloneableView interface to indicate your view may create new views. This code snippet shows a sample implementation of the CloneView method required by the interface:

public View CloneView(StatementContext statementContext) {
  return new MyPlugInView(statementContext);	// pass any parameters along where
}

15.5. Aggregation Functions

Aggregation functions aggregate event property values or expression results obtained from one or more streams. Examples for built-in aggregation functions are count(*), sum(price * volume) or avg(distinct volume).

The optional keyword distinct ensures that only distinct (unique) values are aggregated and duplicate values are ignored by the aggregation function. Custom plug-in aggregation functions do not need to implement the logic to handle distinct values. This is because when the engine encounters the distinct keyword, it eliminates any non-distinct values before passing the value for aggregation to the custom aggregation function.

Custom aggregation functions can also be passed multiple parameters, as further discribed in Section 15.5.3, “Accepting Multiple Parameters”. In the example below the aggregation function accepts a single parameter.

The following steps are required to develop and use a custom aggregation function with Esper.

  1. Implement an aggregation function class.

  2. Register the aggregation function class with the engine by supplying a function name, via the engine configuration file or the runtime and static configuration API.

The code for the example aggregation function as shown in this chapter can be found in the runtime configuration example in the package com.espertech.esper.example.runtimeconfig by the name MyConcatAggregationFunction. The sample function simply concatenates string-type values.

15.5.1. Implementing an Aggregation Function

An aggregation function class is responsible for the following functions:

  • Implement a Validate method that validates the value type of the data points that the function must process.

  • Implement a ValueType property that returns the type of the aggregation value generated by the function. For example, the built-in count aggregation function returns Long.class as it generates long -typed values.

  • Implement an Enter method that the engine invokes to add a data point into the aggregation, when an event enters a data window

  • Implement a Leave method that the engine invokes to remove a data point from the aggregation, when an event leaves a data window

  • Implement a Value property that returns the current value of the aggregation.

Aggregation function classes simply subclass com.espertech.esper.epl.agg.AggregationSupport:

public class MyConcatAggregationFunction : AggregationSupport { ...

The engine generally constructs one instance of the aggregation function class for each time the function is listed in a statement, however the engine may decide to reduce the number of aggregation class instances if it finds equivalent aggregations. The class that provides the aggregation and extends AggregationSupport does not have to be threadsafe.

The constructor initializes the aggregation function:

public class MyConcatAggregationFunction : AggregationSupport {
  private readonly static char DELIMITER = ' ';
  private StringBuilder builder;
  private String delimiter;

  public MyConcatAggregationFunction() : base()
  {
    builder = new StringBuilder();
    delimiter = "";
  }
  ...

An aggregation function must provide an implementation of the Validate method that is passed a AggregationValidationContext validation context object. Within the validation context you find the result type of each of the parameters expressions to the aggregation function as well as information about constant values and data window use. Please see the API documentation for a comprehensive list of validation context information.

Since the example concatenation function requires string types, it implements a type check:

public void Validate(AggregationValidationContext validationContext) {
  if ((validationContext.ParameterTypes.Length != 1) ||
    (validationContext.ParameterTypes[0] != typeof(string))) {
    throw new ArgumentException("Concat aggregation requires a single parameter of type String");
  }
}

The Enter method adds a datapoint to the current aggregation value. The example Enter method shown below adds a delimiter and the string value to a string buffer:

public void Enter(Object value) {
  if (value != null) {
    builder.Append(delimiter);
    builder.Append(value.ToString());
    delimiter = DELIMITER.ToString();
  }
}

Conversly, the Leave method removes a datapoint from the current aggregation value. The example Leave method removes from the string buffer:

public void Leave(Object value) {
  if (value != null) {
	builder.Length = 0;
  }
}

In order for the engine to validate the type returned by the aggregation function against the types expected by enclosing expressions, the ValueType must return the result type of any values produced by the aggregation function:

public Class ValueType {
  get { return typeof(string); }
}

For on-demand queries the aggregation function must support resetting its value to empty or start values. Implement the Clear function to reset the value as shown below:

public void Clear() {
  builder = new StringBuilder();
  delimiter = "";
}

Finally, the engine obtains the current aggregation value by means of the Value property:

public Object Value {
  get { return builder.ToString(); }
}

15.5.2. Configuring the Aggregation Function Name

The aggregation function class name as well as the function name for the new aggregation function must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-aggregation-function name="concat" 
    function-class="com.espertech.esper.example.runtimeconfig.MyConcatAggregationFunction" />
</esper-configuration>

The new aggregation function is now ready to use in a statement:

select concat(symbol) from StockTick.win:length(3)

15.5.3. Accepting Multiple Parameters

Your plug-in aggregation function may accept multiple parameters, simply by casting the Object parameter of the Enter and Leave method to Object[].

For instance, assume an aggregation function rangeCount that counts all values that fall into a range of values. The EPL that calls this function and provides a lower and upper bounds of 1 and 10 is:

select rangeCount(1, 10, myValue) from MyEvent

The Enter method of the plug-in aggregation function may look as follows:

public void Enter(Object value)  {
  Object[] params = (Object[]) value;
  int lower = (int) params[0];
  int upper = (int) params[1];
  int val = (int) params[2];
  if ((val >= lower) && (val <= upper)) {
    count++;
  }
}

Your plug-in aggregation function may want to validate parameter types or may want to know which parameters are constant-value expressions. Constant-value expressions are evaluated only once by the engine and could therefore be cached by your aggregation function for performance reasons. The engine provides constant-value information as part of the AggregationValidationContext passed to the Validate method.

15.6. Pattern Guard

Pattern guards are pattern objects that control the lifecycle of the guarded sub-expression, and can filter the events fired by the subexpression.

The following steps are required to develop and use a custom guard object with Esper.

  1. Implement a guard factory class, responsible for creating guard object instances.

  2. Implement a guard class.

  3. Register the guard factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

The code for the example guard object as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyCountToPatternGuardFactory. The sample guard discussed here counts the number of events occurring up to a maximum number of events, and end the sub-expression when that maximum is reached.

15.6.1. Implementing a Guard Factory

A guard factory class is responsible for the following functions:

  • Implement a SetGuardParameters method that takes guard parameters, which are themselves expressions.

  • Implement a MakeGuard method that constructs a new guard instance.

Guard factory classes subclass com.espertech.esper.pattern.guard.GuardFactorySupport:

public class MyCountToPatternGuardFactory : GuardFactorySupport { ...

The engine constructs one instance of the guard factory class for each time the guard is listed in a statement.

The guard factory class implements the SetGuardParameters method that is passed the parameters to the guard as supplied by the statement. It verifies the guard parameters, similar to the code snippet shown next. Our example counter guard takes a single numeric parameter:

public void SetGuardParameters(IList<ExprNode> guardParameters, 
			MatchedEventConvertor convertor) {
    String message = "Count-to guard takes a single integer-value expression as parameter";
    if (guardParameters.Count != 1) {
        throw new GuardParameterException(message);
    }

    if (guardParameters[0].ExprEvaluator.ReturnType != typeof(int)) {
        throw new GuardParameterException(message);
    }

    this.numCountToExpr = guardParameters[0];
    this.convertor = convertor;
}

The MakeGuard method is called by the engine to create a new guard instance. The example MakeGuard method shown below passes the maximum count of events to the guard instance. It also passes a Quitable implementation to the guard instance. The guard uses Quitable to indicate that the sub-expression contained within must stop (quit) listening for events.

public Guard MakeGuard(PatternContext context, 
      MatchedEventMap beginState, 
      Quitable quitable, 
      Object stateNodeId, 
      Object guardState) {
      
    Object parameter = PatternExpressionUtil.Evaluate("Count-to guard", 
        beginState, numCountToExpr, convertor);
    if (parameter == null) {
        throw new EPException("Count-to guard parameter evaluated to a null value");
    }

    int? numCountTo = (int?) parameter;
    return new MyCountToPatternGuard(numCountTo, quitable);
}

15.6.2. Implementing a Guard Class

A guard class has the following responsibilities:

  • Provides a StartGuard method that initalizes the guard.

  • Provides a StopGuard method that stops the guard, called by the engine when the whole pattern is stopped, or the sub-expression containing the guard is stopped.

  • Provides an Inspect method that the pattern engine invokes to determine if the guard lets matching events pass for further evaluation by the containing expression.

Guard classes subclass com.espertech.esper.pattern.guard.GuardSupport as shown here:

public abstract class GuardSupport : Guard { ...

The engine invokes the guard factory class to construct an instance of the guard class for each new sub-expression instance within a statement.

A guard class must provide an implementation of the StartGuard method that the pattern engine invokes to start a guard instance. In our example, the method resets the guard's counter to zero:

public void StartGuard() {
  counter = 0;
}

The pattern engine invokes the Inspect method for each time the sub-expression indicates a new event result. Our example guard needs to count the number of events matched, and quit if the maximum number is reached:

public bool Inspect(MatchedEventMap matchEvent) {
  counter++;
  if (counter > numCountTo) {
    quitable.GuardQuit();
    return false;
  }
  return true;
}

The Inspect method returns true for events that pass the guard, and false for events that should not pass the guard.

15.6.3. Configuring Guard Namespace and Name

The guard factory class name as well as the namespace and name for the new guard must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-pattern-guard namespace="myplugin" name="count_to" 
      factory-class="com.espertech.esper.regression.client.MyCountToPatternGuardFactory"/>
</esper-configuration>

The new guard is now ready to use in a statement. The next pattern statement detects the first 10 MyEvent events:

select * from pattern [(every MyEvent) where myplugin:count_to(10)]

Note that the every keyword was placed within parentheses to ensure the guard controls the repeated matching of events.

15.7. Pattern Observer

Pattern observers are pattern objects that are executed as part of a pattern expression and can observe events or test conditions. Examples for built-in observers are timer:at and timer:interval. Some suggested uses of observer objects are:

  • Implement custom scheduling logic using the engine's own scheduling and timer services

  • Test conditions related to prior events matching an expression

The following steps are required to develop and use a custom observer object within pattern statements:

  1. Implement an observer factory class, responsible for creating observer object instances.

  2. Implement an observer class.

  3. Register an observer factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

The code for the example observer object as shown in this chapter can be found in the test source folder in package com.espertech.esper.regression.client by the name MyFileExistsObserver. The sample observer discussed here very simply checks if a file exists, using the filename supplied by the pattern statement, and via the System.IO.File class.

15.7.1. Implementing an Observer Factory

An observer factory class is responsible for the following functions:

  • Implement a SetObserverParameters method that takes observer parameters, which are themselves expressions.

  • Implement a MakeObserver method that constructs a new observer instance.

Observer factory classes subclass com.espertech.esper.pattern.observer.ObserverFactorySupport:

public class MyFileExistsObserverFactory : ObserverFactorySupport { ...

The engine constructs one instance of the observer factory class for each time the observer is listed in a statement.

The observer factory class implements the SetObserverParameters method that is passed the parameters to the observer as supplied by the statement. It verifies the observer parameters, similar to the code snippet shown next. Our example file-exists observer takes a single string parameter:

public void SetObserverParameters(IList<ExprNode> expressionParameters, 
			MatchedEventConvertor convertor) {
    String message = "File exists observer takes a single string filename parameter";
    if (expressionParameters.Count != 1) {
	    throw new ObserverParameterException(message);
    }
    if (expressionParameters[0].ExprEvaluator.ReturnType != typeof(string)) {
	    throw new ObserverParameterException(message);
    }

    this.filenameExpression = expressionParameters[0];
    this.convertor = convertor;
}

The pattern engine calls the MakeObserver method to create a new observer instance. The example MakeObserver method shown below passes parameters to the observer instance:

public EventObserver MakeObserver(PatternContext context, 
			MatchedEventMap beginState, 
			ObserverEventEvaluator observerEventEvaluator, 
			Object stateNodeId, 
			Object observerState) {
    Object filename = PatternExpressionUtil.Evaluate("File-exists observer ", beginState, filenameExpression, convertor);
    if (filename == null) {
	    throw new EPException("Filename evaluated to null");
    }

    return new MyFileExistsObserver(beginState, observerEventEvaluator, filename.ToString());
}

The ObserverEventEvaluator parameter allows an observer to indicate events, and to indicate change of truth value to permanently false. Use this interface to indicate when your observer has received or witnessed an event, or changed it's truth value to true or permanently false.

The MatchedEventMap parameter provides a Map of all matching events for the expression prior to the observer's start. For example, consider a pattern as below:

a=MyEvent -> myplugin:my_observer(...)

The above pattern tagged the MyEvent instance with the tag "a". The pattern engine starts an instance of my_observer when it receives the first MyEvent. The observer can query the MatchedEventMap using "a" as a key and obtain the tagged event.

15.7.2. Implementing an Observer Class

An observer class has the following responsibilities:

  • Provides a StartObserve method that starts the observer.

  • Provides a StopObserve method that stops the observer, called by the engine when the whole pattern is stopped, or the sub-expression containing the observer is stopped.

Observer classes subclass com.espertech.esper.pattern.observer.ObserverSupport as shown here:

public class MyFileExistsObserver : EventObserver { ...

The engine invokes the observer factory class to construct an instance of the observer class for each new sub-expression instance within a statement.

An observer class must provide an implementation of the StartObserve method that the pattern engine invokes to start an observer instance. In our example, the observer checks for the presence of a file and indicates the truth value to the remainder of the expression:

public void startObserve() {
  File file = new File(filename);
  if (file.exists()) {
    observerEventEvaluator.observerEvaluateTrue(beginState);
  } 
  else {
    observerEventEvaluator.observerEvaluateFalse(); 
  }
}

Note the observer passes the ObserverEventEvaluator an instance of MatchedEventMap. The observer can also create one or more new events and pass these events through the Map to the remaining expressions in the pattern.

15.7.3. Configuring Observer Namespace and Name

The observer factory class name as well as the namespace and name for the new observer must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-pattern-observer namespace="myplugin" name="file_exists" 
    factory-class="com.espertech.esper.regression.client.MyFileExistsObserverFactory" />
</esper-configuration>

The new observer is now ready to use in a statement. The next pattern statement checks every 10 seconds if the given file exists, and indicates to the listener when the file is found.

select * from pattern [every timer:interval(10 sec) -> myplugin:file_exists("myfile.txt")]

15.8. Event Type And Event Object

Creating a plug-in event representation can be useful under any of these conditions:

  • Your application has existing types that carry event metadata and event property values and your application does not want to (or cannot) extract or transform such event metadata and event data into one of the built-in event representations (Native objects, Map or XML DOM).

  • Your application must perform a network lookup or other dynamic resolution of event type and events.

Note that the classes to plug-in custom event representations are held stable between minor releases, but can be subject to change between major releases.

Currently, EsperIO provides the following additional event representations:

The source code is available for these and they are therefore excellent examples for how to implement a plug-in event representation. Please see the EsperIO documentation for usage details.

15.8.1. How It Works

Your application provides a plug-in event representation as an implementation of the com.espertech.esper.plugin.PlugInEventRepresentation interface. It registers the implementation class in the Configuration and at the same time provides a unique URI. This URI is called the root event representation URI. An example value for a root URI is type://xml/apacheaxiom/OMNode.

One can register multiple plug-in event representations. Each representation has a root URI. The root URI serves to divide the overall space of different event representations and plays a role in resolving event types and event objects.

There are two situations in an Esper engine instance asks an event representation for an event type:

  1. When an application registers a new event type using the method AddPlugInEventType on ConfigurationOperations, either at runtime or at configuration time.

  2. When an EPL statement is created with a new event type name (a name not seen before) and the URIs for resolving such names are set beforehand via PlugInEventTypeNameResolutionURIs on ConfigurationOperations.

The implementation of the PlugInEventRepresentation interface must provide implementations for two key interfaces: com.espertech.esper.client.EventType and EventBean. It must also implement several other related interfaces as described below.

The EventType methods provide event metadata including property names and property types. They also provide instances of EventPropertyGetter for retrieving event property values. Each instance of EventType represents a distinct type of event.

The EventBean implementation is the event itself and encapsulates the underlying event object.

15.8.2. Steps

Follow the steps outlined below to process event objects for your event types:

  1. Implement the EventType, EventPropertyGetter and EventBean interfaces.

  2. Implement the PlugInEventRepresentation interface, the PlugInEventTypeHandler and PlugInEventBeanFactory interfaces, then add the PlugInEventRepresentation class name to configuration.

  3. Register plug-in event types, and/or set the event type name resolution URIs, via configuration.

  4. Obtain an EventSender from EPRuntime via the GetEventSender(Uri[]) method and use that to send in your event objects.

Please consult the SDK documentation for further information on each of the interfaces and their respective methods.

15.8.3. URI-based Resolution

Assume you have registered event representations using the following URIs:

  1. type://myFormat/myProject/myName

  2. type://myFormat/myProject

  3. type://myFormat/myOtherProject

When providing an array of child URIs for resolution, the engine compares each child URI to each of the event representation root URIs, in the order provided. Any event representation root URIs that spans the child URI space becomes a candidate event representation. If multiple root URIs match, the order is defined by the more specific root URI first, to the least specific root URI last.

During event type resolution and event sender resolution you provide a child URI. Assuming the child URI provided is type://myFormat/myProject/myName/myEvent?param1=abc&param2=true. In this example both root URIs #1 (the more specific) and #1 (the less specific) match, while root URI #3 is not a match. Thus at the time of type resolution the engine invokes the AcceptType method on event presentation for URI #1 first (the more specific), before asking #2 (the less specific) to resolve the type.

The EventSender returned by the GetEventSender(Uri[]) method follows the same scheme. The event sender instance asks each matching event representation for each URI to resolve the event object in the order of most specific to least specific root URI, and the first event representation to return an instance of EventBean ends the resolution process for event objects.

The type:// part of the URI is an optional convention for the scheme part of an URI that your application may follow. URIs can also be simple names and can include parameters, as the MSDN documents for class System.Uri.

15.8.4. Example

This section implements a minimal sample plug-in event representation. For the sake of keeping the example easy to understand, the event representation is rather straightforward: an event is a System.Collections.Generic.IDictionary<string, string> object that consists of key-values pairs of type string.

The code shown next does not document method footprints. Please consult the MSDN documentation for method details.

15.8.4.1. Sample Event Type

First, the sample shows how to implement the EventType interface. The event type provides information about property names and types, as well as supertypes of the event type.

Our EventType takes a set of valid property names:

public class MyPlugInPropertiesEventType : EventType {
  private readonly IDictionary<string, string> properties;

  public MyPlugInPropertiesEventType(IDictionary<string, string> properties) {
    this.properties = properties;
  }

  public Type GetPropertyType(string property) {
    if (!IsProperty(property)) {
      return null;
    }
    return typeof(string);
  }

  public Type UnderlyingType {
    get { return typeof(IDictionary<string, string>); }
  }
  
  //... further methods below
}

An EventType is responsible for providing implementations of EventPropertyGetter to query actual events. The getter simply queries the Properties object underlying each event:

  public EventPropertyGetter GetGetter(string property) {
    readonly String propertyName = property;
    
    return new EventPropertyGetter() {
      public Object get(EventBean eventBean) throws PropertyAccessException {
        MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
        return propBean.getProperties().getProperty(propertyName);
      }
      
      public boolean isExistsProperty(EventBean eventBean) {
        MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
        return propBean.getProperties().getProperty(propertyName) != null;
      }
      
      public Object getFragment(EventBean eventBean) {
	    return null;	// The property is not a fragment
      }
    };
  }

Our sample EventType does not have supertypes. Supertypes represent an extends-relationship between event types, and subtypes are expected to exhibit the same event property names and types as each of their supertypes combined:

  public EventType[] SuperTypes {
    get { return null; } // no supertype for this example
  }

  public EventType[] DeepSuperTypes {
	get { return null ; }
  }
  
  public string Name {
    get { return name; }
  }

  public EventPropertyDescriptor[] PropertyDescriptors {
	get {
		return descriptors.Values.ToArray(); // Use of LINQ - ToArray() extension
	}
  }

  public EventPropertyDescriptor GetPropertyDescriptor(string propertyName) {
    return descriptors.Get(propertyName); // Use of compat extension
  }

  public FragmentEventType GetFragmentType(string property) {
    return null;  // sample does not provide any fragments
  }

The example event type as above does not provide fragments, which are properties of the event that can themselves be represented as an event, to keep the example simple.

15.8.4.2. Sample Event Bean

Each EventBean instance represents an event. The interface is straightforward to implement. In this example an event is backed by a Properties object:

public class MyPlugInPropertiesEventBean : EventBean {
  private readonly MyPlugInPropertiesEventType eventType;

  public MyPlugInPropertiesEventBean(MyPlugInPropertiesEventType eventType, 
        IDictionary<string, string> properties) {
    this.eventType = eventType;
    this.Properties = properties;
  }

  public EventType EventType {
    get { return eventType; }
  }

  public Object Get(string property) {
    EventPropertyGetter getter = eventType.GetGetter(property);
    return getter.Get(this);
  }

  public Object GetFragment(string property) {
    EventPropertyGetter getter = eventType.GetGetter(property);
    if (getter != null) {
      return getter.GetFragment(this);
    }
    return null;
  }

  public Object Underlying {
    get { return properties; }
  }

  protected IDictionary<string, string> Properties {
	get; private set;
  }    
}

15.8.4.3. Sample Event Representation

A PlugInEventRepresentation serves to create EventType and EventBean instances through its related interfaces.

The sample event representation creates MyPlugInPropertiesEventType and MyPlugInPropertiesEventBean instances. The PlugInEventTypeHandler returns the EventType instance and an EventSender instance.

Our sample event representation accepts all requests for event types by returning boolean true on the AcceptType method. When asked for the PlugInEventTypeHandler, it constructs a new EventType. The list of property names for the new type is passed as an initialization value provided through the configuration API or XML, as a comma-separated list of property names:

public class MyPlugInEventRepresentation : PlugInEventRepresentation {

  private IList<MyPlugInPropertiesEventType> types;

  public void Init(PlugInEventRepresentationContext context) {
    types = new List<MyPlugInPropertiesEventType>();
  }

  public bool AcceptsType(PlugInEventTypeHandlerContext context) {
    return true;
  }

  public PlugInEventTypeHandler GetTypeHandler(PlugInEventTypeHandlerContext eventTypeContext) {
    String proplist = (String) eventTypeContext.TypeInitializer;
    String[] propertyList = proplist.Split(',');

    HashSet<String> typeProps = new HashSet<String>(propertyList);

    MyPlugInPropertiesEventType eventType = new MyPlugInPropertiesEventType(typeProps);
    types.Add(eventType);

    return new MyPlugInPropertiesEventTypeHandler(eventType);
  }
  // ... more methods below
}

The PlugInEventTypeHandler simply returns the EventType as well as an implementation of EventSender for processing same-type events:

public class MyPlugInPropertiesEventTypeHandler :  PlugInEventTypeHandler {
  private readonly MyPlugInPropertiesEventType eventType;

  public MyPlugInPropertiesEventTypeHandler(MyPlugInPropertiesEventType eventType) {
    this.eventType = eventType;
  }

  public EventSender GetSender(EPRuntimeEventSender runtimeEventSender) {
    return new MyPlugInPropertiesEventSender(eventType, runtimeEventSender);
  }

  public EventType EventType {
    get { return eventType; }
  }
}

The EventSender returned by PlugInEventTypeHandler is expected process events of the same type or any subtype:

public class MyPlugInPropertiesEventSender : EventSender {
  private readonly MyPlugInPropertiesEventType type;
  private readonly EPRuntimeEventSender runtimeSender;

  public MyPlugInPropertiesEventSender(MyPlugInPropertiesEventType type, 
        EPRuntimeEventSender runtimeSender) {
    this.type = type;
    this.runtimeSender = runtimeSender;
  }

  public void SendEvent(Object @event) {
    if (!(@event is IDictionary<string,string>)) {
       throw new EPException("Sender expects a properties event");
    }
    EventBean eventBean = new MyPlugInPropertiesEventBean(type, (IDictionary<string,string>) @event);
    runtimeSender.ProcessWrappedEvent(eventBean);
  }
}

15.8.4.4. Sample Event Bean Factory

The plug-in event representation may optionally provide an implementation of PlugInEventBeanFactory. A PlugInEventBeanFactory may inspect event objects and assign an event type dynamically based on resolution URIs and event properties.

Our sample event representation accepts all URIs and returns a MyPlugInPropertiesBeanFactory:

public class MyPlugInEventRepresentation : PlugInEventRepresentation {

  // ... methods as seen earlier
  public bool AcceptsEventBeanResolution(
        PlugInEventBeanReflectorContext eventBeanContext) {
    return true;
  }

  public PlugInEventBeanFactory GetEventBeanFactory(
        PlugInEventBeanReflectorContext eventBeanContext) {
    return new MyPlugInPropertiesBeanFactory(types);
   }
}

Last, the sample MyPlugInPropertiesBeanFactory implements the PlugInEventBeanFactory interface. It inspects incoming events and determines an event type based on whether all properties for that event type are present:

public class MyPlugInPropertiesBeanFactory : PlugInEventBeanFactory {
  private readonly IList<MyPlugInPropertiesEventType> knownTypes;

  public MyPlugInPropertiesBeanFactory(IList<MyPlugInPropertiesEventType> types) {
    knownTypes = types;
  }

  public EventBean create(Object @event, URI resolutionURI) {
    Properties properties = (Properties) @event;

    // use the known types to determine the type of the object
    foreach (MyPlugInPropertiesEventType type in knownTypes) {
      // if there is one property the event does not contain, then its not the right type
      bool hasAllProperties = true;
      foreach (String prop in type.PropertyNames) {
        if (!properties.ContainsKey(prop)) {
          hasAllProperties = false;
          break;
        }
      }

      if (hasAllProperties) {
        return new MyPlugInPropertiesEventBean(type, properties);
      }
    }
    return null; // none match, unknown event
  }
}

© 2011 EsperTech Inc. All Rights Reserved