This section discusses the notion of context and its role in the Esper event processing language (EPL).
When you look up the word context in a dictionary, you may find: Context is the set of circumstances or facts that surround a particular event, situation, etc.
Context-dependent event processing occurs frequently. For example, consider a requirement to monitor banking transactions. For different customers your analysis considers customer-specific aggregations, patterns or data windows. In this example the context of detection is the customer: for a given customer you may want to analyze the banking transactions of that customer by using aggregations, data windows, patterns and other EPL constructs.
In a second example, consider traffic monitoring to detect speed violations. Assume the speed limit must be enforced only between 9 am and 5 pm. The context of detection is of temporal nature.
A context takes a cloud of events and classifies them into one or more sets. These sets are called context partitions. An event processing operation that is associated with a context operates on each of these context partitions independently. (Credit: Taken from the book "Event Processing in Action" by Opher Etzion and Peter Niblett.) A basic partitioned query was reviewed in Section 2.11, “Basic Partitioned Query”.
A context is a declaration of dimension and may thus result in one or more context partitions. In the banking transaction example the context dimension is the customer and a context partition exists per customer. In the traffic monitoring example there is a single context partition that exists only between 9 am and 5 pm and does not exist outside of that daily time period.
In an event processing glossary you may find the term event processing agent. An EPL statement is an event processing agent. An alternative term for context partition is event processing agent instance.
Think of context partitions as instances of a class, wherein the class is the EPL statement.
Esper EPL allows you to declare contexts explicitly, offering the following benefits:
Context can apply to multiple statements thereby eliminating the need to duplicate context dimensional information between statements.
Context partitions can be temporally overlapping.
Context partitions provide a fine-grained lifecycle that is independent of the statement lifecycle, making it easy to specify when an analysis should start and end.
Fine-grained lock granularity: The engine locks on the level of context partitions thereby allowing very high concurrency, with a maximum (theoretical) degree of parallelism at 2^31-1 (2,147,483,647) parallel threads working to process a single EPL statement under a hash segmented context.
EPL can become easier to read as common predicate expressions can be factored out into a context.
You may specify a nested context that is composed from two or more contexts. In particular a temporal context type is frequently used in combination with a segmentation-oriented context.
Using contexts your application can aggregate events over time periods (overlapping or non-overlapping) without retaining any events in memory.
Using contexts your application can coordinate boundaries for multiple statements.
Esper EPL allows you to declare a context explicitly via the create context syntax introduced below.
After you have declared a context, one or more EPL statements can refer to that context by specifying context name.
When an EPL statement refers to a context, all EPL-statement related state such as aggregations, patterns or data windows etc. exists once per context partition.
If an EPL statement does not declare a context, it implicitly has a single context partition. The single context partition lives as long as the EPL statement is started and ends when the EPL statement is stopped.
You may have heard of the term session. A context partition is the same as a session.
You may have heard of the term session window to describe the duration between when a session becomes alive to when a session gets destroyed. We use the term context partition lifecycle instead.
The context declaration specifies how the engine manages context partitions (or sessions):
For keyed segmented context there is a context partition (or session) per key or combination of keys, see Section 4.2.2, “Keyed Segmented Context”.
For hash segmented context there is a context partition (or session) per hash code of one or more keys, see Section 4.2.3, “Hash Segmented Context”.
For overlapping contexts there can be multiple overlapping context partitions (or sessions), see Section 4.2.6, “Overlapping Context”.
For non-overlapping contexts there is either zero or one context partition (or session), see Section 4.2.5, “Non-Overlapping Context”.
For category segmented context there is a context partition (or session) per predefined category, see Section 4.2.4, “Category Segmented Context”.
For the API to administrate context partitions please see Section 14.19, “Context Partition Administration”. For more information on locking and threading please see Section 14.7, “Engine Threading and Concurrency”. For performance related information please refer to Chapter 23, Performance.
The create context statement declares a context by specifying a context name and context dimension information.
A context declaration by itself does not consume any resources or perform any logic until your application starts at least one statement that refers to that context. Until then the context is inactive and not in use.
When your application creates or starts the first statement that refers to the context, the engine activates the context.
As soon as your application stops or destroys all statements that refer to the context, the context becomes inactive again.
When your application stops or destroys a statement that refers to a context, the context partitions associated to that statement also end (context partitions associated to other started statements live on).
When your application stops or destroys the statement that declared the context and does not also stop or destroy any statements that refer to the context, the context partitions associated to each such statement do not end.
When your application destroys the statement that declared the context and destroys all statements that refer to that context then the engine removes the context declaration entirely.
The create context statement posts no output events to listeners or subscribers and does not return any rows when iterated.
Each of the context declarations makes available a set of built-in context properties as well as initiating event or pattern properties, as applicable. You may select these context properties for output or use them in any of the statement expressions.
Refer to built-in context properties as context.property_name, wherein property_name refers to the name of the built-in context property.
Refer to initiating event or pattern match event properties as context.stream_name.property_name, wherein stream_name refers to the name assigned to the event or the tag name specified in a pattern and property_name refers to the name of the initiating event or pattern match event property.
This context assigns events to context partitions based on the values of one or more event properties, using the values of these properties as a key that picks a unique context partition directly. Each event thus belongs to exactly one context partition, or to zero context partitions if the event does not match the optional filter predicate expression(s). Each context partition handles exactly one set of key values.
The syntax for creating a keyed segmented context is as follows:
create context context_name partition [by] event_property [and event_property [and ...]] from stream_def [, event_property [...] from stream_def] [, ...]
The context_name you assign to the context can be any identifier.
Following the context name is one or more lists of event properties and a stream definition for each entry, separated by comma (,).
The event_property is the name of an event property that provides a value to pick a unique partition. Multiple event property names are separated by the and keyword.
The stream_def is a stream definition which consists of an event type name optionally followed by parentheses that contain filter expressions. If you provide filter expressions, context partitions consider only events of that event type that match the filter expressions. The name of a named window or table is not allowed.
You may list multiple event properties for each stream definition. You may list multiple stream definitions. Please refer to usage guidelines below when specifying multiple event properties and/or multiple stream definitions.
The next statement creates a context SegmentedByCustomer that considers the value of the custId property of the BankTxn event type to pick the context partition to assign events to:
create context SegmentedByCustomer partition by custId from BankTxn
The following statement refers to the context created as above to compute a total withdrawal amount per account for each customer:
context SegmentedByCustomer select custId, account, sum(amount) from BankTxn group by account
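The effect of the declaration and statement above can be pictured with a small plain-Java sketch. It is an illustration only and says nothing about how the engine is implemented: the class KeyedPartitionSketch and its methods are hypothetical names and not part of the Esper API. Each distinct custId gets its own state (the context partition), and within that state the sum is kept per account, which mirrors the group by account clause.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: hypothetical names, not Esper API.
class KeyedPartitionSketch {
    // One entry per context partition (keyed by custId);
    // each partition holds sum(amount) per account.
    private final Map<String, Map<String, Double>> partitions = new HashMap<>();

    // Routes a BankTxn-like event to the partition for its custId,
    // updates the per-account sum and returns the new sum.
    double onEvent(String custId, String account, double amount) {
        Map<String, Double> sums =
            partitions.computeIfAbsent(custId, k -> new HashMap<>());
        return sums.merge(account, amount, Double::sum);
    }

    // Number of partitions allocated so far (one per distinct custId seen).
    int partitionCount() {
        return partitions.size();
    }

    public static void main(String[] args) {
        KeyedPartitionSketch ctx = new KeyedPartitionSketch();
        ctx.onEvent("c1", "checking", 100.0);
        ctx.onEvent("c2", "checking", 50.0);
        double total = ctx.onEvent("c1", "checking", 25.0);
        System.out.println("c1/checking total: " + total);
        System.out.println("partitions: " + ctx.partitionCount());
    }
}
```

Note that the two customers never share state even though both use an account named checking, because the account sums live inside the customer-specific partition.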
The following statement refers to the context created as above and detects a withdrawal of more than 400 followed by a second withdrawal of more than 400 that occurs within 10 minutes of the first withdrawal, all for the same customer:
context SegmentedByCustomer select * from pattern [ every a=BankTxn(amount > 400) -> b=BankTxn(amount > 400) where timer:within(10 minutes) ]
The EPL statement that refers to a keyed segmented context must have at least one filter expression, at any place within the EPL statement that looks for events of any of the event types listed in the context declaration.
For example, the following is not valid:
// Neither LoginEvent nor LogoutEvent is listed in the context declaration
context SegmentedByCustomer select * from pattern [every a=LoginEvent -> b=LogoutEvent where timer:within(10 minutes)]
If the context declaration lists multiple streams, each event type must be unrelated: You may not list the same event type twice and you may not list a sub- or super-type of any event type already listed.
The following is not a valid declaration since the BankTxn event type is listed twice:
// Not valid
create context SegmentedByCustomer partition by custId from BankTxn, account from BankTxn
If the context declaration lists multiple streams, the number of event properties provided for each event type must also be the same. The value type returned by event properties of each event type must match within the respective position it is listed in, i.e. the first property listed for each event type must have the same type, the second property listed for each event type must have the same type, and so on.
The following is not a valid declaration since the customer id of BankTxn and the login time of LoginEvent are not of the same type:
// Invalid: Type mismatch between properties
create context SegmentedByCustomer partition by custId from BankTxn, loginTime from LoginEvent
The next statement creates a context SegmentedByCustomer that also considers LoginEvent and LogoutEvent:
create context SegmentedByCustomer partition by custId from BankTxn, loginId from LoginEvent, loginId from LogoutEvent
As you may have noticed, the above example refers to loginId as the event property name for LoginEvent and LogoutEvent events. The assumption is that the loginId event property of the login and logout events has the same type and carries the exact same value as the custId of bank transaction events, thereby allowing all events of the three event types to apply to the same customer-specific context partition.
You may add a filter expression to each of the event types listed. The engine applies the filter expression to the EPL statement that refers to the context and to the same event type.
The next statement creates a context SegmentedByCustomer that does not consider login events that indicate that the login failed:
create context SegmentedByCustomer partition by custId from BankTxn, loginId from LoginEvent(failed=false)
You may assign events to context partitions based on the values of two or more event properties. The engine thus uses the combination of values of these properties to pick a context partition.
An example context declaration follows:
create context ByCustomerAndAccount partition by custId and account from BankTxn
The next statement refers to the context and computes a total withdrawal amount, per account and customer:
context ByCustomerAndAccount select custId, account, sum(amount) from BankTxn
As you can see, the above statement does not need to specify a group by clause to aggregate per customer and account, since events of each unique combination of customer id and account are assigned to separate context partitions.
The following context properties are available in your EPL statement when it refers to a keyed segmented context:
Table 4.1. Keyed Segmented Context Properties
Name | Description |
---|---|
name | The string-type context name. |
id | The integer-type internal context partition id that the engine assigns to the context partition. |
key1 | The event property value for the first key. |
keyN | The event property value for the Nth key. |
Assume the keyed segmented context is declared as follows:
create context ByCustomerAndAccount partition by custId and account from BankTxn
You may, for example, select the context properties as follows:
context ByCustomerAndAccount select context.name, context.id, context.key1, context.key2 from BankTxn
This section discusses the impact of contexts on joins to provide further samples of use and deepen the understanding of context partitions.
Consider a context declared as follows:
create context ByCust partition by custId from BankTxn
The following statement matches, within the same customer id, the current event with the last 30 minutes of events to determine those events that match amounts:
context ByCust select * from BankTxn as t1 unidirectional, BankTxn#time(30 minutes) t2 where t1.amount = t2.amount
Note that the where-clause in the join above does not mention customer id. Since each BankTxn applies to a specific context partition the join evaluates within that single context partition.
Consider the next statement that matches a security event with the last 30 minutes of transaction events for each customer:
context ByCust select * from SecurityEvent as t1 unidirectional, BankTxn#time(30 minutes) t2 where t1.customerName = t2.customerName
When a security event comes in, it applies to all context partitions and not any specific context partition, since the SecurityEvent event type is not part of the context declaration.
This context assigns events to context partitions based on the result of a hash function and modulo operation. Each event belongs to exactly one context partition or zero context partitions if the event does not match the optional filter predicate expression(s). Each context partition handles exactly one result of hash value modulo granularity.
The syntax for creating a hashed segmented context is as follows:
create context context_name coalesce [by] hash_func_name(hash_func_param) from stream_def [, hash_func_name(hash_func_param) from stream_def ] [, ...] granularity granularity_value [preallocate]
The context_name you assign to the context can be any identifier.
Following the context name is one or more pairs of hash function name and parameters, with a stream definition for each entry, separated by comma (,).
The hash_func_name can either be consistent_hash_crc32 or hash_code or a plug-in single-row function. The hash_func_param is a list of parameter expressions.
If you specify consistent_hash_crc32 the engine computes a consistent hash code using the CRC-32 algorithm.
If you specify hash_code the engine uses the Java object hash code.
If you specify the name of a plug-in single-row function your function must return an integer value that is the hash code. You may use the wildcard (*) character among the parameters to pass the underlying event to the single-row function.
The stream_def is a stream definition which consists of an event type name optionally followed by parentheses that contain filter expressions. If you provide filter expressions, context partitions consider only events of that event type that match the filter expressions. The name of a named window or table is not allowed.
You may list multiple stream definitions. Please refer to usage guidelines below when specifying multiple stream definitions.
The granularity is required and is an integer number that defines the maximum number of context partitions. The engine computes hash code modulo granularity, hash(params) mod granularity, to determine the context partition. When you specify the hash_code function the engine uses the object hash code and the computation is params.hashCode() % granularity.
Since the engine locks on the level of context partition to protect state, the granularity defines the maximum degree of parallelism. For example, a granularity of 1024 means that 1024 context partitions handle events and thus a maximum of 1024 threads can process each assigned statement concurrently.
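The hash-modulo computation described above can be sketched in plain Java. This is a minimal illustration under stated assumptions and not the engine's code: the class HashPartitionSketch and its methods are hypothetical, the key is assumed to be a String encoded as UTF-8, and the use of Math.floorMod for negative object hash codes is a choice made for this sketch since the text above does not say how negative values are treated.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustration only: hypothetical helper, not Esper API.
class HashPartitionSketch {
    // CRC-32 of the key's UTF-8 bytes (encoding is an assumption),
    // reduced modulo the granularity.
    static int crc32Partition(String key, int granularity) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() holds an unsigned 32-bit value in a long,
        // so the remainder is never negative.
        return (int) (crc.getValue() % granularity);
    }

    // Java object hash code reduced modulo the granularity.
    // floorMod keeps the index in [0, granularity) for negative hash codes.
    static int hashCodePartition(Object key, int granularity) {
        return Math.floorMod(key.hashCode(), granularity);
    }

    public static void main(String[] args) {
        System.out.println(crc32Partition("cust-42", 1024));
        System.out.println(hashCodePartition("cust-42", 1024));
    }
}
```

Whatever the key, the result always falls in the range 0 to granularity minus 1, which is why the granularity caps the number of context partitions.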
The optional preallocate keyword instructs the engine to allocate all context partitions at once at the time a statement refers to the context. This is beneficial for performance as the engine does not need to determine whether a context partition exists and dynamically allocate, but may require more memory.
The next statement creates a context SegmentedByCustomerHash that considers the CRC-32 hash code of the custId property of the BankTxn event type to pick the context partition to assign events to, with up to 16 different context partitions that are preallocated:
create context SegmentedByCustomerHash coalesce by consistent_hash_crc32(custId) from BankTxn granularity 16 preallocate
The following statement refers to the context created as above to compute a total withdrawal amount per account for each customer:
context SegmentedByCustomerHash select custId, account, sum(amount) from BankTxn group by custId, account
Note that the statement above groups by custId: since the events for different customer ids can be assigned to the same context partition, it is necessary that the EPL statement also groups by customer id.
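The need for the additional grouping follows from simple counting, which the following plain-Java sketch makes concrete. It is an illustration only: the class HashCollisionSketch, its methods and the generated customer ids such as cust-0 are hypothetical, and UTF-8 encoding of the key is an assumption. With a granularity of 16, any 17 or more distinct customer ids must share at least one context partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.zip.CRC32;

// Illustration only: hypothetical helper, not Esper API.
class HashCollisionSketch {
    // CRC-32 of the customer id modulo the granularity.
    static int partitionOf(String custId, int granularity) {
        CRC32 crc = new CRC32();
        crc.update(custId.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % granularity);
    }

    // Counts how many distinct partitions the first n generated
    // customer ids ("cust-0", "cust-1", ...) are assigned to.
    static int partitionsUsed(int customers, int granularity) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < customers; i++) {
            used.add(partitionOf("cust-" + i, granularity));
        }
        return used.size();
    }

    public static void main(String[] args) {
        // 17 customers cannot occupy more than 16 partitions,
        // so at least two of them share one partition.
        System.out.println(partitionsUsed(17, 16) + " partitions for 17 customers");
    }
}
```

A statement that aggregated without group by custId would therefore mix the amounts of every customer that happens to land in the same partition.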
The context declaration shown next assumes that the application provides a computeHash single-row function that accepts BankTxn as a parameter, wherein the result of this function must be an integer hash code that the engine uses to pick the context partition for each event:
create context MyHashContext coalesce by computeHash(*) from BankTxn granularity 16 preallocate
The EPL statement that refers to a hash segmented context must have at least one filter expression, at any place within the EPL statement that looks for events of any of the event types listed in the context declaration.
For example, the following is not valid:
// Neither LoginEvent nor LogoutEvent is listed in the context declaration
context SegmentedByCustomerHash select * from pattern [every a=LoginEvent -> b=LogoutEvent where timer:within(10 minutes)]
If the context declaration lists multiple streams, each event type must be unrelated: You may not list the same event type twice and you may not list a sub- or super-type of any event type already listed.
If the context declaration lists multiple streams, the hash code function should return the same hash code for the related keys of all streams.
The next statement creates a context HashedByCustomer that also considers LoginEvent and LogoutEvent:
create context HashedByCustomer coalesce consistent_hash_crc32(custId) from BankTxn, consistent_hash_crc32(loginId) from LoginEvent, consistent_hash_crc32(loginId) from LogoutEvent granularity 32 preallocate
You may add a filter expression to each of the event types listed. The engine applies the filter expression to the EPL statement that refers to the context and to the same event type.
The next statement creates a context HashedByCustomer that does not consider login events that indicate that the login failed:
create context HashedByCustomer coalesce consistent_hash_crc32(loginId) from LoginEvent(failed = false) granularity 1024 preallocate
The following context properties are available in your EPL statement when it refers to a hash segmented context:
Table 4.2. Hash Segmented Context Properties
Name | Description |
---|---|
name | The string-type context name. |
id | The integer-type internal context partition id that the engine assigns to the context partition. |
Assume the hashed segmented context is declared as follows:
create context ByCustomerHash coalesce consistent_hash_crc32(custId) from BankTxn granularity 1024
You may, for example, select the context properties as follows:
context ByCustomerHash select context.name, context.id from BankTxn
The hash_code function based on the Java object hash code is generally faster than the CRC32 algorithm. The CRC32 algorithm, when used with a non-String parameter or with multiple parameters, requires the engine to serialize all expression results to a byte array to compute the CRC32 hash code.
We recommend keeping the granularity small (1k and under) when using preallocate.
When specifying a granularity greater than 65536 (64k) the engine switches to a Map-based lookup of context partition state which can slow down statement processing.
This context assigns events to context partitions based on the values of one or more event properties, using one or more predicate expressions to define context partition membership. Each event can thus belong to zero, one or many context partitions depending on the outcome of the predicate expressions.
The syntax for creating a category segmented context is as follows:
create context context_name group [by] group_expression as category_label [, group [by] group_expression as category_label] [, ...] from stream_def
The context_name you assign to the context can be any identifier.
Following the context name is a list of groups, each introduced by the group keyword. The list of groups is followed by the from keyword and a stream definition.
The group_expression is an expression that categorizes events. Each group expression must be followed by the as keyword and a category label which can be any identifier.
Group expressions are predicate expressions and must return a Boolean-type true or false when applied to an event. For a given event, any number of group expressions may return true thus categories can be overlapping.
The stream_def is a stream definition which consists of an event type name optionally followed by parentheses that contain filter expressions. If you provide filter expressions, context partitions consider only events of that event type that match the filter expressions.
The next statement creates a context CategoryByTemp that considers the value of the temp property of the SensorEvent event type to pick context partitions to assign events to:
create context CategoryByTemp group temp < 65 as cold, group temp between 65 and 85 as normal, group temp > 85 as large from SensorEvent
The following statement simply counts, for each category, the number of events and outputs the category label and count:
context CategoryByTemp select context.label, count(*) from SensorEvent
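The classification performed by CategoryByTemp can be sketched in plain Java as a list of labeled predicates. This is an illustration only: the class CategoryContextSketch and its method labelsFor are hypothetical and not part of the Esper API, and the between expression is treated as inclusive on both ends as in SQL. Every predicate is evaluated for every event, so an event may receive zero, one or several labels depending on how the groups are declared.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.DoublePredicate;

// Illustration only: hypothetical names, not Esper API.
class CategoryContextSketch {
    // Category label -> group expression, in declaration order.
    private static final Map<String, DoublePredicate> GROUPS = new LinkedHashMap<>();
    static {
        GROUPS.put("cold", temp -> temp < 65);
        // "between 65 and 85" is treated as inclusive on both ends here.
        GROUPS.put("normal", temp -> temp >= 65 && temp <= 85);
        GROUPS.put("large", temp -> temp > 85);
    }

    // Returns the labels of all categories whose predicate accepts the value.
    static List<String> labelsFor(double temp) {
        List<String> labels = new ArrayList<>();
        for (Map.Entry<String, DoublePredicate> group : GROUPS.entrySet()) {
            if (group.getValue().test(temp)) {
                labels.add(group.getKey());
            }
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(labelsFor(60));
        System.out.println(labelsFor(70));
        System.out.println(labelsFor(90));
    }
}
```

The three groups in this example happen to be mutually exclusive, so each temperature receives exactly one label; overlapping predicates would simply yield more than one.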
The following context properties are available in your EPL statement when it refers to a category segmented context:
Table 4.3. Category Segmented Context Properties
Name | Description |
---|---|
name | The string-type context name. |
id | The integer-type internal context partition id that the engine assigns to the context partition. |
label | The category label is the string identifier value after the as keyword that is specified for each group. |
You may, for example, select the context properties as follows:
context CategoryByTemp select context.name, context.id, context.label from SensorEvent
You may declare a non-overlapping context that exists once or that repeats in a regular fashion as controlled by a start condition and an optional end condition. The number of context partitions is always either one or zero: Context partitions do not overlap.
The syntax for creating a non-overlapping context is as follows:
create context context_name start (@now | start_condition) [ end end_condition ]
The context_name you assign to the context can be any identifier.
Following the context name is the start keyword and either @now or a start_condition. This is followed by the optional end keyword and an end_condition.
Both the start condition and the end condition, if specified, can be an event filter, a pattern, a crontab or a time period. The syntax of start and end conditions is described in Section 4.2.7, “Context Conditions”.
Once the start condition occurs, the engine no longer observes the start condition and begins observing the end condition, if an end condition was provided.
Once the end condition occurs, the engine observes the start condition again.
If you specified @now instead of a start condition, the engine starts the context partition immediately and begins observing the end condition.
If there is no end condition the context partition remains alive and does not end.
If you specified an event filter as the start condition, then the event also counts towards the statement(s) that refer to that context. If you specified a pattern as the start condition, then the events that may constitute the pattern match can also count towards the statement(s) that refer to the context provided that @inclusive and event tags are both specified (see below).
At the time of context activation, when your application creates a statement that utilizes the context, the engine checks whether the start and end conditions are crontab expressions. If so, the engine evaluates the start and end crontab expressions and determines whether the current time lies between start and end. If the current time is between the start and end times, the engine allocates the context partition and waits to observe the end time. Otherwise the engine waits to observe the start time and does not allocate a context partition.
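The activation-time decision can be sketched in plain Java for a daily window from 9 am to 5 pm. This is an illustration only and does not evaluate real crontab expressions: the class NineToFiveSketch and its method are hypothetical, and treating the start time as inclusive and the end time as exclusive is an assumption of this sketch.

```java
import java.time.LocalTime;

// Illustration only: hypothetical names, not Esper API.
class NineToFiveSketch {
    static final LocalTime START = LocalTime.of(9, 0);
    static final LocalTime END = LocalTime.of(17, 0);

    // True when the current time lies inside the daily window, in which case
    // a context partition would be allocated right at activation.
    // Assumption: start is inclusive, end is exclusive.
    static boolean allocateAtActivation(LocalTime now) {
        return !now.isBefore(START) && now.isBefore(END);
    }

    public static void main(String[] args) {
        System.out.println(allocateAtActivation(LocalTime.of(12, 0)));
        System.out.println(allocateAtActivation(LocalTime.of(20, 30)));
    }
}
```

When the method returns false, the equivalent engine behavior described above is to wait for the next start time without allocating a context partition.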
The built-in context properties that are available are the same as described in Section 4.2.6.2, “Built-In Context Properties”.
The next statement creates a context NineToFive that declares a daily time period that starts at 9 am and ends at 5 pm:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
The following statement outputs speed violations between 9 am and 5 pm, considering a speed of 100 or greater as a violation:
context NineToFive select * from TrafficEvent(speed >= 100)
The example that follows demonstrates the use of an event filter as the start condition and a pattern as the end condition.
The next statement creates a context PowerOutage that starts when the first PowerOutageEvent event arrives and that ends 5 seconds after a subsequent PowerOnEvent arrives:
create context PowerOutage start PowerOutageEvent end pattern [PowerOnEvent -> timer:interval(5)]
The following statement outputs the temperature during a power outage and for 5 seconds after the power comes on:
context PowerOutage select * from TemperatureEvent
To output only the last value when a context partition ends (terminates, expires), please read on to the description of output rate limiting.
The next statement creates a context Every15Minutes that starts immediately and lasts for 15 minutes, repeatedly allocating a new context partition at the end of each 15-minute interval:
create context Every15Minutes start @now end after 15 minutes
The next example declares an AlwaysOn context: It starts immediately and does not end unless the application uses the API to terminate the context partition:
create context AlwaysOn start @now
A non-overlapping context with @now is always-on: a context partition is allocated at any given point in time. Only if @now is specified will a context partition exist at every point in time.
If you specified an event filter or pattern as the end condition for a context partition, and statements that refer to the context specify an event filter or pattern that matches the same conditions, use @Priority to instruct the engine whether the context management or the statement evaluation takes priority (see below for configuring prioritized execution).
For example, if your context declaration looks like this:
create context MyCtx start MyStartEvent end MyEndEvent
And a statement managed by the context is this:
context MyCtx select count(*) as cnt from MyEndEvent output when terminated
By using @Priority(1) for create-context and @Priority(0) for the counting statement, the counting statement does not count the last MyEndEvent since context partition management takes priority.
By using @Priority(0) for create-context and @Priority(1) for the counting statement, the counting statement will count the last MyEndEvent since the statement evaluation takes priority.
This context initiates a new context partition when an initiating condition occurs, and terminates one or more context partitions when the terminating condition occurs, if a terminating condition was specified. Thus multiple context partitions can be active at any point in time and context partitions can overlap.
The syntax for creating an overlapping context is as follows:
create context context_name initiated [by] [distinct (distinct_value_expr [,...])] [@now and] initiating_condition [ terminated [by] terminating_condition ]
The context_name you assign to the context can be any identifier.
Following the context name is the initiated keyword.
After the initiated keyword you can optionally specify the distinct keyword and, within parentheses, list one or more distinct value expressions.
After the initiated keyword you can also specify @now and as explained below.
After the initiated keyword you must specify the initiating condition.
You may optionally use the terminated keyword followed by the terminating condition.
If no terminating condition is specified each context partition remains alive and does not terminate.
Both the initiating condition and the terminating condition, if specified, can be an event filter, a pattern, a crontab or a time period. The syntax of initiating and terminating conditions is described in Section 4.2.7, “Context Conditions”.
If you specified @now and before the initiating condition then the engine initiates a new context partition immediately. The @now is only allowed in conjunction with initiating conditions that specify a pattern, crontab or time period and not with event filters.
If you specified an event filter for the initiating condition, then the event that initiates a new context partition also counts towards the statement(s) that refer to that context. If you specified a pattern to initiate a new context partition, then the events that may constitute the pattern match can also count towards the statement(s) that refer to the context provided that @inclusive and event tags are both specified (see below).
The next statement creates a context CtxTrainEnter that allocates a new context partition when a train enters a station, and that terminates each context partition 5 minutes after the time the context partition was allocated:
create context CtxTrainEnter initiated by TrainEnterEvent as te terminated after 5 minutes
The context declared above assigns the stream name te. Thereby the initiating event's properties can be accessed, for example, by specifying context.te.trainId.
The following statement detects when a train enters a station as indicated by a TrainEnterEvent, but does not leave the station within 5 minutes as would be indicated by a matching TrainLeaveEvent:
context CtxTrainEnter select t1 from pattern [ t1=TrainEnterEvent(trainId = context.te.trainId) -> timer:interval(5 min) and not TrainLeaveEvent(trainId = context.te.trainId) ]
Each event (incoming or inserted-into) applies to each context partition. You must provide filter expressions that indicate how events apply to context partitions.
The example above has trainId = context.te.trainId to say that the train id of events must match the train id of the initiating event of the particular context partition.
Since the TrainEnterEvent that initiates a new context partition also counts towards the statement, the first part of the pattern (the t1=TrainEnterEvent) is satisfied by that initiating event.
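The behavior described above can be approximated outside of the engine. The following is a minimal Python sketch, not Esper code: the function name trains_not_left, the tuple layout of the events and the final flush step are assumptions made for illustration. It allocates one partition per enter event, offers each leave event to every live partition, applies the train id filter, and reports a train when its partition reaches the 5-minute end without a matching leave event.

```python
from dataclasses import dataclass

FIVE_MINUTES = 5 * 60  # seconds


@dataclass
class Partition:
    """One context partition; train_id plays the role of context.te.trainId."""
    train_id: str
    start: int          # time the initiating enter event arrived
    left: bool = False  # set once a matching leave event is seen


def trains_not_left(events):
    """events: time-ordered (time_sec, kind, train_id), kind is 'enter' or 'leave'.

    Returns the train ids whose partition ended without a matching leave event.
    """
    partitions = []
    alerts = []

    def expire(now):
        # Terminate every partition whose 5 minutes have elapsed.
        for p in list(partitions):
            if now >= p.start + FIVE_MINUTES:
                partitions.remove(p)
                if not p.left:
                    alerts.append(p.train_id)

    for time_sec, kind, train_id in events:
        expire(time_sec)
        if kind == "enter":
            # Each initiating event allocates a new partition.
            partitions.append(Partition(train_id, time_sec))
        elif kind == "leave":
            # The event is offered to every live partition; the filter
            # (same train id as the initiating event) decides where it applies.
            for p in partitions:
                if p.train_id == train_id:
                    p.left = True
    # Assumption: time keeps advancing, so remaining partitions end too.
    expire(float("inf"))
    return alerts
```

For example, if train A enters at second 0 and leaves at second 120, while train B enters at second 60 and leaves only at second 400, the sketch reports B alone.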
The next statement creates a context CtxEachMinute that allocates a new context partition immediately and every 1 minute, and that terminates each context partition 1 minute after the time the context partition was allocated:
create context CtxEachMinute initiated @now and pattern [every timer:interval(1 minute)] terminated after 1 minutes
The statement above specifies @now to instruct the engine to allocate a new context partition immediately as well as when the pattern fires.
Without the @now the engine would only allocate a new context partition when the pattern fires after 1 minute and every minute thereafter.
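To make the difference concrete, here is a small Python sketch (not Esper code) that lists the start and end times, in seconds since context creation, of the partitions allocated up to a given horizon. The function name partition_lifetimes and its parameters are hypothetical.

```python
def partition_lifetimes(horizon_sec, with_now, every_sec=60, lifetime_sec=60):
    """Return (start, end) pairs for partitions allocated up to horizon_sec."""
    # With @now, one partition is allocated immediately at time 0.
    starts = [0] if with_now else []
    # The pattern fires after every_sec and every every_sec thereafter.
    starts.extend(range(every_sec, horizon_sec + 1, every_sec))
    return [(s, s + lifetime_sec) for s in starts]
```

With a horizon of 120 seconds, the @now variant yields partitions starting at 0, 60 and 120 seconds, whereas the variant without @now yields only those starting at 60 and 120 seconds.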
The following statement averages the temperature, starting anew every 1 minute and outputs the aggregate value continuously:
context CtxEachMinute select avg(temp) from SensorEvent
To output only the last value when a context partition ends (terminates, expires), please read on to the description of output rate limiting.
By providing no terminating condition, you can tell the engine to allocate context partitions that never terminate, for example:
create context CtxTrainEnter initiated by TrainEnterEvent as te
If you specified an event filter or pattern as the termination condition for a context partition, and statements that refer to the context specify an event filter or pattern that matches the same conditions, use @Priority to instruct the engine whether the context management or the statement evaluation takes priority (see below for configuring prioritized execution). See the note above for more information.
If your initiating condition is a filter context condition, you may specify the distinct keyword followed by one or more distinct-value expressions.
The following sample EPL specifies a context that initiates a context partition for distinct order id values, remembering that order id until the time the context partition terminates:
create context OrderContext initiated by distinct(orderId) NewOrderEvent as newOrder terminated by CloseOrderEvent(closeOrderId = newOrder.orderId)
The engine allocates a new context partition only when a context partition does not already exist for a given orderId value of NewOrderEvent.
When the context partition terminates at the time a CloseOrderEvent arrives, the engine forgets about the orderId, allowing the next NewOrderEvent event for the same orderId to allocate a new context partition.
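The bookkeeping behind the distinct keyword can be sketched in a few lines of Python. This is an illustration only, not Esper code; the function name run_order_context and the log labels are made up for this example. The label no_new_partition refers to allocation only and says nothing about whether the event counts towards statements of the existing partition.

```python
def run_order_context(events):
    """events: sequence of ('new', order_id) or ('close', order_id).

    Returns a log of (action, order_id) tuples.
    """
    active = set()  # order ids that currently have a context partition
    log = []
    for kind, order_id in events:
        if kind == "new":
            if order_id in active:
                # A partition already exists for this order id.
                log.append(("no_new_partition", order_id))
            else:
                active.add(order_id)
                log.append(("allocated", order_id))
        elif kind == "close" and order_id in active:
            # Termination forgets the order id, so it can initiate again.
            active.discard(order_id)
            log.append(("terminated", order_id))
    return log
```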
Please note the following limitations:
The distinct keyword requires the initiating condition to be an event stream (and not a crontab or pattern, for example) and a stream name must be assigned using the as keyword.
The prev and prior functions are not allowed among the distinct-value expressions.
The following context properties are available in your EPL statement when it refers to a context:
Table 4.4. Context Properties
Name | Description |
---|---|
name | The string-type context name. |
startTime | The start time of the context partition. |
endTime | The end time of the context partition. This field is only available in the case that it can be computed from the crontab or time period expression that is provided. Use current_timestamp instead. |
You may, for example, select the context properties as follows:
context NineToFive select context.name, context.startTime, context.endTime from TrafficEvent(speed >= 100)
The following statement looks for the next train leave event for the same train id and selects a few of the context properties:
context CtxTrainEnter select *, context.te.trainId, context.id, context.name from TrainLeaveEvent(trainId = context.te.trainId)
Context start/initiating and end/terminating conditions are for use with overlapping and non-overlapping contexts. Any combination of conditions may be specified.
Define the stream that starts/initiates a context partition or that ends/terminates a context partition:
event_stream_name [(filter_criteria)] [as stream_name]
The event_stream_name is either the name of an event type or name of an event stream populated by an insert into statement. The filter_criteria is optional and consists of a list of expressions filtering the events of the event stream, within parenthesis after the event stream name.
Two examples are:
// A non-overlapping context that starts when MyStartEvent arrives and ends when MyEndEvent arrives
create context MyContext start MyStartEvent end MyEndEvent
// An overlapping context where each MyEvent with level greater than zero
// initiates a new context partition that terminates after 10 seconds
create context MyContext initiated MyEvent(level > 0) terminated after 10 seconds
You may correlate the start/initiating and end/terminating streams by providing a stream name following the as keyword, and by referring to that stream name in the filter criteria of the end condition.
Two examples that correlate the start/initiating and end/terminating condition are:
// A non-overlapping context that starts when MyEvent arrives
// and ends when a matching MyEvent arrives (same id)
create context MyContext start MyEvent as myevent end MyEvent(id=myevent.id)
// An overlapping context where each MyInitEvent initiates a new context partition
// that terminates when a matching MyTermEvent arrives
create context MyContext initiated by MyInitEvent as e1 terminated by MyTermEvent(id=e1.id, level <> e1.level)
You can define a pattern that starts/initiates a context partition or that ends/terminates a context partition:
pattern [pattern_expression] [@inclusive]
The pattern_expression is a pattern as described in Chapter 7, EPL Reference: Patterns.
Specify @inclusive after the pattern to have those same events that constitute the pattern match also count towards any statements that are associated to the context.
You must also provide a tag for each event in a pattern that should be included.
Examples are:
// A non-overlapping context that starts when either StartEventOne or StartEventTwo arrives
// and that ends after 5 seconds.
// Here neither StartEventOne nor StartEventTwo counts towards any statements
// that are referring to the context.
create context MyContext start pattern [StartEventOne or StartEventTwo] end after 5 seconds
// Same as above.
// Here both StartEventOne and StartEventTwo do count towards any statements
// that are referring to the context.
create context MyContext start pattern [a=StartEventOne or b=StartEventTwo] @inclusive end after 5 seconds
// An overlapping context where each distinct MyInitEvent initiates a new context
// and each context partition terminates after 20 seconds
// Use @inclusive to say that the same MyInitEvent that fires the pattern
// also applies to statements that are associated to the context.
create context MyContext initiated by pattern [every-distinct(a.id, 20 sec) a=MyInitEvent]@inclusive terminated after 20 sec
// An overlapping context where each pattern match initiates a new context
// and all context partitions terminate when MyTermEvent arrives.
// The MyInitEvent and MyOtherEvent that trigger the pattern are themselves not included
// in any statements that are associated to the context.
create context MyContext initiated by pattern [every MyInitEvent -> MyOtherEvent where timer:within(5)] terminated by MyTermEvent
You may correlate the start and end streams by providing tags as part of the pattern, and by referring to the tag name(s) in the filter criteria of the end condition.
An example that correlates the start and end condition is:
// A non-overlapping context that starts when either StartEventOne or StartEventTwo arrive
// and that ends when either a matching EndEventOne or EndEventTwo arrive
create context MyContext start pattern [a=StartEventOne or b=StartEventTwo]@inclusive end pattern [EndEventOne(id=a.id) or EndEventTwo(id=b.id)]
Crontab expressions are described in Section 7.6.4, “Crontab (timer:at)”.
Examples are:
// A non-overlapping context started daily from 9 am to 5 pm
// and not started outside of these hours:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
// An overlapping context where crontab initiates a new context every 1 minute
// and each context partition terminates after 10 seconds:
create context MyContext initiated (*, *, *, *, *) terminated after 10 seconds
You may specify a time period that the engine observes before the condition fires. Time period expressions are described in Section 5.2.1, “Specifying Time Periods”.
The syntax is:
after time_period_expression
Examples are:
// A non-overlapping context started after 10 seconds
// that ends 1 minute after it starts and that again starts 10 seconds thereafter.
create context NonOverlap10SecFor1Min start after 10 seconds end after 1 minute
// An overlapping context that starts a new context partition every 5 seconds
// and each context partition lasts 1 minute
create context Overlap5SecFor1Min initiated after 5 seconds terminated after 1 minute
A nested context is a context that is composed from two or more contexts.
The syntax for creating a nested context is as follows:
create context context_name
  context nested_context_name [as] nested_context_definition ,
  context nested_context_name [as] nested_context_definition [, ...]
The context_name you assign to the context can be any identifier.
Following the context name is a comma-separated list of nested contexts. For each nested context specify the context keyword followed by a nested context name and the nested context declaration. Any of the context declarations as outlined in Section 4.2, “Context Declaration” are allowed for nested contexts.
The order of nested context declarations matters as outlined below. The nested context names have meaning only in respect to built-in properties and statements may not be assigned to nested context names.
The next statement creates a nested context NineToFiveSegmented that, between 9 am and 5 pm, allocates a new context partition for each customer id:
create context NineToFiveSegmented context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *), context SegmentedByCustomer partition by custId from BankTxn
The following statement refers to the nested context to compute a total withdrawal amount per account for each customer but only between 9 am and 5 pm:
context NineToFiveSegmented select custId, account, sum(amount) from BankTxn group by account
Esper implements nested contexts as a context tree: The context declared first controls the lifecycle of the context(s) declared thereafter. Thereby, in the above example, outside of the 9am-to-5pm time the engine has no memory and consumes no resources in relationship to bank transactions or customer ids.
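The following Python sketch, which is not Esper code, mimics the effect of the context tree for this example: transactions outside of 9 am to 5 pm are not counted at all, and the per-customer state is discarded at the end of each day's 9-to-5 span. The function name nine_to_five_sums and the tuple layout (day, hour, customer id, account, amount) are assumptions made for illustration.

```python
def nine_to_five_sums(txns):
    """txns: time-ordered tuples (day, hour, cust_id, account, amount).

    Returns the running sum emitted for each transaction that is counted.
    """
    totals = {}         # (cust_id, account) -> running sum for the current day
    current_day = None
    outputs = []
    for day, hour, cust_id, account, amount in txns:
        if not 9 <= hour < 17:
            continue    # no context partition exists outside 9 am to 5 pm
        if day != current_day:
            totals = {}  # the previous day's partitions ended at 5 pm
            current_day = day
        key = (cust_id, account)  # customer partition, then account group
        totals[key] = totals.get(key, 0) + amount
        outputs.append((cust_id, account, totals[key]))
    return outputs
```

Swapping the order of the two nested contexts would change this lifecycle, which is why the order of declaration matters.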
When combining segmented contexts, the set of context partitions for the nested context effectively is the Cartesian product of the partition sets of the nested segmented contexts (sample: create context MyCartSegmented context SegmentedByAccount partition by acctId from BankTxn, context SegmentedByCustomer partition by custId from BankTxn).
When combining temporal contexts with other contexts, since temporal contexts may overlap and may terminate, it is important to understand that temporal contexts control the lifecycle of sub-contexts (contexts declared thereafter). The order of declaration of contexts in a nested context can thereby change resource usage and output result.
The next statement creates a context that allocates context partitions only when a train enters a station and then for each hash of the tag id of a passenger as indicated by PassengerScanEvent events, and terminates all context partitions after 5 minutes:
create context CtxNestedTrainEnter context InitCtx initiated by TrainEnterEvent as te terminated after 5 minutes, context HashCtx coalesce by consistent_hash_crc32(tagId) from PassengerScanEvent granularity 16 preallocate
In the example above the engine does not start tracking PassengerScanEvent events or hash codes or allocate context partitions until a TrainEnterEvent arrives.
This section declares a nested context with nested non-overlapping contexts and walks through a specific scenario to help you better understand nested context lifecycles.
Assume event types AStart, AEnd, BStart, BEnd and C.
The following EPL counts C-events that occur within the span of AStart and AEnd and a span of BStart and BEnd, wherein the span of AStart-to-AEnd must contain the span of BStart-to-BEnd:
create context CtxSampleNestedContext context SpanA start AStart end AEnd, context SpanB start BStart end BEnd
context CtxSampleNestedContext select count(*) from C
Upon creating the EPL statements above, the engine starts looking for an AStart event only and does not yet look for AEnd, BStart, BEnd or C events.
In the scenario, assume that an AStart event arrives next. This is, logically, the beginning of the SpanA lifecycle (also known as a session or interval):
The engine stops looking for an AStart event.
The engine starts looking for an AEnd event, since that would mean the end of the current SpanA lifecycle.
The engine starts looking for a BStart event, in order to detect the beginning of a SpanB lifecycle.
In the scenario, assume that a BStart event arrives. This is, logically, the beginning of the SpanB lifecycle:
The engine stops looking for further BStart events.
The engine starts looking for a BEnd event, since that would mean the end of the current SpanB lifecycle.
The engine keeps looking for an AEnd event, since that would mean the end of the current SpanA lifecycle.
The engine starts looking for C events and now starts counting each C that arrives.
In the scenario, assume that a BEnd event arrives. This is, logically, the end of the SpanB lifecycle:
The engine stops looking for a BEnd event.
The engine stops looking for C events and stops counting each.
The engine starts looking for a BStart event, since that would mean the beginning of another SpanB lifecycle.
In the scenario, assume that an AEnd event arrives. This is, logically, the end of the SpanA lifecycle:
The engine stops looking for an AEnd event.
The engine stops looking for a BStart event.
The engine starts looking for an AStart event, since that would mean the beginning of another SpanA lifecycle.
In the scenario described above, after the AEnd arrives, the engine is back to the same state as the engine had after the statements were created originally.
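The walk-through can be condensed into a small state machine. The Python sketch below is not Esper code; the function name count_c_events is hypothetical. It also assumes that an AEnd arriving while a SpanB lifecycle is still active ends both spans, which follows from the first-declared context controlling the lifecycle of the contexts declared after it, and that each new SpanB lifecycle starts counting from zero.

```python
def count_c_events(events):
    """events: sequence of 'AStart', 'AEnd', 'BStart', 'BEnd' or 'C'.

    Returns the running count emitted for each C event that is counted.
    """
    in_a = in_b = False
    count = 0
    outputs = []
    for ev in events:
        if not in_a:
            # Only AStart is of interest outside of a SpanA lifecycle.
            if ev == "AStart":
                in_a = True
        elif ev == "AEnd":
            # End of SpanA; assumed to also end any active SpanB.
            in_a = in_b = False
        elif not in_b:
            # Inside SpanA, waiting for a SpanB lifecycle to begin.
            if ev == "BStart":
                in_b = True
                count = 0  # a new partition starts with a fresh count
        elif ev == "BEnd":
            in_b = False
        elif ev == "C":
            # Inside both spans: the C event is counted.
            count += 1
            outputs.append(count)
    return outputs
```

Events that the engine is not looking for in a given state, such as a C event before any BStart, simply fall through without effect.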
If your use case calls for a logical OR relationship, please consider a pattern for the start condition, for example (not equivalent to the above):
create context CtxSampleNestedContext start pattern[every a=AStart or every a=BStart] as mypattern end pattern[every AEnd or every BEnd]
Context properties of all nested contexts are available for use. Specify context.nested_context_name.property_name or, if the nested context declaration provided stream names or tags for patterns, context.nested_context_name.stream_name.property_name.
For example, consider the CtxNestedTrainEnter context declared earlier. The following statement selects a few of the context properties:
context CtxNestedTrainEnter select context.InitCtx.te.trainId, context.HashCtx.id, tagId, count(*) from PassengerScanEvent group by tagId
In a second example, consider the NineToFiveSegmented context declared earlier. The following statement selects a few of the context properties:
context NineToFiveSegmented select context.NineToFive.startTime, context.SegmentedByCustomer.key1 from BankTxn
The following context properties are available in your EPL statement when it refers to a nested context:
Table 4.5. Nested Context Properties
Name | Description |
---|---|
name | The string-type context name. |
id | The integer-type internal context partition id that the engine assigns to the context partition. |
This example selects the nested context name and context partition id:
context NineToFiveSegmented select context.name, context.id from BankTxn
You do not need to declare a context to partition data windows, aggregation values or patterns themselves individually. You may mix-and-match partitioning as needed.
The table below outlines other partitioning syntax supported by EPL:
Table 4.6. Partition in EPL Without the Use of Context Declarations
Partition Type | Description | Example |
---|---|---|
Grouped Data Window | Partitions at the level of data window, only applies to appended data window(s). Syntax: | // Length window of 2 events per customer select * from BankTxn#groupwin(custId)#length(2) |
Grouped Aggregation | Partitions at the level of aggregation, only applies to any aggregations. Syntax: | select avg(price), window(*) from BankTxn group by custId |
Pattern | Partitions pattern subexpressions. Syntax: | select * from pattern [ every a=BankTxn -> BankTxn(custId = a.custId)...] |
Match-Recognize | Partitions match-recognize patterns. Syntax: | select * from match_recognize ... partition by custId |
Join and Subquery | Partitions join and subqueries. Syntax: | select * from ... where a.custId = b.custId |
You may use output rate limiting to trigger output when a context partition ends or terminates using output when terminated.
This concept was introduced to you in Section 2.13, “Basic Partitioned and Output-Rate-Limited Query”.
For more information on output rate limiting please see Section 5.7, “Stabilizing and Controlling Output: The Output Clause”.
The context CtxEachMinute initiates a new context partition every 1 minute, and each context partition expires after 5 minutes:
create context CtxEachMinute initiated by pattern [every timer:interval(1 min)] terminated after 5 minutes
The following statement computes an ongoing average temperature but outputs only the last value of the average temperature, together with the context partition id and the current engine time, after 5 minutes when a context partition ends:
context CtxEachMinute select current_timestamp as endtime, context.id, avg(temp) from SensorEvent output last when terminated
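Because the partitions of this context overlap, a single sensor reading can contribute to several averages. The Python sketch below (not Esper code) illustrates this by computing, for every partition that ends within a given horizon, the one row that would be output at termination. The function name averages_at_termination is hypothetical, the partition ids are a plain counter, and the sketch assumes that a reading belongs to a partition when start <= time < end.

```python
def averages_at_termination(readings, horizon_sec, start_every=60, lifetime=300):
    """readings: list of (time_sec, temp) measured since context creation.

    Returns (partition_id, end_time, average) for each partition that
    terminates no later than horizon_sec; average is None without readings.
    """
    results = []
    start = start_every  # without @now the first partition starts after 1 minute
    partition_id = 0
    while start + lifetime <= horizon_sec:
        end = start + lifetime
        # Readings that arrive while this partition is alive.
        temps = [temp for t, temp in readings if start <= t < end]
        avg = sum(temps) / len(temps) if temps else None
        results.append((partition_id, end, avg))
        start += start_every
        partition_id += 1
    return results
```

With readings at seconds 90, 150 and 370, the reading at second 150 contributes to the partitions that end at seconds 360 and 420.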
The when terminated syntax can be combined with other output rates.
The next example outputs every 1 minute and also when the context partition ends:
context CtxEachMinute select context.id, avg(temp) from SensorEvent output last every 1 minute and when terminated
In the case that the end/terminating condition of the context partition is an event or pattern, the context properties contain the information of the tagged events in the pattern or the single event that ended/terminated the context partition.
For example, consider the following context wherein the engine initiates a new context partition for each arriving MyStartEvent event and terminates a context partition when a matching MyEndEvent arrives:
create context CtxSample initiated by MyStartEvent as startevent terminated by MyEndEvent(id = startevent.id) as endevent
The following statement outputs the id property of the initiating and terminating event and only outputs when a context partition ends:
context CtxSample select context.startevent.id, context.endevent.id, count(*) from MyEvent output last when terminated
You may in addition specify a termination expression that the engine evaluates when a context partition terminates. Only when the termination expression evaluates to true does output occur. The expression may refer to built-in properties as described in Section 5.7.1.1, “Controlling Output Using an Expression”. The syntax is as follows:
...output when terminated and termination_expression
The next example statement outputs when a context partition ends but only if at least two events are available for output:
context CtxEachMinute select * from SensorEvent output when terminated and count_insert >= 2
The final example EPL outputs when a context partition ends and sets the variable myvar to a new value:
context CtxEachMinute select * from SensorEvent output when terminated then set myvar=3
Named windows are globally-visible data windows that may be referred to by multiple statements. You may refer to named windows in statements that declare a context without any special considerations, with the exception of on-action statements (the latter must refer to the same context associated with the named window).
You may also create a named window and declare a context for the named window. In this case the engine in effect manages separate named windows, one for each context partition.
For example, consider the 9 am to 5 pm non-overlapping context as shown earlier:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
You may create a named window that only exists between 9 am and 5 pm:
context NineToFive create window SpeedingEvents1Hour#time(30 min) as TrafficEvent
You can insert into the named window:
context NineToFive on TrafficEvent(speed > 100) merge SpeedingEvents1Hour insert select *
Any on-merge, on-select, on-update and on-delete statements must declare the same context, in order to operate on partitioned named windows or tables for the same partition.
The following is not a valid statement as it does not declare the same context that was used to declare the named window:
// You must declare the same context for on-trigger statements
on TruncateEvent delete from SpeedingEvents1Hour
The following is valid:
context NineToFive on TruncateEvent delete from SpeedingEvents1Hour
For context declarations that require specifying event types, such as the hash segmented context and keyed segmented context, please provide the underlying event type of the named window.
The following sample EPL statements define a type for the named window, declare a context and associate the named window to the context:
create schema ScoreCycle (userId string, keyword string, productId string, score long)
create context HashByUserCtx as coalesce by consistent_hash_crc32(userId) from ScoreCycle granularity 64
context HashByUserCtx create window ScoreCycleWindow#unique(productId, keyword) as ScoreCycle
Use on-merge to insert events into a specific partition. Insert Into produces an event that is visible to all partitions.
Tables are globally-visible data structures that hold rows organized by primary key(s) and that may be referred to by multiple statements. You may refer to tables in statements that declare a context without any special considerations, with the exception of on-action statements (the latter must refer to the same context associated with the table).
You may also create a table and declare a context for the table. In this case the engine in effect manages separate tables, one for each context partition.
For example, consider the 9 am to 5 pm non-overlapping context as shown earlier:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
You may create a table that only exists between 9 am and 5 pm:
context NineToFive create table AverageSpeedTable ( carId string primary key, avgSpeed avg(double))
You can aggregate-into the table only if the aggregating statement declares the same context:
// declare the same context as for the table
context NineToFive into table AverageSpeedTable select avg(speed) as avgSpeed from TrafficEvent group by carId
When you declare a context for a table, any select, on-merge, on-select, on-update and on-delete statements as well as statements that subquery the table must declare the same context.
For example, this EPL truncates the AverageSpeedTable:
context NineToFive on TruncateEvent delete from AverageSpeedTable
Use on-merge or into-table to insert events into a specific partition. Insert Into produces an event that is visible to all partitions.
A variable is a scalar, object or event value that is available for use in all statements. Variables can be either global variables or context variables.
The value of a global variable is the same for all context partitions. The next example declares a global threshold variable:
create variable integer var_global_threshold = 100
For context variables, there is a variable value per context partition. The next example declares a context and a context variable:
create context ParkingLotContext initiated by CarArrivalEvent as cae terminated by CarDepartureEvent(lot = cae.lot)
context ParkingLotContext create variable integer var_parkinglot_threshold = 100
The variable var_parkinglot_threshold is a context variable. Each context partition can have its own value for the variable.
For more information on variables, please refer to Section 5.17, “Variables and Constants”.
Context variables can only be used in statements that are associated to the same context.
The API to read and manage context partitions themselves is described in Section 14.19, “Context Partition Administration”.
Selecting specific context partitions and interrogating context partition state is useful for:
Iterating a specific context partition or a specific set of context partitions. Iterating a statement is described in Section 14.3.5, “Using Iterators”.
Executing an on-demand (fire-and-forget) query against specific context partition(s). On-demand queries are described in Section 14.5, “On-Demand Fire-and-Forget Query Execution”.
Esper provides APIs to identify, filter and select context partitions for statement iteration and on-demand queries. The APIs are described in detail at Section 14.18, “Context Partition Selection”.
For statement iteration, your application can provide context selector objects to the iterate and safeIterate methods on EPStatement. If your code does not provide context selectors, the iteration considers all context partitions. At the time of iteration, the engine obtains the current set of context partitions and iterates each independently. If your statement has an order-by clause, the order-by clause orders within the context partition and does not order across context partitions.
For on-demand queries, your application can provide context selector objects to the executeQuery method on EPRuntime and to the execute method on EPOnDemandPreparedQuery. If your code does not provide context selectors, the on-demand query considers all context partitions. At the time of on-demand query execution, the engine obtains the current set of context partitions and queries each independently. If the on-demand query has an order-by clause, the order-by clause orders within the context partition and does not order across context partitions.
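The ordering behavior is easy to overlook, so here is a minimal Python sketch of it. It does not use the Esper API; the function name iterate_partitions, the dictionary of partition rows and the selector predicate are stand-ins invented for this illustration.

```python
def iterate_partitions(partitions, selector=None, key=None):
    """partitions: dict mapping a partition id to its list of rows.

    selector: optional predicate on the partition id; None selects all.
    key: optional sort key that stands in for the order-by clause.
    """
    rows = []
    for partition_id, partition_rows in partitions.items():
        if selector is None or selector(partition_id):
            # Ordering is applied within each partition only.
            rows.extend(sorted(partition_rows, key=key))
    return rows
```

For partitions {0: [3, 1], 1: [2, 0]} the result is [1, 3, 0, 2]: each partition is sorted on its own, and the combined result is not globally sorted. An application that needs a global order would have to sort the combined rows itself.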