This chapter summarizes integration and describes in detail each of the extension APIs that allow integrating external data and/or extending engine functionality.
For information on calling external services via instance method invocation, for instance to integrate with dependency injection frameworks such as Spring or Guice, please see Section 5.17.5, “Class and Event-Type Variables”.
For information on input and output adapters that connect to an event transport and perform event transformation for incoming and outgoing on-the-wire event data, for use with streaming data, please see the EsperIO reference documentation. The data flow instances as described in Chapter 20, EPL Reference: Data Flow are an easy way to plug in operators that perform input and output. Data flows allow providing parameters and managing individual flows independent of engine lifecycle. Also consider using the Plug-in Loader API for creating a new adapter that starts or stops as part of the CEP engine initialization and destroy lifecycle, see Section 14.16, “Plug-In Loader”.
To join data that resides in a relational database and that is accessible via JDBC driver and SQL statement the engine offers a syntax for using SQL within EPL, see Section 5.13, “Accessing Relational Data via SQL”. A relational database input and output adapter for streaming input from and output to a relational database also exists (EsperIO).
To join data that resides in a non-relational store the engine offers two means. The first is the virtual data window, as described below, for transparently integrating the external store as a named window. The second is a special join syntax based on static method invocation, see Section 5.14, “Accessing Non-Relational Data via Method, Script or UDF Invocation”.
The best way to test that your extension code works correctly is to write unit tests against an EPL statement that utilizes the extension code. Samples can be obtained from the Esper regression test code base.
For all extension code, as for listeners and subscribers, use the route method (and not sendEvent) to send events into the engine. This avoids the possibility of stack overflow due to event-callback looping and ensures correct processing of the current and routed event.
Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent and not route.
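The reason for preferring route can be illustrated without any Esper classes. The following is a minimal sketch, assuming a hypothetical TinyEngine with a work queue; it does not show how the engine is actually implemented. A callback that re-enters sendEvent adds one nested call per event, while route only queues the event for processing after the current event completes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;

// Hypothetical stand-in for an engine; not an Esper class.
class TinyEngine {
    private final Deque<Integer> routed = new ArrayDeque<>();
    private BiConsumer<TinyEngine, Integer> callback = (engine, event) -> { };
    private int depth;   // current nesting of event processing
    int maxDepth;        // deepest nesting observed
    int processed;       // number of events processed

    void setCallback(BiConsumer<TinyEngine, Integer> callback) {
        this.callback = callback;
    }

    // Processes the event now; the outermost call then drains routed events in a loop.
    void sendEvent(int event) {
        process(event);
        if (depth == 0) {
            while (!routed.isEmpty()) {
                process(routed.poll());
            }
        }
    }

    // Queues the event; it is processed after the current event completes.
    void route(int event) {
        routed.add(event);
    }

    private void process(int event) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        processed++;
        callback.accept(this, event);
        depth--;
    }
}
```

With a callback that produces a follow-up event for each of 100 events, the routing variant never nests deeper than one call, whereas the sendEvent variant nests once per event and would eventually exhaust the stack for long chains.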
For all extension code it is not safe to administer the engine from within the extension code. For example, it is not safe to implement a data window that creates a new statement or destroys an existing statement.
Use a virtual data window if you have a (large) external data store that you want to access as a named window. The access is transparent: There is no need to use special syntax or join syntax. All regular queries including subqueries, joins, on-merge, on-select, on-insert, on-delete, on-update and fire-and-forget are supported with virtual data windows.
There is no need to keep any data or events in memory with virtual data windows. The only requirement for virtual data windows is that all data rows returned are EventBean instances.
When implementing a virtual data window it is not necessary to send any events into the engine or to use insert-into. The event content is simply assumed to exist and to be accessible to the engine via the API implementation you provide.
The distribution ships with a sample virtual data window in the examples folder under the name virtualdw. The code snippets below are extracts from the example.
We use the term store here to mean a source set of data that is managed by the virtual data window. We use the term store row or just row to mean a single data item provided by the store. We use the term lookup to mean a read operation against the store returning zero, one or many rows.
Virtual data windows allow high-performance low-latency lookup by exposing all relevant EPL query access path information. This makes it possible for the virtual data window to choose the desired access method into its store.
The following steps are required to develop and use a virtual data window:
Implement the interface com.espertech.esper.client.hook.VirtualDataWindowFactory.
Implement the interface com.espertech.esper.client.hook.VirtualDataWindow.
Implement the interface com.espertech.esper.client.hook.VirtualDataWindowLookup.
Register the factory class in the engine configuration.
Once you have completed the above steps, the virtual data window is ready to use in EPL statements.
From a threading perspective, virtual data window implementation classes must be thread-safe if objects are shared between multiple named windows. If no objects are shared between named windows, so that each object serves a single named window and other named windows receive separate instances, the implementation classes need not be thread-safe.
Your application must first register the virtual data window factory as part of engine configuration:
Configuration config = new Configuration();
config.addPlugInVirtualDataWindow("sample", "samplevdw",
    SampleVirtualDataWindowFactory.class.getName());
Your application may then create a named window backed by a virtual data window.
For example, assume that the SampleEvent event type is declared as follows:
create schema SampleEvent as (key1 string, key2 string, value1 int, value2 double)
The next EPL statement creates a named window MySampleWindow that provides SampleEvent events and is backed by a virtual data window provided by SampleVirtualDataWindowFactory as configured above:
create window MySampleWindow.sample:samplevdw() as SampleEvent
You may then access the named window, same as any other named window, for example by subquery, join, on-action, fire-and-forget query or by consuming its insert and remove stream. While this example uses Map-type events, the example code is the same for POJO or other events.
Your application may obtain a reference to the virtual data window from the engine context.
This code snippet looks up the virtual data window by the named window name:
try {
    return (VirtualDataWindow) epService.getContext().lookup("/virtualdw/MySampleWindow");
} catch (NamingException e) {
    throw new RuntimeException("Failed to look up virtual data window, is it created yet?");
}
When your application registers a subquery, join or on-action query or executes a fire-and-forget query against a virtual data window the engine interacts with the virtual data window. The interaction is a two-step process.
At time of EPL statement creation (once), the engine analyzes the EPL where-clause, if present. It then compiles a list of hash-index and binary tree (btree, i.e. sorted) index properties. It passes the property names that are queried as well as the operators (i.e. =, >, range etc.) to the virtual data window. The virtual data window returns a lookup strategy object to the engine.
At time of EPL statement execution (repeatedly as triggered), the engine uses that lookup strategy object to execute a lookup. It passes to the lookup all actual key values (hash, btree including ranges) to make fast and efficient lookup achievable.
To explain in detail, assume that your application creates an EPL statement with a subquery as follows:
select (select * from MySampleWindow where key1 = 'A1') from OtherEvent
At the time of creation of the EPL query above the engine analyzes the EPL query. It determines that the subquery queries a virtual data window. It determines from the where-clause that the lookup uses property key1 and hash-equals semantics. The engine then provides this information as part of VirtualDataWindowLookupContext passed to the getLookup method. Your application may inspect hash and btree properties and may determine the appropriate store access method to use.
The hash and btree property lookup information is for informational purposes, to enable fast queries that return the smallest number of rows possible. Your implementation classes may use some or none of the information provided and may also instead return some or perhaps even all rows, as is practical to your implementation. The where-clause still remains in effect and gets evaluated on all rows that are returned by the lookup strategy.
Following the above example, the sub-query executes once when an OtherEvent event arrives. At time of execution the engine delivers the string value A1 to the VirtualDataWindowLookup lookup implementation provided by your application. The lookup object queries the store and returns store rows as EventBean instances.
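The two-step interaction can be pictured with plain Java. The following is a minimal sketch, assuming a hypothetical in-memory SketchStore with an index on key1; none of the names are Esper classes. The plan method plays the role of the creation-time step that picks an access method once, and the returned function plays the role of the lookup that runs each time the statement is triggered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical store standing in for an external data source; not an Esper class.
class SketchStore {
    private final List<Map<String, Object>> rows = new ArrayList<>();
    private final Map<Object, List<Map<String, Object>>> byKey1 = new HashMap<>();
    int fullScans; // counts how often the fallback path ran

    void add(String key1, String key2, int value1) {
        Map<String, Object> row = new HashMap<>();
        row.put("key1", key1);
        row.put("key2", key2);
        row.put("value1", value1);
        rows.add(row);
        byKey1.computeIfAbsent(key1, k -> new ArrayList<>()).add(row);
    }

    // Step 1 (once, at statement creation): choose an access method from the hash property names.
    Function<Object[], List<Map<String, Object>>> plan(List<String> hashProperties) {
        int pos = hashProperties.indexOf("key1");
        if (pos >= 0) {
            // Step 2 (per execution): key values arrive in the same order as the property names.
            return keys -> byKey1.getOrDefault(keys[pos], Collections.emptyList());
        }
        // No usable index: return all rows and rely on the where-clause to filter them.
        return keys -> {
            fullScans++;
            return rows;
        };
    }
}
```

Returning more rows than strictly needed, as the fallback path does, stays correct only because the where-clause is still evaluated on every returned row.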
As a second example, consider an EPL join statement as follows:
select * from MySampleWindow, MyTriggerEvent where key1 = trigger1 and key2 = trigger2
The engine analyzes the query and passes to the virtual data window the information that the lookup occurs on properties key1 and key2 under hash-equals semantics. When a MyTriggerEvent arrives, it passes the actual value of the trigger1 and trigger2 properties of the current MyTriggerEvent to the lookup.
As a last example, consider an EPL fire-and-forget statement as follows:
select * from MySampleWindow where key1 = 'A2' and value1 between 0 and 1000
The engine analyzes the query and passes to the virtual data window the lookup information. The lookup occurs on property key1 under hash-equals semantics and on property value1 under btree-open-range semantics. When your application executes the fire-and-forget query the engine passes A2 and the range endpoints 0 and 1000 to the lookup.
For more information, please consult the JavaDoc API documentation for class com.espertech.esper.client.hook.VirtualDataWindow, VirtualDataWindowLookupContext or VirtualDataWindowLookupFieldDesc.
For each named window that refers to the virtual data window, the engine instantiates one instance of the factory.
A virtual data window factory class is responsible for the following functions:
Implement the initialize method that accepts a virtual data window factory context object as a parameter.
Implement the create method that accepts a virtual data window context object as a parameter and returns a VirtualDataWindow implementation.
Implement the destroyAllContextPartitions method that gets called once when the named window is stopped or destroyed.
The engine instantiates a VirtualDataWindowFactory instance for each named window created via create window. The engine invokes the initialize method once for the named window being created, passing a VirtualDataWindowFactoryContext context object.
If not using contexts, the engine calls the create method once after calling the initialize method. If using contexts, the engine calls the create method every time it allocates a context partition.
If using contexts and your virtual data window implementation is thread-safe, you may return the same virtual data window implementation object for each context partition. If using contexts and your implementation object is not thread-safe, return a separate implementation object for each context partition.
The engine invokes the destroyAllContextPartitions method once when the named window is stopped or destroyed. If not using contexts, the engine calls the destroy method of the virtual data window implementation object before calling the destroyAllContextPartitions method on the factory object. If using contexts, the engine calls the destroy method on each instance associated with a context partition at the time when the associated context partition terminates.
The sample code shown here can be found among the examples in the distribution under virtualdw:
public class SampleVirtualDataWindowFactory implements VirtualDataWindowFactory {

    public void initialize(VirtualDataWindowFactoryContext factoryContext) {
        // Can add initialization logic here.
    }

    public VirtualDataWindow create(VirtualDataWindowContext context) {
        // This example allocates a new virtual data window (one per context partition if using contexts).
        // For sharing the virtual data window instance between context partitions, return the same reference.
        return new SampleVirtualDataWindow(context);
    }

    public void destroyAllContextPartitions() {
        // Release shared resources here
    }
}
Your factory class must implement the create method which receives a VirtualDataWindowContext object. This method is called once for each EPL statement that creates a virtual data window (see example create window above).
The VirtualDataWindowContext provides to your application:
String namedWindowName;                     // Name of named window being created.
Object[] parameters;                        // Any optional parameters provided as part of create-window.
EventType eventType;                        // The event type of events.
EventBeanFactory eventFactory;              // A factory for creating EventBean instances from store rows.
VirtualDataWindowOutStream outputStream;    // For stream output to consuming statements.
AgentInstanceContext agentInstanceContext;  // Other EPL statement information in statement context.
When using contexts you can decide whether your factory returns a new virtual data window for each context partition or returns the same virtual data window instance for all context partitions. Your extension code may refer to the named window name to identify the named window and may refer to the agent instance context that holds the agent instance id which is the id of the context partition.
A virtual data window implementation is responsible for the following functions:
Accept the lookup context object as a parameter and return the VirtualDataWindowLookup implementation.
Optionally, post insert and remove stream data.
Implement the destroy method, which the engine calls for each context partition when the named window is stopped or destroyed, or once when a context partition is ended/terminated.
The sample code shown here can be found among the examples in the distribution under virtualdw.
The implementation class must implement the VirtualDataWindow interface like so:
public class SampleVirtualDataWindow implements VirtualDataWindow {

    private final VirtualDataWindowContext context;

    public SampleVirtualDataWindow(VirtualDataWindowContext context) {
        this.context = context;
    }
    ...
When the engine compiles an EPL statement and detects a virtual data window, the engine invokes the getLookup method indicating hash and btree access path information by passing a VirtualDataWindowLookupContext context. The lookup method must return a VirtualDataWindowLookup implementation that the EPL statement uses for all lookups until the EPL statement is stopped or destroyed.
The sample implementation does not use the hash and btree access path information and simply returns a lookup object:
public VirtualDataWindowLookup getLookup(VirtualDataWindowLookupContext desc) {
    // Place any code that interrogates the hash-index and btree-index fields here.

    // Return the lookup strategy.
    return new SampleVirtualDataWindowLookup(context);
}
If your virtual data window returns null instead of a lookup object, the EPL query creation fails and throws an EPStatementException.
The engine calls the update method when data changes because of on-merge, on-delete, on-update or insert-into. For example, if you have an on-merge statement that is triggered and that updates the virtual data window, the newData parameter receives the new (updated) event and the oldData parameter receives the event prior to the update. Your code may use these events to update the store or delete from the store, if needed.
If your application plans to consume data from the virtual data window, for example via select * from MySampleWindow, then the code must implement the update method to forward insert and remove stream events, as shown below, to receive the events in consuming statements. To post insert and remove stream data, use the VirtualDataWindowOutStream provided by the context object as follows.
public void update(EventBean[] newData, EventBean[] oldData) {
    // This sample simply posts into the insert and remove stream what is received.
    context.getOutputStream().update(newData, oldData);
}
Your application should not use VirtualDataWindowOutStream to post new events that originate from the store. The object is intended for use with on-action EPL statements. Use insert-into instead for any new events that originate from the store.
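Writing changes through to the store from the update method might look like the following. This is a minimal sketch, assuming a hypothetical StoreWriter whose rows are plain maps keyed by key1; it also assumes that an insert delivers only new rows, a delete only old rows and an update both, which should be verified against the engine version in use. In an actual implementation the values would be read from the EventBean instances rather than from maps.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical write-through helper; not an Esper class.
class StoreWriter {
    // The backing store, keyed by the "key1" value of each row.
    final Map<String, Map<String, Object>> store = new LinkedHashMap<>();

    // newRows and oldRows mirror the newData and oldData parameters; either may be null.
    void apply(List<Map<String, Object>> newRows, List<Map<String, Object>> oldRows) {
        if (oldRows != null) {
            for (Map<String, Object> row : oldRows) {
                // A delete, or the pre-update image of an update.
                store.remove((String) row.get("key1"));
            }
        }
        if (newRows != null) {
            for (Map<String, Object> row : newRows) {
                // An insert, or the post-update image of an update.
                store.put((String) row.get("key1"), row);
            }
        }
    }
}
```

Removing the old image before writing the new one keeps the store consistent even when an update changes the key value.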
A lookup implementation is responsible for the following functions:
Accept the lookup values as a parameter and return a set of EventBean instances.
The sample code shown here can be found among the examples in the distribution under virtualdw.
The implementation class must implement the VirtualDataWindowLookup interface:
public class SampleVirtualDataWindowLookup implements VirtualDataWindowLookup {

    private final VirtualDataWindowContext context;

    public SampleVirtualDataWindowLookup(VirtualDataWindowContext context) {
        this.context = context;
    }
    ...
When an EPL query fires, the engine invokes the lookup and provides the actual lookup values. The lookup values are provided in the same exact order as the access path information that the engine provided when obtaining the lookup.
Each store row must be wrapped as an EventBean instance. The context object provides an EventBeanFactory implementation returned by getEventFactory() that can be used to wrap rows.
The sample implementation does not use the lookup values and simply returns a hardcoded sample event:
public Set<EventBean> lookup(Object[] lookupValues) {
    // Add code to interrogate lookup values here.

    // Create sample event.
    // This example uses Map events; other underlying events such as POJO are exactly the same code.
    Map<String, Object> eventData = new HashMap<String, Object>();
    eventData.put("key1", "sample1");
    eventData.put("key2", "sample2");
    eventData.put("value1", 100);
    eventData.put("value2", 1.5d);
    EventBean event = context.getEventFactory().wrap(eventData);
    return Collections.singleton(event);
}
The lookupValues object array represents all actual joined property values, or expression results if your where-clause criteria are expressions. The code may use these keys for efficient store access.
When a key value is a range, the key value is an instance of VirtualDataWindowKeyRange.
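A lookup that combines a hash key with a range key could be organized as below. This is a minimal sketch under stated assumptions: RangeKey is a hypothetical stand-in for VirtualDataWindowKeyRange with integer endpoints, RangeStore is a hypothetical in-memory store holding one payload per value1, and both range endpoints are treated as inclusive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a range key; the real class is VirtualDataWindowKeyRange.
class RangeKey {
    final int start;
    final int end;

    RangeKey(int start, int end) {
        this.start = start;
        this.end = end;
    }
}

// Hypothetical store indexed by key1 (hash) and then by value1 (sorted).
class RangeStore {
    private final Map<String, NavigableMap<Integer, String>> index = new HashMap<>();

    void add(String key1, int value1, String payload) {
        index.computeIfAbsent(key1, k -> new TreeMap<>()).put(value1, payload);
    }

    // lookupValues[0] is the hash key and lookupValues[1] the range, in access path order.
    List<String> lookup(Object[] lookupValues) {
        NavigableMap<Integer, String> sorted = index.get(lookupValues[0]);
        if (sorted == null) {
            return Collections.emptyList();
        }
        RangeKey range = (RangeKey) lookupValues[1];
        // Inclusive endpoints are an assumption of this sketch.
        return new ArrayList<>(sorted.subMap(range.start, true, range.end, true).values());
    }
}
```

For the fire-and-forget query shown earlier, the lookup values would correspond to A2 and a range from 0 to 1000.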
Single-row functions return a single value. They are not expected to aggregate rows but instead should be stateless functions. These functions can appear in any expressions and can be passed any number of parameters.
The following steps are required to develop and use a custom single-row function with Esper.
Implement a class providing one or more public static methods accepting the number and type of parameters as required.
Register the single-row function class and method name with the engine by supplying a function name, via the engine configuration file or the configuration API.
You may not override a built-in function with a single-row function provided by you. The single-row function you register must have a different name than any of the built-in functions.
An example single-row function can also be found in the examples under the runtime configuration example.
Single-row function classes have no further requirement than to provide a public static method.
The following sample single-row function simply computes a percentage value based on two number values.
This sample class provides a public static method by name computePercent to return a percentage value:
public class MyUtilityClass {
    public static double computePercent(double amount, double total) {
        return amount / total * 100;
    }
}
The class name of the class, the method name and the function name of the new single-row function must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML; however, the same options are available through the runtime and static configuration API:
<esper-configuration>
  <plugin-singlerow-function name="percent"
      function-class="mycompany.MyUtilityClass" function-method="computePercent" />
</esper-configuration>
Note that the function name and method name need not be the same.
The new single-row function is now ready to use in a statement:
select percent(fulfilled,total) from MyEvent
When selecting from a single stream, you may also pass wildcard to the single-row function and the function receives the underlying event:
select percent(*) from MyEvent
If the single-row function returns an object that provides further functions, you may chain function calls.
The following demonstrates a chained single-row function. The example assumes that a single-row function by name calculator returns an object that provides the add function which accepts two parameters:
select calculator().add(5, amount) from MyEvent
When a single-row function receives parameters that are all constant values or expressions that themselves receive only constant values, Esper can pre-evaluate the result of the single-row function at time of statement creation. By default, Esper does not pre-evaluate the single-row function unless you configure the value cache as enabled.
The following configuration XML enables the value cache for the single-row function:
<esper-configuration>
  <plugin-singlerow-function name="getDate"
      function-class="mycompany.DateUtil" function-method="parseDate"
      value-cache="enabled" />
</esper-configuration>
When the single-row function receives constants as parameters, the engine computes the result once and returns the cached result for each evaluation:
select getDate('2002-05-30T9:00:00.000') from MyEvent
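A function is a reasonable candidate for the value cache only if it is deterministic and stateless, since the cached result is reused for every evaluation. The following is a minimal sketch of what such a parseDate method might look like; the class body, the return type and the date pattern are assumptions and are not taken from the Esper examples. The class is left package-private to keep the sketch compact, whereas a registered function class would provide a public static method.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical utility class; the name mirrors the configuration above.
class DateUtil {
    // The pattern accepts a one- or two-digit hour, as in '2002-05-30T9:00:00.000'.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("uuuu-MM-dd'T'H:mm:ss.SSS");

    // Deterministic and stateless: the same text always yields the same result.
    public static LocalDateTime parseDate(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }
}
```

A function that reads the current time or any mutable state would not be safe to cache this way, because the pre-evaluated result would no longer match a fresh evaluation.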
Your EPL may use plug-in single row functions among the predicate expressions as part of the filters in a stream or pattern.
For example, the EPL below uses the function computeHash as part of a predicate expression:
select * from MyEvent(computeHash(field) = 100)
When you have many EPL statements or many context partitions that refer to the same function, event type and parameters in a predicate expression, the engine may optimize evaluation: The function gets evaluated only once per event.
While the optimization is enabled by default for all plug-in single row functions, you can also disable the optimization for a specific single-row function. By disabling the optimization for a single-row function the engine may use less memory to identify reusable function footprints but may evaluate each function more frequently than necessary.
The following configuration XML disables the filter optimization for a single-row function (by default it is enabled):
<esper-configuration>
  <plugin-singlerow-function name="computeHash"
      function-class="mycompany.HashUtil" function-method="computeHash"
      filter-optimizable="disabled" />
</esper-configuration>
Esper allows parameters to a single-row function to be events. In this case, declare the method parameter type as either EventBean, Collection<EventBean> or the underlying class.
Sample method footprints are:
public static double doCompute(EventBean eventBean) {...}
public static boolean doCheck(MyEvent myEvent, String text) {...}
public static String doSearch(Collection<EventBean> events) {...}
To pass the event, specify the stream alias, or wildcard (*) or the tag name when used in a pattern.
The EPL below shows example uses:
select * from MyEvent(doCompute(me) = 100) as me
select * from MyEvent where doCompute(*) = 100
select * from pattern[a=MyEvent -> MyEvent(doCheck(a, 'sometext'))]
select * from MyEvent#time(1 min) having doCompute(last(*))
select * from MyEvent#time(1 min) having doSearch(window(*))
Declare the method parameter as Collection<EventBean> if the method expects an expression result that returns multiple events.
Declare the method parameter as EventBean if the method expects an expression result that returns a single event.
A single-row function may return events. Please declare your single-row function method to return Collection<EventBean> or EventBean[] and configure the event type name.
For example, assuming there is a MyItem event type such as created via create schema MyItem(id string):
public static EventBean[] myItemProducer(String string, EPLMethodInvocationContext context) {
    String[] split = string.split(",");
    EventBean[] events = new EventBean[split.length];
    for (int i = 0; i < split.length; i++) {
        events[i] = context.getEventBeanService().adapterForMap(
            Collections.singletonMap("id", split[i]), "MyItem");
    }
    return events;
}
The sample EPL queries items filtering those items that have a given value for the id field:
select myItemProducer(ordertext).where(v => v.id in ('id1', 'id3')) as c0 from Order
This sample code registers the myItemProducer function as a single-row function with an event type name:
ConfigurationPlugInSingleRowFunction entry = new ConfigurationPlugInSingleRowFunction();
entry.setName("myItemProducer");
entry.setFunctionClassName(...);
entry.setFunctionMethodName(...);
entry.setEventTypeName("MyItem");
epService.getEPAdministrator().getConfiguration().addPlugInSingleRowFunction(entry);
If your single row function returns EventBean[] and is used with enumeration methods the configuration must provide an event type name.
Esper can pass an object containing contextual information such as statement name, function name, engine URI and context partition id to your method. The container for this information is EPLMethodInvocationContext in package com.espertech.esper.client.hook.
Please declare your method to take EPLMethodInvocationContext as the last parameter. The engine then passes the information along.
A sample method footprint and EPL are shown below:
public static double computeSomething(double number, EPLMethodInvocationContext context) {...}
select computeSomething(10) from MyEvent
By default the engine logs any exceptions thrown by the single row function and returns a null value. To have exceptions be re-thrown instead, which makes exceptions visible to any registered exception handler, please configure as discussed herein.
Set the rethrow-exceptions flag in the XML configuration or the rethrowExceptions flag in the API when registering the single row function to have the engine re-throw any exceptions that the single row function may throw.
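The difference between the two behaviors can be modeled in plain Java. The following is a minimal sketch, assuming a hypothetical strict variant of the percentage function and a hypothetical invoke helper that imitates the described behavior; it is an illustration only and not engine code.

```java
import java.util.function.Supplier;

// Hypothetical helper class; not part of Esper.
class PercentFunctions {
    // Throws for a zero total instead of silently returning Infinity or NaN.
    public static double computePercentStrict(double amount, double total) {
        if (total == 0) {
            throw new IllegalArgumentException("total must not be zero");
        }
        return amount / total * 100;
    }

    // Models the two behaviors described above: log and return null, or re-throw.
    static Double invoke(Supplier<Double> function, boolean rethrowExceptions) {
        try {
            return function.get();
        } catch (RuntimeException ex) {
            if (rethrowExceptions) {
                throw ex;
            }
            System.err.println("single-row function failed: " + ex.getMessage());
            return null;
        }
    }
}
```

With the default behavior a failing function is indistinguishable from one that legitimately returns null, which is one reason to consider re-throwing when an exception handler is registered.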
Views in Esper are used to derive information from an event stream, and to represent data windows onto an event stream. This chapter describes how to plug in a new, custom view.
The following steps are required to develop and use a custom view with Esper.
Implement a view factory class. View factories are classes that accept and check view parameters and instantiate the appropriate view class.
Implement a view class. A view class commonly represents a data window or derives new information from a stream.
Configure the view factory class supplying a view namespace and name in the engine configuration file.
The example view factory and view class that are used in this chapter can be found in the examples source folder in the OHLC (open-high-low-close) example. The class names are OHLCBarPlugInViewFactory and OHLCBarPlugInView.
Views can make use of the following engine services available via StatementServiceContext:
The SchedulingService interface allows views to schedule timer callbacks to a view.
The EventAdapterService interface allows views to create new event types and event instances of a given type.
The StatementStopService interface allows views to register a callback that the engine invokes to indicate that the view's statement has been stopped.
Section 17.4.3, “View Contract” outlines the requirements for correct behavior of your custom view within the engine.
Note that custom views may use engine services and APIs that can be subject to change between major releases. The engine services discussed above and view APIs are considered part of the engine internal public API and are stable. Any changes to such APIs are disclosed through the release change logs and history. Please also consider contributing your custom view to the Esper project team by submitting the view code through the mailing list or via a JIRA issue.
A view factory class is responsible for the following functions:
Accept zero, one or more view parameters. View parameters are themselves expressions. The view factory must validate and evaluate these expressions.
Instantiate the actual view class.
Provide information about the event type of events posted by the view.
View factory classes simply subclass com.espertech.esper.view.ViewFactorySupport:
public class OHLCBarPlugInViewFactory extends ViewFactorySupport { ...
Your view factory class must implement the setViewParameters method to accept and parse view parameters. The next code snippet shows an implementation of this method. The code checks the number of parameters and retains the parameters passed to the method:
public class OHLCBarPlugInViewFactory extends ViewFactorySupport {

    private ViewFactoryContext viewFactoryContext;
    private List<ExprNode> viewParameters;
    private ExprNode timestampExpression;
    private ExprNode valueExpression;

    public void setViewParameters(ViewFactoryContext viewFactoryContext,
                                  List<ExprNode> viewParameters) throws ViewParameterException {
        this.viewFactoryContext = viewFactoryContext;
        if (viewParameters.size() != 2) {
            throw new ViewParameterException(
                "View requires two parameters: " +
                "the expression returning timestamps and the expression supplying OHLC data points");
        }
        this.viewParameters = viewParameters;
    }
    ...
After the engine has supplied view parameters to the factory, the engine asks the view factory to attach to its parent view and validate any parameter expressions against the parent view's event type. If the view will be generating events of a different type than the events generated by the parent view, then the view factory can create the new event type in this method:
public void attach(EventType parentEventType,
                   StatementContext statementContext,
                   ViewFactory optionalParentFactory,
                   List<ViewFactory> parentViewFactories) throws ViewParameterException {
    ExprNode[] validatedNodes = ViewFactorySupport.validate("OHLC view",
        parentEventType, statementContext, viewParameters, false);

    timestampExpression = validatedNodes[0];
    valueExpression = validatedNodes[1];

    if ((timestampExpression.getExprEvaluator().getType() != long.class) &&
        (timestampExpression.getExprEvaluator().getType() != Long.class)) {
        throw new ViewParameterException(
            "View requires long-typed timestamp values in parameter 1");
    }
    if ((valueExpression.getExprEvaluator().getType() != double.class) &&
        (valueExpression.getExprEvaluator().getType() != Double.class)) {
        throw new ViewParameterException(
            "View requires double-typed values in parameter 2");
    }
}
Finally, the engine asks the view factory to create a view instance, and asks for the type of event generated by the view:
public View makeView(AgentInstanceViewFactoryChainContext agentInstanceViewFactoryContext) {
    return new OHLCBarPlugInView(agentInstanceViewFactoryContext, timestampExpression, valueExpression);
}

public EventType getEventType() {
    return OHLCBarPlugInView.getEventType(viewFactoryContext.getEventAdapterService());
}
A view class is responsible for:
The setParent method informs the view of the parent view's event type.
The update method receives insert stream and remove stream events from its parent view.
The iterator method supplies an (optional) iterator to allow an application to pull or request results from an EPStatement.
View classes simply subclass com.espertech.esper.view.ViewSupport:
public class MyTrendSpotterView extends ViewSupport { ...
Your view's update method will be processing incoming (insert stream) and outgoing (remove stream) events posted by the parent view (if any), as well as providing incoming and outgoing events to child views. The convention required of your update method implementation is that the view releases any insert stream events (EventBean object references) which the view generates as reference-equal remove stream events (EventBean object references) at a later time.
The view implementation must call the updateChildren
method to post outgoing insert and remove stream events. Similar to the update
method, the updateChildren
method takes insert and remove stream events as parameters.
A sample update
method implementation is provided in the OHLC example.
The update
method must adhere to the following conventions, to prevent memory leaks and to enable correct behavior within the engine:
A view implementation that posts events to the insert stream must post unique EventBean object references as insert stream events, and cannot post the same EventBean object reference multiple times. The underlying event of the EventBean object reference can be the same object reference, however the EventBean object reference posted by the view into the insert stream must be a new instance for each insert stream event.
If the custom view posts a continuous insert stream, then the view must also post a continuous remove stream (second parameter to the updateChildren method). If the view does not post remove stream events, it assumes unbounded keep-all semantics.
EventBean events posted as remove stream events must be the same object reference as the EventBean events posted as insert stream by the view. Thus remove stream events posted by the view must be reference-equal to insert stream events posted by the view as part of an earlier invocation of the update method, or the same invocation of the update method. This requirement applies to the EventBean instances and does not affect the underlying representation.
EventBean events represent a unique observation. The values of the observation can be the same, thus the underlying representation of an EventBean event can be reused, however event property values must be kept immutable and not be subject to change.
Array elements of the insert and remove stream events must not carry null values. Array size must match the number of EventBean instances posted. It is recommended to use a null value for no insert or remove stream events rather than an empty zero-size array.
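The conventions above can be illustrated with a small, self-contained sketch. It does not use the Esper classes: SketchEventBean and LastNSketchView are hypothetical stand-ins, and the update method here takes only underlying objects rather than the real insert and remove stream arrays. The sketch wraps each arriving object in a new bean, retains the beans it posted, and later posts the very same bean references as the remove stream, passing null when there is nothing to remove.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for EventBean: a wrapper around an underlying event object.
final class SketchEventBean {
    private final Object underlying;

    SketchEventBean(Object underlying) {
        this.underlying = underlying;
    }

    Object getUnderlying() {
        return underlying;
    }
}

// Hypothetical view that retains the last `size` beans it posted to its children.
final class LastNSketchView {
    private final int size;
    private final Deque<SketchEventBean> retained = new ArrayDeque<>();

    // Records what was posted to child views, for demonstration only.
    final List<SketchEventBean[]> postedInsert = new ArrayList<>();
    final List<SketchEventBean[]> postedRemove = new ArrayList<>();

    LastNSketchView(int size) {
        this.size = size;
    }

    // Simplified update: accepts underlying objects only (an assumption of this sketch).
    void update(Object[] newUnderlying) {
        if (newUnderlying == null || newUnderlying.length == 0) {
            return;
        }
        SketchEventBean[] insert = new SketchEventBean[newUnderlying.length];
        List<SketchEventBean> expired = new ArrayList<>();
        for (int i = 0; i < newUnderlying.length; i++) {
            // A new bean instance per insert stream event, even if the underlying object repeats.
            insert[i] = new SketchEventBean(newUnderlying[i]);
            retained.addLast(insert[i]);
            if (retained.size() > size) {
                // The identical reference that was posted earlier leaves as a remove stream event.
                expired.add(retained.removeFirst());
            }
        }
        // Use null rather than a zero-size array when there are no remove stream events.
        SketchEventBean[] remove = expired.isEmpty() ? null : expired.toArray(new SketchEventBean[0]);
        updateChildren(insert, remove);
    }

    private void updateChildren(SketchEventBean[] insert, SketchEventBean[] remove) {
        postedInsert.add(insert);
        postedRemove.add(remove);
    }
}
```

With a size of 2, the third call to update posts as its remove stream the same bean reference that the first call posted as its insert stream.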
Your view implementation can register a callback indicating when a statement using the view, or a context partition using the view, is stopped or terminated. Your view code must implement, or provide an implementation of, the com.espertech.esper.util.StopCallback interface. Register the stop callback in order for the engine to invoke the callback:
agentInstanceContext.getTerminationCallbacks().add(this);
Please refer to the sample views for a code sample on how to implement the iterator method.
In terms of multiple threads accessing view state, there is no need for your custom view factory or view implementation to perform any synchronization to protect internal state. The iterator of the custom view implementation also does not need to be thread-safe. The engine ensures the custom view executes in the context of a single thread at a time. If your view uses shared external state, access to that external state must still be synchronized when using multiple threads.
The view factory class name as well as the view namespace and name for the new view must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:
<esper-configuration>
  <plugin-view namespace="custom" name="ohlc"
      factory-class="com.espertech.esper.example.ohlc.OHLCBarPlugInViewFactory" />
</esper-configuration>
The new view is now ready to use in a statement:
select * from StockTick.custom:ohlc(timestamp, price)
Note that the view must implement additional interfaces if it acts as a data window view, or works in a grouping context, as discussed in detail below.
Your custom view may represent an expiry policy and may retain events and thus act as a data window view. In order to allow the engine to validate that your view can be used with named windows, which allow only data window views, this section documents the additional requirements that your classes must fulfill.
Your view factory class must implement the com.espertech.esper.view.DataWindowViewFactory interface. This marker interface (no methods required) indicates that your view factory provides only data window views.
Your view class must implement the com.espertech.esper.view.DataWindowView interface. This interface indicates that your view is a data window view and therefore eligible to be used in any construct that requires a data window view. The DataWindowView interface extends the ViewDataVisitable interface. Please provide an empty implementation for the visitView method as required by ViewDataVisitable (the default behavior is sufficient).
Your custom view may compute derived information from the arriving stream, instead of retaining events, and thus act as a derived-value view.
Your view class should implement the com.espertech.esper.view.DerivedValueView interface. This marker interface indicates that your view is a derived-value view, affecting correct behavior of the view when used in joins.
Grouped views are views that operate under the #groupwin view. When operating under #groupwin, the engine instantiates a single view instance when the statement starts, and a new view instance per group criteria dynamically as new group criteria become known.
The next statement shows EPL for using a view instance per grouping criteria:
select * from StockTick#groupwin(symbol).custom:trendspotter(price)
Your view must implement the com.espertech.esper.view.GroupableView interface to indicate that the view can handle grouped windows.
Aggregation functions are stateful functions that aggregate events, event property values or expression results. Examples of built-in aggregation functions are count(*), sum(price * volume), window(*) or maxby(volume).
Esper allows two different ways for your application to provide aggregation functions. We use the name aggregation single-function and aggregation multi-function for the two independent extension APIs for aggregation functions.
The aggregation single-function API is simple to use however it imposes certain restrictions on how expressions that contain aggregation functions share state and are evaluated.
The aggregation multi-function API is more powerful and provides control over how expressions that contain aggregation functions share state and are evaluated.
The next table compares the two aggregation function extension APIs:
Table 17.1. Aggregation Function Extension APIs
 | Single-Function | Multi-Function |
---|---|---|
Return Value | Can only return a single value or object. Cannot return an EventBean event, collection of EventBean events or collection or array of values for use with enumeration methods, for example. | Can return an EventBean event, a collection of EventBean events or a collection or array of objects for use with enumeration methods or to access event properties. |
Complexity of API | Simple (consists of 2 interfaces). | More complex (consists of 6 interfaces). |
State Sharing | State and parameter evaluation shared if multiple aggregation functions of the same name in the same statement (and context partition) take the exact same parameter expressions. | State and parameter evaluation sharable when multiple aggregation functions of a related name (related through configuration) for the same statement (and context partition) exist, according to a sharing-key provided by your API implementation. |
Function Name | Each aggregation function expression receives its own factory object. | Multiple related aggregation function expressions share a single factory object. |
Distinct Keyword | Handled by the engine transparently. | Indicated to the API implementation only. |
The following sections discuss developing an aggregation single-function first, followed by the subject of developing an aggregation multi-function.
The aggregation multi-function API is a powerful and lower-level API to extend the engine. Any classes that are not part of the client, plugin or agg.access package are subject to change between minor and major releases of the engine.
This section describes the aggregation single-function extension API for providing aggregation functions.
The following steps are required to develop and use a custom aggregation single-function with Esper.
Implement an aggregation function factory by implementing the interface com.espertech.esper.client.hook.AggregationFunctionFactory.
Implement an aggregation function by implementing the interface com.espertech.esper.epl.agg.aggregator.AggregationMethod.
Register the aggregation single-function factory class with the engine by supplying a function name, via the engine configuration file or the runtime and static configuration API.
The optional keyword distinct ensures that only distinct (unique) values are aggregated and duplicate values are ignored by the aggregation function. Custom plug-in aggregation single-functions do not need to implement the logic to handle distinct values. This is because when the engine encounters the distinct keyword, it eliminates any non-distinct values before passing the value for aggregation to the custom aggregation single-function.
Custom aggregation functions can also be passed multiple parameters, as further described in Section 17.5.1.4, “Aggregation Single-Function: Accepting Multiple Parameters”. In the example below the aggregation function accepts a single parameter.
The code for the example aggregation function as shown in this chapter can be found in the runtime configuration example in the package com.espertech.esper.example.runtimeconfig by the name MyConcatAggregationFunction. The sample function simply concatenates string-type values.
An aggregation function factory class is responsible for the following functions:
Implement a setFunctionName method that receives the function name assigned to this instance.
Implement a validate method that validates the value type of the data points that the function must process.
Implement a getValueType method that returns the type of the aggregation value generated by the aggregation function instances. For example, the built-in count aggregation function returns Long.class as it generates long-typed values.
Implement a newAggregator method that instantiates and returns an aggregation function instance.
Aggregation function factory classes implement the interface com.espertech.esper.client.hook.AggregationFunctionFactory:
public class MyConcatAggregationFunctionFactory implements AggregationFunctionFactory { ...
The engine generally constructs one instance of the aggregation function factory class each time the function is listed in an EPL statement; however, the engine may decide to reduce the number of aggregation class instances if it finds equivalent aggregations.
The aggregation function factory instance receives the aggregation function name via the setFunctionName method.
The sample concatenation function factory provides an empty setFunctionName method:
public void setFunctionName(String functionName) {
    // no action taken
}
An aggregation function factory must provide an implementation of the validate method that is passed an AggregationValidationContext validation context object. Within the validation context you find the result type of each of the parameter expressions to the aggregation function as well as information about constant values and data window use. Please see the JavaDoc API documentation for a comprehensive list of validation context information.
Since the example concatenation function requires string types, it implements a type check:
public void validate(AggregationValidationContext validationContext) { if ((validationContext.getParameterTypes().length != 1) || (validationContext.getParameterTypes()[0] != String.class)) { throw new IllegalArgumentException("Concat aggregation requires a single parameter of type String"); } }
In order for the engine to validate the type returned by the aggregation function against the types expected by enclosing expressions, the getValueType method must return the result type of any values produced by the aggregation function:
public Class getValueType() { return String.class; }
Finally the factory implementation must provide a newAggregator method that returns instances of AggregationMethod. The engine invokes this method for each new aggregation state to be allocated.
public AggregationMethod newAggregator() { return new MyConcatAggregationFunction(); }
An aggregation function class is responsible for the following functions:
Implement an enter method that the engine invokes to add a data point into the aggregation, when an event enters a data window.
Implement a leave method that the engine invokes to remove a data point from the aggregation, when an event leaves a data window.
Implement a getValue method that returns the current value of the aggregation.
Implement a clear method that resets the current value.
Aggregation function classes implement the interface AggregationMethod:
public class MyConcatAggregationFunction implements AggregationMethod { ...
The class that provides the aggregation and implements AggregationMethod does not have to be thread-safe.
The constructor initializes the aggregation function:
public class MyConcatAggregationFunction implements AggregationMethod { private final static char DELIMITER = ' '; private StringBuilder builder; private String delimiter; public MyConcatAggregationFunction() { builder = new StringBuilder(); delimiter = ""; } ...
The enter method adds a data point to the current aggregation value. The example enter method shown below adds a delimiter and the string value to a string buffer:
public void enter(Object value) { if (value != null) { builder.append(delimiter); builder.append(value.toString()); delimiter = String.valueOf(DELIMITER); } }
Conversely, the leave method removes a data point from the current aggregation value. The example leave method removes the leading value and its delimiter from the string buffer:
public void leave(Object value) { if (value != null) { builder.delete(0, value.toString().length() + 1); } }
Finally, the engine obtains the current aggregation value by means of the getValue method:
public Object getValue() { return builder.toString(); }
For on-demand queries the aggregation function must support resetting its value to empty or start values. Implement the clear method to reset the value as shown below:
public void clear() { builder = new StringBuilder(); delimiter = ""; }
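Put together, the four methods behave as in the following minimal sketch. It is not the shipped sample: SketchAggregationMethod is a local stand-in for the four AggregationMethod methods described above so that the code compiles without the Esper library, and the guard that resets the delimiter once the buffer is empty is an addition of this sketch. Like the sample, the leave method assumes that values leave in the same order in which they entered, as is the case for a length window.

```java
// Local stand-in for the four AggregationMethod methods described above (an assumption of this sketch).
interface SketchAggregationMethod {
    void enter(Object value);
    void leave(Object value);
    Object getValue();
    void clear();
}

final class ConcatSketch implements SketchAggregationMethod {
    private static final char DELIMITER = ' ';
    private StringBuilder builder = new StringBuilder();
    private String delimiter = "";

    @Override
    public void enter(Object value) {
        if (value != null) {
            builder.append(delimiter);
            builder.append(value.toString());
            delimiter = String.valueOf(DELIMITER);
        }
    }

    @Override
    public void leave(Object value) {
        if (value != null) {
            // Removes the oldest value plus its trailing delimiter; StringBuilder.delete
            // clamps the end index to the current length when the last value leaves.
            builder.delete(0, value.toString().length() + 1);
            if (builder.length() == 0) {
                delimiter = ""; // addition in this sketch: avoid a leading delimiter after the buffer empties
            }
        }
    }

    @Override
    public Object getValue() {
        return builder.toString();
    }

    @Override
    public void clear() {
        builder = new StringBuilder();
        delimiter = "";
    }
}
```

Entering "a", "b" and "c" yields "a b c"; after "a" leaves, the value is "b c".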
The aggregation function factory class name as well as the function name for the new aggregation function must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML; however, the same options are available through the configuration API:
<esper-configuration>
  <plugin-aggregation-function name="concat"
      factory-class="com.espertech.esper.example.runtimeconfig.MyConcatAggregationFunctionFactory" />
</esper-configuration>
The new aggregation function is now ready to use in a statement:
select concat(symbol) from StockTick#length(3)
Your plug-in aggregation function may accept multiple parameters, simply by casting the Object parameter of the enter and leave methods to Object[].
For instance, assume an aggregation function rangeCount that counts all values that fall into a range of values. The EPL that calls this function and provides lower and upper bounds of 1 and 10 is:
select rangeCount(1, 10, myValue) from MyEvent
The enter method of the plug-in aggregation function may look as follows:
public void enter(Object value) { Object[] params = (Object[]) value; int lower = (Integer) params[0]; int upper = (Integer) params[1]; int val = (Integer) params[2]; if ((val >= lower) && (val <= upper)) { count++; } }
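A complete aggregation also needs the matching leave, getValue and clear methods. The following is a minimal sketch under stated assumptions: the class name RangeCountSketch is hypothetical, the class is kept free of Esper interfaces so that it compiles on its own, all three parameters are assumed to be Integer values, and a null third parameter is treated as outside the range.

```java
final class RangeCountSketch {
    private long count;

    public void enter(Object value) {
        if (inRange(value)) {
            count++;
        }
    }

    public void leave(Object value) {
        // Mirrors enter so that a value leaving the data window undoes its contribution.
        if (inRange(value)) {
            count--;
        }
    }

    public Object getValue() {
        return count;
    }

    public void clear() {
        count = 0;
    }

    // The engine passes multiple parameters as an Object[] in declaration order:
    // lower bound, upper bound, value.
    private static boolean inRange(Object value) {
        Object[] params = (Object[]) value;
        if (params[2] == null) {
            return false; // assumption of this sketch: null values are not counted
        }
        int lower = (Integer) params[0];
        int upper = (Integer) params[1];
        int val = (Integer) params[2];
        return val >= lower && val <= upper;
    }
}
```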
Your plug-in aggregation function may want to validate parameter types or may want to know which parameters are constant-value expressions. Constant-value expressions are evaluated only once by the engine and could therefore be cached by your aggregation function for performance reasons. The engine provides constant-value information as part of the AggregationValidationContext passed to the validate method.
When an EPL statement provides the filter named parameter, the value of the filter expression is a boolean-type value that the engine determines and provides to your enter method as the last value in the parameter array.
For instance, assume an aggregation function concat that receives a word value and that has a filter expression as parameters:
select concat(word, filter: word not like '%jim%') from MyWordEvent
The enter method of the plug-in aggregation function may look as follows:
public void enter(Object value) { Object[] arr = (Object[]) value; Boolean pass = (Boolean) arr[1]; if (pass != null && pass) { buffer.append(arr[0].toString()); } }
Your code can obtain the actual filter expression from the AggregationValidationContext that is passed to the validate method and that returns the named parameters via getNamedParameters.
When the custom aggregation function returns an object as a return value, the EPL can use parentheses and the dot-operator to invoke methods on the return value.
The following example assumes that the myAggregation custom aggregation function returns an object that has getValueOne and getValueTwo methods:
select (myAggregation(myValue)).getValueOne(), (myAggregation(myValue)).getValueTwo() from MyEvent
Since the above EPL aggregates the same value, the engine internally uses a single aggregation to represent the current value of myAggregation (and not two instances of the aggregation, even though myAggregation is listed twice).
This section introduces the aggregation multi-function API. Please refer to the JavaDoc for more complete class and method-level documentation.
Among the Esper examples is an example use of the aggregation multi-function API, named Cycle-Detect. Cycle-Detect takes incoming transaction events that have from-account and to-account fields. The example detects a cycle in the transactions between accounts in order to detect a possible transaction fraud. Please note that the graph and cycle detection logic of the example is not part of Esper: the example utilizes the jgrapht library.
In the Cycle-Detect example, the vertices of a graph are the account numbers, for example the account numbers Acct-1, Acct-2 and Acct-3. In the graph the edges are transaction events that identify a from-account and a to-account. An example edge is {from:Acct-1, to:Acct-2}. An example cycle is therefore formed by the three transactions {from:Acct-1, to:Acct-2}, {from:Acct-2, to:Acct-3} and {from:Acct-3, to:Acct-1}.
The code for the example aggregation multi-function as shown in this chapter can be found in the Cycle-Detect example in the package com.espertech.esper.example.cycledetect.
The example provides two aggregation functions named cycledetected and cycleoutput:
The cycledetected function returns a boolean value indicating whether a graph cycle is found or not.
The cycleoutput function outputs the vertices (account numbers) that are part of the graph cycle.
In the Cycle-Detect example, the following statement utilizes the two functions cycledetected and cycleoutput that share the same graph state to detect a cycle among the last 1000 events:
@Name('CycleDetector') select cycleoutput() as cyclevertices from TransactionEvent#length(1000) having cycledetected(fromAcct, toAcct)
If instead the goal is to run graph cycle detection every 1 second (and not upon arrival of a new event), this sample EPL statement uses a pattern to trigger cycle detection:
@Name('CycleDetector') select (select cycleoutput(fromAcct, toAcct) from TransactionEvent#length(1000)) as cyclevertices from pattern [every timer:interval(1)]
The following steps are required to develop and use a custom aggregation multi-function with Esper.
Implement an aggregation multi-function factory by implementing the interface com.espertech.esper.plugin.PlugInAggregationMultiFunctionFactory.
Implement one or more handlers for aggregation functions by implementing the interface com.espertech.esper.plugin.PlugInAggregationMultiFunctionHandler.
Implement an aggregation state key by implementing the interface com.espertech.esper.epl.agg.access.AggregationStateKey.
Implement an aggregation state factory by implementing the interface com.espertech.esper.plugin.PlugInAggregationMultiFunctionStateFactory.
Implement an aggregation state holder by implementing the interface com.espertech.esper.epl.agg.access.AggregationState.
Implement a state accessor by implementing the interface com.espertech.esper.epl.agg.access.AggregationAccessor.
Register the aggregation multi-function factory class with the engine by supplying one or more function names, via the engine configuration file or the runtime and static configuration API.
An aggregation multi-function factory class is responsible for the following functions:
Implement the addAggregationFunction method that receives an invocation for each aggregation function declared in the statement that matches any of the function names provided at configuration time.
Implement the validateGetHandler method that receives an invocation for each aggregation function to be validated in the statement that matches any of the function names provided at configuration time.
Aggregation multi-function factory classes implement the interface com.espertech.esper.plugin.PlugInAggregationMultiFunctionFactory:
public class CycleDetectorAggregationFactory implements PlugInAggregationMultiFunctionFactory { ...
The engine constructs a single instance of the aggregation multi-function factory class that is shared for all aggregation function expressions in a statement that have one of the function names provided in the configuration object.
The engine invokes the addAggregationFunction method at the time it parses an EPL statement or compiles a statement object model (SODA API). The method receives a declaration-time context object that provides the function name as well as additional information.
The sample Cycle-Detect factory class provides an empty addAggregationFunction method:
public void addAggregationFunction(PlugInAggregationMultiFunctionDeclarationContext declarationContext) {
    // no action taken
}
The engine invokes the validateGetHandler method at the time of expression validation. It passes a PlugInAggregationMultiFunctionValidationContext validation context object that contains the actual parameter expressions. Please see the JavaDoc API documentation for a comprehensive list of validation context information.
The validateGetHandler method must return a handler object that implements the PlugInAggregationMultiFunctionHandler interface. Return a handler object for each aggregation function expression according to the aggregation function name and its parameters that are provided in the validation context.
The example cycledetected function takes two parameters that provide the cycle edge (from-account and to-account):
public PlugInAggregationMultiFunctionHandler validateGetHandler(PlugInAggregationMultiFunctionValidationContext validationContext) { if (validationContext.getParameterExpressions().length == 2) { fromExpression = validationContext.getParameterExpressions()[0].getExprEvaluator(); toExpression = validationContext.getParameterExpressions()[1].getExprEvaluator(); } return new CycleDetectorAggregationHandler(this, validationContext); }
An aggregation multi-function handler class must implement the PlugInAggregationMultiFunctionHandler interface and is responsible for the following functions:
Implement the getAccessor method that returns a reader object for the aggregation state.
Implement the getReturnType method that returns information about the type of return values provided by the accessor reader object.
Implement the getAggregationStateUniqueKey method that provides a key object used by the engine to determine which aggregation functions share state.
Implement the getStateFactory method that returns a state factory object that the engine invokes, when required, to instantiate aggregation state holders.
Typically your implementation returns a handler instance for each aggregation function in an EPL statement expression.
In the Cycle-Detect example, the class CycleDetectorAggregationHandler is the handler for all aggregation functions.
public class CycleDetectorAggregationHandler implements PlugInAggregationMultiFunctionHandler { ...
The getAccessor method returns a reader object according to whether the aggregation function name is cycledetected or cycleoutput:
public AggregationAccessor getAccessor() { if (validationContext.getFunctionName().toLowerCase().equals(CycleDetectorConstant.CYCLEOUTPUT_NAME)) { return new CycleDetectorAggregationAccessorOutput(); } return new CycleDetectorAggregationAccessorDetect(); }
The getReturnType method provided by the handler instructs the engine about the return type of each aggregation accessor. The class com.espertech.esper.client.util.ExpressionReturnType holds return type information.
In the Cycle-Detect example the cycledetected function returns a single boolean value. The cycleoutput function returns a collection of vertices:
public ExpressionReturnType getReturnType() { if (validationContext.getFunctionName().toLowerCase().equals(CycleDetectorConstant.CYCLEOUTPUT_NAME)) { return ExpressionReturnType.collectionOfSingleValue(factory.getFromExpression().getType()); } return ExpressionReturnType.singleValue(Boolean.class) ; }
The engine invokes the getAggregationStateUniqueKey method to determine whether multiple aggregation function expressions in the same statement can share the same aggregation state or should receive different aggregation state instances.
The getAggregationStateUniqueKey method must return an instance of AggregationStateKey. The engine uses equals-semantics (the hashCode and equals methods) to determine whether multiple aggregation functions share the state object. If the key object returned for each aggregation function by the handler is an equal key object then the engine shares aggregation state between such aggregation functions for the same statement and context partition.
In the Cycle-Detect example the state is shared, which the handler achieves by simply returning the same key instance:
private static final AggregationStateKey CYCLE_KEY = new AggregationStateKey() {};

public AggregationStateKey getAggregationStateUniqueKey() {
    return CYCLE_KEY; // Share the same aggregation state instance
}
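When only some function expressions should share state, the key can carry a value and implement equals and hashCode on it. The sketch below is a hypothetical variation that is not part of the Cycle-Detect example: it assumes each function expression names a graph, so that expressions naming the same graph produce equal keys and therefore share state. SketchAggregationStateKey is a local stand-in for the AggregationStateKey marker interface so that the code compiles on its own.

```java
import java.util.Objects;

// Local stand-in for the AggregationStateKey marker interface (an assumption of this sketch).
interface SketchAggregationStateKey {
}

// Hypothetical key: two keys are equal when they name the same graph.
final class NamedGraphKey implements SketchAggregationStateKey {
    private final String graphName;

    NamedGraphKey(String graphName) {
        this.graphName = Objects.requireNonNull(graphName, "graphName");
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof NamedGraphKey)) {
            return false;
        }
        return graphName.equals(((NamedGraphKey) other).graphName);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals so that equal keys are found in hash-based lookups.
        return graphName.hashCode();
    }
}
```

Two handlers that return new NamedGraphKey("accounts") would, under equals-semantics, be treated as sharing state, while a handler returning new NamedGraphKey("cards") would receive separate state.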
The engine invokes the getStateFactory method to obtain an instance of PlugInAggregationMultiFunctionStateFactory. The state factory is responsible for instantiating separate aggregation state instances. If your statement does not have a group by clause, the engine obtains a single aggregation state from the state factory. If your statement has a group by clause, the engine obtains an aggregation state instance for each group when it encounters a new group.
In the Cycle-Detect example the method passes the expression evaluators providing the from-account and to-account expressions to the state factory:
public PlugInAggregationMultiFunctionStateFactory getStateFactory() { return new CycleDetectorAggregationStateFactory(factory.getFromExpression(), factory.getToExpression()); }
An aggregation multi-function state factory class must implement the PlugInAggregationMultiFunctionStateFactory interface and is responsible for the following functions:
Implement the makeAggregationState method that returns an aggregation state holder.
The engine invokes the makeAggregationState method to obtain a new aggregation state instance before applying aggregation state. If using group by in your statement, the engine invokes the makeAggregationState method to obtain a state holder for each group.
In the Cycle-Detect example, the class CycleDetectorAggregationStateFactory is the state factory for all aggregation functions:
public class CycleDetectorAggregationStateFactory implements PlugInAggregationMultiFunctionStateFactory { private final ExprEvaluator fromEvaluator; private final ExprEvaluator toEvaluator; public CycleDetectorAggregationStateFactory(ExprEvaluator fromEvaluator, ExprEvaluator toEvaluator) { this.fromEvaluator = fromEvaluator; this.toEvaluator = toEvaluator; } public AggregationState makeAggregationState(PlugInAggregationMultiFunctionStateContext stateContext) { return new CycleDetectorAggregationState(this); } public ExprEvaluator getFromEvaluator() { return fromEvaluator; } public ExprEvaluator getToEvaluator() { return toEvaluator; } }
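The effect of group by on state allocation can be pictured with the following sketch. It only illustrates the documented behavior (one state holder per group, obtained when a group is first seen) and is not the engine's implementation; the class PerGroupStateSketch and its use of a Supplier in place of the state factory are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustration only: asks a factory for a new state holder the first time a group key is seen.
final class PerGroupStateSketch<S> {
    private final Supplier<S> stateFactory; // plays the role of makeAggregationState
    private final Map<Object, S> statePerGroup = new HashMap<>();
    private int allocations;

    PerGroupStateSketch(Supplier<S> stateFactory) {
        this.stateFactory = stateFactory;
    }

    S stateFor(Object groupKey) {
        // The factory is invoked only when the group key is not yet known.
        return statePerGroup.computeIfAbsent(groupKey, key -> {
            allocations++;
            return stateFactory.get();
        });
    }

    int getAllocations() {
        return allocations;
    }
}
```

Requesting state for the group keys "IBM", "IBM" and "MSFT" in that order invokes the factory twice, once per distinct group.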
An aggregation multi-function state class must implement the AggregationState interface and is responsible for the following functions:
Implement the applyEnter method that enters events, event properties or computed values.
Optionally implement the applyLeave method that can remove events or computed values.
Implement the clear method to clear state.
In the Cycle-Detect example, the class CycleDetectorAggregationState is the state for all aggregation functions. Please review the example for more information.
An aggregation multi-function accessor class must implement the AggregationAccessor interface and is responsible for the following functions:
Implement the Object getValue(AggregationState state) method that returns a result object for the aggregation state.
Implement the Collection<EventBean> getEnumerableEvents(AggregationState state) method that returns a collection of events for enumeration, if applicable (or null).
Implement the EventBean getEnumerableEvent(AggregationState state) method that returns an event, if applicable (or null).
In the Cycle-Detect example, the class CycleDetectorAggregationAccessorDetect returns state for the cycledetected aggregation function and the CycleDetectorAggregationAccessorOutput returns the state for the cycleoutput aggregation function.
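The division of labor between a state holder and its accessors can be sketched without Esper or jgrapht as follows. This is not the Cycle-Detect code: all class names are hypothetical, applyEnter and applyLeave take the account names directly instead of events, and cycle detection is a plain depth-first search written for this sketch. One state object holds the graph, and two stateless accessors read it, one returning a boolean and one returning the vertices of a cycle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical state holder: a directed graph of from-account to to-account edges.
final class GraphStateSketch {
    // An edge entered twice is stored twice, so that one leave removes exactly one copy.
    private final Map<String, List<String>> edges = new HashMap<>();

    void applyEnter(String from, String to) {
        edges.computeIfAbsent(from, key -> new ArrayList<>()).add(to);
    }

    void applyLeave(String from, String to) {
        List<String> targets = edges.get(from);
        if (targets != null) {
            targets.remove(to); // removes the first matching edge only
            if (targets.isEmpty()) {
                edges.remove(from);
            }
        }
    }

    void clear() {
        edges.clear();
    }

    // Returns the vertices of one directed cycle, or an empty list if the graph has none.
    List<String> findCycle() {
        Set<String> done = new HashSet<>();
        for (String start : edges.keySet()) {
            List<String> path = new ArrayList<>();
            if (visit(start, path, done)) {
                return path;
            }
        }
        return new ArrayList<>();
    }

    private boolean visit(String vertex, List<String> path, Set<String> done) {
        int index = path.indexOf(vertex);
        if (index >= 0) {
            path.subList(0, index).clear(); // keep only the vertices on the cycle
            return true;
        }
        if (done.contains(vertex)) {
            return false; // already fully explored without finding a cycle
        }
        path.add(vertex);
        for (String next : edges.getOrDefault(vertex, new ArrayList<>())) {
            if (visit(next, path, done)) {
                return true;
            }
        }
        path.remove(path.size() - 1);
        done.add(vertex);
        return false;
    }
}

// Stateless accessors: each reads only the state passed to it.
final class DetectAccessorSketch {
    Object getValue(GraphStateSketch state) {
        return !state.findCycle().isEmpty();
    }
}

final class OutputAccessorSketch {
    Object getValue(GraphStateSketch state) {
        return state.findCycle();
    }
}
```

Because both accessors read the same state object, entering the three transactions from the earlier cycle example makes the detect accessor return true and the output accessor return the three account names.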
An aggregation multi-function configuration can receive one or multiple function names. You must also set a factory class name.
The sample XML snippet below configures an aggregation multi-function that is associated with the function names cycledetected and cycleoutput.
<esper-configuration>
  <!-- function-names takes a comma-separated list of function names -->
  <plugin-aggregation-multifunction
      function-names="cycledetected,cycleoutput"
      factory-class="com.espertech.esper.example.cycledetect.CycleDetectorAggregationFactory"/>
</esper-configuration>
The next example uses the runtime configuration API to register the same:
String[] functionNames = new String[] {"cycledetected", "cycleoutput"}; ConfigurationPlugInAggregationMultiFunction config = new ConfigurationPlugInAggregationMultiFunction(functionNames, CycleDetectorAggregationFactory.class.getName()); engine.getEPAdministrator().getConfiguration().addPlugInAggregationMultiFunction(config);
The engine shares an AggregationAccessor
instance between threads. The accessor should be designed stateless and should not use any locking of any kind
in the AggregationAccessor
implementation unless your implementation uses other state.
Since the engine passes an aggregation state instance to the accessor it is thread-safe as long as it relies only on the aggregation state passed to it.
The engine does not share an AggregationState
instance between threads. There is no need to use locking of any kind
in the AggregationState
implementation unless your implementation uses other state.
Tables allow columns to hold aggregation state including the state for multi-function aggregations. This section provides API pointers.
When an EPL statement accesses a table column that declares aggregation state of a multi-function aggregation, the PlugInAggregationMultiFunctionValidationContext contains an optionalTableColumnAccessed field that provides information about the table column.
To find out the statement type, such as to determine whether the current statement is a create-table statement, use context.getValidationContext().getExprEvaluatorContext().getStatementType().
To find out whether the statement aggregates into a table, use context.getValidationContext().getIntoTableName(), which returns the table name or null if not aggregating into a table.
The engine uses AggregationStateKey to determine whether an aggregation function listed with into table is compatible with the aggregation type that a table column declares. The equals method of the object must return true for compatible and false for incompatible.
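As a sketch of the equals contract described above, the hypothetical key class below treats two keys as compatible when they name the same function family and aggregate the same value type. The class name, both fields, and the notion of a "function family" are assumptions made for illustration; a real key would additionally need to be an AggregationStateKey, which is left out here so the example stays self-contained.

```java
import java.util.Objects;

// Hypothetical key class: equal keys mean "compatible aggregation state".
final class SketchStateKey {
    private final String functionFamily; // assumed: identifies the multi-function provider
    private final Class<?> valueType;    // assumed: type of the aggregated values

    SketchStateKey(String functionFamily, Class<?> valueType) {
        this.functionFamily = functionFamily;
        this.valueType = valueType;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SketchStateKey)) {
            return false;
        }
        SketchStateKey that = (SketchStateKey) other;
        return functionFamily.equals(that.functionFamily) && valueType.equals(that.valueType);
    }

    @Override
    public int hashCode() {
        // Keep hashCode consistent with equals
        return Objects.hash(functionFamily, valueType);
    }
}
```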
The filter expression is passed to you in PlugInAggregationMultiFunctionValidationContext as part of getNamedParameters under the name filter. When used with tables the filter expression is part of PlugInAggregationMultiFunctionAgentContext.
Your application must invoke the filter expression as the engine does not evaluate the filter expression for you. For example:
ExprEvaluator filterEval = validationContext.getNamedParameters().get("filter").get(0).getExprEvaluator();

public void applyEnter(EventBean[] eventsPerStream, ExprEvaluatorContext exprEvaluatorContext) {
    // note: pass "false" for applyLeave
    Boolean pass = (Boolean) filterEval.evaluate(eventsPerStream, true, exprEvaluatorContext);
    if (pass != null && pass) {
        // note: pass "false" for applyLeave
        Object value = valueEval.evaluate(eventsPerStream, true, exprEvaluatorContext);
        // do something
    }
}
Pattern guards are pattern objects that control the lifecycle of the guarded sub-expression, and can filter the events fired by the subexpression.
The following steps are required to develop and use a custom guard object with Esper.
Implement a guard factory class, responsible for creating guard object instances.
Implement a guard class.
Register the guard factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.
The code for the example guard object as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyCountToPatternGuardFactory. The sample guard discussed here counts the number of events occurring up to a maximum number of events, and ends the sub-expression when that maximum is reached.
A guard factory class is responsible for the following functions:
Implement a setGuardParameters method that takes guard parameters, which are themselves expressions.
Implement a makeGuard method that constructs a new guard instance.
Guard factory classes subclass com.espertech.esper.pattern.guard.GuardFactorySupport:
public class MyCountToPatternGuardFactory extends GuardFactorySupport { ...
The engine constructs one instance of the guard factory class for each time the guard is listed in a statement.
The guard factory class implements the setGuardParameters method that is passed the parameters to the guard as supplied by the statement. It verifies the guard parameters, similar to the code snippet shown next. Our example counter guard takes a single numeric parameter:
public void setGuardParameters(List<ExprNode> guardParameters, MatchedEventConvertor convertor) throws GuardParameterException { String message = "Count-to guard takes a single integer-value expression as parameter"; if (guardParameters.size() != 1) { throw new GuardParameterException(message); } if (guardParameters.get(0).getExprEvaluator().getType() != Integer.class) { throw new GuardParameterException(message); } this.numCountToExpr = guardParameters.get(0); this.convertor = convertor; }
The makeGuard method is called by the engine to create a new guard instance. The example makeGuard method shown below passes the maximum count of events to the guard instance. It also passes a Quitable implementation to the guard instance. The guard uses Quitable to indicate that the sub-expression contained within must stop (quit) listening for events.
public Guard makeGuard(PatternAgentInstanceContext context, MatchedEventMap beginState, Quitable quitable, Object stateNodeId, Object guardState) { Object parameter = PatternExpressionUtil.evaluate("Count-to guard", beginState, numCountToExpr, convertor); if (parameter == null) { throw new EPException("Count-to guard parameter evaluated to a null value"); } Integer numCountTo = (Integer) parameter; return new MyCountToPatternGuard(numCountTo, quitable); }
A guard class has the following responsibilities:
Provides a startGuard method that initializes the guard.
Provides a stopGuard method that stops the guard, called by the engine when the whole pattern is stopped, or the sub-expression containing the guard is stopped.
Provides an inspect method that the pattern engine invokes to determine if the guard lets matching events pass for further evaluation by the containing expression.
Guard classes subclass com.espertech.esper.pattern.guard.GuardSupport as shown here:
public abstract class GuardSupport implements Guard { ...
The engine invokes the guard factory class to construct an instance of the guard class for each new sub-expression instance within a statement.
A guard class must provide an implementation of the startGuard method that the pattern engine invokes to start a guard instance. In our example, the method resets the guard's counter to zero:
public void startGuard() { counter = 0; }
The pattern engine invokes the inspect method each time the sub-expression indicates a new event result. Our example guard needs to count the number of events matched, and quit once the maximum number is exceeded:
public boolean inspect(MatchedEventMap matchEvent) { counter++; if (counter > numCountTo) { quitable.guardQuit(); return false; } return true; }
The inspect method returns true for events that pass the guard, and false for events that should not pass the guard.
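The counting behavior can be tried outside the engine with stand-in types. In the sketch below, SketchQuitable is a hypothetical replacement for the engine's Quitable callback and SketchCountToGuard repeats the example logic without depending on Esper classes; the inspect method takes no argument here because the matched events play no role in the count.

```java
// Hypothetical stand-in for the engine's Quitable callback.
interface SketchQuitable {
    void guardQuit();
}

// Repeats the counting logic of the example guard using stand-in types only.
class SketchCountToGuard {
    private final int numCountTo;
    private final SketchQuitable quitable;
    private int counter;

    SketchCountToGuard(int numCountTo, SketchQuitable quitable) {
        this.numCountTo = numCountTo;
        this.quitable = quitable;
    }

    void startGuard() {
        counter = 0;
    }

    // Returns true while the count is within the limit; on the first call past
    // the limit it signals quit and returns false.
    boolean inspect() {
        counter++;
        if (counter > numCountTo) {
            quitable.guardQuit();
            return false;
        }
        return true;
    }
}
```

With a limit of 2, the first two calls to inspect pass and the third call both returns false and triggers guardQuit, which matches the statement further below that a limit of 10 lets the first 10 events through.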
The guard factory class name as well as the namespace and name for the new guard must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:
<esper-configuration>
  <plugin-pattern-guard namespace="myplugin" name="count_to"
      factory-class="com.espertech.esper.regression.client.MyCountToPatternGuardFactory"/>
</esper-configuration>
The new guard is now ready to use in a statement. The next pattern statement detects the first 10 MyEvent events:
select * from pattern [(every MyEvent) where myplugin:count_to(10)]
Note that the every keyword was placed within parentheses to ensure the guard controls the repeated matching of events.
Pattern observers are pattern objects that are executed as part of a pattern expression and can observe events or test conditions. Examples for built-in observers are timer:at and timer:interval. Some suggested uses of observer objects are:
Implement custom scheduling logic using the engine's own scheduling and timer services
Test conditions related to prior events matching an expression
The following steps are required to develop and use a custom observer object within pattern statements:
Implement an observer factory class, responsible for creating observer object instances.
Implement an observer class.
Register an observer factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.
The code for the example observer object as shown in this chapter can be found in the test source folder in package com.espertech.esper.regression.client by the name MyFileExistsObserver. The sample observer discussed here simply checks if a file exists, using the filename supplied by the pattern statement, via the java.io.File class.
An observer factory class is responsible for the following functions:
Implement a setObserverParameters method that takes observer parameters, which are themselves expressions.
Implement a makeObserver method that constructs a new observer instance.
Observer factory classes subclass com.espertech.esper.pattern.observer.ObserverFactorySupport:
public class MyFileExistsObserverFactory extends ObserverFactorySupport { ...
The engine constructs one instance of the observer factory class for each time the observer is listed in a statement.
The observer factory class implements the setObserverParameters method that is passed the parameters to the observer as supplied by the statement. It verifies the observer parameters, similar to the code snippet shown next. Our example file-exists observer takes a single string parameter:
public void setObserverParameters(List<ExprNode> expressionParameters, MatchedEventConvertor convertor, ExprValidationContext validationContext) throws ObserverParameterException { String message = "File exists observer takes a single string filename parameter"; if (expressionParameters.size() != 1) { throw new ObserverParameterException(message); } if (!(expressionParameters.get(0).getExprEvaluator().getType() == String.class)) { throw new ObserverParameterException(message); } this.filenameExpression = expressionParameters.get(0); this.convertor = convertor; }
The pattern engine calls the makeObserver method to create a new observer instance. The example makeObserver method shown below passes parameters to the observer instance:
public EventObserver makeObserver(PatternAgentInstanceContext context, MatchedEventMap beginState, ObserverEventEvaluator observerEventEvaluator, Object stateNodeId, Object observerState) { Object filename = PatternExpressionUtil.evaluate("File-exists observer ", beginState, filenameExpression, convertor); if (filename == null) { throw new EPException("Filename evaluated to null"); } return new MyFileExistsObserver(beginState, observerEventEvaluator, filename.toString()); }
The ObserverEventEvaluator parameter allows an observer to indicate events, and to indicate change of truth value to permanently false. Use this interface to indicate when your observer has received or witnessed an event, or changed its truth value to true or permanently false.
The MatchedEventMap parameter provides a Map of all matching events for the expression prior to the observer's start. For example, consider a pattern as below:
a=MyEvent -> myplugin:my_observer(...)
The above pattern tagged the MyEvent instance with the tag "a". The pattern engine starts an instance of my_observer when it receives the first MyEvent. The observer can query the MatchedEventMap using "a" as a key and obtain the tagged event.
An observer class has the following responsibilities:
Provides a startObserve method that starts the observer.
Provides a stopObserve method that stops the observer, called by the engine when the whole pattern is stopped, or the sub-expression containing the observer is stopped.
Observer classes subclass com.espertech.esper.pattern.observer.ObserverSupport as shown here:
public class MyFileExistsObserver implements EventObserver { ...
The engine invokes the observer factory class to construct an instance of the observer class for each new sub-expression instance within a statement.
An observer class must provide an implementation of the startObserve method that the pattern engine invokes to start an observer instance. In our example, the observer checks for the presence of a file and indicates the truth value to the remainder of the expression:
public void startObserve() { File file = new File(filename); if (file.exists()) { observerEventEvaluator.observerEvaluateTrue(beginState); } else { observerEventEvaluator.observerEvaluateFalse(); } }
Note the observer passes the ObserverEventEvaluator an instance of MatchedEventMap. The observer can also create one or more new events and pass these events through the Map to the remaining expressions in the pattern.
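The file check and the two possible signals can be exercised without the engine. The sketch below uses hypothetical stand-ins: SketchEvaluator replaces ObserverEventEvaluator, its methods take no MatchedEventMap argument, and SketchRecordingEvaluator merely records which signal arrived. Only java.io.File is a real API here.

```java
import java.io.File;

// Hypothetical stand-in for the engine's ObserverEventEvaluator callback.
interface SketchEvaluator {
    void observerEvaluateTrue();
    void observerEvaluateFalse();
}

// Records the last signal received so the outcome can be inspected afterwards.
class SketchRecordingEvaluator implements SketchEvaluator {
    String last = "none";

    public void observerEvaluateTrue() {
        last = "true";
    }

    public void observerEvaluateFalse() {
        last = "permanently-false";
    }
}

// Repeats the example observer's check using stand-in types only.
class SketchFileExistsObserver {
    private final String filename;
    private final SketchEvaluator evaluator;

    SketchFileExistsObserver(String filename, SketchEvaluator evaluator) {
        this.filename = filename;
        this.evaluator = evaluator;
    }

    void startObserve() {
        // File.exists() is true for directories as well as regular files
        if (new File(filename).exists()) {
            evaluator.observerEvaluateTrue();
        } else {
            evaluator.observerEvaluateFalse();
        }
    }
}
```

Note that the check runs once, at start. In the sample statement further below, the repeated checking comes from the every timer:interval(10 sec) part of the pattern, which starts a new observer instance on each interval.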
The observer factory class name as well as the namespace and name for the new observer must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:
<esper-configuration>
  <plugin-pattern-observer namespace="myplugin" name="file_exists"
      factory-class="com.espertech.esper.regression.client.MyFileExistsObserverFactory"/>
</esper-configuration>
The new observer is now ready to use in a statement. The next pattern statement checks every 10 seconds if the given file exists, and indicates to the listener when the file is found.
select * from pattern [every timer:interval(10 sec) -> myplugin:file_exists("myfile.txt")]
Creating a plug-in event representation can be useful under any of these conditions:
Your application has existing Java classes that carry event metadata and event property values and your application does not want to (or cannot) extract or transform such event metadata and event data into one of the built-in event representations (POJO Java objects, Map or XML DOM).
Your application wants to provide a faster or short-cut access path to event data, for example to access XML event data through a Streaming API for XML (StAX).
Your application must perform a network lookup or other dynamic resolution of event type and events.
Note that the classes to plug-in custom event representations are held stable between minor releases, but can be subject to change between major releases.
Currently, EsperIO provides the following additional event representations:
Apache Axiom provides access to XML event data on top of the fast Streaming API for XML (StAX).
The source code is available for these and they are therefore excellent examples for how to implement a plug-in event representation. Please see the EsperIO documentation for usage details.
Your application provides a plug-in event representation as an implementation of the com.espertech.esper.plugin.PlugInEventRepresentation interface. It registers the implementation class in the Configuration and at the same time provides a unique URI. This URI is called the root event representation URI. An example value for a root URI is type://xml/apacheaxiom/OMNode.
One can register multiple plug-in event representations. Each representation has a root URI. The root URI serves to divide the overall space of different event representations and plays a role in resolving event types and event objects.
There are two situations in which an Esper engine instance asks an event representation for an event type:
When an application registers a new event type using the method addPlugInEventType on ConfigurationOperations, either at runtime or at configuration time.
When an EPL statement is created with a new event type name (a name not seen before) and the URIs for resolving such names are set beforehand via setPlugInEventTypeNameResolutionURIs on ConfigurationOperations.
The implementation of the PlugInEventRepresentation interface must provide implementations for two key interfaces: com.espertech.esper.client.EventType and EventBean. It must also implement several other related interfaces as described below.
The EventType methods provide event metadata including property names and property types. They also provide instances of EventPropertyGetter for retrieving event property values. Each instance of EventType represents a distinct type of event.
The EventBean implementation is the event itself and encapsulates the underlying event object.
Follow the steps outlined below to process event objects for your event types:
Implement the EventType, EventPropertyGetter and EventBean interfaces.
Implement the PlugInEventRepresentation interface, the PlugInEventTypeHandler and PlugInEventBeanFactory interfaces, then add the PlugInEventRepresentation class name to configuration.
Register plug-in event types, and/or set the event type name resolution URIs, via configuration.
Obtain an EventSender from EPRuntime via the getEventSender(URI[]) method and use that to send in your event objects.
Please consult the JavaDoc for further information on each of the interfaces and their respective methods. The Apache Axiom event representation is an example implementation that can be found in the EsperIO packages.
Assume you have registered event representations using the following URIs:
type://myFormat/myProject/myName
type://myFormat/myProject
type://myFormat/myOtherProject
When providing an array of child URIs for resolution, the engine compares each child URI to each of the event representation root URIs, in the order provided. Any event representation root URI that spans the child URI space becomes a candidate event representation. If multiple root URIs match, the order is defined by the more specific root URI first, to the least specific root URI last.
During event type resolution and event sender resolution you provide a child URI. Assume the child URI provided is type://myFormat/myProject/myName/myEvent?param1=abc&param2=true. In this example both root URIs #1 (the more specific) and #2 (the less specific) match, while root URI #3 is not a match. Thus at the time of type resolution the engine invokes the acceptsType method on the event representation for URI #1 first (the more specific), before asking #2 (the less specific) to resolve the type.
The EventSender returned by the getEventSender(URI[]) method follows the same scheme. The event sender instance asks each matching event representation for each URI to resolve the event object in the order of most specific to least specific root URI, and the first event representation to return an instance of EventBean ends the resolution process for event objects.
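The ordering rule can be illustrated with a small stand-alone sketch. It is not the engine's algorithm: the class SketchUriResolution is hypothetical, and it assumes that a root URI "spans" a child URI when the child continues the root at a path or query boundary, and that "more specific" means a longer root URI.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SketchUriResolution {

    // Assumption: the root spans the child when the child equals the root or
    // continues it at a '/' or '?' boundary (so myProject does not span myProjectX).
    static boolean spans(URI root, URI child) {
        String r = root.toString();
        String c = child.toString();
        if (!c.startsWith(r)) {
            return false;
        }
        if (c.length() == r.length()) {
            return true;
        }
        char next = c.charAt(r.length());
        return r.endsWith("/") || next == '/' || next == '?';
    }

    // Returns the roots that span the child, longest (assumed most specific) first.
    static List<URI> candidates(List<URI> roots, URI child) {
        List<URI> result = new ArrayList<>();
        for (URI root : roots) {
            if (spans(root, child)) {
                result.add(root);
            }
        }
        result.sort(Comparator.comparingInt((URI u) -> u.toString().length()).reversed());
        return result;
    }
}
```

For the three root URIs and the child URI of the example above, candidates returns type://myFormat/myProject/myName followed by type://myFormat/myProject, and leaves out type://myFormat/myOtherProject.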
The type:// part of the URI is an optional convention for the scheme part of a URI that your application may follow. URIs can also be simple names and can include parameters, as documented in the JavaDoc for class java.net.URI.
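As a brief illustration of how java.net.URI splits such values, the hypothetical helper below prints the scheme, authority, path and query of a URI. Only the SketchUriParts class and its describe method are invented for this example; the accessor methods are part of the standard java.net.URI class.

```java
import java.net.URI;

class SketchUriParts {
    // Breaks a URI string into the components that java.net.URI exposes.
    static String describe(String text) {
        URI uri = URI.create(text);
        return "scheme=" + uri.getScheme()
                + " authority=" + uri.getAuthority()
                + " path=" + uri.getPath()
                + " query=" + uri.getQuery();
    }
}
```

A simple name such as myEvent parses as a relative URI: it has a path but no scheme, authority or query, so those accessors return null.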
This section implements a minimal sample plug-in event representation. For the sake of keeping the example easy to understand, the event representation is rather straightforward: an event is a java.util.Properties object that consists of key-value pairs of type string.
The code shown next does not document method signatures. Please consult the JavaDoc API documentation for method details.
First, the sample shows how to implement the EventType interface. The event type provides information about property names and types, as well as supertypes of the event type.
Our EventType takes a set of valid property names:
public class MyPlugInPropertiesEventType implements EventType {
    private final Set<String> properties;

    public MyPlugInPropertiesEventType(Set<String> properties) {
        this.properties = properties;
    }

    public Class getPropertyType(String property) {
        if (!isProperty(property)) {
            return null;
        }
        return String.class;
    }

    public Class getUnderlyingType() {
        return Properties.class;
    }

    //... further methods below
}
An EventType is responsible for providing implementations of EventPropertyGetter to query actual events. The getter simply queries the Properties object underlying each event:
public EventPropertyGetter getGetter(String property) {
    final String propertyName = property;

    return new EventPropertyGetter() {
        public Object get(EventBean eventBean) throws PropertyAccessException {
            MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
            return propBean.getProperties().getProperty(propertyName);
        }

        public boolean isExistsProperty(EventBean eventBean) {
            MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
            return propBean.getProperties().getProperty(propertyName) != null;
        }

        public Object getFragment(EventBean eventBean) {
            return null; // The property is not a fragment
        }
    };
}
Our sample EventType does not have supertypes. Supertypes represent an extends-relationship between event types, and subtypes are expected to exhibit the same event property names and types as each of their supertypes combined:
public EventType[] getSuperTypes() {
    return null; // no supertype for this example
}

public Iterator<EventType> getDeepSuperTypes() {
    return null;
}

public String getName() {
    return name;
}

public EventPropertyDescriptor[] getPropertyDescriptors() {
    Collection<EventPropertyDescriptor> descriptorColl = descriptors.values();
    return descriptorColl.toArray(new EventPropertyDescriptor[descriptors.size()]);
}

public EventPropertyDescriptor getPropertyDescriptor(String propertyName) {
    return descriptors.get(propertyName);
}

public FragmentEventType getFragmentType(String property) {
    return null; // sample does not provide any fragments
}
The example event type as above does not provide fragments, which are properties of the event that can themselves be represented as an event, to keep the example simple.
Each EventBean instance represents an event. The interface is straightforward to implement. In this example an event is backed by a Properties object:
public class MyPlugInPropertiesEventBean implements EventBean { private final MyPlugInPropertiesEventType eventType; private final Properties properties; public MyPlugInPropertiesEventBean(MyPlugInPropertiesEventType eventType, Properties properties) { this.eventType = eventType; this.properties = properties; } public EventType getEventType() { return eventType; } public Object get(String property) throws PropertyAccessException { EventPropertyGetter getter = eventType.getGetter(property); return getter.get(this); } public Object getFragment(String property) { EventPropertyGetter getter = eventType.getGetter(property); if (getter != null) { return getter.getFragment(this); } return null; } public Object getUnderlying() { return properties; } protected Properties getProperties() { return properties; } }
A PlugInEventRepresentation serves to create EventType and EventBean instances through its related interfaces.
The sample event representation creates MyPlugInPropertiesEventType and MyPlugInPropertiesEventBean instances.
The PlugInEventTypeHandler returns the EventType instance and an EventSender instance.
Our sample event representation accepts all requests for event types by returning boolean true on the acceptsType method. When asked for the PlugInEventTypeHandler, it constructs a new EventType. The list of property names for the new type is passed as an initialization value provided through the configuration API or XML, as a comma-separated list of property names:
public class MyPlugInEventRepresentation implements PlugInEventRepresentation {
    private List<MyPlugInPropertiesEventType> types;

    public void init(PlugInEventRepresentationContext context) {
        types = new ArrayList<MyPlugInPropertiesEventType>();
    }

    public boolean acceptsType(PlugInEventTypeHandlerContext context) {
        return true;
    }

    public PlugInEventTypeHandler getTypeHandler(PlugInEventTypeHandlerContext eventTypeContext) {
        String proplist = (String) eventTypeContext.getTypeInitializer();
        String[] propertyList = proplist.split(",");

        Set<String> typeProps = new HashSet<String>(Arrays.asList(propertyList));

        MyPlugInPropertiesEventType eventType = new MyPlugInPropertiesEventType(typeProps);
        types.add(eventType);

        return new MyPlugInPropertiesEventTypeHandler(eventType);
    }

    // ... more methods below
}
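The sample splits the initializer on commas without trimming, so a value such as "a, b" would yield a property named " b" with a leading space. If your configuration values may contain whitespace or empty entries, a more tolerant parser could look like the sketch below. SketchPropertyListParser is a hypothetical helper and not part of the sample or of Esper.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class SketchPropertyListParser {
    // Splits a comma-separated list, trimming whitespace and skipping empty entries.
    // A LinkedHashSet keeps the names in the order they were listed.
    static Set<String> parse(String proplist) {
        Set<String> names = new LinkedHashSet<>();
        if (proplist == null) {
            return names;
        }
        for (String part : proplist.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }
}
```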
The PlugInEventTypeHandler simply returns the EventType as well as an implementation of EventSender for processing same-type events:
public class MyPlugInPropertiesEventTypeHandler implements PlugInEventTypeHandler { private final MyPlugInPropertiesEventType eventType; public MyPlugInPropertiesEventTypeHandler(MyPlugInPropertiesEventType eventType) { this.eventType = eventType; } public EventSender getSender(EPRuntimeEventSender runtimeEventSender) { return new MyPlugInPropertiesEventSender(eventType, runtimeEventSender); } public EventType getType() { return eventType; } }
The EventSender returned by PlugInEventTypeHandler is expected to process events of the same type or any subtype:
public class MyPlugInPropertiesEventSender implements EventSender { private final MyPlugInPropertiesEventType type; private final EPRuntimeEventSender runtimeSender; public MyPlugInPropertiesEventSender(MyPlugInPropertiesEventType type, EPRuntimeEventSender runtimeSender) { this.type = type; this.runtimeSender = runtimeSender; } public void sendEvent(Object event) { if (!(event instanceof Properties)) { throw new EPException("Sender expects a properties event"); } EventBean eventBean = new MyPlugInPropertiesEventBean(type, (Properties) event); runtimeSender.processWrappedEvent(eventBean); } }
The plug-in event representation may optionally provide an implementation of PlugInEventBeanFactory. A PlugInEventBeanFactory may inspect event objects and assign an event type dynamically based on resolution URIs and event properties.
Our sample event representation accepts all URIs and returns a MyPlugInPropertiesBeanFactory:
public class MyPlugInEventRepresentation implements PlugInEventRepresentation {

    // ... methods as seen earlier

    public boolean acceptsEventBeanResolution(
            PlugInEventBeanReflectorContext eventBeanContext) {
        return true;
    }

    public PlugInEventBeanFactory getEventBeanFactory(
            PlugInEventBeanReflectorContext eventBeanContext) {
        return new MyPlugInPropertiesBeanFactory(types);
    }
}
Last, the sample MyPlugInPropertiesBeanFactory implements the PlugInEventBeanFactory interface. It inspects incoming events and determines an event type based on whether all properties for that event type are present:
public class MyPlugInPropertiesBeanFactory implements PlugInEventBeanFactory {
    private final List<MyPlugInPropertiesEventType> knownTypes;

    public MyPlugInPropertiesBeanFactory(List<MyPlugInPropertiesEventType> types) {
        knownTypes = types;
    }

    public EventBean create(Object event, URI resolutionURI) {
        // events that are not Properties objects cannot be resolved by this factory
        if (!(event instanceof Properties)) {
            return null;
        }
        Properties properties = (Properties) event;

        // use the known types to determine the type of the object
        for (MyPlugInPropertiesEventType type : knownTypes) {
            // if there is one property the event does not contain, then it's not the right type
            boolean hasAllProperties = true;
            for (String prop : type.getPropertyNames()) {
                if (!properties.containsKey(prop)) {
                    hasAllProperties = false;
                    break;
                }
            }

            if (hasAllProperties) {
                return new MyPlugInPropertiesEventBean(type, properties);
            }
        }
        return null; // none match, unknown event
    }
}
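One consequence of this first-match strategy is that the result depends on the order of the known types: a type whose property names are a subset of another type's property names matches any event that the larger type would also match. The stand-alone sketch below demonstrates the effect with plain collections. SketchTypeMatcher and the type names Small and Large are hypothetical and stand in for the sample classes above.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

class SketchTypeMatcher {
    // Returns the name of the first known type whose property names are all
    // present in the event, or null when no type matches. Iteration follows
    // the order of the supplied map.
    static String resolve(Map<String, Set<String>> knownTypes, Properties event) {
        for (Map.Entry<String, Set<String>> entry : knownTypes.entrySet()) {
            if (event.keySet().containsAll(entry.getValue())) {
                return entry.getKey();
            }
        }
        return null;
    }
}
```

With a type Small declaring property a registered before a type Large declaring a and b, an event carrying both a and b resolves to Small. Registering the type with more properties first is one way to make the more specific type win.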