Esper EPL allows you to declare contexts explicitly, offering the following benefits:
Esper EPL allows you to declare a context explicitly via the create context syntax introduced below.
You may have heard of the term session. A context partition is the same as a session.
The context declaration specifies how the engine manages context partitions (or sessions):
For keyed segmented context there is a context partition (or session) per key or multiple keys, see Section 4.2.2, “Keyed Segmented Context”.
For hash segmented context there is a context partition (or session) per hash code of one or more keys, see Section 4.2.3, “Hash Segmented Context”.
For overlapping contexts there can be multiple overlapping context partitions (or sessions), see Section 4.2.6, “Overlapping Context”.
For non-overlapping contexts there is either zero or one context partition (or session) at any time, see Section 4.2.5, “Non-Overlapping Context”.
For category segmented context there is a context partition (or session) per predefined category, see Section 4.2.4, “Category Segmented Context”.
For more information on locking and threading please see Section 16.7, “Engine Threading and Concurrency”. For performance related information please refer to Chapter 22, Performance.
The syntax for creating a keyed segmented context is as follows:
create context context_name partition [by] event_property [and event_property [and ...]] from stream_def [, event_property [...] from stream_def] [, ...]
The context_name you assign to the context can be any identifier.
create context SegmentedByCustomer partition by custId from BankTxn
context SegmentedByCustomer select custId, account, sum(amount) from BankTxn group by account
context SegmentedByCustomer select * from pattern [ every a=BankTxn(amount > 400) -> b=BankTxn(amount > 400) where timer:within(10 minutes) ]
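As a rough mental model only (this is not how the engine is implemented), a keyed segmented context behaves like a dictionary that holds independent state per key. The Python sketch below mimics the SegmentedByCustomer aggregation statement shown above. The function name and the event dictionaries are hypothetical and exist only for illustration.

```python
from collections import defaultdict

def sum_by_account_per_customer(events):
    """Mimic: context SegmentedByCustomer
    select custId, account, sum(amount) from BankTxn group by account."""
    # One independent aggregation state (a "context partition") per custId.
    partitions = defaultdict(lambda: defaultdict(float))
    output = []
    for ev in events:
        state = partitions[ev["custId"]]        # pick the partition by key
        state[ev["account"]] += ev["amount"]    # group by account inside it
        output.append((ev["custId"], ev["account"], state[ev["account"]]))
    return output

txns = [
    {"custId": "c1", "account": "a1", "amount": 100.0},
    {"custId": "c2", "account": "a1", "amount": 50.0},
    {"custId": "c1", "account": "a1", "amount": 25.0},
]
result = sum_by_account_per_customer(txns)
```

Although both customers use account a1, the running sums stay separate because each customer id has its own partition.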
For example, the following is not valid:
// Neither LoginEvent nor LogoutEvent are listed in the context declaration
context SegmentedByCustomer select * from pattern [every a=LoginEvent -> b=LogoutEvent where timer:within(10 minutes)]
The following is not a valid declaration since the BankTxn event type is listed twice:
// Not valid
create context SegmentedByCustomer partition by custId from BankTxn, account from BankTxn
// Invalid: Type mismatch between properties
create context SegmentedByCustomer partition by custId from BankTxn, loginTime from LoginEvent
create context SegmentedByCustomer partition by custId from BankTxn, loginId from LoginEvent, loginId from LogoutEvent
An example context declaration follows:
create context ByCustomerAndAccount partition by custId and account from BankTxn
context ByCustomerAndAccount select custId, account, sum(amount) from BankTxn
Consider a context declared as follows:
create context ByCust partition by custId from BankTxn
context ByCust select * from BankTxn as t1 unidirectional, BankTxn#time(30) t2 where t1.amount = t2.amount
context ByCust select * from SecurityEvent as t1 unidirectional, BankTxn#time(30) t2 where t1.customerName = t2.customerName
The syntax for creating a hashed segmented context is as follows:
create context context_name coalesce [by] hash_func_name(hash_func_param) from stream_def [, hash_func_name(hash_func_param) from stream_def ] [, ...] granularity granularity_value [preallocate]
The context_name you assign to the context can be any identifier.
create context SegmentedByCustomerHash coalesce by consistent_hash_crc32(custId) from BankTxn granularity 16 preallocate
context SegmentedByCustomerHash select custId, account, sum(amount) from BankTxn group by custId, account
create context MyHashContext coalesce by computeHash(*) from BankTxn granularity 16 preallocate
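To illustrate how a hash segmented context such as SegmentedByCustomerHash could map keys to a fixed number of partitions, here is a minimal Python sketch. It assumes that the partition is the CRC-32 hash code modulo the granularity; the exact byte encoding and arithmetic used by consistent_hash_crc32 in the engine may differ, and partition_for is a hypothetical helper.

```python
import zlib

GRANULARITY = 16  # mirrors "granularity 16" in the declaration above

def partition_for(cust_id: str, granularity: int = GRANULARITY) -> int:
    # Assumption: hash code modulo granularity selects the context partition.
    return zlib.crc32(cust_id.encode("utf-8")) % granularity

# The same key always lands in the same partition; different keys may share one.
assignments = {cid: partition_for(cid) for cid in ["c1", "c2", "c3"]}
```

Unlike the keyed segmented context, the number of partitions here is bounded by the granularity regardless of how many distinct customer ids arrive.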
For example, the following is not valid:
// Neither LoginEvent nor LogoutEvent are listed in the context declaration
context SegmentedByCustomerHash select * from pattern [every a=LoginEvent -> b=LogoutEvent where timer:within(10 minutes)]
The syntax for creating a category segmented context is as follows:
create context context_name group [by] group_expression as category_label [, group [by] group_expression as category_label] [, ...] from stream_def
The context_name you assign to the context can be any identifier.
create context CategoryByTemp group temp < 65 as cold, group temp between 65 and 85 as normal, group temp > 85 as large from SensorEvent
context CategoryByTemp select context.label, count(*) from SensorEvent
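The effect of the CategoryByTemp context on the counting statement can be sketched in Python as follows. This is an illustration of the semantics only; CATEGORIES and count_by_label are hypothetical names, and the sketch relies on the EPL between operator including both endpoints.

```python
from collections import Counter

# (label, predicate) pairs mirroring the three group clauses above.
CATEGORIES = [
    ("cold", lambda temp: temp < 65),
    ("normal", lambda temp: 65 <= temp <= 85),  # "between" includes both endpoints
    ("large", lambda temp: temp > 85),
]

def count_by_label(temps):
    counts = Counter()
    for temp in temps:
        for label, matches in CATEGORIES:
            if matches(temp):
                counts[label] += 1  # counted in every partition whose expression matches
    return dict(counts)

counts = count_by_label([60, 65, 70, 85, 90])
```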
The syntax for creating a non-overlapping context is as follows:
create context context_name start (@now | start_condition) [ end end_condition ]
The context_name you assign to the context can be any identifier.
Both the start condition and the end condition, if specified, can be an event filter, a pattern, a crontab or a time period. The syntax of start and end conditions is described in Section 4.2.7, “Context Conditions”.
Once the start condition occurs, the engine no longer observes the start condition and begins observing the end condition, if an end condition was provided.
Once the end condition occurs, the engine observes the start condition again.
If you specified @now instead of a start condition, the engine begins observing the end condition right away.
If there is no end condition the context partition remains alive and does not end.
If you specified an event filter as the start condition, then the event also counts towards the statement(s) that refer to that context. If you specified a pattern as the start condition, then the events that may constitute the pattern match can also count towards the statement(s) that refer to the context provided that @inclusive and event tags are both specified (see below).
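The start/end lifecycle described above can be pictured as a two-state machine. The following Python sketch is a simplification under stated assumptions: events are plain strings, both conditions are simple event filters, the event that ends a partition is not delivered to the partition, and run_non_overlapping is a hypothetical helper.

```python
def run_non_overlapping(events, start_type, end_type):
    """Return the events seen by each successive context partition."""
    partitions = []
    current = None  # None: no partition exists, the start condition is observed
    for ev in events:
        if current is None:
            if ev == start_type:
                current = [ev]          # the start event also counts towards statements
        elif ev == end_type:
            partitions.append(current)  # partition ends, observe the start condition again
            current = None
        else:
            current.append(ev)          # start condition is not observed while active
    if current is not None:
        partitions.append(current)      # last partition is still alive
    return partitions

seen = run_non_overlapping(
    ["T", "Start", "T", "Start", "End", "T", "Start", "T"], "Start", "End")
```

Events that arrive while no partition exists are not seen by any statement, and a second start event during an active partition does not allocate a second partition.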
At the time of context activation, when your application creates a statement that utilizes the context, the engine checks whether the start and end conditions are crontab expressions. If so, the engine evaluates the start and end crontab expressions and determines whether the current time falls between start and end. If it does, the engine allocates the context partition and waits to observe the end time. Otherwise the engine waits to observe the start time and does not allocate a context partition.
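For a daily 9 am to 5 pm window, the activation-time decision could be sketched as below. This is a hedged illustration, not engine code: the handling of the exact boundary instants (start inclusive, end exclusive) is an assumption, and partition_allocated_at_activation is a hypothetical name.

```python
from datetime import datetime, time

START = time(9, 0)   # corresponds to crontab (0, 9, *, *, *)
END = time(17, 0)    # corresponds to crontab (0, 17, *, *, *)

def partition_allocated_at_activation(now: datetime) -> bool:
    # Inside the window: allocate a partition and wait for the end time.
    # Outside the window: allocate nothing and wait for the start time.
    return START <= now.time() < END

morning = partition_allocated_at_activation(datetime(2024, 1, 2, 10, 30))
evening = partition_allocated_at_activation(datetime(2024, 1, 2, 18, 0))
```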
The built-in context properties that are available are the same as described in Section 4.2.6.2, “Built-In Context Properties”.
The next statement creates a context NineToFive that declares a daily time period that starts at 9 am and ends at 5 pm:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
The following statement outputs speed violations between 9 am and 5 pm, considering a speed of 100 or greater as a violation:
context NineToFive select * from TrafficEvent(speed >= 100)
The example that follows demonstrates the use of an event filter as the start condition and a pattern as the end condition.
The next statement creates a context PowerOutage that starts when the first PowerOutageEvent event arrives and that ends 5 seconds after a subsequent PowerOnEvent arrives:
create context PowerOutage start PowerOutageEvent end pattern [PowerOnEvent -> timer:interval(5)]
The following statement outputs the temperature during a power outage and for 5 seconds after the power comes on:
context PowerOutage select * from TemperatureEvent
To output only the last value when a context partition ends (terminates, expires), please read on to the description of output rate limiting.
The next statement creates a context Every15Minutes that starts immediately and lasts for 15 minutes, repeatedly allocating a new context partition at the end of 15 minute intervals:
create context Every15Minutes start @now end after 15 minutes
The next example declares an AlwaysOn context: It starts immediately and does not end unless the application uses the API to terminate the context partition:
create context AlwaysOn start @now
A non-overlapping context with @now is always-on: A context partition is always allocated at any given point in time. Only if @now is specified will a context partition always exist at any point in time.
If you specified an event filter or pattern as the end condition for a context partition, and statements that refer to the context specify an event filter or pattern that matches the same conditions, use @Priority to instruct the engine whether the context management or the statement evaluation takes priority (see below for configuring prioritized execution).
For example, if your context declaration looks like this:
create context MyCtx start MyStartEvent end MyEndEvent
And a statement managed by the context is this:
context MyCtx select count(*) as cnt from MyEndEvent output when terminated
By using @Priority(1) for create-context and @Priority(0) for the counting statement, the counting statement does not count the last MyEndEvent since context partition management takes priority.
By using @Priority(0) for create-context and @Priority(1) for the counting statement, the counting statement will count the last MyEndEvent since the statement evaluation takes priority.
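The two priority assignments can be compared with a small Python model. It assumes that, for one and the same event, whichever side has the higher priority value processes the event first; the case of equal priorities is not modeled, and count_end_events is a hypothetical helper.

```python
def count_end_events(context_priority, statement_priority):
    """Count MyEndEvent rows reported for one partition of MyCtx."""
    events = ["MyStartEvent", "MyEndEvent"]
    active, count = False, 0
    for ev in events:
        if ev == "MyStartEvent":
            active = True
        elif ev == "MyEndEvent" and active:
            if statement_priority > context_priority:
                count += 1   # statement evaluates first and counts the event
            active = False   # context management then ends the partition
    return count

context_first = count_end_events(context_priority=1, statement_priority=0)
statement_first = count_end_events(context_priority=0, statement_priority=1)
```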
The syntax for creating an overlapping context is as follows:
create context context_name initiated [by] [distinct (distinct_value_expr [,...])] [@now and] initiating_condition [ terminated [by] terminating_condition ]
The context_name you assign to the context can be any identifier.
Both the initiating condition and the terminating condition, if specified, can be an event filter, a pattern, a crontab or a time period. The syntax of initiating and terminating conditions is described in Section 4.2.7, “Context Conditions”.
If you specified @now and before the initiating condition then the engine initiates a new context partition immediately. The @now is only allowed in conjunction with initiation conditions that specify a pattern, crontab or time period and not with event filters.
If you specified an event filter for the initiating condition, then the event that initiates a new context partition also counts towards the statement(s) that refer to that context. If you specified a pattern to initiate a new context partition, then the events that may constitute the pattern match can also count towards the statement(s) that refer to the context provided that @inclusive and event tags are both specified (see below).
The next statement creates a context CtxTrainEnter that allocates a new context partition when a train enters a station, and that terminates each context partition 5 minutes after the time the context partition was allocated:
create context CtxTrainEnter initiated by TrainEnterEvent as te terminated after 5 minutes
The context declared above assigns the stream name te. Thereby the initiating event's properties can be accessed, for example, by specifying context.te.trainId.
The following statement detects when a train enters a station as indicated by a TrainEnterEvent, but does not leave the station within 5 minutes as would be indicated by a matching TrainLeaveEvent:
context CtxTrainEnter select t1 from pattern [ t1=TrainEnterEvent -> timer:interval(5 min) and not TrainLeaveEvent(trainId = context.te.trainId) ]
Since the TrainEnterEvent that initiates a new context partition also counts towards the statement, the first part of the pattern (the t1=TrainEnterEvent) is satisfied by that initiating event.
The next statement creates a context CtxEachMinute that allocates a new context partition immediately and every 1 minute, and that terminates each context partition 1 minute after the time the context partition was allocated:
create context CtxEachMinute initiated @now and pattern [every timer:interval(1 minute)] terminated after 1 minutes
The statement above specifies @now to instruct the engine to allocate a new context partition immediately as well as when the pattern fires. Without the @now the engine would only allocate a new context partition when the pattern fires after 1 minute and every minute thereafter.
The following statement averages the temperature, starting anew every 1 minute and outputs the aggregate value continuously:
context CtxEachMinute select avg(temp) from SensorEvent
To output only the last value when a context partition ends (terminates, expires), please read on to the description of output rate limiting.
By providing no terminating condition, we can tell the engine to allocate context partitions that never terminate, for example:
create context CtxTrainEnter initiated by TrainEnterEvent as te
If you specified an event filter or pattern as the termination condition for a context partition, and statements that refer to the context specify an event filter or pattern that matches the same conditions, use @Priority to instruct the engine whether the context management or the statement evaluation takes priority (see below for configuring prioritized execution). See the note above for more information.
The following context properties are available in your EPL statement when it refers to a context:
You may, for example, select the context properties as follows:
context NineToFive select context.name, context.startTime, context.endTime from TrafficEvent(speed >= 100)
The following statement looks for the next train leave event for the same train id and selects a few of the context properties:
context CtxTrainEnter select *, context.te.trainId, context.id, context.name from TrainLeaveEvent(trainId = context.te.trainId)
event_stream_name [(filter_criteria)] [as stream_name]
// A non-overlapping context that starts when MyStartEvent arrives and ends when MyEndEvent arrives
create context MyContext start MyStartEvent end MyEndEvent
// An overlapping context where each MyEvent with level greater than zero
// initiates a new context partition that terminates after 10 seconds
create context MyContext initiated MyEvent(level > 0) terminated after 10 seconds
Two examples that correlate the start/initiating and end/terminating condition are:
// A non-overlapping context that starts when MyEvent arrives
// and ends when a matching MyEvent arrives (same id)
create context MyContext start MyEvent as myevent end MyEvent(id=myevent.id)
// An overlapping context where each MyInitEvent initiates a new context partition
// that terminates when a matching MyTermEvent arrives
create context MyContext initiated by MyInitEvent as e1 terminated by MyTermEvent(id=e1.id, level <> e1.level)
pattern [pattern_expression] [@inclusive]
The pattern_expression is a pattern as described in Chapter 7, EPL Reference: Patterns.
Specify @inclusive after the pattern to have those same events that constitute the pattern match also count towards any statements that are associated to the context.
You must also provide a tag for each event in a pattern that should be included.
Examples are:
// A non-overlapping context that starts when either StartEventOne or StartEventTwo arrives
// and that ends after 5 seconds.
// Here neither StartEventOne nor StartEventTwo counts towards any statements
// that are referring to the context.
create context MyContext start pattern [StartEventOne or StartEventTwo] end after 5 seconds
// Same as above.
// Here both StartEventOne and StartEventTwo do count towards any statements
// that are referring to the context.
create context MyContext start pattern [a=StartEventOne or b=StartEventTwo] @inclusive end after 5 seconds
// An overlapping context where each distinct MyInitEvent initiates a new context
// and each context partition terminates after 20 seconds
// We use @inclusive to say that the same MyInitEvent that fires the pattern
// also applies to statements that are associated to the context.
create context MyContext initiated by pattern [every-distinct(a.id, 20 sec) a=MyInitEvent]@inclusive terminated after 20 sec
// An overlapping context where each pattern match initiates a new context
// and all context partitions terminate when MyTermEvent arrives.
// The MyInitEvent and MyOtherEvent that trigger the pattern are themselves not included
// in any statements that are associated to the context.
create context MyContext initiated by pattern [every MyInitEvent -> MyOtherEvent where timer:within(5)] terminated by MyTermEvent
You may correlate the start and end streams by providing tags as part of the pattern, and by referring to the tag name(s) in the filter criteria of the end condition.
An example that correlates the start and end condition is:
// A non-overlapping context that starts when either StartEventOne or StartEventTwo arrives
// and that ends when either a matching EndEventOne or EndEventTwo arrives
create context MyContext start pattern [a=StartEventOne or b=StartEventTwo]@inclusive end pattern [EndEventOne(id=a.id) or EndEventTwo(id=b.id)]
Crontab expressions are described in Section 7.6.4, “Crontab (timer:at)”.
Examples are:
// A non-overlapping context started daily from 9 am to 5 pm
// and not started outside of these hours:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
// An overlapping context where crontab initiates a new context every 1 minute
// and each context partition terminates after 10 seconds:
create context MyContext initiated (*, *, *, *, *) terminated after 10 seconds
You may specify a time period that the engine observes before the condition fires. Time period expressions are described in Section 5.2.1, “Specifying Time Periods”.
The syntax is:
after time_period_expression
Examples are:
// A non-overlapping context started after 10 seconds
// that ends 1 minute after it starts and that again starts 10 seconds thereafter.
create context NonOverlap10SecFor1Min start after 10 seconds end after 1 minute
// An overlapping context that starts a new context partition every 5 seconds
// and each context partition lasts 1 minute
create context Overlap5SecFor1Min initiated after 5 seconds terminated after 1 minute
A nested context is a context that is composed from two or more contexts.
The syntax for creating a nested context is as follows:
create context context_name context nested_context_name [as] nested_context_definition , context nested_context_name [as] nested_context_definition [, ...]
The context_name you assign to the context can be any identifier.
Following the context name is a comma-separated list of nested contexts. For each nested context specify the context keyword followed by a nested context name and the nested context declaration. Any of the context declarations as outlined in Section 4.2, “Context Declaration” are allowed for nested contexts.
The order of nested context declarations matters as outlined below. The nested context names have meaning only in respect to built-in properties and statements may not be assigned to nested context names.
The next statement creates a nested context NineToFiveSegmented that, between 9 am and 5 pm, allocates a new context partition for each customer id:
create context NineToFiveSegmented context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *), context SegmentedByCustomer partition by custId from BankTxn
The following statement refers to the nested context to compute a total withdrawal amount per account for each customer but only between 9 am and 5 pm:
context NineToFiveSegmented select custId, account, sum(amount) from BankTxn group by account
Esper implements nested contexts as a context tree: The context declared first controls the lifecycle of the context(s) declared thereafter. Thereby, in the above example, outside of the 9am-to-5pm time the engine has no memory and consumes no resources in relationship to bank transactions or customer ids.
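The context-tree behavior for NineToFiveSegmented can be approximated with the following Python sketch. It is a simplified model under stated assumptions: time is reduced to a (day, hour) pair carried on each transaction, the outer window is re-evaluated only when a transaction arrives (the engine would use timers), and nested_sums is a hypothetical helper.

```python
from collections import defaultdict

def nested_sums(txns, start_hour=9, end_hour=17):
    """txns: time-ordered list of (day, hour, custId, account, amount)."""
    window, per_customer, output = None, None, []
    for day, hour, cust, account, amount in txns:
        current = day if start_hour <= hour < end_hour else None
        if current != window:
            # The outer partition started or ended: all inner state is discarded.
            window = current
            per_customer = (defaultdict(lambda: defaultdict(float))
                            if current is not None else None)
        if per_customer is None:
            continue  # outside 9-to-5 nothing is tracked at all
        per_customer[cust][account] += amount
        output.append((cust, account, per_customer[cust][account]))
    return output

rows = nested_sums([
    (1, 8, "c1", "a1", 10.0),   # before 9 am: ignored
    (1, 10, "c1", "a1", 20.0),
    (1, 11, "c1", "a1", 5.0),
    (1, 12, "c2", "a1", 7.0),
    (2, 10, "c1", "a1", 1.0),   # next day: sums start over
])
```

Because the temporal context is declared first, the per-customer state exists only while the outer partition is alive and starts from scratch in the next window.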
When combining segmented contexts, the set of context partitions for the nested context effectively is the Cartesian product of the partition sets of the nested segmented contexts.
When combining temporal contexts with other contexts, since temporal contexts may overlap and may terminate, it is important to understand that temporal contexts control the lifecycle of sub-contexts (contexts declared thereafter). The order of declaration of contexts in a nested context can thereby change resource usage and output result.
The next statement creates a context that allocates context partitions only when a train enters a station and then for each hash of the tag id of a passenger as indicated by PassengerScanEvent events, and terminates all context partitions after 5 minutes:
create context CtxNestedTrainEnter context InitCtx initiated by TrainEnterEvent as te terminated after 5 minutes, context HashCtx coalesce by consistent_hash_crc32(tagId) from PassengerScanEvent granularity 16 preallocate
In the example above the engine does not start tracking PassengerScanEvent events or hash codes or allocate context partitions until a TrainEnterEvent arrives.
create context CtxSampleNestedContext context SpanA start AStart end AEnd, context SpanB start BStart end BEnd
context CtxSampleNestedContext select count(*) from C
create context CtxSampleNestedContext start pattern[every a=AStart or every a=BStart] as mypattern end pattern[every AEnd or every BEnd]
context CtxNestedTrainEnter select context.InitCtx.te.trainId, context.HashCtx.id, tagId, count(*) from PassengerScanEvent group by tagId
context NineToFiveSegmented select context.NineToFive.startTime, context.SegmentedByCustomer.key1 from BankTxn
This example selects the nested context name and context partition id:
context NineToFiveSegmented select context.name, context.id from BankTxn
You may use output rate limiting to trigger output when a context partition ends, as further described in Section 5.7, “Stabilizing and Controlling Output: the Output Clause”.
Consider the fixed temporal context: A new context partition gets allocated at the designated start time and the current context partition ends at the designated end time. To trigger output when the context partition ends and before it gets removed, read on.
The same is true for the initiated temporal context: That context starts a new context partition when trigger events arrive or when a pattern matches. Each context partition expires (ends, terminates) after the specified time period passed. To trigger output at the time the context partition expires, read on.
You may use the when terminated syntax with output rate limiting to trigger output when a context partition ends. The following example demonstrates the idea by declaring an initiated temporal context.
The next statement creates a context CtxEachMinute that initiates a new context partition every 1 minute, and that expires each context partition after 5 minutes:
create context CtxEachMinute initiated by pattern [every timer:interval(1 min)] terminated after 5 minutes
The following statement computes an ongoing average temperature but outputs only the last value of the average temperature, after 5 minutes when a context partition ends:
context CtxEachMinute select context.id, avg(temp) from SensorEvent output snapshot when terminated
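The combined effect of overlapping partitions and output snapshot when terminated can be sketched in Python as follows. The model makes several assumptions: time is measured in minutes from context activation, the first partition is initiated after the first full interval, partition ids count up from zero, and averages_when_terminated is a hypothetical helper rather than an engine API.

```python
def averages_when_terminated(readings, horizon_min, every_min=1, lifetime_min=5):
    """readings: list of (time_in_minutes, temp). Returns (partition_id, avg) rows."""
    rows = []
    starts = range(every_min, horizon_min + 1, every_min)
    for partition_id, start in enumerate(starts):
        end = start + lifetime_min
        if end > horizon_min:
            break  # this and all later partitions are still alive: no output yet
        temps = [temp for t, temp in readings if start <= t < end]
        avg = sum(temps) / len(temps) if temps else None
        rows.append((partition_id, avg))  # one row per terminated partition
    return rows

rows = averages_when_terminated(
    [(1.5, 70.0), (2.5, 80.0), (6.5, 90.0)], horizon_min=7)
```

The reading at minute 2.5 contributes to both terminated partitions, which is what distinguishes an overlapping context from a non-overlapping one.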
The when terminated syntax can be combined with other output rates.
The next example outputs every 1 minute and also when the context partition ends:
context CtxEachMinute select context.id, avg(temp) from SensorEvent output snapshot every 1 minute and when terminated
In the case that the end/terminating condition of the context partition is an event or pattern, the context properties contain the information of the tagged events in the pattern or the single event that ended/terminated the context partition.
For example, consider the following context, in which the engine initiates a new context partition for each arriving MyStartEvent event and terminates a context partition when a matching MyEndEvent arrives:
create context CtxSample initiated by MyStartEvent as startevent terminated by MyEndEvent(id = startevent.id) as endevent
The following statement outputs the id property of the initiating and terminating event and only outputs when a context partition ends:
context CtxSample select context.startevent.id, context.endevent.id, count(*) from MyEvent output snapshot when terminated
You may in addition specify a termination expression that the engine evaluates when a context partition terminates. Only when the termination expression evaluates to true does output occur. The expression may refer to built-in properties as described in Section 5.7.1.1, “Controlling Output Using an Expression”. The syntax is as follows:
...output when terminated and termination_expression
The next example statement outputs when a context partition ends but only if at least two events are available for output:
context CtxEachMinute select * from SensorEvent output when terminated and count_insert >= 2
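A minimal Python sketch of the termination expression in the statement above follows. It assumes that count_insert is the number of events held for output when the partition ends; output_on_termination is a hypothetical helper.

```python
def output_on_termination(partition_events, min_count=2):
    """Return the rows to output when a partition ends, or [] if suppressed."""
    count_insert = len(partition_events)  # events available for output
    # Output occurs only when the termination expression evaluates to true.
    return list(partition_events) if count_insert >= min_count else []

suppressed = output_on_termination(["e1"])
released = output_on_termination(["e1", "e2"])
```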
The final example EPL outputs when a context partition ends and sets the variable myvar to a new value:
context CtxEachMinute select * from SensorEvent output when terminated then set myvar=3
For example, consider the 9 am to 5 pm fixed temporal context as shown earlier:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
You may create a named window that only exists between 9 am and 5 pm:
context NineToFive create window SpeedingEvents1Hour#time(30 min) as TrafficEvent
You can insert into the named window:
insert into SpeedingEvents1Hour select * from TrafficEvent(speed > 100)
Any on-merge, on-select, on-update and on-delete statements must however declare the same context.
// You must declare the same context for on-trigger statements
on TruncateEvent delete from SpeedingEvents1Hour
context NineToFive on TruncateEvent delete from SpeedingEvents1Hour
create schema ScoreCycle (userId string, keyword string, productId string, score long)
create context HashByUserCtx as coalesce by consistent_hash_crc32(userId) from ScoreCycle granularity 64
context HashByUserCtx create window ScoreCycleWindow#unique(productId, keyword) as ScoreCycle
For example, consider the 9 am to 5 pm fixed temporal context as shown earlier:
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
You may create a table that only exists between 9 am and 5 pm:
context NineToFive create table AverageSpeedTable ( carId string primary key, avgSpeed avg(double))
You can aggregate-into the table only if the aggregating statement declares the same context:
// declare the same context as for the table
context NineToFive into table AverageSpeedTable select avg(speed) as avgSpeed from TrafficEvent group by carId
For example, this EPL truncates the AverageSpeedTable:
context NineToFive on TruncateEvent delete from AverageSpeedTable
create variable integer var_global_threshold = 100
create context ParkingLotContext initiated by CarArrivalEvent as cae terminated by CarDepartureEvent(lot = cae.lot)
context ParkingLotContext create variable integer var_parkinglot_threshold = 100
For more information on variables, please refer to Section 5.17, “Variables and Constants”.
Context variables can only be used in statements that are associated with the same context.
Selecting specific context partitions and interrogating context partitions is useful for:
Iterating a specific context partition or a specific set of context partitions. Iterating a statement is described in Section 16.3.5, “Using Iterators”.
Executing an on-demand (fire-and-forget) query against specific context partition(s). On-demand queries are described in Section 16.5, “On-Demand Fire-And-Forget Query Execution”.
Esper provides APIs to identify, filter and select context partitions for statement iteration and on-demand queries. The APIs are described in detail at Section 16.18, “Context Partition Selection”.
For statement iteration, your application can provide context selector objects to the iterate and safeIterate methods on EPStatement. If your code does not provide context selectors the iteration considers all context partitions. At the time of iteration, the engine obtains the current set of context partitions and iterates each independently. If your statement has an order-by clause, the order-by clause orders within the context partition and does not order across context partitions.
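The ordering consequence can be seen in a small Python model of iterating without context selectors. The order in which partitions are visited (ascending partition id here) is an assumption of this sketch, and iterate_all_partitions is a hypothetical helper, not part of the Esper API.

```python
def iterate_all_partitions(partitions, sort_key):
    """partitions: dict of partition id -> list of rows held by that partition."""
    rows = []
    for partition_id in sorted(partitions):  # each partition is iterated independently
        # order-by applies within the partition only
        rows.extend(sorted(partitions[partition_id], key=sort_key))
    return rows

rows = iterate_all_partitions({0: [5, 1], 1: [4, 2]}, sort_key=lambda r: r)
```

The combined result is sorted within each partition but not globally, so an application that needs a total order has to sort the returned rows itself.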
For on-demand queries, your application can provide context selector objects to the executeQuery method on EPRuntime and to the execute method on EPOnDemandPreparedQuery. If your code does not provide context selectors the on-demand query considers all context partitions. At the time of on-demand query execution, the engine obtains the current set of context partitions and queries each independently. If the on-demand query has an order-by clause, the order-by clause orders within the context partition and does not order across context partitions.