For information on calling external services via instance method invocation, for instance to integrate with dependency injection frameworks such as Spring or Guice, please see Section 5.17.5, “Class and Event-Type Variables”.
For information on input and output adapters that connect to an event transport and perform event transformation for incoming and outgoing on-the-wire event data, for use with streaming data, please see the EsperIO reference documentation. The data flow instances as described in Chapter 15, EPL Reference: Data Flow are an easy way to plug in operators that perform input and output. Data flows allow providing parameters and managing individual flows independent of engine lifecycle. Also consider using the Plug-in Loader API for creating a new adapter that starts or stops as part of the CEP engine initialization and destroy lifecycle, see Section 16.16, “Plug-in Loader”.
To join data that resides in a relational database and that is accessible via JDBC driver and SQL statement the engine offers a syntax for using SQL within EPL, see Section 5.13, “Accessing Relational Data via SQL”. A relational database input and output adapter for streaming input from and output to a relational database also exists (EsperIO).
To join data that resides in a non-relational store the engine offers two means: first, the virtual data window, as described below, for transparently integrating the external store as a named window; second, a special join syntax based on static method invocation, see Section 5.14, “Accessing Non-Relational Data via Method, Script or UDF Invocation”.
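For illustration only, a method-invocation join of the second kind might look like the following EPL; com.mycompany.MyStoreAccess and its lookupItems method are hypothetical placeholders for your own store-access code:

select * from TriggerEvent as t, method:com.mycompany.MyStoreAccess.lookupItems(t.itemId) as items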
The best way to test that your extension code works correctly is to write unit tests against an EPL statement that utilizes the extension code. Samples can be obtained from the Esper regression test code base.
For all extension code, and similar to listeners and subscribers, use the route method (and not sendEvent) to send events into the engine from extension code, to avoid the possibility of stack overflow due to event-callback looping and to ensure correct processing of the current and routed event.

Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent and not route.
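As a minimal sketch of this advice, the listener below routes a derived Map event back into the engine; the "AlertEvent" Map event type and the "key1" property are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class RoutingListener implements UpdateListener {

  private final EPServiceProvider epService;

  public RoutingListener(EPServiceProvider epService) {
    this.epService = epService;
  }

  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    if (newEvents == null) {
      return;
    }
    Map<String, Object> alert = new HashMap<String, Object>();
    alert.put("key1", newEvents[0].get("key1"));
    // route() defers processing until the current event completes, avoiding
    // stack overflow from event-callback looping; with outbound-threading
    // enabled, sendEvent() would be the right call instead.
    epService.getEPRuntime().route(alert, "AlertEvent");
  }
}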
For all extension code it is not safe to administer the engine from within the extension code. For example, it is not safe to implement a data window view that creates a new statement or destroys an existing statement.
The following steps are required to develop and use a virtual data window:

Implement a virtual data window factory class that implements the VirtualDataWindowFactory interface.

Implement a virtual data window class that implements the VirtualDataWindow interface.

Implement a lookup class that implements the VirtualDataWindowLookup interface.

Register the factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

Once you have completed the above steps, the virtual data window is ready to use in EPL statements.
Configuration config = new Configuration();
config.addPlugInVirtualDataWindow("sample", "samplevdw",
    SampleVirtualDataWindowFactory.class.getName());
Your application may then create a named window backed by a virtual data window.
For example, assume that the SampleEvent event type is declared as follows:
create schema SampleEvent as (key1 string, key2 string, value1 int, value2 double)
create window MySampleWindow.sample:samplevdw() as SampleEvent
Your application may obtain a reference to the virtual data window from the engine context.
This code snippet looks up the virtual data window by the named window name:
try {
  return (VirtualDataWindow) epService.getContext().lookup("/virtualdw/MySampleWindow");
}
catch (NamingException e) {
  throw new RuntimeException("Failed to look up virtual data window, is it created yet?");
}
select (select * from MySampleWindow where key1 = 'A1') from OtherEvent
As a second example, consider an EPL join statement as follows:
select * from MySampleWindow, MyTriggerEvent where key1 = trigger1 and key2 = trigger2
As a last example, consider an EPL fire-and-forget statement as follows:
select * from MySampleWindow where key1 = 'A2' and value1 between 0 and 1000
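A sketch of executing this fire-and-forget query through the runtime API, assuming an initialized EPServiceProvider named epService, could look as follows:

EPOnDemandQueryResult result = epService.getEPRuntime().executeQuery(
    "select * from MySampleWindow where key1 = 'A2' and value1 between 0 and 1000");
for (EventBean row : result.getArray()) {
  // each row is supplied by the virtual data window's lookup
  System.out.println("value1=" + row.get("value1"));
}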
A virtual data window factory class is responsible for the following functions:
The sample code shown here can be found among the examples in the distribution under virtualdw:
public class SampleVirtualDataWindowFactory implements VirtualDataWindowFactory {

  public void initialize(VirtualDataWindowFactoryContext factoryContext) {
    // Can add initialization logic here.
  }

  public VirtualDataWindow create(VirtualDataWindowContext context) {
    // This example allocates a new virtual data window (one per context partition if using contexts).
    // For sharing the virtual data window instance between context partitions, return the same reference.
    return new SampleVirtualDataWindow(context);
  }

  public void destroyAllContextPartitions() {
    // Release shared resources here.
  }
}
The VirtualDataWindowContext provides the following to your application:
String namedWindowName;  // Name of named window being created.
Object[] parameters;  // Any optional parameters provided as part of create-window.
EventType eventType;  // The event type of events.
EventBeanFactory eventFactory;  // A factory for creating EventBean instances from store rows.
VirtualDataWindowOutStream outputStream;  // For stream output to consuming statements.
AgentInstanceContext agentInstanceContext;  // Other EPL statement information in statement context.
A virtual data window implementation is responsible for the following functions:
The sample code shown here can be found among the examples in the distribution under virtualdw.
The implementation class must implement the VirtualDataWindow interface like so:
public class SampleVirtualDataWindow implements VirtualDataWindow {

  private final VirtualDataWindowContext context;

  public SampleVirtualDataWindow(VirtualDataWindowContext context) {
    this.context = context;
  } ...
public VirtualDataWindowLookup getLookup(VirtualDataWindowLookupContext desc) {
  // Place any code that interrogates the hash-index and btree-index fields here.
  // Return the lookup strategy.
  return new SampleVirtualDataWindowLookup(context);
}
public void update(EventBean[] newData, EventBean[] oldData) {
  // This sample simply posts into the insert and remove stream what is received.
  context.getOutputStream().update(newData, oldData);
}
A lookup implementation is responsible for the following functions:
The sample code shown here can be found among the examples in the distribution under virtualdw.
The implementation class must implement the VirtualDataWindowLookup interface:
public class SampleVirtualDataWindowLookup implements VirtualDataWindowLookup {

  private final VirtualDataWindowContext context;

  public SampleVirtualDataWindowLookup(VirtualDataWindowContext context) {
    this.context = context;
  } ...
public Set<EventBean> lookup(Object[] lookupValues) {
  // Add code to interrogate lookup values here.

  // Create a sample event.
  // This example uses Map events; other underlying events such as POJO are exactly the same code.
  Map<String, Object> eventData = new HashMap<String, Object>();
  eventData.put("key1", "sample1");
  eventData.put("key2", "sample2");
  eventData.put("value1", 100);
  eventData.put("value2", 1.5d);
  EventBean event = context.getEventFactory().wrap(eventData);
  return Collections.singleton(event);
}
When a key value is a range, the key value is an instance of VirtualDataWindowKeyRange.
Single-row functions return a single value. They are not expected to aggregate rows but instead should be stateless functions. These functions can appear in any expressions and can be passed any number of parameters.
The following steps are required to develop and use a custom single-row function with Esper.
Implement a class providing one or more public static methods accepting the number and type of parameters as required.
Register the single-row function class and method name with the engine by supplying a function name, via the engine configuration file or the configuration API.
You may not override a built-in function with a single-row function provided by you. The single-row function you register must have a different name than any of the built-in functions.
An example single-row function can also be found in the examples under the runtime configuration example.
Single-row function classes have no further requirement than to provide a public static method.
public class MyUtilityClass {
  public static double computePercent(double amount, double total) {
    return amount / total * 100;
  }
}
<esper-configuration>
  <plugin-singlerow-function name="percent"
    function-class="mycompany.MyUtilityClass" function-method="computePercent" />
</esper-configuration>
Note that the function name and method name need not be the same.
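As an alternative to the XML configuration, the same function can be registered through the runtime configuration API; a minimal sketch, assuming an initialized EPServiceProvider named epService:

epService.getEPAdministrator().getConfiguration().addPlugInSingleRowFunction(
    "percent", MyUtilityClass.class.getName(), "computePercent");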
The new single-row function is now ready to use in a statement:
select percent(fulfilled,total) from MyEvent
select percent(*) from MyEvent
select calculator().add(5, amount) from MyEvent
The following configuration XML enables the value cache for the single-row function:
<esper-configuration>
  <plugin-singlerow-function name="getDate"
    function-class="mycompany.DateUtil" function-method="parseDate"
    value-cache="enabled" />
</esper-configuration>
select getDate('2002-05-30T9:00:00.000') from MyEvent
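The mycompany.DateUtil class is not shown in this documentation; a hypothetical sketch of a parse method suited to the value cache could look as follows. With the value cache enabled, the engine invokes the method once per distinct constant parameter and caches the returned value.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical implementation of the parseDate method referenced above.
public class DateUtil {
  public static Date parseDate(String dateText) throws ParseException {
    return new SimpleDateFormat("yyyy-MM-dd'T'H:mm:ss.SSS").parse(dateText);
  }
}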
For example, the EPL below uses the function computeHash as part of a predicate expression:
select * from MyEvent(computeHash(field) = 100)
<esper-configuration>
  <plugin-singlerow-function name="computeHash"
    function-class="mycompany.HashUtil" function-method="computeHash"
    filter-optimizable="disabled" />
</esper-configuration>
public static double doCompute(EventBean eventBean) {...}

public static boolean doCheck(MyEvent myEvent, String text) {...}

public static String doSearch(Collection<EventBean> events) {...}
To pass the event, specify the stream alias, the wildcard (*), or the tag name when used in a pattern. The EPL below shows example uses:
select * from MyEvent(doCompute(me) = 100) as me
select * from MyEvent where doCompute(*) = 100
select * from pattern[a=MyEvent -> MyEvent(doCheck(a, 'sometext'))]
select * from MyEvent#time(1 min) having doCompute(last(*))

select * from MyEvent#time(1 min) having doSearch(window(*))
public static EventBean[] myItemProducer(String string, EPLMethodInvocationContext context) {
  String[] split = string.split(",");
  EventBean[] events = new EventBean[split.length];
  for (int i = 0; i < split.length; i++) {
    events[i] = context.getEventBeanService().adapterForMap(
        Collections.singletonMap("id", split[i]), "MyItem");
  }
  return events;
}
The sample EPL queries items filtering those items that have a given value for the id field:
select myItemProducer(ordertext).where(v => v.id in ('id1', 'id3')) as c0 from Order
ConfigurationPlugInSingleRowFunction entry = new ConfigurationPlugInSingleRowFunction();
entry.setName("myItemProducer");
entry.setFunctionClassName(...);
entry.setFunctionMethodName(...);
entry.setEventTypeName("MyItem");
epService.getEPAdministrator().getConfiguration().addPlugInSingleRowFunction(entry);
A sample method footprint and EPL are shown below:
public static double computeSomething(double number, EPLMethodInvocationContext context) {...}
select computeSomething(10) from MyEvent
Views in Esper are used to derive information from an event stream, and to represent data windows onto an event stream. This chapter describes how to plug in a new, custom view.
The following steps are required to develop and use a custom view with Esper.
Implement a view factory class. View factories are classes that accept and check view parameters and instantiate the appropriate view class.
Implement a view class. A view class commonly represents a data window or derives new information from a stream.
Configure the view factory class supplying a view namespace and name in the engine configuration file.
The example view factory and view class that are used in this chapter can be found in the examples source folder in the OHLC (open-high-low-close) example. The class names are OHLCBarPlugInViewFactory and OHLCBarPlugInView.
Views can make use of the following engine services available via StatementServiceContext:

The SchedulingService interface allows views to schedule timer callbacks to a view.

The EventAdapterService interface allows views to create new event types and event instances of a given type.

The StatementStopService interface allows views to register a callback that the engine invokes to indicate that the view's statement has been stopped.
Section 19.4.3, “View Contract” outlines the requirements for correct behavior of your custom view within the engine.
Note that custom views may use engine services and APIs that can be subject to change between major releases. The engine services discussed above and view APIs are considered part of the engine internal public API and are stable. Any changes to such APIs are disclosed through the release change logs and history. Please also consider contributing your custom view to the Esper project team by submitting the view code through the mailing list or via a JIRA issue.
A view factory class is responsible for the following functions:
View factory classes simply subclass com.espertech.esper.view.ViewFactorySupport:
public class OHLCBarPlugInViewFactory extends ViewFactorySupport { ...
public class OHLCBarPlugInViewFactory extends ViewFactorySupport {

  private ViewFactoryContext viewFactoryContext;
  private List<ExprNode> viewParameters;
  private ExprNode timestampExpression;
  private ExprNode valueExpression;

  public void setViewParameters(ViewFactoryContext viewFactoryContext,
      List<ExprNode> viewParameters) throws ViewParameterException {
    this.viewFactoryContext = viewFactoryContext;
    if (viewParameters.size() != 2) {
      throw new ViewParameterException("View requires two parameters: " +
          "the expression returning timestamps and the expression supplying OHLC data points");
    }
    this.viewParameters = viewParameters;
  } ...
public void attach(EventType parentEventType, StatementContext statementContext,
    ViewFactory optionalParentFactory, List<ViewFactory> parentViewFactories)
    throws ViewParameterException {
  ExprNode[] validatedNodes = ViewFactorySupport.validate("OHLC view",
      parentEventType, statementContext, viewParameters, false);
  timestampExpression = validatedNodes[0];
  valueExpression = validatedNodes[1];

  if ((timestampExpression.getExprEvaluator().getType() != long.class) &&
      (timestampExpression.getExprEvaluator().getType() != Long.class)) {
    throw new ViewParameterException("View requires long-typed timestamp values in parameter 1");
  }
  if ((valueExpression.getExprEvaluator().getType() != double.class) &&
      (valueExpression.getExprEvaluator().getType() != Double.class)) {
    throw new ViewParameterException("View requires double-typed values in parameter 2");
  }
}
public View makeView(AgentInstanceViewFactoryChainContext agentInstanceViewFactoryContext) {
  return new OHLCBarPlugInView(agentInstanceViewFactoryContext, timestampExpression, valueExpression);
}

public EventType getEventType() {
  return OHLCBarPlugInView.getEventType(viewFactoryContext.getEventAdapterService());
}
A view class is responsible for:
View classes simply subclass com.espertech.esper.view.ViewSupport:
public class MyTrendSpotterView extends ViewSupport { ...
A sample update method implementation is provided in the OHLC example.
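For orientation, a minimal pass-through view sketch is shown below; it is not the OHLC implementation, and the exact set of abstract methods to implement can vary between releases:

import java.util.Collections;
import java.util.Iterator;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.EventType;
import com.espertech.esper.view.ViewSupport;

public class MyPassThroughView extends ViewSupport {

  private final EventType eventType;

  public MyPassThroughView(EventType eventType) {
    this.eventType = eventType;  // typically the parent event type, passed in by the factory
  }

  public void update(EventBean[] newData, EventBean[] oldData) {
    // Derive information or maintain a data window here; this sketch simply forwards.
    updateChildren(newData, oldData);
  }

  public EventType getEventType() {
    return eventType;
  }

  public Iterator<EventBean> iterator() {
    return Collections.<EventBean>emptyList().iterator();  // no retained data in this sketch
  }
}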
<esper-configuration>
  <plugin-view namespace="custom" name="ohlc"
    factory-class="com.espertech.esper.example.ohlc.OHLCBarPlugInViewFactory" />
</esper-configuration>
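The view can also be registered through the configuration API; a minimal sketch:

Configuration config = new Configuration();
config.addPlugInView("custom", "ohlc",
    "com.espertech.esper.example.ohlc.OHLCBarPlugInViewFactory");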
The new view is now ready to use in a statement:
select * from StockTick.custom:ohlc(timestamp, price)
Aggregation functions are stateful functions that aggregate events, event property values or expression results. Examples for built-in aggregation functions are count(*), sum(price * volume), window(*) or maxby(volume).
Esper allows two different ways for your application to provide aggregation functions. We use the name aggregation single-function and aggregation multi-function for the two independent extension APIs for aggregation functions.
The aggregation single-function API is simple to use however it imposes certain restrictions on how expressions that contain aggregation functions share state and are evaluated.
The aggregation multi-function API is more powerful and provides control over how expressions that contain aggregation functions share state and are evaluated.
The next table compares the two aggregation function extension APIs:

Table 19.1. Aggregation Function Extension APIs
| | Single-Function | Multi-Function |
|---|---|---|
| Return Value | Can only return a single value or object. Cannot return an EventBean event, collection of EventBean events or collection or array of values for use with enumeration methods, for example. | Can return an EventBean event, a collection of EventBean events or a collection or array of objects for use with enumeration methods or to access event properties. |
| Complexity of API | Simple (consists of 2 interfaces). | More complex (consists of 6 interfaces). |
| State Sharing | State and parameter evaluation shared if multiple aggregation functions of the same name in the same statement (and context partition) take the exact same parameter expressions. | State and parameter evaluation sharable when multiple aggregation functions of a related name (related through configuration) for the same statement (and context partition) exist, according to a sharing-key provided by your API implementation. |
| Function Name | Each aggregation function expression receives its own factory object. | Multiple related aggregation function expressions share a single factory object. |
| Distinct Keyword | Handled by the engine transparently. | Indicated to the API implementation only. |
The following sections discuss developing an aggregation single-function first, followed by the subject of developing an aggregation multi-function.
The aggregation multi-function API is a powerful and lower-level API to extend the engine. Any classes that are not part of the client, plugin or agg.access packages are subject to change between minor and major releases of the engine.
The following steps are required to develop and use a custom aggregation single-function with Esper.
Custom aggregation functions can also be passed multiple parameters, as further described in Section 19.5.1.4, “Aggregation Single-Function: Accepting Multiple Parameters”. In the example below the aggregation function accepts a single parameter.
The code for the example aggregation function as shown in this chapter can be found in the runtime configuration example in the package com.espertech.esper.example.runtimeconfig by the name MyConcatAggregationFunction. The sample function simply concatenates string-type values.
An aggregation function factory class is responsible for the following functions:
public class MyConcatAggregationFunctionFactory implements AggregationFunctionFactory { ...
The sample concatenation function factory provides an empty setFunctionName method:
public void setFunctionName(String functionName) {
  // no action taken
}
Since the example concatenation function requires string types, it implements a type check:
public void validate(AggregationValidationContext validationContext) {
  if ((validationContext.getParameterTypes().length != 1) ||
      (validationContext.getParameterTypes()[0] != String.class)) {
    throw new IllegalArgumentException("Concat aggregation requires a single parameter of type String");
  }
}
public Class getValueType() { return String.class; }
public AggregationMethod newAggregator() { return new MyConcatAggregationFunction(); }
An aggregation function class is responsible for the following functions:
Aggregation function classes implement the interface AggregationMethod:
public class MyConcatAggregationFunction implements AggregationMethod { ...
The constructor initializes the aggregation function:
public class MyConcatAggregationFunction implements AggregationMethod {

  private final static char DELIMITER = ' ';
  private StringBuilder builder;
  private String delimiter;

  public MyConcatAggregationFunction() {
    builder = new StringBuilder();
    delimiter = "";
  } ...
public void enter(Object value) {
  if (value != null) {
    builder.append(delimiter);
    builder.append(value.toString());
    delimiter = String.valueOf(DELIMITER);
  }
}
public void leave(Object value) {
  if (value != null) {
    builder.delete(0, value.toString().length() + 1);
  }
}
Finally, the engine obtains the current aggregation value by means of the getValue method:
public Object getValue() { return builder.toString(); }
public void clear() {
  builder = new StringBuilder();
  delimiter = "";
}
<esper-configuration>
  <plugin-aggregation-function name="concat"
    factory-class="com.espertech.esper.example.runtimeconfig.MyConcatAggregationFunctionFactory" />
</esper-configuration>
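The aggregation function can also be registered through the configuration API; a minimal sketch:

Configuration config = new Configuration();
config.addPlugInAggregationFunctionFactory("concat",
    MyConcatAggregationFunctionFactory.class.getName());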
The new aggregation function is now ready to use in a statement:
select concat(symbol) from StockTick#length(3)
select rangeCount(1, 10, myValue) from MyEvent
The enter method of the plug-in aggregation function may look as follows:
public void enter(Object value) {
  Object[] params = (Object[]) value;
  int lower = (Integer) params[0];
  int upper = (Integer) params[1];
  int val = (Integer) params[2];
  if ((val >= lower) && (val <= upper)) {
    count++;
  }
}
select concat(word, filter: word not like '%jim%') from MyWordEvent
The enter method of the plug-in aggregation function may look as follows:
public void enter(Object value) {
  Object[] arr = (Object[]) value;
  Boolean pass = (Boolean) arr[1];
  if (pass != null && pass) {
    buffer.append(arr[0].toString());
  }
}
@Name('CycleDetector') select cycleoutput() as cyclevertices from TransactionEvent#length(1000) having cycledetected(fromAcct, toAcct)
@Name('CycleDetector') select (select cycleoutput(fromAcct, toAcct) from TransactionEvent#length(1000)) as cyclevertices from pattern [every timer:interval(1)]
The following steps are required to develop and use a custom aggregation multi-function with Esper.
An aggregation multi-function factory class is responsible for the following functions:
public class CycleDetectorAggregationFactory implements PlugInAggregationMultiFunctionFactory { ...
The sample Cycle-Detect factory class provides an empty addAggregationFunction method:
public void addAggregationFunction(PlugInAggregationMultiFunctionDeclarationContext declarationContext) {
  // no action taken
}
public PlugInAggregationMultiFunctionHandler validateGetHandler(
    PlugInAggregationMultiFunctionValidationContext validationContext) {
  if (validationContext.getParameterExpressions().length == 2) {
    fromExpression = validationContext.getParameterExpressions()[0].getExprEvaluator();
    toExpression = validationContext.getParameterExpressions()[1].getExprEvaluator();
  }
  return new CycleDetectorAggregationHandler(this, validationContext);
}
public class CycleDetectorAggregationHandler implements PlugInAggregationMultiFunctionHandler { ...
public AggregationAccessor getAccessor() {
  if (validationContext.getFunctionName().toLowerCase().equals(CycleDetectorConstant.CYCLEOUTPUT_NAME)) {
    return new CycleDetectorAggregationAccessorOutput();
  }
  return new CycleDetectorAggregationAccessorDetect();
}
public ExpressionReturnType getReturnType() {
  if (validationContext.getFunctionName().toLowerCase().equals(CycleDetectorConstant.CYCLEOUTPUT_NAME)) {
    return ExpressionReturnType.collectionOfSingleValue(factory.getFromExpression().getType());
  }
  return ExpressionReturnType.singleValue(Boolean.class);
}
private static final AggregationStateKey CYCLE_KEY = new AggregationStateKey() {};

public AggregationStateKey getAggregationStateUniqueKey() {
  return CYCLE_KEY;  // Share the same aggregation state instance
}
public PlugInAggregationMultiFunctionStateFactory getStateFactory() {
  return new CycleDetectorAggregationStateFactory(factory.getFromExpression(), factory.getToExpression());
}
<esper-configuration>
  <!-- function-names takes a comma-separated list of function names -->
  <plugin-aggregation-multifunction function-names="cycledetected,cycleoutput"
    factory-class="com.espertech.esper.example.cycledetect.CycleDetectorAggregationFactory"/>
</esper-configuration>
The next example uses the runtime configuration API to register the same:
String[] functionNames = new String[] {"cycledetected", "cycleoutput"};
ConfigurationPlugInAggregationMultiFunction config =
    new ConfigurationPlugInAggregationMultiFunction(functionNames,
        CycleDetectorAggregationFactory.class.getName());
engine.getEPAdministrator().getConfiguration().addPlugInAggregationMultiFunction(config);
ExprEvaluator filterEval = validationContext.getNamedParameters().get("filter").get(0).getExprEvaluator();
public void applyEnter(EventBean[] eventsPerStream, ExprEvaluatorContext exprEvaluatorContext) {
  Boolean pass = (Boolean) filterEval.evaluate(eventsPerStream, true, exprEvaluatorContext);  // note: pass "false" for applyLeave
  if (pass != null && pass) {
    Object value = valueEval.evaluate(eventsPerStream, true, exprEvaluatorContext);  // note: pass "false" for applyLeave
    // do something
  }
}
Pattern guards are pattern objects that control the lifecycle of the guarded sub-expression, and can filter the events fired by the sub-expression.
The following steps are required to develop and use a custom guard object with Esper.
Implement a guard factory class, responsible for creating guard object instances.
Implement a guard class.
Register the guard factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.
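For the registration step above, a sketch using the configuration API is shown here; the namespace "myplugin" and name "count_to" are example values. A pattern statement could then refer to the guard as myplugin:count_to(10).

Configuration config = new Configuration();
config.addPlugInPatternGuard("myplugin", "count_to",
    MyCountToPatternGuardFactory.class.getName());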
The code for the example guard object as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyCountToPatternGuardFactory. The sample guard discussed here counts the number of events occurring up to a maximum number of events, and ends the sub-expression when that maximum is reached.
A guard factory class is responsible for the following functions:
Guard factory classes subclass com.espertech.esper.pattern.guard.GuardFactorySupport:
public class MyCountToPatternGuardFactory extends GuardFactorySupport { ...
public void setGuardParameters(List<ExprNode> guardParameters, MatchedEventConvertor convertor)
    throws GuardParameterException {
  String message = "Count-to guard takes a single integer-value expression as parameter";
  if (guardParameters.size() != 1) {
    throw new GuardParameterException(message);
  }
  if (guardParameters.get(0).getExprEvaluator().getType() != Integer.class) {
    throw new GuardParameterException(message);
  }
  this.numCountToExpr = guardParameters.get(0);
  this.convertor = convertor;
}
public Guard makeGuard(PatternAgentInstanceContext context, MatchedEventMap beginState,
    Quitable quitable, Object stateNodeId, Object guardState) {
  Object parameter = PatternExpressionUtil.evaluate("Count-to guard", beginState, numCountToExpr, convertor);
  if (parameter == null) {
    throw new EPException("Count-to guard parameter evaluated to a null value");
  }
  Integer numCountTo = (Integer) parameter;
  return new MyCountToPatternGuard(numCountTo, quitable);
}
A guard class has the following responsibilities:
Guard classes subclass com.espertech.esper.pattern.guard.GuardSupport as shown here:
public abstract class GuardSupport implements Guard { ...
public void startGuard() { counter = 0; }
public boolean inspect(MatchedEventMap matchEvent) {
  counter++;
  if (counter > numCountTo) {
    quitable.guardQuit();
    return false;
  }
  return true;
}
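The guard contract also includes a stop callback that the engine invokes when the guard becomes inactive; this sample holds no resources, so a sketch can leave it empty:

public void stopGuard() {
  // no resources to release in this sample
}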
Pattern observers are pattern objects that are executed as part of a pattern expression and can observe events or test conditions. Examples for built-in observers are timer:at and timer:interval. Some suggested uses of observer objects are:
Implement custom scheduling logic using the engine's own scheduling and timer services
Test conditions related to prior events matching an expression
The following steps are required to develop and use a custom observer object within pattern statements:
Implement an observer factory class, responsible for creating observer object instances.
Implement an observer class.
Register an observer factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.
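For the registration step above, a sketch using the configuration API follows; the namespace "myplugin" and name "my_observer" match the pattern example shown later in this section:

Configuration config = new Configuration();
config.addPlugInPatternObserver("myplugin", "my_observer",
    MyFileExistsObserverFactory.class.getName());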
The code for the example observer object as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyFileExistsObserver. The sample observer discussed here simply checks whether a file exists, using the filename supplied by the pattern statement and the java.io.File class.
An observer factory class is responsible for the following functions:
Observer factory classes subclass com.espertech.esper.pattern.observer.ObserverFactorySupport:
public class MyFileExistsObserverFactory extends ObserverFactorySupport { ...
public void setObserverParameters(List<ExprNode> expressionParameters,
    MatchedEventConvertor convertor, ExprValidationContext validationContext)
    throws ObserverParameterException {
  String message = "File exists observer takes a single string filename parameter";
  if (expressionParameters.size() != 1) {
    throw new ObserverParameterException(message);
  }
  if (!(expressionParameters.get(0).getExprEvaluator().getType() == String.class)) {
    throw new ObserverParameterException(message);
  }
  this.filenameExpression = expressionParameters.get(0);
  this.convertor = convertor;
}
public EventObserver makeObserver(PatternAgentInstanceContext context, MatchedEventMap beginState,
    ObserverEventEvaluator observerEventEvaluator, Object stateNodeId, Object observerState) {
  Object filename = PatternExpressionUtil.evaluate("File-exists observer", beginState, filenameExpression, convertor);
  if (filename == null) {
    throw new EPException("Filename evaluated to null");
  }
  return new MyFileExistsObserver(beginState, observerEventEvaluator, filename.toString());
}
a=MyEvent -> myplugin:my_observer(...)
An observer class has the following responsibilities:
Observer classes subclass com.espertech.esper.pattern.observer.ObserverSupport as shown here:
public class MyFileExistsObserver implements EventObserver { ...
public void startObserve() {
  File file = new File(filename);
  if (file.exists()) {
    observerEventEvaluator.observerEvaluateTrue(beginState);
  }
  else {
    observerEventEvaluator.observerEvaluateFalse();
  }
}
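The observer contract also includes a stop callback that the engine invokes when the pattern sub-expression stops; this sample has nothing to clean up, so a sketch can leave it empty:

public void stopObserve() {
  // nothing to clean up in this sample
}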
Creating a plug-in event representation can be useful under any of these conditions:
Your application has existing Java classes that carry event metadata and event property values and your application does not want to (or cannot) extract or transform such event metadata and event data into one of the built-in event representations (POJO Java objects, Map or XML DOM).
Your application wants to provide a faster or short-cut access path to event data, for example to access XML event data through a Streaming API for XML (StAX).
Your application must perform a network lookup or other dynamic resolution of event type and events.
Note that the classes to plug in custom event representations are held stable between minor releases, but can be subject to change between major releases.
Currently, EsperIO provides the following additional event representations:
Apache Axiom provides access to XML event data on top of the fast Streaming API for XML (StAX).
The source code is available for these and they are therefore excellent examples for how to implement a plug-in event representation. Please see the EsperIO documentation for usage details.
There are two situations in which an Esper engine instance asks an event representation for an event type:
The EventBean implementation is the event itself and encapsulates the underlying event object.
Follow the steps outlined below to process event objects for your event types:
Our EventType takes a set of valid property names:
public class MyPlugInPropertiesEventType implements EventType {

  private final Set<String> properties;

  public MyPlugInPropertiesEventType(Set<String> properties) {
    this.properties = properties;
  }

  public Class getPropertyType(String property) {
    if (!isProperty(property)) {
      return null;
    }
    return String.class;
  }

  public Class getUnderlyingType() {
    return Properties.class;
  }

  //... further methods below
}
public EventPropertyGetter getGetter(String property) {
  final String propertyName = property;
  return new EventPropertyGetter() {
    public Object get(EventBean eventBean) throws PropertyAccessException {
      MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
      return propBean.getProperties().getProperty(propertyName);
    }

    public boolean isExistsProperty(EventBean eventBean) {
      MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
      return propBean.getProperties().getProperty(propertyName) != null;
    }

    public Object getFragment(EventBean eventBean) {
      return null;  // The property is not a fragment
    }
  };
}
public EventType[] getSuperTypes() {
  return null;  // no supertype for this example
}

public Iterator<EventType> getDeepSuperTypes() {
  return null;
}

public String getName() {
  return name;
}

public EventPropertyDescriptor[] getPropertyDescriptors() {
  Collection<EventPropertyDescriptor> descriptorColl = descriptors.values();
  return descriptorColl.toArray(new EventPropertyDescriptor[descriptors.size()]);
}

public EventPropertyDescriptor getPropertyDescriptor(String propertyName) {
  return descriptors.get(propertyName);
}

public FragmentEventType getFragmentType(String property) {
  return null;  // sample does not provide any fragments
}
public class MyPlugInEventRepresentation implements PlugInEventRepresentation {

  private List<MyPlugInPropertiesEventType> types;

  public void init(PlugInEventRepresentationContext context) {
    types = new ArrayList<MyPlugInPropertiesEventType>();
  }

  public boolean acceptsType(PlugInEventTypeHandlerContext context) {
    return true;
  }

  public PlugInEventTypeHandler getTypeHandler(PlugInEventTypeHandlerContext eventTypeContext) {
    String proplist = (String) eventTypeContext.getTypeInitializer();
    String[] propertyList = proplist.split(",");
    Set<String> typeProps = new HashSet<String>(Arrays.asList(propertyList));
    MyPlugInPropertiesEventType eventType = new MyPlugInPropertiesEventType(typeProps);
    types.add(eventType);
    return new MyPlugInPropertiesEventTypeHandler(eventType);
  }

  // ... more methods below
}
public class MyPlugInPropertiesEventTypeHandler implements PlugInEventTypeHandler {

  private final MyPlugInPropertiesEventType eventType;

  public MyPlugInPropertiesEventTypeHandler(MyPlugInPropertiesEventType eventType) {
    this.eventType = eventType;
  }

  public EventSender getSender(EPRuntimeEventSender runtimeEventSender) {
    return new MyPlugInPropertiesEventSender(eventType, runtimeEventSender);
  }

  public EventType getType() {
    return eventType;
  }
}
public class MyPlugInPropertiesEventSender implements EventSender {

  private final MyPlugInPropertiesEventType type;
  private final EPRuntimeEventSender runtimeSender;

  public MyPlugInPropertiesEventSender(MyPlugInPropertiesEventType type, EPRuntimeEventSender runtimeSender) {
    this.type = type;
    this.runtimeSender = runtimeSender;
  }

  public void sendEvent(Object event) {
    if (!(event instanceof Properties)) {
      throw new EPException("Sender expects a properties event");
    }
    EventBean eventBean = new MyPlugInPropertiesEventBean(type, (Properties) event);
    runtimeSender.processWrappedEvent(eventBean);
  }
}
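A sketch of using the event sender to feed a Properties event, assuming the plug-in event type was registered under the name "MyPropsEvent" and epService is an initialized EPServiceProvider:

EventSender sender = epService.getEPRuntime().getEventSender("MyPropsEvent");
Properties props = new Properties();
props.put("key1", "value1");
props.put("key2", "value2");
sender.sendEvent(props);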
Our sample event representation accepts all URIs and returns a MyPlugInPropertiesBeanFactory:
public class MyPlugInEventRepresentation implements PlugInEventRepresentation {

  // ... methods as seen earlier

  public boolean acceptsEventBeanResolution(PlugInEventBeanReflectorContext eventBeanContext) {
    return true;
  }

  public PlugInEventBeanFactory getEventBeanFactory(PlugInEventBeanReflectorContext eventBeanContext) {
    return new MyPlugInPropertiesBeanFactory(types);
  }
}
public class MyPlugInPropertiesBeanFactory implements PlugInEventBeanFactory {

  private final List<MyPlugInPropertiesEventType> knownTypes;

  public MyPlugInPropertiesBeanFactory(List<MyPlugInPropertiesEventType> types) {
    knownTypes = types;
  }

  public EventBean create(Object event, URI resolutionURI) {
    Properties properties = (Properties) event;

    // use the known types to determine the type of the object
    for (MyPlugInPropertiesEventType type : knownTypes) {
      // if there is one property the event does not contain, then it's not the right type
      boolean hasAllProperties = true;
      for (String prop : type.getPropertyNames()) {
        if (!properties.containsKey(prop)) {
          hasAllProperties = false;
          break;
        }
      }
      if (hasAllProperties) {
        return new MyPlugInPropertiesEventBean(type, properties);
      }
    }
    return null;  // none match, unknown event
  }
}
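The MyPlugInPropertiesEventBean class referenced throughout this example is not shown in this excerpt; a minimal sketch wrapping the Properties underlying object might look as follows:

import java.util.Properties;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.EventPropertyGetter;
import com.espertech.esper.client.EventType;
import com.espertech.esper.client.PropertyAccessException;

public class MyPlugInPropertiesEventBean implements EventBean {

  private final MyPlugInPropertiesEventType eventType;
  private final Properties properties;

  public MyPlugInPropertiesEventBean(MyPlugInPropertiesEventType eventType, Properties properties) {
    this.eventType = eventType;
    this.properties = properties;
  }

  public EventType getEventType() {
    return eventType;
  }

  public Object get(String property) throws PropertyAccessException {
    EventPropertyGetter getter = eventType.getGetter(property);
    if (getter == null) {
      throw new PropertyAccessException("Property '" + property + "' is not valid for this type");
    }
    return getter.get(this);
  }

  public Object getUnderlying() {
    return properties;
  }

  public Object getFragment(String propertyExpression) {
    return null;  // this sample does not provide fragments
  }

  public Properties getProperties() {
    return properties;  // used by the property getters shown earlier
  }
}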