www.espertech.comDocumentation

Chapter 3. Processing Model

3.1. Introduction
3.2. Insert Stream
3.3. Insert and Remove Stream
3.4. Filters and Where-clauses
3.5. Time Windows
3.5.1. Time Window
3.5.2. Time Batch
3.6. Batch Windows
3.7. Aggregation and Grouping
3.7.1. Insert and Remove Stream
3.7.2. Output for Aggregation and Group-By
3.8. Event Visibility and Current Time
3.9. Indexes
3.9.1. Index Kinds
3.9.2. Filter Indexes
3.9.3. Event Indexes

For NEsper .NET also see Section H.11, “.NET Processing Model Introduction”.

The Esper processing model is continuous: Update listeners and/or subscribers to statements receive updated data as soon as the engine processes events for that statement, according to the statement's choice of event streams, views, filters and output rates.

As outlined in Chapter 16, API Reference the interface for listeners is com.espertech.esper.client.UpdateListener. Implementations must provide a single update method that the engine invokes when results become available:

A second, strongly-typed and native, highly-performant method of result delivery is provided: A subscriber object is a direct binding of query results to a Java object. The object, a POJO, receives statement results via method invocation. The subscriber class need not implement an interface or extend a superclass. Please see Section 16.3.3, “Setting a Subscriber Object”.

The engine provides statement results to update listeners by placing results in com.espertech.esper.client.EventBean instances. A typical listener implementation queries the EventBean instances via getter methods to obtain the statement-generated results.

The get method on the EventBean interface can be used to retrieve result columns by name. The property name supplied to the get method can also be used to query nested, indexed or array properties of object graphs as discussed in more detail in Chapter 2, Event Representations and Section 16.6, “Event and Event Type”

The getUnderlying method on the EventBean interface allows update listeners to obtain the underlying event object. For wildcard selects, the underlying event is the event object that was sent into the engine via the sendEvent method. For joins and select clauses with expressions, the underlying object implements java.util.Map.

Tip

The engine calls application-provided update listeners and subscribers for output. These commonly encapsulate the actions to take when there is output. This design decouples EPL statements from actions and places actions outside of EPL. It allows actions to change independently from statements: A statement does not need to be updated when its associated action(s) change.

While action-taking, in respect to the code or script taking the action, is not a part of the EPL language, here are a few noteworthy points. Through the use of EPL annotations one can attach information to EPL that can be used by applications to flexibly determine actions. The convenient StatementAwareUpdateListener interface is a listener that receives the statement itself and subscribers can accept EPStatement as a parameter. The insert into-clause can be used to send results into a further stream and input and output adapters or data flows can exist to process output events from that stream. Also the data flow EPStatementSource operator can be used to hook up actions declaratively. The EPStatementStateListener can inform your application of new statements coming online.

In this section we look at the output of a very simple EPL statement. The statement selects an event stream without using a data window and without applying any filtering, as follows:

select * from Withdrawal

This statement selects all Withdrawal events. Every time the engine processes an event of type Withdrawal or any sub-type of Withdrawal, it invokes all update listeners, handing the new event to each of the statement's listeners.

The term insert stream denotes the new events arriving, and entering a data window or aggregation. The insert stream in this example is the stream of arriving Withdrawal events, and is posted to listeners as new events.

The diagram below shows a series of Withdrawal events 1 to 6 arriving over time. The number in parenthesis is the withdrawal amount, an event property that is used in the examples that discuss filtering.


The example statement above results in only new events and no old events posted by the engine to the statement's listeners.

A length window instructs the engine to only keep the last N events for a stream. The next statement applies a length window onto the Withdrawal event stream. The statement serves to illustrate the concept of data window and events entering and leaving a data window:

select * from Withdrawal#length(5)

The size of this statement's length window is five events. The engine enters all arriving Withdrawal events into the length window. When the length window is full, the oldest Withdrawal event is pushed out the window. The engine indicates to listeners all events entering the window as new events, and all events leaving the window as old events.

While the term insert stream denotes new events arriving, the term remove stream denotes events leaving a data window, or changing aggregation values. In this example, the remove stream is the stream of Withdrawal events that leave the length window, and such events are posted to listeners as old events.

The next diagram illustrates how the length window contents change as events arrive and shows the events posted to an update listener.


As before, all arriving events are posted as new events to listeners. In addition, when event W1 leaves the length window on arrival of event W6, it is posted as an old event to listeners.

Similar to a length window, a time window also keeps the most recent events up to a given time period. A time window of 5 seconds, for example, keeps the last 5 seconds of events. As seconds pass, the time window actively pushes the oldest events out of the window resulting in one or more old events posted to update listeners.

Filters to event streams allow filtering events out of a given stream before events enter a data window (if there are data windows defined in your query). The statement below shows a filter that selects Withdrawal events with an amount value of 200 or more.

select * from Withdrawal(amount>=200)#length(5)

With the filter, any Withdrawal events that have an amount of less then 200 do not enter the length window and are therefore not passed to update listeners. Filters are discussed in more detail in Section 5.4.1, “Filter-based Event Streams” and Section 7.4, “Filter Expressions In Patterns”.


The where-clause and having-clause in statements eliminate potential result rows at a later stage in processing, after events have been processed into a statement's data window or other views.

The next statement applies a where-clause to Withdrawal events. Where-clauses are discussed in more detail in Section 5.5, “Specifying Search Conditions: the Where Clause”.

select * from Withdrawal#length(5) where amount >= 200

The where-clause applies to both new events and old events. As the diagram below shows, arriving events enter the window however only events that pass the where-clause are handed to update listeners. Also, as events leave the data window, only those events that pass the conditions in the where-clause are posted to listeners as old events.


In this section we explain the output model of statements employing a time window view and a time batch view.

The built-in data windows that act on batches of events are the win:time_batch and the win:length_batch views, among others. The win:time_batch data window collects events arriving during a given time interval and posts collected events as a batch to listeners at the end of the time interval. The win:length_batch data window collects a given number of events and posts collected events as a batch to listeners when the given number of events has collected.

For more detailed information on batch windows please see Section 14.2, “A Note on Batch Windows”.

Related to batch data windows is output rate limiting. While batch data windows retain events the output clause offered by output rate limiting can control or stabilize the rate at which events are output, see Section 5.7, “Stabilizing and Controlling Output: the Output Clause”.

Let's look at how a time batch window may be used:

select account, amount from Withdrawal#time_batch(1 sec)

The above statement collects events arriving during a one-second interval, at the end of which the engine posts the collected events as new events (insert stream) to each listener. The engine posts the events collected during the prior batch as old events (remove stream). The engine starts posting events to listeners one second after it receives the first event and thereon.

For statements containing aggregation functions and/or a group by clause, the engine posts consolidated aggregation results for an event batch. For example, consider the following statement:

select sum(amount) as mysum from Withdrawal#time_batch(1 sec)

Note that output rate limiting also generates batches of events following the output model as discussed here.

Following SQL (Standard Query Language) standards for queries against relational databases, the presence or absence of aggregation functions and the presence or absence of the group by clause and group_by named parameters for aggregation functions dictates the number of rows posted by the engine to listeners. The next sections outline the output model for batched events under aggregation and grouping. The examples also apply to data windows that don't batch events and post results continously as events arrive or leave data windows. The examples also apply to patterns providing events when a complete pattern matches.

In summary, as in SQL, if your query only selects aggregation values, the engine provides one row of aggregated values. It provides that row every time the aggregation is updated (insert stream), which is when events arrive or a batch of events gets processed, and when the events leave a data window or a new batch of events arrives. The remove stream then consists of prior aggregation values.

Also as in SQL, if your query selects non-aggregated values along with aggregation values in the select clause, the engine provides a row per event. The insert stream then consists of the aggregation values at the time the event arrives, while the remove stream is the aggregation value at the time the event leaves a data window, if any is defined in your query.

EPL allows each aggregation function to specify its own grouping criteria. Please find further information in Section 5.6.4, “Specifying grouping for each aggregation function”.

The documentation provides output examples for query types in Appendix A, Output Reference and Samples, and the next sections outlines each query type.

An event sent by your application or generated by statements is visible to all other statements in the same engine instance. Similarly, current time (the time horizon) moves forward for all statements in the same engine instance. Please see the Chapter 16, API Reference chapter for how to send events and how time moves forward through system time or via simulated time, and the possible threading models.

Within an Esper engine instance you can additionally control event visibility and current time on a statement level, under the term isolated service as described in Section 16.9, “Service Isolation”.

An isolated service provides a dedicated execution environment for one or more statements. Events sent to an isolated service are visible only within that isolated service. In the isolated service you can move time forward at the pace and resolution desired without impacting other statements that reside in the engine runtime or other isolated services. You can move statements between the engine and an isolated service.

Filter indexes organize filters so that they can be searched efficiently. Filter indexes link back to the statement that the filter(s) come from.

We use the term filter or filter criteria to mean the selection predicate, such as symbol=“google” and price > 200 and volume > 111000. Statements provide filter criteria in the from-clause, and/or in EPL patterns and/or in context declarations. Please see Section 5.4.1, “Filter-based Event Streams”, Section 7.4, “Filter Expressions In Patterns” and Section 4.2.7.1, “Filter Context Condition”.

When the engine receives an event, it consults the filter indexes to determine which statements, if any, must process the event.

The purpose of filter indexes is to enable:

  • Efficient matching of events to only those statements that need them.

  • Efficient discarding of events that are not needed by any statement.

  • Efficient evaluation with best case approximately O(1) to O(log n) i.e. in the best case executes in approximately the same time regardless of the size of the input data set which is the number of active filters.

Filter index building is a result of the engine analyzing the filter criteria in the from-clause and also in EPL patterns. It is done automatically by the engine.

Esper builds and maintains separate sets of filter indexes per event type, when such event type occurs in the from-clause or pattern. Filter indexes are sharable within the same event type filter. Thus various from-clauses and patterns that refer for the same event type can contribute to the same set of filter indexes.

Esper builds filter indexes in a nested fashion: Filter indexes may contain further filter indexes, forming a tree-like structure, a filter index tree. The nesting of indexes is beyond the introductory discussion provided here.

As part of a pattern you may specify event types and filter criteria. The engine analyzes patterns and determines filter criteria for filter index building.

Consider the following example pattern that fires for each WithdrawalEvent that is followed by another WithdrawalEvent for the same accountId value:

@name('P') select * from pattern [every w1=WithdrawalEvent -> w2=WithdrawalEvent(accountId = w.accountId)]

Upon creating the above statement, the engine starts looking for WithdrawalEvent events. At this time there is only one active filter:

Assume a WithdrawalEvent Wa for account 1 arrives. The engine then activates a filter looking for another WithdrawalEvent for account 1. At this time there are 2 active filters:

Assume another WithdrawalEvent Wb for account 1 arrives. The engine then activates a filter looking for another WithdrawalEvent for account 1. At this time there are 3 active filters:

Assume another WithdrawalEvent Wc for account 2 arrives. The engine then activates a filter looking for another WithdrawalEvent for account 2. At this time there are 4 active filters:

The below table is a sample filter index for the pattern after the Wa, Wband Wc events arrived:


When a Withdrawal event arrives, the engine extracts the accountId and performs a lookup into above table. If a matching row is found, the engine can hand off the event to the relevant pattern subexpressions.

This example is similar to the previous example of multiple statements, but instead it declares a context and associates a single statement to the context.

For example, assume the LoginEvent has an accountId field. One could declare a context initiated by a LoginEvent for a user:

@name('A') create context UserSession initiated by LoginEvent as loginEvent

By associating the statement to the context we can tell the engine to analze per LoginEvent, for example:

@name('B') context UserSession select count(*) from WithdrawalEvent(accountId = context.loginEvent.accountId)

Upon creating the above two statements, the engine starts looking for LoginEvent events. At this time there is only one active filter:

Assume a LoginEvent La for account 1 arrives. The engine then activates a context partition of statement B and therefore the filter looking for WithdrawalEvent for account 1. At this time there are 2 active filters:

Assume a LoginEvent Lb for account 1 arrives. The engine then activates a context partition of statement B and therefore the filter looking for WithdrawalEvent for account 1. At this time there are 3 active filters:

Assume a LoginEvent Lc for account 2 arrives. The engine then activates a context partition of statement B and therefore the filter looking for WithdrawalEvent for account 2. At this time there are 4 active filters:

The below table is a sample filter index for the three statement context partitions:


When a Withdrawal event arrives, the engine extracts the accountId and performs a lookup into above table. It can then hand of the event directly to the relevant statement context partitions, or ignore the event if no rows are found for a given account id.