www.espertech.comDocumentation

Chapter 16. Runtime Reference

16.1. Introduction
16.2. Obtaining a Runtime From EPRuntimeProvider
16.3. The EPRuntime Runtime Interface
16.4. Deploying and Undeploying Using EPDeploymentService
16.4.1. Substitution Parameters
16.4.2. Roll-Out of Multiple Compiled Modules
16.4.3. Atomic Deployment Management
16.5. Obtaining Results Using EPStatement
16.5.1. Receiving Statement Results
16.5.2. Setting a Subscriber Object
16.5.3. Adding Listeners
16.5.4. Using Iterators
16.5.5. Event and Event Type
16.5.6. Type Information
16.5.7. Interrogating Annotations
16.6. Processing Events and Time Using EPEventService
16.6.1. Event Sender
16.6.2. Receiving Unmatched Events
16.7. Execute Fire-and-Forget Queries Using EPFireAndForgetService
16.7.1. Fire-and-forget Query Single Execution
16.7.2. Fire-and-forget Query Prepared Unparameterized Execution
16.7.3. Fire-and-forget Query Prepared Parameterized Execution
16.7.4. The From-Clause is Optional
16.7.5. The From-Clause can Access Relational Data via SQL
16.8. Runtime Threading and Concurrency
16.8.1. Advanced Threading
16.8.2. Processing Order
16.9. Controlling Time-Keeping
16.9.1. Controlling Time Using Time Span Events
16.9.2. Time Resolution and Time Unit
16.9.3. Internal Timer Based on JVM System Time
16.10. Exception Handling
16.11. Condition Handling
16.12. Runtime and Statement Metrics Reporting
16.12.1. Runtime Metrics
16.12.2. Statement Metrics
16.13. Monitoring and JMX
16.14. Event Rendering to XML and JSON
16.14.1. JSON Event Rendering Conventions and Options
16.14.2. XML Event Rendering Conventions and Options
16.15. Plug-In Loader
16.16. Context Partition Selection
16.16.1. Selectors
16.17. Context Partition Administration
16.18. Test and Assertion Support
16.18.1. EPAssertionUtil Summary
16.18.2. SupportUpdateListener Summary
16.18.3. Usage Example
16.19. OSGi, Class Loader, Class-For-Name
16.20. When Deploying with J2EE
16.20.1. J2EE Deployment Considerations
16.20.2. Servlet Context Listener
16.21. Stages
16.21.1. Overview
16.21.2. Stage APIs
16.21.3. Stage Example: Suspending and Resuming a Statement
16.21.4. Stage Other Considerations
16.21.5. Stage Limitations

The runtime takes on these functions:

Your application obtains a runtime from EPRuntimeProvider. You may pass an arbitrary string-type runtime URI that uniquely identifies the runtime instance.

A runtime is an instance of EPRuntime. Use the runtime as follows:

The EPRuntimeProvider class provides static methods that return EPRuntime runtimes.

Each runtime has a unique runtime URI which can be any string value. If your application does not pass a runtime URI then the default URI is default (as defined by EPRuntimeProvider.DEFAULT_RUNTIME_URI).

For the getRuntime methods, your application can pass a runtime URI to obtain different runtimes. The EPRuntimeProvider determines whether the provided runtime URI matches any existing runtime URIs and returns the existing runtime, or allocates a new runtime if none was found.

The getExistingRuntime method takes a runtime URI and returns the existing runtime for that URI or null if there is none.

The code snip below gets the default runtime. Subsequent calls to get the default runtime return the same runtime.

EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();

The next code gets a runtime for the runtime URI RFIDProcessor1. Subsequent calls to get a runtime with the same runtime URI return the same runtime instance.

EPRuntime runtime = EPRuntimeProvider.getRuntime("RFIDProcessor1");

Since the getRuntime methods return the same runtime for each URI there is no need to statically cache a runtime in your application.

You may also pass an optional Configuration. The next code snippet outlines a typical sequence of use:

// Configure the runtime, this is optional
Configuration config = new Configuration();
config.configure("configuration.xml");	// load a configuration from file

// Optionally set additional configuration values like so:
// config.getCommon().add....(...);

// Obtain a runtime
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(config);

// Optionally, use initialize if the same runtime has been used before to start clean
runtime.initialize();

// Destroy the runtime when no longer needed, frees up resources, releases the runtime URI
runtime.destroy();

The EPRuntime interface represents a runtime. Only the static methods of the EPRuntimeProvider class allocate new runtimes. A runtime is uniquely identified by runtime URI. The runtime URI is an arbitrary string. The default runtime has a runtime URI of default.

A runtime provides these services:


You can reset a runtime by calling the initialize method. This operation resets the runtime to the configuration last provided to EPRuntimeProvider. If no configuration is provided, an empty (default) configuration applies. Your application must obtain new services from the initialized runtime as initialize marks existing services as invalid.

A runtime can be destroyed via the destroy method. This frees all resources held by the runtime. After a call to destroy the runtime can no longer be used.

You may register callbacks to receive notifications about runtime state. The runtime invokes any EPRuntimeStateListener callbacks when a runtime instance is about to be destroyed and after a runtime has been initialized. Use the addRuntimeStateListener methods to register interest.

When destroying a runtime your application must make sure that threads that are sending events into the runtime have completed their work. More generally, the runtime should not be currently in use during or after the destroy operation.

All runtime instances are completely independent. Your application may not send EventBean instances obtained from one runtime into a second runtime since the event type space between two runtimes is not shared.

Your application must first compile a module or obtain a compiled module before it can deploy. The object representation of a compiled module is EPCompiled.

Call the deploy method and pass the compiled module. The runtime loads the byte code and adds the information contained in the byte code, causing all the compiled module's statements to begin receiving events and time.

Deploying is an atomic operation. At deployment completion all statements of the deployment begin to see events arriving and time passing consistently. In case the deployment fails the runtime rolls back all deployment changes.

The runtime resolves dependencies of the compiled module upon its deployment. The runtime does not validate that the information about EPL-object dependencies that existed at compile-time matches the runtime EPL-objects.

For example, assume there is a compiled module by name compiledModuleM1. Deploy as follows:

EPDeployment deployment = runtime.getDeploymentService().deploy(compiledModuleM1);

The runtime returns a EPDeployment instance that contains the deployment id, the EPStatement statement instances, module name and module properties. The deployment id is an arbitrary string-type identifier that uniquely identifies the deployment in the runtime.

The undeploy method takes the deployment id and undeploys the deployment. The undeployAll method undeploys all deployments.

A compiled module may be deployed any number of times. Substitution parameters can be handy for parameterizing deployed modules.

Your application may deploy and undeploy using any thread and also within listener or subscriber code. If using Bean-style class-based events your application may not invoke deploy or undeploy methods as part of getter or setter code. Extension API code and plug-in single-row methods also may not invoke deploy or undeploy methods.

You may pass a DeploymentOptions instance. Deployment options provide deployment callbacks and other deploy-time parameters:

  • Provide a deployment id. If none is provided the runtime generates a unique deployment id.

  • Provide substitution parameter values for parameterized modules.

  • Provide or override statement names.

  • Provide a runtime statement user object that gets associated to the statement and that can be obtained from an EPStatement with getUserObjectRuntime.

Please consult the JavaDoc for more information.

The compiled module may have substitution parameters as explained in the compiler documentation.

All substitution parameters must be replaced by actual values before a compiled module with substitution parameters can be deployed. A compiled module may be deployed multiple times. Substitution parameters can be set to new values for every deployment.

To set substitution parameter values pass a Deployment Options object to the deploy method that provides a StatementSubstitutionParameterOption.

If not assigning a name to substitution parameters, replace the substitution parameter with an actual value using the setObject(int index, Object value) method for each index, starting from 1.

If assigning a name to each substitution parameter, replace the substitution parameter with an actual value using the setObject(String name, Object value) method for each name.

While the setObject method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.

The below sample code compiles and deploys a parameterized module:

String stmt = "select * from PersonEvent(firstName=?::string)";
Configuration configuration = new Configuration();
configuration.getCommon().addEventType(PersonEvent.class);
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(stmt, compilerArguments);

DeploymentOptions deploymentOptions = new DeploymentOptions();
deploymentOptions.setStatementSubstitutionParameter(prepared -> prepared.setObject(1, "Joe")); 
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled, deploymentOptions);
EPStatement statement = deployment.getStatements()[0];

Your application can concurrently send events into the runtime while deploying and undeploying statements and adding or removing listeners. It is safe to undeploy and deploy compiled modules while sending in events from other threads concurrently.

However in some cases your application may need more control over deployment, for example when deploying multiple modules or when attaching custom listener code.

Your application can use the API described below to obtain a lock and perform deployment actions as an atomic unit. For example, if your application would like to undeploy and re-deploy as a single atomic unit, while at the same time sending events into the runtime from different threads, it can obtain a lock to ensure that no events are concurrently processed while the operations take place.

The below code sample obtains the runtime exclusive write lock to perform multiple management operations as a unit, excluding concurrent processing of events.

runtime.getRuntimeInstanceWideLock().writeLock().lock();
// Start atomic management unit. 
// Any events concurrently being processed by other threads must complete before the code completes obtaining the lock. 
// Any events sent in by other threads will await the release of the lock.
try {
  // Perform operations such as : 
  //   - deploy and/or undeploy multiple compiled modules  (deployment admin API)
  //   - set statement listeners and subscribers while deploying
  // There is no need to obtain this lock when deploying or undeploying a single module.
  // The lock is reentrant and can be safely taken multiple times by the same thread.
  // Make sure you use "try" and "finally" just like we have it here.
}
finally {
  // Complete atomic management unit. 
  // Any events sent in by other threads will now continue processing against the changed set of statements.
  runtime.getRuntimeInstanceWideLock().writeLock().unlock();
}

A compiled module contains zero, one or multiple statements. You can attach callbacks (listeners, subscribers) to statements to receive results (aka push, observer pattern). You can iterate statement current results (aka. poll).

Each statement is uniquely identified in the runtime by the combination of deployment id and statement name. The compiler or runtime always assign a statement name if none was provided.

The EPStatement instance represents the statement. Your application receives statements when deploying a module by calling getStatements on EPDeployment.

Your application may also look up a statement by it's deployment id and statement name using the getStatement method on EPDeploymentService.

For NEsper .NET also see Section J.15, “.NET API - Receiving Statement Results”.

Esper provides three choices for your application to receive statement results. Your application can use all three mechanisms alone or in any combination for each statement. The choices are:

Table 16.2. Choices For Receiving Statement Results

NameMethods on EPStatementDescription
Listener CallbacksaddListener and removeListener

Your application provides implementations of the UpdateListener interface to the statement. Listeners receive EventBean instances containing statement results.

The runtime continuously indicates results to all listeners.

Subscriber ObjectsetSubscriber

Requires setting the allowSubscriber option on the compiler.

Your application provides a POJO (plain Java object) that exposes methods to receive statement results.

The name of the method that a subscriber object provides to receive results is update, unless your call to setSubscriber provides another method name.

The runtime continuously indicates results to the single subscriber.

This is the fastest method to receive statement results, as the runtime delivers strongly-typed results directly to your application objects without the need for building an EventBean result set.

There can be at most one subscriber object registered per statement. If you require more than one listener, use the listener instead (or in addition). The subscriber object is bound to the statement with a strongly typed support which ensure direct delivery of new events without type conversion. This optimization is made possible because there can only be zero or one subscriber object per statement.

Pull APIsafeIterator and iterator

Your application asks the statement for results and receives a set of events via java.util.Iterator<EventBean>.

This is useful if your application does not need continuous indication of new results in real-time.


Tip

The runtime calls application-provided update listeners and subscribers for output. These commonly encapsulate the actions to take when there is output. This design decouples statements from actions and places actions outside of EPL. It allows actions to change independently from statements: A statement does not need to be updated when its associated action(s) change.

While action-taking, in respect to the code or script taking action, is not a part of the EPL language, here are a few noteworthy points. Through the use of EPL annotations you can attach information to EPL that can be used by applications to flexibly determine actions. The insert into-clause can be used to send results into a further stream and input and output adapters or data flows can exist to process output events from that stream. Also the data flow EPStatementSource operator can be used to hook up actions declaratively. The DeploymentStateListener can inform your application of newly-deployed statements and currently-undeployed statements.

Your application may attach one or more listeners, zero or one single subscriber and in addition use the pull API on the same statement. There are no limitations to the use of iterator, subscriber or listener alone or in combination to receive statement results.

The best delivery performance can generally be achieved by attaching a subscriber and by not attaching listeners. The runtime is aware of the listeners and subscriber attached to a statement. The runtime uses this information internally to reduce statement overhead. For example, if your statement does not have listeners or a subscriber attached, the runtime does not need to continuously generate results for delivery.

If your application attaches both a subscriber and one or more listeners then the subscriber receives the result first before any of the listeners.

If your application attaches more than one listener then the UpdateListener listeners receive results in the order they were added to the statement. To change the order of delivery among listeners your application can add and remove listeners at runtime.

If you have configured outbound threading, it means a thread from the outbound thread pool delivers results to the subscriber and listeners instead of the processing or event-sending thread.

If outbound threading is turned on, we recommend turning off the runtime setting preserving the order of events delivered to listeners as described in Section 17.6.1.1, “Preserving the Order of Events Delivered to Listeners”. If outbound threading is turned on statement execution is not blocked for the configured time in the case a subscriber or listener takes too much time.

Note

The compiler option allowSubscriber must be set at compile-time.

A subscriber object is a direct binding of statement results to an object. The object, receives statement results via method invocation. The subscriber class does not need to implement an interface or extend a superclass. Only one subscriber object may be set for a statement.

Subscriber objects have several advantages over listeners. First, they offer a substantial performance benefit: Statement results are delivered directly to your method(s) through Java virtual machine method calls, and there is no intermediate representation (EventBean). Second, as subscribers receive strongly-typed parameters, the subscriber code tends to be simpler.

This chapter describes the requirements towards the methods provided by your subscriber class.

The runtime can deliver results to your subscriber in two ways:

  1. Each evert in the insert stream results in a method invocation, and each event in the remove stream results in further method invocations. This is termed row-by-row delivery.

  2. A single method invocation that delivers all rows of the insert and remove stream. This is termed multi-row delivery.

Your subscriber class must provide a method by name update to receive insert stream events row-by-row. The number and types of parameters declared by the update method must match the number and types of columns as specified in the select clause, in the same order as in the select clause.

For example, if your statement is:

select orderId, price, count(*) from OrderEvent

Then your subscriber update method looks as follows:

public class MySubscriber {
  ...
  public void update(String orderId, double price, long count) {...}
  ...
}

Each method parameter declared by the update method must be assignable from the respective column type as listed in the select-clause, in the order selected. The assignability rules are:

  • Widening of types follows Java standards. For example, if your select clause selects an integer value, the method parameter for the same column can be typed int, long, float or double (or any equivalent boxed type).

  • Auto-boxing and unboxing follows Java standards. For example, if your select clause selects an java.lang.Integer value, the method parameter for the same column can be typed int. Note that if your select clause column may generate null values, an exception may occur at runtime unboxing the null value.

  • Interfaces and super-classes are honored in the test for assignability. Therefore java.lang.Object can be used to accept any select clause column type

In the case that your subscriber class offers multiple update method footprints, the runtime selects the closest-matching footprint by comparing the output types and method parameter types. The runtime prefers the update method that is an exact match of types, followed by an update method that requires boxing or unboxing, followed by an update method that requires widening and finally any other allowable update method.

Within the above criteria, in the case that your subscriber class offers multiple update method footprints with same method parameter types, the runtime prefers the update method that has EPStatement as the first parameter.

In place of row-by-row delivery, your subscriber can receive all events in the insert and remove stream via a single method invocation. This is applicable when an EPL delivers multiple output rows for a given input event or time advancing, for example when multiple pattern matches occur for the same incoming event, for a join producing multiple output rows or with output rate limiting, for example.

The event delivery follow the scheme as described earlier in Section 16.5.2.2.2, “Row Delivery as Map and Object Array ”. The subscriber class must provide one of the following methods:


For NEsper .NET also see Section J.16, “.NET API - Adding Listeners”.

Your application can subscribe to updates posted by a statement via the addListener and removeListener methods on EPStatement . Your application must to provide an implementation of the UpdateListener interface to the statement:

UpdateListener myListener = new MyUpdateListener();
countStmt.addListener(myListener);

Statements publish old data and new data to registered UpdateListener listeners. New data published by statements is the events representing the new values of derived data held by the statement. Old data published by statements consists of the events representing the prior values of derived data held by the statement.

Important

UpdateListener listeners receive multiple result rows in one invocation by the runtime: the new data and old data parameters to your listener are array parameters. For example, if your application uses one of the batch data windows, or your application creates a pattern that matches multiple times when a single event arrives, then the runtime indicates such multiple result rows in one invocation and your new data array carries two or more rows.

To indicate results the runtime invokes the following method on UpdateListener listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPRuntime runtime)

Subscribing to events posted by a statement is following a push model. The runtime pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, you need to know that statements serve up data that your application can obtain via the safeIterator and iterator methods on EPStatement. This is called the pull API and can come in handy if your application is not interested in all new updates, and only needs to perform a frequent or infrequent poll for the latest data.

The safeIterator method on EPStatement returns a concurrency-safe iterator returning current statement results, even while concurrent threads may send events into the runtime for processing. The runtime employs a read-write lock per context partition and obtains a read lock for iteration. Thus safe iterator guarantees correct results even as events are being processed by other threads and other context partitions. The cost is that the iterator obtains and holds zero, one or multiple context partition locks for that statement that must be released via the close method on the SafeIterator instance.

The iterator method on EPStatement returns a concurrency-unsafe iterator. This iterator is only useful for applications that are single-threaded, or applications that themselves perform coordination between the iterating thread and the threads that send events into the runtime for processing. The advantage to this iterator is that it does not hold a lock.

When statements are used with contexts and context partitions, the APIs to identify, filter and select context partitions for statement iteration are described in Section 16.16, “Context Partition Selection”.

The next code snippet shows a short example of use of safe iterators:

EPStatement statement = epAdmin.createEPL("select avg(price) as avgPrice from MyTick");
// .. send events into the runtime
// then use the pull API...
SafeIterator<EventBean> safeIter = statement.safeIterator();
try {
  for (;safeIter.hasNext();) {
     // .. process event ..
     EventBean event = safeIter.next();
     System.out.println("avg:" + event.get("avgPrice");
  }
}
finally {
  safeIter.close();	// Note: safe iterators must be closed
}

This is a short example of use of the regular iterator that is not safe for concurrent event processing:

double averagePrice = (Double) eplStatement.iterator().next().get("average");

The safeIterator and iterator methods can be used to pull results out of all statements, including statements that join streams, contain aggregation functions, pattern statements, and statements that contain a where clause, group by clause, having clause or order by clause.

For statements without an order by clause, the iterator method returns events in the order maintained by the data window. For statements that contain an order by clause, the iterator method returns events in the order indicated by the order by clause.

Consider using the on-select clause and a named window if your application requires iterating over a partial result set or requires indexed access for fast iteration; Note that on-select requires that you sent a trigger event, which may contain the key values for indexed access.

Esper places the following restrictions on the pull API and usage of the safeIterator and iterator methods:

  1. In multithreaded applications, use the safeIterator method. Note: make sure your application closes the iterator via the close method when done, otherwise the iterated statement context partitions stay locked and event processing for statement context partitions does not resume.

  2. In multithreaded applications, the iterator method does not hold any locks. The iterator returned by this method does not make any guarantees towards correctness of results and fail-behavior, if your application processes events into the runtime by multiple threads. Use the safeIterator method for concurrency-safe iteration instead.

  3. Since the safeIterator and iterator methods return events to the application immediately, the iterator does not honor an output rate limiting clause, if present. That is, the iterator returns results as if there is no output-rate clause for the statement in statements without grouping or aggregation. For statements with grouping or aggregation, the iterator in combination with an output clause returns last output group and aggregation results. Use a separate statement and the insert into clause to control the output rate for iteration, if so required.

  4. When iterating a statement that operates on an unbound stream (no data window declared), please note the following:

    • When iterating a statement that groups and aggregates values from an unbound stream and that specifies output snapshot, the runtime retains groups and aggregations for output as iteration results or upon the output snapshot condition .

    • When iterating a statement that groups and aggregates values from an unbound stream and that does not specify output snapshot, the runtime only retains the last aggregation values and the iterated result contains only the last updated group.

    • When iterating a statement that operates on an unbound stream the iterator returns no rows. This behavior can be changed by specifying either the @IterableUnbound annotation or by changing the global view resources configuration.

An EventBean object represents a row (event) in your statement's result set. Each EventBean object has an associated EventType object providing event metadata.

An UpdateListener implementation receives one or more EventBean events with each invocation. Via the iterator method on EPStatement your application can poll or read data out of statements. Statement iterators also return EventBean instances.

Each statement provides the event type of the events it produces, available via the getEventType method on EPStatement.

An EventType object encapsulates all the metadata about a certain type of events. As Esper supports an inheritance hierarchy for event types, it also provides information about super-types to an event type.

An EventType object provides the following information:

For each property of an event type, there is an EventPropertyDescriptor object that describes the property. The EventPropertyDescriptor contains flags that indicate whether a property is an indexed (array) or a mapped property and whether access to property values require an integer index value (indexed properties only) or string key value (mapped properties only). The descriptor also contains a fragment flag that indicates whether a property value is available as a fragment.

The term fragment means an event property value that is itself an event, or a property value that can be represented as an event. The getFragmentType on EventType may be used to determine a fragment's event type in advance.

A fragment event type and thereby fragment events allow navigation over a statement's results even if the statement result contains nested events or a graph of events. There is no need to use the Java reflection API to navigate events, since fragments allow the querying of nested event properties or array values, including nested Java classes.

When using the Map or Object-array event representation, any named Map type or Object-array type nested within a Map or Object-array as a simple or array property is also available as a fragment. When using Java objects either directly or within Map or Object-array events, any object that is neither a primitive or boxed built-in type, and that is not an enumeration and does not implement the Map interface is also available as a fragment.

The nested, indexed and mapped property syntax can be combined to a property expression that may query an event property graph. Most of the methods on the EventType interface allow a property expression to be passed.

Your application may use an EventType object to obtain special getter-objects. A getter-object is a fast accessor to a property value of an event of a given type. All getter objects implement the EventPropertyGetter interface. Getter-objects work only for events of the same type or sub-types as the EventType that provides the EventPropertyGetter. The performance section provides additional information and samples on using getter-objects.

Consider a statement that returns the symbol, count of events per symbol and average price per symbol for tick events. Our sample statement uses the event type: StockTickEvent. Assume that this event type was declared previously and exposes a symbol property of type String and a price property of type (Java primitive) double.

select symbol, avg(price) as avgprice, count(*) as mycount 
from StockTickEvent 
group by symbol

The next table summarizes the property names and types as posted by the statement above:


A code snippet out of a possible UpdateListener implementation to this statement may look as below:

String symbol = (String) newEvents[0].get("symbol");
Double price= (Double) newEvents[0].get("avgprice");
Long count= (Long) newEvents[0].get("mycount");

The runtime supplies the boxed java.lang.Double and java.lang.Long types as property values rather than primitive Java types. This is because aggregated values can return a null value to indicate that no data is available for aggregation. Also, in a select statement that computes expressions, the underlying event objects to EventBean instances are either of type Object[] (object-array) or of type java.util.Map.

Use statement.getEventType().getUnderlyingType() to inspect the underlying type for all events delivered to listeners. Whether the runtime delivers Map or Object-array events to listeners can be specified as follows. If the statement provides the @EventRepresentation(objectarray) annotation the runtime delivers the output events as object array. If the statement provides the @EventRepresentation(map) annotation the runtime delivers output events as a Map. If neither annotation is provided, the runtime delivers the configured default event representation as discussed in Section 17.4.9.1, “Default Event Representation”.

Consider the next statement that specifies a wildcard selecting the same type of event:

select * from StockTickEvent where price > 100

The property names and types provided by an EventBean query result row, as posted by the statement above are as follows:


As an alternative to querying individual event properties via the get methods, the getUnderlying method on EventBean returns the underlying object representing the statement result. In the sample statement that features a wildcard-select, the underlying event object is of type org.sample.StockTickEvent:

StockTickEvent tick = (StockTickEvent) newEvents[0].getUnderlying();

Composite events are events that aggregate one or more other events. Composite events are typically created by the runtime for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.

// Look for a pattern where BEvent follows AEvent
select * from pattern [a=AEvent -> b=BEvent]
// Example listener code
public class MyUpdateListener implements UpdateListener {
  public void update(EventBean[] newData, EventBean[] oldData, EPStatement statement, EPRuntime runtime) {
    System.out.println("a event=" + newData[0].get("a"));
    System.out.println("b event=" + newData[0].get("b"));
  }
}

Note that the update method can receive multiple events at once as it accepts an array of EventBean instances. For example, a time batch window may post multiple events to listeners representing a batch of events received during a given time period.

Pattern statements can also produce multiple events delivered to update listeners in one invocation. The pattern statement below, for instance, delivers an event for each A event that was not followed by a B event with the same id property within 60 seconds of the A event. The runtime may deliver all matching A events as an array of events in a single invocation of the update method of each listener to the statement:

select * from pattern[every a=A -> (timer:interval(60 sec) and not B(id=a.id))]

A code snippet out of a possible UpdateListener implementation to this statement that retrives the events as fragments may look as below:

EventBean a = (EventBean) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvent[0].get("a.value");

Some pattern objects return an array of events. An example is the unbound repeat operator. Here is a sample pattern that collects all A events until a B event arrives:

select * from pattern [a=A until b=B]

A possible code to retrieve different fragments or property values:

EventBean[] a = (EventBean[]) newEvents[0].getFragment("a");
// ... or using a nested property expression to get a value out of A event...
double value = (Double) newEvent[0].get("a[0].value");

The Esper compiler and runtime use the EPType interface for tracking all Java (or C# for .NET) type information. The EPType interface can be found in package com.espertech.esper.common.client.type.

EPL uses three-valued logic and thus null is a viable type. More information can be found at Section 2.19, “Basic Null”. The null-type is represented by EPTypeNull.

EPL supports un-parameterized types and a type that is not parameterized is represented by EPTypeClass.

EPL also supports parameterized types and a type that is parameterized is represented by EPTypeClassParameterized. Since an instance of java.lang.Class does not provide information about actual type parameters (aka. type erasure) and since there is no null-type class the compiler and runtime use EPType instead of java.lang.Class.

To obtain the event property type use the getPropertyEPType method of EventType or EventPropertyDescriptor. Please consult the JavaDoc for additional information.

As discussed in Section 5.2.7, “Annotation” an EPL annotation is an addition made to statement information. The API and examples to interrogate annotations are described here.

You may use the getAnnotations method of EPStatement to obtain annotations specified for a statement. Or when compiling an EPL expression to a EPStatementObjectModel statement object model you may also query, change or add annotations.

The following example code demonstrates iterating over an EPStatement statement's annotations and retrieving values:

String exampleEPL = "@Tag(name='direct-output', value='sink 1') select * from RootEvent";
Configuration configuration = new Configuration();
configuration.getCommon().addEventType("RootEvent", Collections.emptyMap()); // add an event type without properties
CompilerArguments compilerArguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(stmt, compilerArguments);

EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
EPStatement stmt = deployment.getStatements()[0];
for (Annotation annotation : stmt.getAnnotations()) {
  if (annotation instanceof Tag) {
    Tag tag = (Tag) annotation;
    System.out.println("Tag name " + tag.name() + " value " + tag.value());
  }
}

The output of the sample code shown above is Tag name direct-output value sink 1.

The EPEventService interface is used to send events and advance time. Obtain the event service from a runtime by calling getEventService on EPRuntime.

This section focuses on processing events. For more information on controlling time using the event service please skip forward to Section 16.9, “Controlling Time-Keeping”.

Your application invokes any of the sendEventType methods listed below and must provide an event type name along with the actual event object:


The Chapter 3, Event Representations section explains the types of event representations.

The below sample code assumes that the event type name MarketDataBean refers to a class event representation that matches the class MarketDataBean:

EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
EPEventService eventService = runtime.getEventService();

// Send an example event containing stock market data
eventService.sendEventBean(new MarketDataBean("IBM", 75.0), "MarketDataBean");

Tip

Events, in theoretical terms, are observations of a state change that occurred in the past. Since you cannot change an event that happened in the past, events are best modelled as immutable objects.

If you find that your application requires processing events and control over time only for specific deployments and not for other deployments, please read up on Section 16.21, “Stages”.

Caution

The runtime relies on events that are sent into the runtime to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Application should not modify an existing event that was sent into the runtime.

Important

Another important method in the runtime interface are the routeEventType methods. These methods are designed for use by UpdateListener and subscriber implementations as well as runtime extensions that need to send events into a runtime to avoid the possibility of a stack overflow due to nested calls to sendEvent and to ensure correct processing of the current and routed event. Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent and not routeEvent.

The EventSender interface processes event objects that are of a known type. This facility can reduce the overhead of event object reflection and type lookup as an event sender is always associated to a single concrete event type.

Use the method getEventSender(String eventTypeName) to obtain an event sender for processing events of the named type:

EventSender sender = runtime.getEVentService().getEventSender("MyEvent");
sender.sendEvent(myEvent);

For events backed by a Java class (JavaBean events), the event sender ensures that the event object equals the underlying class, or implements or extends the underlying class for the given event type name.

For events backed by a java.util.Map (Map events), the event sender does not perform any checking other than checking that the event object implements Map.

For events backed by a Object[] (Object-array events), the event sender does not perform any checking other than checking that the event object implements Object[]. The array elements must be in the exact same order of properties as declared and array length must always be at least the number of properties declared.

For JSON events, the event sender checks that the event object is a string-type value or is an JsonEventUnderlying object returned by the parse method of EventSenderJson. The JSON document should match the fields defined in create schema.

For Avro events backed by a Apache Avro GenericData.Record, the event sender does not perform any checking other than checking that the event object is a GenericData.Record. The schema associated to the record should match the event type's Avro schema.

For events backed by a org.w3c.Node (XML DOM events), the event sender checks that the root element name equals the root element name for the event type.

The EPFireAndForgetService interface offers methods to execute fire-and-forget queries. Obtain the fire-and-forget service from a runtime by calling getFireAndForgetService on EPRuntime.

As your application may not require streaming results and may not know each statement in advance, the fire-and-forget query facility provides for ad-hoc on-demand execution of an EPL query.

Fire-and-forget queries are not continuous in nature: The fire-and-forget query runtime executes the query once and returns all result rows to the application. Fire-and-forget query execution is very lightweight as the runtime performs no statement deployment and the query leaves no traces within the runtime.

Esper provides the facility to explicitly index named windows and tables to speed up fire-and-forget queries and statements. Please consult Section 6.9, “Explicitly Indexing Named Windows and Tables” for more information.

When named windows and tables are used with contexts and context partitions, the APIs to identify, filter and select context partitions for fire-and-forget queries can be found in Section 16.16, “Context Partition Selection”.

There are three ways to run fire-and-forget queries:

  1. Use the executeQuery method to executes a given fire-and-forget query exactly once, see Section 16.7.1, “Fire-and-forget Query Single Execution”.

  2. Use the prepareQuery method to prepare a given fire-and-forget query such that the same query can be executed multiple times, see Section 16.7.2, “Fire-and-forget Query Prepared Unparameterized Execution”.

  3. Use the prepareQueryWithParameters method to prepare a given fire-and-forget query that may have substitution parameters such that the same query can be parameterized and executed multiple times without repeated parsing, see Section 16.7.3, “Fire-and-forget Query Prepared Parameterized Execution”

If your application must execute the same fire-and-forget query multiple times with different parameters use prepareQueryWithParameters.

If your application must execute the same fire-and-forget query multiple times without parameters use either prepareQuery or prepareQueryWithParameters and specify no substitution parameters.

By using any of the prepare... methods the runtime can load the byte code for the query once and reuse the byte code and thereby speed up repeated execution.

The following limitations apply:

  • A fire-and-forget only evaluates against the named windows and tables that your application creates. Fire-and-forget queries may not specify any other streams or application event types.

  • The following clauses are not allowed in fire-and-forget EPL queries: insert into and output.

  • Data windows and patterns are not allowed to appear in fire-and-forget queries.

  • Fire-and-forget EPL may not perform subqueries.

  • The previous and prior functions may not be used.

Prepared fire-and-forget queries are designed for repeated execution and may perform better then the dynamic single-execution method if running the same query multiple times. For use with parameter placeholders please see Section 16.7.3, “Fire-and-forget Query Prepared Parameterized Execution”.

The next code snippet demonstrates prepared fire-and-forget queries without parameter placeholder:

String query = "select * from MyNamedWindow where orderId = '123'";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);
						
EPFireAndForgetPreparedQuery prepared = runtime.getFireAndForgetService().prepareQuery(compiled);
EPFireAndForgetQueryResult result = prepared.execute();

// ...later on execute once more ...
prepared.execute();	// execute a second time

Please see the compiler documentation for specifying substitution parameters.

All substitution parameters must be replaced by actual values before a fire-and-forget query with substitution parameters can be executed. Substitution parameters can be replaced with an actual value using the setObject method for each index or name. Substitution parameters can be set to new values and the query executed more than once.

While the setObject method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the type that was specified, if any, and the requirements of the expression the parameter resides in.

The next program listing runs a prepared and parameterized fire-and-forget query against a named window MyNamedWindow (this example does not assign names to substitution parameters):

String query = "select * from MyNamedWindow where orderId = ?::string";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);

EPFireAndForgetPreparedQueryParameterized prepared = runtime.getFireAndForgetService().prepareQueryWithParameters(query);

// Set the required parameter values before each execution
prepared.setObject(1, "123");
EPFireAndForgetQueryResult result = runtime.getFireAndForgetService().executeQuery(prepared);

// ...execute a second time with new parameter values...
prepared.setObject(1, "456");
result = runtime.getFireAndForgetService().executeQuery(prepared);

This second example uses the in operator and has multiple parameters:

String query = "select * from MyNamedWindow where orderId in (?::string[]) and price > ?::double";
CompilerArguments compilerArguments = new CompilerArguments();
compilerArguments.getPath().add(runtime.getRuntimePath());
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, compilerArguments);

EPFireAndForgetPreparedQueryParameterized prepared = runtime.getFireAndForgetService().prepareQueryWithParameters(compiled);
prepared.setObject(1, new String[] {"123", "456"});
prepared.setObject(2, 1000.0);

The from-clause can have an SQL query:

... from sql:database_name [" parameterized_sql_query "]

The Esper runtime executes such fire-and-forget queries with SQL using the JDBC API. Please also refer to Section 5.13, “Accessing Relational Data via SQL” for more information.

The sample EPL below returns the results of an SQL query against the database named MyCustomerDB and table Customer:

select * from sql:MyCustomerDB ['select cust_name from Customer where cust_id = 10']

Tip

  • Use prepared execution (unparameterized or parameterized) when executing the same SQL query multiple times with same or different parameters. This allows the Esper runtime to reuse the JDBC java.sql.Connection and JDBC PreparedStatement. EPL Expressions including expressions with (public) variables and substitution parameters can be placed within ${...} within the SQL text.

  • To execute a query once an application can use single execution (see above). In single execution the runtime obtains a new JDBC java.sql.Connection, creates a JDBC PreparedStatement and discards the JDBC connection and JDBC statement after execution. This is not recommended for best performance.

This code snippet illustrates executing the SQL query by fire-and-forget API prepared unparameterized execution:

String query = "select * from sql:MyCustomerDB ['select cust_name from Customer where cust_id = 10";
EPCompiled compiled = EPCompilerProvider.getCompiler().compileQuery(query, new CompilerArguments());
EPFireAndForgetPreparedQuery prepared = runtime.getFireAndForgetService().prepareQuery(compiled);
EPFireAndForgetQueryResult result = prepared.execute();

// ...close() ...
prepared.close();	// close when done executing

Warning

Call the close method of EPFireAndForgetPreparedQuery and EPFireAndForgetPreparedQueryParameterized to release JDBC resources held by the runtime.

The following limitations apply:

  • Joins are not allowed when using sql: SQL queries in the from-clause however the SQL itself can be any SQL including a join.

  • A context cannot be specified.

For NEsper .NET also see ???.

The runtime is designed from the ground up to operate as a component to multi-threaded, highly-concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.

In Esper, a runtime instance is a unit of separation. Applications can obtain and discard (initialize) one or more runtime instances within the same Java VM and can provide the same or different configurations to each instance. A runtime instance shares resources between statements by means of named windows, tables and variables.

Applications can use Esper APIs to concurrently, by multiple threads of execution, perform such functions as deploying modules, or sending events into the runtime for processing. Applications can use application-managed threads or thread pools or any set of same or different threads of execution with any of the public runtime APIs. There are no restrictions towards threading other than those noted in specific sections of this document.

The runtime does not prescribe a specific threading model. Applications using Esper retain full control over threading, allowing a runtime to be easily embedded and used as a component or library in your favorite Java container or process.

In the default configuration it is up to the application code to use multiple threads for processing events by the runtime, if so desired. All event processing takes places within your application thread call stack. The exception is timer-based processing if your runtime relies on the internal timer (default). If your application relies on external timer events instead of the internal timer then there need not be any runtime-managed internal threads.

The fact that event processing can take place within your application thread's call stack makes developing applications with the Esper runtime easier: Any common Java integrated development environment (IDE) can host a compiler and runtime instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.

In the default configuration, each runtime maintains a single timer thread (internal timer) providing for time or schedule-based processing within the runtime. The default resolution at which the internal timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead advance time to perform timer or scheduled processing at the resolution required by an application.

A runtime performs minimal locking to enable high levels of concurrency. A runtime locks on the combination of query and context partition to protect context partition resources. For example, two statements with three partitions each have a total of six locks. For stateless EPL select-statements the runtime does not use a context-partition lock and operates lock-free for the context partition. For stateful statements, the maximum (theoretical) degree of parallelism is 2^31-1 (2,147,483,647) parallel threads working to process a single statement under a hash segmented context.

For named windows and tables, on-select, on-merge, on-update and on-delete all execute under the same lock as the partition of the named window or table. Any insert into produces a new event for the work queue and does not lock the target of the insert-into.

Tip

For logging of lock activity please enable runtime lock activity logging as described in Section 17.6.2.3, “Lock Activity Logging”.

You may turn off context partition locking runtime-wide (also read the caution notice) as described in Section 17.6.10.4, “Disable Locking”. You may disable context partition locking for a given statement by providing the @NoLock annotation as part of your EPL. Note, Esper provides the @NoLock annotation for the purpose of identifying locking overhead, or when your application is single-threaded, or when using an external mechanism for concurrency control or for example with virtual data windows or plug-in data windows to allow customizing concurrency for a given statement or named window. Using this annotation may have unpredictable results unless your application is taking concurrency under consideration.

For a runtime to produce predictable results from the viewpoint of listeners to statements, a runtime by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. Applications that require the highest possible concurrency and do not require predictable order of delivery of events to listeners, this feature can be turned off via configuration, see Section 17.6.1.1, “Preserving the Order of Events Delivered to Listeners”. For example, assume thread T1 processes an event applied to statement S producing output event O1. Assume thread T2 processes another event applied to statement S and produces output event O2. The runtime employs a configurable latch system to ensure that listeners to statement S receive and may complete processing of O1 before receiving O2. When using outbound threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.

In multithreaded environments, when one or more statements make result events available via the insert into clause to further statements, the runtime preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statement's events to behave deterministic. This feature can also be turned off via configuration, see Section 17.6.1.2, “Preserving the Order of Events for Insert-Into Streams”. For example, assume thread T1 processes an event applied to statement S and thread T2 processes another event applied to statement S. Assume statement S inserts into stream ST. T1 produces an output event O1 for processing by consumers of ST1 and T2 produces an output event O2 for processing by consumers of ST. The runtime employs a configurable latch system such that O1 is processed before O2 by consumers of ST. When using route execution threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.

We generally recommended that listener implementations block minimally or do not block at all. By implementing listener code as non-blocking code execution threads can often achieve higher levels of concurrency.

We recommended that, when using a single listener or subscriber instance to receive output from multiple statements, that the listener or subscriber code is multithread-safe. If your application has shared state between listener or subscriber instances then such shared state should be thread-safe.

In the default configuration the same application thread that invokes any of the sendEvent methods will process the event fully and also deliver output events to listeners and subscribers. By default the single internal timer thread based on system time performs time-based processing and delivery of time-based results.

This default configuration reduces the processing overhead associated with thread context switching, is lightweight and fast and works well in many environments such as J2EE, server or client. Latency and throughput requirements are largely use case dependent, and Esper provides runtime-level facilities for controlling concurrency that are described next.

Inbound Threading queues all incoming events: A pool of runtime-managed threads performs the event processing. The application thread that sends an event via any of the sendEvent methods returns without blocking.

Outbound Threading queues events for delivery to listeners and subscribers, such that slow or blocking listeners or subscribers do not block event processing.

Timer Execution Threading means time-based event processing is performed by a pool of runtime-managed threads. With this option the internal timer thread (or external timer event) serves only as a metronome, providing units-of-work to the runtime-managed threads in the timer execution pool, pushing threading to the level of each statement for time-based execution.

Route Execution Threading means that the thread sending in an event via any of the sendEvent methods (or the inbound threading pooled thread if inbound threading is enabled) only identifies and pre-processes an event, and a pool of runtime-managed threads handles the actual processing of the event for each statement, pushing threading to the level of each statement for event-arrival-based execution.

The runtime starts runtime-managed threads as daemon threads when the runtime instance is first obtained. The runtime stops runtime-managed threads when the runtime instance is destroyed via the destroy method. When the runtime is initialized via the initialize method the existing runtime-managed threads are stopped and new threads are created. When shutting down your application, use the destroy method to stop runtime-managed threads.

Note that the options discussed herein may introduce additional processing overhead into your system, as each option involves work queue management and thread context switching.

If your use cases require loss-less processing of events, wherein the threading options mean that events are held in an in-memory queue, the threading options described herein may not be the right choice.

Care should be taken to consider arrival rates and queue depth. Threading options utilize unbound queues or capacity-bound queues with blocking-put, depending on your configuration, and may therefore introduce an overload or blocking situation to your application. You may use the service provider interface as outlined below to manage queue sizes, if required, and to help tune the runtime to your application needs. Consider throttling down the event send rate when the API (see below) indicates that events are getting queued.

All threading options are on the level of a runtime. If you require different threading behavior for certain statements then consider using multiple runtimes, consider using the routeEvent method or consider using application threads instead.

Please consult Section 17.6.1, “Runtime Settings Related to Concurrency and Threading” for instructions on how to configure threading options. Threading options take effect at runtime initialization time.

The runtime processes event by event. It only processes the next event when processing of the current event has completed.

For the purpose of processing event-by-event, the runtime maintains a thread-specific ordered work queue. In the default configuration the order of events in the work queue is dictated by the order of execution of insert-into.

New events get added to the end of the work queue (exceptions highlighted below) unless events have an event-precedence (see below). The runtime works off each event in the work queue completely, beginning with the oldest event, before processing the next newer event in the work queue.

The runtime processes the current event against all statements completely before delivering results to listeners and subscribers and before processing the next event. This is true regardless whether a given event was sent in via the API or whether the event was produced by a statement and insert-into. The order is:

For example, assume three statements named statement-1, statement-2 and statement-3:

@name('statement-1') select * from MyEvent;
@name('statement-2') insert into ABCStream select * from MyEvent;
@name('statement-3') select * from ABCStream;

When a MyEvent event arrives, the order of processing is:

Among all events generated by insert-into of statements and the events routed into the runtime via the routeEvent method, all events that insert-into a named window or table are processed first in the order generated. All other events are processed thereafter in the order they were generated (except when using event-precedence).

If your application requires a defined order of processing among the events that are generated by insert-into, use the event-precedence syntax. Please see Section 5.10.10, “Insert Into and Event Precedence” and Section 17.6.10.2, “Event-Precedence Execution” for more information.

When enabling timer or route execution threading as explained under advanced threading options then the runtime does not make any guarantee to the processing order except that is will prioritize events inserted into a named window.

There are two modes for a runtime to keep track of time: The internal timer based on JVM system time (the default), and externally-controlled (aka. event time) time giving your application full control over the concept of time within a runtime.

By default the internal timer provides time and evaluates schedules. External clocking i.e. event time can be used to supply time ticks to the runtime instead. The latter is useful for when events themselves provide the time to advance. External clocking also helps in testing time-based event sequences or for synchronizing the runtime with an external time source.

The internal timer relies on the java.util.concurrent.ScheduledThreadPoolExecutor class for time tick events. The next section describes timer resolution for the internal timer, by default set to 100 milliseconds but is configurable via the threading options. When using externally-controlled time the timer resolution is in your control.

To disable the internal timer and use externally-provided time instead, there are two options. The first option is to use the configuration API at runtime initialization time. The second option toggles on and off the internal timer at runtime, via special timer control events that are sent into the runtime like any other event.

If using a timer execution thread pool as discussed above, the internal timer or external time event provide the schedule evaluation however do not actually perform the time-based processing. The time-based processing is performed by the threads in the timer execution thread pool.

Tip

External and internal/system time is the same internally to the runtime thus the runtime behaves the same whether using external or internal timer.

This code snippet shows the use of the configuration API to disable the internal timer and thereby turn on externally-provided time (see the Configuration section for configuration via XML file):

Configuration config = new Configuration();
config.getRuntime().getThreading().setInternalTimerEnabled(false);
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(config);

After disabling the internal timer, it is wise to set a defined time so that any statements created thereafter start relative to the time defined. Use the advanceTime method to indicate current time to the runtime and to move time forward for the runtime (a.k.a application-time model).

This code snippet obtains the current time and advances time:

long timeInMillis = System.currentTimeMillis();
runtime.getEventService().advanceTime(timeInMillis);

To enable or disable the internal timer by API call use the clockExternal and clockInternal methods of EPEventService.

The next code snippet demonstrates toggling to external time:

EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
EPEventService eventService = runtime.getEventService();
// switch to external clocking
eventService.clockExternal();

The advanceTime method moves the time forward. All aspects of runtime current time related to statements and patterns are driven by the time that your application advances to.

The next example sequence of instructions sets time to zero, then creates a statement, then moves time forward to 1 seconds later and then 6 seconds later:

// Set start time at zero.
runtime.getEventService().advanceTime(0);

// deploy a module here
// sample EPL: select current_timestamp() as ct from pattern[every timer:interval(1 minute)]
runtime.getDeploymentService().deploy(compiled); // compiled is a module you compiled earlier

// move time forward 1 second
runtime.getEventService().advanceTime(1000);

// move time forward 5 seconds
runtime.getEventService().advanceTime(6000);

When advancing time your application should make sure values are ascending. That is, each time value should be either the same value or a larger value then the prior value provided.

Your application may use the getNextScheduledTime method in EPEventService to determine the earliest time a schedule for any statement requires evaluation.

The following code snippet sets the current time, creates a statement and prints the next scheduled time which is 1 minute later then the current time:

// set start time to the current time.
runtime.getEventService().advanceTime(System.currentTimeMillis());

// deploy a module
// sample EPL: select current_timestamp() as ct from pattern[every timer:interval(1 minute)]
runtime.getDeploymentService().deploy(compiled); // compiled is a module you compiled earlier

// print next schedule time
System.out.println("Next schedule at " + new Date(runtime.getEventService().getNextScheduledTime()));

Warning

You may not use advanceTime or advanceTimeSpan to control time when using internal timer since the internal timer tracks system time and must be the only source of time.

The advanceTime method allows your application to advance runtime time to a given point in time. In addition, the getNextScheduledTime method in EPEventService returns the next scheduled time according to started statements. You would typically use advanceTime to advance time at a relatively high resolution i.e. milliseconds or microseconds.

To advance time for a span of time without individual calls to advanceTime the API provides the method advanceTimeSpan. The advanceTimeSpan method can accept a resolution parameter.

If your application provides the target end time of a time span to the advanceTimeSpan method and does not provide a resolution, the runtime advances time up to the target time by stepping through all relevant times according to started statements.

If your application provides the target end time of a time span and in addition a long-typed resolution to the advanceTimeSpan method the runtime advances time up to the target time by incrementing time according to the resolution (regardless of next scheduled time according to started statements).

Consider the following example:

// Set start time to Jan.1, 2010, 00:00 am for this example
SimpleDateFormat format = new SimpleDateFormat("yyyy MM dd HH:mm:ss SSS");
Date startTime = format.parse("2010 01 01 00:00:00 000");
runtime.getEventService().advanceTime(startTime.getTime());

// deploy a module
// sample EPL: select current_timestamp() as ct from pattern[every timer:interval(1 minute)]
runtime.getDeploymentService().deploy(compiled); // compiled is a module you compiled earlier
stmt.addListener(...);	// add a listener

// Advance time to 10 minutes after start time
runtime.getEventService().advanceTimeSpan(startTime.getTime() + 10*60*1000));

The above example advances time to 10 minutes after the start time using the advanceTimeSpan method. As the example does not pass a resolution, the runtime advances time according to statement schedules. Upon calling the advanceTimeSpan method the listener sees 10 invocations for minute 1 to minute 10.

To advance time according to a given resolution, you may provide the resolution as shown below:

// Advance time to 10 minutes after start time at 100 msec resolution
runtime.getEventService().advanceTimeSpan(startTime.getTime() + 10*60*1000, 100);

Time can have a resolution of either milliseconds or microseconds.

The default time resolution is milliseconds. To configure the runtime for microsecond resolution, please see Section 17.4.14.1, “Time Unit”.


A few notes on usage of microsecond time unit for time resolution:

  • The runtime automatically computes time periods into microseconds. For example 1 minute 2 seconds is 62000000 microseconds (62 * 1000000).

  • The runtime automatically computes time-in-second parameters into microseconds. For example 5.02 seconds is 5020000 microseconds.

  • The runtime automatically computes ISO schedules, crontabs and hints related to runtime time into microseconds.

  • The CurrentTimeSpanEvent or CurrentTimeEvent events must provide microsecond values.

  • Date-time methods with long-type input values assume microsecond values.

  • Date-time methods or other functions that take millisecond parameters or produce millisecond values still consume/produce millisecond values, such as the date-time method toMillisec.

  • The internal timer must be disabled (setInternalTimerEnabled(false)) and TimerControlEvent.ClockType.CLOCK_INTERNAL cannot be used.

You may register one or more exception handlers for the runtime to invoke in the case it encounters an exception processing a continuously-executing statement. By default and without exception handlers the runtime cancels execution of the current statement that encountered the exception, logs the exception and continues to the next statement, if any. The configuration is described in Section 17.6.11, “Runtime Settings Related to Exception Handling”.

If your application registers exception handlers as part of runtime configuration, the runtime invokes the exception handlers in the order they are registered passing relevant exception information such as statement name, expression and the exception itself.

Exception handlers receive any statement unchecked exception such as internal exceptions or exceptions thrown by plug-in aggregation functions or plug-in data windows. The runtime does not provide to exception handlers any exceptions thrown by static method invocations for function calls, method invocations in joins, methods on variables and event classes and listeners or subscriber exceptions.

An exception handler can itself throw a runtime exception to cancel execution of the current event against any further statements.

Note

Exceptions are meant to indicate an actual unexpected problem.

We do not recommend explicitly throwing exceptions for the purpose of flow control, preempting execution or other normal situations.

The runtime does not guarantee that throwing an exception has no other side effect and the runtime may not roll back changes that are already made to state.

For fire-and-forget queries the API indicates any exception directly back to the caller without the exception handlers being invoked, as exception handlers apply to statements only. The same applies to any API calls other than sendEvent and the EventSender methods.

As the configuration section describes, your application registers one or more classes that implement the ExceptionHandlerFactory interface in the runtime configuration. Upon runtime initialization the runtime obtains a factory instance from the class name that then provides the exception handler instance. The exception handler class must implement the ExceptionHandler interface.

You may register one or more condition handlers for the runtime to invoke in the case it encounters certain conditions, as outlined below, when executing a statement. By default and without condition handlers the runtime logs the condition at informational level and continues processing. The configuration is described in Section 17.6.12, “Runtime Settings Related to Condition Handling”.

If your application registers condition handlers as part of runtime configuration, the runtime invokes the condition handlers in the order they are registered passing relevant condition information such as statement name, expression and the condition information itself.

Currently the only conditions indicated by this facility are raised by the pattern followed-by operator, see Section 7.5.8.1, “Limiting Sub-Expression Count” and see Section 7.5.8.2, “Limiting Runtime-Wide Sub-Expression Count”.

A condition handler may not itself throw a runtime exception or return any value.

As the configuration section describes, your application registers one or more classes that implement the ConditionHandlerFactory interface in the runtime configuration. Upon runtime initialization the runtime obtains a factory instance from the class name that then provides the condition handler instance. The condition handler class must implement the ConditionHandler interface.

The runtime can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 17.6.7, “Runtime Settings Related to JMX Metrics”. For additional detailed reporting and metrics events, please read on.

Metrics reporting is a feature that allows an application to receive ongoing reports about key runtime-level and statement-level metrics. Examples are the number of incoming events, the CPU time and wall time taken by statement executions or the number of output events per statement.

Metrics reporting is, by default, disabled. To enable reporting, please follow the steps as outlined in Section 17.6.8, “Runtime Settings Related to Metrics Reporting”. Metrics reporting must be enabled at runtime initialization time. Reporting intervals can be controlled at runtime via the EPMetricsService interface available from the runtime API.

Your application can receive metrics at configurable intervals via statement. A metric datapoint is simply a well-defined event. The events are RuntimeMetric and StatementMetric and the Java class representing the events can be found in the client API in package com.espertech.esper.common.client.metric.

Since metric events are processed by the runtime the same as application events, your EPL may use any construct on such events. For example, your application may select, filter, aggregate properties, sort or insert into a stream, named window or table all metric events the same as application events.

This example statement selects all runtime metric events:

select * from com.espertech.esper.common.client.metric.RuntimeMetric

The next statement selects all statement metric events:

select * from com.espertech.esper.common.client.metric.StatementMetric

The next statement selects all metric events:

select * from com.espertech.esper.common.client.metric.MetricEvent

Make sure to have metrics reporting enabled since only then do listeners or subscribers to a statement such as above receive metric events.

The runtime provides metric events after the configured interval of time has passed. By default, only started statements that have activity within an interval (in the form of event or timer processing) are reported upon.

The default configuration performs the publishing of metric events in an Esper daemon thread under the control of the runtime instance. Metrics reporting honors externally-supplied time, if using external timer events.

Via runtime configuration options provided by EPMetricsService, your application may enable and disable metrics reporting globally, provided that metrics reporting was enabled at initialization time. Your application may also enable and disable metrics reporting for individual statements by statement name.

Statement groups is a configuration feature that allows to assigning reporting intervals to statements. Statement groups are described further in the Section 17.6.8, “Runtime Settings Related to Metrics Reporting” section. Statement groups cannot be added or removed at runtime.

The following limitations apply:

  • If your Java VM version does not report current thread CPU time (most JVM do), then CPU time is reported as zero (use ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported() to determine if your JVM supports this feature).

    Note: In some JVM the accuracy of CPU time returned is very low (in the order of 10 milliseconds off) which can impact the usefulness of CPU metrics returned. Consider measuring CPU time in your application thread after sending a number of events in the same thread, external to the runtime as an alternative.

  • Your Java VM may not provide high resolution time via System.nanoTime. In such case wall time may be inaccurate and inprecise.

  • CPU time and wall time have nanosecond precision but not necessarily nanosecond accuracy, please check with your Java VM provider.

  • There is a performance cost to collecting and reporting metrics.

  • Not all statements may report metrics: The runtime performs certain runtime optimizations sharing resources between similar statements, thereby not reporting on certain statements.

Enterprise Edition has a library for measuring and reporting memory use for a runtime.

The runtime can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 17.6.7, “Runtime Settings Related to JMX Metrics”.

Runtime and statement-level metrics reporting is described in Section 16.12, “Runtime and Statement Metrics Reporting”.

The easiest way to see thread contentions is by using VisualVM when Esper is under load and looking at the Threads tab. In the worst case you will see a lot of red color in VisualVM. The red line in VisualVM shows the threads that are either in a monitor region or waiting in an entry set for the monitor. The monitor is the mechanism that Java uses to support synchronization. When a statement is stateful the runtime manages the state using a monitor (lock) per context partition.

A JVM profiler can be handy to see how much CPU is spent in Esper by the sendEvent method.

The jconsole can provide information on the JVM heap. If memory gets tights the performance can drop significantly.

The EPRenderEventService interface offers methods to render events as XML or JSON. Obtain the service from a runtime by calling getRenderEventService on EPRuntime.

Your application may use the built-in XML and JSON formatters to render output events into a readable textual format, such as for integration or debugging purposes. This section introduces the utility classes in the client util package for rendering events to strings. Further API information can be found in the JavaDocs.

For repeated rendering of events of the same event type or subtypes, it is recommended to obtain a JSONEventRenderer or XMLEventRenderer instance and use the render method provided by the interface. This allows the renderer implementations to cache event type metadata for fast rendering.

This example shows how to obtain a renderer for repeated rendering of events of the same type, assuming that statement is an instance of EPStatement:

JSONEventRenderer jsonRenderer = runtime.getRenderEventService().getJSONRenderer(statement.getEventType());

Assuming that event is an instance of EventBean, this code snippet renders an event into the JSON format:

String jsonEventText = jsonRenderer.render("MyEvent", event);

The XML renderer works the same:

XMLEventRenderer xmlRenderer = runtime.getRenderEventService().getXMLRenderer(statement.getEventType());

...and...

String xmlEventText = xmlRenderer.render("MyEvent", event);

If the event type is not known in advance or if your application does not want to obtain a renderer instance per event type for fast rendering, your application can use one of the following methods to render an event to a XML or JSON textual format:

String json = runtime.getRenderEventService().renderJSON(event);
String xml = runtime.getRenderEventService().renderXML(event);

Use the JSONRenderingOptions or XMLRenderingOptions classes to control how events are rendered. To render specific event properties using a custom event property renderer, specify an EventPropertyRenderer as part of the options that renders event property values to strings. Please see the JavaDoc documentation for more information.

A plug-in loader is for general use with input adapters, output adapters or EPL code deployment or any other task that can benefits from being part of an Esper configuration file and that follows runtime lifecycle.

A plug-in loader implements the com.espertech.esper.runtime.client.plugin.PluginLoader interface and can be listed in the configuration.

Each configured plug-in loader follows the runtime instance lifecycle: When an runtime instance initializes, it instantiates each PluginLoader implementation class listed in the configuration. The runtime then invokes the lifecycle methods of the PluginLoader implementation class before and after the runtime is fully initialized and before an runtime instance is destroyed.

Declare a plug-in loader in your configuration XML as follows:

...
  <plugin-loader name="MyLoader" class-name="org.mypackage.MyLoader">
    <init-arg name="property1" value="val1"/>
  </plugin-loader>
...

Alternatively, add the plug-in loader via the configuration API:

Configuration config = new Configuration();
Properties props = new Properties();
props.put("property1", "value1");
config.getRuntime().addPluginLoader("MyLoader", "org.mypackage.MyLoader", props);

Implement the init method of your PluginLoader implementation to receive initialization parameters. The runtime invokes this method before the runtime is fully initialized, therefore your implementation should not yet rely on the runtime instance within the method body:

public class MyPluginLoader implements PluginLoader {
  public void init(String loaderName, Properties properties, EPRuntime runtime) {
     // save the configuration for later, perform checking
  }
  ...

The runtime calls the postInitialize method once the runtime completed initialization and to indicate the runtime is ready for traffic.

public void postInitialize() {
  // Start the actual interaction with external feeds or the runtime here
}
...

The runtime calls the destroy method once the runtime is destroyed or initialized for a second time.

public void destroy() {
  // Destroy resources allocated as the runtime instance is being destroyed
}

To access the plug-in at runtime, the getContext method provides access under the name plugin-loader/name:

runtime.getContext().getEnvironment().get("plugin-loader/MyLoader");

This chapter discusses how to select context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions and the reasons for context partition selection are introduced in Section 4.10, “Operations on Specific Context Partitions”.

The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts. The section uses a category context for the purpose of illustration. The API discussed herein is general and handles all different types of contexts including nested contexts.

Consider a category context that separates bank transactions into small, medium and large:

// declare category context
create context TxnCategoryContext 
  group by amount < 100 as small, 
  group by amount between 100 and 1000 as medium, 
  group by amount > 1000 as large from BankTxn
// retain 1 minute of events of each category separately
context TxnCategoryContext select * from BankTxn#time(1 minute)

In order for your application to iterate one or more specific categories it is necessary to identify which category, i.e. which context partition, to iterate. Similarly for fire-and-forget queries, to execute fire-and-forget queries against one or more specific categories, it is necessary to identify which context partition to execute the fire-and-forget query against.

Your application may iterate one or more specific context partitions using either the iterate or safeIterate method of EPStatement by providing an implementation of the ContextPartitionSelector interface.

For example, assume your application must obtain all bank transactions for small amounts. It may use the API to identify the category and iterate the associated context partition:

ContextPartitionSelectorCategory categorySmall = new ContextPartitionSelectorCategory() {
    public Set<String> getLabels() {
      return Collections.singleton("small");
    }
  };
Iterator<EventBean> it = stmt.iterator(categorySmall);

Your application may execute fire-and-forget queries against one or more specific context partitions by using the executeQuery method on EPRuntime or the execute method on EPFireAndForgetPreparedQuery and by providing an implementation of ContextPartitionSelector.

Fire-and-forget queries execute against named windows and tables, therefore below statement creates a named window which the runtime manages separately for small, medium and large transactions according to the context declared earlier:

// Named window per category
context TxnCategoryContext create window BankTxnWindow#time(1 min) as BankTxn

The following code demonstrates how to fire a fire-and-forget query against the small and the medium category:

ContextPartitionSelectorCategory categorySmallMed = new ContextPartitionSelectorCategory() {
    public Set<String> getLabels() {
      return new HashSet<String>(Arrays.asList("small", "medium"));
    }
  };
runtime.getFireAndForgetService().executeQuery(
   "select count(*) from BankTxnWindow", 
   new ContextPartitionSelector[] {categorySmallMed});

The following limitations apply:

  • Fire-and-forget queries may not join named windows or tables that declare a context.

This section summarizes the selector interfaces that are available for use to identify and interrogate context partitions. Please refer to the JavaDoc documentation for package com.espertech.esper.common.client.context and classes therein for additional information.

Use an implementation of ContextPartitionSelectorAll or the ContextPartitionSelectorAll.INSTANCE object to instruct the runtime to consider all context partitions.

Use an implementation of ContextPartitionSelectorById if your application knows the context partition ids to query. This selector instructs the runtime to consider only those provided context partitions based on their integer id value. The runtime outputs the context partition id in the built-in property context.id.

Use an implementation of ContextPartitionSelectorFiltered to receive and interrogate context partitions. Use the filter method that receives a ContextPartitionIdentifier to return a boolean indicator whether to include the context partition or not. The ContextPartitionIdentifier provides information about each context partition. Your application may not retain ContextPartitionIdentifier instances between filter method invocations as the runtime reuses the same instance. This selector is not supported with nested contexts.

Use an implementation of ContextPartitionSelectorCategory with category contexts.

Use an implementation of ContextPartitionSelectorSegmented with keyed segmented contexts.

Use an implementation of ContextPartitionSelectorHash with hash segmented contexts.

Use an implementation of ContextPartitionSelectorNested in combination with the selectors described above with nested contexts.

This chapter briefly discusses the API to manage context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions.

The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts.

The EPContextPartitionService interface offers methods to manage context partitions. Obtain the service from a runtime by calling getContextPartitionService on EPRuntime.

The context partition admin API allows an application to:

  • Interrogate the state and identifiers for existing context partitions.

  • Determine statements associated to a context and context nesting level.

  • Receive a callback when new contexts get created and destroyed or when context partitions are allocated and de-allocated.

  • Obtain context properties.

Please see the JavaDoc documentation for more information.

Esper offers a listener and an assertions class to facilitate automated testing of EPL rules, for example when using a test framework such as JUnit or TestNG.

Esper does not require any specific test framework. If your application has the JUnit test framework in classpath Esper uses junit.framework.AssertionFailedError to indicate assertion errors, so as to integrate with continuous integration tools.

For detailed method-level information, please consult the JavaDoc of the package com.espertech.esper.common.client.scopetest and com.espertech.esper.runtime.client.scopetest.

The class com.espertech.esper.common.client.scopetest.EPAssertionUtil provides methods to assert or compare event property values as well as perform various array arthithmatic, sort events and convert events or iterators to arrays.

The class com.espertech.esper.runtime.client.scopetest.SupportUpdateListener provides an UpdateListener implementation that collects events and returns event data for assertion.

The class com.espertech.esper.runtime.client.scopetest.SupportSubscriber provides a subscriber implementation that collects events and returns event data for assertion. The SupportSubscriberMRD is a subscriber that accepts events multi-row delivery. The SupportSubscriber and SupportSubscriberMRD work similar to SupportUpdateListener that is introduced in more detail below.

The configuration object (Configuration), in respect to classes, holds the fully-qualified class name and does not generally hold Class references. This is by design since the configuration object can be populated from XML.

When deploying compiled modules the runtime may use a class loader to find resources. Your application has full control over class-for-name and classloader use. OSGi environments can provide a specific class-for-name and class loader. Please refer to Section 17.7, “Passing Services or Transient Objects”.

A compiler and runtime can well be deployed as part of a J2EE web or enterprise application archive to a web application server. When designing for deployment into a J2EE web application server, please consider the items discussed here.

We provide a sample servlet context listener in this section that uses the deployment API to deploy and undeploy modules as part of the servlet lifecycle.

The distribution provides a message-driven bean (MDB) example that you may find useful.

Esper does not have a dependency on any J2EE or Servlet APIs to allow the runtime to run in any environment or container.

As multiple web applications deployed to a J2EE web application server typically have a separate classloader per application, you should consider whether runtime instances need to be shared between applications or can remain separate runtime instances. Consider the EPRuntimeProvider a Singleton. When deploying multiple web applications, your J2EE container classloader may provide a separate instance of the Singleton EPRuntimeProvider to each web application resulting in multiple independent runtime instances.

To share EPRuntime instances between web applications, one approach is to add the runtime jar files to the system classpath. A second approach can be to have multiple web applications share the same servet context and have your application place the EPRuntime instance into a servlet context attribute for sharing. Architecturally you may also consider a single archived application (such as an message-driven bean) that all your web applications communicate to via the JMS broker provided by your application server or an external JMS broker.

As per J2EE standards there are restrictions in regards to starting new threads in J2EE application code. Esper adheres to these restrictions: It allows to be driven entirely by external events. To remove all Esper threads, set the internal timer off and leave the advanced threading options turned off. To provide timer events when the internal timer is turned off, you should check with your J2EE application container for support of the Java system timer or for support of batch or work loading to send timer events to an runtime instance.

As per J2EE standards there are restrictions in regards to input and output by J2EE application code. Esper adheres to these restrictions: By itself it does not start socket listeners or performs any file IO.

When deploying a J2EE archive that contains EPL modules files below is sample code to read and deploy EPL modules files packaged with the enterprise or web application archive when the servlet initializes. The sample undeploys EPL modules when the servlet context gets destroyed.

A sample web.xml configuration extract is:

<?xml version="1.0" encoding="UTF-8"?>
<web-app>
  <listener>
    <listener-class>SampleServletListener</listener-class>
  </listener>
  <context-param>
    <param-name>eplmodules</param-name>
    <param-value>switchmonitor.epl</param-value>
</context-param>
</web-app>

A sample servet listener that deploys EPL module files packaged into the archive on context initialization and that undeploys when the application server destroys the context is shown here:

public class SampleServletListener implements ServletContextListener {

  private List<String> deploymentIds = new ArrayList<String>();
  
  public void contextInitialized(ServletContextEvent servletContextEvent) {
    try {
      String modulesList = servletContextEvent.getServletContext().getInitParameter("eplmodules");
      List<Module> modules = new ArrayList<Module>();
      if (modulesList != null) {
        String[] split = modulesList.split(",");
        for (int i = 0; i < split.length; i++) {
          String resourceName = split[i].trim();
          if (resourceName.length() == 0) {
            continue;
          }
          String realPath = servletContextEvent.getServletContext().getRealPath(resourceName);
  		Module module = EPCompilerProvider.getCompiler().readModule(new File(realPath));
          modules.add(module);
        }
      }
    
      // Determine deployment order
      ModuleOrder order = ModuleOrderUtil.getModuleOrder(modules, null);
  
      // Deploy
      for (Module module : order.getOrdered()) {
       // compile and deploy here (code not included), add deployment id
        deploymentIds.add(deployment.getDeploymentId());
      }
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
  }
  
  public void contextDestroyed(ServletContextEvent servletContextEvent) {
    EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
    for (String deploymentId : deploymentIds) {
       runtime.getDeploymentService().undeploy(deploymentId);
    }
  }
}

An Esper stage is its own little container that can host deployments.

A stage processes only those events that an application sends into that stage. A stage does not see any events that an application sends into the runtime.

A stage has its own stage time. Time in a stage advances only when an application advances time for that stage. A stage advance times independently of runtime time.

Each stage is uniquely identified by a stage URI. Within different runtimes there can be stages of the same stage URI. A stage lives within the runtime instance and gets destroyed when the runtime gets destroyed. An application can allocate any number of stages. A stage URI can be any non-null string value.

These attributes make stages useful for:

In other words, a stage allows an application to control event visibility and the concept of time as desired on a deployment level: Events sent into a stage are visible only to those deployments that are staged and are not visible to deployments outside of that stage. Within a stage an application can control time independently, start time at a point in time and advance time at the resolution and pace suitable for the deployments added to that stage.

By staging deployments, the deployments' time and event processing occurs only when the application explicitly sends events to the stage or advances time for that stage.

The example code below compiles and deploys EPL that reports order events:

String module =
    "@public @buseventtype create schema OrderEvent(price double);\n" +
    "@name('All-Order-Events') select * from OrderEvent;\n";
  EPCompiled compiled = EPCompilerProvider.getCompiler().compile(module, null);
  EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
  EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
  deployment.getStatements()[0].addListener(new SupportUpdateListener());

The stage method stages the deployment:

EPStage stage = runtime.getStageService().getStage("myStage");
stage.stage(Collections.singletonList(deployment.getDeploymentId()));

The staged deployment only receives OrderEvent events that an application send into the stage. It does not receive OrderEvent events that an application sends into the runtime (the EPEventService returned by getEventService of EPRuntime). For example:

// The listener receives the following event
stage.getEventService().sendEventMap(Collections.singletonMap("price", 100d), "OrderEvent");

// The listener DOES NOT receive the following event
runtime.getEventService().sendEventMap(Collections.singletonMap("price", 200d), "OrderEvent");

The unstage method un-stages the deployment:

stage.unstage(Collections.singletonList(deployment.getDeploymentId()));

The un-staged deployment only receives OrderEvent events that an application sends into the runtime. It does not receive OrderEvent events that an application sends into the stage. For example:

// The listener DOES NOT receive the following event
stage.getEventService().sendEventMap(Collections.singletonMap("price", 100d), "OrderEvent");

// The listener receives the following event
runtime.getEventService().sendEventMap(Collections.singletonMap("price", 200d), "OrderEvent");

Finally, destroy the stage when it is no longer needed:

stage.destroy();