
Chapter 15. API Reference

15.1. API Overview
15.2. The Service Provider Interface
15.3. The Administrative Interface
15.3.1. Creating Statements
15.3.2. Receiving Statement Results
15.3.3. Setting a Subscriber Object
15.3.4. Adding Listeners
15.3.5. Using Iterators
15.3.6. Managing Statements
15.3.7. Atomic Statement Management
15.3.8. Runtime Configuration
15.4. The Runtime Interface
15.4.1. Event Sender
15.4.2. Receiving Unmatched Events
15.5. On-Demand Fire-And-Forget Query Execution
15.5.1. On-Demand Query Single Execution
15.5.2. On-Demand Query Prepared Unparameterized Execution
15.5.3. On-Demand Query Prepared Parameterized Execution
15.6. Event and Event Type
15.6.1. Event Type Metadata
15.6.2. Event Object
15.6.3. Query Example
15.6.4. Pattern Example
15.7. Engine Threading and Concurrency
15.7.1. Advanced Threading
15.7.2. Processing Order
15.8. Controlling Time-Keeping
15.8.1. Controlling Time Using Time Span Events
15.8.2. Additional Time-Related APIs
15.9. Time Resolution
15.10. Service Isolation
15.10.1. Overview
15.10.2. Example: Suspending a Statement
15.10.3. Example: Catching up a Statement from Historical Data
15.10.4. Isolation for Insert-Into
15.10.5. Isolation for Named Windows and Tables
15.10.6. Runtime Considerations
15.11. Exception Handling
15.12. Condition Handling
15.13. Statement Object Model
15.13.1. Building an Object Model
15.13.2. Building Expressions
15.13.3. Building a Pattern Statement
15.13.4. Building a Select Statement
15.13.5. Building a Create-Variable and On-Set Statement
15.13.6. Building Create-Window, On-Delete and On-Select Statements
15.14. Prepared Statement and Substitution Parameters
15.15. Engine and Statement Metrics Reporting
15.15.1. Engine Metrics
15.15.2. Statement Metrics
15.16. Event Rendering to XML and JSON
15.16.1. JSON Event Rendering Conventions and Options
15.16.2. XML Event Rendering Conventions and Options
15.17. Plug-in Loader
15.18. Interrogating EPL Annotations
15.19. Context Partition Selection
15.19.1. Selectors
15.20. Context Partition Administration
15.21. Test and Assertion Support
15.21.1. EPAssertionUtil Summary
15.21.2. SupportUpdateListener Summary
15.21.3. Usage Example

Esper has the following primary interfaces:

For an introduction to EPL, please see Section 5.1, “EPL Introduction”. Patterns are described in Section 7.1, “Event Pattern Overview”.

The JavaDoc documentation is also a great source for API information.

The EPServiceProvider interface represents an engine instance. Each instance of an Esper engine is completely independent of other engine instances and has its own administrative and runtime interface.

An instance of the Esper engine is obtained via static methods on the EPServiceProviderManager class. The getDefaultProvider method and the getProvider(String providerURI) method return an instance of the Esper engine. The latter can be used to obtain multiple instances of the engine for different provider URI values. The EPServiceProviderManager determines whether the provider URI matches any prior provider URI value and returns the same engine instance for the same provider URI value. If the provider URI has not been seen before, it creates a new engine instance.

The code snippet below gets the default Esper engine instance. Subsequent calls to get the default engine instance return the same instance.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();

This code snippet gets an Esper engine for the provider URI RFIDProcessor1. Subsequent calls to get an engine with the same provider URI return the same instance.

EPServiceProvider epService = EPServiceProviderManager.getProvider("RFIDProcessor1");

Since the getProvider methods return the same cached engine instance for each URI, there is no need to statically cache an engine instance in your application.
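The caching behavior described above can be pictured with a small registry keyed by provider URI. This is only a sketch of the idea, not Esper's implementation: ToyEngine, ToyEngineRegistry and the "default" URI value are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for an engine instance; not an Esper class. */
final class ToyEngine {
    final String uri;

    ToyEngine(String uri) {
        this.uri = uri;
    }
}

/** Hypothetical registry illustrating "same URI, same instance". */
final class ToyEngineRegistry {
    // Assumed URI for the default engine; the real value is not stated in this chapter.
    private static final String DEFAULT_URI = "default";
    private static final Map<String, ToyEngine> ENGINES = new ConcurrentHashMap<>();

    static ToyEngine getDefaultProvider() {
        return getProvider(DEFAULT_URI);
    }

    static ToyEngine getProvider(String uri) {
        // computeIfAbsent creates the instance once per URI, even under concurrent calls
        return ENGINES.computeIfAbsent(uri, ToyEngine::new);
    }
}
```

Because the registry itself holds the instances, callers can ask for an engine by URI wherever they need it instead of keeping their own static reference.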

An existing Esper engine instance can be reset via the initialize method on the EPServiceProvider instance. This operation stops and removes all statements and resets the engine to the configuration provided when the engine instance for that URI was obtained. If no configuration is provided, an empty (default) configuration applies.

After initialize your application must obtain new administrative and runtime services. Any administrative and runtime services obtained before the initialize are invalid and have undefined behavior.

The next code snippet outlines a typical sequence of use:

// Configure the engine, this is optional
Configuration config = new Configuration();
config.configure("configuration.xml");	// load a configuration from file
config.set....(...);    // make additional configuration settings

// Obtain an engine instance
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);

// Optionally, use initialize if the same engine instance has been used before to start clean
epService.initialize();

// Optionally, make runtime configuration changes
epService.getEPAdministrator().getConfiguration().add...(...);

// Destroy the engine instance when no longer needed, frees up resources
epService.destroy();

An existing Esper engine instance can be destroyed via the destroy method on the EPServiceProvider instance. This stops and removes all statements as well as frees all resources held by the instance. After a destroy the engine can no longer be used.

The EPServiceStateListener interface may be implemented by your application to receive callbacks when an engine instance is about to be destroyed and after an engine instance has been initialized. Listeners are registered via the addServiceStateListener method. The EPStatementStateListener interface is used to receive callbacks when a new statement gets created and when a statement gets started, stopped or destroyed. Listeners are registered via the addStatementStateListener method.

When destroying an engine instance your application must make sure that threads that are sending events into the engine have completed their work. More generally, the engine should not be currently in use during or after the destroy operation.
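One way to meet this requirement, assuming your application sends events from an ExecutorService that it owns, is to shut the senders down and wait for them before destroying the engine. The sketch below is an illustration under that assumption; EngineShutdown and quiesceThenDestroy are hypothetical names, and the engine's destroy call is passed in as a Runnable (for example epService::destroy) so the example stays self-contained.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class EngineShutdown {
    /**
     * Stops accepting new sender tasks, waits for running ones, then runs the destroy action.
     * Returns false, without destroying, if senders are still busy at the timeout
     * or if the waiting thread is interrupted.
     */
    static boolean quiesceThenDestroy(ExecutorService senders, Runnable destroyEngine,
                                      long timeout, TimeUnit unit) {
        senders.shutdown(); // no new event-sending tasks are accepted
        try {
            if (!senders.awaitTermination(timeout, unit)) {
                return false; // senders still active: the engine is still in use
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        destroyEngine.run(); // safe now: all sender tasks have completed
        return true;
    }
}
```

If the method returns false, the application can decide whether to wait longer or to cancel the remaining senders before retrying.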

As engine instances are completely independent, your application may not send EventBean instances obtained from one engine instance into a second engine instance since the event type space between two engine instances is not shared.

Create event pattern expressions and EPL statements via the administrative interface EPAdministrator.

For managing one or more related statements as a module, please consider the deployment administrative API and EPL modules as further described in Section 17.4, “Packaging and Deploying Overview”.

This code snippet gets an Esper engine then creates an event pattern and an EPL statement.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();

// Java identifiers cannot start with a digit, hence the spelled-out name
EPStatement tenSecRecurTrigger = admin.createPattern(
  "every timer:at(*, *, *, *, *, */10)");

EPStatement countStmt = admin.createEPL(
  "select count(*) from MarketDataBean.win:time(60 sec)");

Note that event pattern expressions can also occur within EPL statements. This is outlined in more detail in Section 5.4.2, “Pattern-based Event Streams”.

The create methods on EPAdministrator are overloaded and allow an optional statement name to be passed to the engine. A statement name can be useful for retrieving a statement by name from the engine at a later time. The engine assigns a statement name if no statement name is supplied on statement creation.

The createPattern and createEPL methods return EPStatement instances. Statements are automatically started and active when created. A statement can also be stopped and started again via the stop and start methods shown in the code snippet below.

countStmt.stop();
countStmt.start();

The create methods on EPAdministrator also accept a user object. The user object is associated with a statement at time of statement creation and is a single, unnamed field that is stored with every statement. Applications may put arbitrary objects in this field. Use the getUserObject method on EPStatement to obtain the user object of a statement. Listeners that implement StatementAwareUpdateListener receive the statement instance and can obtain the user object from it.

Your application may create new statements or stop and destroy existing statements using any thread and also within listener or subscriber code. If using POJO events, your application may not create or manage statements in the event object itself while the same event is currently being processed by a statement.

For NEsper .NET also see Section C.14, “.NET API - Receiving Statement Results”.

Esper provides three choices for your application to receive statement results. Your application can use all three mechanisms alone or in any combination for each statement. The choices are:

Table 15.1. Choices For Receiving Statement Results

Listener Callbacks (methods on EPStatement: addListener and removeListener)

Your application provides implementations of the UpdateListener or the StatementAwareUpdateListener interface to the statement. Listeners receive EventBean instances containing statement results.

The engine continuously indicates results to all listeners as soon as they occur, and following output rate limiting clauses if specified.

Subscriber Object (method on EPStatement: setSubscriber)

Your application provides a POJO (plain Java object) that exposes methods to receive statement results.

The name of the method that a subscriber object provides to receive results is update, unless your call to setSubscriber provides another method name.

The engine continuously indicates results to the single subscriber as soon as they occur, and following output rate limiting clauses if specified.

This is the fastest method to receive statement results, as the engine delivers strongly-typed results directly to your application objects without the need for building an EventBean result set as in the Listener Callback choice.

At most one Subscriber Object can be registered per statement. If you require more than one listener, use the Listener Callback instead (or in addition). The Subscriber Object is bound to the statement with strong typing, which ensures direct delivery of new events without type conversion. This optimization is possible because a statement has either zero or one Subscriber Object.

Pull API (methods on EPStatement: safeIterator and iterator)

Your application asks the statement for results and receives a set of events via java.util.Iterator<EventBean>.

This is useful if your application does not need continuous indication of new results in real-time.


Your application may attach one or more listeners, zero or one single subscriber and in addition use the Pull API on the same statement. There are no limitations to the use of iterator, subscriber or listener alone or in combination to receive statement results.

The best delivery performance can generally be achieved by attaching a subscriber and by not attaching listeners. The engine is aware of the listeners and subscriber attached to a statement. The engine uses this information internally to reduce statement overhead. For example, if your statement does not have listeners or a subscriber attached, the engine does not need to continuously generate results for delivery.

If your application attaches both a subscriber and one or more listeners then the subscriber receives the result first before any of the listeners.

If your application attaches more than one listener then the UpdateListener listeners receive results first in the order they were added to the statement, and StatementAwareUpdateListener listeners receive results next in the order they were added to the statement. To change the order of delivery among listeners your application can add and remove listeners at runtime.

If you have configured outbound threading, a thread from the outbound thread pool delivers results to the subscriber and listeners instead of the processing or event-sending thread.

If outbound threading is turned on, we recommend turning off the engine setting that preserves the order of events delivered to listeners, as described in Section 16.4.11.1, “Preserving the order of events delivered to listeners”. With outbound threading turned on, statement execution is not blocked for the configured time when a subscriber or listener takes too much time.

A subscriber object is a direct binding of query results to a Java object. The object, a POJO, receives statement results via method invocation. The subscriber class does not need to implement an interface or extend a superclass. Only one subscriber object may be set for a statement.

Subscriber objects have several advantages over listeners. First, they offer a substantial performance benefit: Query results are delivered directly to your method(s) through Java virtual machine method calls, and there is no intermediate representation (EventBean). Second, as subscribers receive strongly-typed parameters, the subscriber code tends to be simpler.

This section describes the requirements for the methods provided by your subscriber class.

The engine can deliver results to your subscriber in two ways:

  1. Each event in the insert stream results in a method invocation, and each event in the remove stream results in further method invocations. This is termed row-by-row delivery.

  2. A single method invocation that delivers all rows of the insert and remove stream. This is termed multi-row delivery.

Your subscriber class must provide a method named update to receive insert stream events row-by-row. The number and types of parameters declared by the update method must match the number and types of columns as specified in the select clause, in the same order as in the select clause.

For example, if your statement is:

select orderId, price, count(*) from OrderEvent

Then your subscriber update method looks as follows:

public class MySubscriber {
  ...
  public void update(String orderId, double price, long count) {...}
  ...
}

Each method parameter declared by the update method must be assignable from the respective column type as listed in the select-clause, in the order selected. The assignability rules are:

  • Widening of types follows Java standards. For example, if your select clause selects an integer value, the method parameter for the same column can be typed int, long, float or double (or any equivalent boxed type).

  • Auto-boxing and unboxing follow Java standards. For example, if your select clause selects a java.lang.Integer value, the method parameter for the same column can be typed int. Note that if your select clause column may generate null values, an exception may occur at runtime when unboxing the null value.

  • Interfaces and super-classes are honored in the test for assignability. Therefore java.lang.Object can be used to accept any select clause column type.

In the case that your subscriber class offers multiple update method footprints, the engine selects the closest-matching footprint by comparing the output types and method parameter types. The engine prefers the update method that is an exact match of types, followed by an update method that requires boxing or unboxing, followed by an update method that requires widening and finally any other allowable update method.

Within the above criteria, in the case that your subscriber class offers multiple update method footprints with same method parameter types, the engine prefers the update method that has EPStatement as the first parameter.
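The preference order above (exact match, then boxing or unboxing, then widening, then any other allowable match) can be sketched for a single parameter as follows. This is an illustrative reading of the rules in this section, not the engine's actual selection code: ParameterMatch, its constants and the rank method are hypothetical, and how the engine combines ranks across several parameters is not covered here.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical ranking of one subscriber method parameter against one select-clause column type. */
final class ParameterMatch {
    static final int EXACT = 0, BOXING = 1, WIDENING = 2, OTHER = 3, NONE = -1;

    private static final Map<Class<?>, Class<?>> TO_PRIMITIVE = Map.of(
        Byte.class, byte.class, Short.class, short.class,
        Integer.class, int.class, Long.class, long.class,
        Float.class, float.class, Double.class, double.class,
        Boolean.class, boolean.class, Character.class, char.class);

    // Numeric primitives ordered so that a later entry can hold an earlier one by widening
    private static final List<Class<?>> NUMERIC_ORDER =
        List.of(byte.class, short.class, int.class, long.class, float.class, double.class);

    static int rank(Class<?> param, Class<?> column) {
        if (param == column) {
            return EXACT;
        }
        Class<?> p = primitiveOf(param);
        Class<?> c = primitiveOf(column);
        if (p != null && c != null) {
            if (p == c) {
                return BOXING; // e.g. Integer column, int parameter
            }
            int pi = NUMERIC_ORDER.indexOf(p);
            int ci = NUMERIC_ORDER.indexOf(c);
            if (pi >= 0 && ci >= 0 && pi > ci) {
                return WIDENING; // e.g. int column, long or double parameter
            }
        }
        // Interfaces and super-classes, e.g. Object accepts any column type
        return param.isAssignableFrom(boxedOf(column)) ? OTHER : NONE;
    }

    /** Returns the primitive form of a type, or null when the type has none. */
    private static Class<?> primitiveOf(Class<?> type) {
        return type.isPrimitive() ? type : TO_PRIMITIVE.get(type);
    }

    /** Returns the wrapper class for a primitive type, or the type itself otherwise. */
    private static Class<?> boxedOf(Class<?> type) {
        for (Map.Entry<Class<?>, Class<?>> e : TO_PRIMITIVE.entrySet()) {
            if (e.getValue() == type) {
                return e.getKey();
            }
        }
        return type;
    }
}
```

Under this sketch, a parameter typed Object ranks behind any primitive or boxed match for a numeric column, which matches the idea that the closest-matching footprint is preferred.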

In place of row-by-row delivery, your subscriber can receive all events in the insert and remove stream via a single method invocation. This is applicable when an EPL statement delivers multiple output rows for a given input event or time advance, for example when multiple pattern matches occur for the same incoming event, when a join produces multiple output rows, or with output rate limiting.

The event delivery follows the scheme described earlier in Section 15.3.3.2.2, “Row Delivery as Map and Object Array”. The subscriber class must provide one of the following methods:


For NEsper .NET also see Section C.15, “.NET API - Adding Listeners”.

Your application can subscribe to updates posted by a statement via the addListener and removeListener methods on EPStatement. Your application must provide an implementation of the UpdateListener or the StatementAwareUpdateListener interface to the statement:

UpdateListener myListener = new MyUpdateListener();
countStmt.addListener(myListener);

EPL statements and event patterns publish old data and new data to registered UpdateListener listeners. New data published by statements consists of the events representing the new values of derived data held by the statement. Old data published by statements consists of the events representing the prior values of derived data held by the statement.

Important

UpdateListener listeners receive multiple result rows in one invocation by the engine: the new data and old data parameters to your listener are array parameters. For example, if your application uses one of the batch data windows, or your application creates a pattern that matches multiple times when a single event arrives, then the engine indicates such multiple result rows in one invocation and your new data array carries two or more rows.
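A listener therefore usually loops over both arrays. The sketch below shows one way to write such a listener. To keep it compilable on its own it declares minimal stand-ins for EventBean and UpdateListener that mirror the method shapes used in this chapter; a real application would import the Esper interfaces instead. The class name CollectingListener and the property name cnt are assumptions for illustration, and the null checks are a defensive choice for the case where an invocation carries only new or only old data.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins so that the sketch compiles without the Esper jar;
// a real application imports the Esper EventBean and UpdateListener interfaces.
interface EventBean {
    Object get(String propertyName);
}

interface UpdateListener {
    void update(EventBean[] newEvents, EventBean[] oldEvents);
}

/** Hypothetical listener that records the "cnt" column of every row it receives. */
final class CollectingListener implements UpdateListener {
    final List<Object> newValues = new ArrayList<>();
    final List<Object> oldValues = new ArrayList<>();

    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        collect(newEvents, newValues); // one invocation may carry several rows
        collect(oldEvents, oldValues);
    }

    private static void collect(EventBean[] events, List<Object> target) {
        if (events == null) {
            return; // nothing delivered for this stream in this invocation
        }
        for (EventBean event : events) {
            target.add(event.get("cnt")); // "cnt" is an assumed column name
        }
    }
}
```

Reading only the first array element is a common mistake with batch data windows, since the remaining rows of the same invocation would be silently ignored.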

A second listener interface is the StatementAwareUpdateListener interface. A StatementAwareUpdateListener is especially useful for registering the same listener object with multiple statements, as the listener receives the statement instance and engine instance in addition to new and old data when the engine indicates new results to a listener.

StatementAwareUpdateListener myListener = new MyStmtAwareUpdateListener();
statement.addListener(myListener);

To indicate results the engine invokes this method on StatementAwareUpdateListener listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)

Subscribing to events posted by a statement follows a push model. The engine pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, statements serve up data that your application can obtain via the safeIterator and iterator methods on EPStatement. This is called the pull API and can come in handy if your application is not interested in all new updates, and only needs to perform a frequent or infrequent poll for the latest data.

The safeIterator method on EPStatement returns a concurrency-safe iterator returning current statement results, even while concurrent threads may send events into the engine for processing. The engine employs a read-write lock per context partition and obtains a read lock for iteration. Thus the safe iterator guarantees correct results even as events are being processed by other threads and other context partitions. The cost is that the iterator obtains and holds zero, one or multiple context partition locks for that statement, which must be released via the close method on the SafeIterator instance.

The iterator method on EPStatement returns a concurrency-unsafe iterator. This iterator is only useful for applications that are single-threaded, or applications that themselves perform coordination between the iterating thread and the threads that send events into the engine for processing. The advantage to this iterator is that it does not hold a lock.
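The difference between the two iterators can be pictured with a small wrapper that takes a read lock when it is created and releases it on close. This is a simplified sketch with a single lock, not the engine's implementation: LockingIterator is a hypothetical class, and making it AutoCloseable is a convenience of the sketch only.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical iterator that holds a read lock from construction until close(). */
final class LockingIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final ReentrantReadWriteLock lock;
    private boolean closed;

    LockingIterator(List<T> rows, ReentrantReadWriteLock lock) {
        this.lock = lock;
        lock.readLock().lock(); // writers (event processing) now wait until close()
        this.delegate = rows.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void close() {
        if (!closed) { // releasing twice would throw, so guard against double close
            closed = true;
            lock.readLock().unlock();
        }
    }
}
```

The sketch makes the cost visible: as long as the iterator stays open, any thread that needs the write lock is blocked, which is why closing in a finally block matters.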

When statements are used with contexts and context partitions, the APIs to identify, filter and select context partitions for statement iteration are described in Section 15.19, “Context Partition Selection”.

The next code snippet shows a short example of use of safe iterators:

EPStatement statement = epAdmin.createEPL("select avg(price) as avgPrice from MyTick");
// .. send events into the engine
// then use the pull API...
SafeIterator<EventBean> safeIter = statement.safeIterator();
try {
  while (safeIter.hasNext()) {
     // .. process event ..
     EventBean event = safeIter.next();
     System.out.println("avg:" + event.get("avgPrice"));
  }
}
finally {
  safeIter.close();	// Note: safe iterators must be closed
}

This is a short example of use of the regular iterator that is not safe for concurrent event processing:

double averagePrice = (Double) statement.iterator().next().get("avgPrice");

The safeIterator and iterator methods can be used to pull results out of all statements, including statements that join streams, contain aggregation functions, pattern statements, and statements that contain a where clause, group by clause, having clause or order by clause.

For statements without an order by clause, the iterator method returns events in the order maintained by the data window. For statements that contain an order by clause, the iterator method returns events in the order indicated by the order by clause.

Consider using the on-select clause and a named window if your application requires iterating over a partial result set or requires indexed access for fast iteration. Note that on-select requires that you send a trigger event, which may contain the key values for indexed access.

Esper places the following restrictions on the pull API and usage of the safeIterator and iterator methods:

  1. In multithreaded applications, use the safeIterator method. Note: make sure your application closes the iterator via the close method when done, otherwise the iterated statement context partitions stay locked and event processing for statement context partitions does not resume.

  2. In multithreaded applications, the iterator method does not hold any locks. The iterator returned by this method does not make any guarantees towards correctness of results and fail-behavior, if your application processes events into the engine instance by multiple threads. Use the safeIterator method for concurrency-safe iteration instead.

  3. Since the safeIterator and iterator methods return events to the application immediately, the iterator does not honor an output rate limiting clause, if present. That is, the iterator returns results as if there is no output-rate clause for the statement in statements without grouping or aggregation. For statements with grouping or aggregation, the iterator in combination with an output clause returns last output group and aggregation results. Use a separate statement and the insert into clause to control the output rate for iteration, if so required.

  4. When iterating a statement that operates on an unbound stream (no data window declared), please note the following:

    • When iterating a statement that groups and aggregates values from an unbound stream and that specifies output snapshot, the engine retains groups and aggregations for output as iteration results or upon the output snapshot condition.

    • When iterating a statement that groups and aggregates values from an unbound stream and that does not specify output snapshot, the engine only retains the last aggregation values and the iterated result contains only the last updated group.

    • When iterating a statement that operates on an unbound stream, the iterator returns no rows. This behavior can be changed either by specifying the @IterableUnbound annotation or by changing the global view resources configuration.

Your application can concurrently send events into the engine while performing statement or module management. Therefore it is safe to stop and start statements or undeploy and deploy modules while sending in events from other threads concurrently.

Your application can use the API described below to obtain a lock and perform statement or module management as an atomic unit. For example, if your application would like to undeploy and re-deploy all statements or modules as a single administrative unit, while at the same time sending events into the engine from different threads, it can obtain a lock to ensure that no events are concurrently processed while the statement or module management operations take place.

The below code sample obtains the engine exclusive write lock to perform multiple management operations as a unit, excluding concurrent processing of events.

epService.getEngineInstanceWideLock().writeLock().lock();
// Start atomic management unit. 
// Any events concurrently being processed by other threads must complete before the code completes obtaining the lock. 
// Any events sent in by other threads will await the release of the lock.
try {
  // perform operations such as : 
  //   - start statements, destroy statements, stop statements
  //   - undeploy modules, deploy modules (deployment admin API)
}
finally {
  // Complete atomic management unit. 
  // Any events sent in by other threads will now continue processing against the changed set of statements.
  epService.getEngineInstanceWideLock().writeLock().unlock();
}

Certain configuration changes are available to perform on an engine instance while in operation. Such configuration operations are available via the getConfiguration method on EPAdministrator, which returns a ConfigurationOperations object.

Please consult the JavaDoc of ConfigurationOperations for further information. Section 16.6, “Runtime Configuration” provides a summary of available configurations.

In summary, the configuration operations available on a running engine instance are as follows:

  • Add new event types for all event representations, check if an event type exists, update an existing event type, remove an event type, query a list of types and obtain a type by name.

  • Add and remove variables (getting and setting variable values is done via the runtime API).

  • Add a variant stream.

  • Add a revision event type.

  • Add event types for all event classes in a given Java package, using the simple class name as the event name.

  • Add import for user-defined functions.

  • Add a plug-in aggregation function, plug-in single row function, plug-in event type, plug-in event type resolution URIs.

  • Control metrics reporting.

  • For additional items, please see the ConfigurationOperations interface.

For examples of the above runtime configuration API functions, please see the Configuration chapter, which applies to both static configuration and runtime configuration as the ConfigurationOperations interface is the same.

The EPRuntime interface is used to send events for processing into an Esper engine, set and get variable values and execute on-demand queries.

The code snippet below shows how to send a Java object event to the engine. Note that the sendEvent method is overloaded. As events can take on different representation classes in Java, sendEvent takes parameters that reflect the different types of events that can be sent into the engine. Chapter 2, Event Representations explains the types of events accepted.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();

// Send an example event containing stock market data
runtime.sendEvent(new MarketDataBean("IBM", 75.0));

Tip

Events, in theoretical terms, are observations of a state change that occurred in the past. Since one cannot change an event that happened in the past, events are best modelled as immutable objects.

Caution

The engine relies on events that are sent into an engine to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Applications should not modify an existing event that was sent into the engine.

Important

Another important method in the runtime interface is the route method. This method is designed for use by UpdateListener and subscriber implementations, as well as engine extensions, that need to send events into an engine instance. Using route avoids the possibility of a stack overflow due to nested calls to sendEvent and ensures correct processing of the current and routed event. Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent and not route.
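To see why queuing avoids nested calls, consider a toy dispatcher in which route only enqueues an event and sendEvent drains the queue after the current event has been processed. This is a single-threaded sketch of the general idea and does not describe the engine's internals; ToyRuntime and its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical single-threaded dispatcher contrasting sendEvent and route. */
final class ToyRuntime {
    private final Deque<Object> routed = new ArrayDeque<>();
    private final List<BiConsumer<ToyRuntime, Object>> listeners = new ArrayList<>();
    final List<Object> processed = new ArrayList<>();

    void addListener(BiConsumer<ToyRuntime, Object> listener) {
        listeners.add(listener);
    }

    /** Entry point for application threads: process the event, then everything routed meanwhile. */
    void sendEvent(Object event) {
        process(event);
        while (!routed.isEmpty()) {
            process(routed.poll()); // runs at constant stack depth, however long the chain
        }
    }

    /** For use inside listeners: defers the event until the current one is fully processed. */
    void route(Object event) {
        routed.add(event);
    }

    private void process(Object event) {
        processed.add(event);
        for (BiConsumer<ToyRuntime, Object> listener : listeners) {
            listener.accept(this, event);
        }
    }
}
```

Had the listener called sendEvent instead of route, each follow-up event would be processed inside the listener call of the previous one, so a long chain of follow-up events would grow the call stack with every step.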

The EventSender interface processes event objects that are of a known type. This facility can reduce the overhead of event object reflection and type lookup as an event sender is always associated to a single concrete event type.

Use the method getEventSender(String eventTypeName) to obtain an event sender for processing events of the named type:

EventSender sender = epService.getEPRuntime().getEventSender("MyEvent");
sender.sendEvent(myEvent);

For events backed by a Java class (JavaBean events), the event sender ensures that the event object equals the underlying class, or implements or extends the underlying class for the given event type name.

For events backed by a java.util.Map (Map events), the event sender does not perform any checking other than checking that the event object implements Map.

For events backed by an Object[] (Object-array events), the event sender does not perform any checking other than checking that the event object is an Object[]. The array elements must be in the exact same order of properties as declared and the array length must always be at least the number of properties declared.

For events backed by an org.w3c.dom.Node (XML DOM events), the event sender checks that the root element name equals the root element name for the event type.

A second method to obtain an event sender is the method getEventSender(URI[]), which takes an array of URIs. This method is for use with plug-in event representations. The event sender returned by this method processes event objects that are of one of the types of one or more plug-in event representations. Please consult Section 18.8, “Event Type And Event Object” for more information.

As your application may not require streaming results and may not know each query in advance, the on-demand query facility provides for ad-hoc execution of an EPL expression.

On-demand queries are not continuous in nature: The query engine executes the query once and returns all result rows to the application. On-demand query execution is very lightweight as the engine performs no statement creation and the query leaves no traces within the engine.

Esper provides the facility to explicitly index named windows and tables to speed up on-demand and continuous queries. Please consult Section 6.9, “Explicitly Indexing Named Windows and Tables” for more information.

When named windows and tables are used with contexts and context partitions, the APIs to identify, filter and select context partitions for on-demand queries can be found in Section 15.19, “Context Partition Selection”.

The EPRuntime interface provides three ways to run on-demand queries:

  1. Use the executeQuery method to execute a given on-demand query exactly once, see Section 15.5.1, “On-Demand Query Single Execution”.

  2. Use the prepareQuery method to prepare a given on-demand query such that the same query can be executed multiple times without repeated parsing, see Section 15.5.2, “On-Demand Query Prepared Unparameterized Execution”.

  3. Use the prepareQueryWithParameters method to prepare a given on-demand query that may have substitution parameters such that the same query can be parameterized and executed multiple times without repeated parsing, see Section 15.5.3, “On-Demand Query Prepared Parameterized Execution”.

If your application must execute the same EPL on-demand query multiple times with different parameters, use prepareQueryWithParameters.

If your application must execute the same EPL on-demand query multiple times without parameters, use either prepareQuery, or prepareQueryWithParameters and specify no substitution parameters.

By using any of the prepare... methods the engine can compile an EPL query string or object model once and reuse the object and thereby speed up repeated execution.

The following limitations apply:

  • An on-demand EPL expression only evaluates against the named windows and tables that your application creates. On-demand queries may not specify any other streams or application event types.

  • The following clauses are not allowed in on-demand EPL: insert into and output.

  • Views and patterns are not allowed to appear in on-demand queries.

  • On-demand EPL may not perform subqueries.

  • The previous and prior functions may not be used.

Prepared on-demand queries are designed for repeated execution and may perform better than the dynamic single-execution method if running the same query multiple times. For use with parameter placeholders please see Section 15.5.3, “On-Demand Query Prepared Parameterized Execution”.

The next code snippet demonstrates prepared on-demand queries without parameter placeholders:

String query = "select * from MyNamedWindow where orderId = '123'";
EPOnDemandPreparedQuery prepared = epRuntime.prepareQuery(query);
EPOnDemandQueryResult result = prepared.execute();

// ...later on execute once more ...
prepared.execute();	// execute a second time

You can insert substitution parameters into an on-demand query as a single question mark character '?', making the substitution parameter addressable by index.

You can also insert substitution parameters using the following syntax, which makes the substitution parameter addressable by name:

?:name

If substitution parameters do not have a name assigned, the engine assigns the first substitution parameter an index of 1 and subsequent parameters increment the index by one. Please see Section 15.14, “Prepared Statement and Substitution Parameters” for additional detail and examples.

Substitution parameters can be inserted into any EPL construct that takes an expression.

All substitution parameters must be replaced by actual values before an on-demand query with substitution parameters can be executed. Substitution parameters can be replaced with an actual value using the setObject method for each index or name. Substitution parameters can be set to new values and the query executed more than once.

While the setObject method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.

The next program listing runs a prepared and parameterized on-demand query against a named window MyNamedWindow (this example does not assign names to substitution parameters):

String query = "select * from MyNamedWindow where orderId = ?";
EPOnDemandPreparedQueryParameterized prepared = epRuntime.prepareQueryWithParameters(query);

// Set the required parameter values before each execution
prepared.setObject(1, "123");
EPOnDemandQueryResult result = epRuntime.executeQuery(prepared);

// ...execute a second time with new parameter values...
prepared.setObject(1, "456");
result = epRuntime.executeQuery(prepared);

This second example uses the in operator and has multiple parameters:

String query = "select * from MyNamedWindow where orderId in (?) and price > ?";
EPOnDemandPreparedQueryParameterized prepared = epRuntime.prepareQueryWithParameters(query);
prepared.setObject(1, new String[] {"123", "456"});
prepared.setObject(2, 1000.0);

An EventBean object represents a row (event) in your continuous query's result set. Each EventBean object has an associated EventType object providing event metadata.

An UpdateListener implementation receives one or more EventBean events with each invocation. Via the iterator method on EPStatement your application can poll or read data out of statements. Statement iterators also return EventBean instances.

Each statement provides the event type of the events it produces, available via the getEventType method on EPStatement.

An EventType object encapsulates all the metadata about a certain type of event. As Esper supports an inheritance hierarchy for event types, it also provides information about the super-types of an event type.

An EventType object provides the following information:

For each property of an event type, there is an EventPropertyDescriptor object that describes the property. The EventPropertyDescriptor contains flags that indicate whether a property is an indexed (array) or a mapped property and whether access to property values requires an integer index value (indexed properties only) or string key value (mapped properties only). The descriptor also contains a fragment flag that indicates whether a property value is available as a fragment.

The term fragment means an event property value that is itself an event, or a property value that can be represented as an event. The getFragmentType method on EventType may be used to determine a fragment's event type in advance.

A fragment event type and thereby fragment events allow navigation over a statement's results even if the statement result contains nested events or a graph of events. There is no need to use the Java reflection API to navigate events, since fragments allow the querying of nested event properties or array values, including nested Java classes.

When using the Map or Object-array event representation, any named Map type or Object-array type nested within a Map or Object-array as a simple or array property is also available as a fragment. When using Java objects either directly or within Map or Object-array events, any object that is neither a primitive nor a boxed built-in type, that is not an enumeration and that does not implement the Map interface is also available as a fragment.

The nested, indexed and mapped property syntax can be combined to a property expression that may query an event property graph. Most of the methods on the EventType interface allow a property expression to be passed.

Your application may use an EventType object to obtain special getter-objects. A getter-object is a fast accessor to a property value of an event of a given type. All getter objects implement the EventPropertyGetter interface. Getter-objects work only for events of the same type or sub-types as the EventType that provides the EventPropertyGetter. The performance section provides additional information and samples on using getter-objects.

Consider a statement that returns the symbol, count of events per symbol and average price per symbol for tick events. Our sample statement may declare a fully-qualified Java class name as the event type: org.sample.StockTickEvent. Assume that this class exists and exposes a symbol property of type String, and a price property of type (Java primitive) double.

select symbol, avg(price) as avgprice, count(*) as mycount 
from org.sample.StockTickEvent 
group by symbol

The next table summarizes the property names and types as posted by the statement above:

Name        Type
symbol      java.lang.String
avgprice    java.lang.Double
mycount     java.lang.Long

A code snippet out of a possible UpdateListener implementation to this statement may look as below:

String symbol = (String) newEvents[0].get("symbol");
Double price = (Double) newEvents[0].get("avgprice");
Long count = (Long) newEvents[0].get("mycount");

The engine supplies the boxed java.lang.Double and java.lang.Long types as property values rather than primitive Java types. This is because aggregated values can return a null value to indicate that no data is available for aggregation. Also, in a select statement that computes expressions, the underlying event objects to EventBean instances are either of type Object[] (object-array) or of type java.util.Map.
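
Because aggregated values may be null, listener code that unboxes them directly risks a NullPointerException. The following is a minimal stand-alone sketch of a null-safe read. It uses a plain java.util.Map as a stand-in for a result row, and the class and method names (NullSafeAggregateDemo, avgPriceOrDefault) are hypothetical and not part of the Esper API:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: a Map stands in for a result row of the sample statement. */
final class NullSafeAggregateDemo {

    /** Reads the boxed avgprice value and falls back to a default when it is null. */
    static double avgPriceOrDefault(Map<String, Object> row, double fallback) {
        Double avg = (Double) row.get("avgprice"); // boxed type: may be null
        return avg != null ? avg : fallback;       // unbox only after the null check
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("symbol", "IBM");
        row.put("avgprice", null); // no data available for aggregation
        row.put("mycount", 0L);
        System.out.println(avgPriceOrDefault(row, Double.NaN));
    }
}
```

The same null check applies when reading the value through EventBean.get in a listener.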

Use statement.getEventType().getUnderlyingType() to inspect the underlying type for all events delivered to listeners. Whether the engine delivers Map or Object-array events to listeners can be specified as follows. If the statement provides the @EventRepresentation(array=true) annotation the engine delivers the output events as object array. If the statement provides the @EventRepresentation(array=false) annotation the engine delivers output events as a Map. If neither annotation is provided, the engine delivers the configured default event representation as discussed in Section 16.4.12.1, “Default Event Representation”.

Consider the next statement that specifies a wildcard selecting the same type of event:

select * from org.sample.StockTickEvent where price > 100

The property names and types provided by an EventBean query result row, as posted by the statement above, are as follows:

Name        Type
symbol      java.lang.String
price       double

As an alternative to querying individual event properties via the get methods, the getUnderlying method on EventBean returns the underlying object representing the query result. In the sample statement that features a wildcard-select, the underlying event object is of type org.sample.StockTickEvent:

StockTickEvent tick = (StockTickEvent) newEvents[0].getUnderlying();

Composite events are events that aggregate one or more other events. Composite events are typically created by the engine for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.

// Look for a pattern where BEvent follows AEvent
String pattern = "a=AEvent -> b=BEvent";
EPStatement stmt = epService.getEPAdministrator().createPattern(pattern);
stmt.addListener(new MyUpdateListener());

// Example listener code
public class MyUpdateListener implements UpdateListener {
  public void update(EventBean[] newData, EventBean[] oldData) {
    System.out.println("a event=" + newData[0].get("a"));
    System.out.println("b event=" + newData[0].get("b"));
  }
}

Note that the update method can receive multiple events at once as it accepts an array of EventBean instances. For example, a time batch window may post multiple events to listeners representing a batch of events received during a given time period.

Pattern statements can also produce multiple events delivered to update listeners in one invocation. The pattern statement below, for instance, delivers an event for each A event that was not followed by a B event with the same id property within 60 seconds of the A event. The engine may deliver all matching A events as an array of events in a single invocation of the update method of each listener to the statement:

select * from pattern[
  every a=A -> (timer:interval(60 sec) and not B(id=a.id))]

A code snippet out of a possible UpdateListener implementation to this statement that retrieves the events as fragments may look as below:

EventBean a = (EventBean) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvents[0].get("a.value");

Some pattern objects return an array of events. An example is the unbound repeat operator. Here is a sample pattern that collects all A events until a B event arrives:

select * from pattern [a=A until b=B]

Possible code to retrieve different fragments or property values:

EventBean[] a = (EventBean[]) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvents[0].get("a[0].value");

For NEsper .NET also see Section C.16, “.NET API - Engine Threading and Concurrency”.

Esper is designed from the ground up to operate as a component to multi-threaded, highly-concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.

In Esper, an engine instance is a unit of separation. Applications can obtain and discard (initialize) one or more engine instances within the same Java VM and can provide the same or different engine configurations to each instance. An engine instance efficiently shares resources between statements. For example, consider two statements that declare the same data window. The engine matches up view declarations provided by each statement and can thus provide a single data window representation shared between the two statements.

Applications can use Esper APIs concurrently, from multiple threads of execution, to perform such functions as creating and managing statements, or sending events into an engine instance for processing. Applications can use application-managed threads or thread pools or any set of same or different threads of execution with any of the public Esper APIs. There are no restrictions towards threading other than those noted in specific sections of this document.

Esper does not prescribe a specific threading model. Applications using Esper retain full control over threading, allowing an engine to be easily embedded and used as a component or library in your favorite Java container or process.

In the default configuration it is up to the application code to use multiple threads for processing events by the engine, if so desired. All event processing takes place within your application thread call stack. The exception is timer-based processing if your engine instance relies on the internal timer (default). If your application relies on external timer events instead of the internal timer then there need not be any Esper-managed internal threads.

The fact that event processing can take place within your application thread's call stack makes developing applications with Esper easier: Any common Java integrated development environment (IDE) can host an Esper engine instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.

In the default configuration, each engine instance maintains a single timer thread (internal timer) providing for time or schedule-based processing within the engine. The default resolution at which the internal timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead send external time events to an engine instance to perform timer or scheduled processing at the resolution required by an application.

Each engine instance performs minimal locking to enable high levels of concurrency. An engine instance locks on a context partition level to protect context partition resources. For stateless EPL select-statements the engine does not use a context-partition lock and operates lock-free for the context partition. For stateful statements, the maximum (theoretical) degree of parallelism is 2^31-1 (2,147,483,647) parallel threads working to process a single EPL statement under a hash segmented context.

You may turn off context partition locking engine-wide (also read the caution notice) as described in Section 16.4.25.3, “Disable Locking”. You may disable context partition locking for a given statement by providing the @NoLock annotation as part of your EPL. Note, we provide the @NoLock annotation for the purpose of identifying locking overhead, or when your application is single-threaded, or when using an external mechanism for concurrency control or for example with virtual data windows or plug-in data windows to allow customizing concurrency for a given statement or named window. Using this annotation may have unpredictable results unless your application is taking concurrency under consideration.

For an engine instance to produce predictable results from the viewpoint of listeners to statements, an engine instance by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. For applications that require the highest possible concurrency and do not require predictable order of delivery of events to listeners, this feature can be turned off via configuration, see Section 16.4.11.1, “Preserving the order of events delivered to listeners”. For example, assume thread T1 processes an event applied to statement S producing output event O1. Assume thread T2 processes another event applied to statement S and produces output event O2. The engine employs a configurable latch system to ensure that listeners to statement S receive and may complete processing of O1 before receiving O2. When using outbound threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.

In multithreaded environments, when one or more statements make result events available via the insert into clause to further statements, the engine preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statements' events to behave deterministically. This feature can also be turned off via configuration, see Section 16.4.11.2, “Preserving the order of events for insert-into streams”. For example, assume thread T1 processes an event applied to statement S and thread T2 processes another event applied to statement S. Assume statement S inserts into stream ST. T1 produces an output event O1 for processing by consumers of ST and T2 produces an output event O2 for processing by consumers of ST. The engine employs a configurable latch system such that O1 is processed before O2 by consumers of ST. When using route execution threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.

We generally recommend that listener implementations block minimally or do not block at all. By implementing listener code as non-blocking code, execution threads can often achieve higher levels of concurrency.

We recommend that, when using a single listener or subscriber instance to receive output from multiple statements, the listener or subscriber code be multithread-safe. If your application has shared state between listener or subscriber instances then such shared state should be thread-safe.
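
As an illustration of thread-safe shared state, the following stand-alone sketch keeps per-statement counters in a ConcurrentHashMap of LongAdder instances. The class SharedListenerState and its methods are hypothetical and deliberately do not implement any Esper interface. In an application, an UpdateListener or subscriber would call onResult from its callback:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical state shared by one listener instance attached to several statements. */
final class SharedListenerState {
    private final ConcurrentMap<String, LongAdder> countsByStatement = new ConcurrentHashMap<>();

    /** Safe to call concurrently from any number of threads. */
    void onResult(String statementName, int rowCount) {
        countsByStatement.computeIfAbsent(statementName, k -> new LongAdder()).add(rowCount);
    }

    long total(String statementName) {
        LongAdder adder = countsByStatement.get(statementName);
        return adder == null ? 0L : adder.sum();
    }

    /** Drives onResult from several threads and returns the final count for statement "S". */
    static long runConcurrently(int threads, int callsPerThread) {
        SharedListenerState state = new SharedListenerState();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    state.onResult("S", 1);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return state.total("S");
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000));
    }
}
```

Neither the map update nor the counter increment needs an explicit lock, which keeps the listener callback from blocking.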

In the default configuration the same application thread that invokes any of the sendEvent methods will process the event fully and also deliver output events to listeners and subscribers. By default the single internal timer thread based on system time performs time-based processing and delivery of time-based results.

This default configuration reduces the processing overhead associated with thread context switching, is lightweight and fast and works well in many environments such as J2EE, server or client. Latency and throughput requirements are largely use case dependent, and Esper provides engine-level facilities for controlling concurrency that are described next.

Inbound Threading queues all incoming events: A pool of engine-managed threads performs the event processing. The application thread that sends an event via any of the sendEvent methods returns without blocking.

Outbound Threading queues events for delivery to listeners and subscribers, such that slow or blocking listeners or subscribers do not block event processing.

Timer Execution Threading means time-based event processing is performed by a pool of engine-managed threads. With this option the internal timer thread (or external timer event) serves only as a metronome, providing units-of-work to the engine-managed threads in the timer execution pool, pushing threading to the level of each statement for time-based execution.

Route Execution Threading means that the thread sending in an event via any of the sendEvent methods (or the inbound threading pooled thread if inbound threading is enabled) only identifies and pre-processes an event, and a pool of engine-managed threads handles the actual processing of the event for each statement, pushing threading to the level of each statement for event-arrival-based execution.

The engine starts engine-managed threads as daemon threads when the engine instance is first obtained. The engine stops engine-managed threads when the engine instance is destroyed via the destroy method. When the engine is initialized via the initialize method the existing engine-managed threads are stopped and new threads are created. When shutting down your application, use the destroy method to stop engine-managed threads.

Note that the options discussed herein may introduce additional processing overhead into your system, as each option involves work queue management and thread context switching.

If your use cases require ordered processing of events or do not tolerate disorder, the threading options described herein may not be the right choice. To enforce a processing order for given criteria, your application must enforce such processing order using Java or .NET code. Esper will not enforce order of processing if you enable inbound or route threading. Your application code could, for example, utilize a thread per group of criteria keys, a latch per criteria key, or a queue per criteria key, or use Java's completion service, all depending on your ordering requirements.
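
One way to realize "a thread per group of criteria keys" in application code is sketched below using only java.util.concurrent. The class KeyOrderedDispatcher is hypothetical and not part of Esper. It assumes that the application submits the events for a given key from a single ordered source, and it would be used in place of inbound threading rather than together with it:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical application-side dispatcher that preserves order per key. */
final class KeyOrderedDispatcher {
    private final ExecutorService[] lanes;

    KeyOrderedDispatcher(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks with equal keys land on the same single-threaded lane and run in submission order. */
    void submit(Object key, Runnable task) {
        int lane = Math.floorMod(key.hashCode(), lanes.length);
        lanes[lane].execute(task);
    }

    /** Stops accepting work and waits for queued tasks to finish. */
    void shutdownAndWait() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        for (ExecutorService lane : lanes) {
            try {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Demo: submits 0..n-1 for one key and returns the order in which they were processed. */
    static List<Integer> processInOrder(String key, int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        KeyOrderedDispatcher dispatcher = new KeyOrderedDispatcher(4);
        for (int i = 0; i < n; i++) {
            final int seq = i;
            // In an application this task would send the event into the engine
            dispatcher.submit(key, () -> seen.add(seq));
        }
        dispatcher.shutdownAndWait();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(processInOrder("IBM", 5));
    }
}
```

Events with different keys may still be processed in parallel on different lanes, so this design trades global ordering for per-key ordering.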

If your use cases require loss-less processing of events, wherein the threading options mean that events are held in an in-memory queue, the threading options described herein may not be the right choice.

Care should be taken to consider arrival rates and queue depth. Threading options utilize unbound queues or capacity-bound queues with blocking-put, depending on your configuration, and may therefore introduce an overload or blocking situation to your application. You may use the service provider interface as outlined below to manage queue sizes, if required, and to help tune the engine to your application needs. Consider throttling down the event send rate when the API (see below) indicates that events are getting queued.
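
To make the difference between an overload and a blocking situation concrete, the following stand-alone sketch fills a capacity-bound queue from plain java.util.concurrent without consuming from it. It does not use or reproduce the engine's own queues, and the class name BoundedQueueDemo is hypothetical:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustration only: behavior of a capacity-bound queue once it is full. */
final class BoundedQueueDemo {

    /** Offers n items to a queue of the given capacity without consuming; returns the number rejected. */
    static int rejectedWhenFull(int capacity, int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < n; i++) {
            // offer() returns false when the queue is full;
            // put() would instead block the producing thread at this point
            if (!queue.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejectedWhenFull(2, 5));
    }
}
```

With a blocking put the sending thread stalls until a consumer catches up, which is why monitoring queue depth and throttling the send rate matter.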

All threading options are on the level of an engine. If you require different threading behavior for certain statements then consider using multiple engine instances, consider using the route method or consider using application threads instead.

Please consult Section 16.4.11, “Engine Settings related to Concurrency and Threading” for instructions on how to configure threading options. Threading options take effect at engine initialization time.

There are two modes for an engine to keep track of time: The internal timer based on JVM system time (the default), and externally-controlled time giving your application full control over the concept of time within an engine or isolated service.

An isolated service is an execution environment separate from the main engine runtime, allowing full control over the concept of time for a group of statements, as further described in Section 15.10, “Service Isolation”.

By default the internal timer provides time and evaluates schedules. External clocking can be used to supply time ticks to the engine instead. The latter is useful for testing time-based event sequences or for synchronizing the engine with an external time source.

The internal timer relies on the java.util.concurrent.ScheduledThreadPoolExecutor class for time tick events. The next section describes timer resolution for the internal timer, which is set to 100 milliseconds by default and is configurable via the threading options. When using externally-controlled time the timer resolution is in your control.

To disable the internal timer and use externally-provided time instead, there are two options. The first option is to use the configuration API at engine initialization time. The second option toggles on and off the internal timer at runtime, via special timer control events that are sent into the engine like any other event.

If using a timer execution thread pool as discussed above, the internal timer or external time event provides the schedule evaluation but does not actually perform the time-based processing. The time-based processing is performed by the threads in the timer execution thread pool.

This code snippet shows the use of the configuration API to disable the internal timer and thereby turn on externally-provided time (see the Configuration section for configuration via XML file):

Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);

After disabling the internal timer, it is wise to set a defined time so that any statements created thereafter start relative to the time defined. Use the CurrentTimeEvent class to indicate current time to the engine and to move time forward for the engine (a.k.a. application-time model).

This code snippet obtains the current time and sends a timer event in:

long timeInMillis = System.currentTimeMillis();
CurrentTimeEvent timeEvent = new CurrentTimeEvent(timeInMillis);
epService.getEPRuntime().sendEvent(timeEvent);

Alternatively, you can use special timer control events to enable or disable the internal timer. Use the TimerControlEvent class to control timer operation at runtime.

The next code snippet demonstrates toggling to external timer at runtime, by sending in a TimerControlEvent event:

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();
// switch to external clocking
runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));

Your application sends a CurrentTimeEvent event when it desires to move the time forward. All aspects of Esper engine time related to EPL statements and patterns are driven by the time provided by the CurrentTimeEvent that your application sends in.

The next example sequence of instructions sets time to zero, then creates a statement, then moves time forward to 1 second later and then to 6 seconds later:

// Set start time at zero.
runtime.sendEvent(new CurrentTimeEvent(0));

// create a statement here
epAdministrator.createEPL("select * from MyEvent output every 5 seconds");

// move time forward 1 second
runtime.sendEvent(new CurrentTimeEvent(1000));

// move time forward 5 seconds
runtime.sendEvent(new CurrentTimeEvent(6000));

When sending external timer events, your application should make sure that long-type time values are ascending. That is, each long-type value should be either the same value or a larger value than the prior value provided by a CurrentTimeEvent.
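
A small guard in application code can enforce ascending time values before they are sent. The sketch below is stand-alone and hypothetical (AscendingTimeGuard is not an Esper class). It clamps a candidate time to the last value sent, which is one possible policy; rejecting out-of-order values would be another:

```java
/** Hypothetical helper that never lets application time move backwards. */
final class AscendingTimeGuard {
    private long lastSent = Long.MIN_VALUE;

    /** Returns the candidate time, or the last returned time if the candidate is older. */
    synchronized long next(long candidateMillis) {
        lastSent = Math.max(lastSent, candidateMillis);
        return lastSent;
    }

    public static void main(String[] args) {
        AscendingTimeGuard guard = new AscendingTimeGuard();
        // In an application each result would be wrapped in a CurrentTimeEvent and sent in
        System.out.println(guard.next(1000L));
        System.out.println(guard.next(900L));  // older than the last value, so it is clamped
        System.out.println(guard.next(1500L));
    }
}
```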

Your application may use the getNextScheduledTime method in EPRuntime to determine the earliest time a schedule for any statement requires evaluation.

The following code snippet sets the current time, creates a statement and prints the next scheduled time which is 1 minute later than the current time:

// Set start time to the current time.
runtime.sendEvent(new CurrentTimeEvent(System.currentTimeMillis()));

// Create a statement.
epService.getEPAdministrator().createEPL("select * from pattern[timer:interval(1 minute)]");

// Print next schedule time
System.out.println("Next schedule at " + new Date(runtime.getNextScheduledTime()));

With CurrentTimeEvent, as described above, your application can advance engine time to a given point in time. In addition, the getNextScheduledTime method in EPRuntime returns the next scheduled time according to started statements. You would typically use CurrentTimeEvent to advance time at a relatively high resolution.

To advance time for a span of time without sending individual CurrentTimeEvent events to the engine, the API provides the class CurrentTimeSpanEvent. You may use CurrentTimeSpanEvent with or without a resolution.

If your application only provides the target end time of the time span to CurrentTimeSpanEvent and no resolution, the engine advances time up to the target time by stepping through all relevant times according to started statements.

If your application provides the target end time of the time span and in addition a long-typed resolution, the engine advances time up to the target time by incrementing time according to the resolution (regardless of next scheduled time according to started statements).

Consider the following example:

// Set start time to Jan.1, 2010, 00:00 am for this example
SimpleDateFormat format = new SimpleDateFormat("yyyy MM dd HH:mm:ss SSS");
Date startTime = format.parse("2010 01 01 00:00:00 000");
runtime.sendEvent(new CurrentTimeEvent(startTime.getTime()));

// Create a statement.
EPStatement stmt = epService.getEPAdministrator().createEPL("select current_timestamp() as ct " +
  "from pattern[every timer:interval(1 minute)]");
stmt.addListener(...);	// add a listener

// Advance time to 10 minutes after start time
runtime.sendEvent(new CurrentTimeSpanEvent(startTime.getTime() + 10*60*1000));

The above example uses CurrentTimeSpanEvent to advance time to 10 minutes after the start time. As the example does not pass a resolution, the engine advances time according to statement schedules. Upon sending the CurrentTimeSpanEvent the listener sees 10 invocations, one for each of minute 1 to minute 10.

To advance time according to a given resolution, you may provide the resolution as shown below:

// Advance time to 10 minutes after start time at 100 msec resolution
runtime.sendEvent(new CurrentTimeSpanEvent(startTime.getTime() + 10*60*1000, 100));
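
The difference between the two stepping modes can be pictured with a simplified stand-alone model. This is a simulation under stated assumptions, not the engine's scheduler: it assumes a single schedule that fires at a fixed interval and a target time that is an exact multiple of the interval and of the resolution. All names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

/** Simulation only: models the two ways of advancing time across a span. */
final class TimeSpanSteppingDemo {

    /** Without a resolution: visit each scheduled time in (start, target]. */
    static List<Long> stepsBySchedule(long start, long target, long scheduleIntervalMillis) {
        List<Long> steps = new ArrayList<>();
        for (long t = start + scheduleIntervalMillis; t <= target; t += scheduleIntervalMillis) {
            steps.add(t);
        }
        return steps;
    }

    /** With a resolution: count the fixed-size increments needed to reach the target. */
    static long countStepsByResolution(long start, long target, long resolutionMillis) {
        long count = 0;
        for (long t = start + resolutionMillis; t <= target; t += resolutionMillis) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // One step per 1-minute schedule
        System.out.println(stepsBySchedule(0L, tenMinutes, 60_000L).size());
        // One step per 100 msec increment
        System.out.println(countStepsByResolution(0L, tenMinutes, 100L));
    }
}
```

Under this model the schedule-driven mode takes 10 steps for the example above, while a 100 msec resolution takes 6000 steps to cover the same span.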

The minimum resolution that all data windows, patterns and output rate limiting operate at is the millisecond. Parameters to time window views, pattern operators or the output clause that are less than 1 millisecond are not allowed. As stated earlier, the default frequency at which the internal timer operates is 100 milliseconds (configurable).

The internal timer thread, by default, uses the call System.currentTimeMillis() to obtain system time. Please see the JIRA issue ESPER-191 Support nano/microsecond resolution for more information on Java system time-call performance, accuracy and drift.

The internal timer thread can be configured to use nano-second time as returned by System.nanoTime(). If configured for nano-second time, the engine computes an offset of the nano-second ticks to wall clock time upon startup to present back an accurate millisecond wall clock time. Please see Section 16.4.20, “Engine Settings related to Time Source” to configure the internal timer thread to use System.nanoTime().
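
The offset technique described above can be sketched in a few lines of plain Java. This is an illustration of the general idea only and not the engine's actual code; the class name NanoClockDemo is hypothetical:

```java
/** Illustration only: wall-clock milliseconds derived from System.nanoTime(). */
final class NanoClockDemo {
    private final long offsetMillis;

    NanoClockDemo() {
        // Computed once at startup: wall-clock millis minus nano-second ticks converted to millis
        offsetMillis = System.currentTimeMillis() - System.nanoTime() / 1_000_000L;
    }

    /** Current wall-clock time in milliseconds, based on the nano-second tick counter. */
    long currentTimeMillis() {
        return offsetMillis + System.nanoTime() / 1_000_000L;
    }

    public static void main(String[] args) {
        NanoClockDemo clock = new NanoClockDemo();
        System.out.println(clock.currentTimeMillis());
    }
}
```

Because the offset is captured only once, later adjustments to the system clock are not reflected in the derived time.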

The internal timer is based on java.util.concurrent.ScheduledThreadPoolExecutor (java.util.Timer does not support high accuracy VM time).

Your application can achieve a higher tick rate than 1 tick per millisecond by sending external timer events that carry a long-value which is not based on milliseconds since January 1, 1970, 00:00:00 GMT. In this case, your time interval parameters need to take consideration of the changed use of engine time.

Thus, if your external timer events send long values that represent microseconds (1E-6 sec), then your time window interval must be 1000-times larger, i.e. "win:time(1000)" becomes a 1-second time window.

And therefore, if your external timer events send long values that represent nanoseconds (1E-9 sec), then your time window interval must be 1000000-times larger, i.e. "win:time(1000000)" becomes a 1-second time window.
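
The scaling follows from the engine treating one declared second as 1000 engine time units. The arithmetic can be checked with the following stand-alone sketch, in which the class and method names are hypothetical:

```java
/** Illustration only: how a declared interval maps to real time for a given time unit. */
final class TimeUnitScalingDemo {

    /** Engine time units covered by win:time(declaredSeconds): 1000 units per declared second. */
    static long engineUnits(long declaredSeconds) {
        return declaredSeconds * 1000L;
    }

    /** Real seconds covered when the application sends unitsPerRealSecond time units per second. */
    static double realSeconds(long declaredSeconds, long unitsPerRealSecond) {
        return (double) engineUnits(declaredSeconds) / unitsPerRealSecond;
    }

    public static void main(String[] args) {
        System.out.println(realSeconds(1L, 1_000L));                 // millisecond-based time
        System.out.println(realSeconds(1_000L, 1_000_000L));         // microsecond-based time
        System.out.println(realSeconds(1_000_000L, 1_000_000_000L)); // nanosecond-based time
    }
}
```

Each of the three calls covers exactly one real second, matching the intervals given above.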

An isolated service allows an application to control event visibility and the concept of time as desired on a statement level: Events sent into an isolated service are visible only to those statements that currently reside in the isolated service and are not visible to statements outside of that isolated service. Within an isolated service an application can control time independently, start time at a point in time and advance time at the resolution and pace suitable for the statements added to that isolated service.

In the default configuration, isolated service is disabled and not available. This is because there is a small overhead associated with this feature. Please review Section 16.4.25.7, “Allow Isolated Service Provider” for the configuration setting.

As discussed before, a single Java Virtual Machine may hold multiple Esper engine instances unique by engine URI. Within an Esper engine instance the default execution environment for statements is the EPRuntime engine runtime, which coordinates all statement's reaction to incoming events and to time passing (via internal or external timer).

Subordinate to an Esper engine instance, your application can additionally allocate multiple isolated services (or execution environments), uniquely identified by a name and represented by the EPServiceProviderIsolated interface. In the isolated service, time passes only when your application sends timer events to the EPRuntimeIsolated instance. Only events explicitly sent to the isolated service are visible to statements added.

Your application can create new statements that start in an isolated service. You can also move existing statements back and forth between the engine and an isolated service.

Isolation does not apply to variables: Variables are global in nature. Also, as named windows are globally visible data windows, consumers of named windows see changes in named windows even though a consumer or the named window (through the create statement) may be in an isolated service. Tables are also globally visible.

An isolated service allows an application to:

  1. Suspend a statement without losing the state that has accumulated for the statement.

  2. Control the concept of time separately for a set of statements, for example to simulate, backtest, adjust arrival order or compute arrival time.

  3. Initialize statement state by replaying events, without impacting already running statements, to catch-up statements from historical events for example.

While a statement resides in an isolated runtime it receives only those events explicitly sent to the isolated runtime, and performs time-based processing based on the timer events provided to that isolated runtime.

Use the getEPServiceIsolated method on EPServiceProvider passing a name to obtain an isolated runtime:

EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("name");

Set the start time for your isolated runtime via the CurrentTimeEvent timer event:

// In this example start the time at the system time
long startInMillis = System.currentTimeMillis();	
isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(startInMillis));

Use the addStatement method on EPAdministratorIsolated to move an existing statement out of the engine runtime into the isolated runtime:

// look up the existing statement
EPStatement stmt = epServiceManager.getEPAdministrator().getStatement("MyStmt");

// move it to an isolated service
isolatedService.getEPAdministrator().addStatement(stmt);

To remove the statement from isolation and return the statement back to the engine runtime, use the removeStatement method on EPAdministratorIsolated:

isolatedService.getEPAdministrator().removeStatement(stmt);

To create a new statement in the isolated service, use the createEPL method on EPAdministratorIsolated:

isolatedService.getEPAdministrator().createEPL(
  "@Name('MyStmt') select * from Event", null, null); 
// the example is passing the statement name in an annotation and no user object

The destroy method on EPServiceProviderIsolated moves all currently-isolated statements for that isolated service provider back to engine runtime.

When moving a statement between engine runtime and isolated service or back, the algorithm ensures that events are aged according to the time that passed and time schedules stay intact.

To use isolated services, your configuration must have view sharing disabled as described in Section 16.4.13.1, “Sharing View Resources between Statements”.

This example creates a statement in the isolated service, replays some events and advances time, then merges back the statement to the engine to let it participate in incoming events and engine time processing.

First, allocate an isolated service and explicitly set it to a start time. Assuming that myStartTime is a long millisecond time value that marks the beginning of the data to replay, the sequence is as follows:

EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("suspendedStmts");
isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(myStartTime));

Next, create the statement. The sample statement is a pattern statement looking for temperature events following each other within 60 seconds:

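// create the statement in the isolated service, with no statement name and no user object
EPStatement stmt = isolatedService.getEPAdministrator().createEPL(
  "select * from pattern[every a=TemperatureEvent -> b=TemperatureEvent where timer:within(60)]",
  null, null);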

For each historical event to be played, advance time and send an event. This code snippet assumes that currentTime is a time greater than myStartTime and reflects the time that the historical event should be processed at. It also assumes historyEvent is the historical event object.

isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(currentTime));
isolatedService.getEPRuntime().sendEvent(historyEvent);
// repeat the above advancing time until no more events
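The replay loop above can be sketched as a self-contained driver. HistoryReplay, HistoricalEvent and ReplaySink are hypothetical types introduced only for illustration; in an application the two sink methods would presumably delegate to sendEvent on the isolated runtime, once with a CurrentTimeEvent and once with the event object.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical replay driver; the sink stands in for calls on the isolated runtime. */
final class HistoryReplay {
    private HistoryReplay() {}

    /** Hypothetical pairing of an event object with the time it should be processed at. */
    static final class HistoricalEvent {
        final long timestamp;
        final Object payload;

        HistoricalEvent(long timestamp, Object payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Hypothetical target of the replay. */
    interface ReplaySink {
        void advanceTime(long timeMillis); // e.g. sendEvent(new CurrentTimeEvent(timeMillis))
        void sendEvent(Object event);      // e.g. sendEvent(event)
    }

    /** Sets the start time, then advances time and sends each event in timestamp order. */
    static void replay(long startTime, List<HistoricalEvent> history, ReplaySink sink) {
        List<HistoricalEvent> ordered = new ArrayList<>(history);
        // List.sort is stable, so events sharing a timestamp keep their original order.
        ordered.sort(Comparator.comparingLong((HistoricalEvent e) -> e.timestamp));
        sink.advanceTime(startTime);
        long current = startTime;
        for (HistoricalEvent e : ordered) {
            if (e.timestamp < startTime) {
                throw new IllegalArgumentException(
                    "event time " + e.timestamp + " precedes start time " + startTime);
            }
            if (e.timestamp > current) {
                // Advance time only when it actually moves forward.
                current = e.timestamp;
                sink.advanceTime(current);
            }
            sink.sendEvent(e.payload);
        }
    }
}
```

Sorting before replaying is a design choice of this sketch: it keeps time moving forward even if the historical events are not stored in arrival order.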

Finally, when done replaying events, merge the statement back with the engine:

isolatedService.getEPAdministrator().removeStatement(stmt);

You may register one or more exception handlers for the engine to invoke in the case it encounters an exception processing a continuously-executing statement. By default and without exception handlers the engine cancels execution of the current EPL statement that encountered the exception, logs the exception and continues to the next statement, if any. The configuration is described in Section 16.4.26, “Engine Settings related to Exception Handling”.

If your application registers exception handlers as part of engine configuration, the engine invokes the exception handlers in the order they are registered passing relevant exception information such as EPL statement name, expression and the exception itself.

Exception handlers receive any EPL statement unchecked exception such as internal exceptions or exceptions thrown by plug-in aggregation functions or plug-in views. The engine does not provide to exception handlers any exceptions thrown by static method invocations for function calls, method invocations in joins, methods on event classes and listeners or subscriber exceptions.

An exception handler can itself throw a runtime exception to cancel execution of the current event against any further statements.

For on-demand queries the API indicates any exception directly back to the caller without the exception handlers being invoked, as exception handlers apply to continuous queries only. The same applies to any API calls other than sendEvent and the EventSender methods.

As the configuration section describes, your application registers one or more classes that implement the ExceptionHandlerFactory interface in the engine configuration. Upon engine initialization the engine obtains a factory instance from the class name that then provides the exception handler instance. The exception handler class must implement the ExceptionHandler interface.

You may register one or more condition handlers for the engine to invoke in the case it encounters certain conditions, as outlined below, when executing a statement. By default and without condition handlers the engine logs the condition at informational level and continues processing. The configuration is described in Section 16.4.27, “Engine Settings related to Condition Handling”.

If your application registers condition handlers as part of engine configuration, the engine invokes the condition handlers in the order they are registered passing relevant condition information such as EPL statement name, expression and the condition information itself.

Currently the only conditions indicated by this facility are raised by the pattern followed-by operator, see Section 7.5.8.1, “Limiting Sub-Expression Count” and see Section 7.5.8.2, “Limiting Engine-wide Sub-Expression Count”.

A condition handler may not itself throw a runtime exception or return any value.

As the configuration section describes, your application registers one or more classes that implement the ConditionHandlerFactory interface in the engine configuration. Upon engine initialization the engine obtains a factory instance from the class name that then provides the condition handler instance. The condition handler class must implement the ConditionHandler interface.

The statement object model is a set of classes that provide an object-oriented representation of an EPL or pattern statement. The object model classes are found in package com.espertech.esper.client.soda. An instance of EPStatementObjectModel represents a statement's object model.

The statement object model classes are a full and complete specification of a statement. All EPL and pattern constructs including expressions and sub-queries are available via the statement object model.

In conjunction with the administrative API, the statement object model provides the means to build, change or interrogate statements beyond the EPL or pattern syntax string representation. The object graph of the statement object model is fully navigable for easy querying by code, and is also serializable allowing applications to persist or transport statements in object form, when required.

The statement object model supports a full round-trip from object model to EPL statement string and back to object model: A statement object model can be rendered into an EPL string representation via the toEPL method on EPStatementObjectModel. Further, the administrative API allows you to compile a statement string into an object model representation via the compileEPL method on EPAdministrator.

The statement object model is fully mutable. Mutating any list, such as the list returned by getChildren(), is acceptable and supported.

The create method on EPAdministrator creates and starts a statement as represented by an object model. In order to obtain an object model from an existing statement, obtain the statement expression text of the statement via the getText method on EPStatement and use the compileEPL method to obtain the object model.

The following limitations apply:

  • Statement object model classes are not safe for sharing between threads other than for read access.

  • Between versions of Esper, the serialized form of the object model is subject to change. Esper makes no guarantees that the serialized object model of one version will be fully compatible with the serialized object model generated by another version of Esper. Please consider this issue when storing Esper object models in persistent store.

An EPStatementObjectModel consists of an object graph representing all possible clauses that can be part of an EPL statement.

Among all clauses, the SelectClause and FromClause objects are required clauses that must be present, in order to define what to select and where to select from.


Part of the statement object model package are convenient builder classes that make it easy to build a new object model or change an existing object model. The SelectClause and FromClause are such builder classes and provide convenient create methods.

Within the from-clause we have a choice of different streams to select on. The FilterStream class represents a stream that is filled by events of a certain type and that pass an optional filter expression.

We can use the classes introduced above to create a simple statement object model:

EPStatementObjectModel model = new EPStatementObjectModel();
model.setSelectClause(SelectClause.createWildcard());
model.setFromClause(FromClause.create(FilterStream.create("com.chipmaker.ReadyEvent")));

The model as above is equivalent to the EPL:

select * from com.chipmaker.ReadyEvent

Last, the code snippet below creates a statement from the object model:

EPStatement stmt = epService.getEPAdministrator().create(model);

Notes on usage:

  • Variable names can simply be treated as property names.

  • When selecting from named windows or tables, the name of the named window or table is the event type name for use in FilterStream instances or patterns.

  • To compile an arbitrary sub-expression text into an Expression object representation, simply add the expression text to a where clause, compile the EPL string into an object model via the compileEPL method on EPAdministrator, and obtain the compiled where clause from the EPStatementObjectModel via the getWhereClause method.

This sample statement creates a named window:

create window OrdersTimeWindow.win:time(30 sec) as select symbol as sym, volume as vol, price from OrderEvent

This is the code that builds the create-window statement as above:

EPStatementObjectModel model = new EPStatementObjectModel();
model.setCreateWindow(CreateWindowClause.create("OrdersTimeWindow").addView("win", "time", 30));
model.setSelectClause(SelectClause.create()
		.addWithName("symbol", "sym")
		.addWithName("volume", "vol")
		.add("price"));
model.setFromClause(FromClause.create(FilterStream.create("OrderEvent")));

A second statement deletes from the named window:

on NewOrderEvent as myNewOrders
delete from OrdersNamedWindow as myNamedWindow
where myNamedWindow.symbol = myNewOrders.symbol

The object model is built by:

EPStatementObjectModel model = new EPStatementObjectModel();
model.setOnExpr(OnClause.createOnDelete("OrdersNamedWindow", "myNamedWindow"));
model.setFromClause(FromClause.create(FilterStream.create("NewOrderEvent", "myNewOrders")));
model.setWhereClause(Expressions.eqProperty("myNamedWindow.symbol", "myNewOrders.symbol"));
EPStatement stmtOnDelete = epService.getEPAdministrator().create(model);

A third statement selects from the named window using the non-continuous on-demand selection via on-select:

on QueryEvent(volume>0) as query
select count(*) from OrdersNamedWindow as win
where win.symbol = query.symbol

The on-select statement is built from scratch via the object model as follows:

EPStatementObjectModel model = new EPStatementObjectModel();
model.setOnExpr(OnClause.createOnSelect("OrdersNamedWindow", "win"));
model.setWhereClause(Expressions.eqProperty("win.symbol", "query.symbol"));
model.setFromClause(FromClause.create(FilterStream.create("QueryEvent", "query", 
  Expressions.gt("volume", 0))));
model.setSelectClause(SelectClause.create().add(Expressions.countStar()));
EPStatement stmtOnSelect = epService.getEPAdministrator().create(model);

The prepareEPL method that is part of the administrative API pre-compiles an EPL statement and stores the precompiled statement in an EPPreparedStatement object. This object can then be used to efficiently start the parameterized statement multiple times.

You can insert substitution parameters as a single question mark character '?', making the substitution parameter addressable by index.

You can also insert substitution parameters using the following syntax, which makes the substitution parameter addressable by name:

?:name

All substitution parameters must either be unnamed (just '?') or named ('?:name'). It is not possible to mix the two styles.

If not assigning a name to substitution parameters, the engine assigns the first substitution parameter an index of 1 and subsequent parameters increment the index by one.

If assigning a name to each substitution parameter, the name can include slash (/) characters and can occur multiple times.
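The addressing rules above can be illustrated with a toy scanner. This is only a sketch of the rules and not the engine's parser: the class SubstitutionParams is hypothetical, the set of characters accepted in a name is an assumption, and question marks inside string literals are not treated specially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical scanner that lists the keys by which substitution parameters are addressed. */
final class SubstitutionParams {
    private SubstitutionParams() {}

    // '?' optionally followed by ':name'; the accepted name characters are an assumption.
    private static final Pattern PARAM = Pattern.compile("\\?(?::([A-Za-z0-9_/]+))?");

    /**
     * Returns "1", "2", ... for unnamed parameters, or the distinct names for
     * named parameters. Mixing the two styles is rejected.
     */
    static List<String> keys(String epl) {
        List<String> keys = new ArrayList<>();
        boolean sawNamed = false;
        boolean sawUnnamed = false;
        int index = 0;
        Matcher matcher = PARAM.matcher(epl);
        while (matcher.find()) {
            String name = matcher.group(1);
            if (name == null) {
                sawUnnamed = true;
                keys.add(String.valueOf(++index)); // indexes start at 1
            } else {
                sawNamed = true;
                if (!keys.contains(name)) {
                    keys.add(name); // a name may occur multiple times
                }
            }
            if (sawNamed && sawUnnamed) {
                throw new IllegalArgumentException("cannot mix named and unnamed parameters");
            }
        }
        return keys;
    }
}
```

Each returned key corresponds to one setObject call: an integer index for unnamed parameters, or the name for named parameters.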

Substitution parameters can be inserted into any EPL construct that takes an expression. They are therefore valid in any clause such as the select-clause, from-clause filters, where-clause, group-by-clause, having-clause or order-by-clause, including view parameters and pattern observers and guards. Substitution parameters cannot be used where a numeric constant is required rather than an expression, nor in SQL statements.

All substitution parameters must be replaced by actual values before a statement with substitution parameters can be started. Substitution parameters can be set to new values and new statements can be created from the same EPPreparedStatement object more than once.

If not assigning a name to substitution parameters, replace the substitution parameter with an actual value using the setObject(int index, Object value) method for each index, starting from 1.

If assigning a name to each substitution parameter, replace the substitution parameter with an actual value using the setObject(String name, Object value) method for each name.

While the setObject method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.

In the following example of setting parameters on a prepared statement and starting the prepared statement, epService represents an engine instance:

String stmt = "select * from com.chipmaker.ReadyEvent(line=?)";
EPPreparedStatement prepared = epService.getEPAdministrator().prepareEPL(stmt);
prepared.setObject(1, 8);
EPStatement statement = epService.getEPAdministrator().create(prepared);

The next example names the substitution parameter:

String stmt = "select * from ReadyEvent(line=?:lines/line1)";
EPPreparedStatement prepared = epService.getEPAdministrator().prepareEPL(stmt);
prepared.setObject("lines/line1", 1);

The engine can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 16.4.21, “Engine Settings related to JMX Metrics”. For additional detailed reporting and metrics events, please read on.

Metrics reporting is a feature that allows an application to receive ongoing reports about key engine-level and statement-level metrics. Examples are the number of incoming events, the CPU time and wall time taken by statement executions or the number of output events per statement.

Metrics reporting is, by default, disabled. To enable reporting, please follow the steps as outlined in Section 16.4.22, “Engine Settings related to Metrics Reporting”. Metrics reporting must be enabled at engine initialization time. Reporting intervals can be controlled at runtime via the ConfigurationOperations interface available from the administrative API.

Your application can receive metrics at configurable intervals via an EPL statement. A metric datapoint is simply a well-defined event. The events are EngineMetric and StatementMetric, and the Java classes representing the events can be found in the client API in package com.espertech.esper.client.metric.

Since metric events are processed by the engine the same as application events, your EPL may use any construct on such events. For example, your application may select, filter, aggregate properties, sort or insert into a stream, named window or table all metric events the same as application events.

This example statement selects all engine metric events:

select * from com.espertech.esper.client.metric.EngineMetric

The next statement selects all statement metric events:

select * from com.espertech.esper.client.metric.StatementMetric

Make sure to have metrics reporting enabled since only then do listeners or subscribers to a statement such as above receive metric events.

The engine provides metric events after the configured interval of time has passed. By default, only started statements that have activity within an interval (in the form of event or timer processing) are reported upon.

The default configuration performs the publishing of metric events in an Esper daemon thread under the control of the engine instance. Metrics reporting honors externally-supplied time, if using external timer events.

Via runtime configuration options provided by ConfigurationOperations, your application may enable and disable metrics reporting globally, provided that metrics reporting was enabled at initialization time. Your application may also enable and disable metrics reporting for individual statements by statement name.

Statement groups are a configuration feature that allows you to assign reporting intervals to statements. Statement groups are described further in Section 16.4.22, “Engine Settings related to Metrics Reporting”. Statement groups cannot be added or removed at runtime.

The following limitations apply:

  • If your Java VM version does not report current thread CPU time (most JVMs do), then CPU time is reported as zero (use ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported() to determine if your JVM supports this feature).

    Note: In some JVMs the accuracy of the CPU time returned is very low (in the order of 10 milliseconds off), which can impact the usefulness of the CPU metrics returned. As an alternative, consider measuring CPU time in your application thread, external to the engine, after sending a number of events in the same thread.

  • Your Java VM may not provide high resolution time via System.nanoTime. In such a case wall time may be inaccurate and imprecise.

  • CPU time and wall time have nanosecond precision but not necessarily nanosecond accuracy. Please check with your Java VM provider.

  • There is a performance cost to collecting and reporting metrics.

  • Not all statements may report metrics: The engine performs certain runtime optimizations sharing resources between similar statements, thereby not reporting on certain statements unless resource sharing is disabled through configuration.
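The alternative mentioned in the note above, measuring CPU time in the application thread, can be sketched with the JDK management API. CpuTimeProbe is a hypothetical helper; the task passed in stands for whatever work your application performs, such as sending a batch of events.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: measures CPU time consumed by the calling thread. */
final class CpuTimeProbe {
    private CpuTimeProbe() {}

    /**
     * Runs the task in the current thread and returns the CPU time it consumed
     * in nanoseconds, or -1 if the JVM does not support or has disabled
     * CPU time measurement for the current thread.
     */
    static long measureCpuNanos(Runnable task) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        if (!threads.isCurrentThreadCpuTimeSupported() || !threads.isThreadCpuTimeEnabled()) {
            task.run();
            return -1L;
        }
        long before = threads.getCurrentThreadCpuTime();
        task.run();
        return threads.getCurrentThreadCpuTime() - before;
    }
}
```

Measuring across many events in one call reduces the impact of coarse CPU time accuracy on the result.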

Your application may use the built-in XML and JSON formatters to render output events into a readable textual format, such as for integration or debugging purposes. This section introduces the utility classes in the client util package for rendering events to strings. Further API information can be found in the JavaDocs.

The EventRenderer interface accessible from the runtime interface via the getEventRenderer method provides access to JSON and XML rendering. For repeated rendering of events of the same event type or subtypes, it is recommended to obtain a JSONEventRenderer or XMLEventRenderer instance and use the render method provided by the interface. This allows the renderer implementations to cache event type metadata for fast rendering.

In this example we show how one may obtain a renderer for repeated rendering of events of the same type, assuming that statement is an instance of EPStatement:

JSONEventRenderer jsonRenderer = epService.getEPRuntime().
    getEventRenderer().getJSONRenderer(statement.getEventType());

Assuming that event is an instance of EventBean, this code snippet renders an event into the JSON format:

String jsonEventText = jsonRenderer.render("MyEvent", event);

The XML renderer works the same:

XMLEventRenderer xmlRenderer = epService.getEPRuntime().
    getEventRenderer().getXMLRenderer(statement.getEventType());

...and...

String xmlEventText = xmlRenderer.render("MyEvent", event);

If the event type is not known in advance or if your application does not want to obtain a renderer instance per event type for fast rendering, your application can use one of the following methods to render an event to an XML or JSON textual format:

String json = epService.getEPRuntime().getEventRenderer().renderJSON(event);
String xml = epService.getEPRuntime().getEventRenderer().renderXML(event);

Use the JSONRenderingOptions or XMLRenderingOptions classes to control how events are rendered. To render specific event properties using a custom event property renderer, specify an EventPropertyRenderer as part of the options that renders event property values to strings. Please see the JavaDoc documentation for more information.

A plug-in loader is for general use with input adapters, output adapters, EPL code deployment or any other task that can benefit from being part of an Esper configuration file and that follows the engine lifecycle.

A plug-in loader implements the com.espertech.esper.plugin.PluginLoader interface and can be listed in the configuration.

Each configured plug-in loader follows the engine instance lifecycle: When an engine instance initializes, it instantiates each PluginLoader implementation class listed in the configuration. The engine then invokes the lifecycle methods of the PluginLoader implementation class before and after the engine is fully initialized and before an engine instance is destroyed.

Declare a plug-in loader in your configuration XML as follows:

...
  <plugin-loader name="MyLoader" class-name="org.mypackage.MyLoader">
    <init-arg name="property1" value="val1"/>
  </plugin-loader>
...

Alternatively, add the plug-in loader via the configuration API:

Configuration config = new Configuration();
Properties props = new Properties();
props.put("property1", "value1");
config.addPluginLoader("MyLoader", "org.mypackage.MyLoader", props);

Implement the init method of your PluginLoader implementation to receive initialization parameters. The engine invokes this method before the engine is fully initialized, therefore your implementation should not yet rely on the engine instance within the method body:

public class MyPluginLoader implements PluginLoader {
  public void init(String loaderName, Properties properties, EPServiceProviderSPI epService) {
     // save the configuration for later, perform checking
  }
  ...

The engine calls the postInitialize method once the engine has completed initialization, indicating that the engine is ready for traffic.

public void postInitialize() {
  // Start the actual interaction with external feeds or the engine here
}
...

The engine calls the destroy method once the engine is destroyed or initialized for a second time.

public void destroy() {
  // Destroy resources allocated as the engine instance is being destroyed
}

As discussed in Section 5.2.7, “Annotation”, an EPL annotation is an addition made to information in an EPL statement. The API and examples to interrogate annotations are described here.

You may use the getAnnotations method of EPStatement to obtain annotations specified for an EPL statement. Alternatively, when compiling an EPL expression to an EPStatementObjectModel statement object model, you may also query, change or add annotations.

The following example code demonstrates iterating over an EPStatement statement's annotations and retrieving values:

String exampleEPL = "@Tag(name='direct-output', value='sink 1') select * from RootEvent";
EPStatement stmt = epService.getEPAdministrator().createEPL(exampleEPL);
for (Annotation annotation : stmt.getAnnotations()) {
  if (annotation instanceof Tag) {
    Tag tag = (Tag) annotation;
    System.out.println("Tag name " + tag.name() + " value " + tag.value());
  }
}

The output of the sample code shown above is Tag name direct-output value sink 1.

This section discusses how to select context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions and the reasons for context partition selection are introduced in Section 4.9, “Operations on Specific Context Partitions”.

The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts. The section uses a category context for the purpose of illustration. The API discussed herein is general and handles all different types of contexts including nested contexts.

Consider a category context that separates bank transactions into small, medium and large:

// declare category context
create context TxnCategoryContext 
  group by amount < 100 as small, 
  group by amount between 100 and 1000 as medium, 
  group by amount > 1000 as large from BankTxn
// retain 1 minute of events of each category separately
context TxnCategoryContext select * from BankTxn.win:time(1 minute)
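As an aid to reading the context declaration above, the three category expressions can be mirrored in plain Java. This is only an illustration of which label an amount falls under, assuming a numeric amount property; TxnCategory and labelFor are hypothetical names, and the engine evaluates the EPL expressions itself.

```java
/** Hypothetical illustration of the three categories declared by TxnCategoryContext. */
final class TxnCategory {
    private TxnCategory() {}

    /** Returns the category label for an amount, or null if no category expression matches. */
    static String labelFor(double amount) {
        if (amount < 100) {
            return "small";
        }
        // EPL "between" includes both endpoints.
        if (amount >= 100 && amount <= 1000) {
            return "medium";
        }
        if (amount > 1000) {
            return "large";
        }
        return null; // e.g. NaN matches none of the expressions
    }
}
```

The labels small, medium and large are the values that a category selector refers to when identifying a context partition.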

In order for your application to iterate one or more specific categories it is necessary to identify which category, i.e. which context partition, to iterate. Similarly for on-demand queries, to execute on-demand queries against one or more specific categories, it is necessary to identify which context partition to execute the on-demand query against.

Your application may iterate one or more specific context partitions using either the iterator or safeIterator method of EPStatement by providing an implementation of the ContextPartitionSelector interface.

For example, assume your application must obtain all bank transactions for small amounts. It may use the API to identify the category and iterate the associated context partition:

ContextPartitionSelectorCategory categorySmall = new ContextPartitionSelectorCategory() {
    public Set<String> getLabels() {
      return Collections.singleton("small");
    }
  };
Iterator<EventBean> it = stmt.iterator(categorySmall);

Your application may execute on-demand queries against one or more specific context partitions by using the executeQuery method on EPRuntime or the execute method on EPOnDemandPreparedQuery and by providing an implementation of ContextPartitionSelector.

On-demand queries execute against named windows and tables. The EPL statement below therefore creates a named window which the engine manages separately for small, medium and large transactions according to the context declared earlier:

// Named window per category
context TxnCategoryContext create window BankTxnWindow.win:time(1 min) as BankTxn

The following code demonstrates how to fire an on-demand query against the small and the medium category:

ContextPartitionSelectorCategory categorySmallMed = new ContextPartitionSelectorCategory() {
    public Set<String> getLabels() {
      return new HashSet<String>(Arrays.asList("small", "medium"));
    }
  };
epService.getEPRuntime().executeQuery(
   "select count(*) from BankTxnWindow", 
   new ContextPartitionSelector[] {categorySmallMed});

The following limitations apply:

  • On-demand queries may not join named windows or tables that declare a context.

This section summarizes the selector interfaces that are available for use to identify and interrogate context partitions. Please refer to the JavaDoc documentation for package com.espertech.esper.client.context and classes therein for additional information.

Use an implementation of ContextPartitionSelectorAll or the ContextPartitionSelectorAll.INSTANCE object to instruct the engine to consider all context partitions.

Use an implementation of ContextPartitionSelectorById if your application knows the context partition ids to query. This selector instructs the engine to consider only those provided context partitions based on their integer id value. The engine outputs the context partition id in the built-in property context.id.

Use an implementation of ContextPartitionSelectorFiltered to receive and interrogate context partitions. Use the filter method that receives a ContextPartitionIdentifier to return a boolean indicator whether to include the context partition or not. The ContextPartitionIdentifier provides information about each context partition. Your application may not retain ContextPartitionIdentifier instances between filter method invocations as the engine reuses the same instance. This selector is not supported with nested contexts.

Use an implementation of ContextPartitionSelectorCategory with category contexts.

Use an implementation of ContextPartitionSelectorSegmented with keyed segmented contexts.

Use an implementation of ContextPartitionSelectorHash with hash segmented contexts.

Use an implementation of ContextPartitionSelectorNested in combination with the selectors described above with nested contexts.

This section briefly discusses the API to manage context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions.

The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts.

The administrative API for context partitions is EPContextPartitionAdmin. Use the getContextPartitionAdmin method of the EPAdministrator interface to obtain said service.

The context partition admin API allows an application to:

  • Start, stop and destroy individual context partitions.

  • Interrogate the state and identifiers for existing context partitions.

  • Determine statements associated with a context and context nesting level.

Stopping individual context partitions is useful to drop state, free memory and suspend a given context partition without stopping or destroying any associated statements. For example, assume a keyed segmented context per user id. To suspend and free the memory for a given user id, your application can stop that user id's context partition. The engine does not allocate a context partition for this user id again until your application destroys or starts the context partition.

Destroying individual context partitions is useful to drop state, free memory and deregister the given context partition without stopping or destroying any associated statements. For example, assume a keyed segmented context per user id. To deregister and free the memory for a given user id your application can destroy the user id's context partition. The engine can allocate a fresh context partition for this user id when events for this user id arrive.
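The difference between stopping and destroying can be made concrete with a toy model. The class below is not Esper code and does not call the Esper API; it only mimics the behavior described above for a keyed segmented context with one partition per user id. All names in it are hypothetical, and the assumption that a restarted partition begins with empty state follows from the statement that stopping drops state.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Esper code. It mimics the stop/destroy behavior
// described above, using an event count as the per-partition state.
final class ToyPartitionRegistry {
    enum State { STARTED, STOPPED }

    private final Map<String, State> partitions = new HashMap<>();
    private final Map<String, Integer> eventCounts = new HashMap<>();

    // An arriving event allocates a partition on demand, unless a partition
    // for that user id is registered in the STOPPED state.
    boolean onEvent(String userId) {
        if (partitions.get(userId) == State.STOPPED) {
            return false; // suspended: not processed, nothing allocated
        }
        partitions.put(userId, State.STARTED);
        eventCounts.merge(userId, 1, Integer::sum);
        return true;
    }

    // Stop: drop the state but keep the registration, which blocks re-allocation.
    void stop(String userId) {
        if (partitions.containsKey(userId)) {
            partitions.put(userId, State.STOPPED);
            eventCounts.remove(userId);
        }
    }

    // Start: resume a stopped partition; its state begins empty.
    void start(String userId) {
        if (partitions.get(userId) == State.STOPPED) {
            partitions.put(userId, State.STARTED);
        }
    }

    // Destroy: drop the state and the registration, so a later event
    // allocates a fresh partition.
    void destroy(String userId) {
        partitions.remove(userId);
        eventCounts.remove(userId);
    }

    int count(String userId) {
        return eventCounts.getOrDefault(userId, 0);
    }

    boolean isRegistered(String userId) {
        return partitions.containsKey(userId);
    }
}
```

In this model an event for a stopped user id is ignored until start or destroy is called, whereas an event for a destroyed user id simply creates a new partition with fresh state.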

Please see the JavaDoc documentation for more information.

Esper offers a listener and an assertions class to facilitate automated testing of EPL rules, for example when using a test framework such as JUnit or TestNG.

Esper does not require any specific test framework. If your application has the JUnit test framework on the classpath, Esper uses junit.framework.AssertionFailedError to indicate assertion errors, so that failures integrate with continuous integration tools.

For detailed method-level information, please consult the JavaDoc of the package com.espertech.esper.client.scopetest.

The class com.espertech.esper.client.scopetest.EPAssertionUtil provides methods to assert or compare event property values, as well as to perform various array arithmetic, sort events and convert events or iterators to arrays.

The class com.espertech.esper.client.scopetest.SupportUpdateListener provides an UpdateListener implementation that collects events and returns event data for assertion.

The class com.espertech.esper.client.scopetest.SupportSubscriber provides a subscriber implementation that collects events and returns event data for assertion. The SupportSubscriberMRD is a subscriber that accepts events via multi-row delivery. The SupportSubscriber and SupportSubscriberMRD work similarly to SupportUpdateListener, which is introduced in more detail below.