
Chapter 22. Integration and Extension

22.1. Overview
22.2. Single-Row Function
22.2.1. Using an Inlined Class to Provide a Single-Row Function
22.2.2. Using an Application Class to Provide a Single-Row Function
22.2.3. Value Cache
22.2.4. Single-Row Functions in Filter Predicate Expressions
22.2.5. Single-Row Functions Taking Events as Parameters
22.2.6. Single-Row Functions Returning Events
22.2.7. Receiving a Context Object
22.2.8. Exception Handling
22.3. Virtual Data Window
22.3.1. How to Use
22.3.2. Implementing the Forge
22.3.3. Implementing the Factory-Factory
22.3.4. Implementing the Factory
22.3.5. Implementing the Virtual Data Window
22.4. Data Window View and Derived-Value View
22.4.1. Implementing a View Forge
22.4.2. Implementing a View Factory
22.4.3. Implementing a View
22.4.4. View Contract
22.4.5. Configuring View Namespace and Name
22.4.6. Requirement for Data Window Views
22.4.7. Requirement for Derived-Value Views
22.5. Aggregation Function
22.5.1. Aggregation Single-Function Development
22.5.2. Aggregation Multi-Function Development
22.6. Pattern Guard
22.6.1. Implementing a Guard Forge
22.6.2. Implementing a Guard Factory
22.6.3. Implementing a Guard Class
22.6.4. Configuring Guard Namespace and Name
22.7. Pattern Observer
22.7.1. Implementing an Observer Forge
22.7.2. Implementing an Observer Factory
22.7.3. Implementing an Observer Class
22.7.4. Configuring Observer Namespace and Name
22.8. Date-Time Method
22.8.1. Implement the DateTimeMethodForgeFactory Interface
22.8.2. Implement the Static Methods
22.8.3. Add the Date-Time Method Extension to the Compiler Configuration
22.8.4. Use the new Date-Time Methods
22.9. Enumeration Method
22.9.1. Implement the EnumMethodForgeFactory Interface
22.9.2. Implement the EnumMethodState Interface
22.9.3. Implement the Static Method for Processing
22.9.4. Add the Enumeration Method Extension to the Compiler Configuration
22.9.5. Use the new Enumeration Method
22.9.6. Further Information to Lambda Parameters

This chapter summarizes integration and describes in detail each of the extension APIs that allow integrating external data and/or extending runtime functionality.

For information on calling external services via instance method invocation, for instance to integrate with dependency injection frameworks such as Spring or Guice, please see Section 5.17.5, “Class and Event-Type Variables”.

For information on input and output adapters that connect to an event transport and perform event transformation for incoming and outgoing on-the-wire event data, for use with streaming data, please see the EsperIO reference documentation. The data flow instances as described in Chapter 21, EPL Reference: Data Flow are an easy way to plug in operators that perform input and output. Data flows allow providing parameters and managing individual flows independent of runtime lifecycle. Also consider using the Plug-in Loader API for creating a new adapter that starts or stops as part of the CEP runtime initialization and destroy lifecycle, see Section 16.15, “Plug-In Loader”.

To join data that resides in a relational database and that is accessible via JDBC driver and SQL statement the runtime offers syntax for using SQL within EPL, see Section 5.13, “Accessing Relational Data via SQL”. A relational database input and output adapter for streaming input from and output to a relational database also exists (EsperIO).

To join data that resides in a non-relational store the runtime offers two means: First, the virtual data window, as described below, for transparently integrating the external store as a named window. The second mechanism is a special join syntax based on static method invocation; see Section 5.14, “Accessing Non-Relational Data via Method, Script or UDF Invocation”.

Tip

The best way to test that your extension code works correctly is to write unit tests against a statement that utilizes the extension code. Samples can be obtained from the Esper regression test code base.

Note

For all extension code, and similar to listeners and subscribers, use the routeEvent method (and not sendEvent) to send events into the runtime from extension code. This avoids the possibility of a stack overflow due to event-callback looping and ensures correct processing of the current and routed event. Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent and not routeEvent.

Note

For all extension code it is not safe to deploy and undeploy within the extension code. For example, it is not safe to implement a data window that deploys compiled modules and that undeploys deployments.

Single-row functions return a single value. They are not expected to aggregate rows but instead should be stateless non-blocking functions. These functions can appear in any expressions and can be passed any number of parameters.

You may not override a built-in function with a single-row function that you provide. The single-row function you register must have a different name than any of the built-in functions.

The EPL compiler provides two ways to provide plug-in single-row functions:

  • Using an inlined class that is part of the EPL module (see Section 22.2.1).

  • Using an application class together with compiler configuration (see Section 22.2.2).

An example single-row function can also be found in the examples under the runtime configuration example.

You may use an inline class that is part of the EPL modules to provide a single-row function. For more information on inline classes see Chapter 18, Inlined Classes. Using an inline class does not require any compiler configuration.

Specify the ExtensionSingleRowFunction annotation at the class level, providing the name (the EPL function name) and methodName (the name of the exposed method). This annotation instructs the compiler that the inlined class exposes a single-row function.

This sample EPL includes an inlined class by name MyUtilityClass that provides a public static method by name computePercent to return a percentage value:

inlined_class """
  @com.espertech.esper.common.client.hook.singlerowfunc.ExtensionSingleRowFunction(name="computePercent", methodName="computePercent")
  public class MyUtilityClass {
    public static double computePercent(double amount, double total) {
      return amount / total * 100;
    }
  }
""" 
select computePercent(itemQuantity,totalQuantity) from OrderEvent

Only one ExtensionSingleRowFunction annotation can be specified per class, and the annotation is only for use with inlined classes. The method can be overloaded.

When using create inlined_class the runtime resolves dependencies on EPL objects at time of deployment (the same as for all EPL objects).

Your application may provide a class that exposes a single-row function. In this case you must configure the class name, name of the function and function method name as part of compiler configuration. This is not necessary when using inlined classes as discussed before.
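A minimal registration sketch using the configuration API follows; the class name mycompany.MyUtilityClass is illustrative, and addPlugInSingleRowFunction takes the EPL function name, the fully-qualified class name and the method name:

```java
Configuration configuration = new Configuration();
configuration.getCompiler().addPlugInSingleRowFunction(
    "computePercent",            // function name for use in EPL
    "mycompany.MyUtilityClass",  // fully-qualified class providing the method
    "computePercent");           // name of the public static method
```

Pass the resulting configuration to the compiler when compiling modules that use the function.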

Your EPL may use plug-in single-row functions among the predicate expressions as part of the filters in a stream or pattern.

For example, the EPL below uses the function computeHash as part of a predicate expression:

select * from MyEvent(computeHash(field) = 100)

When you have many statements or many context partitions that refer to the same function, event type and parameters in a predicate expression, the compiler may optimize evaluation: The function gets evaluated only once per event.
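The effect of this optimization can be pictured in plain Java without any Esper API; the memoizing map below stands in for the runtime's per-event caching, and all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class FilterOptimizationSketch {
    static int evaluations = 0;

    // Stands in for a plug-in single-row function such as computeHash(field).
    static int computeHash(String field) {
        evaluations++;
        return Math.abs(field.hashCode()) % 101;
    }

    // With the optimization enabled, the runtime evaluates the function once per
    // event and reuses the result across all filters sharing the same footprint.
    static int evaluateOncePerEvent(Map<String, Integer> perEventCache, String field) {
        return perEventCache.computeIfAbsent(field, FilterOptimizationSketch::computeHash);
    }

    public static void main(String[] args) {
        String eventField = "order-42";
        Map<String, Integer> perEventCache = new HashMap<>();
        // Three statements filter on the same function and argument:
        for (int statement = 0; statement < 3; statement++) {
            evaluateOncePerEvent(perEventCache, eventField);
        }
        System.out.println(evaluations); // prints 1: evaluated once despite three filters
    }
}
```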

While the optimization is enabled by default for all plug-in single row functions, you can also disable the optimization for a specific single-row function. By disabling the optimization for a single-row function the runtime may use less memory to identify reusable function footprints but may cause the runtime to evaluate each function more frequently than necessary.

If using inlined classes and the ExtensionSingleRowFunction annotation, specify the filter optimizable flag as follows (default is enabled):

ExtensionSingleRowFunction(..., filterOptimizable=com.espertech.esper.common.client.configuration.compiler.ConfigurationCompilerPlugInSingleRowFunction.FilterOptimizable.DISABLED)

If using application classes and the compiler configuration, the following configuration XML disables the filter optimization for a single-row function (by default it is enabled):

<esper-configuration xmlns="http://www.espertech.com/schema/esper">
  <compiler>
    <plugin-singlerow-function name="computeHash" 
      function-class="mycompany.HashUtil" function-method="computeHash"
      filter-optimizable="disabled" />
  </compiler>
</esper-configuration>

A single-row function may return events. Please declare your single-row function method to return Collection<EventBean> or EventBean[] and configure the event type name.

For example, assuming there is a MyItem event type such as created via create schema MyItem(id string):

public static EventBean[] myItemProducer(String string, EPLMethodInvocationContext context) {
  String[] split = string.split(",");
  EventBean[] events = new EventBean[split.length];
  for (int i = 0; i < split.length; i++) {
    events[i] = context.getEventBeanService().adapterForMap(Collections.singletonMap("id", split[i]), "MyItem");
  }
  return events;
}

The sample EPL queries items, filtering for those items that have a given value for the id field:

select myItemProducer(ordertext).where(v => v.id in ('id1', 'id3')) as c0 from Order

If using inlined classes and the ExtensionSingleRowFunction annotation, specify the event type name as follows:

ExtensionSingleRowFunction(..., eventTypeName="MyItem")

If using application classes and the compiler configuration, this sample code registers the myItemProducer function as a single-row function with an event type name:

ConfigurationCompilerPlugInSingleRowFunction entry = new ConfigurationCompilerPlugInSingleRowFunction();
entry.setName("myItemProducer");
entry.setFunctionClassName(...);
entry.setFunctionMethodName(...);
entry.setEventTypeName("MyItem");
Configuration configuration = new Configuration();
configuration.getCompiler().addPlugInSingleRowFunction(entry);

If your single-row function returns EventBean[] and is used with enumeration methods, the configuration must provide an event type name.

Use a virtual data window if you have a (large) external data store that you want to access as a named window. The access is transparent: There is no need to use special syntax or join syntax. All regular queries including subqueries, joins, on-merge, on-select, on-insert, on-delete, on-update and fire-and-forget are supported with virtual data windows.

There is no need to keep any data or events in memory with virtual data windows. The only requirement for virtual data windows is that all data rows returned are EventBean instances.

When implementing a virtual data window it is not necessary to send any events into the runtime or to use insert-into. The event content is assumed to exist and to be accessible to the runtime via the API implementation you provide.

The distribution ships with a sample virtual data window in the examples folder under the name virtualdw. The code snippets below are extracts from the example.

We use the term store here to mean a source set of data that is managed by the virtual data window. We use the term store row or just row to mean a single data item provided by the store. We use the term lookup to mean a read operation against the store returning zero, one or many rows.

Virtual data windows allow high-performance low-latency lookup by exposing all relevant statement access path information. This makes it possible for the virtual data window to choose the desired access method into its store.

The following steps are required to develop and use a virtual data window:

  1. Implement a virtual data window forge class (compile-time).

  2. Implement a factory-factory class and a factory class (runtime).

  3. Implement the virtual data window itself, including its lookup strategy.

  4. Register the forge class in the compiler configuration, supplying a namespace and name.

Once you have completed the above steps, the virtual data window is ready to use in statements.

From a threading perspective, virtual data window implementation classes must be thread-safe if objects are shared between multiple named windows. If no objects are shared between multiple different named windows, that is, each object is only used for the same named window and other named windows receive a separate instance, it is not necessary that the implementation classes are thread-safe.

Your application must first register the virtual data window factory as part of configuration:

Configuration config = new Configuration();
config.getCompiler().addPlugInVirtualDataWindow("sample", "samplevdw", 
    SampleVirtualDataWindowForge.class.getName());

Your application may then create a named window backed by a virtual data window.

For example, assume that the SampleEvent event type is declared as follows:

create schema SampleEvent as (key1 string, key2 string, value1 int, value2 double)

The next statement creates a named window MySampleWindow that provides SampleEvent events and is backed by a virtual data window:

create window MySampleWindow.sample:samplevdw() as SampleEvent

You may then access the named window, same as any other named window, for example by subquery, join, on-action, fire-and-forget query or by consuming its insert and remove stream. While this example uses Map-type events, the example code is the same for POJO or other events.

Your application may obtain a reference to the virtual data window from the runtime context.

This code snippet looks up the virtual data window by the named window name:

try {
  return (VirtualDataWindow) runtime.getContext().lookup("/virtualdw/MySampleWindow");
}
catch (NamingException e) {
  throw new RuntimeException("Failed to look up virtual data window, is it created yet?");
}

When your application registers a subquery, join or on-action query or executes a fire-and-forget query against a virtual data window the runtime interacts with the virtual data window. The interaction is a two-step process.

At time of deployment (once), the runtime uses the information the compiler collected by analyzing the EPL where-clause, if present. It then creates a list of hash-index and binary tree (btree, i.e. sorted) index properties. It passes the property names that are queried as well as the operators (i.e. =, >, range etc.) to the virtual data window. The virtual data window returns a lookup strategy object to the runtime.

At time of statement execution (repeatedly as triggered), the runtime uses that lookup strategy object to execute a lookup. It passes to the lookup all actual key values (hash, btree including ranges) to make fast and efficient lookup achievable.

To explain in detail, assume that your application creates a statement with a subquery as follows:

select (select * from MySampleWindow where key1 = 'A1') from OtherEvent

At the time of compilation of the statement above the compiler analyzes the statement. It determines that the subquery queries a virtual data window. It determines from the where-clause that the lookup uses property key1 and hash-equals semantics. The runtime then provides this information as part of VirtualDataWindowLookupContext passed to the getLookup method. Your application may inspect hash and btree properties and may determine the appropriate store access method to use.

The hash and btree property lookup information is for informational purposes, to enable fast and performant queries that return the smallest number of rows possible. Your implementation classes may use some or none of the information provided and may also instead return some or perhaps even all rows, as is practical to your implementation. The where-clause still remains in effect and gets evaluated on all rows that are returned by the lookup strategy.

Following the above example, the subquery executes once when an OtherEvent event arrives. At time of execution the runtime delivers the string value A1 to the VirtualDataWindowLookup lookup implementation provided by your application. The lookup object queries the store and returns store rows as EventBean instances.

As a second example, consider an EPL join statement as follows:

select * from MySampleWindow, MyTriggerEvent where key1 = trigger1 and key2 = trigger2

The compiler analyzes the statement and the runtime passes to the virtual data window the information that the lookup occurs on properties key1 and key2 under hash-equals semantics. When a MyTriggerEvent arrives, it passes the actual value of the trigger1 and trigger2 properties of the current MyTriggerEvent to the lookup.

As a last example, consider a fire-and-forget query as follows:

select * from MySampleWindow where key1 = 'A2' and value1 between 0 and 1000

The compiler analyzes the statement and the runtime passes to the virtual data window the lookup information. The lookup occurs on property key1 under hash-equals semantics and on property value1 under btree-open-range semantics. When your application executes the fire-and-forget query the runtime passes A2 and the range endpoints 0 and 1000 to the lookup.
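Conceptually, a store implementation can serve such a combined lookup with an equality check layered over a sorted index. The plain-Java sketch below (no Esper API; all names are illustrative) mirrors the key1 = 'A2' and value1 between 0 and 1000 example using a TreeMap range scan:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeLookupSketch {
    // Row of the sample store: (key1, value1), mirroring the SampleEvent schema.
    public record Row(String key1, int value1) {}

    // Range scan over the sorted value1 index (btree-style access path),
    // then the hash-equals condition on key1 applied to candidate rows.
    public static List<Row> lookup(NavigableMap<Integer, List<Row>> byValue1,
                                   String key1, int low, int high) {
        List<Row> result = new ArrayList<>();
        for (List<Row> rows : byValue1.subMap(low, true, high, true).values()) {
            for (Row row : rows) {
                if (row.key1().equals(key1)) {
                    result.add(row);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, List<Row>> byValue1 = new TreeMap<>();
        for (Row row : List.of(new Row("A2", 500), new Row("A2", 5000), new Row("B1", 10))) {
            byValue1.computeIfAbsent(row.value1(), v -> new ArrayList<>()).add(row);
        }
        // key1 = 'A2' and value1 between 0 and 1000: only (A2, 500) matches both
        System.out.println(lookup(byValue1, "A2", 0, 1000).size()); // prints 1
    }
}
```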

For more information, please consult the JavaDoc API documentation for class VirtualDataWindow, VirtualDataWindowLookupContext or VirtualDataWindowLookupFieldDesc.

For each named window that refers to the virtual data window, the compiler instantiates one instance of the forge at compile-time.

A virtual data window forge class is responsible for the following functions:

  • Accepting initialization information about the named window being created.

  • Returning a factory mode that instructs the compiler how to obtain the factory-factory.

  • Returning the unique-key property names, if any.

The compiler instantiates a VirtualDataWindowForge instance for each named window created by create window. The compiler invokes the initialize method once with respect to the named window being created, passing a VirtualDataWindowForgeContext context object.

The sample code shown here can be found among the examples in the distribution under virtualdw:

public class SampleVirtualDataWindowForge implements VirtualDataWindowForge {

    public void initialize(VirtualDataWindowForgeContext initializeContext) {
    }

    public VirtualDataWindowFactoryMode getFactoryMode() {
        // The injection strategy defines how to obtain and configure the factory-factory.
        InjectionStrategy injectionStrategy = new InjectionStrategyClassNewInstance(SampleVirtualDataWindowFactoryFactory.class);
        
        // The managed-mode is the default. It uses the provided injection strategy.
        VirtualDataWindowFactoryModeManaged managed = new VirtualDataWindowFactoryModeManaged();
        managed.setInjectionStrategyFactoryFactory(injectionStrategy);
        
        return managed;
    }

    public Set<String> getUniqueKeyPropertyNames() {
        // let's assume there are no unique key property names
        return null;
    }
}

Your forge class must implement the getFactoryMode method which instructs the compiler how to obtain a factory class that returns a factory for creating virtual data window instances (a factory-factory). The class acting as the factory-factory will be SampleVirtualDataWindowFactoryFactory.

For each named window that refers to the virtual data window, the runtime instantiates one instance of the factory.

A virtual data window factory class is responsible for the following functions:

  • Accepting initialization information about the named window being created.

  • Creating the virtual data window implementation, once or once per context partition.

  • Returning the unique-key property names, if any.

  • Performing cleanup when the named window is undeployed.

The runtime instantiates a VirtualDataWindowFactory instance for each named window created via create window. The runtime invokes the initialize method once with respect to the named window being created, passing a VirtualDataWindowFactoryContext context object.

If not using contexts, the runtime calls the create method once, after calling the initialize method. If using contexts, the runtime calls the create method every time it allocates a context partition. If using contexts and your virtual data window implementation is thread-safe, you may return the same virtual data window implementation object for each context partition. If using contexts and your implementation object is not thread-safe, return a separate thread-safe implementation object for each context partition.

The runtime invokes the destroy method once when the named window is undeployed. If not using contexts, the runtime calls the destroy method of the virtual data window implementation object before calling the destroy method on the factory object. If using contexts, the runtime calls the destroy method on each instance associated with a context partition at the time the associated context partition terminates.

The sample code shown here can be found among the examples in the distribution under virtualdw:

public class SampleVirtualDataWindowFactory implements VirtualDataWindowFactory {

    public void initialize(VirtualDataWindowFactoryContext factoryContext) {
    }

    public VirtualDataWindow create(VirtualDataWindowContext context) {
        return new SampleVirtualDataWindow(context);
    }

    public void destroy() {
        // cleanup can be performed here
    }

    public Set<String> getUniqueKeyPropertyNames() {
        // let's assume there are no unique key property names
        return null;
    }
}

Your factory class must implement the create method which receives a VirtualDataWindowContext object. This method is called once for each EPL statement that creates a virtual data window (see the example create window above).

The VirtualDataWindowContext provides to your application:

String namedWindowName;	// Name of named window being created.
Object[] parameters;  // Any optional parameters provided as part of create-window.
EventType eventType;  // The event type of events.
EventBeanFactory eventFactory;  // A factory for creating EventBean instances from store rows.
VirtualDataWindowOutStream outputStream;  // For stream output to consuming statements.
AgentInstanceContext agentInstanceContext;  // Other statement information in statement context.

When using contexts you can decide whether your factory returns a new virtual data window for each context partition or returns the same virtual data window instance for all context partitions. Your extension code may refer to the named window name to identify the named window and may refer to the agent instance context that holds the agent instance id which is the id of the context partition.

A virtual data window implementation is responsible for the following functions:

  • Returning a lookup strategy for a statement's hash and btree access path information.

  • Handling updates when data changes as a result of on-merge, on-delete, on-update or insert-into.

  • Posting insert and remove stream data to consuming statements, where applicable.

The sample code shown here can be found among the examples in the distribution under virtualdw.

The implementation class must implement the VirtualDataWindow interface like so:

public class SampleVirtualDataWindow implements VirtualDataWindow {

  private final VirtualDataWindowContext context;
  
  public SampleVirtualDataWindow(VirtualDataWindowContext context) {
    this.context = context;
  } ...

When the compiler compiles a statement and detects a virtual data window, the compiler compiles access path information and the runtime invokes the getLookup method, indicating hash and btree access path information by passing a VirtualDataWindowLookupContext context. The getLookup method must return a VirtualDataWindowLookup implementation that the statement uses for all lookups until the statement is stopped or destroyed.

The sample implementation does not use the hash and btree access path information and simply returns a lookup object:

public VirtualDataWindowLookup getLookup(VirtualDataWindowLookupContext desc) {

  // Place any code that interrogates the hash-index and btree-index fields here.

  // Return the lookup strategy.
  return new SampleVirtualDataWindowLookup(context);
}
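The lookup strategy itself executes the store access. A minimal sketch following the shape of the VirtualDataWindowLookup interface is shown below; the store access is left as a comment, as a real implementation would use the supplied keys to query its backing store:

```java
public class SampleVirtualDataWindowLookup implements VirtualDataWindowLookup {

    private final VirtualDataWindowContext context;

    public SampleVirtualDataWindowLookup(VirtualDataWindowContext context) {
        this.context = context;
    }

    public Set<EventBean> lookup(Object[] keys, EventBean[] eventsPerStream) {
        // "keys" holds the actual lookup values, e.g. "A1" for "where key1 = 'A1'".
        // Query the store using the keys, then wrap each store row as an EventBean
        // via the event factory from the context, for example:
        //   EventBean event = context.getEventFactory().wrap(storeRow);
        Set<EventBean> result = new HashSet<EventBean>();
        // ... populate result from the store ...
        return result;
    }
}
```

The runtime evaluates the statement's where-clause against the returned rows, so returning extra rows is safe, if less efficient.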

The runtime calls the update method when data changes because of on-merge, on-delete, on-update or insert-into. For example, if you have an on-merge statement that is triggered and that updates the virtual data window, the newData parameter receives the new (updated) event and the oldData parameter receives the event prior to the update. Your code may use these events to update the store or delete from the store, if needed.

If your application plans to consume data from the virtual data window, for example via select * from MySampleWindow, then the code must implement the update method to forward insert and remove stream events, as shown below, to receive the events in consuming statements. To post insert and remove stream data, use the VirtualDataWindowOutStream provided by the context object as follows.

public void update(EventBean[] newData, EventBean[] oldData) {
  // This sample simply posts into the insert and remove stream what is received.
  context.getOutputStream().update(newData, oldData);
}

Your application should not use VirtualDataWindowOutStream to post new events that originate from the store. The object is intended for use with on-action statements. Use insert-into instead for any new events that originate from the store.

Views in EPL are used to derive information from an event stream, and to represent data windows onto an event stream. This chapter describes how to plug in a new, custom view.

The following steps are required to develop and use a custom view.

  1. Implement a view forge class. View forges are compile-time classes that accept and check view parameters and refer to the appropriate view factory for the runtime.

  2. Implement a view factory class. View factories are classes that instantiate the appropriate view class at runtime.

  3. Implement a view class. A view class commonly represents a data window or derives new information from a stream at runtime.

  4. Configure the view factory class supplying a view namespace and name in the compiler configuration.

The example view factory and view class that are used in this chapter can be found in the examples source folder in the OHLC (open-high-low-close) example. The class names are OHLCBarPlugInViewForge, OHLCBarPlugInViewFactory and OHLCBarPlugInView.

Views can make use of the runtime services available via StatementContext, for example:

  • The SchedulingService interface allows views to schedule timer callbacks to a view

Section 22.4.4, “View Contract” outlines the requirements for correct behavior of your custom view within the runtime.

Note that custom views may use runtime services and APIs that can be subject to change between major releases. The runtime services discussed above and the view APIs are considered part of the runtime internal API and are of limited stability only. Please also consider contributing your custom view to the project by submitting the view code.

A view forge class is a compile-time class and is responsible for the following functions:

  • Accepting and validating the view parameter expressions.

  • Attaching the view to its parent event type and, if the view produces events of a different type, allocating the new event type.

  • Generating the code that initializes the view factory.

View forge classes must implement the ViewFactoryForge interface. Additionally a view forge class must implement the DataWindowViewForge interface if the view is a data window (retains events provided to it).

public class OHLCBarPlugInViewForge implements ViewFactoryForge { ...

Your view forge class must implement the setViewParameters method to accept view parameters and the attach method to attach the view to a stream:

public class OHLCBarPlugInViewForge implements ViewFactoryForge {
    private List<ExprNode> viewParameters;
    private ExprNode timestampExpression;
    private ExprNode valueExpression;
    private EventType eventType;

    public void setViewParameters(List<ExprNode> parameters, ViewForgeEnv viewForgeEnv, int streamNumber) throws ViewParameterException {
        this.viewParameters = parameters;
    }

    public void attach(EventType parentEventType, int streamNumber, ViewForgeEnv env, boolean grouped) throws ViewParameterException {
        if (viewParameters.size() != 2) {
            throw new ViewParameterException("View requires two parameters: the expression returning timestamps and the expression supplying OHLC data points");
        }
        ExprNode[] validatedNodes = ViewForgeSupport.validate("OHLC view", parentEventType, viewParameters, false, env, streamNumber);

        timestampExpression = validatedNodes[0];
        valueExpression = validatedNodes[1];

        if (!JavaClassHelper.isTypeLong(timestampExpression.getForge().getEvaluationType())) {
            throw new ViewParameterException("View requires long-typed timestamp values in parameter 1");
        }
        if (!JavaClassHelper.isTypeDouble(valueExpression.getForge().getEvaluationType())) {
            throw new ViewParameterException("View requires double-typed values for parameter 2");
        }
        ....

After the compiler supplies the view parameters to the forge, the compiler asks the view to attach to its parent and to validate any parameter expressions against the parent view's event type. If the view generates events of a different type than the events generated by the parent view, the view forge can allocate the new event type.

Finally, the compiler asks the view forge to generate code that initializes the view factory:

public CodegenExpression make(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
    return new SAIFFInitializeBuilder(new EPTypeClass(OHLCBarPlugInViewFactory.class), this.getClass(), "factory", parent, symbols, classScope)
                .exprnode("timestampExpression", timestampExpression)
                .exprnode("valueExpression", valueExpression)
                .build();
}

Use the internal SAIFFInitializeBuilder to build your view factory providing it the expressions and other values it needs.

The update method must adhere to the following conventions, to prevent memory leaks and to enable correct behavior within the runtime:

Your view implementation must implement the AgentInstanceStopCallback interface to receive a callback when the view gets destroyed.

Please refer to the sample views for a code sample on how to implement the iterator method.

In terms of multiple threads accessing view state, there is no need for your custom view factory or view implementation to perform any synchronization to protect internal state. The iterator of the custom view implementation also does not need to be thread-safe. The runtime ensures the custom view executes in the context of a single thread at a time. If your view uses shared external state, such external state must still be considered for synchronization when using multiple threads.

Aggregation functions are stateful functions that aggregate events, event property values or expression results. Examples for built-in aggregation functions are count(*), sum(price * volume), window(*) or maxby(volume).

EPL allows two different ways for your application to provide aggregation functions. We use the name aggregation single-function and aggregation multi-function for the two independent extension APIs for aggregation functions.

The aggregation single-function API is simple to use; however, it imposes certain restrictions on how expressions that contain aggregation functions share state and how they are evaluated.

The aggregation multi-function API is more powerful and provides control over how expressions that contain aggregation functions share state and are evaluated.

The next table compares the two aggregation function extension APIs:


The following sections discuss developing an aggregation single-function first, followed by the subject of developing an aggregation multi-function.

Note

The aggregation multi-function API is a powerful and lower-level API to extend the runtime. Any classes that are not part of the client package should be considered unstable and are subject to change between minor and major releases.

This section describes the aggregation single-function extension API for providing aggregation functions.

The EPL compiler provides two ways to provide aggregation single-functions:

In either case, the following steps are required to develop and use a custom aggregation single-function.

  1. Implement an aggregation function forge by implementing the interface com.espertech.esper.common.client.hook.aggfunc.AggregationFunctionForge. This class provides compile-time information.

  2. Implement an aggregation function factory by implementing the interface com.espertech.esper.common.client.hook.aggfunc.AggregationFunctionFactory (used at runtime).

  3. Implement an aggregation function by implementing the interface com.espertech.esper.common.client.hook.aggfunc.AggregationFunction (used at runtime).

Custom aggregation functions can also be passed multiple parameters, as further described in Section 22.5.1.6, “Aggregation Single-Function: Accepting Multiple Parameters”. In the example below the aggregation function accepts a single parameter.

The code for the example aggregation function as shown in this chapter can be found in the runtime configuration example in the package com.espertech.esper.example.runtimeconfig by the name MyConcatAggregationFunction. The sample function simply concatenates string-type values.

An aggregation function forge class is only used at compile-time and is responsible for the following functions:

Aggregation forge classes implement the interface AggregationFunctionForge:

public class MyConcatAggregationFunctionForge implements AggregationFunctionForge { ...

The compiler constructs one instance of the aggregation function forge class for each occurrence of the function in a statement; however, the compiler may reduce the number of aggregation forge instances if it finds equivalent aggregations.

The aggregation function forge instance receives the aggregation function name via the setFunctionName method.

The sample concatenation function forge provides an empty setFunctionName method:

public void setFunctionName(String functionName) {
  // no action taken
}

An aggregation function forge must provide an implementation of the validate method that is passed an AggregationFunctionValidationContext validation context object. Within the validation context you find the result type of each of the parameter expressions to the aggregation function as well as information about constant values and data window use. Please see the JavaDoc API documentation for a comprehensive list of validation context information.

Since the example concatenation function requires string types it implements a type check:

public void validate(AggregationFunctionValidationContext validationContext) {
  if (validationContext.getParameterTypes().length != 1 || !JavaClassHelper.isTypeString(validationContext.getParameterTypes()[0])) {
    throw new IllegalArgumentException("Concat aggregation requires a single parameter of type String");
  }
}

In order for the compiler to validate the type returned by the aggregation function against the types expected by enclosing expressions, the getValueType method must return the result type of any values produced by the aggregation function:

public EPTypeClass getValueType() {
  return EPTypePremade.STRING.getEPType();
}

Finally, the forge implementation must provide a getAggregationFunctionMode method that returns information about the factory. The compiler uses this information to build the aggregation function factory.

public AggregationFunctionMode getAggregationFunctionMode() {
    // Inject a factory by using "new"
    InjectionStrategy injectionStrategy = new InjectionStrategyClassNewInstance(MyConcatAggregationFunctionFactory.class);
    
    // The managed mode means there is no need to write code that generates code
    AggregationFunctionModeManaged mode = new AggregationFunctionModeManaged();
    mode.setInjectionStrategyAggregationFunctionFactory(injectionStrategy);
        
    return mode;
}

An aggregation function class is responsible for the following functions:

Aggregation function classes implement the interface AggregationFunction:

public class MyConcatAggregationFunction implements AggregationFunction { ...

The class that provides the aggregation and implements AggregationFunction does not have to be threadsafe.

The constructor initializes the aggregation function:

public class MyConcatAggregationFunction implements AggregationFunction {
  private final static char DELIMITER = ' ';
  private StringBuilder builder;
  private String delimiter;

  public MyConcatAggregationFunction() {
    builder = new StringBuilder();
    delimiter = "";
  }
  ...

The enter method adds a datapoint to the current aggregation value. The example enter method shown below adds a delimiter and the string value to a string buffer:

public void enter(Object value) {
  if (value != null) {
    builder.append(delimiter);
    builder.append(value.toString());
    delimiter = String.valueOf(DELIMITER);
  }
}

Conversely, the leave method removes a datapoint from the current aggregation value. The example leave method removes the leaving value from the front of the string buffer:

public void leave(Object value) {
  if (value != null) {
    builder.delete(0, value.toString().length() + 1);
  }
}

Finally, the runtime obtains the current aggregation value by means of the getValue method:

public Object getValue() {
  return builder.toString();
}

For on-demand queries the aggregation function must support resetting its value to empty or start values. Implement the clear function to reset the value as shown below:

public void clear() {
  builder = new StringBuilder();
  delimiter = "";
}
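Taken together, the enter, leave, getValue and clear logic can be exercised outside the runtime. The following plain-Java sketch (the class name is made up and the Esper interfaces are omitted) assumes that values leave in the same order they entered, as a length or time data window guarantees:

```java
// Standalone sketch of the concat aggregation logic; no Esper dependency.
// Assumes FIFO removal: values leave in the order they entered.
class ConcatSketch {
  private static final char DELIMITER = ' ';
  private StringBuilder builder = new StringBuilder();
  private String delimiter = "";

  void enter(Object value) {
    if (value != null) {
      builder.append(delimiter);
      builder.append(value.toString());
      delimiter = String.valueOf(DELIMITER);
    }
  }

  void leave(Object value) {
    if (value != null) {
      // the leaving value sits at the front; remove it plus one delimiter
      builder.delete(0, value.toString().length() + 1);
    }
  }

  String value() {
    return builder.toString();
  }

  void clear() {
    builder = new StringBuilder();
    delimiter = "";
  }
}
```

After enter("a") and enter("bb") the value is "a bb"; after leave("a") it is "bb".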

You may use an inline class that is part of the EPL modules to provide an aggregation function. For more information on inline classes see Chapter 18, Inlined Classes. Using an inline class does not require any compiler configuration.

Specify the ExtensionAggregationFunction annotation on the class level of the AggregationFunctionForge implementation class and provide the name (the EPL aggregation function name). This annotation instructs the compiler that the inlined class exposes an aggregation function.

This sample EPL includes an inlined class by name ConcatAggForge that provides a concat aggregation function for concatenating string values:

inlined_class """
import com.espertech.esper.common.client.hook.aggfunc.*;
import com.espertech.esper.common.client.hook.forgeinject.*;
import com.espertech.esper.common.client.serde.*;
import java.io.*;
import com.espertech.esper.common.internal.epl.expression.core.*;
@ExtensionAggregationFunction(name="concat")
public class ConcatAggForge implements AggregationFunctionForge {
  public void validate(AggregationFunctionValidationContext validationContext) throws ExprValidationException {
    EPType paramType = validationContext.getParameterTypes()[0];
    if (paramType == EPTypeNull.INSTANCE || ((EPTypeClass) paramType).getType() != String.class) {
      throw new ExprValidationException("Invalid parameter type '" + paramType + "'");
    }
  }

  public EPTypeClass getValueType() {
    return new EPTypeClass(String.class);
  }

  public AggregationFunctionMode getAggregationFunctionMode() {
    AggregationFunctionModeManaged mode = new AggregationFunctionModeManaged();
    mode.setHasHA(true);
    mode.setSerde(ConcatAggSerde.class);
    mode.setInjectionStrategyAggregationFunctionFactory(new InjectionStrategyClassNewInstance(ConcatAggFactory.class.getName()));
    return mode;
  }

  public static class ConcatAggFactory implements AggregationFunctionFactory {
    public AggregationFunction newAggregator(AggregationFunctionFactoryContext ctx) {
      return new ConcatAggFunction();
    }
  }

  public static class ConcatAggFunction implements AggregationFunction {
    private final static String DELIMITER = ",";
    private StringBuilder builder;
    private String delimiter;

    public ConcatAggFunction() {
      super();
      builder = new StringBuilder();
      delimiter = "";
    }

    public void enter(Object value) {
      if (value != null) {
        builder.append(delimiter);
        builder.append(value.toString());
        delimiter = DELIMITER;
      }
    }

    public void leave(Object value) {
      if (value != null) {
        builder.delete(0, value.toString().length() + 1);
      }
    }
  
    public String getValue() {
      return builder.toString();
    }
  
    public void clear() {
      builder = new StringBuilder();
      delimiter = "";
    }
  }
  
  // the serializer-deserializer is only for high availability and is not required otherwise
  public static class ConcatAggSerde {
    public static void write(DataOutput output, AggregationFunction value) throws IOException {
      ConcatAggFunction agg = (ConcatAggFunction) value;
      output.writeUTF(agg.getValue());
    }

    public static AggregationFunction read(DataInput input) throws IOException {
      ConcatAggFunction concatAggFunction = new ConcatAggFunction();
      String current = input.readUTF();
      if (!current.isEmpty()) {
        concatAggFunction.enter(current);
      }
      return concatAggFunction;
    }
  }
}
""" 
select concat(personName) from PersonEvent

Only one ExtensionAggregationFunction annotation can be specified per class and the annotation is only for use with inlined classes. Using an inline class does not require any compiler configuration.

When using create inlined_class the runtime resolves dependencies on EPL objects at time of deployment (the same as for all EPL objects).

Your plug-in aggregation function may accept multiple parameters. You must, however, provide a different mode:

    public AggregationFunctionMode getAggregationFunctionMode() {
        InjectionStrategy injectionStrategy = new InjectionStrategyClassNewInstance(SupportCountBackAggregationFunctionFactory.class);

        AggregationFunctionModeMultiParam multiParam = new AggregationFunctionModeMultiParam();
        multiParam.setInjectionStrategyAggregationFunctionFactory(injectionStrategy);
        
        return multiParam;
    }

For instance, assume an aggregation function rangeCount that counts all values that fall into a range of values. The EPL that calls this function and provides lower and upper bounds of 1 and 10 is:

select rangeCount(1, 10, myValue) from MyEvent

The enter method of the plug-in aggregation function may look as follows:

public void enter(Object value)  {
  Object[] params = (Object[]) value;
  int lower = (Integer) params[0];
  int upper = (Integer) params[1];
  int val = (Integer) params[2];
  if ((val >= lower) && (val <= upper)) {
    count++;
  }
}
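The matching leave method is not shown above; under the same Object[] parameter layout it would test the range and decrement the count. A standalone sketch (plain Java with a made-up class name; the Esper interfaces are omitted):

```java
// Standalone sketch of the rangeCount enter/leave logic; parameters
// arrive packed into an Object[] as {lower, upper, value}.
class RangeCountSketch {
  private long count;

  void enter(Object value) {
    Object[] params = (Object[]) value;
    int lower = (Integer) params[0];
    int upper = (Integer) params[1];
    int val = (Integer) params[2];
    if (val >= lower && val <= upper) {
      count++;
    }
  }

  void leave(Object value) {
    Object[] params = (Object[]) value;
    int lower = (Integer) params[0];
    int upper = (Integer) params[1];
    int val = (Integer) params[2];
    if (val >= lower && val <= upper) {
      count--;
    }
  }

  long getCount() {
    return count;
  }
}
```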

Your plug-in aggregation function may want to validate parameter types or may want to know which parameters are constant-value expressions. Constant-value expressions are evaluated only once by the runtime and could therefore be cached by your aggregation function for performance reasons. The compiler provides constant-value information as part of the AggregationFunctionValidationContext passed to the validate method.

This section introduces the aggregation multi-function API. Aggregation multi-functions can offer multiple functions, methods, modifiers or views onto sharable aggregation state. Please refer to the JavaDoc for more complete class and method-level documentation.

The EPL compiler provides two ways to provide aggregation multi-functions:

In either case, the following steps are required to develop and use a custom aggregation multi-function.

  1. Implement an aggregation multi-function forge by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionForge.

  2. Implement one or more handlers for aggregation functions by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionHandler.

  3. Implement an aggregation state key by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionStateKey.

  4. Implement an aggregation state factory by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionStateFactory.

  5. Implement an aggregation state holder by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionState.

  6. Implement a state accessor factory by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionAccessorFactory.

  7. Implement a state accessor by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionAccessor.

  8. For use with tables, implement an agent factory by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionAgentFactory.

  9. For use with tables, implement an agent by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionAgent.

  10. For use with aggregation methods, implement an aggregation method factory by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionAggregationMethodFactory.

  11. For use with aggregation methods, implement an aggregation method by implementing the interface com.espertech.esper.common.client.hook.aggmultifunc.AggregationMultiFunctionAggregationMethod.

  12. When using an inlined class, use the ExtensionAggregationMultiFunction annotation and provide the function names. When providing external application classes, register the aggregation multi-function forge class with the compiler by supplying one or more function names, via the compiler configuration file or the runtime and static configuration API.

There are two examples for aggregation multi-function. The first example uses inlined classes and can be found at Section 22.5.2.8, “Using an Inlined Class to Provide an Aggregation Multi-Function”. The second example is called Cycle-Detect and is used for the step-by-step discussion below.

Cycle-Detect takes incoming transaction events that have from-account and to-account fields. The example detects a cycle in the transactions between accounts in order to detect a possible transaction fraud. Please note that the graph and cycle detection logic of the example is not part of the distribution: The example utilizes the jgrapht library.

In the Cycle-Detect example, the vertices of a graph are the account numbers. For example the account numbers Acct-1, Acct-2 and Acct-3. In the graph the edges are transaction events that identify a from-account and a to-account. An example edge is {from:Acct-1, to:Acct-2}. An example cycle is therefore in the three transactions {from:Acct-1, to:Acct-2}, {from:Acct-2, to:Acct-3} and {from:Acct-3, to:Acct-1}.
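To make the cycle concept concrete, the following is a minimal, self-contained depth-first cycle check over directed from/to edges. This is an illustration only; the shipped example maintains its graph with the jgrapht library instead:

```java
import java.util.*;

// DFS-based check for a cycle in a directed graph of account transfers.
// An edge from -> to corresponds to one transaction event.
class CycleCheck {
  private final Map<String, List<String>> adjacency = new HashMap<>();

  void addEdge(String from, String to) {
    adjacency.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
  }

  boolean hasCycle() {
    Set<String> visiting = new HashSet<>();
    Set<String> done = new HashSet<>();
    for (String vertex : adjacency.keySet()) {
      if (dfs(vertex, visiting, done)) {
        return true;
      }
    }
    return false;
  }

  private boolean dfs(String vertex, Set<String> visiting, Set<String> done) {
    if (visiting.contains(vertex)) {
      return true; // back edge found: a cycle exists
    }
    if (done.contains(vertex)) {
      return false;
    }
    visiting.add(vertex);
    for (String next : adjacency.getOrDefault(vertex, Collections.emptyList())) {
      if (dfs(next, visiting, done)) {
        return true;
      }
    }
    visiting.remove(vertex);
    done.add(vertex);
    return false;
  }
}
```

With edges {from:Acct-1, to:Acct-2} and {from:Acct-2, to:Acct-3} no cycle exists; adding {from:Acct-3, to:Acct-1} closes the cycle.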

The code for the example aggregation multi-function as shown in this chapter can be found in the Cycle-Detect example in the package com.espertech.esper.example.cycledetect. The example provides two aggregation functions named cycledetected and cycleoutput:

  1. The cycledetected function returns a boolean value whether a graph cycle is found or not.

  2. The cycleoutput function outputs the vertices (account numbers) that are part of the graph cycle.

In the Cycle-Detect example, the following statement utilizes the two functions cycledetected and cycleoutput that share the same graph state to detect a cycle among the last 1000 events:

@Name('CycleDetector') select cycleoutput() as cyclevertices
from TransactionEvent#length(1000)
having cycledetected(fromAcct, toAcct)

If instead the goal is to run graph cycle detection every 1 second (and not upon arrival of a new event), this sample statement uses a pattern to trigger cycle detection:

@Name('CycleDetector')
select (select cycleoutput(fromAcct, toAcct) from TransactionEvent#length(1000)) as cyclevertices
from pattern [every timer:interval(1)]

An aggregation multi-function forge class is a compile-time class responsible for the following functions:

Aggregation multi-function forge classes implement the interface AggregationMultiFunctionForge:

public class CycleDetectorAggregationForge implements AggregationMultiFunctionForge { ...

The compiler constructs a single instance of the aggregation multi-function forge class that is shared for all aggregation function expressions in a statement that have one of the function names provided in the configuration object.

The compiler invokes the addAggregationFunction method at the time it compiles a statement. The method receives a declaration-time context object that provides the function name as well as additional information.

The sample Cycle-Detect factory class provides an empty addAggregationFunction method:

public void addAggregationFunction(AggregationMultiFunctionDeclarationContext declarationContext) {
    // provides an opportunity to inspect where used
}

The compiler invokes the validateGetHandler method at the time of expression validation. It passes an AggregationMultiFunctionValidationContext validation context object that contains the actual parameter expressions. Please see the JavaDoc API documentation for a comprehensive list of validation context information.

The validateGetHandler method must return a handler object that implements the AggregationMultiFunctionHandler interface. Return a handler object for each aggregation function expression according to the aggregation function name and its parameters as provided in the validation context.

The example cycledetect function takes two parameters that provide the cycle edge (from-account and to-account):

public AggregationMultiFunctionHandler validateGetHandler(AggregationMultiFunctionValidationContext validationContext) {
  if (validationContext.getParameterExpressions().length == 2) {
    fromExpression = validationContext.getParameterExpressions()[0];
    toExpression = validationContext.getParameterExpressions()[1];
  }
  return new CycleDetectorAggregationHandler(this, validationContext);
}

An aggregation multi-function handler class is a compile-time class that must implement the AggregationMultiFunctionHandler interface and is responsible for the following functions:

In the Cycle-Detect example, the class CycleDetectorAggregationHandler is the handler for all aggregation functions.

public class CycleDetectorAggregationHandler implements AggregationMultiFunctionHandler { ...

The getReturnType method provided by the handler instructs the compiler about the return type of each aggregation accessor. The class EPChainableType holds return type information.

In the Cycle-Detect example the cycledetected function returns a single boolean value. The cycleoutput function returns a collection of vertices:

public EPChainableType getReturnType() {
    if (validationContext.getFunctionName().toLowerCase(Locale.ENGLISH).equals(CycleDetectorConstant.CYCLEOUTPUT_NAME)) {
        return EPChainableTypeHelper.collectionOfSingleValue((EPTypeClass) forge.getFromExpression().getForge().getEvaluationType());
    }
    return EPChainableTypeHelper.singleValue(Boolean.class);
}

The compiler invokes the getAggregationStateUniqueKey method to determine whether multiple aggregation function expressions in the same statement can share the same aggregation state or should receive different aggregation state instances.

The getAggregationStateUniqueKey method must return an instance of AggregationMultiFunctionStateKey. The compiler uses equals-semantics (the hashCode and equals methods) to determine whether multiple aggregation functions share the state object. If the handler returns equal key objects for multiple aggregation functions, the compiler shares aggregation state between those functions for the same statement and context partition.

In the Cycle-Detect example the state is shared, achieved by simply returning the same key instance:

private static final AggregationMultiFunctionStateKey CYCLE_KEY = new AggregationMultiFunctionStateKey() {};

public AggregationMultiFunctionStateKey getAggregationStateUniqueKey() {
    return CYCLE_KEY;
}
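Because anonymous subclasses such as the CYCLE_KEY above inherit identity-based hashCode and equals from Object, sharing only happens when the handler returns the very same instance for each function. The Java semantics can be illustrated standalone (StateKey is a made-up stand-in for AggregationMultiFunctionStateKey):

```java
// Identity-based equality of anonymous key instances: without overriding
// equals/hashCode, only the same instance compares equal.
class StateKeySketch {
  interface StateKey {} // stand-in for AggregationMultiFunctionStateKey

  static final StateKey SHARED = new StateKey() {};

  static boolean wouldShareState(StateKey a, StateKey b) {
    return a.equals(b);
  }
}
```

A key class that overrides hashCode and equals could instead share state by value rather than by instance.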

The compiler invokes the getStateMode method to obtain an instance of AggregationMultiFunctionStateMode. The state mode is responsible for obtaining and configuring an aggregation state factory instance at time of deployment.

In the Cycle-Detect example the method passes the expression evaluators providing the from-account and to-account expressions to the state factory:

public AggregationMultiFunctionStateMode getStateMode() {
    AggregationMultiFunctionStateModeManaged managed = new AggregationMultiFunctionStateModeManaged();
    InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(CycleDetectorAggregationStateFactory.class);
    injection.addExpression("from", forge.getFromExpression());
    injection.addExpression("to", forge.getToExpression());
    managed.setInjectionStrategyAggregationStateFactory(injection);
    return managed;
}

The compiler invokes the getAccessorMode method to obtain an instance of AggregationMultiFunctionAccessorMode. The accessor mode is responsible for obtaining and configuring an accessor factory instance at time of deployment.

The getAccessorMode method provides information about the accessor factories according to whether the aggregation function name is cycledetected or cycleoutput:

public AggregationMultiFunctionAccessorMode getAccessorMode() {
    Class accessor;
    if (validationContext.getFunctionName().toLowerCase(Locale.ENGLISH).equals(CycleDetectorConstant.CYCLEOUTPUT_NAME)) {
        accessor = CycleDetectorAggregationAccessorOutputFactory.class;
    }
    else {
        accessor = CycleDetectorAggregationAccessorDetectFactory.class;
    }
    AggregationMultiFunctionAccessorModeManaged managed = new AggregationMultiFunctionAccessorModeManaged();
    InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(accessor);
    managed.setInjectionStrategyAggregationAccessorFactory(injection);
    return managed;
}

You may use an inline class that is part of the EPL modules to provide an aggregation multi-function. For more information on inline classes see Chapter 18, Inlined Classes. Using an inline class does not require any compiler configuration.

Specify the ExtensionAggregationMultiFunction annotation on the class level of the AggregationMultiFunctionForge implementation class and the function names (names of states and aggregation methods). This annotation instructs the compiler that the inlined class exposes an aggregation multi-function.

This sample EPL includes an inlined class by name TrieAggForge that uses a trie data structure to store person names. A trie is a tree-like data structure whose nodes store the letters of an alphabet; here, the trie stores person names. The implementation uses the Trie provided by the Apache Commons Collections library.
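A prefix map, in this context, contains every stored entry whose key starts with a given prefix. Independent of the PatriciaTrie used by the sample, the idea can be illustrated with a plain sorted map (a sketch; the helper name is made up, and it assumes keys contain no '\uffff' characters):

```java
import java.util.*;

// Emulates a trie prefix map with a TreeMap: all keys k with
// prefix <= k < prefix + '\uffff' start with the given prefix.
class PrefixMapSketch {
  static SortedMap<String, List<String>> prefixMap(TreeMap<String, List<String>> map, String prefix) {
    return map.subMap(prefix, prefix + Character.MAX_VALUE);
  }
}
```

For keys "Jane", "John" and "Johnson", the prefix "Joh" selects "John" and "Johnson". PatriciaTrie provides this lookup natively and more efficiently via its prefixMap method.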

The following sample EPL uses the Trie to store persons by person name and return a prefix-map, which is a sorted map of all persons that have the same prefix:

// The PersonEvent provides a person name and person id			
@public @buseventtype create schema PersonEvent(name string, id string);

// This provides the aggregation multi-function 
create inlined_class """
import com.espertech.esper.common.client.*;
import com.espertech.esper.common.client.type.*;
import com.espertech.esper.common.client.hook.aggmultifunc.*;
import com.espertech.esper.common.client.hook.forgeinject.*;
import com.espertech.esper.common.internal.epl.expression.core.*;
import com.espertech.esper.common.internal.rettype.*;
import com.espertech.esper.common.internal.epl.agg.core.*;
import org.apache.commons.collections4.Trie;
import org.apache.commons.collections4.trie.PatriciaTrie;
import java.util.*;
import java.util.function.*;

// We have 3 function names: 
// - "trieState" for use with create-table to hold the Trie
// - "trieEnter" for entering person events into the Trie by person name, for use with into-table aggregation
// - "triePrefixMap" for returning a prefix map of all persons with the same prefix as provided by its parameter
@ExtensionAggregationMultiFunction(names="trieState,trieEnter,triePrefixMap")
/**
 * The trie aggregation forge is the entry point for providing the multi-function aggregation.
 */
public class TrieAggForge implements AggregationMultiFunctionForge {
  public AggregationMultiFunctionHandler validateGetHandler(AggregationMultiFunctionValidationContext validationContext) {
    String name = validationContext.getFunctionName();
    if (name.equals("trieState")) {
      return new TrieAggHandlerTrieState();
    } else if (name.equals("trieEnter")) {
      return new TrieAggHandlerTrieEnter(validationContext.getParameterExpressions());
    } else if (name.equals("triePrefixMap")) {
      return new TrieAggHandlerTriePrefixMap();
    }
    throw new IllegalStateException("Unrecognized name '" + name + "' for use with trie");
  }
  
  /**
   * This handler handles the "trieState"-type table column, for use with create-table.
   */
  public static class TrieAggHandlerTrieState implements AggregationMultiFunctionHandler {
    public EPChainableType getReturnType() {
      return EPChainableTypeHelper.singleValue(Trie.class);
    }

    public AggregationMultiFunctionStateKey getAggregationStateUniqueKey() {
      return new AggregationMultiFunctionStateKey() {};
    }

    public AggregationMultiFunctionStateMode getStateMode() {
      InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(TrieAggStateFactory.class);
      return new AggregationMultiFunctionStateModeManaged(injection);
    }

    public AggregationMultiFunctionAccessorMode getAccessorMode() {
      // accessor that returns the trie itself
      InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(TrieAggAccessorFactory.class);
      return new AggregationMultiFunctionAccessorModeManaged(injection);
    }

    public AggregationMultiFunctionAgentMode getAgentMode() {
      throw new UnsupportedOperationException("Trie aggregation access is only by the 'triePrefixMap' method");
    }

    public AggregationMultiFunctionAggregationMethodMode getAggregationMethodMode(AggregationMultiFunctionAggregationMethodContext ctx) {
      throw new UnsupportedOperationException("Trie aggregation access is only by the 'triePrefixMap' method");
    }
  }
  
  /**
   * This handler handles the "trieEnter"-operation that updates trie state, for use with into-table aggregation
   */
  public static class TrieAggHandlerTrieEnter implements AggregationMultiFunctionHandler {
    private final ExprNode[] parameters;
    
    public TrieAggHandlerTrieEnter(ExprNode[] parameters) {
      this.parameters = parameters;
    }

    public EPChainableType getReturnType() {
      return EPChainableTypeHelper.singleValue(Trie.class); // we return the Trie itself
    }

    public AggregationMultiFunctionStateKey getAggregationStateUniqueKey() {
      throw new UnsupportedOperationException("Not a trie state");
    }

    public AggregationMultiFunctionStateMode getStateMode() {
      throw new UnsupportedOperationException("Not a trie state");
    }

    public AggregationMultiFunctionAccessorMode getAccessorMode() {
      // accessor that returns the trie itself
      InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(TrieAggAccessorFactory.class);
      return new AggregationMultiFunctionAccessorModeManaged(injection);
    }

    public AggregationMultiFunctionAgentMode getAgentMode() {
      if (parameters.length != 1 || ((EPTypeClass) parameters[0].getForge().getEvaluationType()).getType() != String.class) {
        throw new IllegalArgumentException("Requires a single parameter returning a string value");
      }
      InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(TrieAggAgentFactory.class);
      injection.addExpression("keyExpression", parameters[0]);
      return new AggregationMultiFunctionAgentModeManaged(injection);
    }

    public AggregationMultiFunctionAggregationMethodMode getAggregationMethodMode(AggregationMultiFunctionAggregationMethodContext ctx) {
      throw new UnsupportedOperationException("Trie aggregation access is only by the 'triePrefixMap' method");
    }
  }
  
  /**
   * This handler handles the "triePrefixMap" accessor for returning a prefix map of same-prefix person events
   */
  public static class TrieAggHandlerTriePrefixMap implements AggregationMultiFunctionHandler {
    public EPChainableType getReturnType() {
      return EPChainableTypeHelper.singleValue(Map.class);
    }
    
    public AggregationMultiFunctionStateKey getAggregationStateUniqueKey() {
      throw new UnsupportedOperationException("Not implemented for 'triePrefixMap' trie method");
    }

    public AggregationMultiFunctionStateMode getStateMode() {
      throw new UnsupportedOperationException("Not implemented for 'triePrefixMap' trie method");
    }

    public AggregationMultiFunctionAccessorMode getAccessorMode() {
      throw new UnsupportedOperationException("Not implemented for 'triePrefixMap' trie method");
    }

    public AggregationMultiFunctionAgentMode getAgentMode() {
      throw new UnsupportedOperationException("Not implemented for 'triePrefixMap' trie method");
    }

    public AggregationMultiFunctionAggregationMethodMode getAggregationMethodMode(AggregationMultiFunctionAggregationMethodContext ctx) {
      if (ctx.getParameters().length != 1 || ((EPTypeClass) ctx.getParameters()[0].getForge().getEvaluationType()).getType() != String.class) {
        throw new IllegalArgumentException("Requires a single parameter returning a string value");
      }
      InjectionStrategyClassNewInstance injection = new InjectionStrategyClassNewInstance(TrieAggMethodFactoryPrefixMap.class);
      injection.addExpression("keyExpression", ctx.getParameters()[0]);
      return new AggregationMultiFunctionAggregationMethodModeManaged(injection);
    }
  }
  
  /**
   * The agent state factory is responsible for producing a state holder that holds the trie state
   */
  public static class TrieAggStateFactory implements AggregationMultiFunctionStateFactory {
    public AggregationMultiFunctionState newState(AggregationMultiFunctionStateFactoryContext ctx) {
      return new TrieAggState();
    }
  }
  
  /**
   * The agent state is the state holder that holds the trie state
   */
  public static class TrieAggState implements AggregationMultiFunctionState {
    private final Trie<String, List<Object>> trie = new PatriciaTrie<>();
    
    public void applyEnter(EventBean[] eventsPerStream, ExprEvaluatorContext exprEvaluatorContext) {
      throw new UnsupportedOperationException("Not used since the agent updates the table");
    }
    
    public void applyLeave(EventBean[] eventsPerStream, ExprEvaluatorContext exprEvaluatorContext) {
      throw new UnsupportedOperationException("Not used since the agent updates the table");
    }
    
    public void clear() {
      trie.clear();
    }

    public void add(String key, Object underlying) {
      List<Object> existing = (List<Object>) trie.get(key);
      if (existing != null) {
        existing.add(underlying);
        return;
      }
      List<Object> events = new ArrayList<>(2);
      events.add(underlying);
      trie.put(key, events);
    }
    
    public void remove(String key, Object underlying) {
      List<Object> existing = (List<Object>) trie.get(key);
      if (existing != null) {
        existing.remove(underlying);
        if (existing.isEmpty()) {
          trie.remove(key);
        }
      }
    }
  }
  
  /**
   * The accessor factory is responsible for producing an accessor that returns the result of the trie table column when accessed without an aggregation method
   */
  public static class TrieAggAccessorFactory implements AggregationMultiFunctionAccessorFactory {
    public AggregationMultiFunctionAccessor newAccessor(AggregationMultiFunctionAccessorFactoryContext ctx) {
      return new TrieAggAccessor();
    }
  }
  
  /**
   * The accessor returns the result of the trie table column when accessed without an aggregation method
   */
  public static class TrieAggAccessor implements AggregationMultiFunctionAccessor {
    // This is the value return when just referring to the trie table column by itself without a method name such as "prefixMap".
    public Object getValue(AggregationMultiFunctionState state, EventBean[] eventsPerStream, boolean isNewData, ExprEvaluatorContext exprEvaluatorContext) {
      TrieAggState trie = (TrieAggState) state;
      return trie.trie;
    }
  }
  
  /**
   * The agent factory is responsible for producing an agent that handles all changes to the trie table column.
   */
  public static class TrieAggAgentFactory implements AggregationMultiFunctionAgentFactory {
    private ExprEvaluator keyExpression;
    
    public void setKeyExpression(ExprEvaluator keyExpression) {
      this.keyExpression = keyExpression;
    }
    
    public AggregationMultiFunctionAgent newAgent(AggregationMultiFunctionAgentFactoryContext ctx) {
      return new TrieAggAgent(this);
    }
  }
  
  /**
   * The agent is responsible for all changes to the trie table column.
   */
  public static class TrieAggAgent implements AggregationMultiFunctionAgent {
    private final TrieAggAgentFactory factory;
    
    public TrieAggAgent(TrieAggAgentFactory factory) {
      this.factory = factory;
    }
    
    public void applyEnter(EventBean[] eventsPerStream, ExprEvaluatorContext exprEvaluatorContext, AggregationRow row, int column) {
      String key = (String) factory.keyExpression.evaluate(eventsPerStream, true, exprEvaluatorContext);
      TrieAggState trie = (TrieAggState) row.getAccessState(column);
      trie.add(key, eventsPerStream[0].getUnderlying());
    }

    public void applyLeave(EventBean[] eventsPerStream, ExprEvaluatorContext exprEvaluatorContext, AggregationRow row, int column) {
      String key = (String) factory.keyExpression.evaluate(eventsPerStream, false, exprEvaluatorContext);
      TrieAggState trie = (TrieAggState) row.getAccessState(column);
      trie.remove(key, eventsPerStream[0].getUnderlying());
    }
  }
  
  /**
   * The aggregation method factory is responsible for producing an aggregation method for the "trie" return result of the trie table column.
   */
  public static class TrieAggMethodFactoryTrieColumn implements AggregationMultiFunctionAggregationMethodFactory {
    public AggregationMultiFunctionAggregationMethod newMethod(AggregationMultiFunctionAggregationMethodFactoryContext context) {
      return new AggregationMultiFunctionAggregationMethod() {
        public Object getValue(int aggColNum, AggregationRow row, EventBean[] eventsPerStream, boolean isNewData, ExprEvaluatorContext exprEvaluatorContext) {
          TrieAggState trie = (TrieAggState) row.getAccessState(aggColNum);
          return trie.trie;
        }
      };
    }
  }
  
  /**
   * The aggregation method factory is responsible for producing an aggregation method for the "triePrefixMap" expression of the trie table column.
   */
  public static class TrieAggMethodFactoryPrefixMap implements AggregationMultiFunctionAggregationMethodFactory {
    private ExprEvaluator keyExpression;
    
    public void setKeyExpression(ExprEvaluator keyExpression) {
      this.keyExpression = keyExpression;
    }

    public AggregationMultiFunctionAggregationMethod newMethod(AggregationMultiFunctionAggregationMethodFactoryContext context) {
      return new TrieAggMethodPrefixMap(this);
    }
  }
  
  /**
   * The aggregation method is responsible for the "triePrefixMap" expression result of the trie table column.
   */
  public static class TrieAggMethodPrefixMap implements AggregationMultiFunctionAggregationMethod {
    private final TrieAggMethodFactoryPrefixMap factory;
    
    public TrieAggMethodPrefixMap(TrieAggMethodFactoryPrefixMap factory) {
      this.factory = factory;
    }
    
    public Object getValue(int aggColNum, AggregationRow row, EventBean[] eventsPerStream, boolean isNewData, ExprEvaluatorContext exprEvaluatorContext) {
      String key = (String) factory.keyExpression.evaluate(eventsPerStream, false, exprEvaluatorContext);
      TrieAggState trie = (TrieAggState) row.getAccessState(aggColNum);
      return trie.trie.prefixMap(key);
    }
  }
}
""";

// We use a table to store the Trie. The Trie is effectively a Trie<String, List<PersonEvent>> holding a list of person events in branch and leaf nodes.
@name('table') create table TableWithTrie(nameTrie trieState(string));

// We aggregate directly into the table using the person name as the Trie key and the event as value
@Priority(1) into table TableWithTrie select trieEnter(name) as nameTrie from PersonEvent;

// For each person output the prefix map, a sorted map (SortedMap<String, List<PersonEvent>>) with the same prefixes as the person name
@Priority(0) @name('s0') select TableWithTrie.nameTrie.triePrefixMap(name) from PersonEvent;
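The prefix-map lookup that triePrefixMap performs can be illustrated outside of Esper with a plain JDK SortedMap. A real trie implementation (for example Apache Commons Collections' PatriciaTrie) provides prefixMap directly; the half-open subMap trick below is a minimal sketch of the same semantics, with plain strings standing in for person events (all class and method names here are illustrative, not part of the Esper API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class PrefixMapSketch {
  // Returns the sub-map of all entries whose key starts with the given prefix.
  // Character.MAX_VALUE forms an exclusive upper bound that no real key
  // sharing the prefix can exceed.
  public static SortedMap<String, List<String>> prefixMap(TreeMap<String, List<String>> map, String prefix) {
    return map.subMap(prefix, prefix + Character.MAX_VALUE);
  }

  public static void main(String[] args) {
    TreeMap<String, List<String>> names = new TreeMap<>();
    names.computeIfAbsent("anna", k -> new ArrayList<>()).add("event-1");
    names.computeIfAbsent("annette", k -> new ArrayList<>()).add("event-2");
    names.computeIfAbsent("bob", k -> new ArrayList<>()).add("event-3");

    // All keys sharing the prefix "ann"
    System.out.println(prefixMap(names, "ann").keySet()); // [anna, annette]
  }
}
```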

Only one ExtensionAggregationMultiFunction annotation can be specified per class and the annotation is only for use with inlined classes. Using an inline class does not require any compiler configuration.

When using create inlined_class the runtime resolves dependencies on EPL objects at time of deployment (the same as for all EPL objects).

Pattern guards are pattern objects that control the lifecycle of the guarded sub-expression and can filter the events fired by the sub-expression.

The following steps are required to develop and use a custom guard object.

  1. Implement a guard forge class, responsible for compile-time guard information.

  2. Implement a guard factory class, responsible for creating guard object instances at runtime.

  3. Implement a guard class (used at runtime).

  4. Register the guard forge class with the compiler by supplying a namespace and name, via the compiler configuration.

The code for the example guard object as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regressionlib.support.extend.pattern by the name MyCountToPatternGuardForge. The sample guard discussed here counts the number of events occurring up to a maximum number of events, and ends the sub-expression when that maximum is reached.

Some of the APIs that you use to implement a pattern guard are internal APIs that are not stable and may change between releases. The client package contains all the stable interface classes.

A guard forge class is used only by the compiler. It is responsible for validating the guard parameters and for providing the code that initializes the guard factory.

Guard forge classes implement the GuardForge interface:

public class MyCountToPatternGuardForge implements GuardForge { ...

The compiler constructs one instance of the guard forge class for each time the guard is listed in a statement.

The guard forge class implements the setGuardParameters method that is passed the parameters to the guard as supplied by the statement. It verifies the guard parameters, similar to the code snippet shown next. Our example counter guard takes a single numeric parameter:

public void setGuardParameters(List<ExprNode> guardParameters, MatchedEventConvertorForge convertor, StatementCompileTimeServices services) throws GuardParameterException {
    String message = "Count-to guard takes a single integer-value expression as parameter";
    if (guardParameters.size() != 1) {
        throw new GuardParameterException(message);
    }

    Class paramType = guardParameters.get(0).getForge().getEvaluationType();
    if (paramType != Integer.class && paramType != int.class) {
        throw new GuardParameterException(message);
    }
        
    this.numCountToExpr = guardParameters.get(0);
    this.convertor = convertor;
}

The makeCodegen method is called by the compiler to receive the code that builds a guard factory. Use the SAIFFInitializeBuilder to build factory initialization code:

public CodegenExpression makeCodegen(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
    SAIFFInitializeBuilder builder = new SAIFFInitializeBuilder(MyCountToPatternGuardFactory.class, this.getClass(), "guardFactory", parent, symbols, classScope);
    return builder.exprnode("numCountToExpr", numCountToExpr)
                .expression("convertor", convertor.makeAnonymous(builder.getMethod(), classScope))
                .build();
}

Pattern observers are pattern objects that are executed as part of a pattern expression and can observe events or test conditions. Examples for built-in observers are timer:at and timer:interval. Some suggested uses of observer objects are:

  • Implement custom scheduling logic using the runtime's own scheduling and timer services

  • Test conditions related to prior events matching an expression

The following steps are required to develop and use a custom observer object within pattern statements:

  1. Implement an observer forge class, which is used by the compiler only and is responsible for validating parameters and for initializing an observer factory.

  2. Implement an observer factory class, responsible for creating observer object instances.

  3. Implement an observer class.

  4. Register an observer factory class with the compiler by supplying a namespace and name, via the compiler configuration file or the configuration API.

The code for the example observer object as shown in this chapter can be found in the test source folder in package com.espertech.esper.regression.client by the name MyFileExistsObserver. The sample observer discussed here simply checks whether a file exists, using the filename supplied by the pattern statement and the java.io.File class.

Some of the APIs that you use to implement a pattern observer are internal APIs that are not stable and may change between releases. The client package contains all the stable interface classes.

An observer forge class is responsible for validating the observer parameters and for providing the code that initializes the observer factory.

Observer forge classes implement the ObserverForge interface:

public class MyFileExistsObserverForge implements ObserverForge { ...

The compiler constructs one instance of the observer forge class for each time the observer is listed in a statement.

The observer forge class implements the setObserverParameters method that is passed the parameters to the observer as supplied by the statement. It verifies the observer parameters, similar to the code snippet shown next. Our example file-exists observer takes a single string parameter:

public void setObserverParameters(List<ExprNode> observerParameters, MatchedEventConvertorForge convertor, ExprValidationContext validationContext) throws ObserverParameterException {
    String message = "File exists observer takes a single string filename parameter";
    if (observerParameters.size() != 1) {
        throw new ObserverParameterException(message);
    }
    if (!(observerParameters.get(0).getForge().getEvaluationType() == String.class)) {
        throw new ObserverParameterException(message);
    }

    this.filenameExpression = observerParameters.get(0);
    this.convertor = convertor;
}

The compiler calls the makeCodegen method to provide code that initializes the observer factory at time of deployment. It uses the SAIFFInitializeBuilder to build the code.

public CodegenExpression makeCodegen(CodegenMethodScope parent, SAIFFInitializeSymbol symbols, CodegenClassScope classScope) {
    SAIFFInitializeBuilder builder = new SAIFFInitializeBuilder(MyFileExistsObserverFactory.class, this.getClass(), "observerFactory", parent, symbols, classScope);
    return builder.exprnode("filenameExpression", filenameExpression)
            .expression("convertor", convertor.makeAnonymous(builder.getMethod(), classScope))
            .build();
}

An observer factory class is responsible for creating observer object instances at runtime.

Observer factory classes implement the ObserverFactory interface:

public class MyFileExistsObserverFactory implements ObserverFactory { ...

The runtime obtains an instance of the observer factory class at time of deployment.

The runtime calls the makeObserver method to create a new observer instance. The example makeObserver method shown below passes parameters to the observer instance:

public EventObserver makeObserver(PatternAgentInstanceContext context, MatchedEventMap beginState, ObserverEventEvaluator observerEventEvaluator, Object observerState, boolean isFilterChildNonQuitting) {
    EventBean[] events = convertor == null ? null : convertor.convert(beginState);
    Object filename = PatternExpressionUtil.evaluateChecked("File-exists observer ", filenameExpression, events, context.getAgentInstanceContext());
    if (filename == null) {
        throw new EPException("Filename evaluated to null");
    }
    return new MyFileExistsObserver(beginState, observerEventEvaluator, filename.toString());
}

The ObserverEventEvaluator parameter allows an observer to indicate events, and to indicate a change of truth value to permanently false. Use this interface to indicate when your observer has received or witnessed an event, or when its truth value changes to true or permanently false.

The MatchedEventMap parameter provides a Map of all matching events for the expression prior to the observer's start. For example, consider a pattern as below:

a=MyEvent -> myplugin:my_observer(...)

The above pattern tagged the MyEvent instance with the tag "a". The runtime starts an instance of my_observer when it receives the first MyEvent. The observer can query the MatchedEventMap using "a" as a key and obtain the tagged event.
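The condition the sample observer ultimately evaluates when it fires is a plain java.io.File existence check. A standalone sketch of just that check (class and method names are illustrative; the real observer additionally signals its result through ObserverEventEvaluator):

```java
import java.io.File;
import java.io.IOException;

public class FileExistsCheckSketch {
  // The file-system test the sample observer performs at runtime
  public static boolean fileExists(String filename) {
    return new File(filename).exists();
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("observer-check", ".tmp");
    System.out.println(fileExists(tmp.getAbsolutePath())); // true
    tmp.delete();
    System.out.println(fileExists(tmp.getAbsolutePath())); // false
  }
}
```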

Your application can provide additional date-time methods by implementing the extension API as shown below. The steps are as follows:

  1. Implement the DateTimeMethodForgeFactory interface (package com.espertech.esper.common.client.hook.datetimemethod) that the compiler invokes for validation and to receive information on the public static methods that your application exposes that provide the date-time method logic.

  2. Implement a static method (or multiple static methods) that receives the respective date-time value and additional parameters, if any, and that may return a new value, following the rules outlined below.

  3. Add the class name of the DateTimeMethodForgeFactory implementation to the compiler configuration.

  4. Use the new date-time method(s).

The EPL compiler distinguishes between two types of date-time methods:

  • A modifying date-time method modifies the date-time value, while the type of the result is the same as the type of the date-time value (i.e. input and result is a Calendar or LocalDateTime or other value).

  • A reformatting date-time method transforms the date-time value into a result that has a different return type.

The example herein builds a value-changing date-time method by name roll that rolls a field forward or backward and that receives the name of the field and an up/down flag as parameters.

The second example herein builds a reformatting date-time method by name asArrayOfString that returns an array of strings containing the day, month and year values of the date-time value.

The implementation of the DateTimeMethodForgeFactory interface is responsible for providing the allowed footprints and for returning the operations that point to the public static methods implementing the date-time method logic.

The EPL compiler calls the initialize method to obtain the allowed footprints. It compares the footprints to the number and type of parameters actually provided.

For example, if there is a single footprint with no parameters, use this:

DotMethodFP[] footprints = new DotMethodFP[] {
    new DotMethodFP(DotMethodFPInputEnum.SCALAR_ANY)
  };
return new DateTimeMethodDescriptor(footprints);

For example, if there is a single footprint with a single string-type parameter, use this:

DotMethodFP[] footprints = new DotMethodFP[] {
  new DotMethodFP(DotMethodFPInputEnum.SCALAR_ANY,
      new DotMethodFPParam("provide a descriptive name of parameter", EPLExpressionParamType.SPECIFIC, String.class)
    )
  };

In the case that the date-time method modifies the date-time value, make the validate method return an instance of DateTimeMethodOpsModify.

In the case that the date-time method reformats the date-time value, make the validate method return an instance of DateTimeMethodOpsReformat.

Use the DateTimeMethodModeStaticMethod class to provide the class and the name of the public static method providing the respective operation.

It is not required to provide a static method for each of the different types of date-time values. You may leave an operation at a null-value to indicate it is not provided for that date-time value type.

The following class handles the new roll date-time method, which has a single footprint with a string-type and a boolean-type parameter:

public class MyLocalDTMForgeFactoryRoll implements DateTimeMethodForgeFactory {
  private final static DotMethodFP[] FOOTPRINTS = new DotMethodFP[]{
    new DotMethodFP(DotMethodFPInputEnum.SCALAR_ANY,
        new DotMethodFPParam("a string-type calendar field name", EPLExpressionParamType.SPECIFIC, String.class),
        new DotMethodFPParam("a boolean-type up/down indicator", EPLExpressionParamType.SPECIFIC, boolean.class))
    };

  public DateTimeMethodDescriptor initialize(DateTimeMethodInitializeContext context) {
    return new DateTimeMethodDescriptor(FOOTPRINTS);
  }

  public DateTimeMethodOps validate(DateTimeMethodValidateContext context) {
    // this is an opportunity to do additional validation or evaluation when desired
    // however the footprint is already validated
    DateTimeMethodOpsModify roll = new DateTimeMethodOpsModify();
    
    // see below for MyLocalDTMRollUtility
    roll.setCalendarOp(new DateTimeMethodModeStaticMethod(MyLocalDTMRollUtility.class, "roll"));
    roll.setLdtOp(new DateTimeMethodModeStaticMethod(MyLocalDTMRollUtility.class, "roll"));
    roll.setZdtOp(new DateTimeMethodModeStaticMethod(MyLocalDTMRollUtility.class, "roll"));
    return roll;
  }
}

The following class handles the new asArrayOfString date-time method, which has a single footprint with no parameters:

public class MyLocalDTMForgeFactoryArrayOfString implements DateTimeMethodForgeFactory {
  private final static DotMethodFP[] FOOTPRINTS = new DotMethodFP[]{
    new DotMethodFP(DotMethodFPInputEnum.SCALAR_ANY)
  };

  public DateTimeMethodDescriptor initialize(DateTimeMethodInitializeContext context) {
    return new DateTimeMethodDescriptor(FOOTPRINTS);
  }

  public DateTimeMethodOps validate(DateTimeMethodValidateContext context) {
    DateTimeMethodOpsReformat asArrayOfString = new DateTimeMethodOpsReformat();
    asArrayOfString.setReturnType(String[].class);
    
    // see below for MyLocalDTMArrayOfStringUtility
    asArrayOfString.setLongOp(new DateTimeMethodModeStaticMethod(MyLocalDTMArrayOfStringUtility.class, "asArrayOfString"));
    asArrayOfString.setDateOp(new DateTimeMethodModeStaticMethod(MyLocalDTMArrayOfStringUtility.class, "asArrayOfString"));
    asArrayOfString.setCalendarOp(new DateTimeMethodModeStaticMethod(MyLocalDTMArrayOfStringUtility.class, "asArrayOfString"));
    asArrayOfString.setLdtOp(new DateTimeMethodModeStaticMethod(MyLocalDTMArrayOfStringUtility.class, "asArrayOfString"));
    asArrayOfString.setZdtOp(new DateTimeMethodModeStaticMethod(MyLocalDTMArrayOfStringUtility.class, "asArrayOfString"));
    return asArrayOfString;    
  }
}

For value-changing date-time methods that operate on long, Date and Calendar, the static method must return void, its first parameter must be Calendar and the remaining parameters must match the expression parameters, such as:

public static void roll(Calendar calendar, String fieldName, boolean flagValue) {

For value-changing date-time methods that operate on LocalDateTime, the static method must return LocalDateTime, its first parameter must be LocalDateTime and the remaining parameters must match the expression parameters, such as:

public static LocalDateTime roll(LocalDateTime ldt, String fieldName, boolean flagValue) {

For value-changing date-time methods that operate on ZonedDateTime, the static method must return ZonedDateTime, its first parameter must be ZonedDateTime and the remaining parameters must match the expression parameters, such as:

public static ZonedDateTime roll(ZonedDateTime zdt, String fieldName, boolean flagValue) {

For reformatting date-time methods, the static method must return the same type as provided to the setReturnType method of DateTimeMethodOpsReformat, its first parameter must be any of the below and the remaining parameters must match the expression parameters (see example below).

The class providing the static methods for the roll date-time method is shown next.

public class MyLocalDTMRollUtility {
  public static void roll(Calendar calendar, String fieldName, boolean flagValue) {
    switch (fieldName) {
      case "date": calendar.roll(Calendar.DATE, flagValue); break;
      default: throw new EPException("Invalid field name '" + fieldName + "'");
    }
  }

  public static LocalDateTime roll(LocalDateTime ldt, String fieldName, boolean flagValue) {
    switch (fieldName) {
      case "date": return ldt.plusDays(1);
      default: throw new EPException("Invalid field name '" + fieldName + "'");
    }
  }

  public static ZonedDateTime roll(ZonedDateTime zdt, String fieldName, boolean flagValue) {
    switch (fieldName) {
      case "date": return zdt.plusDays(1);
      default: throw new EPException("Invalid field name '" + fieldName + "'");
    }
  }
}
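The Calendar overload relies on java.util.Calendar.roll, which adjusts a single field without changing larger fields, wrapping within the field's range. A minimal pure-JDK demonstration, independent of the extension API (class and method names are illustrative):

```java
import java.util.Calendar;
import java.util.GregorianCalendar;

public class RollDemo {
  // Rolls the day-of-month up by one; unlike add(), the MONTH field is untouched
  public static Calendar rollDateUp(Calendar cal) {
    cal.roll(Calendar.DATE, true);
    return cal;
  }

  public static void main(String[] args) {
    Calendar cal = rollDateUp(new GregorianCalendar(2020, Calendar.JANUARY, 31));
    // January 31 wraps to January 1; the month does not advance
    System.out.println(cal.get(Calendar.DAY_OF_MONTH)); // 1
    System.out.println(cal.get(Calendar.MONTH) == Calendar.JANUARY); // true
  }
}
```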

The class providing the static methods for the asArrayOfString date-time method is shown next.

public class MyLocalDTMArrayOfStringUtility {
  public static String[] asArrayOfString(long date) {
    Calendar calendar = Calendar.getInstance();
    calendar.setTimeInMillis(date);
    return asArrayOfString(calendar);
  }

  public static String[] asArrayOfString(Date date) {
    Calendar calendar = Calendar.getInstance();
    calendar.setTime(date);
    return asArrayOfString(calendar);
  }

  public static String[] asArrayOfString(Calendar calendar) {
    return new String[] {Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)),
      Integer.toString(calendar.get(Calendar.MONTH) + 1),
      Integer.toString(calendar.get(Calendar.YEAR))};
  }

  public static String[] asArrayOfString(LocalDateTime ldt) {
    return new String[] {Integer.toString(ldt.getDayOfMonth()),
      Integer.toString(ldt.getMonthValue()),
      Integer.toString(ldt.getYear())};
  }

  public static String[] asArrayOfString(ZonedDateTime zdt) {
    return new String[] {Integer.toString(zdt.getDayOfMonth()),
      Integer.toString(zdt.getMonthValue()),
      Integer.toString(zdt.getYear())};
  }
}
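The Calendar-based conversion can be exercised directly; the quick check below repeats the day/month/year extraction outside the extension API (note the +1 on Calendar.MONTH, which is zero-based; the class name is illustrative):

```java
import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;

public class ArrayOfStringDemo {
  // Same extraction as the Calendar overload of asArrayOfString above
  public static String[] asArrayOfString(Calendar calendar) {
    return new String[] {
      Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)),
      Integer.toString(calendar.get(Calendar.MONTH) + 1), // MONTH is zero-based
      Integer.toString(calendar.get(Calendar.YEAR))};
  }

  public static void main(String[] args) {
    Calendar cal = new GregorianCalendar(2020, Calendar.JULY, 15);
    System.out.println(Arrays.toString(asArrayOfString(cal))); // [15, 7, 2020]
  }
}
```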

Your application can provide additional enumeration methods by implementing the extension API as shown below. The steps are as follows:

  1. Implement the EnumMethodForgeFactory interface (package com.espertech.esper.common.client.hook.enummethod) that the compiler invokes for validation and to receive information on the state class and the public static methods that your application exposes that provide the enumeration method logic.

  2. Implement the EnumMethodState interface, which holds the state for enumerating over input values.

  3. Implement a static method (the processing method) that receives the state and each of the items of the collection of events, scalar values or object values, and the result of lambda parameter expression evaluations.

  4. Add the class name of the EnumMethodForgeFactory implementation to the compiler configuration.

  5. Use the new enumeration method(s).

The example herein builds a simple enumeration method by name median that computes the median for a set of integer-typed input values and that returns a double-type median.

The implementation of the EnumMethodForgeFactory interface is responsible for providing the allowed footprints and for returning the state class and the processing method.

The EPL compiler calls the initialize method to obtain the allowed footprints. It compares the footprints to the number and type of parameters actually provided.

The example median enumeration method takes scalar numeric values as input and has no parameters.

DotMethodFP[] footprints = new DotMethodFP[] {
    new DotMethodFP(DotMethodFPInputEnum.SCALAR_NUMERIC)
  };
return new EnumMethodDescriptor(footprints);

Additional examples for footprint are:

Table 22.2. Enumeration Method Footprint Examples

Sample footprint and processing method, with comment:

  • new DotMethodFP(DotMethodFPInputEnum.EVENTCOLL,
      new DotMethodFPParam(1, "predicate", EPLExpressionParamType.BOOLEAN))
    process(State_class state, EventBean event, Boolean pass)
    Comment: Enumeration method taking events as input, with a single lambda expression that is a predicate and that returns a boolean value.

  • new DotMethodFP(DotMethodFPInputEnum.SCALAR_ANY,
      new DotMethodFPParam(1, "value-selector", EPLExpressionParamType.NUMERIC))
    process(State_class state, Object value, Object lambdaResult)
    Comment: Enumeration method taking any type of scalar values as input, with a single lambda expression that is a value-selector and that returns a numeric value.

  • new DotMethodFP(DotMethodFPInputEnum.SCALAR_NUMERIC,
      new DotMethodFPParam("from", EPLExpressionParamType.NUMERIC),
      new DotMethodFPParam("to", EPLExpressionParamType.NUMERIC))
    process(State_class state, Object value)
    Comment: Enumeration method taking numeric scalar values as input, with no lambda expressions but two non-lambda parameters, each returning a numeric value.

  • new DotMethodFP(DotMethodFPInputEnum.EVENTCOLL,
      new DotMethodFPParam(1, "v1", EPLExpressionParamType.ANY),
      new DotMethodFPParam(1, "v2", EPLExpressionParamType.ANY))
    process(State_class state, EventBean event, Object v1, Object v2)
    Comment: Enumeration method taking events as input, with two lambda expressions as parameters, each returning any object value.

  • new DotMethodFP(DotMethodFPInputEnum.ANY,
      new DotMethodFPParam(2, "value, index", EPLExpressionParamType.BOOLEAN))
    process(State_class state, EventBean event, Boolean pass)
    Comment: Enumeration method taking any type of scalar values as input, with a single lambda expression that has two parameters (value and index, similar to takeWhile with index) and that returns a boolean value.

Use the EnumMethodModeStaticMethod class to provide the class of the state object and the class and the name of the public static method that is the processing method.

The following class handles the new median enumeration method, which has a single footprint with no parameters:

public static class MyLocalEnumMethodForgeMedian implements EnumMethodForgeFactory {
  private static final DotMethodFP[] FOOTPRINTS = new DotMethodFP[]{
    new DotMethodFP(DotMethodFPInputEnum.SCALAR_NUMERIC)
  };

  public EnumMethodDescriptor initialize(EnumMethodInitializeContext context) {
    return new EnumMethodDescriptor(FOOTPRINTS);
  }

  public EnumMethodModeStaticMethod validate(EnumMethodValidateContext context) {
    Class stateClass = MyLocalEnumMethodMedianState.class; // the class providing state, must implement EnumMethodState
    Class serviceClass = MyLocalEnumMethodMedianService.class; // the class providing the processing method (any class)
    String methodName = "next"; // the name of the method for processing an item of input values (any method name)
    EPChainableType returnType = EPChainableTypeHelper.singleValue(Double.class); // indicate that we are returning a Double-type value
    boolean earlyExit = false;
    return new EnumMethodModeStaticMethod(stateClass, serviceClass, methodName, returnType, earlyExit);
  }
}

The EnumMethodModeStaticMethod provides multiple settings to the EPL compiler:

  • The class that implements the EnumMethodState interface which holds enumeration state.

  • The class and method name of the method that processes values and that receives the result of lambda parameter evaluation.

  • The return type of the enumeration method which is an EPChainableType value.

  • An indicator whether the state requires early-access checking.

The EPChainableType return type has the following choices:

  • For a method returning a collection of events, always use EPChainableTypeHelper.collectionOfEvents(context.getInputEventType()).

  • For a method returning a collection of objects, use EPChainableTypeHelper.collectionOfSingleValue(class_of_value).

  • For a method returning a single scalar value, use EPChainableTypeHelper.singleValue(class_of_value).

The implementation of the EnumMethodState interface is responsible for holding the transient state of one pass over input values to the enumeration method. The runtime allocates a new instance of the provided class for each execution of the enumeration method. The implementation class must have a default constructor.

The state class collects the input values and computes the result that the enumeration method returns.

The following class handles the state for the median enumeration method and computes the median:

public class MyLocalEnumMethodMedianState implements EnumMethodState {
  private List<Integer> list = new ArrayList<>();

  public Object state() {
    Collections.sort(list);
    // get count of scores
    int totalElements = list.size();
    if (totalElements < 2) {
      return null;
    }
    // check if total number of scores is even
    if (totalElements % 2 == 0) {
      int sumOfMiddleElements = list.get(totalElements / 2) + list.get(totalElements / 2 - 1);
      // calculate average of middle elements
      return ((double) sumOfMiddleElements) / 2;
    }
    return (double) list.get(totalElements / 2);
  }

  public void add(Integer value) {
    list.add(value);
  }
}
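The median computation can be verified standalone; the sketch below repeats the same logic outside the EnumMethodState interface, including the sample's quirk of returning null for fewer than two values (the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MedianSketch {
  // Same computation as the state class above, outside the extension API
  public static Double median(List<Integer> values) {
    List<Integer> list = new ArrayList<>(values);
    Collections.sort(list);
    int n = list.size();
    if (n < 2) {
      return null; // the sample state class yields null for fewer than two values
    }
    if (n % 2 == 0) {
      // average of the two middle elements
      return (list.get(n / 2) + list.get(n / 2 - 1)) / 2d;
    }
    return (double) list.get(n / 2);
  }

  public static void main(String[] args) {
    System.out.println(median(Arrays.asList(5, 3, 1, 4, 2))); // 3.0
    System.out.println(median(Arrays.asList(1, 2, 3, 4)));    // 2.5
    System.out.println(median(Arrays.asList(7)));             // null
  }
}
```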

The example does not have additional non-lambda parameters and therefore does not override setParameter. Your application can receive any non-lambda expression values by overriding setParameter.

The example does not have early-exit and therefore does not override completed. Your application can override completed to indicate an early exit. Please make sure EnumMethodModeStaticMethod has the early-exit flag set.

There are three types of lambda parameters supported.

When a lambda parameter is the value itself it is represented by EnumMethodLambdaParameterTypeValue. For example, in orderItems.where(v => v.price > 0) the v parameter is the input item value itself, i.e. the event or scalar value depending on input. This is always the default and no additional code is required.

When a lambda parameter is the index of the value it is represented by EnumMethodLambdaParameterTypeIndex. For example, in orderItems.takeWhile( (v, ind) => ind < 10) the ind parameter is the numeric index of the item starting at zero.

The below code snippet sets the lambda parameter types for value and index:

mode.setLambdaParameters(descriptor -> {
  if (descriptor.getLambdaParameterNumber() == 0) {
    return EnumMethodLambdaParameterTypeValue.INSTANCE;
  }
  return EnumMethodLambdaParameterTypeIndex.INSTANCE;
});

When a lambda parameter is provided by the state class itself it is represented by EnumMethodLambdaParameterTypeStateGetter. For example, in orderItems.aggregate(0, (result, v) => result + v.price) the result parameter is provided by the state itself.

The below code snippet sets the lambda parameter types for state-provided and index:

mode.setLambdaParameters(descriptor -> {
  if (descriptor.getLambdaParameterNumber() == 0) {
    // the state class has a getResult method returning int
    return new EnumMethodLambdaParameterTypeStateGetter(int.class, "getResult");
  }
  return EnumMethodLambdaParameterTypeValue.INSTANCE;
});

Note that the example above assumes that the state class has a getResult method returning an int-type value.

For additional information and examples please consult the JavaDoc and regression testing code.