The runtime takes on these functions:
Provide an environment to execute compiled modules.
Provide an environment to run compiled fire-and-forget queries.
Process incoming events and time against deployed modules.
Your application obtains a runtime from EPRuntimeProvider. You may pass an arbitrary string-type runtime URI that uniquely identifies the runtime instance.
A runtime is an instance of EPRuntime. Use the runtime as follows:
The runtime is a stateful service.
You may obtain and use any number of runtime instances in parallel, each runtime instance uniquely identified by the runtime URI.
You may share a runtime instance between threads.
All runtime methods are thread-safe.
Each runtime is completely independent of other runtimes.
The EPRuntimeProvider class provides static methods that return EPRuntime runtimes.
Each runtime has a unique runtime URI which can be any string value.
If your application does not pass a runtime URI then the default URI is default (as defined by EPRuntimeProvider.DEFAULT_RUNTIME_URI).
For the getRuntime methods, your application can pass a runtime URI to obtain different runtimes. The EPRuntimeProvider determines whether the provided runtime URI matches any existing runtime URI and returns the existing runtime, or allocates a new runtime if none was found.
The getExistingRuntime method takes a runtime URI and returns the existing runtime for that URI or null if there is none.
The code snippet below gets the default runtime. Subsequent calls to get the default runtime return the same runtime.
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
The next code gets a runtime for the runtime URI RFIDProcessor1. Subsequent calls to get a runtime with the same runtime URI return the same runtime instance.
EPRuntime runtime = EPRuntimeProvider.getRuntime("RFIDProcessor1");
Since the getRuntime methods return the same runtime for each URI there is no need to statically cache a runtime in your application.
You may also pass an optional Configuration.
The next code snippet outlines a typical sequence of use:
    // Configure the runtime, this is optional
    Configuration config = new Configuration();
    config.configure("configuration.xml");  // load a configuration from file

    // Optionally set additional configuration values like so:
    // config.getCommon().add....(...);

    // Obtain a runtime
    EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(config);

    // Optionally, use initialize if the same runtime has been used before to start clean
    runtime.initialize();

    // Destroy the runtime when no longer needed, frees up resources, releases the runtime URI
    runtime.destroy();
The EPRuntime interface represents a runtime. Only the static methods of the EPRuntimeProvider class allocate new runtimes.
A runtime is uniquely identified by runtime URI. The runtime URI is an arbitrary string. The default runtime has a runtime URI of default.
A runtime provides these services:
Table 16.1. Runtime Services
Service | Runtime Method | Description |
---|---|---|
EPDeploymentService | getDeploymentService | For deploying and undeploying compiled modules. |
EPEventService | getEventService | For processing events and advancing time. |
EPContextPartitionService | getContextPartitionService | For information about context partitions. |
EPVariableService | getVariableService | For access to variables. |
EPEventTypeService | getEventTypeService | For obtaining event types. |
EPFireAndForgetService | getFireAndForgetService | For executing fire-and-forget queries. |
EPDataFlowService | getDataFlowService | For managing data flows. |
EPMetricsService | getMetricsService | For control over metrics. |
EPRenderEventService | getRenderEventService | For rendering events. |
You can reset a runtime by calling the initialize method. This operation resets the runtime to the configuration last provided to EPRuntimeProvider. If no configuration is provided, an empty (default) configuration applies.
Your application must obtain new services from the initialized runtime as initialize marks existing services as invalid.
A runtime can be destroyed via the destroy method. This frees all resources held by the runtime. After a call to destroy the runtime can no longer be used.
You may register callbacks to receive notifications about runtime state. The runtime invokes any EPRuntimeStateListener callbacks when a runtime instance is about to be destroyed and after a runtime has been initialized.
Use the addRuntimeStateListener methods to register interest.
When destroying a runtime your application must make sure that threads that are sending events into the runtime have completed their work. More generally, the runtime should not be currently in use during or after the destroy operation.
All runtime instances are completely independent. Your application may not send EventBean instances obtained from one runtime into a second runtime since the event type space between two runtimes is not shared.
Your application must first compile a module or obtain a compiled module before it can deploy. The object representation of a compiled module is EPCompiled.
Call the deploy method and pass the compiled module. The runtime loads the byte code and adds the information contained in the byte code, causing all the compiled module's statements to begin receiving events and time.
Deploying is an atomic operation. At deployment completion all statements of the deployment begin to see events arriving and time passing consistently. In case the deployment fails the runtime rolls back all deployment changes.
The runtime resolves dependencies of the compiled module upon its deployment. The runtime does not validate that the information about EPL-object dependencies that existed at compile-time matches the runtime EPL-objects.
For example, assume there is a compiled module by name compiledModuleM1. Deploy as follows:
EPDeployment deployment = runtime.getDeploymentService().deploy(compiledModuleM1);
The runtime returns an EPDeployment instance that contains the deployment id, the EPStatement statement instances, module name and module properties.
The deployment id is an arbitrary string-type identifier that uniquely identifies the deployment in the runtime.
The undeploy method takes the deployment id and undeploys the deployment. The undeployAll method undeploys all deployments.
A compiled module may be deployed any number of times. Substitution parameters can be handy for parameterizing deployed modules.
Your application may deploy and undeploy using any thread and also within listener or subscriber code. If using Bean-style class-based events your application may not invoke deploy or undeploy methods as part of getter or setter code. Extension API code and plug-in single-row methods also may not invoke deploy or undeploy methods.
You may pass a DeploymentOptions instance. Deployment options provide deployment callbacks and other deploy-time parameters:
Provide a deployment id. If none is provided the runtime generates a unique deployment id.
Provide substitution parameter values for parameterized modules.
Provide or override statement names.
Provide a runtime statement user object that gets associated to the statement and that can be obtained from an EPStatement with getUserObjectRuntime.
Please consult the JavaDoc for more information.
The compiled module may have substitution parameters as explained in the compiler documentation.
All substitution parameters must be replaced by actual values before a compiled module with substitution parameters can be deployed. A compiled module may be deployed multiple times. Substitution parameters can be set to new values for every deployment.
To set substitution parameter values pass a DeploymentOptions object to the deploy method that provides a StatementSubstitutionParameterOption.
If not assigning a name to substitution parameters, replace the substitution parameter with an actual value using the setObject(int index, Object value) method for each index, starting from 1.
If assigning a name to each substitution parameter, replace the substitution parameter with an actual value using the setObject(String name, Object value) method for each name.
While the setObject method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.
The below sample code compiles and deploys a parameterized module:
    String stmt = "select * from PersonEvent(firstName=?::string)";

    Configuration configuration = new Configuration();
    configuration.getCommon().addEventType(PersonEvent.class);

    CompilerArguments compilerArguments = new CompilerArguments(configuration);
    EPCompiled compiled = EPCompilerProvider.getCompiler().compile(stmt, compilerArguments);

    DeploymentOptions deploymentOptions = new DeploymentOptions();
    deploymentOptions.setStatementSubstitutionParameter(prepared -> prepared.setObject(1, "Joe"));

    EPDeployment deployment = runtime.getDeploymentService().deploy(compiled, deploymentOptions);
    EPStatement statement = deployment.getStatements()[0];
Your application may deploy multiple compiled modules by calling the rollout method of EPDeploymentService. Roll-out deploys each compiled module, either deploying all compiled modules or deploying none of the compiled modules.
Internally to the runtime, a roll-out generally follows these steps:
For each compiled module, determine the deployment id or use the deployment id when provided in the deployment options; check that none of the deployment ids already exists
For each compiled module, load the compilation unit via classloader and validate basic class-related information such as manifest information and version
For each compiled module, check deployment preconditions and resolve deployment dependencies on EPL objects
For each compiled module, initialize statement-internal objects
For each compiled module, perform internal deployment of each statement of each module
In case any of the above steps fail the runtime completely rolls back all changes.
Roll-out does not re-order compiled modules and expects EPCompiled instances to be ordered according to module dependencies (if any).
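Because roll-out expects compiled modules in dependency order, an application that keeps track of which module depends on which other module could sort them before calling rollout. The following is only a sketch and not part of the Esper API: the class ModuleOrdering, the module names and the dependsOn map are hypothetical bookkeeping owned by the application, and the sort is a plain depth-first topological sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of Esper): orders module names so that every
// module appears after the modules it depends on.
class ModuleOrdering {

    static List<String> order(Map<String, List<String>> dependsOn) {
        List<String> ordered = new ArrayList<>();
        Set<String> done = new HashSet<>();
        Set<String> inProgress = new HashSet<>();
        for (String module : dependsOn.keySet()) {
            visit(module, dependsOn, done, inProgress, ordered);
        }
        return ordered;
    }

    private static void visit(String module, Map<String, List<String>> dependsOn,
                              Set<String> done, Set<String> inProgress, List<String> ordered) {
        if (done.contains(module)) {
            return; // already placed in the result
        }
        if (!inProgress.add(module)) {
            throw new IllegalStateException("Circular module dependency at " + module);
        }
        // Place all dependencies first, then the module itself
        for (String dependency : dependsOn.getOrDefault(module, Collections.emptyList())) {
            visit(dependency, dependsOn, done, inProgress, ordered);
        }
        inProgress.remove(module);
        done.add(module);
        ordered.add(module);
    }
}

class ModuleOrderingDemo {
    public static void main(String[] args) {
        // Assumed example: "alerts" uses EPL objects of "types" and "windows"
        Map<String, List<String>> dependsOn = new LinkedHashMap<>();
        dependsOn.put("alerts", Arrays.asList("types", "windows"));
        dependsOn.put("windows", Arrays.asList("types"));
        dependsOn.put("types", Collections.emptyList());
        System.out.println(ModuleOrdering.order(dependsOn));
    }
}
```

The application would then arrange its compiled modules in the returned order before handing them to rollout.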
For multiple rollouts or for atomically adding listeners and subscribers use Section 16.4.3, “Atomic Deployment Management”.
Your application can concurrently send events into the runtime while deploying and undeploying statements and adding or removing listeners. It is safe to undeploy and deploy compiled modules while sending in events from other threads concurrently.
However in some cases your application may need more control over deployment, for example when deploying multiple modules or when attaching custom listener code.
Your application can use the API described below to obtain a lock and perform deployment actions as an atomic unit. For example, if your application would like to undeploy and re-deploy as a single atomic unit, while at the same time sending events into the runtime from different threads, it can obtain a lock to ensure that no events are concurrently processed while the operations take place.
Deploying or undeploying a single compiled module is already an atomic operation by default and does not require taking an explicit lock. If your application would like to deploy multiple compiled modules or add custom listeners or subscribers during deployment it may obtain a lock as discussed below.
The below code sample obtains the runtime exclusive write lock to perform multiple management operations as a unit, excluding concurrent processing of events.
    runtime.getRuntimeInstanceWideLock().writeLock().lock();
    // Start atomic management unit.
    // Any events concurrently being processed by other threads must complete before the code completes obtaining the lock.
    // Any events sent in by other threads will await the release of the lock.
    try {
      // Perform operations such as :
      //   - deploy and/or undeploy multiple compiled modules (deployment admin API)
      //   - set statement listeners and subscribers while deploying
      // There is no need to obtain this lock when deploying or undeploying a single module.
      // The lock is reentrant and can be safely taken multiple times by the same thread.
      // Make sure you use "try" and "finally" just like we have it here.
    }
    finally {
      // Complete atomic management unit.
      // Any events sent in by other threads will now continue processing against the changed set of statements.
      runtime.getRuntimeInstanceWideLock().writeLock().unlock();
    }
There should always be a finally block in your code to ensure the lock is released in all cases.
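One way to make the finally block hard to forget is to wrap the pattern in a small helper. The sketch below assumes that the lock returned by getRuntimeInstanceWideLock can be used as a java.util.concurrent.locks.ReadWriteLock, as the writeLock calls in the sample above suggest. The helper name AtomicManagement is hypothetical, and the demo uses a plain ReentrantReadWriteLock as a stand-in for the runtime's lock.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical helper (not part of Esper): runs management operations while
// holding the write lock and always releases the lock afterwards.
class AtomicManagement {

    static void runExclusively(ReadWriteLock lock, Runnable managementOperations) {
        lock.writeLock().lock();
        try {
            managementOperations.run();
        } finally {
            // Released even if the operations throw
            lock.writeLock().unlock();
        }
    }
}

class AtomicManagementDemo {
    public static void main(String[] args) {
        // Stand-in for the lock that the runtime would provide (assumption)
        ReentrantReadWriteLock standIn = new ReentrantReadWriteLock();

        AtomicManagement.runExclusively(standIn, () -> {
            // An application would deploy, undeploy or attach listeners here
            System.out.println("held by this thread: " + standIn.isWriteLockedByCurrentThread());
        });

        System.out.println("still write-locked: " + standIn.isWriteLocked());
    }
}
```

With a real runtime the first argument would be the runtime-wide lock rather than the stand-in.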
A compiled module contains zero, one or multiple statements. You can attach callbacks (listeners, subscribers) to statements to receive results (also known as push or the observer pattern). You can also iterate over the current results of a statement (also known as poll).
Each statement is uniquely identified in the runtime by the combination of deployment id and statement name. The compiler or runtime always assigns a statement name if none was provided.
The EPStatement instance represents the statement. Your application receives statements when deploying a module by calling getStatements on EPDeployment.
Your application may also look up a statement by its deployment id and statement name using the getStatement method on EPDeploymentService.
For NEsper .NET also see Section J.14, “.NET API - Receiving Statement Results”.
Esper provides three choices for your application to receive statement results. Your application can use all three mechanisms alone or in any combination for each statement. The choices are:
Table 16.2. Choices For Receiving Statement Results
Name | Methods on EPStatement | Description |
---|---|---|
Listener Callbacks | addListener and removeListener | Your application provides implementations of the UpdateListener interface to the statement. The runtime continuously indicates results to all listeners. |
Subscriber Object | setSubscriber | Requires setting the allowSubscriber compiler option. Your application provides a POJO (plain Java object) that exposes methods to receive statement results. The name of the method that a subscriber object provides to receive results is update. The runtime continuously indicates results to the single subscriber. This is the fastest method to receive statement results, as the runtime delivers strongly-typed results directly to your application objects without the need for building an EventBean instance. There can be at most one subscriber object registered per statement. If you require more than one listener, use the listener instead (or in addition). The subscriber object is bound to the statement with strongly-typed support, which ensures direct delivery of new events without type conversion. This optimization is made possible because there can only be zero or one subscriber object per statement. |
Pull API | safeIterator and iterator | Your application asks the statement for results and receives a set of events via an iterator. This is useful if your application does not need continuous indication of new results in real-time. |
The runtime calls application-provided update listeners and subscribers for output. These commonly encapsulate the actions to take when there is output. This design decouples statements from actions and places actions outside of EPL. It allows actions to change independently from statements: A statement does not need to be updated when its associated action(s) change.
While the code or script that takes action is not a part of the EPL language, here are a few noteworthy points.
Through the use of EPL annotations you can attach information to EPL that can be used by applications to flexibly determine actions.
The insert into clause can be used to send results into a further stream and input and output adapters or data flows can exist to process output events from that stream.
Also the data flow EPStatementSource operator can be used to hook up actions declaratively.
The DeploymentStateListener can inform your application of newly-deployed statements and currently-undeployed statements.
Your application may attach one or more listeners, zero or one single subscriber and in addition use the pull API on the same statement. There are no limitations to the use of iterator, subscriber or listener alone or in combination to receive statement results.
The best delivery performance can generally be achieved by attaching a subscriber and by not attaching listeners. The runtime is aware of the listeners and subscriber attached to a statement. The runtime uses this information internally to reduce statement overhead. For example, if your statement does not have listeners or a subscriber attached, the runtime does not need to continuously generate results for delivery.
If your application attaches both a subscriber and one or more listeners then the subscriber receives the result first before any of the listeners.
If your application attaches more than one listener then the UpdateListener listeners receive results in the order they were added to the statement.
To change the order of delivery among listeners your application can add and remove listeners at runtime.
If you have configured outbound threading, a thread from the outbound thread pool delivers results to the subscriber and listeners instead of the processing or event-sending thread.
If outbound threading is turned on, we recommend turning off the runtime setting preserving the order of events delivered to listeners as described in Section 17.6.1.1, “Preserving the Order of Events Delivered to Listeners”. If outbound threading is turned on statement execution is not blocked for the configured time in the case a subscriber or listener takes too much time.
The compiler option allowSubscriber must be set at compile-time.
A subscriber object is a direct binding of statement results to an object. The object receives statement results via method invocation. The subscriber class does not need to implement an interface or extend a superclass. Only one subscriber object may be set for a statement.
Subscriber objects have several advantages over listeners. First, they offer a substantial performance benefit: Statement results are delivered directly to your method(s) through Java virtual machine method calls, and there is no intermediate representation (EventBean). Second, as subscribers receive strongly-typed parameters, the subscriber code tends to be simpler.
This chapter describes the requirements towards the methods provided by your subscriber class.
The runtime can deliver results to your subscriber in two ways:
Each event in the insert stream results in a method invocation, and each event in the remove stream results in further method invocations. This is termed row-by-row delivery.
A single method invocation that delivers all rows of the insert and remove stream. This is termed multi-row delivery.
In the case that your subscriber object wishes to receive the EPStatement instance along with output data, please add EPStatement as the very first parameter of any of the delivery method footprints that are discussed next.
For example, your statement may be:
select count(*) from OrderEvent
Your subscriber class exposes the method:
public void update(EPStatement statement, long currentCount) {...}
Your subscriber class must provide a method by name update to receive insert stream events row-by-row. The number and types of parameters declared by the update method must match the number and types of columns as specified in the select clause, in the same order as in the select clause.
For example, if your statement is:
select orderId, price, count(*) from OrderEvent
Then your subscriber update method looks as follows:
    public class MySubscriber {
      ...
      public void update(String orderId, double price, long count) {...}
      ...
    }
Each method parameter declared by the update method must be assignable from the respective column type as listed in the select clause, in the order selected. The assignability rules are:
Widening of types follows Java standards. For example, if your select clause selects an integer value, the method parameter for the same column can be typed int, long, float or double (or any equivalent boxed type).
Auto-boxing and unboxing follows Java standards. For example, if your select clause selects a java.lang.Integer value, the method parameter for the same column can be typed int. Note that if your select clause column may generate null values, an exception may occur at runtime unboxing the null value.
Interfaces and super-classes are honored in the test for assignability. Therefore java.lang.Object can be used to accept any select clause column type.
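The null caveat in the unboxing rule can be sidestepped by declaring a boxed parameter type for any column that may be null. The sketch below is illustrative only: the class name NullTolerantSubscriber is hypothetical, and it assumes a statement whose second column is a java.lang.Double that can be null, such as an average over an empty data window. The demo class simulates row delivery by calling update directly.

```java
// Hypothetical subscriber; the column names and types are assumptions for illustration.
class NullTolerantSubscriber {
    private int rowsWithoutPrice;

    // The boxed Double accepts a null column value; a primitive double
    // parameter would require unboxing, which fails for null.
    public void update(String orderId, Double averagePrice) {
        if (averagePrice == null) {
            rowsWithoutPrice++;
            return;
        }
        System.out.println(orderId + " avg=" + averagePrice);
    }

    public int getRowsWithoutPrice() {
        return rowsWithoutPrice;
    }
}

class NullTolerantSubscriberDemo {
    public static void main(String[] args) {
        NullTolerantSubscriber subscriber = new NullTolerantSubscriber();
        // Simulates two row deliveries, the second with a null column value
        subscriber.update("O-1", 10.5);
        subscriber.update("O-2", null);
        System.out.println("rows without price: " + subscriber.getRowsWithoutPrice());
    }
}
```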
In the case that your subscriber class offers multiple update method footprints, the runtime selects the closest-matching footprint by comparing the output types and method parameter types. The runtime prefers the update method that is an exact match of types, followed by an update method that requires boxing or unboxing, followed by an update method that requires widening and finally any other allowable update method.
Within the above criteria, in the case that your subscriber class offers multiple update method footprints with the same method parameter types, the runtime prefers the update method that has EPStatement as the first parameter.
If your select clause contains one or more wildcards (*), then the equivalent parameter type is the underlying event type of the stream selected from.
For example, your statement may be:
select *, count(*) from OrderEvent
Then your subscriber update method looks as follows:
public void update(OrderEvent orderEvent, long count) {...}
In a join, the wildcard expands to the underlying event type of each stream in the join in the order the streams occur in the from clause. An example statement for a join is:
select *, count(*) from OrderEvent order, OrderHistory hist
Then your subscriber update method should be:
public void update(OrderEvent orderEvent, OrderHistory orderHistory, long count) {...}
The stream wildcard syntax and the stream name itself can also be used:
select hist.*, order from OrderEvent order, OrderHistory hist
The matching update method is:
public void update(OrderHistory orderHistory, OrderEvent orderEvent) {...}
Alternatively, your update method may simply choose to accept java.util.Map as a representation for each row. Each column in the select clause is then made an entry in the resulting Map. The Map keys are the column name if supplied, or the expression string itself for columns without a name.
The update method for Map delivery is:
public void update(Map row) {...}
The runtime also supports delivery of select clause columns as an object array. Each item in the object array represents a column in the select clause. The update method then looks as follows:
public void update(Object[] row) {...}
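To make the Map form more concrete, the following sketch reads columns by name from each delivered row. It is an illustration under assumptions: the class name MapRowSubscriber is hypothetical, and it presumes a statement along the lines of select orderId, count(*) as cnt from OrderEvent, so that the Map keys are orderId and cnt. The demo class simulates a delivery by calling update directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical subscriber that receives each row as a Map keyed by column name.
class MapRowSubscriber {
    private final Map<String, Long> latestCountByOrder = new HashMap<>();

    public void update(Map<String, Object> row) {
        // Keys are the column names assumed for the example statement
        String orderId = (String) row.get("orderId");
        Long count = (Long) row.get("cnt");
        latestCountByOrder.put(orderId, count);
    }

    public Long latestCount(String orderId) {
        return latestCountByOrder.get(orderId);
    }
}

class MapRowSubscriberDemo {
    public static void main(String[] args) {
        MapRowSubscriber subscriber = new MapRowSubscriber();

        // Simulates one row as the runtime might deliver it
        Map<String, Object> row = new HashMap<>();
        row.put("orderId", "O-1");
        row.put("cnt", 3L);
        subscriber.update(row);

        System.out.println("latest count for O-1: " + subscriber.latestCount("O-1"));
    }
}
```

Map delivery trades the strong typing of individual parameters for flexibility: the subscriber keeps working when columns are added to the select clause, at the cost of casts.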
Your subscriber receives remove stream events if it provides a method named updateRStream. The method must accept the same number and types of parameters as the update method (including EPStatement if present).
An example statement:
select orderId, count(*) from OrderEvent#time(20 sec) group by orderId
Then your subscriber update and updateRStream methods should be:
    public void update(String orderId, long count) {...}
    public void updateRStream(String orderId, long count) {...}
If your subscriber requires a notification for begin and end of event delivery, it can expose methods by name updateStart and updateEnd.
The updateStart method must take two integer parameters that indicate the number of events of the insert stream and remove stream to be delivered. The runtime invokes the updateStart method immediately prior to delivering events to the update and updateRStream methods.
The updateEnd method must take no parameters. The runtime invokes the updateEnd method immediately after delivering events to the update and updateRStream methods.
An example set of delivery methods:
    // Called by the runtime before delivering events to update methods
    public void updateStart(int insertStreamLength, int removeStreamLength) {...}

    // To deliver insert stream events
    public void update(String orderId, long count) {...}

    // To deliver remove stream events
    public void updateRStream(String orderId, long count) {...}

    // Called by the runtime after delivering events
    public void updateEnd() {...}
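As an illustration of how these four methods can cooperate, the sketch below collects the rows of each delivery into one batch. It is a minimal example under assumptions: the class name BatchingSubscriber is hypothetical, the parameter types follow the example statement that selects orderId and count(*), and the demo class stands in for the runtime by calling the methods directly in the documented order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subscriber that groups row-by-row deliveries into batches.
class BatchingSubscriber {
    private final List<String> currentBatch = new ArrayList<>();
    private final List<List<String>> completedBatches = new ArrayList<>();

    // Invoked before the rows of one delivery
    public void updateStart(int insertStreamLength, int removeStreamLength) {
        currentBatch.clear();
    }

    // Insert stream row
    public void update(String orderId, long count) {
        currentBatch.add("+" + orderId + ":" + count);
    }

    // Remove stream row
    public void updateRStream(String orderId, long count) {
        currentBatch.add("-" + orderId + ":" + count);
    }

    // Invoked after the rows of one delivery
    public void updateEnd() {
        completedBatches.add(new ArrayList<>(currentBatch));
    }

    public List<List<String>> getCompletedBatches() {
        return completedBatches;
    }
}

class BatchingSubscriberDemo {
    public static void main(String[] args) {
        BatchingSubscriber subscriber = new BatchingSubscriber();

        // Simulates one delivery with one insert stream row and one remove stream row
        subscriber.updateStart(1, 1);
        subscriber.update("O-1", 2L);
        subscriber.updateRStream("O-1", 1L);
        subscriber.updateEnd();

        System.out.println(subscriber.getCompletedBatches());
    }
}
```

In an application the object would be handed to the statement via setSubscriber instead of being invoked by hand.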
In place of row-by-row delivery, your subscriber can receive all events in the insert and remove stream via a single method invocation. This is applicable when an EPL statement delivers multiple output rows for a given input event or time advancing, for example when multiple pattern matches occur for the same incoming event, for a join producing multiple output rows, or with output rate limiting.
The event delivery follows the scheme as described earlier in Section 16.5.2.2.2, “Row Delivery as Map and Object Array”. The subscriber class must provide one of the following methods:
Table 16.3. Update Method for Multi-Row Delivery of Underlying Events
Method | Description |
---|---|
update(Object[][] insertStream, Object[][] removeStream) | The first dimension of each Object array is the event row, and the second dimension is the column matching the column order of the statement select clause. |
update(Map[] insertStream, Map[] removeStream) | Each map represents one event, and Map entries represent columns of the statement select clause. |
If your select clause contains a single wildcard (*) or wildcard stream selector, the subscriber object may also directly receive arrays of the underlying events. In this case, the subscriber class should provide a method update(Underlying[] insertStream, Underlying[] removeStream), such that Underlying represents the class of the underlying event.
For example, your statement may be:
select * from OrderEvent#time(30 sec)
Your subscriber class exposes the method:
public void update(OrderEvent[] insertStream, OrderEvent[] removeStream) {...}
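A sketch of such a multi-row subscriber follows. It keeps a running total of prices for the events currently in the data window by adding insert stream events and subtracting remove stream events. The OrderEvent class shown is a hypothetical stand-in for the application's event class, the class name OrderWindowSubscriber is likewise made up, and the null checks are a defensive assumption because the text above does not state whether a stream without events is passed as null or as an empty array.

```java
// Hypothetical event class standing in for the application's OrderEvent.
class OrderEvent {
    private final String orderId;
    private final double price;

    OrderEvent(String orderId, double price) {
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public double getPrice() {
        return price;
    }
}

// Hypothetical subscriber that receives all rows of a delivery in one call.
class OrderWindowSubscriber {
    private double totalInWindow;

    public void update(OrderEvent[] insertStream, OrderEvent[] removeStream) {
        if (insertStream != null) {
            for (OrderEvent entering : insertStream) {
                totalInWindow += entering.getPrice();
            }
        }
        if (removeStream != null) {
            for (OrderEvent leaving : removeStream) {
                totalInWindow -= leaving.getPrice();
            }
        }
    }

    public double getTotalInWindow() {
        return totalInWindow;
    }
}

class OrderWindowSubscriberDemo {
    public static void main(String[] args) {
        OrderWindowSubscriber subscriber = new OrderWindowSubscriber();
        OrderEvent first = new OrderEvent("O-1", 10.0);
        OrderEvent second = new OrderEvent("O-2", 5.0);

        // Simulates two events entering the window, then the first one leaving
        subscriber.update(new OrderEvent[] {first, second}, null);
        subscriber.update(null, new OrderEvent[] {first});

        System.out.println("total in window: " + subscriber.getTotalInWindow());
    }
}
```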
In the case that your subscriber object wishes to receive no data from a statement please follow the instructions here.
Your statement must select a single null value.
For example, your statement may be:
select null from OrderEvent(price > 100)
Your subscriber class exposes the method:
public void update() {...}
For NEsper .NET also see Section J.15, “.NET API - Adding Listeners”.
Your application can subscribe to updates posted by a statement via the addListener and removeListener methods on EPStatement. Your application must provide an implementation of the UpdateListener interface to the statement:
    UpdateListener myListener = new MyUpdateListener();
    countStmt.addListener(myListener);
Statements publish old data and new data to registered UpdateListener listeners.
New data published by statements consists of the events representing the new values of derived data held by the statement.
Old data published by statements consists of the events representing the prior values of derived data held by the statement.
UpdateListener listeners receive multiple result rows in one invocation by the runtime: the new data and old data parameters to your listener are array parameters. For example, if your application uses one of the batch data windows, or your application creates a pattern that matches multiple times when a single event arrives, then the runtime indicates such multiple result rows in one invocation and your new data array carries two or more rows.
To indicate results the runtime invokes the following method on UpdateListener listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPRuntime runtime)
The addListenerWithReplay method provided by EPStatement makes it possible to send a snapshot of current statement results to a listener when the listener is added.
When using the addListenerWithReplay method to register a listener, the listener receives current statement results as the first call to the update method of the listener, passing in the newEvents parameter the current statement results as an array of zero or more events. Subsequent calls to the update method of the listener deliver statement results as they occur.
Current statement results are the events returned by the iterator or safeIterator methods.
Delivery is atomic: Events occurring during delivery of current results to the listener are guaranteed to be delivered in a separate call and not lost. The listener implementation should thus minimize long-running or blocking operations to reduce lock times held on statement-level resources.
Subscribing to events posted by a statement follows a push model. The runtime pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, statements serve up data that your application can obtain via the safeIterator and iterator methods on EPStatement. This is called the pull API and can come in handy if your application is not interested in all new updates, and only needs to perform a frequent or infrequent poll for the latest data.
The safeIterator method on EPStatement returns a concurrency-safe iterator returning current statement results, even while concurrent threads may send events into the runtime for processing. The runtime employs a read-write lock per context partition and obtains a read lock for iteration. Thus safe iterator guarantees correct results even as events are being processed by other threads and other context partitions. The cost is that the iterator obtains and holds zero, one or multiple context partition locks for that statement that must be released via the close method on the SafeIterator instance.
The iterator method on EPStatement returns a concurrency-unsafe iterator. This iterator is only useful for applications that are single-threaded, or applications that themselves perform coordination between the iterating thread and the threads that send events into the runtime for processing. The advantage to this iterator is that it does not hold a lock.
When statements are used with contexts and context partitions, the APIs to identify, filter and select context partitions for statement iteration are described in Section 16.16, “Context Partition Selection”.
The next code snippet shows a short example of use of safe iterators:
    EPStatement statement = epAdmin.createEPL("select avg(price) as avgPrice from MyTick");
    // .. send events into the runtime
    // then use the pull API...
    SafeIterator<EventBean> safeIter = statement.safeIterator();
    try {
      while (safeIter.hasNext()) {
        // .. process event ..
        EventBean event = safeIter.next();
        System.out.println("avg:" + event.get("avgPrice"));
      }
    }
    finally {
      safeIter.close();  // Note: safe iterators must be closed
    }
This is a short example of use of the regular iterator that is not safe for concurrent event processing:
double averagePrice = (Double) eplStatement.iterator().next().get("average");
The safeIterator and iterator methods can be used to pull results out of all statements, including statements that join streams, contain aggregation functions, pattern statements, and statements that contain a where clause, group by clause, having clause or order by clause.
For statements without an order by clause, the iterator method returns events in the order maintained by the data window. For statements that contain an order by clause, the iterator method returns events in the order indicated by the order by clause.
Consider using the on-select clause and a named window if your application requires iterating over a partial result set or requires indexed access for fast iteration. Note that on-select requires that you send a trigger event, which may contain the key values for indexed access.
Esper places the following restrictions on the pull API and usage of the safeIterator and iterator methods:
In multithreaded applications, use the safeIterator method. Note: make sure your application closes the iterator via the close method when done, otherwise the iterated statement context partitions stay locked and event processing for statement context partitions does not resume.
In multithreaded applications, the iterator method does not hold any locks. The iterator returned by this method does not make any guarantees towards correctness of results and fail-behavior, if your application processes events into the runtime by multiple threads. Use the safeIterator method for concurrency-safe iteration instead.
Since the safeIterator and iterator methods return events to the application immediately, the iterator does not honor an output rate limiting clause, if present. That is, the iterator returns results as if there is no output-rate clause for the statement in statements without grouping or aggregation. For statements with grouping or aggregation, the iterator in combination with an output clause returns last output group and aggregation results. Use a separate statement and the insert into clause to control the output rate for iteration, if so required.
When iterating a statement that operates on an unbound stream (no data window declared), please note the following:
When iterating a statement that groups and aggregates values from an unbound stream and that specifies output snapshot, the runtime retains groups and aggregations for output as iteration results or upon the output snapshot condition.
When iterating a statement that groups and aggregates values from an unbound stream and that does not specify output snapshot
, the runtime only retains the last aggregation values and the iterated result contains only the last updated group.
When iterating a statement that operates on an unbound stream the iterator returns no rows. This behavior can be changed by specifying either the @IterableUnbound
annotation or by changing the global view resources configuration.
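To illustrate why a safe iterator must be closed, here is a minimal sketch in plain Java. It is not Esper code: the class LockHoldingIterator and its helper methods are hypothetical, and the sketch assumes only what the restrictions above state, namely that the iterator takes a lock when it is obtained and releases it on close.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; not part of the Esper API.
final class LockHoldingIterator<T> implements Iterator<T>, AutoCloseable {
    private final ReentrantLock lock;
    private final Iterator<T> delegate;
    private boolean closed;

    LockHoldingIterator(ReentrantLock lock, Iterable<T> rows) {
        this.lock = lock;
        lock.lock(); // acquired on creation and held until close()
        this.delegate = rows.iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (closed) {
            throw new IllegalStateException("iterator is closed");
        }
        return delegate.next();
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            lock.unlock(); // other threads may now take the lock again
        }
    }

    // Iterates with try-with-resources; reports whether the lock is still held afterwards.
    static boolean lockedAfterClose() {
        ReentrantLock partitionLock = new ReentrantLock();
        try (LockHoldingIterator<String> it =
                 new LockHoldingIterator<>(partitionLock, Arrays.asList("a", "b"))) {
            while (it.hasNext()) {
                it.next();
            }
        }
        return partitionLock.isLocked();
    }

    // Iterates but forgets to close; reports whether the lock is still held afterwards.
    static boolean lockedWithoutClose() {
        ReentrantLock partitionLock = new ReentrantLock();
        LockHoldingIterator<String> it =
            new LockHoldingIterator<>(partitionLock, Arrays.asList("a", "b"));
        while (it.hasNext()) {
            it.next();
        }
        return partitionLock.isLocked();
    }
}
```

In this sketch the unclosed iterator leaves the lock held, which corresponds to event processing for the iterated context partitions not resuming.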
An EventBean
object represents a row (event) in your statement's result set. Each EventBean
object has an associated EventType
object providing event metadata.
An UpdateListener
implementation receives one or more EventBean
events with each invocation. Via the iterator
method on EPStatement
your application can poll or read data out of statements. Statement iterators also return EventBean
instances.
Each statement provides the event type of the events it produces, available via the getEventType
method on EPStatement
.
An EventType
object encapsulates all the metadata about a certain type of events. As Esper supports an inheritance hierarchy for event types, it also provides information about super-types to an event type.
An EventType
object provides the following information:
For each event property, it lists the property name and type as well as flags for indexed or mapped properties and whether a property is a fragment.
The direct and indirect super-types to the event type.
Value getters for property expressions.
Underlying class of the event representation.
For each property of an event type, there is an EventPropertyDescriptor
object that describes the property.
The EventPropertyDescriptor
contains flags that indicate whether a property is an indexed (array) or a mapped property and whether access to property values requires an integer index value (indexed properties only) or string key value (mapped properties only). The descriptor also contains a fragment flag that indicates whether a property value is available as a fragment.
The term fragment means an event property value that is itself an event, or a property value that can be represented as an event. The getFragmentType
method on EventType
may be used to determine a fragment's event type in advance.
A fragment event type and thereby fragment events allow navigation over a statement's results even if the statement result contains nested events or a graph of events. There is no need to use the Java reflection API to navigate events, since fragments allow the querying of nested event properties or array values, including nested Java classes.
When using the Map or Object-array event representation, any named Map type or Object-array type nested within a Map or Object-array as a simple or array property is also available as a fragment. When using Java objects either directly or within Map or Object-array events, any object that is neither a primitive or boxed built-in type, and that is not an enumeration and does not implement the Map interface is also available as a fragment.
The nested, indexed and mapped property syntax can be combined to a property expression that may query an event property graph. Most of the methods on the EventType
interface allow a property expression to be passed.
Your application may use an EventType
object to obtain special getter-objects. A getter-object is a fast accessor to a property value of an event of a given type. All getter objects implement the EventPropertyGetter
interface. Getter-objects work only for events of the same type or sub-types as the EventType
that provides the EventPropertyGetter
. The performance section provides additional information and samples on using getter-objects.
An event object is an EventBean
that provides:
The property value for a property given a property name or property expression that may include nested, indexed or mapped properties in any combination.
The event type of the event.
Access to the underlying event object.
The EventBean
fragment or array of EventBean
fragments given a property name or property expression.
The getFragment
method on EventBean
and EventPropertyGetter
return the fragment EventBean
or array of EventBean
, if the property is itself an event
or can be represented as an event. Your application may use EventPropertyDescriptor
to determine which properties are also available as fragments.
The underlying event object of an EventBean
can be obtained via the getUnderlying
method. Please see Chapter 3, Event Representations for more information on different event representations.
From a threading perspective, it is safe to retain and query EventBean
and EventType
objects in multiple threads.
Consider a statement that returns the symbol, count of events per symbol and average price per symbol for tick events. Our sample statement uses the event type: StockTickEvent
. Assume that this event type was declared previously and exposes a symbol
property of type String and a price
property of type (Java primitive) double.
select symbol, avg(price) as avgprice, count(*) as mycount from StockTickEvent group by symbol
The next table summarizes the property names and types as posted by the statement above:
Table 16.4. Properties Offered by Sample Statement Aggregating Price
Name | Type | Description | Java code snippet |
---|---|---|---|
symbol | java.lang.String | Value of symbol event property | eventBean.get("symbol") |
avgprice | java.lang.Double | Average price per symbol | eventBean.get("avgprice") |
mycount | java.lang.Long | Number of events per symbol | eventBean.get("mycount") |
A code snippet out of a possible UpdateListener
implementation to this statement may look as below:
String symbol = (String) newEvents[0].get("symbol");
Double price = (Double) newEvents[0].get("avgprice");
Long count = (Long) newEvents[0].get("mycount");
The runtime supplies the boxed java.lang.Double
and java.lang.Long
types as property values rather than primitive Java types. This is because aggregated values can return a null
value to indicate that no data is available for aggregation. Also, in a select statement that computes expressions, the underlying event objects to EventBean
instances are either of type Object[]
(object-array) or of type java.util.Map
.
Use statement.getEventType().getUnderlyingType()
to inspect the underlying type for all events delivered to listeners. Whether the runtime delivers Map or Object-array events to listeners can be specified as follows. If the statement provides the @EventRepresentation(objectarray)
annotation the runtime delivers the output events as object array. If the statement provides the @EventRepresentation(map)
annotation the runtime delivers output events as a Map. If neither annotation is provided, the runtime delivers the configured default event representation as discussed in Section 17.4.9.1, “Default Event Representation”.
Consider the next statement that specifies a wildcard selecting the same type of event:
select * from StockTickEvent where price > 100
The property names and types provided by an EventBean
query result row, as posted by the statement above are as follows:
Table 16.5. Properties Offered by Sample Wildcard-Select Statement
Name | Type | Description | Java code snippet |
---|---|---|---|
symbol | java.lang.String | Value of symbol event property | eventBean.get("symbol") |
price | double | Value of price event property | eventBean.get("price") |
As an alternative to querying individual event properties via the get
methods, the getUnderlying
method on EventBean
returns the underlying object representing the statement result.
In the sample statement that features a wildcard-select, the underlying event object is of type org.sample.StockTickEvent
:
StockTickEvent tick = (StockTickEvent) newEvents[0].getUnderlying();
Composite events are events that aggregate one or more other events. Composite events are typically created by the runtime for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.
// Look for a pattern where BEvent follows AEvent
select * from pattern [a=AEvent -> b=BEvent]
// Example listener code
public class MyUpdateListener implements UpdateListener {
  public void update(EventBean[] newData, EventBean[] oldData, EPStatement statement, EPRuntime runtime) {
    System.out.println("a event=" + newData[0].get("a"));
    System.out.println("b event=" + newData[0].get("b"));
  }
}
Note that the update
method can receive multiple events at once as it accepts an array of EventBean
instances. For example, a time batch window may post multiple events to listeners representing a batch of events received during a given time period.
Pattern statements can also produce multiple events delivered to update listeners in one invocation. The pattern statement below, for instance, delivers an event for each A event that was not followed by a B event with the same id
property within 60 seconds of the A event. The runtime may deliver all matching A events as an array of events in a single invocation of the update
method of each listener to the statement:
select * from pattern[every a=A -> (timer:interval(60 sec) and not B(id=a.id))]
A code snippet out of a possible UpdateListener
implementation to this statement that retrieves the events as fragments may look as below:
EventBean a = (EventBean) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvents[0].get("a.value");
Some pattern objects return an array of events. An example is the unbound repeat operator. Here is a sample pattern that collects all A events until a B event arrives:
select * from pattern [a=A until b=B]
Possible code to retrieve different fragments or property values:
EventBean[] a = (EventBean[]) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvents[0].get("a[0].value");
As discussed in Section 5.2.7, “Annotation” an EPL annotation is an addition made to statement information. The API and examples to interrogate annotations are described here.
You may use the getAnnotations method of EPStatement to obtain annotations specified for a statement. Alternatively, when compiling an EPL expression to an EPStatementObjectModel statement object model, you may also query, change or add annotations.
The following example code demonstrates iterating over an EPStatement
statement's annotations and retrieving values:
String exampleEPL = "@Tag(name='direct-output', value='sink 1') select * from RootEvent";
Configuration configuration = new Configuration();
configuration.getCommon().addEventType("RootEvent", Collections.emptyMap()); // add an event type without properties
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(exampleEPL, compilerArguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
EPStatement stmt = deployment.getStatements()[0];
for (Annotation annotation : stmt.getAnnotations()) {
  if (annotation instanceof Tag) {
    Tag tag = (Tag) annotation;
    System.out.println("Tag name " + tag.name() + " value " + tag.value());
  }
}
The output of the sample code shown above is Tag name direct-output value sink 1
.
The EPEventService
interface is used to send events and advance time. Obtain the event service from a runtime by calling getEventService
on EPRuntime
.
This section focuses on processing events. For more information on controlling time using the event service please skip forward to Section 16.9, “Controlling Time-Keeping”.
Your application invokes any of the sendEventType
methods listed below and must provide an event type name along with the actual event object:
Table 16.6. Send-Event Methods
Method | Description |
---|---|
sendEventBean(Object event, String eventTypeName) | Call when the event is a Bean-style event. The event type name should be associated to a class event representation. |
sendEventMap(Map<String, Object> event, String eventTypeName) | Call when the event is a map. The event type name should be associated to a map event representation. |
sendEventObjectArray(Object[] event, String eventTypeName) | Call when the event is an object-array. The event type name should be associated to an object-array event representation. |
sendEventXMLDOM(Node node, String eventTypeName) | Call when the event is a DOM-Node. The event type name should be associated to an XML event representation. |
sendEventJson(String json, String jsonEventTypeName) | Call when the event is a JSON-formatted string-type document. The event type name should be associated to a JSON event representation. |
sendEventAvro(Object avroGenericDataDotRecord, String avroEventTypeName) | Call when the event is an Avro object. The event type name should be associated to an Avro event representation. |
The Chapter 3, Event Representations section explains the types of event representations.
The below sample code assumes that the event type name MarketDataBean
refers to a class event representation that matches the class MarketDataBean
:
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
EPEventService eventService = runtime.getEventService();
// Send an example event containing stock market data
eventService.sendEventBean(new MarketDataBean("IBM", 75.0), "MarketDataBean");
Events, in theoretical terms, are observations of a state change that occurred in the past. Since you cannot change an event that happened in the past, events are best modelled as immutable objects.
If you find that your application requires processing events and control over time only for specific deployments and not for other deployments, please read up on Section 16.21, “Stages”.
The runtime relies on events that are sent into the runtime to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Applications should not modify an existing event that was sent into the runtime.
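As an illustration of modelling events as immutable objects, the following is a minimal sketch of what a class such as MarketDataBean could look like. The property names symbol and price and the withPrice helper are assumptions made for this example; the class is declared without the public modifier only to keep the listing self-contained, and an application would normally declare it public in its own source file.

```java
// Hypothetical event class; the field names are assumptions for illustration.
final class MarketDataBean {
    private final String symbol;
    private final double price;

    MarketDataBean(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    // JavaBean-style getters expose the properties "symbol" and "price".
    public String getSymbol() {
        return symbol;
    }

    public double getPrice() {
        return price;
    }

    // A new observation is a new object; the existing event is left untouched.
    public MarketDataBean withPrice(double newPrice) {
        return new MarketDataBean(symbol, newPrice);
    }
}
```

Because all fields are final and there are no setters, an event that was already sent into the runtime cannot be changed by accident; a later price becomes a separate event object created via withPrice or the constructor.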
Other important methods of the event service interface are the routeEventType methods. These methods are designed for use by UpdateListener and subscriber implementations, as well as runtime extensions, that need to send events into a runtime. Routing avoids the possibility of a stack overflow due to nested calls to sendEvent and ensures correct processing of the current and routed event.
Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent
and not routeEvent
.
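The difference between sending and routing can be sketched in plain Java. This is a simplified illustration under stated assumptions and not Esper's dispatch code: the class RoutingSketch, its integer events and its single listener are hypothetical. It shows only the idea that a routed event is placed on a work queue and processed after the current event completes, so the call stack does not grow with each follow-up event.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;

// Hypothetical illustration; not Esper's dispatcher.
final class RoutingSketch {
    private final Deque<Integer> workQueue = new ArrayDeque<>();
    private final BiConsumer<RoutingSketch, Integer> listener;
    private int processedCount;

    RoutingSketch(BiConsumer<RoutingSketch, Integer> listener) {
        this.listener = listener;
    }

    // Processes the event on the caller's stack, then drains routed events in a loop.
    void sendEvent(int event) {
        process(event);
        while (!workQueue.isEmpty()) {
            process(workQueue.poll());
        }
    }

    // Called from listener code: only queues the event, no nested processing.
    void routeEvent(int event) {
        workQueue.add(event);
    }

    private void process(int event) {
        processedCount++;
        listener.accept(this, event);
    }

    int getProcessedCount() {
        return processedCount;
    }

    // Each event routes one follow-up event until the chain reaches the given length.
    static int runChain(int length) {
        RoutingSketch sketch = new RoutingSketch((self, event) -> {
            if (event < length) {
                self.routeEvent(event + 1);
            }
        });
        sketch.sendEvent(1);
        return sketch.getProcessedCount();
    }
}
```

If the listener in this sketch called sendEvent instead of routeEvent, every follow-up event would add another frame to the call stack, which is the nested-call situation that routing is meant to avoid.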
The EventSender
interface processes event objects that are of a known type. This facility can reduce the overhead of event object reflection and type lookup as an event sender
is always associated to a single concrete event type.
Use the method getEventSender(String eventTypeName)
to obtain an event sender for processing events of the named type:
EventSender sender = runtime.getEventService().getEventSender("MyEvent");
sender.sendEvent(myEvent);
For events backed by a Java class (JavaBean events), the event sender ensures that the event object equals the underlying class, or implements or extends the underlying class for the given event type name.
For events backed by a java.util.Map
(Map events), the event sender does not perform any checking other than checking that the event object implements Map.
For events backed by an Object[] (Object-array events), the event sender does not perform any checking other than checking that the event object is an Object[] array. The array elements must be in the exact same order as the declared properties, and the array length must always be at least the number of properties declared.
For JSON events, the event sender checks that the event object is a string-type value or is a JsonEventUnderlying object returned by the parse method of EventSenderJson. The JSON document should match the fields defined in create schema.
For Avro events backed by an Apache Avro GenericData.Record
, the event sender does not perform any checking other than checking that the event object is a GenericData.Record. The schema associated to the record should match the event type's Avro schema.
For events backed by an org.w3c.dom.Node (XML DOM events), the event sender checks that the root element name equals the root element name for the event type.
Your application can register an implementation of the UnmatchedListener
interface with the event service by calling the setUnmatchedListener
method to receive events that were not matched by any statement.
Events that can be unmatched are all events that your application sends into the runtime via one of the sendEvent
or routeEvent
methods, or that have been generated via an insert into
clause.
For an event to become unmatched by any statement, the event must not match any statement's event stream filter criteria. Note that the EPL where
clause or having
clause are not considered part of the filter criteria for a stream, as explained by example below.
In the following statement a MyEvent event with a 'quantity' property value of 5 or less does not match this statement's event stream filter criteria. The runtime delivers such an event to the registered UnmatchedListener
instance provided no other statement matches on the event:
select * from MyEvent(quantity > 5)
For patterns, if no pattern sub-expression is active for an event type, an event of that type also counts as unmatched in regards to the pattern statement.
The EPFireAndForgetService
interface offers methods to execute fire-and-forget queries. Obtain the fire-and-forget service from a runtime by calling getFireAndForgetService
on EPRuntime
.
As your application may not require streaming results and may not know each statement in advance, the fire-and-forget query facility provides for ad-hoc on-demand execution of an EPL query.
Fire-and-forget queries are not continuous in nature: The fire-and-forget query runtime executes the query once and returns all result rows to the application. Fire-and-forget query execution is very lightweight as the runtime performs no statement deployment and the query leaves no traces within the runtime.
Esper provides the facility to explicitly index named windows and tables to speed up fire-and-forget queries and statements. Please consult Section 6.9, “Explicitly Indexing Named Windows and Tables” for more information.
When named windows and tables are used with contexts and context partitions, the APIs to identify, filter and select context partitions for fire-and-forget queries can be found in Section 16.16, “Context Partition Selection”.
There are three ways to run fire-and-forget queries:
Use the executeQuery
method to execute a given fire-and-forget query exactly once, see Section 16.7.1, “Fire-and-forget Query Single Execution”.
Use the prepareQuery
method to prepare a given fire-and-forget query such that the same query can be executed multiple times, see Section 16.7.2, “Fire-and-forget Query Prepared Unparameterized Execution”.
Use the prepareQueryWithParameters
method to prepare a given fire-and-forget query that may have substitution parameters such that the same query can be parameterized and executed multiple times without repeated parsing, see Section 16.7.3, “Fire-and-forget Query Prepared Parameterized Execution”.
If your application must execute the same fire-and-forget query multiple times with different parameters use prepareQueryWithParameters
.
If your application must execute the same fire-and-forget query multiple times without parameters use either prepareQuery
or prepareQueryWithParameters
and specify no substitution parameters.
By using any of the prepare...
methods the runtime can load the byte code for the query once and reuse the byte code and thereby speed up repeated execution.
The following limitations apply:
A fire-and-forget query only evaluates against the named windows and tables that your application creates. Fire-and-forget queries may not specify any other streams or application event types.
The following clauses are not allowed in fire-and-forget EPL queries: insert into
and output
.
Data windows and patterns are not allowed to appear in fire-and-forget queries.
Fire-and-forget EPL may not perform subqueries.
The previous
and prior
functions may not be used.
Use the executeQuery
method for executing a fire-and-forget query once. For repeated execution, please consider any of the prepare...
methods instead.
The next program listing runs a fire-and-forget query against a named window MyNamedWindow
and prints a column of each row result of the query (this sample uses the compiler runtime-path):
String query = "select * from MyNamedWindow";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);
EPFireAndForgetQueryResult result = runtime.getFireAndForgetService().executeQuery(compiled);
for (EventBean row : result.getArray()) {
  System.out.println("name=" + row.get("name"));
}
For executing a fire-and-forget query against a table please put the table name into the from
-clause instead.
Prepared fire-and-forget queries are designed for repeated execution and may perform better than the dynamic single-execution method if running the same query multiple times. For use with parameter placeholders please see Section 16.7.3, “Fire-and-forget Query Prepared Parameterized Execution”.
The next code snippet demonstrates prepared fire-and-forget queries without parameter placeholder:
String query = "select * from MyNamedWindow where orderId = '123'";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);
EPFireAndForgetPreparedQuery prepared = runtime.getFireAndForgetService().prepareQuery(compiled);
EPFireAndForgetQueryResult result = prepared.execute();
// ...later on execute once more ...
prepared.execute(); // execute a second time
Please see the compiler documentation for specifying substitution parameters.
All substitution parameters must be replaced by actual values before a fire-and-forget query with substitution parameters can be executed. Substitution parameters can be replaced with an actual value using the setObject
method for each index or name. Substitution parameters can be set to new values and the query executed more than once.
While the setObject
method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the type that
was specified, if any, and the requirements of the expression the parameter resides in.
The next program listing runs a prepared and parameterized fire-and-forget query against a named window MyNamedWindow
(this example does not assign names to substitution parameters):
String query = "select * from MyNamedWindow where orderId = ?::string";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);
EPFireAndForgetPreparedQueryParameterized prepared = runtime.getFireAndForgetService().prepareQueryWithParameters(compiled);
// Set the required parameter values before each execution
prepared.setObject(1, "123");
EPFireAndForgetQueryResult result = runtime.getFireAndForgetService().executeQuery(prepared);
// ...execute a second time with new parameter values...
prepared.setObject(1, "456");
result = runtime.getFireAndForgetService().executeQuery(prepared);
This second example uses the in
operator and has multiple parameters:
String query = "select * from MyNamedWindow where orderId in (?::string[]) and price > ?::double";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);
EPFireAndForgetPreparedQueryParameterized prepared = runtime.getFireAndForgetService().prepareQueryWithParameters(compiled);
prepared.setObject(1, new String[] {"123", "456"});
prepared.setObject(2, 1000.0);
For NEsper .NET also see Section J.16, “.NET API - Runtime Threading and Concurrency”.
The runtime is designed from the ground up to operate as a component to multi-threaded, highly-concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.
In Esper, a runtime instance is a unit of separation. Applications can obtain and discard (initialize) one or more runtime instances within the same Java VM and can provide the same or different configurations to each instance. A runtime instance shares resources between statements by means of named windows, tables and variables.
Applications can use Esper APIs to concurrently, by multiple threads of execution, perform such functions as deploying modules, or sending events into the runtime for processing. Applications can use application-managed threads or thread pools or any set of same or different threads of execution with any of the public runtime APIs. There are no restrictions towards threading other than those noted in specific sections of this document.
The runtime does not prescribe a specific threading model. Applications using Esper retain full control over threading, allowing a runtime to be easily embedded and used as a component or library in your favorite Java container or process.
In the default configuration it is up to the application code to use multiple threads for processing events by the runtime, if so desired. All event processing takes place within your application thread call stack. The exception is timer-based processing if your runtime relies on the internal timer (default). If your application relies on external timer events instead of the internal timer then there need not be any runtime-managed internal threads.
The fact that event processing can take place within your application thread's call stack makes developing applications with the Esper runtime easier: Any common Java integrated development environment (IDE) can host a compiler and runtime instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.
In the default configuration, each runtime maintains a single timer thread (internal timer) providing for time or schedule-based processing within the runtime. The default resolution at which the internal timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead advance time to perform timer or scheduled processing at the resolution required by an application.
A runtime performs minimal locking to enable high levels of concurrency. A runtime locks on the combination of query and context partition to protect context partition resources. For example, two statements with three partitions each have a total of six locks. For stateless EPL select-statements the runtime does not use a context-partition lock and operates lock-free for the context partition. For stateful statements, the maximum (theoretical) degree of parallelism is 2^31-1 (2,147,483,647) parallel threads working to process a single statement under a hash segmented context.
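The lock granularity described above can be pictured with a small plain-Java sketch. It is an assumption-laden illustration and not how the runtime stores its locks: the class PartitionLockSketch, the string key format and the statement names S1 and S2 are hypothetical. The sketch keeps one lock per combination of statement and context partition and counts the locks for two statements with three partitions each.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; not Esper's internal lock management.
final class PartitionLockSketch {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // One lock per (statement, context partition) combination, created on first use.
    ReentrantLock lockFor(String statementName, int partitionId) {
        return locks.computeIfAbsent(statementName + "#" + partitionId, key -> new ReentrantLock());
    }

    int lockCount() {
        return locks.size();
    }

    // Two statements with three partitions each.
    static int demo() {
        PartitionLockSketch sketch = new PartitionLockSketch();
        for (String statementName : new String[] {"S1", "S2"}) {
            for (int partitionId = 0; partitionId < 3; partitionId++) {
                ReentrantLock lock = sketch.lockFor(statementName, partitionId);
                lock.lock();
                try {
                    // process an event against this statement and partition
                } finally {
                    lock.unlock();
                }
            }
        }
        return sketch.lockCount();
    }
}
```

Threads that work on different partitions, or on different statements, take different locks in this sketch and therefore do not block each other.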
For named windows and tables, on-select
, on-merge
, on-update
and on-delete
all execute under the same lock as the partition of the named window or table.
Any insert into
produces a new event for the work queue and does not lock the target of the insert-into.
You may turn off context partition locking runtime-wide (also read the caution notice) as described in Section 17.6.10.3, “Disable Locking”. You may disable context partition locking for a given statement by providing the @NoLock
annotation as part of your EPL. Note, Esper provides the @NoLock
annotation for the purpose of identifying locking overhead, or when your application is single-threaded, or when using an external mechanism for concurrency control or for example with virtual data windows or plug-in data windows to allow customizing concurrency for a given statement or named window. Using this annotation may have unpredictable results unless your application is taking concurrency under consideration.
For a runtime to produce predictable results from the viewpoint of listeners to statements, a runtime by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. For applications that require the highest possible concurrency and do not require a predictable order of delivery of events to listeners, this feature can be turned off via configuration, see Section 17.6.1.1, “Preserving the Order of Events Delivered to Listeners”. For example, assume thread T1 processes an event applied to statement S producing output event O1. Assume thread T2 processes another event applied to statement S and produces output event O2. The runtime employs a configurable latch system to ensure that listeners to statement S receive and may complete processing of O1 before receiving O2. When using outbound threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.
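The T1/T2 example can be sketched with plain Java latches. This is only a simplified picture of the idea and not the runtime's configurable latch system: the class OrderedDeliverySketch, its Ticket type and the in-memory list standing in for a listener are all hypothetical. Each output receives a ticket when it is produced, and a thread delivers its output only after the previous ticket's latch has opened.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration; not Esper's latch implementation.
final class OrderedDeliverySketch {

    // A ticket remembers the latch of the output produced just before it.
    static final class Ticket {
        private final CountDownLatch previousDone; // null for the first output
        private final CountDownLatch done = new CountDownLatch(1);

        Ticket(CountDownLatch previousDone) {
            this.previousDone = previousDone;
        }
    }

    private CountDownLatch lastDone; // guarded by "this"
    private final List<String> delivered = new CopyOnWriteArrayList<>();

    // Taken at the moment the statement produces an output; this fixes the order.
    synchronized Ticket nextTicket() {
        Ticket ticket = new Ticket(lastDone);
        lastDone = ticket.done;
        return ticket;
    }

    // Waits until the previous output was delivered, then delivers this one.
    void deliver(Ticket ticket, String output) throws InterruptedException {
        if (ticket.previousDone != null) {
            ticket.previousDone.await();
        }
        delivered.add(output); // stands in for the listener callback
        ticket.done.countDown();
    }

    static List<String> demo() {
        OrderedDeliverySketch sketch = new OrderedDeliverySketch();
        Ticket t1 = sketch.nextTicket(); // thread T1 produced O1 first
        Ticket t2 = sketch.nextTicket(); // thread T2 produced O2 second
        Thread second = new Thread(() -> {
            try {
                sketch.deliver(t2, "O2"); // may run first, but waits for O1
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        second.start();
        try {
            sketch.deliver(t1, "O1");
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sketch.delivered);
    }
}
```

The waiting in deliver is the cost of the ordering guarantee, which is why applications that do not need ordered delivery may prefer to turn the feature off.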
In multithreaded environments, when one or more statements make result events available via the insert into
clause to further statements, the runtime preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statements' events to behave deterministically. This feature can also be turned off via configuration, see Section 17.6.1.2, “Preserving the Order of Events for Insert-Into Streams”. For example, assume thread T1 processes an event applied to statement S and thread T2 processes another event applied to statement S. Assume statement S inserts into stream ST. T1 produces an output event O1 for processing by consumers of ST and T2 produces an output event O2 for processing by consumers of ST. The runtime employs a configurable latch system such that O1 is processed before O2 by consumers of ST. When using route execution threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.
We generally recommend that listener implementations block minimally or do not block at all. By implementing listener code as non-blocking code, execution threads can often achieve higher levels of concurrency.
We recommend that, when using a single listener or subscriber instance to receive output from multiple statements, the listener or subscriber code is multithread-safe. If your application has shared state between listener or subscriber instances then such shared state should be thread-safe.
In the default configuration the same application thread that invokes any of the sendEvent
methods will process the event fully and also deliver output events to listeners and subscribers. By default the single internal timer thread based on system time performs time-based processing and delivery of time-based results.
This default configuration reduces the processing overhead associated with thread context switching, is lightweight and fast and works well in many environments such as J2EE, server or client. Latency and throughput requirements are largely use case dependent, and Esper provides runtime-level facilities for controlling concurrency that are described next.
Inbound Threading queues all incoming events: A pool of runtime-managed threads performs the event processing. The application thread that sends an event via any of the sendEvent
methods returns without blocking.
Outbound Threading queues events for delivery to listeners and subscribers, such that slow or blocking listeners or subscribers do not block event processing.
Timer Execution Threading means time-based event processing is performed by a pool of runtime-managed threads. With this option the internal timer thread (or external timer event) serves only as a metronome, providing units-of-work to the runtime-managed threads in the timer execution pool, pushing threading to the level of each statement for time-based execution.
Route Execution Threading means that the thread sending in an event via any of the sendEvent
methods (or the inbound threading pooled thread if inbound threading is enabled) only identifies and pre-processes an event, and a pool of runtime-managed threads handles the actual processing of the event for each statement, pushing threading to the level of each statement for event-arrival-based execution.
The runtime starts runtime-managed threads as daemon threads when the runtime instance is first obtained. The runtime stops runtime-managed threads when the runtime instance is destroyed via the destroy
method. When the runtime is initialized via the initialize
method the existing runtime-managed threads are stopped and new threads are created. When shutting down your application, use the destroy
method to stop runtime-managed threads.
Note that the options discussed herein may introduce additional processing overhead into your system, as each option involves work queue management and thread context switching.
If your use cases require ordered processing of events or do not tolerate disorder, the threading options described herein are not the right choice.
To enforce a processing order for events that share given criteria (for example the same key), your application must enforce such processing order itself. Esper does not enforce order of processing if you enable inbound or route threading. Your application code could, for example, utilize a thread per group of criteria keys, a latch per criteria key, or a queue per criteria key, or use Java's completion service, all depending on your ordering requirements.
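As one possible reading of "a thread per group of criteria keys", the following minimal sketch routes all work for the same key to the same single-threaded executor, so that work for one key runs in submission order. The class `KeyedOrderingSketch` is a hypothetical application-side helper and not part of the Esper API; in a real application each task would typically call one of the runtime's send-event methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical application-side helper, not part of the Esper API.
final class KeyedOrderingSketch {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedOrderingSketch(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // The same key always maps to the same lane, which preserves per-key order.
    void submit(Object key, Runnable task) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(lane).execute(task);
    }

    // Stops accepting work and waits for queued tasks; returns false on timeout or interrupt.
    boolean shutdownAndWait(long timeoutMillis) {
        lanes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits values 0..99 for two keys and returns the values processed for "keyA".
    static List<Integer> demo() {
        KeyedOrderingSketch dispatcher = new KeyedOrderingSketch(4);
        List<Integer> seenForA = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int value = i;
            dispatcher.submit("keyA", () -> seenForA.add(value));
            dispatcher.submit("keyB", () -> { /* work for an unrelated key */ });
        }
        dispatcher.shutdownAndWait(5000);
        return seenForA;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Hashing keys onto a fixed number of lanes bounds the thread count; the trade-off is that unrelated keys sharing a lane are also serialized relative to each other.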
If your use cases require loss-less processing of events, wherein the threading options mean that events are held in an in-memory queue, the threading options described herein may not be the right choice.
Care should be taken to consider arrival rates and queue depth. Threading options utilize unbounded queues or capacity-bound queues with blocking-put, depending on your configuration, and may therefore introduce an overload or blocking situation to your application. You may use the service provider interface as outlined below to manage queue sizes, if required, and to help tune the runtime to your application needs. Consider throttling down the event send rate when the API (see below) indicates that events are getting queued.
All threading options are on the level of a runtime. If you require different threading behavior for certain statements then consider using multiple runtimes, consider using the routeEvent
method or consider using application threads instead.
Please consult Section 17.6.1, “Runtime Settings Related to Concurrency and Threading” for instructions on how to configure threading options. Threading options take effect at runtime initialization time.
With inbound threading a runtime places inbound events in a queue for processing by one or more runtime-managed threads other than the delivering application threads.
The delivering application thread uses one of the sendEventType
methods on EPEventService
to deliver events or may also use the sendEvent
method on an EventSender
. The runtime receives the event and places the event into a queue, allowing the delivering thread to continue and not block while the event is being processed and results are delivered.
Events that are sent into the runtime via one of the routeEvent
methods are not placed into queue but processed by the same thread invoking the routeEvent
operation.
With outbound threading a runtime places outbound events in a queue for delivery by one or more runtime-managed threads other than the processing thread originating the result.
With outbound threading your listener or subscriber class receives statement results from one of the runtime-managed threads in the outbound pool of threads. This is useful when you expect your listener or subscriber code to perform significantly blocking operations and you do not want to hold up event processing.
sendEvent
method and not the routeEvent
method.
With timer execution threading a runtime places time-based work units into a queue for processing by one or more runtime-managed threads other than the internal timer thread or the application thread that sends an external timer event.
Using timer execution threading the internal timer thread (or thread delivering an external timer event) serves to evaluate which time-based work units must be processed. A pool of runtime-managed threads performs the actual processing of time-based work units and thereby offloads the work from the internal timer thread (or thread delivering an external timer event).
Enable this option as a tuning parameter when your statements utilize time-based patterns or data windows. Timer execution threading is fine grained and works on the level of a time-based schedule in combination with a statement.
With route execution threading a runtime identifies event-processing work units based on the event and statement combination. It places such work units into a queue for processing by one or more runtime-managed threads other than the thread that originated the event.
While inbound threading works on the level of an event, route execution threading is fine grained and works on the level of an event in combination with a statement.
The service-provider interface EPRuntimeSPI
is an extension API that allows you to manage runtime-level queues and thread pools (extension APIs are subject to change between release versions).
The following code snippet shows how to obtain the BlockingQueue<Runnable>
and the ThreadPoolExecutor
for managing the queue and thread pool responsible for inbound threading:
EPRuntimeSPI spi = (EPRuntimeSPI) runtime;
int queueSize = spi.getThreadingService().getInboundQueue().size();
ThreadPoolExecutor threadpool = spi.getThreadingService().getInboundThreadPool();
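A queue obtained this way can drive simple send-rate throttling. The following is a minimal sketch that works against any `BlockingQueue`, for instance the one returned by `getInboundQueue()`; the class `QueueDepthThrottleSketch`, its method and the threshold values are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical application-side helper, not part of the Esper API.
final class QueueDepthThrottleSketch {
    // Pauses in small steps while the queue holds more than maxDepth entries.
    // Returns the number of pauses taken and gives up after maxPauses.
    static int awaitCapacity(BlockingQueue<?> queue, int maxDepth, long pauseMillis, int maxPauses) {
        int pauses = 0;
        while (queue.size() > maxDepth && pauses < maxPauses) {
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            pauses++;
        }
        return pauses;
    }

    // Fills a queue that nobody drains, so the helper pauses until it gives up.
    static int demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add(() -> { });
        }
        return awaitCapacity(queue, 2, 1L, 3);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

An application thread could call `awaitCapacity` before each send, which slows the producer down only when the inbound queue is backing up.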
This section discusses the order in which N competing statements that all react to the same arriving event execute.
The runtime, by default, does not guarantee to execute competing statements in any particular order unless using @Priority. We therefore recommend that an application does not rely on the order of execution of statements by the runtime, since that best shields the behavior of an application from changes in the order that statements may get created by your application or by threading configurations that your application may change at will.
If your application requires a defined order of execution of competing statements, use the @Priority EPL syntax to make the order of execution between statements well-defined (this requires that you set the prioritized-execution configuration setting). In addition, @Drop can make a statement preempt all lower-priority statements, which then are not executed for any matching events.
This section discusses the order of event evaluation when multiple events must be processed, for example when multiple statements use insert-into to generate further events upon arrival of an event.
The runtime processes an arriving event completely before indicating output events to listeners and subscribers, and before considering output events generated by insert-into or routed events inserted by listeners or subscribers.
For example, assume three statements: (1) select * from MyEvent, (2) insert into ABCStream select * from MyEvent and (3) select * from ABCStream. When a MyEvent event arrives, the listeners to statements (1) and (2) execute first (default threading model). Listeners to statement (3), which receive the inserted-into stream events, are always executed after delivery of the triggering event.
Among all events generated by insert-into of statements and the events routed into the runtime via the routeEvent
method, all events that insert-into a named window are processed first in the order generated. All other events are processed thereafter in the order they were generated.
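The ordering rule above can be modeled with two first-in-first-out queues. The following is a simplified sketch of the rule only; the class `WorkQueueOrderSketch` is a hypothetical name and does not reflect how the runtime stores its work internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of the ordering rule, not the runtime's internal data structure.
final class WorkQueueOrderSketch {
    private final Deque<String> namedWindowInserts = new ArrayDeque<>();
    private final Deque<String> otherEvents = new ArrayDeque<>();

    // Records a generated event in the order in which it was produced.
    void add(String event, boolean insertsIntoNamedWindow) {
        (insertsIntoNamedWindow ? namedWindowInserts : otherEvents).addLast(event);
    }

    // Named-window inserts come first, then all other events, each in generation order.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (!namedWindowInserts.isEmpty()) {
            order.add(namedWindowInserts.pollFirst());
        }
        while (!otherEvents.isEmpty()) {
            order.add(otherEvents.pollFirst());
        }
        return order;
    }

    static List<String> demo() {
        WorkQueueOrderSketch work = new WorkQueueOrderSketch();
        work.add("A", false); // routed event
        work.add("B", true);  // insert into a named window
        work.add("C", false); // insert into a regular stream
        work.add("D", true);  // insert into a named window
        return work.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [B, D, A, C]
    }
}
```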
When enabling timer or route execution threading as explained under advanced threading options, the runtime does not make any guarantee about the processing order except that it will prioritize events inserted into a named window.
There are two modes for a runtime to keep track of time: the internal timer based on JVM system time (the default), and externally-controlled time (also known as event time), which gives your application full control over the concept of time within a runtime.
By default the internal timer provides time and evaluates schedules. External clocking i.e. event time can be used to supply time ticks to the runtime instead. The latter is useful for when events themselves provide the time to advance. External clocking also helps in testing time-based event sequences or for synchronizing the runtime with an external time source.
The internal timer relies on the java.util.concurrent.ScheduledThreadPoolExecutor
class for time tick events. The next section describes timer resolution for the internal timer, which is set to 100 milliseconds by default and is configurable via the threading options. When using externally-controlled time the timer resolution is in your control.
To disable the internal timer and use externally-provided time instead, there are two options. The first option is to use the configuration API at runtime initialization time. The second option toggles on and off the internal timer at runtime, via special timer control events that are sent into the runtime like any other event.
If using a timer execution thread pool as discussed above, the internal timer or external time event provides the schedule evaluation but does not actually perform the time-based processing. The time-based processing is performed by the threads in the timer execution thread pool.
External time and internal (system) time are treated the same internally, thus the runtime behaves the same whether using the external or the internal timer.
This code snippet shows the use of the configuration API to disable the internal timer and thereby turn on externally-provided time (see the Configuration section for configuration via XML file):
Configuration config = new Configuration();
config.getRuntime().getThreading().setInternalTimerEnabled(false);
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(config);
After disabling the internal timer, it is wise to set a defined time so that any statements created thereafter start relative to the time defined. Use the advanceTime
method to indicate current time to the runtime
and to move time forward for the runtime (a.k.a application-time model).
This code snippet obtains the current time and advances time:
long timeInMillis = System.currentTimeMillis();
runtime.getEventService().advanceTime(timeInMillis);
To enable or disable the internal timer by API call use the clockExternal
and clockInternal
methods of EPEventService
.
The next code snippet demonstrates toggling to external time:
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
EPEventService eventService = runtime.getEventService();
// switch to external clocking
eventService.clockExternal();
The advanceTime
method moves the time forward. All aspects of runtime current time related to statements and patterns are driven by the time that your application advances to.
The next example sequence of instructions sets time to zero, then creates a statement, then moves time forward to 1 second later and then to 6 seconds later:
// Set start time at zero.
runtime.getEventService().advanceTime(0);
// deploy a module here
// sample EPL: select current_timestamp() as ct from pattern[every timer:interval(1 minute)]
runtime.getDeploymentService().deploy(compiled); // compiled is a module you compiled earlier
// move time forward 1 second
runtime.getEventService().advanceTime(1000);
// move time forward 5 seconds
runtime.getEventService().advanceTime(6000);
When advancing time your application should make sure values are ascending. That is, each time value should be either the same value or a larger value than the prior value provided.
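One way for an application to honor the ascending-values rule is a small guard in front of the call that advances time. The following is a minimal sketch; the class `AscendingTimeGuardSketch` is a hypothetical helper, and the `LongConsumer` stands in for a call such as `time -> runtime.getEventService().advanceTime(time)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical application-side helper, not part of the Esper API.
final class AscendingTimeGuardSketch {
    private final LongConsumer advanceTime;
    private long lastTime = Long.MIN_VALUE;

    // The consumer would typically be: time -> runtime.getEventService().advanceTime(time)
    AscendingTimeGuardSketch(LongConsumer advanceTime) {
        this.advanceTime = advanceTime;
    }

    // Forwards the time only if it is the same as or later than the last forwarded value.
    synchronized boolean advance(long timeInMillis) {
        if (timeInMillis < lastTime) {
            return false; // stale timestamp, for example from an out-of-order event
        }
        lastTime = timeInMillis;
        advanceTime.accept(timeInMillis);
        return true;
    }

    public static void main(String[] args) {
        List<Long> forwarded = new ArrayList<>();
        AscendingTimeGuardSketch guard = new AscendingTimeGuardSketch(t -> forwarded.add(t));
        guard.advance(0);
        guard.advance(1000);
        guard.advance(500);  // rejected because it is earlier than 1000
        guard.advance(1000); // accepted because the same value is allowed
        System.out.println(forwarded); // prints [0, 1000, 1000]
    }
}
```

Whether to drop, log or reorder a stale timestamp is an application decision; the sketch simply drops it.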
Your application may use the getNextScheduledTime
method in EPEventService
to determine the earliest time a schedule for any statement requires evaluation.
The following code snippet sets the current time, creates a statement and prints the next scheduled time, which is 1 minute later than the current time:
// set start time to the current time.
runtime.getEventService().advanceTime(System.currentTimeMillis());
// deploy a module
// sample EPL: select current_timestamp() as ct from pattern[every timer:interval(1 minute)]
runtime.getDeploymentService().deploy(compiled); // compiled is a module you compiled earlier
// print next schedule time
System.out.println("Next schedule at " + new Date(runtime.getEventService().getNextScheduledTime()));
If you find that your application requires control over time only for specific deployments and not for other deployments, please read up on Section 16.21, “Stages”.
You may not use advanceTime
or advanceTimeSpan
to control time when using internal timer since the internal timer tracks system time
and must be the only source of time.
The advanceTime
method allows your application to advance runtime time to a given point in time.
In addition, the getNextScheduledTime
method in EPEventService
returns the next scheduled time according to started statements.
You would typically use advanceTime
to advance time at a relatively high resolution i.e. milliseconds or microseconds.
To advance time for a span of time without individual calls to advanceTime
the API provides the method advanceTimeSpan
. The advanceTimeSpan
method can accept a resolution parameter.
If your application provides the target end time of a time span to the advanceTimeSpan
method and does not provide a resolution, the runtime advances time up to the target time by stepping through all relevant times according to started statements.
If your application provides the target end time of a time span and in addition a long
-typed resolution to the advanceTimeSpan
method the runtime advances time up to the target time by incrementing time according to the resolution (regardless of next scheduled time according to started statements).
Consider the following example:
// Set start time to Jan. 1, 2010, 00:00 for this example
SimpleDateFormat format = new SimpleDateFormat("yyyy MM dd HH:mm:ss SSS");
Date startTime = format.parse("2010 01 01 00:00:00 000");
runtime.getEventService().advanceTime(startTime.getTime());
// deploy a module
// sample EPL: select current_timestamp() as ct from pattern[every timer:interval(1 minute)]
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled); // compiled is a module you compiled earlier
EPStatement stmt = deployment.getStatements()[0]; // the first statement of the deployed module
stmt.addListener(...); // add a listener
// Advance time to 10 minutes after start time
runtime.getEventService().advanceTimeSpan(startTime.getTime() + 10*60*1000);
The above example advances time to 10 minutes after the start time using the advanceTimeSpan
method. As the example does not pass a resolution, the runtime advances time according to statement schedules. Upon calling the advanceTimeSpan
method the listener sees 10 invocations for minute 1 to minute 10.
To advance time according to a given resolution, you may provide the resolution as shown below:
// Advance time to 10 minutes after start time at 100 msec resolution
runtime.getEventService().advanceTimeSpan(startTime.getTime() + 10*60*1000, 100);
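To compare the two ways of advancing a time span, the following sketch computes the time values that would be visited in each case. It is a simplified model under the assumption of a single statement with a fixed-interval schedule; the class `TimeSpanSteppingSketch` and its methods are hypothetical and do not call the runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of time stepping; it does not call the runtime.
final class TimeSpanSteppingSketch {
    // Times visited when incrementing by a fixed resolution, capped at the target time.
    static List<Long> stepsByResolution(long start, long target, long resolution) {
        if (resolution <= 0) {
            throw new IllegalArgumentException("resolution must be positive");
        }
        List<Long> steps = new ArrayList<>();
        long current = start;
        while (current < target) {
            current = Math.min(current + resolution, target);
            steps.add(current);
        }
        return steps;
    }

    // Times visited when jumping from schedule to schedule, assuming one schedule every interval.
    static List<Long> stepsBySchedule(long start, long target, long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        List<Long> steps = new ArrayList<>();
        long current = start;
        long nextSchedule = start + interval;
        while (current < target) {
            current = Math.min(nextSchedule, target);
            steps.add(current);
            nextSchedule += interval;
        }
        return steps;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        System.out.println(stepsBySchedule(0, tenMinutes, 60_000).size()); // prints 10
        System.out.println(stepsByResolution(0, tenMinutes, 100).size());  // prints 6000
    }
}
```

In this model both variants reach each one-minute schedule, but the resolution-based variant takes 6000 steps to do so rather than 10.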
Time can have a resolution of either milliseconds or microseconds.
The default time resolution is milliseconds. To configure the runtime for microsecond resolution, please see Section 17.4.14.1, “Time Unit”.
Table 16.7. Time Resolution
Aspect | Millisecond | Microsecond |
---|---|---|
Smallest unit for advancing time | 1 millisecond | 1 microsecond |
Equivalent java.util.concurrent.TimeUnit | TimeUnit.MILLISECONDS | TimeUnit.MICROSECONDS |
Default? | Default | Requires configuration change, see Section 17.4.14.1, “Time Unit” |
Long-type runtime time represents | Milliseconds since Epoch | Microseconds since Epoch |
Example: the date Tue, 01 Jan 1980 00:00:00 GMT | 315532800000 | 315532800000000 |
Support for Internal System Time | Yes | No, requires external time (aka. event time) via advanceTimeSpan or advanceTime |
A few notes on usage of microsecond time unit for time resolution:
The runtime automatically converts time periods into microseconds. For example 1 minute 2 seconds
is 62000000
microseconds (62 * 1000000
).
The runtime automatically converts time-in-second parameters into microseconds. For example 5.02 seconds
is 5020000
microseconds.
The runtime automatically converts ISO schedules, crontabs and hints related to runtime time into microseconds.
The CurrentTimeSpanEvent
or CurrentTimeEvent
events must provide microsecond values.
Date-time methods with long-type input values assume microsecond values.
Date-time methods or other functions that take millisecond parameters or produce millisecond values still consume/produce millisecond values, such as the date-time method toMillisec
.
The internal timer must be disabled (setInternalTimerEnabled(false)
) and TimerControlEvent.ClockType.CLOCK_INTERNAL
cannot be used.
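The conversions named in the notes above can be checked with plain JDK arithmetic. The following is a minimal sketch; the class `MicrosecondConversionSketch` and its method names are hypothetical and only reproduce the numbers from this section.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Plain JDK arithmetic for the values in this section; not part of the Esper API.
final class MicrosecondConversionSketch {
    // A time period of minutes and seconds expressed in microseconds.
    static long timePeriodMicros(long minutes, long seconds) {
        return TimeUnit.MINUTES.toMicros(minutes) + TimeUnit.SECONDS.toMicros(seconds);
    }

    // A fractional number of seconds expressed in microseconds, rounded to the nearest value.
    static long secondsToMicros(double seconds) {
        return Math.round(seconds * 1_000_000d);
    }

    // Microseconds since epoch for a point in time.
    static long epochMicros(Instant instant) {
        return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
                + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
    }

    public static void main(String[] args) {
        Instant jan1980 = Instant.parse("1980-01-01T00:00:00Z");
        System.out.println(timePeriodMicros(1, 2));  // prints 62000000
        System.out.println(secondsToMicros(5.02));   // prints 5020000
        System.out.println(jan1980.toEpochMilli());  // prints 315532800000
        System.out.println(epochMicros(jan1980));    // prints 315532800000000
    }
}
```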
By default the internal timer is enabled and tracks VM system time. For many use cases your application may want to use event time or external time instead, as discussed above.
The internal timer thread, by default, uses the call System.currentTimeMillis()
to obtain system time. Please see the JIRA issue ESPER-191 Support nano/microsecond resolution for more information on Java system time-call performance, accuracy and drift.
The internal timer thread can be configured to use nanosecond time as returned by System.nanoTime()
. If configured for nanosecond time, the runtime computes an offset of the nanosecond ticks to wall clock time upon startup to present back an accurate millisecond wall clock time.
Please see Section 17.6.6, “Runtime Settings Related to Time Source” to configure the internal timer thread to use System.nanoTime()
.
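The offset computation for nanosecond time can be sketched as follows. This is a minimal illustration of the general technique using only JDK calls; the class `NanoClockSketch` is a hypothetical name and the runtime's own time source implementation may differ.

```java
// Illustration of deriving wall clock time from System.nanoTime(); not the runtime's own code.
final class NanoClockSketch {
    // Wall clock time in nanoseconds minus the nanoTime tick, captured once at startup.
    private final long offsetNanos;

    NanoClockSketch() {
        offsetNanos = System.currentTimeMillis() * 1_000_000L - System.nanoTime();
    }

    // Millisecond wall clock time computed from the nanosecond tick plus the startup offset.
    long currentTimeMillis() {
        return (System.nanoTime() + offsetNanos) / 1_000_000L;
    }

    public static void main(String[] args) {
        NanoClockSketch clock = new NanoClockSketch();
        System.out.println("nano-based:   " + clock.currentTimeMillis());
        System.out.println("system clock: " + System.currentTimeMillis());
    }
}
```

Because the offset is captured only once, later adjustments of the system clock are not reflected by such a clock.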
The internal timer is based on java.util.concurrent.ScheduledThreadPoolExecutor
and that generally provides high accuracy VM time
(java.util.Timer
does not support high accuracy VM time).
You may register one or more exception handlers for the runtime to invoke in the case it encounters an exception processing a continuously-executing statement. By default and without exception handlers the runtime cancels execution of the current statement that encountered the exception, logs the exception and continues to the next statement, if any. The configuration is described in Section 17.6.11, “Runtime Settings Related to Exception Handling”.
If your application registers exception handlers as part of runtime configuration, the runtime invokes the exception handlers in the order they are registered passing relevant exception information such as statement name, expression and the exception itself.
Exception handlers receive any statement unchecked exception such as internal exceptions or exceptions thrown by plug-in aggregation functions or plug-in data windows. The runtime does not provide to exception handlers any exceptions thrown by static method invocations for function calls, method invocations in joins, methods on variables and event classes and listeners or subscriber exceptions.
An exception handler can itself throw a runtime exception to cancel execution of the current event against any further statements.
Exceptions are meant to indicate an actual unexpected problem.
We do not recommend explicitly throwing exceptions for the purpose of flow control, preempting execution or other normal situations.
The runtime does not guarantee that throwing an exception has no other side effect and the runtime may not roll back changes that are already made to state.
For fire-and-forget queries the API indicates any exception directly back to the caller without the exception handlers being invoked, as exception handlers apply to statements only. The same applies to any API calls other than sendEvent
and the EventSender
methods.
As the configuration section describes, your application registers one or more classes that implement the ExceptionHandlerFactory
interface in the runtime configuration. Upon runtime initialization the runtime obtains a factory instance from the class name that then provides the exception handler instance. The exception handler class must implement the ExceptionHandler
interface.
You may register one or more condition handlers for the runtime to invoke in the case it encounters certain conditions, as outlined below, when executing a statement. By default and without condition handlers the runtime logs the condition at informational level and continues processing. The configuration is described in Section 17.6.12, “Runtime Settings Related to Condition Handling”.
If your application registers condition handlers as part of runtime configuration, the runtime invokes the condition handlers in the order they are registered passing relevant condition information such as statement name, expression and the condition information itself.
Currently the only conditions indicated by this facility are raised by the pattern followed-by operator, see Section 7.5.8.1, “Limiting Sub-Expression Count” and see Section 7.5.8.2, “Limiting Runtime-Wide Sub-Expression Count”.
A condition handler may not itself throw a runtime exception or return any value.
As the configuration section describes, your application registers one or more classes that implement the ConditionHandlerFactory
interface in the runtime configuration. Upon runtime initialization the runtime obtains a factory instance from the class name that then provides the condition handler instance. The condition handler class must implement the ConditionHandler
interface.
The runtime can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 17.6.7, “Runtime Settings Related to JMX Metrics”. For additional detailed reporting and metrics events, please read on.
Metrics reporting is a feature that allows an application to receive ongoing reports about key runtime-level and statement-level metrics. Examples are the number of incoming events, the CPU time and wall time taken by statement executions or the number of output events per statement.
Metrics reporting is, by default, disabled. To enable reporting, please follow the steps as outlined in Section 17.6.8, “Runtime Settings Related to Metrics Reporting”. Metrics reporting must be enabled at runtime initialization time. Reporting intervals can be controlled at runtime via the EPMetricsService
interface available from the runtime API.
Your application can receive metrics at configurable intervals via statement. A metric datapoint is simply a well-defined event. The events are RuntimeMetric
and StatementMetric
and the Java class representing the events can be found in the client API in package com.espertech.esper.common.client.metric
.
Since metric events are processed by the runtime the same as application events, your EPL may use any construct on such events. For example, your application may select, filter, aggregate or sort metric events, or insert them into a stream, named window or table, the same as application events.
This example statement selects all runtime metric events:
select * from RuntimeMetric
The next statement selects all statement metric events:
select * from StatementMetric
Make sure to have metrics reporting enabled since only then do listeners or subscribers to a statement such as above receive metric events.
The runtime provides metric events after the configured interval of time has passed. By default, only started statements that have activity within an interval (in the form of event or timer processing) are reported upon.
The default configuration performs the publishing of metric events in an Esper daemon thread under the control of the runtime instance. Metrics reporting honors externally-supplied time, if using external timer events.
Via runtime configuration options provided by EPMetricsService
, your application may enable and disable metrics reporting globally, provided that metrics reporting was enabled at initialization time. Your application may also enable and disable metrics reporting for individual statements by statement name.
Statement groups are a configuration feature that allows assigning reporting intervals to statements. Statement groups are described further in Section 17.6.8, “Runtime Settings Related to Metrics Reporting”. Statement groups cannot be added or removed at runtime.
The following limitations apply:
If your Java VM version does not report current thread CPU time (most JVMs do), then CPU time is reported as zero (use ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported()
to determine if your JVM supports this feature).
Note: In some JVMs the accuracy of the CPU time returned is very low (in the order of 10 milliseconds off), which can impact the usefulness of the CPU metrics returned. As an alternative, consider measuring CPU time in your application thread, external to the runtime, after sending a number of events in the same thread.
Your Java VM may not provide high resolution time via System.nanoTime
. In such a case wall time may be inaccurate and imprecise.
CPU time and wall time have nanosecond precision but not necessarily nanosecond accuracy, please check with your Java VM provider.
There is a performance cost to collecting and reporting metrics.
Not all statements may report metrics: The runtime performs certain runtime optimizations sharing resources between similar statements, thereby not reporting on certain statements.
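The CPU time check and the application-thread measurement mentioned in the limitations above can be sketched with the JDK's `ThreadMXBean`. The class `CpuTimeProbeSketch` is a hypothetical helper; the task passed in would typically send a number of events into the runtime from the measuring thread.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical application-side helper, not part of the Esper API.
final class CpuTimeProbeSketch {
    // Returns the CPU nanoseconds the current thread spent running the task,
    // or -1 when the JVM does not support or has disabled thread CPU time.
    static long measureCpuNanos(Runnable task) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        if (!bean.isCurrentThreadCpuTimeSupported() || !bean.isThreadCpuTimeEnabled()) {
            return -1L;
        }
        long before = bean.getCurrentThreadCpuTime();
        task.run(); // for example: send a batch of events in this thread
        long after = bean.getCurrentThreadCpuTime();
        return after - before;
    }

    public static void main(String[] args) {
        long nanos = measureCpuNanos(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            if (sum < 0) {
                System.out.println("unreachable"); // keeps the loop from being optimized away
            }
        });
        System.out.println("CPU nanoseconds: " + nanos);
    }
}
```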
Runtime metrics are properties of RuntimeMetric
events:
Table 16.8. RuntimeMetric Properties
Name | Description |
---|---|
runtimeURI | The URI of the runtime. |
timestamp | The current runtime time. |
inputCount | Cumulative number of input events since runtime initialization time. Input events are defined as events sent in via application threads as well as insert into events. |
inputCountDelta | Number of input events since last reporting period. |
scheduleDepth | Number of outstanding schedules. |
Statement metrics are properties of StatementMetric
. The properties are:
Table 16.9. StatementMetric Properties
Name | Description |
---|---|
runtimeURI | The URI of the runtime. |
timestamp | The current runtime time. |
statementName | Statement name. |
cpuTime | Statement processing CPU time (system and user) in nanoseconds (if available by Java VM, obtained from ThreadMXBean.getCurrentThreadCpuTime ). |
wallTime | Statement processing wall time in nanoseconds (based on System.nanoTime ). |
numInput | Number of input events to the statement. |
numOutputIStream | Number of insert stream rows output to listeners or the subscriber, if any. |
numOutputRStream | Number of remove stream rows output to listeners or the subscriber, if any. |
The totals reported are cumulative relative to the last metric report.
Enterprise Edition has a library for measuring and reporting memory use for a runtime.
The runtime can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 17.6.7, “Runtime Settings Related to JMX Metrics”.
Runtime and statement-level metrics reporting is described in Section 16.12, “Runtime and Statement Metrics Reporting”.
The easiest way to see thread contention is by using VisualVM when Esper is under load and looking at the Threads tab. In the worst case you will see a lot of red color in VisualVM. The red line in VisualVM shows the threads that are either in a monitor region or waiting in an entry set for the monitor. The monitor is the mechanism that Java uses to support synchronization. When a statement is stateful the runtime manages the state using a monitor (lock) per context partition.
A JVM profiler can be handy to see how much CPU is spent in Esper by the sendEvent
method.
The jconsole
can provide information on the JVM heap. If memory gets tight the performance can drop significantly.
The EPRenderEventService
interface offers methods to render events as XML or JSON. Obtain the service from a runtime by calling getRenderEventService
on EPRuntime
.
Your application may use the built-in XML and JSON formatters to render output events into a readable textual format, such as for integration or debugging purposes. This section introduces the utility classes in the client util
package for rendering events to strings. Further API information can be found in the JavaDocs.
For repeated rendering of events of the same event type or subtypes, it is recommended to obtain a JSONEventRenderer
or XMLEventRenderer
instance and use the render
method provided by the interface. This allows the renderer implementations to cache event type metadata for fast rendering.
This example shows how to obtain a renderer for repeated rendering of events of the same type, assuming that statement
is an instance of EPStatement
:
JSONEventRenderer jsonRenderer = runtime.getRenderEventService().getJSONRenderer(statement.getEventType());
Assuming that event
is an instance of EventBean
, this code snippet renders an event into the JSON format:
String jsonEventText = jsonRenderer.render("MyEvent", event);
The XML renderer works the same:
XMLEventRenderer xmlRenderer = runtime.getRenderEventService().getXMLRenderer(statement.getEventType());
...and...
String xmlEventText = xmlRenderer.render("MyEvent", event);
If the event type is not known in advance or if your application does not want to obtain a renderer instance per event type for fast rendering, your application can use one of the following methods to render an event to an XML or JSON textual format:
String json = runtime.getRenderEventService().renderJSON(event);
String xml = runtime.getRenderEventService().renderXML(event);
Use the JSONRenderingOptions
or XMLRenderingOptions
classes to control how events are rendered. To render specific event properties using a custom event property renderer, specify an EventPropertyRenderer
as part of the options
that renders event property values to strings. Please see the JavaDoc documentation for more information.
The JSON renderer produces JSON text according to the standard documented at http://www.json.org
.
The renderer formats simple properties as well as nested properties and indexed properties according to the JSON string encoding, array encoding and nested object encoding requirements.
The renderer renders indexed properties, but it does not render indexed properties that require an index. For example, if your event representation is backed by POJO objects and your getter method is getValue(int index)
, the indexed property values are not part of the JSON text. This is because the implementation has no way to determine how many index keys there are. A workaround is to have a method such as Object[] getValue()
instead.
The same is true for mapped properties that the renderer also renders. If a property requires a Map key for access, i.e. your getter method is getValue(String key)
, such property values are not part of the result text as there is no way for the implementation to determine the key set.
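The indexed-property workaround can be sketched in plain Java. The two classes below are hypothetical event classes for illustration and are not part of Esper: ArrayEvent returns the whole array, which is the getter shape the text above suggests, while IndexOnlyEvent exposes values only through an index, so a caller has no way to learn how many values exist.

```java
import java.util.Arrays;

// Hypothetical event class using the workaround: the getter returns
// the full array, so the number of elements is known to the caller.
class ArrayEvent {
    private final Object[] values;

    ArrayEvent(Object... values) {
        this.values = values.clone();
    }

    public Object[] getValue() {
        return values.clone();
    }

    public static void main(String[] args) {
        ArrayEvent event = new ArrayEvent("a", "b", "c");
        // All elements are available without guessing an index range.
        System.out.println(Arrays.toString(event.getValue()));
    }
}

// Hypothetical event class: values are reachable only by index,
// so nothing tells a caller which index values are valid.
class IndexOnlyEvent {
    private final Object[] values;

    IndexOnlyEvent(Object... values) {
        this.values = values.clone();
    }

    public Object getValue(int index) {
        return values[index];
    }
}
```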
The XML renderer produces well-formed XML text according to the XML standard.
The renderer can be configured to format simple properties as attributes or as elements. Nested properties and indexed properties are always represented as XML sub-elements to the root or parent element.
The root element name provided to the XML renderer must be the element name of the root in the XML document and may include namespace instructions.
The renderer renders indexed properties, but it does not render indexed properties that require an index. For example, if your event representation is backed by POJO objects and your getter method is getValue(int index)
, the indexed property values are not part of the XML text. This is because the implementation has no way to determine how many index keys there are. A workaround is to have a method such as Object[] getValue()
instead.
The same is true for mapped properties that the renderer also renders. If a property requires a Map key for access, i.e. your getter method is getValue(String key)
, such property values are not part of the result text as there is no way for the implementation to determine the key set.
A plug-in loader is for general use with input adapters, output adapters, EPL code deployment or any other task that can benefit from being part of an Esper configuration file and that follows the runtime lifecycle.
A plug-in loader implements the com.espertech.esper.runtime.client.plugin.PluginLoader
interface and can be listed in the configuration.
Each configured plug-in loader follows the runtime instance lifecycle: When a runtime instance initializes, it instantiates each PluginLoader
implementation class listed in the configuration. The runtime then invokes the lifecycle
methods of the PluginLoader
implementation class before and after the runtime is fully initialized and before a runtime instance is destroyed.
Declare a plug-in loader in your configuration XML as follows:
...
<plugin-loader name="MyLoader" class-name="org.mypackage.MyLoader">
  <init-arg name="property1" value="val1"/>
</plugin-loader>
...
Alternatively, add the plug-in loader via the configuration API:
Configuration config = new Configuration();
Properties props = new Properties();
props.put("property1", "value1");
config.getRuntime().addPluginLoader("MyLoader", "org.mypackage.MyLoader", props);
Implement the init
method of your PluginLoader
implementation to receive
initialization parameters. The runtime invokes this method before the runtime is fully initialized, therefore your implementation
should not yet rely on the runtime instance within the method body:
public class MyPluginLoader implements PluginLoader {
  public void init(String loaderName, Properties properties, EPRuntime runtime) {
    // save the configuration for later, perform checking
  }
  ...
The runtime calls the postInitialize
method once the runtime has completed initialization,
to indicate that the runtime is ready for traffic.
  public void postInitialize() {
    // Start the actual interaction with external feeds or the runtime here
  }
  ...
The runtime calls the destroy
method once the runtime is destroyed or initialized for a second time.
  public void destroy() {
    // Destroy resources allocated as the runtime instance is being destroyed
  }
To access the plug-in at runtime, the getContext
method provides access under the name plugin-loader/
name:
runtime.getContext().getEnvironment().get("plugin-loader/MyLoader");
This chapter discusses how to select context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions and the reasons for context partition selection are introduced in Section 4.9, “Operations on Specific Context Partitions”.
The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts. The section uses a category context for the purpose of illustration. The API discussed herein is general and handles all different types of contexts including nested contexts.
Consider a category context that separates bank transactions into small, medium and large:
// declare category context
create context TxnCategoryContext
  group by amount < 100 as small,
  group by amount between 100 and 1000 as medium,
  group by amount > 1000 as large from BankTxn
// retain 1 minute of events of each category separately
context TxnCategoryContext select * from BankTxn#time(1 minute)
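To make the category boundaries concrete, the following plain-Java sketch mirrors the three group-by conditions of TxnCategoryContext. It is an illustration only and does not use any Esper API; the class and method names are made up for this example. Note that the EPL between operator includes both bounds, so amounts of exactly 100 and exactly 1000 fall into the medium category.

```java
class TxnCategoryDemo {
    // Plain-Java mirror of the three "group by" conditions in TxnCategoryContext.
    // Returns the category label for an amount; this is not an Esper API.
    static String categoryFor(double amount) {
        if (amount < 100) {
            return "small";
        }
        if (amount <= 1000) {
            return "medium"; // "between 100 and 1000" includes both bounds
        }
        return "large";
    }

    public static void main(String[] args) {
        double[] amounts = {50, 100, 1000, 1000.01};
        for (double amount : amounts) {
            System.out.println(amount + " -> " + categoryFor(amount));
        }
    }
}
```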
In order for your application to iterate one or more specific categories it is necessary to identify which category, i.e. which context partition, to iterate. Similarly for fire-and-forget queries, to execute fire-and-forget queries against one or more specific categories, it is necessary to identify which context partition to execute the fire-and-forget query against.
Your application may iterate one or more specific context partitions using either the iterator
or safeIterator
method of EPStatement
by providing an implementation of the ContextPartitionSelector
interface.
For example, assume your application must obtain all bank transactions for small amounts. It may use the API to identify the category and iterate the associated context partition:
ContextPartitionSelectorCategory categorySmall = new ContextPartitionSelectorCategory() {
  public Set<String> getLabels() {
    return Collections.singleton("small");
  }
};
Iterator<EventBean> it = stmt.iterator(categorySmall);
Your application may execute fire-and-forget queries against one or more specific context partitions by using the executeQuery
method on EPFireAndForgetService
or the execute
method on EPFireAndForgetPreparedQuery
and by providing an implementation of ContextPartitionSelector
.
Fire-and-forget queries execute against named windows and tables. The statement below therefore creates a named window, which the runtime manages separately for small, medium and large transactions according to the context declared earlier:
// Named window per category
context TxnCategoryContext create window BankTxnWindow#time(1 min) as BankTxn
The following code demonstrates how to fire a fire-and-forget query against the small and the medium category:
ContextPartitionSelectorCategory categorySmallMed = new ContextPartitionSelectorCategory() {
  public Set<String> getLabels() {
    return new HashSet<String>(Arrays.asList("small", "medium"));
  }
};
runtime.getFireAndForgetService().executeQuery(
  "select count(*) from BankTxnWindow",
  new ContextPartitionSelector[] {categorySmallMed});
The following limitations apply:
Fire-and-forget queries may not join named windows or tables that declare a context.
This section summarizes the selector interfaces that are available for use to identify and interrogate context partitions. Please refer to the JavaDoc documentation for package com.espertech.esper.common.client.context
and classes therein for additional information.
Use an implementation of ContextPartitionSelectorAll
or the ContextPartitionSelectorAll.INSTANCE
object to instruct the runtime to consider all context partitions.
Use an implementation of ContextPartitionSelectorById
if your application knows the context partition ids to query. This selector instructs the runtime to consider only those provided context partitions based on their integer id value. The runtime outputs the context partition id in the built-in property context.id
.
Use an implementation of ContextPartitionSelectorFiltered
to receive and interrogate context partitions. Use the filter
method that receives a ContextPartitionIdentifier
to return a boolean indicator whether to include the context partition or not. The ContextPartitionIdentifier
provides information about each context partition. Your application may not retain ContextPartitionIdentifier
instances between filter
method invocations as the runtime reuses the same instance. This selector is not supported with nested contexts.
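The restriction on retaining identifier instances can be illustrated without Esper. In the sketch below, Identifier is a hypothetical stand-in for an object that the caller reuses across callbacks (it is not the real ContextPartitionIdentifier), and visit plays the role of the caller. Keeping references yields stale data, whereas copying the needed values inside the callback preserves them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ReusedIdentifierDemo {
    // Hypothetical stand-in for an identifier object that is reused between callbacks.
    static final class Identifier {
        String label;
    }

    // Invokes the filter once per label, reusing a single Identifier instance.
    static void visit(List<String> labels, Predicate<Identifier> filter) {
        Identifier shared = new Identifier();
        for (String label : labels) {
            shared.label = label;
            filter.test(shared);
        }
    }

    // Pitfall: retains the reference, so every entry sees the last label.
    static List<String> retainedLabels(List<String> labels) {
        List<Identifier> kept = new ArrayList<>();
        visit(labels, id -> kept.add(id));
        List<String> result = new ArrayList<>();
        for (Identifier id : kept) {
            result.add(id.label);
        }
        return result;
    }

    // Safe: copies the value out of the identifier during the callback.
    static List<String> copiedLabels(List<String> labels) {
        List<String> result = new ArrayList<>();
        visit(labels, id -> result.add(id.label));
        return result;
    }

    public static void main(String[] args) {
        List<String> labels = Arrays.asList("small", "medium", "large");
        System.out.println("retained: " + retainedLabels(labels));
        System.out.println("copied:   " + copiedLabels(labels));
    }
}
```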
Use an implementation of ContextPartitionSelectorCategory
with category contexts.
Use an implementation of ContextPartitionSelectorSegmented
with keyed segmented contexts.
Use an implementation of ContextPartitionSelectorHash
with hash segmented contexts.
Use an implementation of ContextPartitionSelectorNested
in combination with the selectors described above with nested contexts.
This chapter briefly discusses the API to manage context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions.
The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts.
The EPContextPartitionService
interface offers methods to manage context partitions. Obtain the service from a runtime by calling getContextPartitionService
on EPRuntime
.
The context partition admin API allows an application to:
Interrogate the state and identifiers for existing context partitions.
Determine the statements associated with a context and the context nesting level.
Receive a callback when new contexts get created and destroyed or when context partitions are allocated and de-allocated.
Obtain context properties.
Please see the JavaDoc documentation for more information.
Esper offers a listener and an assertions class to facilitate automated testing of EPL rules, for example when using a test framework such as JUnit
or TestNG
.
Esper does not require any specific test framework. If your application has the JUnit
test framework on the classpath, Esper uses junit.framework.AssertionFailedError
to indicate assertion errors, so as to integrate with continuous integration tools.
For detailed method-level information, please consult the JavaDoc of the package com.espertech.esper.common.client.scopetest
and com.espertech.esper.runtime.client.scopetest
.
The class com.espertech.esper.common.client.scopetest.EPAssertionUtil
provides methods to assert or compare event property values, as well as to perform various array arithmetic, sort events and convert events or iterators to arrays.
The class com.espertech.esper.runtime.client.scopetest.SupportUpdateListener
provides an UpdateListener
implementation that collects events and returns event data for assertion.
The class com.espertech.esper.runtime.client.scopetest.SupportSubscriber
provides a subscriber implementation that collects events and returns event data for assertion. The SupportSubscriberMRD
is a subscriber that accepts events via multi-row delivery. The SupportSubscriber
and SupportSubscriberMRD
work similarly to SupportUpdateListener
that is introduced in more detail below.
The below table only summarizes the most relevant assertion methods offered by EPAssertionUtil
. Most methods are available in multiple overloaded forms that are not listed in detail below. Please consult the JavaDoc for additional method-level information.
Table 16.10. Method Summary for EPAssertionUtil
Name | Description |
---|---|
assertProps | Methods that assert that property values of a single |
assertPropsPerRow | Methods that assert that property values of multiple |
assertPropsPerRowAnyOrder | Same as above, but any row may match. Useful for unordered result sets. |
assertEqualsExactOrder | Methods that compare arrays, allowing |
assertEqualsAnyOrder | Same as above, but any row may match. Useful for unordered result sets. |
The below table only summarizes the most relevant methods offered by SupportUpdateListener
. Please consult the JavaDoc for additional information.
Table 16.11. Method Summary for SupportUpdateListener
Name | Description |
---|---|
reset | Initializes listener clearing current events and resetting the invoked flag. |
getAndClearIsInvoked | Returns the "invoked" flag indicating the listener has been invoked, and clears the flag. |
getLastNewData | Returns the last events received by the listener. |
getAndResetDataListsFlattened | Returns all events received by the listener as a pair. |
assertOneGetNewAndReset | Asserts that exactly one new event was received and no removed events, returns the event and resets the listener. |
assertOneGetNew | Asserts that exactly one new event was received and returns the event. |
The next code block is a short but complete programming example that asserts that the properties received from output events match the expected values.
String epl = "select personName, count(*) as cnt from PersonEvent#length(3) group by personName";

Configuration configuration = new Configuration();
configuration.getCommon().addEventType(PersonEvent.class);
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, compilerArguments);

EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);
EPStatement stmt = runtime.getDeploymentService().deploy(compiled).getStatements()[0];
SupportUpdateListener listener = new SupportUpdateListener();
stmt.addListener(listener);

runtime.getEventService().sendEventBean(new PersonEvent("Joe"), "PersonEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "personName,cnt".split(","), new Object[]{"Joe", 1L});
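The expected value for cnt in the example is written as 1L rather than 1. A plausible reason, assuming that count(*) delivers a Long and that expected and received values are compared as boxed objects by equality (an assumption about the assertion internals, not something this text confirms), is that a boxed Integer never equals a boxed Long in Java. The plain-Java sketch below shows that behavior; the class and method names are made up for illustration.

```java
import java.util.Objects;

class BoxedCountDemo {
    // Compares two boxed values the way a generic Object-based assertion might.
    static boolean sameValue(Object expected, Object actual) {
        return Objects.equals(expected, actual);
    }

    public static void main(String[] args) {
        Object received = 1L; // stands in for a count value delivered as a Long

        // An int literal autoboxes to Integer, which is never equal to a Long.
        System.out.println(sameValue(1, received));
        // A long literal autoboxes to Long and compares equal.
        System.out.println(sameValue(1L, received));
    }
}
```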
A few additional examples are shown below:
String[] fields = new String[] {"property"};
EPAssertionUtil.assertPropsPerRow(listener.getAndResetDataListsFlattened(), fields,
    new Object[][]{{"E2"}}, new Object[][]{{"E1"}});
EPAssertionUtil.assertPropsPerRow(listener.getAndResetLastNewData(), fields, new Object[][]{{"E1"}, {"E2"}, {"E3"}});
assertTrue(listener.getAndClearIsInvoked());
Please refer to the Esper codebase test sources for more examples using the assertion class and the listener class.
The configuration object (Configuration
), with respect to classes, holds the fully-qualified class name and does not generally hold Class
references.
This is by design since the configuration object can be populated from XML.
When deploying compiled modules the runtime may use a class loader to find resources. Your application has full control over class-for-name and classloader use. OSGi environments can provide a specific class-for-name and class loader. Please refer to Section 17.7, “Passing Services or Transient Objects”.
A compiler and runtime can be deployed as part of a J2EE web or enterprise application archive to a web application server. When designing for deployment into a J2EE web application server, please consider the items discussed here.
We provide a sample servlet context listener in this section that uses the deployment API to deploy and undeploy modules as part of the servlet lifecycle.
The distribution provides a message-driven bean (MDB) example that you may find useful.
Esper does not have a dependency on any J2EE or Servlet APIs to allow the runtime to run in any environment or container.
As multiple web applications deployed to a J2EE web application server typically have a separate classloader per application, you should consider whether runtime instances need to be shared between applications or can remain separate runtime instances. Consider the EPRuntimeProvider
a Singleton.
When deploying multiple web applications, your J2EE container classloader may provide a separate instance of the Singleton EPRuntimeProvider
to each web application resulting in multiple independent runtime instances.
To share EPRuntime
instances between web applications, one approach is to add the runtime jar files to the system classpath. A second approach can be to have multiple web applications share the same servlet context and have your application place the EPRuntime
instance into a servlet context attribute for sharing. Architecturally you may also consider a single archived application (such as a message-driven bean) that all your web applications communicate with via the JMS broker provided by your application server or an external JMS broker.
As per J2EE standards there are restrictions in regards to starting new threads in J2EE application code. Esper adheres to these restrictions: it can be driven entirely by external events. To remove all Esper threads, set the internal timer off and leave the advanced threading options turned off. To provide timer events when the internal timer is turned off, check whether your J2EE application container supports the Java system timer, or batch or work loading, to send timer events to a runtime instance.
As per J2EE standards there are restrictions in regards to input and output by J2EE application code. Esper adheres to these restrictions: By itself it does not start socket listeners or perform any file IO.
When deploying a J2EE archive that contains EPL module files, the sample code below reads and deploys the EPL module files packaged with the enterprise or web application archive when the servlet initializes. The sample undeploys the EPL modules when the servlet context gets destroyed.
A sample web.xml
configuration extract is:
<?xml version="1.0" encoding="UTF-8"?>
<web-app>
  <listener>
    <listener-class>SampleServletListener</listener-class>
  </listener>
  <context-param>
    <param-name>eplmodules</param-name>
    <param-value>switchmonitor.epl</param-value>
  </context-param>
</web-app>
A sample servlet listener that deploys EPL module files packaged into the archive on context initialization and that undeploys when the application server destroys the context is shown here:
public class SampleServletListener implements ServletContextListener {

  private List<String> deploymentIds = new ArrayList<String>();

  public void contextInitialized(ServletContextEvent servletContextEvent) {
    try {
      String modulesList = servletContextEvent.getServletContext().getInitParameter("eplmodules");
      List<Module> modules = new ArrayList<Module>();
      if (modulesList != null) {
        String[] split = modulesList.split(",");
        for (int i = 0; i < split.length; i++) {
          String resourceName = split[i].trim();
          if (resourceName.length() == 0) {
            continue;
          }
          String realPath = servletContextEvent.getServletContext().getRealPath(resourceName);
          Module module = EPCompilerProvider.getCompiler().readModule(new File(realPath));
          modules.add(module);
        }
      }

      // Determine deployment order
      ModuleOrder order = ModuleOrderUtil.getModuleOrder(modules, null);

      // Deploy
      for (Module module : order.getOrdered()) {
        // compile and deploy here (code not included), add deployment id
        deploymentIds.add(deployment.getDeploymentId());
      }
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  public void contextDestroyed(ServletContextEvent servletContextEvent) {
    EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
    for (String deploymentId : deploymentIds) {
      runtime.getDeploymentService().undeploy(deploymentId);
    }
  }
}
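The handling of the eplmodules parameter in the listener can be tried in isolation. The sketch below extracts the same split, trim and skip-empty steps into a small helper with no servlet or Esper dependency; the class name, the method name and the second file name in the usage example are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ModuleListParser {
    // Splits a comma-separated parameter value into resource names,
    // trimming whitespace and skipping empty entries. A null value
    // (parameter not configured) yields an empty list.
    static List<String> parseResourceNames(String modulesList) {
        List<String> names = new ArrayList<>();
        if (modulesList == null) {
            return names;
        }
        for (String part : modulesList.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // "other.epl" is a made-up second module name for illustration.
        System.out.println(parseResourceNames("switchmonitor.epl, other.epl, ,"));
    }
}
```

Separating the parsing from the deployment logic this way keeps the parameter handling testable outside a servlet container.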
An Esper stage is its own little container that can host deployments.
A stage processes only those events that an application sends into that stage. A stage does not see any events that an application sends into the runtime.
A stage has its own stage time. Time in a stage advances only when an application advances time for that stage. A stage advances time independently of runtime time.
Each stage is uniquely identified by a stage URI. Within different runtimes there can be stages of the same stage URI. A stage lives within the runtime instance and gets destroyed when the runtime gets destroyed. An application can allocate any number of stages. A stage URI can be any non-null string value.
These attributes make stages useful for:
Suspend and Resume. Suspend deployments so they don't receive events or don't advance time. Resume deployments when they are needed again.
Initialize, Load, or Catch-Up new Deployments to Existing Event and Time History. Stage deployments and replay events and time until deployments have caught up with history, and then unstage deployments to have them receive runtime events and time.
Replay. Replay events and time, receive results, destroy accumulated state, without affecting other deployments and in parallel to other activity.
In other words, a stage allows an application to control event visibility and the concept of time as desired on a deployment level: Events sent into a stage are visible only to those deployments that are staged and are not visible to deployments outside of that stage. Within a stage an application can control time independently, start time at a point in time and advance time at the resolution and pace suitable for the deployments added to that stage.
This API is under development for version 8.4 and newer, and is considered UNSTABLE.
The getStageService
method on EPRuntime
returns the service for managing stages:
EPStageService stageService = runtime.getStageService();
To allocate a stage or obtain an already-allocated stage, use getStage
and pass the stage URI. The new stage current time is the runtime current time.
EPStage stage = stageService.getStage("myStage");
The EPStage
instance represents the stage. The freshly-allocated stage is empty.
The stage
method moves deployments from the runtime to the stage (assume deploymentId
is a deployment id of an existing deployment):
stage.stage(Collections.singletonList(deploymentId));
The unstage
method moves deployments from the stage to the runtime:
stage.unstage(Collections.singletonList(deploymentId));
To send events to the stage, use EPStageEventService
which extends the EPEventService
interface:
stage.getEventService().sendEventBean(new Order(...), "Order");
To advance time for a stage, also use EPStageEventService
:
stage.getEventService().advanceTime(myTimeInMillis);
Finally, call destroy
to destroy the stage. Destroy requires that the stage is empty, i.e. does not have any deployments:
stage.destroy();
By staging deployments, the deployments' time and event processing occurs only when the application explicitly sends events to the stage or advances time for that stage.
The example code below compiles and deploys EPL that reports order events:
String module = "@public @buseventtype create schema OrderEvent(price double);\n" +
    "@name('All-Order-Events') select * from OrderEvent;\n";
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(module, null);
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
deployment.getStatements()[0].addListener(new SupportUpdateListener());
The stage
method stages the deployment:
EPStage stage = runtime.getStageService().getStage("myStage");
stage.stage(Collections.singletonList(deployment.getDeploymentId()));
The staged deployment only receives OrderEvent
events that an application sends into the stage. It does not receive OrderEvent
events that an application sends into the runtime (the EPEventService
returned by getEventService
of EPRuntime
). For example:
// The listener receives the following event
stage.getEventService().sendEventMap(Collections.singletonMap("price", 100d), "OrderEvent");

// The listener DOES NOT receive the following event
runtime.getEventService().sendEventMap(Collections.singletonMap("price", 200d), "OrderEvent");
The unstage
method un-stages the deployment:
stage.unstage(Collections.singletonList(deployment.getDeploymentId()));
The un-staged deployment only receives OrderEvent
events that an application sends into the runtime. It does not receive OrderEvent
events that an application sends into the stage. For example:
// The listener DOES NOT receive the following event
stage.getEventService().sendEventMap(Collections.singletonMap("price", 100d), "OrderEvent");

// The listener receives the following event
runtime.getEventService().sendEventMap(Collections.singletonMap("price", 200d), "OrderEvent");
Finally, destroy the stage when it is no longer needed:
stage.destroy();
When staging and unstaging, existing schedules are adjusted for the time difference between stage and runtime, if any.
For example, assume a pattern pattern[timer:interval(10 minutes)]
deployed at 9:00:00, which would fire at 9:10:00.
If the time of the target (the stage or runtime that the deployment moves to) is 8:55:00 or 9:05:00, the pattern still fires at 9:10:00. If the time of the target is after the scheduled time, for example 9:15:00, the pattern fires when time advances again.
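The timing example above can be worked through with java.time. The sketch below is plain Java and does not reflect how the runtime implements schedule adjustment; the class and method names are made up. It only classifies the three target times from the example. The case where the target time equals the scheduled time exactly is not covered by the text and is treated here as still pending, which is an assumption.

```java
import java.time.LocalTime;

class StagedScheduleDemo {
    // Returns true when the target's clock has not passed the scheduled fire time,
    // so the callback fires at its originally scheduled time. Returns false when the
    // target's clock is already past it, in which case the callback fires on the
    // next time advance. Equal times count as pending (assumption).
    static boolean firesAtScheduledTime(LocalTime scheduledFire, LocalTime targetNow) {
        return !targetNow.isAfter(scheduledFire);
    }

    public static void main(String[] args) {
        LocalTime deployedAt = LocalTime.of(9, 0, 0);
        LocalTime scheduledFire = deployedAt.plusMinutes(10); // timer:interval(10 minutes)

        LocalTime[] targetTimes = {
            LocalTime.of(8, 55, 0), LocalTime.of(9, 5, 0), LocalTime.of(9, 15, 0)
        };
        for (LocalTime targetNow : targetTimes) {
            String outcome = firesAtScheduledTime(scheduledFire, targetNow)
                ? "fires at " + scheduledFire
                : "fires on the next time advance";
            System.out.println("target time " + targetNow + ": " + outcome);
        }
    }
}
```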
Staging and un-staging deployments is an inexpensive operation in general.
When using advanced threading options, each stage has its own threading, i.e. its own thread pool and queues.
Each stage has its own metrics reporting, when enabled.
Stages use the same configuration values as provided for the runtime.
Deployments that have dependencies on other deployments require additional consideration. Such deployments may provide EPL objects to other deployments, and deployments may consume EPL objects from other deployments. For example, a deployment may create a named window and another deployment may query the named window.
When staging deployments that have dependencies on other deployments, such EPL-object-providing or EPL-object-consuming deployments must also be staged or unstaged in the same operation.
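One way to prepare the list of deployment ids for a single stage or unstage call is to collect everything connected to a deployment through provider or consumer links. The sketch below does this in plain Java over a hypothetical, hand-built dependency map; it does not query Esper for dependencies, and all deployment ids and names are made up. It reads the requirement above as applying transitively, which is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class StagingGroupDemo {
    // Records that two deployments depend on each other in either direction
    // (one provides an EPL object that the other consumes).
    static void link(Map<String, Set<String>> links, String a, String b) {
        links.computeIfAbsent(a, k -> new LinkedHashSet<>()).add(b);
        links.computeIfAbsent(b, k -> new LinkedHashSet<>()).add(a);
    }

    // Collects the start deployment plus every deployment reachable from it
    // through dependency links, so that all can be passed to one operation.
    static Set<String> stagingGroup(String start, Map<String, Set<String>> links) {
        Set<String> group = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>();
        pending.add(start);
        while (!pending.isEmpty()) {
            String id = pending.remove();
            if (group.add(id)) {
                pending.addAll(links.getOrDefault(id, Collections.emptySet()));
            }
        }
        return group;
    }

    // Hypothetical sample data: a named-window deployment queried by another
    // deployment, plus an unrelated pair.
    static Map<String, Set<String>> sampleLinks() {
        Map<String, Set<String>> links = new HashMap<>();
        link(links, "windowDeployment", "queryDeployment");
        link(links, "otherWindowDeployment", "otherQueryDeployment");
        return links;
    }

    public static void main(String[] args) {
        System.out.println(stagingGroup("queryDeployment", sampleLinks()));
    }
}
```

The resulting set could then be passed as the collection argument of the stage or unstage method shown earlier.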