Esper has the following primary interfaces:
The event and event type interfaces are described in Section 14.6, “Event and Event Type”.
The administrative interface to create and manage EPL and pattern statements, and set runtime configurations, is described in Section 14.3, “The Administrative Interface”.
The runtime interface to send events into the engine, set and get variable values and execute on-demand queries, is described in Section 14.4, “The Runtime Interface”.
For EPL introductory information please see Section 5.1, “EPL Introduction”; patterns are described in Section 7.1, “Event Pattern Overview”.
The JavaDoc documentation is also a great source for API information.
The EPServiceProvider
interface represents an engine instance. Each instance of an Esper engine is completely independent of other engine instances and has its own administrative and runtime interface.
An instance of the Esper engine is obtained via static methods on the EPServiceProviderManager
class.
The getDefaultProvider
method and the getProvider(String providerURI)
methods return an instance of the Esper engine.
The latter can be used to obtain multiple instances of the engine for different provider URI values. The EPServiceProviderManager
determines if the provider URI matches any prior provider URI value and returns the same engine instance for the same provider URI value. If the provider URI has not been seen before, it creates a new engine instance.
The code snippet below gets the default Esper engine instance. Subsequent calls to get the default engine instance return the same instance.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
This code snippet gets an Esper engine for the provider URI RFIDProcessor1
. Subsequent calls to get an engine with the same provider URI return the same instance.
EPServiceProvider epService = EPServiceProviderManager.getProvider("RFIDProcessor1");
Since the getProvider
methods return the same cached engine instance for each URI, there is no need to statically cache an engine instance in your application.
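The caching behavior described above can be pictured with a small stand-alone model. This is only a sketch: EngineRegistry, EngineInstance and the "default" URI name are hypothetical stand-ins invented for illustration, not Esper classes, and the sketch makes no claim about how EPServiceProviderManager is implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an engine instance; not an Esper class.
final class EngineInstance {
    final String providerURI;

    EngineInstance(String providerURI) {
        this.providerURI = providerURI;
    }
}

// Hypothetical registry that mimics the documented per-URI caching.
final class EngineRegistry {
    private static final String DEFAULT_URI = "default"; // assumed name
    private final Map<String, EngineInstance> engines = new ConcurrentHashMap<>();

    EngineInstance getDefaultProvider() {
        return getProvider(DEFAULT_URI);
    }

    EngineInstance getProvider(String providerURI) {
        // Create the instance the first time a URI is seen, reuse it afterwards.
        return engines.computeIfAbsent(providerURI, EngineInstance::new);
    }
}
```

Because the lookup is keyed by provider URI, two calls with the same URI yield the identical object, which is why an application-level static cache adds nothing.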
An existing Esper engine instance can be reset via the initialize
method on the EPServiceProvider
instance. This operation stops and removes all statements and
resets the engine to the configuration provided when the engine instance for that URI was obtained. If no configuration is provided, an empty (default) configuration applies.
After initialize
your application must obtain new administrative and runtime services. Any administrative and runtime services obtained before the call to initialize are invalid and have undefined behavior.
The next code snippet outlines a typical sequence of use:
// Configure the engine, this is optional
Configuration config = new Configuration();
config.configure("configuration.xml");    // load a configuration from file
config.set....(...);    // make additional configuration settings

// Obtain an engine instance
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);

// Optionally, use initialize if the same engine instance has been used before to start clean
epService.initialize();

// Optionally, make runtime configuration changes
epService.getEPAdministrator().getConfiguration().add...(...);

// Destroy the engine instance when no longer needed, frees up resources
epService.destroy();
An existing Esper engine instance can be destroyed via the destroy
method on the EPServiceProvider
instance. This stops and removes all statements
as well as frees all resources held by the instance. After a destroy
the engine can no longer be used.
The EPServiceStateListener
interface may be implemented by your application to receive callbacks when an engine instance is about to be destroyed and after an engine instance has been initialized. Listeners are registered via the addServiceStateListener
method. The EPStatementStateListener
interface is used to receive callbacks when a new statement gets created and when a statement gets started, stopped or destroyed. Listeners are registered via the addStatementStateListener
method.
When destroying an engine instance your application must make sure that threads that are sending events into the engine have completed their work. More generally, the engine should not be currently in use during or after the destroy operation.
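One way to meet this requirement, assuming your application sends events from a thread pool that it owns, is to shut the pool down and wait for it before destroying the engine. The sketch below uses only java.util.concurrent; EngineShutdown and quiesceThenDestroy are hypothetical names, and the destroy action is passed in as a Runnable (for example a lambda that calls epService.destroy()).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the Esper API.
final class EngineShutdown {
    /**
     * Stops the sender pool, waits for in-flight send tasks, then runs the
     * destroy action. Returns true only if the destroy action was run.
     */
    static boolean quiesceThenDestroy(ExecutorService senders, Runnable destroyAction,
                                      long timeout, TimeUnit unit) {
        senders.shutdown(); // accept no new send tasks
        try {
            if (!senders.awaitTermination(timeout, unit)) {
                return false; // senders are still busy: do not destroy yet
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        destroyAction.run(); // e.g. () -> epService.destroy()
        return true;
    }
}
```

If the method returns false the engine has not been destroyed, and your application can decide whether to wait longer or to cancel the outstanding senders.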
As engine instances are completely independent, your application may not send EventBean
instances obtained from one engine instance into a second engine instance since the event type space between two engine instances is not shared.
Create event pattern expressions and EPL statements via the administrative interface EPAdministrator
.
For managing one or more related statements as a module, please consider the deployment administrative API and EPL modules as further described in Section 16.4, “Packaging and Deploying Overview”.
This code snippet gets an Esper engine then creates an event pattern and an EPL statement.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();

EPStatement tenSecRecurTrigger = admin.createPattern(
  "every timer:at(*, *, *, *, *, */10)");

EPStatement countStmt = admin.createEPL(
  "select count(*) from MarketDataBean#time(60 sec)");
Note that event pattern expressions can also occur within EPL statements. This is outlined in more detail in Section 5.4.2, “Pattern-Based Event Streams”.
The create
methods on EPAdministrator
are overloaded and allow an optional statement name to be passed to the engine. A statement name can be useful for retrieving a statement
by name from the engine at a later time. The engine assigns a statement name if no statement name is supplied on statement creation.
The createPattern
and createEPL
methods return EPStatement
instances. Statements are automatically started and active when created. A statement can also be stopped and started again via the stop
and start
methods shown in the code snippet below.
countStmt.stop();
countStmt.start();
The create
methods on EPAdministrator
also accept a user object.
The user object is associated with a statement at time of statement creation and is a single, unnamed field that is stored with every statement.
Applications may put arbitrary objects in this field. Use the getUserObject
method on EPStatement
to obtain the user object of a statement. Listeners can implement StatementAwareUpdateListener to receive the statement and, through it, the user object.
Your application may create new statements or stop and destroy existing statements using any thread and also within listener or subscriber code. If using POJO events, your application may not create or manage statements in the event object itself while the same event is currently being processed by a statement.
For NEsper .NET also see Section I.14, “.NET API - Receiving Statement Results”.
Esper provides three choices for your application to receive statement results. Your application can use all three mechanisms alone or in any combination for each statement. The choices are:
Table 14.1. Choices For Receiving Statement Results
Name | Methods on EPStatement | Description |
---|---|---|
Listener Callbacks | addListener and removeListener | Your application provides implementations of the UpdateListener or the StatementAwareUpdateListener interface. The engine continuously indicates results to all listeners as soon as they occur, and following output rate limiting clauses if specified. |
Subscriber Object | setSubscriber | Your application provides a POJO (plain Java object) that exposes methods to receive statement results. The name of the method that a subscriber object provides to receive results is update. The engine continuously indicates results to the single subscriber as soon as they occur, and following output rate limiting clauses if specified. This is the fastest method to receive statement results, as the engine delivers strongly-typed results directly to your application objects without the need for building an EventBean. There can be at most 1 Subscriber Object registered per statement. If you require more than one listener, use the Listener Callback instead (or in addition). The Subscriber Object is bound to the statement with strongly typed support, which ensures direct delivery of new events without type conversion. This optimization is made possible because there can only be 0 or 1 Subscriber Object per statement. |
Pull API | safeIterator and iterator | Your application asks the statement for results and receives a set of events via an iterator. This is useful if your application does not need continuous indication of new results in real-time. |
The engine calls application-provided update listeners and subscribers for output. These commonly encapsulate the actions to take when there is output. This design decouples EPL statements from actions and places actions outside of EPL. It allows actions to change independently from statements: A statement does not need to be updated when its associated action(s) change.
While action-taking, in respect to the code or script taking the action, is not a part of the EPL language, here are a few noteworthy points.
Through the use of EPL annotations you can attach information to EPL that can be used by applications to flexibly determine actions.
The convenient StatementAwareUpdateListener
interface is a listener that receives the statement itself and subscribers can accept EPStatement
as a parameter.
The insert into
-clause can be used to send results into a further stream and input and output adapters or data flows can exist to process output events from that stream.
Also the data flow EPStatementSource
operator can be used to hook up actions declaratively.
The EPStatementStateListener
can inform your application of new statements coming online.
Your application may attach one or more listeners, zero or one single subscriber and in addition use the Pull API on the same statement. There are no limitations to the use of iterator, subscriber or listener alone or in combination to receive statement results.
The best delivery performance can generally be achieved by attaching a subscriber and by not attaching listeners. The engine is aware of the listeners and subscriber attached to a statement. The engine uses this information internally to reduce statement overhead. For example, if your statement does not have listeners or a subscriber attached, the engine does not need to continuously generate results for delivery.
If your application attaches both a subscriber and one or more listeners then the subscriber receives the result first before any of the listeners.
If your application attaches more than one listener then the UpdateListener
listeners receive results first in the order they were added to the statement, and
StatementAwareUpdateListener
listeners receive results next in the order they were added to the statement. To change the order of delivery among listeners your application can add and remove listeners at runtime.
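The delivery order described in the last two paragraphs can be summarized by a toy dispatcher. DispatchOrderModel is a hypothetical class written for this illustration only; it models listeners as plain java.util.function.Consumer callbacks and does not reflect the engine's internal data structures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of result dispatch for one statement; not Esper code.
final class DispatchOrderModel {
    private Consumer<String> subscriber; // zero or one subscriber
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private final List<Consumer<String>> statementAwareListeners = new ArrayList<>();

    void setSubscriber(Consumer<String> s) { subscriber = s; }
    void addListener(Consumer<String> l) { listeners.add(l); }
    void addStatementAwareListener(Consumer<String> l) { statementAwareListeners.add(l); }
    void removeListener(Consumer<String> l) { listeners.remove(l); }

    void deliver(String result) {
        // 1. The subscriber, if any, receives the result first.
        if (subscriber != null) {
            subscriber.accept(result);
        }
        // 2. Plain listeners follow, in the order they were added.
        for (Consumer<String> l : listeners) {
            l.accept(result);
        }
        // 3. Statement-aware listeners come last, in the order they were added.
        for (Consumer<String> l : statementAwareListeners) {
            l.accept(result);
        }
    }
}
```

In this model, removing a listener and adding it again moves it to the end of its group, which mirrors the advice to add and remove listeners at runtime when the order of delivery needs to change.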
If you have configured outbound threading, it means a thread from the outbound thread pool delivers results to the subscriber and listeners instead of the processing or event-sending thread.
If outbound threading is turned on, we recommend turning off the engine setting preserving the order of events delivered to listeners as described in Section 15.4.13.1, “Preserving the Order of Events Delivered to Listeners”. If outbound threading is turned on, statement execution is not blocked for the configured time in the case that a subscriber or listener takes too much time.
A subscriber object is a direct binding of query results to a Java object. The object, a POJO, receives statement results via method invocation. The subscriber class does not need to implement an interface or extend a superclass. Only one subscriber object may be set for a statement.
Subscriber objects have several advantages over listeners. First, they offer a substantial performance benefit: Query results are delivered directly to your method(s) through Java virtual machine method calls, and there is no intermediate representation (EventBean
). Second, as subscribers receive strongly-typed parameters, the subscriber code tends to be simpler.
This section describes the requirements for the methods provided by your subscriber class.
The engine can deliver results to your subscriber in two ways:
Each event in the insert stream results in a method invocation, and each event in the remove stream results in further method invocations. This is termed row-by-row delivery.
A single method invocation that delivers all rows of the insert and remove stream. This is termed multi-row delivery.
In the case that your subscriber object wishes to receive the EPStatement
instance along with output data,
please add EPStatement
as the very first parameter of any of the delivery method footprints that are discussed next.
For example, your statement may be:
select count(*) from OrderEvent
Your subscriber class exposes the method:
public void update(EPStatement statement, long currentCount) {...}
Your subscriber class must provide a method by name update
to receive insert stream events row-by-row. The number and types of parameters declared by the update
method must match the number and types of columns as specified in the select
clause, in the same order as in the select
clause.
For example, if your statement is:
select orderId, price, count(*) from OrderEvent
Then your subscriber update
method looks as follows:
public class MySubscriber {
  ...
  public void update(String orderId, double price, long count) {...}
  ...
}
Each method parameter declared by the update
method must be assignable from the respective column type as listed in the select
-clause, in the order selected. The assignability rules are:
Widening of types follows Java standards. For example, if your select
clause selects an integer value, the method parameter for the same column can be typed int, long, float or double (or any equivalent boxed type).
Auto-boxing and unboxing follows Java standards. For example, if your select
clause selects a java.lang.Integer
value, the method parameter for the same column can be typed int
. Note that if your select
clause column may generate null
values, an exception may occur at runtime unboxing the null
value.
Interfaces and super-classes are honored in the test for assignability. Therefore java.lang.Object
can be used to accept any select
clause column type.
In the case that your subscriber class offers multiple update
method footprints, the engine selects the closest-matching footprint by comparing the output types and method parameter types. The engine prefers the update method that is an exact match of types, followed by an update method that requires boxing or unboxing, followed by an update method that requires widening and finally any other allowable update method.
Within the above criteria, in the case that your subscriber class offers multiple update
method footprints with same method parameter types, the engine prefers the update method that has EPStatement
as the first parameter.
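To make the assignability rules and the preference order more concrete, the following sketch classifies how well a method footprint fits a list of column types. It is a simplified model under stated assumptions, not the engine's matching code: FootprintMatcher and its method names are hypothetical, only the four numeric types named above are considered for widening, and a footprint is ranked by its worst-fitting parameter, which is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that models the documented preference order; not Esper code.
final class FootprintMatcher {
    // Declared from best to worst so that compareTo() ranks matches.
    enum Match { EXACT, BOXING, WIDENING, OTHER, NONE }

    private static final Map<Class<?>, Class<?>> BOXED = new HashMap<>();
    static {
        BOXED.put(int.class, Integer.class);
        BOXED.put(long.class, Long.class);
        BOXED.put(float.class, Float.class);
        BOXED.put(double.class, Double.class);
    }

    // Widening order for the numeric types named in the text (assumed subset).
    private static final List<Class<?>> WIDENING_ORDER =
        Arrays.<Class<?>>asList(Integer.class, Long.class, Float.class, Double.class);

    static Class<?> box(Class<?> type) {
        return BOXED.getOrDefault(type, type);
    }

    /** Classifies how one select-clause column type fits one parameter type. */
    static Match classifyColumn(Class<?> paramType, Class<?> columnType) {
        if (paramType == columnType) {
            return Match.EXACT;
        }
        Class<?> p = box(paramType);
        Class<?> c = box(columnType);
        if (p == c) {
            return Match.BOXING; // differs only by boxing or unboxing
        }
        int pi = WIDENING_ORDER.indexOf(p);
        int ci = WIDENING_ORDER.indexOf(c);
        if (pi >= 0 && ci >= 0) {
            return pi > ci ? Match.WIDENING : Match.NONE; // narrowing is rejected
        }
        // Interfaces and superclasses, including java.lang.Object.
        return paramType.isAssignableFrom(c) ? Match.OTHER : Match.NONE;
    }

    /** A footprint is ranked by its worst-fitting parameter (assumption). */
    static Match classifyFootprint(Class<?>[] paramTypes, Class<?>[] columnTypes) {
        if (paramTypes.length != columnTypes.length) {
            return Match.NONE;
        }
        Match worst = Match.EXACT;
        for (int i = 0; i < paramTypes.length; i++) {
            Match m = classifyColumn(paramTypes[i], columnTypes[i]);
            if (m.compareTo(worst) > 0) {
                worst = m;
            }
        }
        return worst;
    }
}
```

Given several candidate update methods, the one whose classification sorts first in the Match enumeration would be the preferred footprint in this model.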
If your select
clause contains one or more wildcards (*), then the equivalent parameter type is the underlying event type of the stream selected from.
For example, your statement may be:
select *, count(*) from OrderEvent
Then your subscriber update
method looks as follows:
public void update(OrderEvent orderEvent, long count) {...}
In a join, the wildcard expands to the underlying event type of each stream in the join in the order the streams occur in the from
clause. An example statement for a join is:
select *, count(*) from OrderEvent order, OrderHistory hist
Then your subscriber update
method should be:
public void update(OrderEvent orderEvent, OrderHistory orderHistory, long count) {...}
The stream wildcard syntax and the stream name itself can also be used:
select hist.*, order from OrderEvent order, OrderHistory hist
The matching update
method is:
public void update(OrderHistory orderHistory, OrderEvent orderEvent) {...}
Alternatively, your update
method may simply choose to accept java.util.Map
as a representation for each row. Each column in the select
clause is
then made an entry in the resulting Map
. The Map
keys are the column name if supplied, or the expression string itself for columns without a name.
The update
method for Map
delivery is:
public void update(Map row) {...}
The engine also supports delivery of select
clause columns as an object array. Each item in the object array represents a column in the select
clause. The update
method then looks as follows:
public void update(Object[] row) {...}
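As an illustration of both footprints, the sketch below shows two plain subscriber classes for an assumed statement such as select orderId, count(*) from OrderEvent. The class names are hypothetical, and the Map key "count(*)" assumes that the unnamed column is keyed by its expression text as described above. The classes are declared without the public modifier only so that the sketch fits in a single file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain subscriber class: no interface or superclass is required.
final class OrderMapSubscriber {
    final List<String> seen = new ArrayList<>();

    // Map delivery: keys are column names, or the expression text for unnamed columns.
    public void update(Map<String, Object> row) {
        seen.add(row.get("orderId") + " count=" + row.get("count(*)"));
    }
}

final class OrderArraySubscriber {
    final List<String> seen = new ArrayList<>();

    // Object-array delivery: one element per select-clause column, in select order.
    public void update(Object[] row) {
        String orderId = (String) row[0];
        long count = (Long) row[1];
        seen.add(orderId + " count=" + count);
    }
}
```

Map delivery tolerates changes to the column order of the select clause, whereas object-array delivery avoids the key lookup but depends on column positions.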
Your subscriber receives remove stream events if it provides a method named updateRStream
. The method must accept the same number and types of parameters as the update
method (including EPStatement
if present).
An example statement:
select orderId, count(*) from OrderEvent#time(20 sec) group by orderId
Then your subscriber update
and updateRStream
methods should be:
public void update(String orderId, long count) {...}
public void updateRStream(String orderId, long count) {...}
If your subscriber requires a notification for begin and end of event delivery, it can expose methods by name updateStart
and updateEnd
.
The updateStart
method must take two integer parameters that indicate the number of events of the insert stream and remove stream to be delivered. The engine invokes the updateStart
method immediately prior to delivering events to the update
and updateRStream
methods.
The updateEnd
method must take no parameters. The engine invokes the updateEnd
method immediately after delivering events to the update
and updateRStream
methods.
An example set of delivery methods:
// Called by the engine before delivering events to update methods
public void updateStart(int insertStreamLength, int removeStreamLength) {...}

// To deliver insert stream events
public void update(String orderId, long count) {...}

// To deliver remove stream events
public void updateRStream(String orderId, long count) {...}

// Called by the engine after delivering events
public void updateEnd() {...}
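A subscriber can use the start and end notifications to assemble row-by-row deliveries into one batch. The following is a minimal sketch; BatchingOrderSubscriber is a hypothetical class, and it assumes that a single thread delivers each batch, so it does no synchronization of its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subscriber that groups row-by-row deliveries into batches.
final class BatchingOrderSubscriber {
    private List<String> inserted;
    private List<String> removed;
    final List<String> completedBatches = new ArrayList<>();

    // Called before the rows of a delivery: size the buffers up front.
    public void updateStart(int insertStreamLength, int removeStreamLength) {
        inserted = new ArrayList<>(insertStreamLength);
        removed = new ArrayList<>(removeStreamLength);
    }

    // One call per insert stream row.
    public void update(String orderId, long count) {
        inserted.add(orderId + "=" + count);
    }

    // One call per remove stream row.
    public void updateRStream(String orderId, long count) {
        removed.add(orderId + "=" + count);
    }

    // Called after the last row: the batch is complete and can be handed on.
    public void updateEnd() {
        completedBatches.add("new" + inserted + " old" + removed);
    }
}
```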
In place of row-by-row delivery, your subscriber can receive all events in the insert and remove stream via a single method invocation. This is applicable when an EPL statement delivers multiple output rows for a given input event or time advancing, for example when multiple pattern matches occur for the same incoming event, for a join producing multiple output rows, or with output rate limiting.
The event delivery follows the scheme described earlier in Section 14.3.3.2.2, “Row Delivery as Map and Object Array”. The subscriber class must provide one of the following methods:
Table 14.2. Update Method for Multi-Row Delivery of Underlying Events
Method | Description |
---|---|
update(Object[][] insertStream, Object[][] removeStream) | The first dimension of each Object array is the event row, and the second dimension is the column matching the column order of the statement |
update(Map[] insertStream, Map[] removeStream) | Each map represents one event, and Map entries represent columns of the statement |
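A subscriber using the object-array footprint from the table might look like the sketch below, written for an assumed statement that selects orderId and count(*). MultiRowSubscriber is a hypothetical class. Treating a null stream as empty is a defensive assumption of this sketch, not documented behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical subscriber for multi-row delivery as two-dimensional object arrays.
final class MultiRowSubscriber {
    final Map<String, Long> latestCounts = new HashMap<>();
    int insertRows;
    int removeRows;

    public void update(Object[][] insertStream, Object[][] removeStream) {
        // Defensive assumption: a stream without rows may arrive as null.
        Object[][] in = insertStream == null ? new Object[0][] : insertStream;
        Object[][] out = removeStream == null ? new Object[0][] : removeStream;

        for (Object[] row : in) {
            // First dimension: row; second dimension: column in select order.
            String orderId = (String) row[0];
            long count = (Long) row[1];
            latestCounts.put(orderId, count);
        }
        insertRows += in.length;
        removeRows += out.length;
    }
}
```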
If your select
clause contains a single wildcard (*) or wildcard stream selector, the subscriber object may also directly receive arrays of the underlying events. In this case, the subscriber class should provide a method update(Underlying[] insertStream, Underlying[] removeStream), such that Underlying represents the class of the underlying event.
For example, your statement may be:
select * from OrderEvent#time(30 sec)
Your subscriber class exposes the method:
public void update(OrderEvent[] insertStream, OrderEvent[] removeStream) {...}
In the case that your subscriber object wishes to receive no data from a statement please follow the instructions here.
Your EPL statement must select a single null
value.
For example, your statement may be:
select null from OrderEvent(price > 100)
Your subscriber class exposes the method:
public void update() {...}
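A typical use of the no-parameter footprint is counting how often a statement fires. The sketch below is a hypothetical subscriber for the statement shown above. It uses an AtomicLong on the assumption that deliveries may arrive from more than one thread.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subscriber that only counts notifications and receives no data.
final class HighPriceOrderCounter {
    private final AtomicLong matches = new AtomicLong();

    // Invoked once per output row of "select null from OrderEvent(price > 100)".
    public void update() {
        matches.incrementAndGet();
    }

    public long getMatches() {
        return matches.get();
    }
}
```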
For NEsper .NET also see Section I.15, “.NET API - Adding Listeners”.
Your application can subscribe to updates posted by a statement via the addListener
and removeListener
methods on EPStatement
. Your application must provide an implementation of the UpdateListener
or the StatementAwareUpdateListener
interface to the statement:
UpdateListener myListener = new MyUpdateListener();
countStmt.addListener(myListener);
EPL statements and event patterns publish old data and new data to registered UpdateListener
listeners.
New data published by statements is the events representing the new values of derived data held by the statement.
Old data published by statements consists of the events representing the prior values of derived data held by the statement.
UpdateListener
listeners receive multiple result rows in one invocation by the engine: the new data and old data parameters to your listener are array parameters. For example, if your application uses one of the batch data windows, or your application creates a pattern that matches multiple times when a single event arrives, then the engine indicates such multiple result rows in one invocation and your new data array carries two or more rows.
A second listener interface is the StatementAwareUpdateListener
interface. A StatementAwareUpdateListener
is especially useful for registering the same listener object with multiple statements,
as the listener receives the statement instance and engine instance in addition to new and old data when the engine indicates new results to a listener.
StatementAwareUpdateListener myListener = new MyStmtAwareUpdateListener();
statement.addListener(myListener);
To indicate results the engine invokes this method on StatementAwareUpdateListener
listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)
The addListenerWithReplay
method provided by EPStatement
makes it possible to send a snapshot of current statement results to a listener when the listener is added.
When using the addListenerWithReplay
method to register a listener, the listener receives current statement results in the first call to its update method: the newEvents parameter carries the current statement results as an array of zero or more events. Subsequent calls to the update method of the listener deliver regular statement results.
Current statement results are the events returned by the iterator
or safeIterator
methods.
Delivery is atomic: Events occurring during delivery of current results to the listener are guaranteed to be delivered in a separate call and not lost. The listener implementation should thus minimize long-running or blocking operations to reduce lock times held on statement-level resources.
Subscribing to events posted by a statement follows a push model. The engine pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, statements serve up data that your application can obtain via the safeIterator
and iterator
methods on EPStatement
. This is called the pull API and can come in handy if your application is not interested in all new updates, and only needs to perform a frequent or infrequent poll for the latest data.
The safeIterator
method on EPStatement
returns a concurrency-safe iterator returning current statement results, even while concurrent threads may send events into the engine for processing. The engine employs a read-write lock per context partition and obtains a read lock for iteration. Thus safe iterator guarantees correct results even as events are being processed by other threads and other context partitions. The cost is that the iterator obtains and holds zero, one or multiple context partition locks for that statement that must be released via the close
method on the SafeIterator
instance.
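The reason a safe iterator must be closed can be seen in a small model built on java.util.concurrent.locks. LockingIterator is a hypothetical class that holds a single read lock. The real safe iterator may hold several context partition locks, and nothing here describes its actual implementation. Implementing AutoCloseable is a convenience of this sketch only.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model: an iterator that holds a read lock until close() is called.
final class LockingIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final ReentrantReadWriteLock lock;
    private boolean closed;

    LockingIterator(List<T> results, ReentrantReadWriteLock lock) {
        this.lock = lock;
        lock.readLock().lock(); // writers (event processing) now have to wait
        this.delegate = results.iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (closed) {
            throw new IllegalStateException("iterator is closed");
        }
        return delegate.next();
    }

    // Must be called by the thread that created the iterator, since a
    // ReentrantReadWriteLock read lock is released by the thread that holds it.
    @Override
    public void close() {
        if (!closed) {
            closed = true;
            lock.readLock().unlock();
        }
    }
}
```

As long as the model iterator is open, an attempt to take the write lock does not succeed, which corresponds to event processing waiting until the iterating thread calls close.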
The iterator
method on EPStatement
returns a concurrency-unsafe iterator. This iterator is only useful for applications that are single-threaded, or applications that themselves perform coordination between the iterating thread and the threads that send events into the engine for processing. The advantage to this iterator is that it does not hold a lock.
When statements are used with contexts and context partitions, the APIs to identify, filter and select context partitions for statement iteration are described in Section 14.18, “Context Partition Selection”.
The next code snippet shows a short example of use of safe iterators:
EPStatement statement = epAdmin.createEPL("select avg(price) as avgPrice from MyTick");
// .. send events into the engine
// then use the pull API...
SafeIterator<EventBean> safeIter = statement.safeIterator();
try {
  while (safeIter.hasNext()) {
     // .. process event ..
     EventBean event = safeIter.next();
     System.out.println("avg:" + event.get("avgPrice"));
  }
}
finally {
  safeIter.close();    // Note: safe iterators must be closed
}
This is a short example of use of the regular iterator that is not safe for concurrent event processing:
double averagePrice = (Double) eplStatement.iterator().next().get("average");
The safeIterator
and iterator
methods can be used to pull results out of all statements, including statements that join streams, contain aggregation functions, pattern statements, and statements that contain a where
clause, group by
clause, having
clause or order by
clause.
For statements without an order by
clause, the iterator
method returns events in the order maintained by the data window. For statements that contain an order by
clause, the iterator
method returns events in the order indicated by the order by
clause.
Consider using the on-select
clause and a named window if your application requires iterating over a partial result set or requires indexed access for fast iteration; Note that on-select
requires that you send a trigger event, which may contain the key values for indexed access.
Esper places the following restrictions on the pull API and usage of the safeIterator
and iterator
methods:
In multithreaded applications, use the safeIterator
method. Note: make sure your application closes the iterator via the close
method when done, otherwise the iterated statement context partitions stay locked and event processing for statement context partitions does not resume.
In multithreaded applications, the iterator
method does not hold any locks. The iterator returned by this method does not make any guarantees towards correctness of results and fail-behavior, if your application processes events into the engine instance by multiple threads. Use the safeIterator
method for concurrency-safe iteration instead.
Since the safeIterator
and iterator
methods return events to the application immediately, the iterator does not honor an output rate limiting clause, if present. That is, for statements without grouping or aggregation, the iterator returns results as if there is no output-rate clause for the statement. For statements with grouping or aggregation, the iterator in combination with an output clause returns the last output group and aggregation results. Use a separate statement and the insert into
clause to control the output rate for iteration, if so required.
When iterating a statement that operates on an unbound stream (no data window declared), please note the following:
When iterating a statement that groups and aggregates values from an unbound stream and that specifies output snapshot
, the engine retains groups and aggregations for output as iteration results or upon the output snapshot condition.
When iterating a statement that groups and aggregates values from an unbound stream and that does not specify output snapshot
, the engine only retains the last aggregation values and the iterated result contains only the last updated group.
When iterating a statement that operates on an unbound stream the iterator returns no rows. This behavior can be changed by specifying either the @IterableUnbound
annotation or by changing the global view resources configuration.
The EPAdministrator
interface provides the facilities for managing statements:
Use the getStatement
method to obtain an existing started or stopped statement by name
Use the getStatementNames
method to obtain a list of started and stopped statement names
Use the startAllStatements
, stopAllStatements
and destroyAllStatements
methods to manage all statements in one operation
Your application can concurrently send events into the engine while performing statement or module management. Therefore it is safe to stop and start statements or un-deploy and deploy modules while sending in events from other threads concurrently.
Your application can use the API described below to obtain a lock and perform statement or module management as an atomic unit. For example, if your application would like to un-deploy and re-deploy all statements or modules as a single administrative unit, while at the same time sending events into the engine from different threads, it can obtain a lock to ensure that no events are concurrently processed while the statement or module management operations take place.
Deploying or un-deploying a single module is already an atomic operation by default and does not require taking an explicit lock. Please see Section 16.6, “The Deployment Administrative Interface” for the deployment API. If your application would like to deploy multiple modules and/or statements as a unit, please obtain a lock as discussed below.
The below code sample obtains the engine exclusive write lock to perform multiple management operations as a unit, excluding concurrent processing of events.
epService.getEngineInstanceWideLock().writeLock().lock();
// Start atomic management unit.
// Any events concurrently being processed by other threads must complete before the code completes obtaining the lock.
// Any events sent in by other threads will await the release of the lock.
try {
  // Perform operations such as :
  //   - start statements, destroy statements, stop statements
  //   - un-deploy multiple modules, deploy multiple modules (deployment admin API)
  // There is no need to obtain this lock when deploying or un-deploying a single module.
  // The lock is reentrant and can be safely taken multiple times by the same thread.
  // Make sure you use "try" and "finally" just like we have it here.
}
finally {
  // Complete atomic management unit.
  // Any events sent in by other threads will now continue processing against the changed set of statements.
  epService.getEngineInstanceWideLock().writeLock().unlock();
}
There should always be a finally
block in your code to ensure the lock is released in all cases.
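The interplay between event processing and the exclusive lock can be modeled with a plain ReentrantReadWriteLock. ManagedEngineModel is a hypothetical class. It assumes that event processing takes the read side of the lock, which is consistent with the behavior described above but is not a statement about the engine's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of an engine guarded by an instance-wide read-write lock.
final class ManagedEngineModel {
    private final ReadWriteLock engineLock = new ReentrantReadWriteLock();
    private final List<String> statements = new ArrayList<>();

    ReadWriteLock getEngineInstanceWideLock() {
        return engineLock;
    }

    // Event processing shares the read lock, so senders do not block each other.
    // Returns the number of statements that saw the event.
    int sendEvent(Object event) {
        engineLock.readLock().lock();
        try {
            return statements.size();
        } finally {
            engineLock.readLock().unlock();
        }
    }

    // Management takes the write lock: no event is processed half-way through.
    void replaceAllStatements(List<String> newStatements) {
        engineLock.writeLock().lock(); // reentrant if the caller already holds it
        try {
            statements.clear();
            statements.addAll(newStatements);
        } finally {
            engineLock.writeLock().unlock();
        }
    }
}
```

In the model, a sender observes either the old set of statements or the new one, never a mixture, because the replacement runs entirely under the write lock.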
Certain configuration changes are available to perform on an engine instance while in operation. Such configuration operations are available via the getConfiguration
method on EPAdministrator
,
which returns a ConfigurationOperations
object.
Please consult the JavaDoc of ConfigurationOperations
for further information. Section 15.6, “Runtime Configuration” provides a summary of available configurations.
In summary, the configuration operations available on a running engine instance are as follows:
Add new event types for all event representations, check if an event type exists, update an existing event type, remove an event type, query a list of types and obtain a type by name.
Add and remove variables (getting and setting variable values is done via the runtime API).
Add a variant stream.
Add a revision event type.
Add event types for all event classes in a given Java package, using the simple class name as the event name.
Add import for user-defined functions.
Add a plug-in aggregation function, plug-in single row function, plug-in event type, plug-in event type resolution URIs.
Control metrics reporting.
For additional items, please see the ConfigurationOperations
interface.
For examples of the above runtime configuration API functions, please see the Configuration chapter, which applies to both static configuration and runtime configuration as the ConfigurationOperations
interface is the same.
The EPRuntime
interface is used to send events for processing into an Esper engine, set and get variable values and execute on-demand queries.
The below code snippet shows how to send a Java object event to the engine. Note that the sendEvent
method is overloaded. As events can take on different representation classes in Java, the sendEvent
method takes parameters that reflect the different
types of events that can be sent into the engine. Chapter 3, Event Representations explains the types of events
accepted.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();

// Send an example event containing stock market data
runtime.sendEvent(new MarketDataBean("IBM", 75.0));
Events, in theoretical terms, are observations of a state change that occurred in the past. Since you cannot change an event that happened in the past, events are best modelled as immutable objects.
The engine relies on events that are sent into an engine to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Applications should not modify an existing event that was sent into the engine.
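One way to follow this advice is to give the event class only final fields and no setters. The sketch below is an assumption about what a MarketDataBean could look like. The property names symbol and price and the withPrice helper are hypothetical and are not taken from the Esper distribution.

```java
// Hypothetical immutable event class; the property names symbol and price are assumptions.
final class MarketDataBean {
    private final String symbol;
    private final double price;

    MarketDataBean(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    // JavaBean-style getters expose the properties; there are no setters.
    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }

    // A new observation becomes a new object instead of a change to an existing one.
    MarketDataBean withPrice(double newPrice) {
        return new MarketDataBean(symbol, newPrice);
    }
}
```

An object that was already sent into the engine stays unchanged; a later price is represented by a second object. In an application the class would typically be declared public.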
Another important method in the runtime interface is the route
method. This method is designed for use by UpdateListener
and
subscriber implementations as well as engine extensions that need to send events into an engine instance to avoid the possibility of a stack overflow due to nested calls
to sendEvent
and to ensure correct processing of the current and routed event.
Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent
and not route
.
The EventSender
interface processes event objects that are of a known type. This facility can reduce the overhead of event object reflection and type lookup as an event sender
is always associated to a single concrete event type.
Use the method getEventSender(String eventTypeName)
to obtain an event sender for processing events of the named type:
EventSender sender = epService.getEPRuntime().getEventSender("MyEvent");
sender.sendEvent(myEvent);
For events backed by a Java class (JavaBean events), the event sender ensures that the event object equals the underlying class, or implements or extends the underlying class for the given event type name.
For events backed by a java.util.Map
(Map events), the event sender does not perform any checking other than checking that the event object implements Map.
For events backed by an Object[]
(Object-array events), the event sender does not perform any checking other than checking that the event object is an Object[]. The array elements must be in the exact same order as the declared properties, and the array length must always be at least the number of properties declared.
For events backed by an Apache Avro GenericData.Record
, the event sender does not perform any checking other than checking that the event object is a GenericData.Record. The schema associated to the record should match the event type's Avro schema.
For events backed by an org.w3c.dom.Node (XML DOM events), the event sender checks that the root element name equals the root element name for the event type.
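Because the event sender performs so little checking for Map and Object-array events, the application is responsible for laying out the event correctly. The following sketch assumes an event type with the two properties symbol and price, declared in that order. The class EventLayouts and its methods are hypothetical helpers, not part of the Esper API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that lays out events for an assumed type with properties symbol and price.
class EventLayouts {
    // Declared property order of the assumed event type.
    static final String[] PROPERTY_NAMES = {"symbol", "price"};

    // Map event: keys are property names, so order does not matter.
    static Map<String, Object> asMap(String symbol, double price) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("symbol", symbol);
        event.put("price", price);
        return event;
    }

    // Object-array event: values must follow the declared property order.
    static Object[] asObjectArray(String symbol, double price) {
        return new Object[] {symbol, price};
    }

    // Application-side check that the array is at least as long as the declared properties.
    static boolean hasEnoughElements(Object[] event) {
        return event.length >= PROPERTY_NAMES.length;
    }
}
```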
A second method to obtain an event sender is the method getEventSender(URI[])
, which takes an array of URIs. This method is for use with plug-in event representations.
The event sender returned by this method processes event objects that are of one of the types of one or more plug-in event representations. Please consult Section 17.8, “Event Type and Event Object” for more information.
Your application can register an implementation of the UnmatchedListener
interface with the EPRuntime
runtime via the setUnmatchedListener
method to receive events that were not matched by any statement.
Events that can be unmatched are all events that your application sends into the runtime via one of the sendEvent
or route
methods, or that have been generated via an insert into
clause.
For an event to become unmatched by any statement, the event must not match any statement's event stream filter criteria. Note that the EPL where
clause or having
clause are not considered part of the filter criteria for a stream, as explained by example below.
In the following statement a MyEvent event with a 'quantity' property value of 5 or less does not match this statement's event stream filter criteria. The engine delivers such an event to the registered UnmatchedListener
instance provided no other statement matches on the event:
select * from MyEvent(quantity > 5)
For patterns, if no pattern sub-expression is active for an event type, an event of that type also counts as unmatched with regard to the pattern statement.
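The rule that only stream filter criteria decide whether an event is unmatched can be modelled in a few lines of plain Java. This is a sketch of the idea only and not the engine's implementation. The class UnmatchedModel, its methods and the use of a Map as the event are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of unmatched-event detection; not the engine's implementation.
class UnmatchedModel {
    // One stream filter per statement, for example MyEvent(quantity > 5).
    private final List<Predicate<Map<String, Object>>> streamFilters = new ArrayList<>();
    // Events that matched no stream filter.
    final List<Map<String, Object>> unmatched = new ArrayList<>();

    void addStreamFilter(Predicate<Map<String, Object>> filter) {
        streamFilters.add(filter);
    }

    void send(Map<String, Object> event) {
        boolean matched = streamFilters.stream().anyMatch(f -> f.test(event));
        if (!matched) {
            unmatched.add(event);
        }
    }

    static Map<String, Object> event(int quantity) {
        Map<String, Object> e = new HashMap<>();
        e.put("quantity", quantity);
        return e;
    }

    // Sends quantities 5 and 6 against the filter quantity > 5 and counts unmatched events.
    static int demo() {
        UnmatchedModel model = new UnmatchedModel();
        model.addStreamFilter(e -> (Integer) e.get("quantity") > 5);
        model.send(event(5));
        model.send(event(6));
        return model.unmatched.size();
    }
}
```

In this model a statement written with a where clause, such as select * from MyEvent where quantity > 5, would register a stream filter that accepts every MyEvent, so no MyEvent would count as unmatched.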
As your application may not require streaming results and may not know each query in advance, the on-demand query facility provides for ad-hoc execution of an EPL expression.
On-demand queries are not continuous in nature: The query engine executes the query once and returns all result rows to the application. On-demand query execution is very lightweight as the engine performs no statement creation and the query leaves no traces within the engine.
Esper provides the facility to explicitly index named windows and tables to speed up on-demand and continuous queries. Please consult Section 6.9, “Explicitly Indexing Named Windows and Tables” for more information.
When named windows and tables are used with contexts and context partitions, the APIs to identify, filter and select context partitions for on-demand queries can be found in Section 14.18, “Context Partition Selection”.
The EPRuntime
interface provides three ways to run on-demand queries:
Use the executeQuery
method to execute a given on-demand query exactly once, see Section 14.5.1, “On-Demand Query Single Execution”.
Use the prepareQuery
method to prepare a given on-demand query such that the same query can be executed multiple times without repeated parsing, see Section 14.5.2, “On-Demand Query Prepared Unparameterized Execution”.
Use the prepareQueryWithParameters
method to prepare a given on-demand query that may have substitution parameters such that the same query can be parameterized and executed multiple times without repeated parsing, see Section 14.5.3, “On-Demand Query Prepared Parameterized Execution”.
If your application must execute the same EPL on-demand query multiple times with different parameters use prepareQueryWithParameters
.
If your application must execute the same EPL on-demand query multiple times without parameters, use either prepareQuery
or prepareQueryWithParameters
and specify no substitution parameters.
By using any of the prepare...
methods the engine can compile an EPL query string or object model once and reuse the object and thereby speed up repeated execution.
The following limitations apply:
An on-demand EPL expression only evaluates against the named windows and tables that your application creates. On-demand queries may not specify any other streams or application event types.
The following clauses are not allowed in on-demand EPL: insert into
and output
.
Data windows and patterns are not allowed to appear in on-demand queries.
On-demand EPL may not perform subqueries.
The previous
and prior
functions may not be used.
Use the executeQuery
method for executing an on-demand query once. For repeated execution, please consider any of the prepare...
methods instead.
The next program listing runs an on-demand query against a named window MyNamedWindow
and prints a column of each row result of the query:
String query = "select * from MyNamedWindow";
EPOnDemandQueryResult result = epRuntime.executeQuery(query);
for (EventBean row : result.getArray()) {
  System.out.println("name=" + row.get("name"));
}
For executing an on-demand query against a table please put the table name into the from
-clause instead.
Prepared on-demand queries are designed for repeated execution and may perform better than the dynamic single-execution method if running the same query multiple times. For use with parameter placeholders please see Section 14.5.3, “On-Demand Query Prepared Parameterized Execution”.
The next code snippet demonstrates prepared on-demand queries without parameter placeholders:
String query = "select * from MyNamedWindow where orderId = '123'";
EPOnDemandPreparedQuery prepared = epRuntime.prepareQuery(query);
EPOnDemandQueryResult result = prepared.execute();

// ...later on execute once more ...
prepared.execute();	// execute a second time
You can insert substitution parameters into an on-demand query as a single question mark character '?'
, making the substitution parameter addressable by index.
You can also insert substitution parameters using the following syntax, which makes the substitution parameter addressable by name:
?:name
If substitution parameters do not have a name assigned, the engine assigns the first substitution parameter an index of 1 and subsequent parameters increment the index by one. Please see Section 14.13, “Prepared Statement and Substitution Parameters” for additional detail and examples.
Substitution parameters can be inserted into any EPL construct that takes an expression.
All substitution parameters must be replaced by actual values before an on-demand query with substitution parameters can be executed. Substitution parameters can be replaced with an actual value using the setObject
method for each index or name. Substitution parameters can be set to new values and the query executed more than once.
While the setObject
method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.
The next program listing runs a prepared and parameterized on-demand query against a named window MyNamedWindow
(this example does not assign names to substitution parameters):
String query = "select * from MyNamedWindow where orderId = ?";
EPOnDemandPreparedQueryParameterized prepared = epRuntime.prepareQueryWithParameters(query);

// Set the required parameter values before each execution
prepared.setObject(1, "123");
EPOnDemandQueryResult result = epRuntime.executeQuery(prepared);

// ...execute a second time with new parameter values...
prepared.setObject(1, "456");
result = epRuntime.executeQuery(prepared);
This second example uses the in
operator and has multiple parameters:
String query = "select * from MyNamedWindow where orderId in (?) and price > ?";
EPOnDemandPreparedQueryParameterized prepared = epRuntime.prepareQueryWithParameters(query);
prepared.setObject(1, new String[] {"123", "456"});
prepared.setObject(2, 1000.0);
An EventBean
object represents a row (event) in your continuous query's result set. Each EventBean
object has an associated EventType
object providing event metadata.
An UpdateListener
implementation receives one or more EventBean
events with each invocation. Via the iterator
method on EPStatement
your application can poll or read data out of statements. Statement iterators also return EventBean
instances.
Each statement provides the event type of the events it produces, available via the getEventType
method on EPStatement
.
An EventType
object encapsulates all the metadata about a certain type of events. As Esper supports an inheritance hierarchy for event types, it also provides information about super-types to an event type.
An EventType
object provides the following information:
For each event property, it lists the property name and type as well as flags for indexed or mapped properties and whether a property is a fragment.
The direct and indirect super-types to the event type.
Value getters for property expressions.
Underlying class of the event representation.
For each property of an event type, there is an EventPropertyDescriptor
object that describes the property.
The EventPropertyDescriptor
contains flags that indicate whether a property is an indexed (array) or a mapped property and whether access to property values requires an integer index value (indexed properties only) or string key value (mapped properties only). The descriptor also contains a fragment flag that indicates whether a property value is available as a fragment.
The term fragment means an event property value that is itself an event, or a property value that can be represented as an event. The getFragmentType
on EventType
may be used to determine a fragment's event type in advance.
A fragment event type and thereby fragment events allow navigation over a statement's results even if the statement result contains nested events or a graph of events. There is no need to use the Java reflection API to navigate events, since fragments allow the querying of nested event properties or array values, including nested Java classes.
When using the Map or Object-array event representation, any named Map type or Object-array type nested within a Map or Object-array as a simple or array property is also available as a fragment. When using Java objects either directly or within Map or Object-array events, any object that is neither a primitive nor a boxed built-in type, and that is not an enumeration and does not implement the Map interface, is also available as a fragment.
The nested, indexed and mapped property syntax can be combined to a property expression that may query an event property graph. Most of the methods on the EventType
interface allow a property expression to be passed.
Your application may use an EventType
object to obtain special getter-objects. A getter-object is a fast accessor to a property value of an event of a given type. All getter objects implement the EventPropertyGetter
interface. Getter-objects work only for events of the same type or sub-types as the EventType
that provides the EventPropertyGetter
. The performance section provides additional information and samples on using getter-objects.
An event object is an EventBean
that provides:
The property value for a property given a property name or property expression that may include nested, indexed or mapped properties in any combination.
The event type of the event.
Access to the underlying event object.
The EventBean
fragment or array of EventBean
fragments given a property name or property expression.
The getFragment
method on EventBean
and EventPropertyGetter
return the fragment EventBean
or array of EventBean
, if the property is itself an event
or can be represented as an event. Your application may use EventPropertyDescriptor
to determine which properties are also available as fragments.
The underlying event object of an EventBean
can be obtained via the getUnderlying
method. Please see Chapter 3, Event Representations for more information on different event representations.
From a threading perspective, it is safe to retain and query EventBean
and EventType
objects in multiple threads.
Consider a statement that returns the symbol, count of events per symbol and average price per symbol for tick events. Our sample statement may declare a fully-qualified Java class name as the event type: org.sample.StockTickEvent
. Assume that this class exists and exposes a symbol
property of type String, and a price
property of type (Java primitive) double.
select symbol, avg(price) as avgprice, count(*) as mycount from org.sample.StockTickEvent group by symbol
The next table summarizes the property names and types as posted by the statement above:
Table 14.3. Properties Offered by Sample Statement Aggregating Price
Name | Type | Description | Java code snippet |
---|---|---|---|
symbol | java.lang.String | Value of symbol event property | eventBean.get("symbol") |
avgprice | java.lang.Double | Average price per symbol | eventBean.get("avgprice") |
mycount | java.lang.Long | Number of events per symbol | eventBean.get("mycount") |
A code snippet out of a possible UpdateListener
implementation to this statement may look as below:
String symbol = (String) newEvents[0].get("symbol");
Double price = (Double) newEvents[0].get("avgprice");
Long count = (Long) newEvents[0].get("mycount");
The engine supplies the boxed java.lang.Double
and java.lang.Long
types as property values rather than primitive Java types. This is because aggregated values can return a null
value to indicate that no data is available for aggregation. Also, in a select statement that computes expressions, the underlying event objects to EventBean
instances are either of type Object[]
(object-array) or of type java.util.Map
.
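Since an aggregated value may be null, assigning it directly to a primitive would fail with a NullPointerException when no data is available. The sketch below shows one null-safe way to read such a value. It uses a java.util.Map as a stand-in for the result row, and the class AggregateRowReader and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for reading an aggregation result row; a Map plays the role of the row here.
class AggregateRowReader {
    // Returns the average price, or the fallback when the aggregation produced null.
    static double averageOrDefault(Map<String, Object> row, double fallback) {
        Double avg = (Double) row.get("avgprice"); // boxed type: may be null
        return avg != null ? avg : fallback;
    }

    // Builds a row with the same property names as the sample statement.
    static Map<String, Object> row(String symbol, Double avgprice, Long mycount) {
        Map<String, Object> row = new HashMap<>();
        row.put("symbol", symbol);
        row.put("avgprice", avgprice);
        row.put("mycount", mycount);
        return row;
    }
}
```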
Use statement.getEventType().getUnderlyingType()
to inspect the underlying type for all events delivered to listeners. Whether the engine delivers Map or Object-array events to listeners can be specified as follows. If the statement provides the @EventRepresentation(objectarray)
annotation the engine delivers the output events as object array. If the statement provides the @EventRepresentation(map)
annotation the engine delivers output events as a Map. If neither annotation is provided, the engine delivers the configured default event representation as discussed in Section 15.4.14.1, “Default Event Representation”.
Consider the next statement that specifies a wildcard selecting the same type of event:
select * from org.sample.StockTickEvent where price > 100
The property names and types provided by an EventBean
query result row, as posted by the statement above are as follows:
Table 14.4. Properties Offered by Sample Wildcard-Select Statement
Name | Type | Description | Java code snippet |
---|---|---|---|
symbol | java.lang.String | Value of symbol event property | eventBean.get("symbol") |
price | double | Value of price event property | eventBean.get("price") |
As an alternative to querying individual event properties via the get
methods, the getUnderlying
method on EventBean
returns the underlying object representing the query result.
In the sample statement that features a wildcard-select, the underlying event object is of type org.sample.StockTickEvent
:
StockTickEvent tick = (StockTickEvent) newEvents[0].getUnderlying();
Composite events are events that aggregate one or more other events. Composite events are typically created by the engine for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.
// Look for a pattern where BEvent follows AEvent
String pattern = "a=AEvent -> b=BEvent";
EPStatement stmt = epService.getEPAdministrator().createPattern(pattern);
stmt.addListener(new MyUpdateListener());

// Example listener code
public class MyUpdateListener implements UpdateListener {
  public void update(EventBean[] newData, EventBean[] oldData) {
    System.out.println("a event=" + newData[0].get("a"));
    System.out.println("b event=" + newData[0].get("b"));
  }
}
Note that the update
method can receive multiple events at once as it accepts an array of EventBean
instances. For example, a time batch window may post multiple events to listeners representing a batch of events received during a given time period.
Pattern statements can also produce multiple events delivered to update listeners in one invocation. The pattern statement below, for instance, delivers an event for each A event that was not followed by a B event with the same id
property within 60 seconds of the A event. The engine may deliver all matching A events as an array of events in a single invocation of the update
method of each listener to the statement:
select * from pattern[ every a=A -> (timer:interval(60 sec) and not B(id=a.id))]
A code snippet out of a possible UpdateListener
implementation to this statement that retrieves the events as fragments may look as below:
EventBean a = (EventBean) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvents[0].get("a.value");
Some pattern objects return an array of events. An example is the unbound repeat operator. Here is a sample pattern that collects all A events until a B event arrives:
select * from pattern [a=A until b=B]
Possible code to retrieve different fragments or property values:
EventBean[] a = (EventBean[]) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvents[0].get("a[0].value");
For NEsper .NET also see Section I.16, “.NET API - Engine Threading and Concurrency”.
Esper is designed from the ground up to operate as a component to multi-threaded, highly-concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.
In Esper, an engine instance is a unit of separation. Applications can obtain and discard (initialize) one or more engine instances within the same Java VM and can provide the same or different engine configurations to each instance. An engine instance can share resources between statements by means of named windows, tables and variables.
Applications can use Esper APIs to concurrently, by multiple threads of execution, perform such functions as creating and managing statements, or sending events into an engine instance for processing. Applications can use application-managed threads or thread pools or any set of same or different threads of execution with any of the public Esper APIs. There are no restrictions towards threading other than those noted in specific sections of this document.
Esper does not prescribe a specific threading model. Applications using Esper retain full control over threading, allowing an engine to be easily embedded and used as a component or library in your favorite Java container or process.
In the default configuration it is up to the application code to use multiple threads for processing events by the engine, if so desired. All event processing takes place within your application thread call stack. The exception is timer-based processing if your engine instance relies on the internal timer (default). If your application relies on external timer events instead of the internal timer then there need not be any Esper-managed internal threads.
The fact that event processing can take place within your application thread's call stack makes developing applications with Esper easier: Any common Java integrated development environment (IDE) can host an Esper engine instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.
In the default configuration, each engine instance maintains a single timer thread (internal timer) providing for time or schedule-based processing within the engine. The default resolution at which the internal timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead send external time events to an engine instance to perform timer or scheduled processing at the resolution required by an application.
Each engine instance performs minimal locking to enable high levels of concurrency. An engine instance locks on the combination of query and context partition to protect context partition resources. For example, two queries with three partitions each have a total of six locks. For stateless EPL select-statements the engine does not use a context-partition lock and operates lock-free for the context partition. For stateful statements, the maximum (theoretical) degree of parallelism is 2^31-1 (2,147,483,647) parallel threads working to process a single EPL statement under a hash segmented context.
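The arithmetic in the example above (two queries with three partitions each give six locks) can be illustrated with a small model that keeps one lock per combination of statement and context partition. This is a plain-Java sketch of the idea and not engine code. The class PartitionLocks, the statement names and the key format are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Plain-Java model of one lock per (statement, context partition) pair; not engine code.
class PartitionLocks {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Returns the lock for the given combination, creating it on first use.
    ReentrantLock lockFor(String statementName, int partitionId) {
        return locks.computeIfAbsent(statementName + "#" + partitionId, k -> new ReentrantLock());
    }

    int lockCount() {
        return locks.size();
    }

    // Touches every partition of two statements once and returns the number of locks created.
    static int demo() {
        PartitionLocks model = new PartitionLocks();
        for (String statement : new String[] {"queryA", "queryB"}) {
            for (int partition = 0; partition < 3; partition++) {
                ReentrantLock lock = model.lockFor(statement, partition);
                lock.lock();
                try {
                    // Work on this statement's state for this partition only.
                } finally {
                    lock.unlock();
                }
            }
        }
        return model.lockCount();
    }
}
```

Threads that work on different partitions, or on different statements, take different locks in this model and therefore do not contend with each other.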
For named windows and tables, on-select
, on-merge
, on-update
and on-delete
all execute under the same lock as the partition of the named window or table.
Any insert into
produces a new event for the work queue and does not lock the target of the insert-into.
You may turn off context partition locking engine-wide (also read the caution notice) as described in Section 15.4.27.3, “Disable Locking”. You may disable context partition locking for a given statement by providing the @NoLock
annotation as part of your EPL. Note, Esper provides the @NoLock
annotation for the purpose of identifying locking overhead, or when your application is single-threaded, or when using an external mechanism for concurrency control or for example with virtual data windows or plug-in data windows to allow customizing concurrency for a given statement or named window. Using this annotation may have unpredictable results unless your application is taking concurrency under consideration.
For an engine instance to produce predictable results from the viewpoint of listeners to statements, an engine instance by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. Applications that require the highest possible concurrency and do not require a predictable order of delivery of events to listeners can turn this feature off via configuration, see Section 15.4.13.1, “Preserving the Order of Events Delivered to Listeners”. For example, assume thread T1 processes an event applied to statement S producing output event O1. Assume thread T2 processes another event applied to statement S and produces output event O2. The engine employs a configurable latch system to ensure that listeners to statement S receive and may complete processing of O1 before receiving O2. When using outbound threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.
In multithreaded environments, when one or more statements make result events available via the insert into
clause to further statements, the engine preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statements' events to behave deterministically. This feature can also be turned off via configuration, see Section 15.4.13.2, “Preserving the Order of Events for Insert-Into Streams”. For example, assume thread T1 processes an event applied to statement S and thread T2 processes another event applied to statement S. Assume statement S inserts into stream ST. T1 produces an output event O1 for processing by consumers of ST and T2 produces an output event O2 for processing by consumers of ST. The engine employs a configurable latch system such that O1 is processed before O2 by consumers of ST. When using route execution threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.
We generally recommend that listener implementations block minimally or do not block at all. By implementing listener code as non-blocking code, execution threads can often achieve higher levels of concurrency.
We recommend that, when using a single listener or subscriber instance to receive output from multiple statements, the listener or subscriber code is multithread-safe. If your application has shared state between listener or subscriber instances then such shared state should be thread-safe.
In the default configuration the same application thread that invokes any of the sendEvent
methods will process the event fully and also deliver output events to listeners and subscribers. By default the single internal timer thread based on system time performs time-based processing and delivery of time-based results.
This default configuration reduces the processing overhead associated with thread context switching, is lightweight and fast and works well in many environments such as J2EE, server or client. Latency and throughput requirements are largely use case dependent, and Esper provides engine-level facilities for controlling concurrency that are described next.
Inbound Threading queues all incoming events: A pool of engine-managed threads performs the event processing. The application thread that sends an event via any of the sendEvent
methods returns without blocking.
Outbound Threading queues events for delivery to listeners and subscribers, such that slow or blocking listeners or subscribers do not block event processing.
Timer Execution Threading means time-based event processing is performed by a pool of engine-managed threads. With this option the internal timer thread (or external timer event) serves only as a metronome, providing units-of-work to the engine-managed threads in the timer execution pool, pushing threading to the level of each statement for time-based execution.
Route Execution Threading means that the thread sending in an event via any of the sendEvent
methods (or the inbound threading pooled thread if inbound threading is enabled) only identifies and pre-processes an event, and a pool of engine-managed threads handles the actual processing of the event for each statement, pushing threading to the level of each statement for event-arrival-based execution.
The engine starts engine-managed threads as daemon threads when the engine instance is first obtained. The engine stops engine-managed threads when the engine instance is destroyed via the destroy
method. When the engine is initialized via the initialize
method the existing engine-managed threads are stopped and new threads are created. When shutting down your application, use the destroy
method to stop engine-managed threads.
Note that the options discussed herein may introduce additional processing overhead into your system, as each option involves work queue management and thread context switching.
If your use cases require ordered processing of events or do not tolerate disorder, the threading options described herein are not the right choice.
For enforcing a processing order within a given criteria, your application must enforce such processing order. Esper does not enforce order of processing if you enable inbound or route threading. Your application code could, for example, utilize a thread per group of criteria keys, a latch per criteria key, or a queue per criteria key, or use Java's completion service, all depending on your ordering requirements.
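One of the options named above, a thread per group of criteria keys, could be sketched as follows. This is application-side code under stated assumptions and not an Esper facility. The class KeyOrderedDispatcher and its methods are hypothetical, and the work passed to dispatch stands in for a call to sendEvent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical application-side dispatcher: one single-threaded executor per group of keys.
class KeyOrderedDispatcher {
    private final ExecutorService[] lanes;

    KeyOrderedDispatcher(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Work for the same key always lands on the same lane and therefore keeps its order.
    void dispatch(String key, Runnable work) {
        int lane = Math.floorMod(key.hashCode(), lanes.length);
        lanes[lane].execute(work);
    }

    // Stops accepting work and waits for all queued work to finish.
    void shutdownAndWait() {
        try {
            for (ExecutorService lane : lanes) {
                lane.shutdown();
            }
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for lanes", e);
        }
    }

    // Dispatches 100 numbered items for each of two keys and returns what was processed for "IBM".
    static List<Integer> demo() {
        KeyOrderedDispatcher dispatcher = new KeyOrderedDispatcher(4);
        List<Integer> ibm = Collections.synchronizedList(new ArrayList<>());
        List<Integer> msft = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int seq = i;
            dispatcher.dispatch("IBM", () -> ibm.add(seq)); // in an application: send the event here
            dispatcher.dispatch("MSFT", () -> msft.add(seq));
        }
        dispatcher.shutdownAndWait();
        return ibm;
    }
}
```

Order is preserved per key only. Work for different keys may interleave freely, which is where the concurrency comes from.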
If your use cases require loss-less processing of events, wherein the threading options mean that events are held in an in-memory queue, the threading options described herein may not be the right choice.
Care should be taken to consider arrival rates and queue depth. Threading options utilize unbound queues or capacity-bound queues with blocking-put, depending on your configuration, and may therefore introduce an overload or blocking situation to your application. You may use the service provider interface as outlined below to manage queue sizes, if required, and to help tune the engine to your application needs. Consider throttling down the event send rate when the API (see below) indicates that events are getting queued.
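The difference between blocking on a full queue and detecting the overload can be shown with a capacity-bound java.util.concurrent.ArrayBlockingQueue. The sketch below is an application-side illustration and does not use the engine's own queues. The class ThrottledSender and its methods are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical application-side throttle in front of a capacity-bound queue.
class ThrottledSender {
    private final BlockingQueue<Object> queue;

    ThrottledSender(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    // Tries to enqueue within the timeout; false tells the caller to slow down or shed load.
    boolean trySend(Object event, long timeoutMillis) {
        try {
            return queue.offer(event, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int queued() {
        return queue.size();
    }

    int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
```

A blocking put on the same queue would instead hold the sending thread until a consumer frees a slot, which is the blocking situation the text above warns about.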
All threading options are on the level of an engine. If you require different threading behavior for certain statements then consider using multiple engine instances, consider using the route
method or consider
using application threads instead.
Please consult Section 15.4.13, “Engine Settings Related to Concurrency and Threading” for instructions on how to configure threading options. Threading options take effect at engine initialization time.
With inbound threading an engine places inbound events in a queue for processing by one or more engine-managed threads other than the delivering application threads.
The delivering application thread uses one of the sendEvent
methods on EPRuntime
to deliver events or may also use the sendEvent
method on an EventSender
. The engine receives the event and places the event into a queue, allowing the delivering thread to continue and not block while the event is being processed and results are delivered.
Events that are sent into the engine via one of the route methods are not placed into the queue but are processed by the same thread invoking the route operation.
With outbound threading an engine places outbound events in a queue for delivery by one or more engine-managed threads other than the processing thread originating the result.
With outbound threading your listener or subscriber class receives statement results from one of the engine-managed threads in the outbound pool of threads. This is useful when you expect your listener or subscriber code to perform significantly blocking operations and you do not want to hold up event processing.
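With outbound threading enabled, a listener or subscriber that sends events back into the engine should use the sendEvent method and not the route method.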
With timer execution threading an engine places time-based work units into a queue for processing by one or more engine-managed threads other than the internal timer thread or the application thread that sends an external timer event.
Using timer execution threading the internal timer thread (or thread delivering an external timer event) serves to evaluate which time-based work units must be processed. A pool of engine-managed threads performs the actual processing of time-based work units and thereby offloads the work from the internal timer thread (or thread delivering an external timer event).
Enable this option as a tuning parameter when your statements utilize time-based patterns or data windows. Timer execution threading is fine grained and works on the level of a time-based schedule in combination with a statement.
With route execution threading an engine identifies event-processing work units based on the event and statement combination. It places such work units into a queue for processing by one or more engine-managed threads other than the thread that originated the event.
While inbound threading works on the level of an event, route execution threading is fine grained and works on the level of an event in combination with a statement.
The service-provider interface EPServiceProviderSPI is an extension API that allows you to manage engine-level queues and thread pools. It is considered an extension API and is subject to change between release versions.
The following code snippet shows how to obtain the BlockingQueue<Runnable> and the ThreadPoolExecutor for managing the queue and thread pool responsible for inbound threading:
EPServiceProviderSPI spi = (EPServiceProviderSPI) epService;
int queueSize = spi.getThreadingService().getInboundQueue().size();
ThreadPoolExecutor threadpool = spi.getThreadingService().getInboundThreadPool();
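A queue depth obtained this way can drive send-rate throttling. The following is a minimal sketch under assumptions: QueueDepthThrottle, its high-water mark and its pause interval are hypothetical names and values chosen for illustration, and the helper works on any java.util.Queue, such as the inbound queue shown above.

```java
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical helper: pauses a sender while a work queue is deeper than a high-water mark. */
final class QueueDepthThrottle {
    private final Queue<?> queue;
    private final int highWaterMark;
    private final long pauseNanos;

    QueueDepthThrottle(Queue<?> queue, int highWaterMark, long pauseMillis) {
        this.queue = queue;
        this.highWaterMark = highWaterMark;
        this.pauseNanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
    }

    /** True when more entries are queued than the high-water mark allows. */
    boolean isOverloaded() {
        return queue.size() > highWaterMark;
    }

    /**
     * Waits until the queue is no longer overloaded or until maxWaitMillis has elapsed.
     * Returns true if the caller may send, false if the wait timed out.
     */
    boolean awaitCapacity(long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (isOverloaded()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            LockSupport.parkNanos(pauseNanos);
        }
        return true;
    }
}
```

A sending thread would call awaitCapacity before each sendEvent and decide what to do on a false return, for example drop, buffer elsewhere or raise an alert.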
This section discusses the order in which N competing statements that all react to the same arriving event execute.
The engine, by default, does not guarantee to execute competing statements in any particular order unless using @Priority. We therefore recommend that an application does not rely on the order of execution of statements by the engine, since that best shields the behavior of an application from changes in the order that statements may get created by your application or by threading configurations that your application may change at will.
If your application requires a defined order of execution of competing statements, use the @Priority EPL syntax to make the order of execution between statements well-defined (requires that you set the prioritized-execution configuration setting). In addition, the @Drop annotation can make a statement preempt all lower-priority statements, which are then not executed for any matching events.
This section discusses the order of event evaluation when multiple events must be processed, for example when multiple statements use insert-into to generate further events upon arrival of an event.
The engine processes an arriving event completely before indicating output events to listeners and subscribers, and before considering output events generated by insert-into or routed events inserted by listeners or subscribers.
For example, assume three statements: (1) select * from MyEvent, (2) insert into ABCStream select * from MyEvent and (3) select * from ABCStream. When a MyEvent event arrives, the listeners to statements (1) and (2) execute first (default threading model). Listeners to statement (3), which receive the inserted-into stream events, are always executed after delivery of the triggering event.
Among all events generated by insert-into of statements and the events routed into the engine via the route
method, all events that insert-into a named window are processed first in the order generated. All other events are processed thereafter in the order they were generated.
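The ordering rule for generated events can be modeled with two FIFO queues. This is an illustrative model only and not the engine's implementation; the class GeneratedEventOrder and its method names are hypothetical, and events are represented as plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative model of the documented ordering rule; not engine code. */
final class GeneratedEventOrder {
    private final Deque<String> namedWindowInserts = new ArrayDeque<>();
    private final Deque<String> otherEvents = new ArrayDeque<>();

    /** Records an event produced by insert-into or route while an event is being processed. */
    void generated(String event, boolean insertsIntoNamedWindow) {
        if (insertsIntoNamedWindow) {
            namedWindowInserts.addLast(event);
        } else {
            otherEvents.addLast(event);
        }
    }

    /** Named-window inserts first, in generation order, then all other events in generation order. */
    List<String> processingOrder() {
        List<String> order = new ArrayList<>(namedWindowInserts);
        order.addAll(otherEvents);
        return order;
    }
}
```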
When enabling timer or route execution threading as explained under advanced threading options, the engine does not make any guarantee about the processing order, except that it will prioritize events inserted into a named window.
There are two modes for an engine to keep track of time: the internal timer based on JVM system time (the default), and externally-controlled time (also known as event time), which gives your application full control over the concept of time within an engine or isolated service.
An isolated service is an execution environment separate from the main engine runtime, allowing full control over the concept of time for a group of statements, as further described in Section 14.9, “Service Isolation”.
By default the internal timer provides time and evaluates schedules. External clocking, i.e. event time, can be used to supply time ticks to the engine instead. The latter is useful when events themselves provide the time to advance. External clocking also helps in testing time-based event sequences or for synchronizing the engine with an external time source.
The internal timer relies on the java.util.concurrent.ScheduledThreadPoolExecutor class for time tick events. The next section describes timer resolution for the internal timer, which is set to 100 milliseconds by default and is configurable via the threading options. When using externally-controlled time the timer resolution is in your control.
To disable the internal timer and use externally-provided time instead, there are two options. The first option is to use the configuration API at engine initialization time. The second option toggles on and off the internal timer at runtime, via special timer control events that are sent into the engine like any other event.
If using a timer execution thread pool as discussed above, the internal timer or external time event provides the schedule evaluation but does not actually perform the time-based processing. The time-based processing is performed by the threads in the timer execution thread pool.
External time and internal (system) time are the same internally to the engine, thus the engine behaves the same whether using the external or the internal timer.
This code snippet shows the use of the configuration API to disable the internal timer and thereby turn on externally-provided time (see the Configuration section for configuration via XML file):
Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
After disabling the internal timer, it is wise to set a defined time so that any statements created thereafter start relative to the time defined. Use the CurrentTimeEvent class to indicate current time to the engine and to move time forward for the engine (a.k.a. application-time model).
This code snippet obtains the current time and sends a timer event in:
long timeInMillis = System.currentTimeMillis();
CurrentTimeEvent timeEvent = new CurrentTimeEvent(timeInMillis);
epService.getEPRuntime().sendEvent(timeEvent);
Alternatively, you can use special timer control events to enable or disable the internal timer. Use the TimerControlEvent
class to control timer operation at runtime.
The next code snippet demonstrates toggling to external timer at runtime, by sending in a TimerControlEvent
event:
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();
// switch to external clocking
runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
Your application sends a CurrentTimeEvent
event when it desires to move the time forward. All aspects of Esper engine time related to EPL statements and patterns are driven by the time provided by the CurrentTimeEvent
that your application sends in.
The next example sequence of instructions sets time to zero, then creates a statement, then moves time forward to 1 second and then to 6 seconds after the start:
// Set start time at zero.
runtime.sendEvent(new CurrentTimeEvent(0));
// create a statement here
epAdministrator.createEPL("select * from MyEvent output every 5 seconds");
// move time forward 1 second
runtime.sendEvent(new CurrentTimeEvent(1000));
// move time forward 5 seconds
runtime.sendEvent(new CurrentTimeEvent(6000));
When sending external timer events, your application should make sure that long-type time values are ascending. That is, each long-type value should be either the same value or a larger value than the prior value provided by a CurrentTimeEvent.
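One way for an application to keep external time values ascending is a small guard in front of the code that sends timer events. This is a sketch under assumptions: the class MonotonicTimeGuard is hypothetical, and replacing a backward timestamp with the last value sent is only one possible policy (an application could also reject such values).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: never hands out a time value smaller than one it returned before. */
final class MonotonicTimeGuard {
    private final AtomicLong lastTime;

    MonotonicTimeGuard(long startTime) {
        this.lastTime = new AtomicLong(startTime);
    }

    /** Returns the candidate if it does not move time backwards, else the last value returned. */
    long next(long candidateTime) {
        return lastTime.accumulateAndGet(candidateTime, Math::max);
    }
}
```

The value returned by next would then be passed to the CurrentTimeEvent constructor in place of the raw timestamp.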
Your application may use the getNextScheduledTime
method in EPRuntime
to determine the earliest time a schedule for any statement requires evaluation.
The following code snippet sets the current time, creates a statement and prints the next scheduled time, which is 1 minute later than the current time:
// Set start time to the current time.
runtime.sendEvent(new CurrentTimeEvent(System.currentTimeMillis()));
// Create a statement.
epService.getEPAdministrator().createEPL("select * from pattern[timer:interval(1 minute)]");
// Print next schedule time
System.out.println("Next schedule at " + new Date(runtime.getNextScheduledTime()));
With CurrentTimeEvent
, as described above, your application can advance engine time to a given point in time. In addition, the getNextScheduledTime
method in EPRuntime
returns the next scheduled time according to started statements.
You would typically use CurrentTimeEvent
to advance time at a relatively high resolution.
To advance time for a span of time without sending individual CurrentTimeEvent
events to the engine, the API provides the class CurrentTimeSpanEvent
. You may use CurrentTimeSpanEvent
with or without a resolution.
If your application only provides the target end time of the time span to CurrentTimeSpanEvent and no resolution, the engine advances time up to the target time by stepping through all relevant times according to started statements.
If your application provides the target end time of the time span and in addition a long-typed resolution, the engine advances time up to the target time by incrementing time according to the resolution (regardless of next scheduled time according to started statements).
Consider the following example:
// Set start time to Jan.1, 2010, 00:00 am for this example
SimpleDateFormat format = new SimpleDateFormat("yyyy MM dd HH:mm:ss SSS");
Date startTime = format.parse("2010 01 01 00:00:00 000");
runtime.sendEvent(new CurrentTimeEvent(startTime.getTime()));
// Create a statement.
EPStatement stmt = epService.getEPAdministrator().createEPL("select current_timestamp() as ct " +
  "from pattern[every timer:interval(1 minute)]");
stmt.addListener(...); // add a listener
// Advance time to 10 minutes after start time
runtime.sendEvent(new CurrentTimeSpanEvent(startTime.getTime() + 10*60*1000));
The above example uses CurrentTimeSpanEvent to advance time to 10 minutes after the start time. As the example does not pass a resolution, the engine advances time according to statement schedules.
Upon sending the CurrentTimeSpanEvent
the listener sees 10 invocations for minute 1 to minute 10.
To advance time according to a given resolution, you may provide the resolution as shown below:
// Advance time to 10 minutes after start time at 100 msec resolution
runtime.sendEvent(new CurrentTimeSpanEvent(startTime.getTime() + 10*60*1000, 100));
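The arithmetic behind stepping by a resolution can be sketched as follows. This is an illustration and not the engine's code: the class TimeSpanSteps is hypothetical, and it assumes that the final step is clamped to the target time when the span is not a multiple of the resolution.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of advancing time by a fixed resolution; not engine code. */
final class TimeSpanSteps {
    /** Returns the time values visited when moving from currentTime to targetTime. */
    static List<Long> steps(long currentTime, long targetTime, long resolution) {
        if (resolution <= 0) {
            throw new IllegalArgumentException("resolution must be positive");
        }
        List<Long> times = new ArrayList<>();
        long time = currentTime;
        while (time < targetTime) {
            // Assumption: the last increment never overshoots the target.
            time = Math.min(time + resolution, targetTime);
            times.add(time);
        }
        return times;
    }
}
```

Under this model, a span of 10 minutes at a resolution of 100 milliseconds yields 6000 time values, compared to the 10 schedule-driven steps of the example without a resolution.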
Time can have a resolution of either milliseconds or microseconds.
The default time resolution is milliseconds. To configure the engine for microsecond resolution, please see Section 15.4.22.2, “Time Unit”.
Table 14.5. Time Resolution
Characteristic | Millisecond | Microsecond |
---|---|---|
Smallest unit for advancing time | 1 millisecond | 1 microsecond |
Equivalent java.util.concurrent.TimeUnit | TimeUnit.MILLISECONDS | TimeUnit.MICROSECONDS |
Default? | Default | Requires configuration change, see Section 15.4.22.2, “Time Unit” |
Long-type engine time represents | Milliseconds since Epoch | Microseconds since Epoch |
Example: the date Tue, 01 Jan 1980 00:00:00 GMT | 315532800000 | 315532800000000 |
Support for Internal System Time | Yes | No, requires external time (aka. event time) via CurrentTimeSpanEvent or CurrentTimeEvent |
A few notes on usage of microsecond time unit for time resolution:
The engine automatically computes time periods into microseconds. For example 1 minute 2 seconds
is 62000000
microseconds (62 * 1000000
).
The engine automatically computes time-in-second parameters into microseconds. For example 5.02 seconds
is 5020000
microseconds.
The engine automatically computes ISO schedules, crontabs and hints related to engine time into microseconds.
The CurrentTimeSpanEvent
or CurrentTimeEvent
events must provide microsecond values.
Date-time methods with long-type input values assume microsecond values.
Date-time methods or other functions that take millisecond parameters or produce millisecond values still consume/produce millisecond values, such as the date-time method toMillisec
.
The internal timer must be disabled (setInternalTimerEnabled(false)
) and TimerControlEvent.ClockType.CLOCK_INTERNAL
cannot be used.
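When supplying microsecond values from application code, the conversions listed above can be reproduced with the JDK alone. The class MicrosecondTime and its method names are hypothetical and the code only mirrors the arithmetic; it is not how the engine computes these values internally.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for producing microsecond values in application code. */
final class MicrosecondTime {
    /** Converts a time period such as "1 minute 2 seconds" to microseconds. */
    static long periodToMicros(long minutes, long seconds) {
        return TimeUnit.MINUTES.toMicros(minutes) + TimeUnit.SECONDS.toMicros(seconds);
    }

    /** Converts a fractional number of seconds such as 5.02 to microseconds. */
    static long secondsToMicros(double seconds) {
        return Math.round(seconds * 1_000_000d);
    }

    /** Converts an instant to microseconds since the epoch. */
    static long epochMicros(Instant instant) {
        return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
            + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
    }
}
```

A value produced by epochMicros is what a CurrentTimeEvent or CurrentTimeSpanEvent would carry when the engine is configured for microsecond resolution.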
By default the internal timer is enabled and tracks VM system time. For many use cases your application may want to use event time or external time instead, as discussed above.
The internal timer thread, by default, uses the call System.currentTimeMillis()
to obtain system time. Please see the JIRA issue ESPER-191 Support nano/microsecond resolution for more information on Java system time-call performance, accuracy and drift.
The internal timer thread can be configured to use nano-second time as returned by System.nanoTime()
. If configured for nano-second time, the engine computes an offset of the nano-second ticks to wall clock time upon startup to present back an accurate millisecond wall clock time.
Please see Section 15.4.22, “Engine Settings Related to Time Source” to configure the internal timer thread to use System.nanoTime().
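The idea of computing a startup offset between nano-second ticks and wall clock time can be sketched as follows. This is a minimal sketch of the technique and not the engine's implementation; the class NanoClock is hypothetical.

```java
/** Hypothetical clock: derives wall clock milliseconds from System.nanoTime() plus a fixed offset. */
final class NanoClock {
    // Wall clock time in nanoseconds minus the nanoTime tick value, captured once at construction.
    private final long offsetNanos;

    NanoClock() {
        this.offsetNanos = System.currentTimeMillis() * 1_000_000L - System.nanoTime();
    }

    /** Returns milliseconds since the epoch, computed from the current nanoTime tick. */
    long currentTimeMillis() {
        return (System.nanoTime() + offsetNanos) / 1_000_000L;
    }
}
```

Because the offset is captured only once, a clock of this kind does not follow later adjustments of the system clock, which is one source of the drift discussed in the JIRA issue mentioned above.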
The internal timer is based on java.util.concurrent.ScheduledThreadPoolExecutor
and that generally provides high accuracy VM time
(java.util.Timer
does not support high accuracy VM time).
Consider using the service-provider interfaces EPRuntimeSPI and EPRuntimeIsolatedSPI. The two interfaces expose additional functions to manage statement schedules. However, the SPI interfaces should be considered an extension API and are subject to change between release versions.
Additional engine-internal SPI interfaces can be obtained by downcasting EPServiceProvider
to EPServiceProviderSPI
. For example the SchedulingServiceSPI
exposes schedule information per statement (downcast from SchedulingService
). Engine-internal SPI are subject to change between versions.
An isolated service allows an application to control event visibility and the concept of time as desired on a statement level: Events sent into an isolated service are visible only to those statements that currently reside in the isolated service and are not visible to statements outside of that isolated service. Within an isolated service an application can control time independently, start time at a point in time and advance time at the resolution and pace suitable for the statements added to that isolated service.
In the default configuration, isolated service is disabled and not available. This is because there is a small overhead associated with this feature. Please review Section 15.4.27.7, “Allow Isolated Service Provider” for the configuration setting.
As discussed before, a single Java Virtual Machine may hold multiple Esper engine instances unique by engine URI. Within an Esper engine instance the default execution environment for statements is the EPRuntime engine runtime, which coordinates all statements' reactions to incoming events and to time passing (via internal or external timer).
Subordinate to an Esper engine instance, your application can additionally allocate multiple isolated services (or execution environments), uniquely identified by a name and represented by the EPServiceProviderIsolated interface. In the isolated service, time passes only when your application sends timer events to the EPRuntimeIsolated instance. Only events explicitly sent to the isolated service are visible to statements added.
Your application can create new statements that start in an isolated service. You can also move existing statements back and forth between the engine and an isolated service.
Isolation does not apply and is not supported for globally-visible concepts. Specifically it is not supported for the following.
Contexts, context partitions.
Variables.
Named Windows.
Tables.
An isolated service allows an application to:
Suspend a statement without losing its statement state that may have accumulated for the statement.
Control the concept of time separately for a set of statements, for example to simulate, backtest, adjust arrival order or compute arrival time.
Initialize statement state by replaying events, without impacting already running statements, to catch-up statements from historical events for example.
While a statement resides in an isolated runtime it receives only those events explicitly sent to the isolated runtime, and performs time-based processing based on the timer events provided to that isolated runtime.
Use the getEPServiceIsolated
method on EPServiceProvider
passing a name to obtain an isolated runtime:
EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("name");
Set the start time for your isolated runtime via the CurrentTimeEvent
timer event:
// In this example start the time at the system time
long startInMillis = System.currentTimeMillis();
isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(startInMillis));
Use the addStatement
method on EPAdministratorIsolated
to move an existing statement out of the engine runtime into the isolated runtime:
// look up the existing statement
EPStatement stmt = epServiceManager.getEPAdministrator().getStatement("MyStmt");
// move it to an isolated service
isolatedService.getEPAdministrator().addStatement(stmt);
To remove the statement from isolation and return the statement back to the engine runtime, use the removeStatement
method on EPAdministratorIsolated
:
isolatedService.getEPAdministrator().removeStatement(stmt);
To create a new statement in the isolated service, use the createEPL
method on EPAdministratorIsolated
:
// the example is passing the statement name in an annotation and no user object
isolatedService.getEPAdministrator().createEPL(
  "@Name('MyStmt') select * from Event", null, null);
The destroy
method on EPServiceProviderIsolated
moves all currently-isolated statements for that isolated service provider back to engine runtime.
When moving a statement between engine runtime and isolated service or back, the algorithm ensures that events are aged according to the time that passed and time schedules stay intact.
To use isolated services, your configuration must have view sharing disabled as described in Section 15.4.15.1, “Sharing View Resources Between Statements”.
By adding an existing statement to an isolated service, the statement's processing effectively becomes suspended. Time does not pass for the statement and it will not process events, unless your application explicitly moves time forward or sends events into the isolated service.
First, let's create a statement and send events:
EPStatement stmt = epServiceManager.getEPAdministrator().createEPL("select * from TemperatureEvent#time(30)");
epServiceManager.getEPRuntime().sendEvent(new TemperatureEvent(...));
// send some more events over time
The steps to suspend the previously created statement are as follows:
EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("suspendedStmts");
isolatedService.getEPAdministrator().addStatement(stmt);
To resume the statement, move the statement back to the engine:
isolatedService.getEPAdministrator().removeStatement(stmt);
If the statement employed a time window, the events in the time window did not age. If the statement employed patterns, the pattern's time-based schedule remains unchanged. This is because the example did not advance time in the isolated service.
This example creates a statement in the isolated service, replays some events and advances time, then merges back the statement to the engine to let it participate in incoming events and engine time processing.
First, allocate an isolated service and explicitly set it to a start time. Assuming that myStartTime
is a long millisecond time value that marks the beginning of the data to replay, the sequence is as follows:
EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("suspendedStmts");
isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(myStartTime));
Next, create the statement. The sample statement is a pattern statement looking for temperature events following each other within 60 seconds:
EPStatement stmt = isolatedService.getEPAdministrator().createEPL(
  "select * from pattern[every a=TemperatureEvent -> b=TemperatureEvent where timer:within(60)]", null, null);
For each historical event to be played, advance time and send an event. This code snippet assumes that currentTime is a time greater than myStartTime and reflects the time that the historical event should be processed at. It also assumes historyEvent is the historical event object.
isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(currentTime));
isolatedService.getEPRuntime().sendEvent(historyEvent);
// repeat the above advancing time until no more events
Finally, when done replaying events, merge the statement back with the engine:
isolatedService.getEPAdministrator().removeStatement(stmt);
When isolating statements, events that are generated by insert into
are visible within the isolated service that currently holds that insert into
statement.
For example, assume the below two statements named A and B:
@Name('A') insert into MyStream select * from MyEvent
@Name('B') select * from MyStream
When adding statement A to an isolated service, and assuming a MyEvent
is sent to either the engine runtime or the isolated service, a listener to statement B does not receive that event.
When adding statement B to an isolated service, and assuming a MyEvent
is sent to either the engine runtime or the isolated service, a listener to statement B does not receive that event.
When isolating named windows or tables, the event visibility of events entering and leaving from a named window or the rows inserted, changed and removed from tables is not limited to the isolated service. This is because named windows are global data windows and tables are global data structures.
For example, assume the below three statements named A, B and C:
@Name('A') create window MyNamedWindow#time(60) as select * from MyEvent
@Name('B') insert into MyNamedWindow select * from MyEvent
@Name('C') select * from MyNamedWindow
When adding statement A to an isolated service, and assuming a MyEvent
is sent to either the engine runtime or the isolated service, a listener to statement A and C does not receive that event.
When adding statement B to an isolated service, and assuming a MyEvent
is sent to either the engine runtime or the isolated service, a listener to statement A and C does not receive that event.
When adding statement C to an isolated service, and assuming a MyEvent
is sent to the engine runtime, a listener to statement A and C does receive that event.
Moving statements between an isolated service and the engine is an expensive operation and should not be performed with high frequency.
When using multiple threads to send events and at the same time moving a statement to an isolated service, it is undefined whether events will be delivered to a listener of the isolated statement until all threads have completed sending events.
Metrics reporting is not available for statements in an isolated service. Advanced threading options are also not available in the isolated service, however it is thread-safe to send events including timer events from multiple threads to the same or different isolated service.
You may register one or more exception handlers for the engine to invoke in the case it encounters an exception processing a continuously-executing statement. By default and without exception handlers the engine cancels execution of the current EPL statement that encountered the exception, logs the exception and continues to the next statement, if any. The configuration is described in Section 15.4.28, “Engine Settings Related to Exception Handling”.
If your application registers exception handlers as part of engine configuration, the engine invokes the exception handlers in the order they are registered passing relevant exception information such as EPL statement name, expression and the exception itself.
Exception handlers receive any EPL statement unchecked exception such as internal exceptions or exceptions thrown by plug-in aggregation functions or plug-in data windows. The engine does not provide to exception handlers any exceptions thrown by static method invocations for function calls, method invocations in joins, methods on variables and event classes and listeners or subscriber exceptions.
An exception handler can itself throw a runtime exception to cancel execution of the current event against any further statements.
Exceptions are meant to indicate an actual unexpected problem.
We do not recommend explicitly throwing exceptions for the purpose of flow control, preempting execution or other normal situations.
The engine does not guarantee that throwing an exception has no other side effect and the engine may not roll back changes that are already made to state.
For on-demand queries the API indicates any exception directly back to the caller without the exception handlers being invoked, as exception handlers apply to continuous queries only. The same applies to any API calls other than sendEvent
and the EventSender
methods.
As the configuration section describes, your application registers one or more classes that implement the ExceptionHandlerFactory
interface in the engine configuration. Upon engine initialization the engine obtains a factory instance from the class name that then provides the exception handler instance. The exception handler class must implement the ExceptionHandler
interface.
You may register one or more condition handlers for the engine to invoke in the case it encounters certain conditions, as outlined below, when executing a statement. By default and without condition handlers the engine logs the condition at informational level and continues processing. The configuration is described in Section 15.4.29, “Engine Settings Related to Condition Handling”.
If your application registers condition handlers as part of engine configuration, the engine invokes the condition handlers in the order they are registered passing relevant condition information such as EPL statement name, expression and the condition information itself.
Currently the only conditions indicated by this facility are raised by the pattern followed-by operator, see Section 7.5.8.1, “Limiting Sub-Expression Count” and see Section 7.5.8.2, “Limiting Engine-wide Sub-Expression Count”.
A condition handler may not itself throw a runtime exception or return any value.
As the configuration section describes, your application registers one or more classes that implement the ConditionHandlerFactory
interface in the engine configuration. Upon engine initialization the engine obtains a factory instance from the class name that then provides the condition handler instance. The condition handler class must implement the ConditionHandler
interface.
The statement object model is a set of classes that provide an object-oriented representation of an EPL or pattern statement. The object model classes are found in package com.espertech.esper.client.soda
. An instance of EPStatementObjectModel
represents a statement's object model.
The statement object model classes are a full and complete specification of a statement. All EPL and pattern constructs including expressions and sub-queries are available via the statement object model.
In conjunction with the administrative API, the statement object model provides the means to build, change or interrogate statements beyond the EPL or pattern syntax string representation. The object graph of the statement object model is fully navigable for easy querying by code, and is also serializable allowing applications to persist or transport statements in object form, when required.
The statement object model supports full round-trip from object model to EPL statement string and back to object model: A statement object model can be rendered into an EPL string representation via the toEPL method on EPStatementObjectModel. Further, the administrative API allows you to compile a statement string into an object model representation via the compileEPL method on EPAdministrator.
The statement object model is fully mutable. Mutating any list, such as the one returned by getChildren(), is acceptable and supported.
The create
method on EPAdministrator
creates and starts a statement as represented by an object model. In order to obtain an object model from an existing statement, obtain the statement expression text of the statement via the getText
method on EPStatement
and use the compileEPL
method to obtain the object model.
The following limitations apply:
Statement object model classes are not safe for sharing between threads other than for read access.
Between versions of Esper, the serialized form of the object model is subject to change. Esper makes no guarantees that the serialized object model of one version will be fully compatible with the serialized object model generated by another version of Esper. Please consider this issue when storing Esper object models in persistent store.
An EPStatementObjectModel consists of an object graph representing all possible clauses that can be part of an EPL statement.
Among all clauses, the SelectClause
and FromClause
objects are required clauses that must be present, in order to define what to select and where to select from.
Table 14.6. Required Statement Object Model Instances
Class | Description |
---|---|
EPStatementObjectModel | All statement clauses for a statement, such as the select-clause and the from-clause, are specified within the object graph of an instance of this class |
SelectClause | A list of the selection properties or expressions, or a wildcard |
FromClause | A list of one or more streams; A stream can be a filter-based, a pattern-based or a SQL-based stream; Add data windows here. |
Part of the statement object model package are convenient builder classes that make it easy to build a new object model or change an existing object model. The SelectClause
and FromClause
are such builder classes and provide convenient create
methods.
Within the from-clause you have a choice of different streams to select on. The FilterStream
class represents a stream that is filled by events of a certain type and that pass an optional filter expression.
We can use the classes introduced above to create a simple statement object model:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setSelectClause(SelectClause.createWildcard());
model.setFromClause(FromClause.create(FilterStream.create("com.chipmaker.ReadyEvent")));
The model as above is equivalent to the EPL :
select * from com.chipmaker.ReadyEvent
Last, the code snippet below creates a statement from the object model:
EPStatement stmt = epService.getEPAdministrator().create(model);
Notes on usage:
Variable names can simply be treated as property names.
When selecting from named windows or tables, the name of the named window or table is the event type name for use in FilterStream
instances or patterns.
To compile an arbitrary sub-expression text into an Expression object representation, simply add the expression text to a where clause, compile the EPL string into an object model via the compileEPL method on EPAdministrator, and obtain the compiled where clause from the EPStatementObjectModel via the getWhereClause method.
The EPStatementObjectModel
includes an optional where-clause. The where-clause is a filter expression that the engine applies to events in one or more streams. The key interface for all expressions is the Expression
interface.
The Expressions
class provides a convenient way of obtaining Expression
instances for all possible expressions. Please consult the JavaDoc for detailed method information.
The next example discusses sample where-clause expressions.
Use the Expressions
class as a service for creating expression instances, and add additional expressions via the add
method that most expressions provide.
The next example adds a simple where-clause to the EPL as shown earlier:
select * from com.chipmaker.ReadyEvent where line=8
And the code to add a where-clause to the object model is below.
model.setWhereClause(Expressions.eq("line", 8));
The following example considers a more complex where-clause. Assume you need to build an expression using logical-and and logical-or:
select * from com.chipmaker.ReadyEvent where (line=8) or (line=10 and age<5)
The code for building such a where-clause by means of the object model classes is:
model.setWhereClause(Expressions.or()
    .add(Expressions.eq("line", 8))
    .add(Expressions.and()
        .add(Expressions.eq("line", 10))
        .add(Expressions.lt("age", 5))
    ));
The Patterns
class is a factory for building pattern expressions. It provides convenient methods to create all pattern expressions of the pattern language.
Patterns in EPL are treated as a stream of events that consists of pattern matches. The PatternStream
class represents a stream of pattern matches and contains a pattern expression within.
For instance, consider the following pattern statement.
select * from pattern [every a=MyAEvent and not b=MyBEvent]
The next code snippet outlines how to use the statement object model and specifically the Patterns
class to create a statement object model that is equivalent to the pattern statement above.
EPStatementObjectModel model = new EPStatementObjectModel();
model.setSelectClause(SelectClause.createWildcard());
PatternExpr pattern = Patterns.and()
    .add(Patterns.everyFilter("MyAEvent", "a"))
    .add(Patterns.notFilter("MyBEvent", "b"));
model.setFromClause(FromClause.create(PatternStream.create(pattern)));
This section builds a complete example statement and includes all optional clauses in one EPL statement, to demonstrate the object model API.
A sample statement:
insert into ReadyStreamAvg(line, avgAge)
select line, avg(age) as avgAge
from com.chipmaker.ReadyEvent(line in (1, 8, 10))#time(10) as RE
where RE.waverId != null
group by line
having avg(age) < 0
output every 10.0 seconds
order by line
Finally, this code snippet builds the above statement from scratch:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setInsertInto(InsertIntoClause.create("ReadyStreamAvg", "line", "avgAge"));
model.setSelectClause(SelectClause.create()
    .add("line")
    .add(Expressions.avg("age"), "avgAge"));
Filter filter = Filter.create("com.chipmaker.ReadyEvent", Expressions.in("line", 1, 8, 10));
model.setFromClause(FromClause.create(
    FilterStream.create(filter, "RE").addView("win", "time", 10)));
model.setWhereClause(Expressions.isNotNull("RE.waverId"));
model.setGroupByClause(GroupByClause.create("line"));
model.setHavingClause(Expressions.lt(Expressions.avg("age"), Expressions.constant(0)));
model.setOutputLimitClause(OutputLimitClause.create(OutputLimitSelector.DEFAULT,
    Expressions.timePeriod(null, null, null, 10.0, null)));
model.setOrderByClause(OrderByClause.create("line"));
This sample statement creates a variable:
create variable integer var_output_rate = 10
The code to build the above statement using the object model:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setCreateVariable(CreateVariableClause.create("integer", "var_output_rate", 10));
epService.getEPAdministrator().create(model);
A second statement sets the variable to a new value:
on NewValueEvent set var_output_rate = new_rate
The code to build the above statement using the object model:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setOnExpr(OnClause.createOnSet("var_output_rate", Expressions.property("new_rate")));
model.setFromClause(FromClause.create(FilterStream.create("NewValueEvent")));
EPStatement stmtSet = epService.getEPAdministrator().create(model);
This sample statement creates a named window:
create window OrdersTimeWindow#time(30 sec) as select symbol as sym, volume as vol, price from OrderEvent
This is the code that builds the create-window statement above:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setCreateWindow(CreateWindowClause.create("OrdersTimeWindow").addView("win", "time", 30));
model.setSelectClause(SelectClause.create()
    .addWithName("symbol", "sym")
    .addWithName("volume", "vol")
    .add("price"));
model.setFromClause(FromClause.create(FilterStream.create("OrderEvent")));
A second statement deletes from the named window:
on NewOrderEvent as myNewOrders delete from OrdersNamedWindow as myNamedWindow where myNamedWindow.symbol = myNewOrders.symbol
The object model is built by:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setOnExpr(OnClause.createOnDelete("OrdersNamedWindow", "myNamedWindow"));
model.setFromClause(FromClause.create(FilterStream.create("NewOrderEvent", "myNewOrders")));
model.setWhereClause(Expressions.eqProperty("myNamedWindow.symbol", "myNewOrders.symbol"));
EPStatement stmtOnDelete = epService.getEPAdministrator().create(model);
A third statement selects from the named window using the non-continuous on-demand selection via on-select:
on QueryEvent(volume>0) as query select count(*) from OrdersNamedWindow as win where win.symbol = query.symbol
The on-select statement is built from scratch via the object model as follows:
EPStatementObjectModel model = new EPStatementObjectModel();
model.setOnExpr(OnClause.createOnSelect("OrdersNamedWindow", "win"));
model.setWhereClause(Expressions.eqProperty("win.symbol", "query.symbol"));
model.setFromClause(FromClause.create(
    FilterStream.create("QueryEvent", "query", Expressions.gt("volume", 0))));
model.setSelectClause(SelectClause.create().add(Expressions.countStar()));
EPStatement stmtOnSelect = epService.getEPAdministrator().create(model);
The prepare
method that is part of the administrative API pre-compiles an EPL statement and stores the precompiled statement in an EPPreparedStatement
object. This
object can then be used to efficiently start the parameterized statement multiple times.
You can insert substitution parameters as a single question mark character '?'
, making the substitution parameter addressable by index.
You can also insert substitution parameters using the following syntax, which makes the substitution parameter addressable by name:
?:name
All substitution parameters must either be unnamed (just '?'
) or named ('?:
name'
). It is not possible to mix the two styles.
If not assigning a name to substitution parameters, the engine assigns the first substitution parameter an index of 1 and subsequent parameters increment the index by one.
If assigning a name to each substitution parameter, the name can include slash (/
) characters and can occur multiple times.
Substitution parameters can be inserted into any EPL construct that takes an expression. They are therefore valid in any clause, such as the select-clause, from-clause filters, where-clause, group-by-clause, having-clause or order-by-clause, including data window parameters and pattern observers and guards. Substitution parameters cannot be used where a numeric constant is required rather than an expression, nor in SQL statements.
All substitution parameters must be replaced by actual values before a statement with substitution parameters can be started. Substitution parameters can be set to new values and new statements can be created from the same EPPreparedStatement
object more than once.
If not assigning a name to substitution parameters, replace the substitution parameter with an actual value using the setObject(int index, Object value)
method for each index, starting from 1.
If assigning a name to each substitution parameter, replace the substitution parameter with an actual value using the setObject(String name, Object value)
method for each name.
While the setObject
method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.
In the following example of setting parameters on a prepared statement and starting the prepared statement, epService
represents an engine instance:
String stmt = "select * from com.chipmaker.ReadyEvent(line=?)";
EPPreparedStatement prepared = epService.getEPAdministrator().prepareEPL(stmt);
prepared.setObject(1, 8);
EPStatement statement = epService.getEPAdministrator().create(prepared);
The next example names the substitution parameter:
String stmt = "select * from ReadyEvent(line=?:lines/line1)";
EPPreparedStatement prepared = epService.getEPAdministrator().prepareEPL(stmt);
prepared.setObject("lines/line1", 1);
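The index and naming rules for substitution parameters can be made concrete with a small stand-alone sketch. It does not use the Esper API: SubstitutionParamScan and its keys method are hypothetical names introduced here for illustration only, and the scan is deliberately naive (it assumes the EPL text has no question marks inside string literals and no dynamic-property syntax). It returns the keys an application would pass to setObject: 1-based indexes for unnamed parameters, or each distinct name for named parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of the Esper API. */
final class SubstitutionParamScan {

    // A '?' optionally followed by ':' and a name; the name may contain '/' characters.
    private static final Pattern PARAM = Pattern.compile("\\?(?::([A-Za-z0-9_/]+))?");

    /**
     * Returns the keys to use with setObject: "1", "2", ... for unnamed parameters,
     * or each distinct name once for named parameters. Mixing both styles is rejected.
     */
    static List<String> keys(String epl) {
        List<String> names = new ArrayList<>();
        int unnamedCount = 0;
        Matcher matcher = PARAM.matcher(epl);
        while (matcher.find()) {
            String name = matcher.group(1);
            if (name == null) {
                unnamedCount++;
            } else if (!names.contains(name)) {
                names.add(name); // a name may occur multiple times; list it once
            }
        }
        if (unnamedCount > 0 && !names.isEmpty()) {
            throw new IllegalArgumentException("Cannot mix '?' and '?:name' parameters");
        }
        List<String> keys = new ArrayList<>(names);
        for (int index = 1; index <= unnamedCount; index++) {
            keys.add(String.valueOf(index)); // unnamed parameters are indexed from 1
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(keys("select * from com.chipmaker.ReadyEvent(line=?)"));
        System.out.println(keys("select * from ReadyEvent(line=?:lines/line1)"));
    }
}
```

The sketch only mirrors the rules stated in this section; the engine performs its own validation when the statement is prepared.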
The engine can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 15.4.23, “Engine Settings Related to JMX Metrics”. For additional detailed reporting and metrics events, please read on.
Metrics reporting is a feature that allows an application to receive ongoing reports about key engine-level and statement-level metrics. Examples are the number of incoming events, the CPU time and wall time taken by statement executions or the number of output events per statement.
Metrics reporting is, by default, disabled. To enable reporting, please follow the steps as outlined in Section 15.4.24, “Engine Settings Related to Metrics Reporting”. Metrics reporting must be enabled at engine initialization time. Reporting intervals can be controlled at runtime via the ConfigurationOperations
interface available from the administrative API.
Your application can receive metrics at configurable intervals via an EPL statement. A metric datapoint is simply a well-defined event. The events are EngineMetric and StatementMetric, and the Java classes representing the events can be found in the client API in package com.espertech.esper.client.metric.
Since the engine processes metric events the same way as application events, your EPL may use any construct on such events. For example, your application may select, filter, aggregate, sort or insert metric events into a stream, named window or table, just as it would application events.
This example statement selects all engine metric events:
select * from com.espertech.esper.client.metric.EngineMetric
The next statement selects all statement metric events:
select * from com.espertech.esper.client.metric.StatementMetric
Make sure to have metrics reporting enabled since only then do listeners or subscribers to a statement such as above receive metric events.
The engine provides metric events after the configured interval of time has passed. By default, only started statements that have activity within an interval (in the form of event or timer processing) are reported upon.
The default configuration performs the publishing of metric events in an Esper daemon thread under the control of the engine instance. Metrics reporting honors externally-supplied time, if using external timer events.
Via runtime configuration options provided by ConfigurationOperations
, your application may enable and disable metrics reporting globally, provided that metrics reporting was enabled at initialization time. Your application may also enable and disable metrics reporting for individual statements by statement name.
Statement groups are a configuration feature that allows you to assign reporting intervals to statements. Statement groups are described further in Section 15.4.24, “Engine Settings Related to Metrics Reporting”. Statement groups cannot be added or removed at runtime.
The following limitations apply:
If your Java VM version does not report current thread CPU time (most JVMs do), then CPU time is reported as zero (use ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported() to determine if your JVM supports this feature).
Note: In some JVMs the accuracy of the CPU time returned is very low (on the order of 10 milliseconds off), which can impact the usefulness of the CPU metrics returned. As an alternative, consider measuring CPU time in your application thread, external to the engine, after sending a number of events in the same thread.
Your Java VM may not provide high resolution time via System.nanoTime. In such a case wall time may be inaccurate and imprecise.
CPU time and wall time have nanosecond precision but not necessarily nanosecond accuracy; please check with your Java VM provider.
There is a performance cost to collecting and reporting metrics.
Not all statements may report metrics: The engine performs certain runtime optimizations sharing resources between similar statements, thereby not reporting on certain statements unless resource sharing is disabled through configuration.
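The CPU-time limitations listed above can be checked from application code using only the JDK. The following is a minimal sketch, assuming a hypothetical class name CpuTimeProbe and a placeholder workload; it tests whether the JVM reports current-thread CPU time and measures CPU and wall time around work done in the calling thread, which is the application-side alternative mentioned in the note.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper for illustration; uses only JDK classes. */
final class CpuTimeProbe {

    /** True if this JVM can report CPU time for the current thread. */
    static boolean cpuTimeSupported() {
        return ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported();
    }

    /**
     * Runs the work in the calling thread and returns {cpuNanos, wallNanos}.
     * cpuNanos is 0 when the JVM does not support or has disabled CPU time measurement.
     */
    static long[] measure(Runnable work) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        boolean cpuAvailable = threads.isCurrentThreadCpuTimeSupported()
                && threads.isThreadCpuTimeEnabled();
        long cpuStart = cpuAvailable ? threads.getCurrentThreadCpuTime() : 0L;
        long wallStart = System.nanoTime();
        work.run();
        long wallNanos = System.nanoTime() - wallStart;
        long cpuNanos = cpuAvailable ? threads.getCurrentThreadCpuTime() - cpuStart : 0L;
        return new long[] {cpuNanos, wallNanos};
    }

    public static void main(String[] args) {
        System.out.println("CPU time supported: " + cpuTimeSupported());
        // Placeholder workload; an application would send a batch of events here instead.
        long[] times = measure(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            if (sum < 0) {
                System.out.println("unreachable");
            }
        });
        System.out.println("cpuNanos=" + times[0] + " wallNanos=" + times[1]);
    }
}
```

Measuring around a batch of events rather than a single event reduces the effect of coarse CPU-time accuracy.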
Engine metrics are properties of EngineMetric
events:
Table 14.7. EngineMetric Properties
Name | Description |
---|---|
engineURI | The URI of the engine instance. |
timestamp | The current engine time. |
inputCount | Cumulative number of input events since engine initialization time. Input events are defined as events sent in via application threads as well as insert-into events. |
inputCountDelta | Number of input events since last reporting period. |
scheduleDepth | Number of outstanding schedules. |
Statement metrics are properties of StatementMetric
. The properties are:
Table 14.8. StatementMetric Properties
Name | Description |
---|---|
engineURI | The URI of the engine instance. |
timestamp | The current engine time. |
statementName | Statement name, if provided at time of statement creation, otherwise a generated name. |
cpuTime | Statement processing CPU time (system and user) in nanoseconds (if available by Java VM, obtained from ThreadMXBean.getCurrentThreadCpuTime ). |
wallTime | Statement processing wall time in nanoseconds (based on System.nanoTime ). |
numInput | Number of input events to the statement. |
numOutputIStream | Number of insert stream rows output to listeners or the subscriber, if any. |
numOutputRStream | Number of remove stream rows output to listeners or the subscriber, if any. |
The totals reported are cumulative relative to the last metric report.
Your application may use the built-in XML and JSON formatters to render output events into a readable textual format, such as for integration or debugging purposes. This section introduces the utility classes in the client util
package for rendering events to strings. Further API information can be found in the JavaDocs.
The EventRenderer
interface accessible from the runtime interface via the getEventRenderer
method provides access to JSON and XML rendering. For repeated rendering of events of the same event type or subtypes, it is recommended to obtain a JSONEventRenderer
or XMLEventRenderer
instance and use the render
method provided by the interface. This allows the renderer implementations to cache event type metadata for fast rendering.
This example shows how to obtain a renderer for repeated rendering of events of the same type, assuming that statement
is an instance of EPStatement
:
JSONEventRenderer jsonRenderer = epService.getEPRuntime()
    .getEventRenderer().getJSONRenderer(statement.getEventType());
Assuming that event
is an instance of EventBean
, this code snippet renders an event into the JSON format:
String jsonEventText = jsonRenderer.render("MyEvent", event);
The XML renderer works the same:
XMLEventRenderer xmlRenderer = epService.getEPRuntime()
    .getEventRenderer().getXMLRenderer(statement.getEventType());
...and...
String xmlEventText = xmlRenderer.render("MyEvent", event);
If the event type is not known in advance or if your application does not want to obtain a renderer instance per event type for fast rendering, your application can use one of the following methods to render an event to an XML or JSON textual format:
String json = epService.getEPRuntime().getEventRenderer().renderJSON(event);
String xml = epService.getEPRuntime().getEventRenderer().renderXML(event);
Use the JSONRenderingOptions
or XMLRenderingOptions
classes to control how events are rendered. To render specific event properties using a custom event property renderer, specify an EventPropertyRenderer
as part of the options
that renders event property values to strings. Please see the JavaDoc documentation for more information.
The JSON renderer produces JSON text according to the standard documented at http://www.json.org
.
The renderer formats simple properties as well as nested properties and indexed properties according to the JSON string encoding, array encoding and nested object encoding requirements.
The renderer renders indexed properties, but not indexed properties that require an index for access: if your event representation is backed by POJO objects and your getter method is getValue(int index), the indexed property values are not part of the JSON text. This is because the implementation has no way to determine how many index keys there are. A workaround is to have a method such as Object[] getValue() instead.
The same is true for mapped properties that the renderer also renders. If a property requires a Map key for access, i.e. your getter method is getValue(String key)
, such property values are not part of the result text as there is no way for the implementation to determine the key set.
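The reason for the indexed-property limitation and its workaround can be seen in a small JDK-only sketch. The class names and the readNoArgGetters helper below are hypothetical and do not reproduce Esper's renderer; they only show that a generic reader can call a no-argument getter such as Object[] getValue(), while a getter that needs an index gives it nothing to enumerate.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration; not Esper's rendering code. */
final class GetterShapes {

    /** Values reachable only by index: a generic reader cannot know how many exist. */
    static class IndexOnly {
        private final Object[] values = {"a", "b"};
        public Object getValue(int index) { return values[index]; }
    }

    /** Workaround shape from the text: the whole array comes from a no-argument getter. */
    static class WholeArray {
        private final Object[] values = {"a", "b"};
        public Object[] getValue() { return values.clone(); }
    }

    /** Reads every public no-argument getXxx method; getters that need an argument are skipped. */
    static Map<String, Object> readNoArgGetters(Object bean) {
        Map<String, Object> result = new TreeMap<>();
        for (Method method : bean.getClass().getMethods()) {
            String name = method.getName();
            boolean isGetter = name.startsWith("get") && name.length() > 3
                    && method.getParameterCount() == 0 && !name.equals("getClass");
            if (!isGetter) {
                continue;
            }
            try {
                String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                result.put(property, method.invoke(bean));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read " + name, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(readNoArgGetters(new IndexOnly()).keySet());
        System.out.println(readNoArgGetters(new WholeArray()).keySet());
    }
}
```

The same reasoning applies to a getter that takes a Map key: without a no-argument accessor there is no key set to walk.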
The XML renderer produces well-formed XML text according to the XML standard.
The renderer can be configured to format simple properties as attributes or as elements. Nested properties and indexed properties are always represented as XML sub-elements to the root or parent element.
The root element name provided to the XML renderer must be the element name of the root in the XML document and may include namespace instructions.
The renderer renders indexed properties, but not indexed properties that require an index for access: if your event representation is backed by POJO objects and your getter method is getValue(int index), the indexed property values are not part of the XML text. This is because the implementation has no way to determine how many index keys there are. A workaround is to have a method such as Object[] getValue() instead.
The same is true for mapped properties that the renderer also renders. If a property requires a Map key for access, i.e. your getter method is getValue(String key)
, such property values are not part of the result text as there is no way for the implementation to determine the key set.
A plug-in loader is for general use with input adapters, output adapters, EPL code deployment or any other task that can benefit from being part of an Esper configuration file and that follows the engine lifecycle.
A plug-in loader implements the com.espertech.esper.plugin.PluginLoader
interface and can be listed in the configuration.
Each configured plug-in loader follows the engine instance lifecycle: When an engine instance initializes, it instantiates each PluginLoader
implementation class listed in the configuration. The engine then invokes the lifecycle
methods of the PluginLoader
implementation class before and after the engine is fully initialized and before an engine instance is destroyed.
Declare a plug-in loader in your configuration XML as follows:
...
<plugin-loader name="MyLoader" class-name="org.mypackage.MyLoader">
  <init-arg name="property1" value="val1"/>
</plugin-loader>
...
Alternatively, add the plug-in loader via the configuration API:
Configuration config = new Configuration();
Properties props = new Properties();
props.put("property1", "value1");
config.addPluginLoader("MyLoader", "org.mypackage.MyLoader", props);
Implement the init
method of your PluginLoader
implementation to receive
initialization parameters. The engine invokes this method before the engine is fully initialized, therefore your implementation
should not yet rely on the engine instance within the method body:
public class MyPluginLoader implements PluginLoader {
    public void init(String loaderName, Properties properties, EPServiceProviderSPI epService) {
        // save the configuration for later, perform checking
    }
    ...
The engine calls the postInitialize
method once the engine has completed initialization, to indicate that the engine is ready for traffic.
    public void postInitialize() {
        // Start the actual interaction with external feeds or the engine here
    }
    ...
The engine calls the destroy
method once the engine is destroyed or initialized for a second time.
    public void destroy() {
        // Destroy resources allocated as the engine instance is being destroyed
    }
To access the plug-in at runtime, the getContext
method provides access under the name plugin-loader/
name:
epService.getContext().getEnvironment().get("plugin-loader/MyLoader");
As discussed in Section 5.2.7, “Annotation” an EPL annotation is an addition made to information in an EPL statement. The API and examples to interrogate annotations are described here.
You may use the getAnnotations
method of EPStatement
to obtain annotations specified for an EPL statement. When compiling an EPL expression to an EPStatementObjectModel
statement object model, you may also query, change or add annotations.
The following example code demonstrates iterating over an EPStatement
statement's annotations and retrieving values:
String exampleEPL = "@Tag(name='direct-output', value='sink 1') select * from RootEvent";
EPStatement stmt = epService.getEPAdministrator().createEPL(exampleEPL);
for (Annotation annotation : stmt.getAnnotations()) {
    if (annotation instanceof Tag) {
        Tag tag = (Tag) annotation;
        System.out.println("Tag name " + tag.name() + " value " + tag.value());
    }
}
The output of the sample code shown above is Tag name direct-output value sink 1
.
This section discusses how to select context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions and the reasons for context partition selection are introduced in Section 4.9, “Operations on Specific Context Partitions”.
The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts. The section uses a category context for the purpose of illustration. The API discussed herein is general and handles all different types of contexts including nested contexts.
Consider a category context that separates bank transactions into small, medium and large:
// declare category context
create context TxnCategoryContext
  group by amount < 100 as small,
  group by amount between 100 and 1000 as medium,
  group by amount > 1000 as large from BankTxn
// retain 1 minute of events of each category separately
context TxnCategoryContext select * from BankTxn#time(1 minute)
In order for your application to iterate one or more specific categories it is necessary to identify which category, i.e. which context partition, to iterate. Similarly for on-demand queries, to execute on-demand queries against one or more specific categories, it is necessary to identify which context partition to execute the on-demand query against.
Your application may iterate one or more specific context partitions using either the iterator
or safeIterator
method of EPStatement
by providing an implementation of the ContextPartitionSelector
interface.
For example, assume your application must obtain all bank transactions for small amounts. It may use the API to identify the category and iterate the associated context partition:
ContextPartitionSelectorCategory categorySmall = new ContextPartitionSelectorCategory() {
    public Set<String> getLabels() {
        return Collections.singleton("small");
    }
};
Iterator<EventBean> it = stmt.iterator(categorySmall);
Your application may execute on-demand queries against one or more specific context partitions by using the executeQuery
method on EPRuntime
or the execute
method on EPOnDemandPreparedQuery
and by providing an implementation of ContextPartitionSelector
.
On-demand queries execute against named windows and tables. The EPL statement below therefore creates a named window, which the engine manages separately for small, medium and large transactions according to the context declared earlier:
// Named window per category
context TxnCategoryContext create window BankTxnWindow#time(1 min) as BankTxn
The following code demonstrates how to fire an on-demand query against the small and the medium category:
ContextPartitionSelectorCategory categorySmallMed = new ContextPartitionSelectorCategory() {
    public Set<String> getLabels() {
        return new HashSet<String>(Arrays.asList("small", "medium"));
    }
};
epService.getEPRuntime().executeQuery(
    "select count(*) from BankTxnWindow",
    new ContextPartitionSelector[] {categorySmallMed});
The following limitations apply:
On-demand queries may not join named windows or tables that declare a context.
This section summarizes the selector interfaces that are available for use to identify and interrogate context partitions. Please refer to the JavaDoc documentation for package com.espertech.esper.client.context
and classes therein for additional information.
Use an implementation of ContextPartitionSelectorAll
or the ContextPartitionSelectorAll.INSTANCE
object to instruct the engine to consider all context partitions.
Use an implementation of ContextPartitionSelectorById
if your application knows the context partition ids to query. This selector instructs the engine to consider only those provided context partitions based on their integer id value. The engine outputs the context partition id in the built-in property context.id
.
Use an implementation of ContextPartitionSelectorFiltered
to receive and interrogate context partitions. Use the filter
method that receives a ContextPartitionIdentifier
to return a boolean indicator whether to include the context partition or not. The ContextPartitionIdentifier
provides information about each context partition. Your application may not retain ContextPartitionIdentifier
instances between filter
method invocations as the engine reuses the same instance. This selector is not supported with nested contexts.
Use an implementation of ContextPartitionSelectorCategory
with category contexts.
Use an implementation of ContextPartitionSelectorSegmented
with keyed segmented contexts.
Use an implementation of ContextPartitionSelectorHash
with hash segmented contexts.
Use an implementation of ContextPartitionSelectorNested
in combination with the selectors described above with nested contexts.
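The warning that an application must not retain ContextPartitionIdentifier instances between filter invocations can be illustrated without the Esper API. In the sketch below, Identifier and visit are hypothetical stand-ins (not Esper types) for an object that the caller overwrites and reuses on every callback; the point is that the callback should copy the values it needs instead of keeping the reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration of a reused callback argument; not Esper code. */
final class ReusedIdentifierDemo {

    /** Stand-in for an identifier object that the caller overwrites and reuses. */
    static final class Identifier {
        String label;
    }

    /** Invokes the filter once per label, passing the same Identifier instance every time. */
    static void visit(String[] labels, Predicate<Identifier> filter) {
        Identifier shared = new Identifier();
        for (String label : labels) {
            shared.label = label;
            filter.test(shared);
        }
    }

    /** Problematic: keeps references to the shared instance, so every entry shows the last label. */
    static List<String> labelsViaRetainedInstances(String[] labels) {
        List<Identifier> retained = new ArrayList<>();
        visit(labels, id -> { retained.add(id); return true; });
        List<String> result = new ArrayList<>();
        for (Identifier id : retained) {
            result.add(id.label);
        }
        return result;
    }

    /** Safe: copies the needed value out of the identifier inside the callback. */
    static List<String> labelsViaCopiedValues(String[] labels) {
        List<String> copied = new ArrayList<>();
        visit(labels, id -> { copied.add(id.label); return true; });
        return copied;
    }

    public static void main(String[] args) {
        String[] labels = {"small", "medium", "large"};
        System.out.println(labelsViaRetainedInstances(labels));
        System.out.println(labelsViaCopiedValues(labels));
    }
}
```

In a real ContextPartitionSelectorFiltered implementation the same discipline applies: read what is needed from the identifier inside the filter method and store only those values.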
This section briefly discusses the API to manage context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions.
The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts.
The administrative API for context partitions is EPContextPartitionAdmin
. Use the getContextPartitionAdmin
method of the EPAdministrator
interface
to obtain said service.
The context partition admin API allows an application to:
Start, stop and destroy individual context partitions.
Interrogate the state and identifiers for existing context partitions.
Determine statements associated to a context and context nesting level.
Receive a callback when new contexts get created and destroyed or when context partitions are allocated and de-allocated.
Obtain context properties.
Stopping individual context partitions is useful to drop state, free memory and suspend a given context partition without stopping or destroying any associated statements. For example, assume a keyed segmented context per user id. To suspend and free the memory for a given user id your application can stop the user id's context partition. The engine does not allocate a context partition for this user id again, until your application destroys or starts the context partition.
Destroying individual context partitions is useful to drop state, free memory and deregister the given context partition without stopping or destroying any associated statements. For example, assume a keyed segmented context per user id. To deregister and free the memory for a given user id your application can destroy the user id's context partition. The engine can allocate a fresh context partition for this user id when events for this user id arrive.
Please see the JavaDoc documentation for more information.
Esper offers a listener and an assertions class to facilitate automated testing of EPL rules, for example when using a test framework such as JUnit
or TestNG
.
Esper does not require any specific test framework. If your application has the JUnit
test framework on the classpath, Esper uses junit.framework.AssertionFailedError
to indicate assertion errors, so as to integrate with continuous integration tools.
For detailed method-level information, please consult the JavaDoc of the package com.espertech.esper.client.scopetest
.
The class com.espertech.esper.client.scopetest.EPAssertionUtil
provides methods to assert or compare event property values as well as perform various array arithmetic, sort events and convert events or iterators to arrays.
The class com.espertech.esper.client.scopetest.SupportUpdateListener
provides an UpdateListener
implementation that collects events and returns event data for assertion.
The class com.espertech.esper.client.scopetest.SupportSubscriber
provides a subscriber implementation that collects events and returns event data for assertion. The SupportSubscriberMRD
is a subscriber that accepts events via multi-row delivery. The SupportSubscriber
and SupportSubscriberMRD
work similarly to SupportUpdateListener
that is introduced in more detail below.
The below table only summarizes the most relevant assertion methods offered by EPAssertionUtil
. Methods provide multiple footprints that are not listed in detail below. Please consult the JavaDoc for additional method-level information.
Table 14.9. Method Summary for EPAssertionUtil
Name | Description |
---|---|
assertProps | Methods that assert that property values of a single |
assertPropsPerRow | Methods that assert that property values of multiple |
assertPropsPerRowAnyOrder | Same as above, but any row may match. Useful for unordered result sets. |
assertEqualsExactOrder | Methods that compare arrays, allowing |
assertEqualsAnyOrder | Same as above, but any row may match. Useful for unordered result sets. |
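The difference between the exact-order and any-order comparisons summarized in the table can be sketched with plain JDK code. The class OrderComparison below is hypothetical and does not reproduce EPAssertionUtil; it only shows the comparison semantics that the two method families are named after.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of comparison semantics; not the EPAssertionUtil implementation. */
final class OrderComparison {

    /** True only if both arrays hold equal elements at the same positions. */
    static boolean equalsExactOrder(Object[] expected, Object[] actual) {
        return Arrays.equals(expected, actual);
    }

    /** True if both arrays hold the same elements with the same multiplicity, in any order. */
    static boolean equalsAnyOrder(Object[] expected, Object[] actual) {
        if (expected.length != actual.length) {
            return false;
        }
        List<Object> remaining = new ArrayList<>(Arrays.asList(actual));
        for (Object element : expected) {
            if (!remaining.remove(element)) {
                return false; // element missing or already matched
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Object[] expected = {"E1", "E2"};
        Object[] actual = {"E2", "E1"};
        System.out.println(equalsExactOrder(expected, actual));
        System.out.println(equalsAnyOrder(expected, actual));
    }
}
```

Any-order comparison suits result sets without an order-by clause, where row order is not guaranteed.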
The below table only summarizes the most relevant methods offered by SupportUpdateListener
. Please consult the JavaDoc for additional information.
Table 14.10. Method Summary for SupportUpdateListener
Name | Description |
---|---|
reset | Initializes listener clearing current events and resetting the invoked flag. |
getAndClearIsInvoked | Returns the "invoked" flag indicating the listener has been invoked, and clears the flag. |
getLastNewData | Returns the last events received by the listener. |
getAndResetDataListsFlattened | Returns all events received by the listener as a pair of new and removed events, and resets the listener. |
assertOneGetNewAndReset | Asserts that exactly one new event was received and no removed events, returns the event and resets the listener. |
assertOneGetNew | Asserts that exactly one new event was received and returns the event. |
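The collect-then-assert behavior summarized in the table can be pictured with the following plain-Java sketch. CollectingListenerSketch is a hypothetical stand-in written for this illustration, with events modeled as simple maps; it is not Esper's SupportUpdateListener and only mimics the semantics described above for reset, getAndClearIsInvoked and assertOneGetNewAndReset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Esper's SupportUpdateListener: events are modeled as plain Maps.
final class CollectingListenerSketch {
    private final List<Map<String, Object>> newEvents = new ArrayList<>();
    private final List<Map<String, Object>> oldEvents = new ArrayList<>();
    private boolean invoked;

    // Called by the (imagined) engine; either argument may be null when nothing is delivered.
    void update(List<Map<String, Object>> newData, List<Map<String, Object>> oldData) {
        invoked = true;
        if (newData != null) {
            newEvents.addAll(newData);
        }
        if (oldData != null) {
            oldEvents.addAll(oldData);
        }
    }

    // Clears collected events and the invoked flag.
    void reset() {
        newEvents.clear();
        oldEvents.clear();
        invoked = false;
    }

    // Returns the invoked flag and clears it.
    boolean getAndClearIsInvoked() {
        boolean result = invoked;
        invoked = false;
        return result;
    }

    // Fails unless exactly one new event and no removed events were collected, then resets.
    Map<String, Object> assertOneGetNewAndReset() {
        if (newEvents.size() != 1 || !oldEvents.isEmpty()) {
            throw new AssertionError("expected exactly one new event and no removed events");
        }
        Map<String, Object> event = newEvents.get(0);
        reset();
        return event;
    }

    public static void main(String[] args) {
        CollectingListenerSketch listener = new CollectingListenerSketch();
        Map<String, Object> event = new HashMap<>();
        event.put("personName", "Joe");
        event.put("cnt", 1L);
        listener.update(Collections.singletonList(event), null);
        Map<String, Object> received = listener.assertOneGetNewAndReset();
        System.out.println(received.get("personName") + " " + received.get("cnt"));
    }
}
```

Resetting as part of the assertion keeps each test step independent: events from an earlier step cannot leak into the next assertion.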
The next code block is a short but complete programming example that asserts that the properties received from output events match the expected values.
String epl = "select personName, count(*) as cnt from PersonEvent#length(3) group by personName";
EPStatement stmt = epService.getEPAdministrator().createEPL(epl);
SupportUpdateListener listener = new SupportUpdateListener();
stmt.addListener(listener);
epService.getEPRuntime().sendEvent(new PersonEvent("Joe"));
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "personName,cnt".split(","), new Object[]{"Joe", 1L});
A few additional examples are shown below:
String[] fields = new String[] {"property"};
EPAssertionUtil.assertPropsPerRow(listener.getAndResetDataListsFlattened(), fields, new Object[][]{{"E2"}}, new Object[][]{{"E1"}});
EPAssertionUtil.assertPropsPerRow(listener.getAndResetLastNewData(), fields, new Object[][]{{"E1"}, {"E2"}, {"E3"}});
assertTrue(listener.getAndClearIsInvoked());
Please refer to the Esper codebase test sources for more examples using the assertion class and the listener class.
Esper's static configuration object (Configuration
), with respect to classes, holds the fully-qualified class name and does not generally hold Class
references.
This is by design since the configuration object can be populated from XML.
At engine initialization time, Esper may look up classes using the fully-qualified class name. If using bean event types, the Class.forName()
call can be avoided by
using the runtime configuration API such as epService.getEPAdministrator().getConfiguration().addEventType(MyEvent.class);
.
When creating new EPL statements, Esper may need to look up a class by name and may need to obtain a class loader for CGLib FastClass
creation.
For deploying resources using the deployment admin API, Esper may also use a class loader to find resources as described in the JavaDoc.
Your application has full control over class-for-name and classloader use. OSGi environments can provide a specific class-for-name and class loader.
Please refer to Section 15.3, “Passing Services or Transient Objects”.
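The idea of holding class names and resolving them later through an application-supplied lookup can be sketched in plain Java as follows. ClassLookupSketch and its nested ClassResolver interface are hypothetical names invented for this illustration; they do not represent Esper's configuration classes or the services described in Section 15.3, and only show the general pattern of a pluggable class-for-name strategy.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Esper's API: stores class names and resolves them on demand.
final class ClassLookupSketch {

    // Pluggable lookup strategy; an OSGi bundle could supply one backed by its own class loader.
    interface ClassResolver {
        Class<?> resolve(String className) throws ClassNotFoundException;
    }

    private final Map<String, String> classNamesByEventType = new LinkedHashMap<>();
    private final ClassResolver resolver;

    ClassLookupSketch(ClassResolver resolver) {
        this.resolver = resolver;
    }

    // Only the fully-qualified name is stored, so the registry could be populated from XML.
    void addEventType(String eventTypeName, String className) {
        classNamesByEventType.put(eventTypeName, className);
    }

    // The class is looked up only when needed, through the supplied resolver.
    Class<?> lookup(String eventTypeName) throws ClassNotFoundException {
        String className = classNamesByEventType.get(eventTypeName);
        if (className == null) {
            throw new IllegalArgumentException("unknown event type: " + eventTypeName);
        }
        return resolver.resolve(className);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        ClassLoader loader = ClassLookupSketch.class.getClassLoader();
        ClassLookupSketch registry =
            new ClassLookupSketch(name -> Class.forName(name, true, loader));
        registry.addEventType("Text", "java.lang.String");
        System.out.println(registry.lookup("Text").getName());
    }
}
```

Because the resolver is injected, the same registry works unchanged whether lookups go through Class.forName, a specific class loader, or a container-provided mechanism.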