For NEsper .NET also see Section J.11, “.NET Basic Concepts”.
Statements are continuous queries that analyze events and time and that detect situations.
You interact with Esper by compiling and deploying modules that contain statements, by sending events and advancing time and by receiving output by means of callbacks or by polling for current results.
Table 2.1. Interacting With Esper
What | How |
---|---|
EPL | First, compile and deploy statements. Please refer to Chapter 5, EPL Reference: Clauses, Chapter 15, Compiler Reference and Chapter 16, Runtime Reference. |
Callbacks | Second, attach executable code that your application provides to receive output. Please refer to Table 16.2, “Choices For Receiving Statement Results”. |
Events | Next, send events using the runtime API. Please refer to Section 16.6, “Processing Events and Time Using EPEventService”. |
Time | Next, advance time using the runtime API or system time. Please refer to Section 16.9, “Controlling Time-Keeping”. |
The runtime contains statements like so:
Statements can be partitioned. A partitioned statement can have multiple partitions. For example, there could be a partition for each room in a building. For a building with 10 rooms you could have one statement that has 10 partitions. Please refer to Chapter 4, Context and Context Partitions.
A statement that is not partitioned implicitly has one partition. Upon deploying the un-partitioned statement the runtime allocates the single partition. Upon undeploying the un-partitioned statement the runtime destroys the partition.
A partition (or context partition) is where the runtime keeps the state. In the picture above there are three un-partitioned statements and one partitioned statement that has three partitions.
The next sections discuss various easily-understood statements.
The sections illustrate how statements behave, the information that the runtime passes to callbacks (the output) and what information the runtime remembers for statements (the state, all state lives in a partition).
The sample statements assume an event type by name Withdrawal that has account and amount properties.
This statement selects all Withdrawal events.
select * from Withdrawal
For this statement, the runtime remembers no information and does not remember any events. A statement where the runtime does not need to remember any information at all is a statement without state (a stateless statement).
The term insert stream is a name for the stream of new events that are arriving. The insert stream in this example is the stream of arriving Withdrawal events.
An aggregation function is a function that groups multiple events together to form a single value. Please find more information at Section 10.2, “Aggregation Functions”.
This statement selects a count and a total amount of all Withdrawal events.
select count(*), sum(amount) from Withdrawal
Upon a new Withdrawal event arriving, the runtime increments the count and adds the amount to a running total. It passes the new count and total to callbacks.
After that the runtime effectively forgets the current event and does not remember any events at all, but does remember the current count and total.
Here, the runtime only remembers the current number of events and the total amount.
The count is a single long-type value and the total is a single double-type value (assuming amount is a double-value; the total can be BigDecimal as applicable).
This statement is not stateless and the state consists of a long-typed value and a double-typed value.
Upon a new Withdrawal event arriving, the runtime increases the count by one and adds the amount to the running total. The runtime does not re-compute the count and total because it does not remember events.
In general, the runtime does not re-compute aggregations (unless otherwise indicated). Instead, the runtime adds (increments, enters, accumulates) data to aggregation state and subtracts (decrements, removes, reduces, decreases) from aggregation state.
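As an illustration only, the incremental bookkeeping described above can be sketched in plain Python. This is not Esper code and not how the runtime is implemented; the class name `CountSumState`, its `enter` and `leave` methods and the sample amounts are hypothetical and chosen for this sketch.

```python
class CountSumState:
    """State for count(*) and sum(amount). It holds two values and no events."""

    def __init__(self):
        self.count = 0      # long-typed count in the text
        self.total = 0.0    # double-typed running total in the text

    def enter(self, amount):
        # Add (increment, accumulate) one event's data to the aggregation state.
        self.count += 1
        self.total += amount
        return self.count, self.total

    def leave(self, amount):
        # Subtract (decrement, remove) one event's data from the aggregation state.
        self.count -= 1
        self.total -= amount
        return self.count, self.total


state = CountSumState()
# Three arriving events; each call returns what would be passed to callbacks.
outputs = [state.enter(amount) for amount in (500.0, 100.0, 200.0)]
after_leave = state.leave(500.0)
```

The state never grows with the number of events: only the two numbers are kept, which is why nothing needs to be re-computed.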
Place filter expressions in parentheses after the event type name. For further information see Section 5.4.1, “Filter-Based Event Streams”.
This statement selects Withdrawal events that have an amount of 200 or higher:
select * from Withdrawal(amount >= 200)
Upon a new Withdrawal event with an amount of 200 or higher arriving, the runtime passes the arriving event to callbacks.
For this statement, the runtime remembers no information and does not remember any events.
You may ask what happens for Withdrawal events with an amount of less than 200. The answer is that the statement itself does not even see such events, because the runtime knows to discard them right away. The runtime discards unneeded events very fast, enabled by statement analysis, planning and suitable data structures.
select count(*), sum(amount) from Withdrawal(amount >= 200)
In this example the runtime only remembers the count and total and again does not remember events. The runtime discards Withdrawal events with an amount of less than 200.
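A rough way to picture the combination of a filter and an aggregation is the plain-Python sketch below. It is not Esper code; the function name `run_filtered_count_sum` and the sample amounts are assumptions made for illustration. Events below the threshold are dropped before any aggregation state is touched, so they produce no output.

```python
def run_filtered_count_sum(amounts, threshold=200):
    """Simulate count(*) and sum(amount) over events with amount >= threshold."""
    count, total = 0, 0.0
    outputs = []
    for amount in amounts:
        if amount < threshold:
            # Discarded up front: the statement never sees this event,
            # state is unchanged and no callback output occurs.
            continue
        count += 1
        total += amount
        outputs.append((count, total))
    return outputs


filtered_outputs = run_filtered_count_sum([500, 100, 200, 50, 300])
```

Five events arrive but only three outputs occur, one per event that passes the filter.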
A data window, or window for short, retains events for the purpose of aggregation, join, match-recognize patterns, subqueries, iterating via API and output-snapshot. A data window defines which subset of events to retain. For example, a length window keeps the last N events and a time window keeps the last N seconds of events. See Chapter 14, EPL Reference: Data Windows for details.
This statement selects all Withdrawal events and instructs the runtime to remember the last five events.
select * from Withdrawal#length(5)
Upon a new Withdrawal event arriving, the runtime adds the event to the length window. It also passes the same event to callbacks.
Upon arrival of event W6, event W1 leaves the length window. We use the term expires to say that an event leaves a data window. We use the term remove stream to describe the stream of events leaving a data window.
The runtime remembers up to five events in total (the last five events). At the start of the statement the data window is empty. By itself, keeping the last five events may not sound useful. But in connection with a join, subquery or match-recognize pattern, for example, a data window tells the runtime which events you want to query.
This statement outputs the count and total of the last five Withdrawal events.
select count(*), sum(amount) from Withdrawal#length(5)
Before the arrival of event W6 the current count is five and the running total amount is 1000. Upon arrival of event W6 the following takes place:
The runtime determines that event W1 leaves the length window.
To account for the new event W6, the runtime increases the count by one and adds 300 to the running total amount.
To account for the expiring event W1, the runtime decreases the count by one and subtracts 500 from the running total amount.
The output is a count of five and a total of 800 as a result of 1000 + 300 - 500.
The runtime adds (increments, enters, accumulates) insert stream events into aggregation state and subtracts (decrements, removes, reduces, decreases) remove stream events from aggregation state. It thus maintains aggregation state in an incremental fashion.
For this statement, once the count reaches 5, the count will always remain at 5.
The information that the runtime remembers for this statement is the last five events and the current long-typed count and double-typed total.
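The W6 arrival described above can be traced with a small plain-Python sketch. This is a simulation under stated assumptions, not Esper code: the class name `LengthWindowCountSum` is hypothetical, and the amounts for W2 to W5 are invented so that the total before W6 is 1000 (the text only gives 500 for W1 and 300 for W6).

```python
from collections import deque


class LengthWindowCountSum:
    """Keep the last `size` events plus an incrementally maintained count and total."""

    def __init__(self, size):
        self.size = size
        self.window = deque()   # the remembered events
        self.count = 0
        self.total = 0.0

    def on_event(self, name, amount):
        expired = None
        if len(self.window) == self.size:
            expired = self.window.popleft()   # oldest event leaves (remove stream)
        self.window.append((name, amount))    # new event enters (insert stream)
        self.count += 1
        self.total += amount
        if expired is not None:
            self.count -= 1
            self.total -= expired[1]
        return self.count, self.total, expired


stmt = LengthWindowCountSum(5)
events = [("W1", 500), ("W2", 100), ("W3", 200), ("W4", 100), ("W5", 100), ("W6", 300)]
results = [stmt.on_event(name, amount) for name, amount in events]
```

After W5 the result is a count of five and a total of 1000; after W6 it is a count of five and a total of 800, with W1 reported as the expired event.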
Use the irstream keyword to receive both the current as well as the previous aggregation value for aggregating statements.
select .... from Withdrawal(amount > 200)
// equivalent to
select .... from Withdrawal where amount > 200
The next statement applies a where-clause to Withdrawal events. Where-clauses are discussed in more detail in Section 5.5, “Specifying Search Conditions: The Where Clause”.
select * from Withdrawal#length(5) where amount >= 200
The where-clause applies to both new events and expiring events. Only events that pass the where-clause are passed to callbacks.
A time window is a data window that extends the specified time interval into the past. More information on time windows can be found at Section 14.3.3, “Time Window (time or win:time)”.
The next statement selects the count and total amount of Withdrawal events considering the last four seconds of events.
select count(*), sum(amount) as total from Withdrawal#time(4)
The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.
The activity as illustrated by the diagram:
At time t + 4 seconds an event W1 arrives and the output is a count of one and a total of 500.
At time t + 5 seconds an event W2 arrives and the output is a count of two and a total of 600.
At time t + 6.5 seconds an event W3 arrives and the output is a count of three and a total of 800.
At time t + 8 seconds event W1 expires and the output is a count of two and a total of 300.
For this statement the runtime remembers the last four seconds of Withdrawal events as well as the long-typed count and the double-typed total amount.
Time can have a millisecond or microsecond resolution.
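The timeline above can be replayed with a plain-Python sketch of a four-second time window. It is an illustration only and not Esper code: the class name `TimeWindowCountSum` and its methods are hypothetical, t is taken as 0, and the sketch assumes an event expires once it has been in the window for exactly the window length, which matches W1 arriving at t + 4 and expiring at t + 8.

```python
from collections import deque


class TimeWindowCountSum:
    """Keep events of the last `seconds` seconds plus a running count and total."""

    def __init__(self, seconds):
        self.seconds = seconds
        self.window = deque()   # (arrival_time, amount), oldest first
        self.count = 0
        self.total = 0.0

    def advance_time(self, now):
        """Expire old events; return True if any event left the window."""
        changed = False
        while self.window and self.window[0][0] + self.seconds <= now:
            _, amount = self.window.popleft()
            self.count -= 1
            self.total -= amount
            changed = True
        return changed

    def on_event(self, now, amount):
        self.advance_time(now)
        self.window.append((now, amount))
        self.count += 1
        self.total += amount
        return self.count, self.total


tw = TimeWindowCountSum(4)
out_w1 = tw.on_event(4, 500)      # W1 at t + 4
out_w2 = tw.on_event(5, 100)      # W2 at t + 5
out_w3 = tw.on_event(6.5, 200)    # W3 at t + 6.5
expired_at_8 = tw.advance_time(8) # W1 expires at t + 8
out_after_expiry = (tw.count, tw.total)
```

Note that the output at t + 8 is caused by time passing, not by an arriving event.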
A partitioned statement is handy for batch processing, sessions, resetting and start/stop of your analysis. For partitioned statements you must specify a context. A context defines how partitions are allocated and destroyed. Additional information about partitioned statements and contexts can be found at Chapter 4, Context and Context Partitions.
We shall have a single partition that starts immediately and ends after four seconds:
create context Batch4Seconds start @now end after 4 sec
The next statement selects the count and total amount of Withdrawal events that arrived since the last reset (resets are at t, t+4, t+8 and so on), resetting each four seconds:
context Batch4Seconds select count(*), sum(amount) from Withdrawal
At time t + 4 seconds and t + 8 seconds the runtime destroys the current partition. This discards the current count and running total.
The runtime immediately allocates a new partition and the count and total start fresh at zero.
For this statement the runtime only remembers the count and running total, and how long a partition lives.
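The reset behavior of such a partition can be pictured with the plain-Python sketch below. It is not Esper code; the class name `BatchCountSum` and the sample events are assumptions, t is taken as 0, and an event arriving exactly on a boundary is assumed to belong to the new partition.

```python
class BatchCountSum:
    """One partition at a time; each partition lives `length` seconds, then is replaced."""

    def __init__(self, start_time, length=4):
        self.length = length
        self.partition_start = start_time
        self.count, self.total = 0, 0.0

    def _roll_partitions(self, now):
        # Destroy the current partition and allocate a fresh one for every
        # boundary that has passed; the count and total start again at zero.
        while now >= self.partition_start + self.length:
            self.partition_start += self.length
            self.count, self.total = 0, 0.0

    def on_event(self, now, amount):
        self._roll_partitions(now)
        self.count += 1
        self.total += amount
        return self.count, self.total


batch = BatchCountSum(start_time=0)
batch_outputs = [batch.on_event(now, amount)
                 for now, amount in [(1, 500), (3, 100), (5, 200), (9, 50)]]
```

The events at 1 and 3 seconds share the first partition, while the events at 5 and 9 seconds each start from a fresh count and total.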
All the previous statements had continuous output. In other words, in each of the previous statements output occurred as a result of a new event arriving. Use output rate limiting to output when a condition occurs, as described in Section 5.7, “Stabilizing and Controlling Output: The Output Clause”.
The next statement outputs the last count and total of all Withdrawal events every four seconds:
select count(*), sum(amount) from Withdrawal output last every 4 seconds
At time t + 4 seconds and t + 8 seconds the runtime outputs the last aggregation values to callbacks.
For this statement the runtime only remembers the count and running total, and when output shall occur.
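To contrast rate-limited output with the continuous output seen earlier, here is a plain-Python sketch. It is a simplification and not Esper code: the function name `output_last_every` and the sample events are hypothetical, t is taken as 0, and the sketch emits at every interval boundary, leaving aside what the runtime does for an interval in which no event arrived.

```python
def output_last_every(events, interval, end_time):
    """events: (time, amount) pairs sorted by time.

    Returns (output_time, count, total) for each interval boundary up to end_time.
    The count and total keep accumulating; only the moment of output changes.
    """
    count, total = 0, 0.0
    outputs = []
    i = 0
    next_output = interval
    while next_output <= end_time:
        # Apply all events up to the boundary without producing output.
        while i < len(events) and events[i][0] <= next_output:
            count += 1
            total += events[i][1]
            i += 1
        # Output only the last aggregation values at the boundary.
        outputs.append((next_output, count, total))
        next_output += interval
    return outputs


rate_limited = output_last_every([(1, 500), (3, 100), (5, 200)], interval=4, end_time=8)
```

Three events arrive but only two outputs occur, at 4 and 8 seconds, and the values are not reset in between.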
create context Batch4Seconds start @now end after 4 sec
context Batch4Seconds select count(*), sum(amount) from Withdrawal output last when terminated
At time t + 4 seconds and t + 8 seconds the runtime outputs the last aggregation values to callbacks, and resets the current count and total.
For this statement the runtime only remembers the count and running total, when the output shall occur and how long a partition lives.
The documentation link for both is Chapter 6, EPL Reference: Named Windows and Tables. Named windows and tables can be queried with fire-and-forget queries through the API and also the inward-facing JDBC driver.
The below drawing explains how named windows work.
The step #1 creates a named window like so:
create window WithdrawalWindow#time(10) as Withdrawal
The name of the named window is WithdrawalWindow and it will be holding the last 10 seconds of Withdrawal events (#time(10) as Withdrawal).
As a result of step #1 the runtime allocates a named window to hold 10 seconds of Withdrawal events. In the drawing the named window is filled with some events. Normally a named window starts out as an empty window; however, it looks nicer with some boxes already inside.
The step #2 creates an EPL statement to insert into the named window:
on Withdrawal merge WithdrawalWindow insert select *
This tells the runtime that on arrival of a Withdrawal event it must merge with the WithdrawalWindow and insert the event. The runtime now waits for Withdrawal events to arrive.
The step #3 creates an EPL statement that computes the average withdrawal amount of the subset of events as controlled by the named window:
select avg(amount) as avgAmount from WithdrawalWindow
As a result of step #3 the runtime allocates state to keep a current average. The state consists of a count field and a sum field to compute a running average.
It determines that the named window is currently empty and sets the count to zero and the sum to null (if the named window was filled already it would determine the count and sum by iterating).
Internally, it also registers a consumer callback with the named window to receive inserted and removed events (the insert and remove stream). The callbacks are shown in the drawing as a dotted line.
In step #4 assume a Withdrawal event arrives that has an account number of 0001 and an amount of 5000.
The runtime executes the on Withdrawal merge WithdrawalWindow insert select * and thus adds the event to the time window.
The runtime invokes the insert stream callback for all consumers (dotted line, internally managed callback).
The consumer that computes the average amount receives the callback and the newly-inserted event. It increases the count field by one and increases the sum field by 5000.
The output of the statement is avgAmount as 5000.
In step #5, which occurs 10 seconds after step #4, the Withdrawal event for account 0001 and amount 5000 leaves the time window.
The runtime invokes the remove stream callback for all consumers (dotted line, internally managed callback).
The consumer that computes the average amount receives the callback and the newly-removed event. It decreases the count field by one, so that the count is zero, and sets the sum to null.
The output of the statement is avgAmount as null.
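Steps #1 to #5 can be traced with a plain-Python sketch of a named window that notifies its consumers. This is an illustration under assumptions and not Esper code or its internal design: the class names `NamedTimeWindow` and `AvgConsumer` and their methods are hypothetical, time starts at 0, and Python's `None` stands in for null.

```python
from collections import deque


class AvgConsumer:
    """Consumer state for avg(amount): a count field and a sum field."""

    def __init__(self):
        self.count = 0
        self.sum = None   # null while the named window is empty

    def on_insert(self, amount):
        self.count += 1
        self.sum = amount if self.sum is None else self.sum + amount

    def on_remove(self, amount):
        self.count -= 1
        self.sum = None if self.count == 0 else self.sum - amount

    def avg_amount(self):
        return None if self.count == 0 else self.sum / self.count


class NamedTimeWindow:
    """Holds the last `seconds` seconds of events and calls back its consumers."""

    def __init__(self, seconds):
        self.seconds = seconds
        self.rows = deque()     # (arrival_time, event), oldest first
        self.consumers = []

    def add_consumer(self, consumer):
        # A new consumer initializes its state by iterating the current contents.
        for _, event in self.rows:
            consumer.on_insert(event["amount"])
        self.consumers.append(consumer)

    def insert(self, now, event):
        self.rows.append((now, event))
        for consumer in self.consumers:        # insert stream callback
            consumer.on_insert(event["amount"])

    def advance_time(self, now):
        while self.rows and self.rows[0][0] + self.seconds <= now:
            _, event = self.rows.popleft()
            for consumer in self.consumers:    # remove stream callback
                consumer.on_remove(event["amount"])


window = NamedTimeWindow(10)                               # step 1
consumer = AvgConsumer()
window.add_consumer(consumer)                              # step 3
window.insert(0, {"account": "0001", "amount": 5000})      # step 4
avg_after_insert = consumer.avg_amount()
window.advance_time(10)                                    # step 5
avg_after_expiry = consumer.avg_amount()
```

The consumer never scans the window after its initial setup; it only reacts to the insert and remove callbacks.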
EPL allows each aggregation function to specify its own grouping criteria. Please find further information in Section 5.6.4, “Specifying Grouping for Each Aggregation Function”. The documentation provides output examples for statement types in Appendix A, Output Reference and Samples, and the next sections outline each statement type.
The examples below assume BankInformationWindow is a named window defined elsewhere.
The examples use a join to illustrate. Joins are further described in Section 5.12, “Joining Event Streams”.
An example statement for the un-aggregated and un-grouped case is as follows:
select * from Withdrawal unidirectional, BankInformationWindow
The appendix provides a complete example including input and output events over time at Section A.2, “Output for Un-Aggregated and Un-Grouped Statements”.
select sum(amount) from Withdrawal unidirectional, BankInformationWindow
Upon a Withdrawal event coming in, the number of output rows is always zero or one.
The appendix provides a complete example including input and output events over time at Section A.3, “Output for Fully-Aggregated and Un-Grouped Statements”.
If any aggregation functions specify the group_by parameter and a dimension, for example sum(amount, group_by:account), the statement executes as an aggregated and grouped statement instead.
select account, sum(amount) from Withdrawal unidirectional, BankInformationWindow
The appendix provides a complete example including input and output events over time at Section A.4, “Output for Aggregated and Un-Grouped Statements”.
select account, sum(amount) from Withdrawal unidirectional, BankInformationWindow group by account
Upon a Withdrawal event coming in, the number of output rows is one row per unique account number.
The appendix provides a complete example including input and output events over time at Section A.5, “Output for Fully-Aggregated and Grouped Statements”.
If any aggregation functions specify the group_by parameter and a dimension other than the group by dimension(s), for example sum(amount, group_by:accountCategory), the statement executes as an aggregated and grouped statement instead.
There are five types of operators:
every StockTickEvent(symbol="IBM", price>80) where timer:within(60 seconds)
A sample pattern that alerts every five minutes past the hour:
every timer:at(5, *, *, *, *)
A sample pattern that alerts when event A occurs, followed by either event B or event C:
A -> ( B or C)
A pattern where a property of a following event must match a property from the first event:
every a=EventX -> every b=EventY(objectID=a.objectID)
The runtime builds and maintains indexes to achieve good performance.
The following table compares the two kinds of indexes:
We use the term filter or filter criteria to mean the selection predicate, such as symbol="google" and price > 200 and volume > 111000.
Statements provide filter criteria in the from-clause, and/or in EPL patterns and/or in context declarations.
Please see Section 5.4.1, “Filter-Based Event Streams”, Section 7.4, “Filter Expressions in Patterns” and Section 4.2.7.1, “Filter Context Condition”.
Filter index building is a result of the compiler analyzing the filter expressions as described in Section 15.18, “Compiler Filter Expression Analysis”. The runtime uses the compiler output to build, maintain and use filter indexes.
Big-O notation scaling information can be found at Section 24.1.1, “Big-O Complexity of Matching Events to Statements and Context Partitions”.
When the runtime receives an event, it consults the filter indexes to determine which statements, if any, must process the event.
The purpose of filter indexes is to enable:
Efficient matching of events to only those statements that need them.
Efficient discarding of events that are not needed by any statement.
Efficient evaluation, in the best case approximately O(1) to O(log n), i.e. in the best case matching executes in approximately the same time regardless of the size of the input data set, which is the number of active filters.
The runtime builds and maintains separate sets of filter indexes per event type, when such event type occurs in the from-clause or pattern.
Filter indexes are sharable within the same event type filter. Thus various from-clauses and patterns that refer to the same event type can contribute to the same set of filter indexes.
The runtime builds filter indexes in a nested fashion: Filter indexes may contain further filter indexes, forming a tree-like structure, a filter index tree. The nesting of indexes is beyond the introductory discussion provided here.
@name('A') select * from WithdrawalEvent(account = 1)
@name('B') select * from WithdrawalEvent(account = 1)
@name('C') select * from WithdrawalEvent(account = 2)
The below table is a sample filter index for the three statements:
When a Withdrawal event arrives, the runtime extracts the account and performs a lookup into the above table.
If there are no matching rows in the table, for example when the account is 3, the runtime knows that there is no further processing for the event.
For filter index planning, we use the term lookupable-expression to mean the expression providing the filter index lookup value.
In this example there is only one lookupable-expression and that is account.
We use the term value-expression to mean the expression providing the indexed value.
Here there are three value-expressions, namely 1 (from statement A), 1 (from statement B) and 2 (from statement C).
We use the term filter-index-operator to mean the type of index such as equals (=), relational (<, >, <=, >=) etc.
@name('P') select * from pattern [every w1=WithdrawalEvent -> w2=WithdrawalEvent(account = w1.account)]
The below table is a sample filter index for the pattern after the Wa, Wb and Wc events arrived:
When a Withdrawal event arrives, the runtime extracts the account and performs a lookup into the above table.
If a matching row is found, the runtime can hand off the event to the relevant pattern subexpressions.
@name('A') create context UserSession initiated by LoginEvent as loginEvent
@name('B') context UserSession select count(*) from WithdrawalEvent(account = context.loginEvent.account)
The below table is a sample filter index for the three statement context partitions:
When a Withdrawal event arrives, the runtime extracts the account and performs a lookup into the above table.
It can then hand off the event directly to the relevant statement context partitions, or ignore the event if no rows are found for a given account id.
Big-O notation scaling information can be found at Section 24.1.3, “Big-O Complexity of Joins, Subqueries, On-Select, On-Merge, On-Update, On-Delete”.
As event indexes are similar to database indexes, for this discussion, we use the term column to mean a column in an EPL table or named window and to also mean an event property or field. We use the term row to mean a row in an EPL table or named window and to also mean an event.
When the runtime performs statement processing it may use event indexes to find correlated rows efficiently.
The purpose of event indexes is to enable:
Efficient evaluation of subqueries.
Efficient evaluation of joins.
Efficient evaluation of on-action statements.
Efficient evaluation of fire-and-forget queries.
Event index building is a result of the compiler analyzing the where-clause correlation criteria for joins (on-clause for outer joins), subqueries, on-action and fire-and-forget queries.
It is done automatically by the compiler. You may utilize the create index clause to explicitly index named windows and tables. You may utilize query planner hints to influence index building, use and sharing.
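To make the idea of an event index concrete, the plain-Python sketch below builds a hash index over the rows of a named window and uses it to find correlated rows for an arriving event. It is an illustration only and not Esper code: the helper names `build_hash_index` and `join_withdrawal` are hypothetical, and since the text does not list the columns of BankInformationWindow, the `account` and `bank` columns are assumptions.

```python
from collections import defaultdict


def build_hash_index(rows, column):
    """Index rows by the value of one column, similar to a database hash index."""
    index = defaultdict(list)
    for row in rows:
        index[row[column]].append(row)
    return index


# Assumed contents of the named window (hypothetical columns and values).
bank_rows = [
    {"account": "0001", "bank": "North"},
    {"account": "0002", "bank": "South"},
]
account_index = build_hash_index(bank_rows, "account")


def join_withdrawal(withdrawal):
    # Correlate on account using the index instead of scanning every row.
    matching = account_index.get(withdrawal["account"], [])
    return [{**withdrawal, **row} for row in matching]


joined = join_withdrawal({"account": "0001", "amount": 5000})
unmatched = join_withdrawal({"account": "0003", "amount": 10})
```

Without the index, each arriving event would need a scan over all rows of the window; with it, finding the correlated rows is a single lookup.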