
Chapter 14. EPL Reference: Data Windows

14.1. A Note on Data Window Name and Parameters
14.2. A Note on Batch Windows
14.3. Data Windows
14.3.1. Length Window (length or win:length)
14.3.2. Length Batch Window (length_batch or win:length_batch)
14.3.3. Time Window (time or win:time)
14.3.4. Externally-timed Window (ext_timed or win:ext_timed)
14.3.5. Time Batch Window (time_batch or win:time_batch)
14.3.6. Externally-timed Batch Window (ext_timed_batch or win:ext_timed_batch)
14.3.7. Time-Length Combination Batch Window (time_length_batch or win:time_length_batch)
14.3.8. Time-Accumulating Window (time_accum or win:time_accum)
14.3.9. Keep-All Window (keepall or win:keepall)
14.3.10. First Length Window(firstlength or win:firstlength)
14.3.11. First Time Window (firsttime or win:firsttime)
14.3.12. Expiry Expression Window (expr or win:expr)
14.3.13. Expiry Expression Batch Window (expr_batch or win:expr_batch)
14.3.14. Unique Window (unique or std:unique)
14.3.15. Grouped Data Window (groupwin or std:groupwin)
14.3.16. Last Event Window (lastevent or std:lastevent)
14.3.17. First Event Window (firstevent or std:firstevent)
14.3.18. First Unique Window (firstunique or std:firstunique)
14.3.19. Sorted Window (sort or ext:sort)
14.3.20. Ranked Window (rank or ext:rank)
14.3.21. Time-Order Window (time_order or ext:time_order)
14.3.22. Time-To-Live Window (timetolive or ext:timetolive)
14.4. Special Derived-Value Windows
14.4.1. Size Derived-Value Window (size or std:size)
14.4.2. Univariate Statistics Derived-Value Window (uni or stat:uni)
14.4.3. Regression Derived-Value Window (linest or stat:linest)
14.4.4. Correlation Derived-Value Window (correl or stat:correl)
14.4.5. Weighted Average Derived-Value Window (weighted_avg or stat:weighted_avg)

This chapter outlines the data windows. Chapter 2, Basic Concepts provides additional information on the relationship between filtering, windows and aggregation. Please also see Section 5.4.3, “Specifying Data Windows” for the use of windows in the from clause with streams, patterns and named windows.

Data windows retain incoming events until an expiry policy indicates that they should be released. Data windows are thus a means of specifying which subset of events to analyze.

Two or more data windows can be combined. This allows the set of events retained by one data window to be placed into a union or an intersection with the sets of events retained by one or more other data windows. Please see Section 5.4.4, “Multiple Data Windows” for more detail.

The keep-all data window counts as a data window but has no expiry policy: it retains all events received. The grouped-window declaration allocates a new data window per distinct group key and thereby counts as a data window, but it cannot appear alone.

The next table summarizes data windows:

Table 14.1. Built-in Data Windows

Data Window | Syntax | Description
Length Window | length(size) | Sliding length window extending the specified number of elements into the past.
Length Batch Window | length_batch(size) | Tumbling window that batches events and releases them when a given minimum number of events has been collected.
Time Window | time(time period) | Sliding time window extending the specified time interval into the past.
Externally-timed Window | ext_timed(timestamp expression, time period) | Sliding time window, based on the long-type time value supplied by an expression.
Time Batch Window | time_batch(time period [, optional reference point] [, flow control]) | Tumbling window that batches events and releases them every specified time interval, with flow control options.
Externally-timed Batch Window | ext_timed_batch(timestamp expression, time period [, optional reference point]) | Tumbling window that batches events and releases them every specified time interval based on the long-type value supplied by an expression.
Time-Length Combination Batch Window | time_length_batch(time period, size [, flow control]) | Tumbling multi-policy time and length batch window with flow control options.
Time-Accumulating Window | time_accum(time period) | Sliding time window that accumulates events until no more events arrive within a given time interval.
Keep-All Window | keepall | The keep-all data window simply retains all events.
Sorted Window | sort(size, sort criteria) | Sorts by values returned by sort criteria expressions and keeps only the top events up to the given size.
Ranked Window | rank(unique criteria(s), size, sort criteria(s)) | Retains only the most recent among events having the same value for the criteria expression(s), sorted by sort criteria expressions, and keeps only the top events up to the given size.
Time-Order Window | time_order(timestamp expression, time period) | Orders events that arrive out-of-order, using an expression providing timestamps to be ordered.
Time-To-Live Window | timetolive(timestamp expression) | Retains events until the time returned by the timestamp expression.
Unique Window | unique(unique criteria(s)) | Retains only the most recent among events having the same value for the criteria expression(s). Acts as a length window of size 1 for each distinct expression value.
Grouped Data Window | groupwin(grouping criteria(s)) | Groups events into sub-data-windows by the value of the specified expression(s), generally used to provide a separate data window per group.
Last Event Window | lastevent | Retains the last event, acts as a length window of size 1.
First Event Window | firstevent | Retains the very first arriving event, disregarding all subsequent events.
First Unique Window | firstunique(unique criteria(s)) | Retains only the very first among events having the same value for the criteria expression(s), disregarding all subsequent events for same value(s).
First Length Window | firstlength(size) | Retains the first size events, disregarding all subsequent events.
First Time Window | firsttime(time period) | Retains the events arriving until the time interval has passed, disregarding all subsequent events.
Expiry Expression Window | expr(expiry expression) | Expires events based on the result of an expiry expression passed as a parameter.
Expiry Expression Batch Window | expr_batch(expiry expression) | Tumbling window that batches events and releases them based on the result of an expiry expression passed as a parameter.

There is a special kind of data window, used less frequently, called a derived-value window. Derived-value windows derive a new value from event streams and post the result as events of a new type. The table below summarizes these special derived-value windows.

Table 14.2. Built-in Derived-Value Data Windows

Data Window | Syntax | Description
Size | size([expression, ...]) | Derives a count of the number of events in a data window, or in an insert stream if used without a data window, and optionally provides additional event properties as listed in parameters.
Univariate statistics | uni(value expression [,expression, ...]) | Calculates univariate statistics on the values returned by the expression.
Regression | linest(value expression, value expression [,expression, ...]) | Calculates regression on the values returned by two expressions.
Correlation | correl(value expression, value expression [,expression, ...]) | Calculates the correlation value on the values returned by two expressions.
Weighted average | weighted_avg(value expression, value expression [,expression, ...]) | Calculates weighted average given a weight expression and an expression to compute the average for.

14.1. A Note on Data Window Name and Parameters

The syntax for data windows starts with the data window name, followed by optional parameter expressions in parentheses:

name(window_parameters)

This example specifies a time window of 5 seconds:

select * from StockTickEvent#time(5 sec)

EPL organizes built-in data windows into namespaces and names. Windows that provide sliding or tumbling data windows are in the win namespace. Other commonly used windows are in the std namespace. The ext namespace holds windows that order events, and the stat namespace holds windows that derive statistical data.

Alternatively, you may prefix the window name with the namespace name and a colon (:) character:

namespace:name(window_parameters)

The below examples all specify a time window of 5 seconds:

select * from StockTickEvent#time(5 sec)
select * from StockTickEvent#win:time(5 sec)
select * from StockTickEvent.win:time(5 sec)

Any expression is allowed as a data window parameter, including expressions that contain variables or substitution parameters for prepared statements. Subqueries, the special prior and prev functions, and aggregation functions are not allowed as data window parameters; the exception is the expression window and expression batch window, which allow aggregation functions.

For example, assuming a variable by name VAR_WINDOW_SIZE is defined:

select * from StockTickEvent#time(VAR_WINDOW_SIZE)

The system evaluates expression parameters for data windows at the time of context partition instantiation with the exception of the expression window (expr) and expression batch window (expr_batch).

Also consider multiple data windows in intersection or union (keywords retain-intersection and retain-union). Consider writing a custom plug-in data window if your application requires behavior that is not yet provided by any of the built-in windows.

If a window takes no parameters, you may omit the parentheses or use empty parentheses ().

The below examples all specify a keep-all window:

select * from StockTickEvent#keepall
select * from StockTickEvent#keepall()
select * from StockTickEvent.win:keepall()
select * from StockTickEvent.win:keepall

Expression parameters can reference context-provided properties. For example:

create schema ParameterEvent(windowSize int)
create context MyContext initiated by ParameterEvent as params terminated after 1 year
context MyContext select * from StockTickEvent#length(context.params.windowSize)

14.2. A Note on Batch Windows

Batch windows buffer events until a certain threshold is reached and then release the batched events for processing. The released events become the insert stream events and the previous batch of events constitutes the remove stream events. Batch windows thus retain the current and the last batch of events in memory.

It is often desirable to aggregate without retaining events in memory, or while retaining only the current events in memory (and not also the last batch of events). You can instead declare a context that defines what starts and ends a "batch". Contexts provide a large degree of freedom: they allow batches to overlap, to span multiple statements, and to have complex start and end conditions. They are further described in Chapter 4, Context and Context Partitions.

This example declares a non-overlapping context that spans a time interval of 3 seconds (i.e. a batch of 3 seconds):

create context IntervalSpanning3Seconds start @now end after 3 sec

The next example EPL aggregates events without retaining events in memory and outputs at the end of each interval:

context IntervalSpanning3Seconds select count(*) from Events output snapshot when terminated

Here is an example that outputs all events when at least 10 events have been collected in the 3-second interval:

context IntervalSpanning3Seconds select window(*) from Events#keepall having count(*) >= 10

For the examples above, at the end of each 3-second interval the runtime discards all data windows and aggregation state. If your application requires 3-second intervals keyed by some fields, consider a nested context declaration with a keyed segmented context, for example:

create context PerSymbolInterval3Sec 
  context ById partition by symbol from StockTick, 
  context Interval3Sec start @now end after 3 sec

Batch windows keep not only the current batch in memory but also the previous batch of events. For example, let's say at time 0 an event arrives and enters the batch window. At time 3 seconds (3-second batch window) the event becomes an insert-stream event and the runtime now updates aggregations for that batch (i.e. count goes up to 1). At time 6 seconds the event becomes a remove-stream event and the runtime now updates aggregations for that batch (i.e. count goes down to 0). Since the runtime continually updates aggregations from insert and remove stream events, and does not re-compute aggregations, batch windows follow the same paradigm.
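To make the timeline above concrete, here is a minimal Java sketch of a batch window that keeps the current and the last batch and maintains count(*) incrementally from the insert and remove streams. It is a simplified model for illustration, not Esper's implementation; the names BatchWindowSketch, onEvent and flush are hypothetical, and the batch timer is modelled as an explicit flush() call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a batch (tumbling) window, not Esper's implementation.
// It retains the current batch and the last batch, and maintains count(*)
// incrementally from the insert and remove streams instead of recomputing it.
final class BatchWindowSketch<E> {
    private List<E> currentBatch = new ArrayList<>();
    private List<E> lastBatch = new ArrayList<>();
    private long count = 0;

    // An arriving event is only buffered; the aggregation does not change yet.
    void onEvent(E event) {
        currentBatch.add(event);
    }

    // Called when the batch interval ends (for example every 3 seconds).
    void flush() {
        List<E> insertStream = currentBatch; // the current batch is released
        List<E> removeStream = lastBatch;    // the previous batch leaves the window
        count += insertStream.size();
        count -= removeStream.size();
        lastBatch = insertStream;            // kept in memory until the next flush
        currentBatch = new ArrayList<>();
    }

    long count() { return count; }

    int retainedEvents() { return currentBatch.size() + lastBatch.size(); }
}
```

Following the timeline: after the flush at 3 seconds the count is 1 and the event is still retained; after the flush at 6 seconds the count returns to 0 and nothing is retained.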

14.3. Data Windows

14.3.3. Time Window (time or win:time)

This window is a moving (sliding) time window extending the specified time interval into the past based on the system time. Provide a time period (see Section 5.2.1, “Specifying Time Periods”) or an expression defining the number of seconds as a parameter:

time(time period)
time(seconds_interval_expression)

This example calculates the sum of price for GE stock tick events in the last 1 second:

select sum(price) from StockTickEvent(symbol='GE')#time(1 sec)

The following time windows are equivalent specifications:

time(2 minutes 5 seconds)
time(125 sec)
time(125)
time(MYINTERVAL)  // MYINTERVAL defined as a variable
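The sliding behavior behind the sum(price) example can be modelled with a short Java sketch. It assumes arrival times in milliseconds and that an event expires exactly one window length after it arrives; TimeWindowSketch and its methods are hypothetical and not part of the Esper API. The constructor takes a java.time.Duration, so time(2 minutes 5 seconds) corresponds to Duration.ofMinutes(2).plusSeconds(5), which is 125 seconds.

```java
import java.time.Duration;
import java.util.ArrayDeque;

// Hypothetical sliding time window driven by arrival time, maintaining sum(price)
// incrementally as events enter and expire. Not Esper's implementation.
final class TimeWindowSketch {
    private static final class Entry {
        final long arrivalMillis;
        final double price;

        Entry(long arrivalMillis, double price) {
            this.arrivalMillis = arrivalMillis;
            this.price = price;
        }
    }

    private final long lengthMillis;
    private final ArrayDeque<Entry> entries = new ArrayDeque<>();
    private double sum = 0;

    TimeWindowSketch(Duration length) {
        this.lengthMillis = length.toMillis();
    }

    void onEvent(long nowMillis, double price) {
        advanceTo(nowMillis);
        entries.addLast(new Entry(nowMillis, price));
        sum += price;
    }

    // Expire events whose age has reached the window length (assumed boundary).
    void advanceTo(long nowMillis) {
        while (!entries.isEmpty() && entries.peekFirst().arrivalMillis <= nowMillis - lengthMillis) {
            sum -= entries.pollFirst().price;
        }
    }

    double sum() { return sum; }

    int size() { return entries.size(); }
}
```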

14.3.4. Externally-timed Window (ext_timed or win:ext_timed)

Similar to the time window, this window is a moving (sliding) time window extending the specified time interval into the past, but based on the long-type time value supplied by a timestamp expression. The window takes two parameters: the expression to return long-typed timestamp values, and a time period or expression that provides a number of seconds:

ext_timed(timestamp_expression, time_period)
ext_timed(timestamp_expression, seconds_interval_expression)

The key difference between the externally-timed window and the regular time window is that this window slides not based on runtime time, but strictly based on the result of the timestamp expression evaluated against the events entering the window.

On each event arrival, the algorithm underlying the window compares the timestamp value returned by the expression for the oldest event with the timestamp value returned for the newest arriving event. If the interval between the two timestamp values is larger than the time period parameter, the algorithm removes the oldest events, tail-first, until the difference between the oldest and newest event is within the time interval. The window therefore slides only when events arrive, and only considers each event's timestamp property (or other expression value returned), not runtime time.
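The algorithm just described can be sketched in Java as follows. This is a simplified model for illustration, not Esper's implementation; ExtTimedWindowSketch is a hypothetical name, and the sketch follows the text in expiring the oldest event only when the span is strictly larger than the interval (the exact boundary is an assumption). No runtime clock is consulted: only the timestamps carried by the events matter.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the externally-timed window: it slides only on event
// arrival, comparing the newest timestamp with the oldest retained timestamp.
final class ExtTimedWindowSketch {
    private final long intervalMillis;
    private final ArrayDeque<Long> timestamps = new ArrayDeque<>();

    ExtTimedWindowSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Adds the event and returns the timestamps expired by it (the remove stream).
    List<Long> onEvent(long timestamp) {
        timestamps.addLast(timestamp);
        List<Long> expired = new ArrayList<>();
        // Remove oldest events, tail-first, while the span exceeds the interval.
        // The loop ends at the latest when the newest event is also the oldest.
        while (timestamp - timestamps.peekFirst() > intervalMillis) {
            expired.add(timestamps.pollFirst());
        }
        return expired;
    }

    int size() { return timestamps.size(); }
}
```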

This window holds stock tick events of the last 10 seconds based on the timestamp property in StockTickEvent.

select * from StockTickEvent#ext_timed(timestamp, 10 seconds)

The externally-timed data window expects strict ordering of the timestamp values returned by the timestamp expression. The window is not useful for ordering events in time order; use the time-order window instead.

On a related subject, runtime time itself can be entirely under control of the application as described in Section 16.9, “Controlling Time-Keeping”, allowing control over all time-based aspects of processing in one place.

14.3.5. Time Batch Window (time_batch or win:time_batch)

This window buffers events (tumbling window) and releases them every specified time interval in one update. The window takes a time period or an expression providing a number of seconds as a parameter, plus optional parameters described next.

time_batch(time_period [,optional_reference_point] [,flow_control])
time_batch(seconds_interval_expression [,optional_reference_point] [,flow_control])

The time batch window takes a second, optional parameter that serves as a reference point to batch flush times. If not specified, the arrival of the first event into the batch window sets the reference point. Therefore if the reference point is not specified and the first event arrives at time t1, then the batch flushes at time t1 plus time_period and every time_period thereafter.

Note

Please see Section 14.2, “A Note on Batch Windows” for information on what a batch window is and how best to compute over intervals.

Note that using this window means that the runtime keeps events in memory until the time is up: consider your event arrival rate and determine if this is the behavior you want. Use a context declaration or output rate limiting, such as output snapshot, as an alternative.

The below example batches events into a 5 second window releasing new batches every 5 seconds. Listeners to updates posted by this window receive updated information only every 5 seconds.

select * from StockTickEvent#time_batch(5 sec)

By default, if there are no events arriving in the current interval (insert stream), and no events remain from the prior batch (remove stream), then the window does not post results to listeners. The window allows overriding this default behavior via flow control keywords.

The synopsis with flow control parameters is:

time_batch(time_period or seconds_interval_expr [,optional_reference_point] 
    [, "flow-control-keyword [, keyword...]"] )

The FORCE_UPDATE flow control keyword instructs the window to post an empty result set to listeners if there is no data to post for an interval. When using this keyword the irstream keyword should be used in the select clause to ensure the remove stream is also output. Note that FORCE_UPDATE is for use with listeners to the same statement and not for use with named windows. Consider output rate limiting instead.

The START_EAGER flow control keyword instructs the window to post empty result sets even before the first event arrives, starting a time interval at statement deployment time. As when using FORCE_UPDATE, the window also posts an empty result set to listeners if there is no data to post for an interval; however, it starts doing so at statement deployment time rather than at the time the first event arrives.

This example uses both flow control keywords in one statement, with a 10-second interval. The window posts an empty result set one interval after the statement is deployed, and keeps posting an empty result set for each interval in which no events arrive:

select * from MyEvent#time_batch(10 sec, "FORCE_UPDATE, START_EAGER")

The optional reference point is provided as a long-value of milliseconds (or microseconds for microsecond runtime time unit) relative to January 1, 1970 and time 00:00:00.

The following example statement sets the reference point to 5 seconds and the batch size to 1 hour, so that each batch is output 5 seconds after the hour:

select * from OrderSummaryEvent#time_batch(1 hour, 5000L)
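The flush-time arithmetic for the reference point can be sketched as a small Java helper. This is an illustration under stated assumptions, not Esper's code: BatchFlushTimes and nextFlush are hypothetical names, all values are milliseconds, and when no reference point is given the first event's arrival time t1 is passed as the reference, which yields flushes at t1 plus time_period and every time_period thereafter.

```java
// Hypothetical helper computing the next flush time of a time batch window
// from a reference point and a batch length (all values in milliseconds).
final class BatchFlushTimes {
    private BatchFlushTimes() {}

    static long nextFlush(long referenceMillis, long periodMillis, long nowMillis) {
        // Whole periods elapsed since the reference point, rounded down so this
        // also works when "now" lies before the reference point, plus one.
        long periods = Math.floorDiv(nowMillis - referenceMillis, periodMillis) + 1;
        return referenceMillis + periods * periodMillis;
    }
}
```

For time_batch(1 hour, 5000L), every computed flush time is 5 seconds past a whole hour.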

14.3.6. Externally-timed Batch Window (ext_timed_batch or win:ext_timed_batch)

Similar to the time batch window, this window buffers events (tumbling) and releases them every specified time interval in one update, but based on the long-type time value supplied by a timestamp expression. The window has two required parameters: an expression that returns long-typed timestamp values, and a time period or constant-value expression that provides a number of seconds:

ext_timed_batch(timestamp_expression, time_period [,optional_reference_point])
ext_timed_batch(timestamp_expression, seconds_interval_expression [,optional_reference_point])

The externally-timed batch window takes a third, optional parameter that serves as a reference point to batch flush times. If not specified, the arrival of the first event into the batch window sets the reference point. Therefore if the reference point is not specified and the first event arrives at time t1, then the batch flushes at time t1 plus time_period and every time_period thereafter.

The key difference between the externally-timed batch window and the regular time batch window is that this window tumbles not based on runtime time, but strictly based on the result of the timestamp expression evaluated against the events entering the window.

On each event arrival, the algorithm underlying the window compares the timestamp value returned by the expression for the oldest event with the timestamp value returned for the newest arriving event. If the interval between the two timestamp values is larger than the time period parameter, the algorithm posts the current batch of events. The window therefore posts batches only when events arrive, and only considers each event's timestamp property (or other expression value returned), not runtime time.
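A simplified Java model of this behavior follows. It assumes that the first event's timestamp sets the reference point, that batch boundaries are spaced time_period apart, and that the event whose timestamp crosses a boundary starts the next batch; these are assumptions for illustration, and ExtTimedBatchSketch is a hypothetical name rather than an Esper class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an externally-timed batch window without a reference point.
// Batches are posted only when an arriving timestamp reaches the current batch end.
final class ExtTimedBatchSketch {
    private final long periodMillis;
    private boolean started = false;
    private long batchEnd;
    private List<Long> batch = new ArrayList<>();
    private final List<List<Long>> posted = new ArrayList<>();

    ExtTimedBatchSketch(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    void onEvent(long timestamp) {
        if (!started) {
            started = true;
            batchEnd = timestamp + periodMillis; // first event acts as the reference point
        } else if (timestamp >= batchEnd) {
            posted.add(batch);                   // post the batch that has ended
            batch = new ArrayList<>();
            while (batchEnd <= timestamp) {      // skip over any empty intervals
                batchEnd += periodMillis;
            }
        }
        // The arriving event joins the open batch (assumed: a triggering event
        // therefore starts the next batch).
        batch.add(timestamp);
    }

    List<List<Long>> postedBatches() { return posted; }
}
```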

Note that using this window means that the runtime keeps events in memory until the time is up: consider your event arrival rate and determine if this is the behavior you want. Use a context declaration or output rate limiting, such as output snapshot, as an alternative.

The example below batches events into a 5-second window, releasing new batches every 5 seconds. Listeners to updates posted by this window receive updated information only when events arrive with timestamps that indicate the start of a new batch:

select * from StockTickEvent#ext_timed_batch(timestamp, 5 sec)

The optional reference point is provided as a long-value of milliseconds (or microseconds) relative to January 1, 1970 and time 00:00:00.

The following example statement sets the reference point to 5 seconds and the batch size to 1 hour, so that each batch is output 5 seconds after the hour:

select * from OrderSummaryEvent#ext_timed_batch(timestamp, 1 hour, 5000L)

The externally-timed batch window expects strict ordering of the timestamp values returned by the timestamp expression. The window is not useful for ordering events in time order; use the time-order window instead.

On a related subject, runtime time itself can be entirely under control of the application as described in Section 16.9, “Controlling Time-Keeping”, allowing control over all time-based aspects of processing in one place.

14.3.7. Time-Length Combination Batch Window (time_length_batch or win:time_length_batch)

This data window combines the time batch and length batch (tumbling) windows. Like those windows, it batches events and releases the batched events when either of the following conditions occurs, whichever comes first: the data window has collected a given number of events, or a given time interval has passed.

The parameters take 2 forms. The first form accepts a time period or an expression providing a number of seconds, and an expression for the number of events:

time_length_batch(time_period, number_of_events_expression)
time_length_batch(seconds_interval_expression, number_of_events_expression)

The next example shows a time-length combination batch window that batches up to 100 events or all events arriving within a 1-second time interval, whichever condition occurs first:

select * from MyEvent#time_length_batch(1 sec, 100)

In this example, if 100 events arrive before a 1-second time interval passes, the window posts the batch of 100 events. If fewer than 100 events arrive within a 1-second interval, the window posts all events that arrived within that interval at the end of the interval.
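The "whichever comes first" rule can be sketched in Java with the timer modelled as an explicit onIntervalEnd() call. TimeLengthBatchSketch is a hypothetical name; the sketch tracks only the insert stream, ignores flow control keywords, and does not model whether the interval timer restarts after a length-triggered release.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a time-length combination batch window: a batch is
// released when maxEvents events have collected or the interval ends, whichever
// happens first. Only insert-stream batches are tracked.
final class TimeLengthBatchSketch {
    private final int maxEvents;
    private List<String> batch = new ArrayList<>();
    private final List<List<String>> posted = new ArrayList<>();

    TimeLengthBatchSketch(int maxEvents) {
        this.maxEvents = maxEvents;
    }

    void onEvent(String event) {
        batch.add(event);
        if (batch.size() >= maxEvents) {
            post(); // length condition reached before the interval ended
        }
    }

    // Stand-in for the interval timer firing.
    void onIntervalEnd() {
        // Simplified: without flow control, an interval with no new events posts nothing.
        if (!batch.isEmpty()) {
            post();
        }
    }

    private void post() {
        posted.add(batch);
        batch = new ArrayList<>();
    }

    List<List<String>> postedBatches() { return posted; }
}
```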

By default, if there are no events arriving in the current interval (insert stream), and no events remain from the prior batch (remove stream), then the window does not post results to listeners. This window allows overriding this default behavior via flow control keywords.

The synopsis of the window with flow control parameters is:

time_length_batch(time_period or seconds_interval_expression, number_of_events_expression, 
    "flow control keyword [, keyword...]")

The FORCE_UPDATE flow control keyword instructs the window to post an empty result set to listeners if there is no data to post for an interval. The window begins posting no later than one time interval after the first event arrives. When using this keyword, use the irstream keyword in the select clause to ensure the remove stream is also output.

The START_EAGER flow control keyword instructs the window to post empty result sets even before the first event arrives, starting a time interval at statement deployment time. As when using FORCE_UPDATE, the window also posts an empty result set to listeners if there is no data to post for an interval; however, it starts doing so at statement deployment time rather than at the time the first event arrives.

This example uses both flow control keywords in one statement: the window waits for 10 seconds or reacts when the 5th event arrives, whichever comes first. It posts an empty result set one interval after the statement is deployed, and keeps posting an empty result set for each interval in which no events arrive:

select * from MyEvent#time_length_batch(10 sec, 5, "FORCE_UPDATE, START_EAGER")

14.3.12. Expiry Expression Window (expr or win:expr)

The expr data window applies an expiry expression and removes events from the data window when the expression returns false.

Use this window to implement rolling and dynamically shrinking or expanding time, length or other windows. Rolling can, for example, be controlled based on event properties of arriving events, based on aggregation values or based on the return result of user-defined functions. Use this window to accumulate events until a value changes or other condition occurs based on arriving events or change of a variable value.

The synopsis is:

expr(expiry_expression)

The expiry expression can be any expression including expressions on event properties, variables, aggregation functions or user-defined functions. The window applies this expression to the oldest event(s) currently in the window, as described next.

When a new event arrives, or when a variable value referenced by the expiry expression changes, the window applies the expiry expression starting from the oldest event in the data window. If the expiry expression returns false for the oldest event, the window removes the event from the data window and then applies the expression to the next oldest event. If the expiry expression returns true for the oldest event, no further evaluation takes place and the window indicates any new and expired events through the insert and remove streams.
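The evaluation loop described above can be sketched in Java, with the expiry expression represented by a lambda over the current count and the oldest and newest events. This is a simplified, hypothetical model (ExprWindowSketch and Expiry are illustrative names, not Esper API); it assumes the count passed to the expression reflects removals already made during the loop, and it does not model re-evaluation on variable changes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the expr data window's expiry loop.
final class ExprWindowSketch<E> {
    // Stand-in for an expiry expression over current_count, oldest_event and newest_event.
    interface Expiry<T> {
        boolean test(int currentCount, T oldestEvent, T newestEvent);
    }

    private final Expiry<E> expiry;
    private final ArrayDeque<E> window = new ArrayDeque<>();

    ExprWindowSketch(Expiry<E> expiry) {
        this.expiry = expiry;
    }

    // Adds the event and returns the events it expired (the remove stream).
    List<E> onEvent(E event) {
        window.addLast(event);
        List<E> expired = new ArrayList<>();
        // Starting from the oldest event, remove it while the expression returns false;
        // stop at the first oldest event for which the expression returns true.
        while (!window.isEmpty()
                && !expiry.test(window.size(), window.peekFirst(), window.peekLast())) {
            expired.add(window.pollFirst());
        }
        return expired;
    }

    List<E> contents() { return new ArrayList<>(window); }
}
```

A lambda such as (count, oldest, newest) -> count <= 2 mirrors expr(current_count <= 2), and comparing oldest with newest mirrors the flag example further below.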

By using variables in the expiry expression it is possible to change the behavior of the window dynamically at runtime. When one or more variables used in the expression are updated the window evaluates the expiry expression starting from the oldest event.

Aggregation functions, if present in the expiry expression, are continuously updated as events enter and leave the data window. Use the grouped data window with this window to compute aggregations per group.

The runtime makes the following built-in properties available to the expiry expression:


This EPL declares an expiry expression that retains the last 2 events:

select * from MyEvent#expr(current_count <= 2)

The following example implements a dynamically-sized length window by means of a SIZE variable. As the SIZE variable value changes the window retains the number of events according to the current value of SIZE:

create variable int SIZE = 1000
select * from MyEvent#expr(current_count <= SIZE)

The next EPL retains the last 2 seconds of events:

select * from MyEvent#expr(oldest_timestamp > newest_timestamp - 2000)

The following example implements a dynamically-sized time window. As the SIZE long-type variable value changes the window retains a time interval accordingly:

create variable long SIZE = 1000
select * from MyEvent#expr(newest_timestamp - oldest_timestamp < SIZE)

The following example declares a KEEP variable and flushes all events from the data window when the variable turns false:

create variable boolean KEEP = true
select * from MyEvent#expr(KEEP)

The next example specifies a rolling window that removes the oldest events from the window until the total price of all events in the window is less than 1000:

select * from MyEvent#expr(sum(price) < 1000)

This example retains all events that have the same value of the flag event property. When the flag value changes, the data window expires all events with the old flag value and retains only the most recent event of the new flag value:

select * from MyEvent#expr(newest_event.flag = oldest_event.flag)

14.3.13. Expiry Expression Batch Window (expr_batch or win:expr_batch)

The expr_batch data window buffers events (tumbling window) and releases them when a given expiry expression returns true.

Use this window to implement dynamic or custom batching behavior, such as for dynamically shrinking or growing time, length or other batches, for batching based on event properties of arriving events, aggregation values or for batching based on a user-defined function.

The synopsis is:

expr_batch(expiry_expression [, include_triggering_event])

The expiry expression can be any expression including expressions on event properties, variables, aggregation functions or user-defined functions. The window applies this expression to arriving event(s), as described next.

The optional second parameter include_triggering_event defines whether to include the event that triggers the batch in the current batch (true, the default) or in the next batch (false).

When a new event arrives, when a variable value referenced by the expiry expression changes, or when events get removed from the data window, the window applies the expiry expression. If the expiry expression returns true, the data window posts the collected events as the insert stream and the last batch of events as the remove stream.
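A comparable Java sketch for the batch variant follows, including the include_triggering_event choice. ExprBatchSketch and Trigger are hypothetical names, not Esper API; the sketch assumes the expression is evaluated with the arriving event counted as the newest event, and it does not model re-evaluation on variable changes or removals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of expr_batch: collect events and post the batch when the
// expression returns true for an arriving event.
final class ExprBatchSketch<E> {
    // Stand-in for an expiry expression over current_count, oldest_event and newest_event.
    interface Trigger<T> {
        boolean test(int currentCount, T oldestEvent, T newestEvent);
    }

    private final Trigger<E> trigger;
    private final boolean includeTriggeringEvent;
    private List<E> batch = new ArrayList<>();
    private final List<List<E>> posted = new ArrayList<>();

    ExprBatchSketch(Trigger<E> trigger, boolean includeTriggeringEvent) {
        this.trigger = trigger;
        this.includeTriggeringEvent = includeTriggeringEvent;
    }

    void onEvent(E event) {
        batch.add(event);
        if (!trigger.test(batch.size(), batch.get(0), event)) {
            return; // keep collecting
        }
        if (includeTriggeringEvent) {
            posted.add(batch);              // the triggering event is part of this batch
            batch = new ArrayList<>();
        } else {
            batch.remove(batch.size() - 1); // the triggering event moves to the next batch
            posted.add(batch);
            batch = new ArrayList<>();
            batch.add(event);
        }
    }

    List<List<E>> postedBatches() { return posted; }
}
```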

By using variables in the expiry expression it is possible to change the behavior of the window dynamically at runtime. When one or more variables used in the expression are updated the window evaluates the expiry expression as well.

Aggregation functions, if present in the expiry expression, are continuously updated as events enter the data window and reset when the runtime posts a batch of events. Use the grouped data window with this window to compute aggregations per group.

The compiler makes the following built-in properties available to the expiry expression:


This EPL declares an expiry expression that posts event batches consisting of 2 events:

select * from MyEvent#expr_batch(current_count >= 2)

The following example implements a dynamically-sized length batch window by means of a SIZE variable. As the SIZE variable value changes the window accumulates and posts the number of events according to the current value of SIZE:

create variable int SIZE = 1000
select * from MyEvent#expr_batch(current_count >= SIZE)

The following example accumulates events until an event arrives that has a value of postme for property myvalue:

select * from MyEvent#expr_batch(myvalue = 'postme')

The following example declares a POST variable and posts a batch of events when the variable turns true:

create variable boolean POST = false
select * from MyEvent#expr_batch(POST)

The next example specifies a tumbling window that posts a batch of events when the total price of all events in the window is greater than 1000:

select * from MyEvent#expr_batch(sum(price) > 1000)

Specify the second parameter as false when you want the triggering event not included in the current batch.

This example batches all events that have the same value of the flag event property. When the flag value changes, the data window releases the batch of events collected for the old flag value. Because the second parameter is false, the event carrying the new flag value, which triggered the release, starts the next batch, together with subsequently arriving events that have the same new flag value:

select * from MyEvent#expr_batch(newest_event.flag != oldest_event.flag, false)

14.3.15. Grouped Data Window (groupwin or std:groupwin)

Specifying #groupwin groups events into sub-data-windows by the value returned by the specified expression, or by the combination of values returned by a list of expressions. The #groupwin takes a single expression, or a list of expressions, as parameters to supply the group criteria values, as the synopsis shows:

groupwin(grouping_expression [, grouping_expression ...])

The grouping_expression expression(s) return one or more group keys, and the window creates a separate data window for each distinct group key. Note that the expressions should not return an unlimited number of values: for example, a grouping expression should not return a time value or another unbounded key.

An expression may return a null value. The runtime treats a null value as any other value. An expression can also return a custom application object, whereby the application class should implement the hashCode and equals methods.
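As an illustration of the hashCode and equals requirement, here is a hypothetical composite key class in Java, as an application might return from a grouping expression (for example a key built from symbol and a hypothetical exchange property). It uses java.util.Objects so that a null field compares and hashes like any other value; SymbolExchangeKey and countPerKey are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite group key. Hash-based grouping finds the same group
// only if equals and hashCode agree.
final class SymbolExchangeKey {
    private final String symbol;
    private final String exchange; // may be null; Objects.equals/hash handle null

    SymbolExchangeKey(String symbol, String exchange) {
        this.symbol = symbol;
        this.exchange = exchange;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SymbolExchangeKey)) return false;
        SymbolExchangeKey other = (SymbolExchangeKey) o;
        return Objects.equals(symbol, other.symbol) && Objects.equals(exchange, other.exchange);
    }

    @Override
    public int hashCode() {
        return Objects.hash(symbol, exchange);
    }

    // Counts occurrences per distinct key, the way per-group state would be keyed.
    static Map<SymbolExchangeKey, Integer> countPerKey(Iterable<SymbolExchangeKey> keys) {
        Map<SymbolExchangeKey, Integer> counts = new HashMap<>();
        for (SymbolExchangeKey key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }
}
```

Without the two overrides, every new key instance would land in its own group even when its field values match.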

You can specify a single groupwin per stream. Multiple groupwin declarations for the same stream are not allowed.

Use group by instead of the grouped data window to control how aggregations are grouped.

A grouped data window with a length window of 1 is equivalent to the unique data window (#unique). The unique data window is the preferred notation:

select * from StockTickEvent#unique(symbol)	// Prefer this
// ... equivalent to ...
select * from StockTickEvent#groupwin(symbol)#length(1)

This example retains the last 5 events per symbol and computes the total price across the retained events of all symbols (since no group by clause is specified, the aggregation is across all symbols):

select symbol, sum(price) from StockTickEvent#groupwin(symbol)#length(5)
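The following Java sketch models how the statement above evaluates, assuming a simplified event made of a symbol and a price. It is a conceptual model, not Esper's implementation; the class GroupedLengthSum is hypothetical. Each distinct symbol receives its own length window, and because the statement has no group by clause the running sum spans all windows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Conceptual model (hypothetical class, not Esper's implementation) of
//   select sum(price) from StockTickEvent#groupwin(symbol)#length(N)
final class GroupedLengthSum {
    private final int length;
    private final Map<String, Deque<Double>> windows = new HashMap<>();
    private double total; // sum(price) over all retained events of all symbols

    GroupedLengthSum(int length) {
        this.length = length;
    }

    double onEvent(String symbol, double price) {
        // Allocate a new length window the first time a symbol is seen
        Deque<Double> w = windows.computeIfAbsent(symbol, k -> new ArrayDeque<>());
        w.addLast(price);
        total += price;
        if (w.size() > length) {
            total -= w.removeFirst(); // expire the oldest event of this symbol only
        }
        return total;
    }

    int windowCount() {
        return windows.size();
    }
}
```

With a length of 2, the events IBM 10, IBM 20, MSFT 5 and IBM 30 leave IBM's window holding 20 and 30 and MSFT's window holding 5, for a total of 55: the oldest IBM event expires while MSFT's event is unaffected.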

The @Hint("reclaim_group_aged=age_in_seconds") hint instructs the runtime to discard grouped data window state that has not been updated for age_in_seconds seconds. The optional @Hint("reclaim_group_freq=sweep_frequency_in_seconds") can be specified in addition to control how frequently the runtime sweeps data window state; if this second hint is not specified, the frequency defaults to the value of age_in_seconds. Use the hints when your group criteria return a changing or unlimited number of values. By default, without the hints, the runtime does not reclaim or remove the data windows allocated for group criteria values.

The updated sample statement with both hints:

// Remove data window for symbols not updated for 10 seconds or more and sweep every 30 seconds
@Hint('reclaim_group_aged=10,reclaim_group_freq=30')
select symbol, sum(price) from StockTickEvent#groupwin(symbol)#length(5)

Reclaim executes when an event arrives, not in the timer thread. In the example above, reclaim can therefore occur up to 40 seconds of runtime time (the 10-second age plus the 30-second sweep frequency) after the newest event arrives. Reclaim may affect the iteration order of the statement: with reclaim, iteration order becomes non-deterministic.

To retain the last 5 events per symbol and output a total price per symbol, add the group by clause:

select symbol, sum(price) from StockTickEvent#groupwin(symbol)#length(5) group by symbol

The groupwin grouped-window can also take multiple expressions that provide values to group by. This example retains the last 10 events per combination of symbol and feed and computes the total price across all retained events:

select sum(price) from StockTickEvent#groupwin(symbol, feed)#length(10)

The position of the groupwin grouped-window controls which data the runtime retains for each group. Compare the previous statement with the following one, which uses a length window without groupwin.

Without the groupwin declaration, the statement returns the total price for only the last 10 events across all symbols. Here the runtime allocates a single length window for all events:

select sum(price) from StockTickEvent#length(10)

By placing the groupwin grouped-window before other data windows, those data windows become part of the grouped set of windows. Every time the runtime encounters a new group key, such as a new value for symbol, it dynamically allocates a new instance of each of these windows. Therefore, in groupwin(symbol)#length(10) the runtime allocates a new length window for each distinct symbol, whereas with length(10) alone the runtime maintains a single length window.

The groupwin can be used with multiple data windows to achieve a grouped intersection or union policy.

The next statement retains, per symbol, the last 4 events that are also not older than 10 seconds:

select * from StockTickEvent#groupwin(symbol)#length(4)#time(10)

Last, consider a grouped data window with two group criteria. Here, the statement retains the last 100 events per combination of symbol and feed and computes the total price across all retained events:

select sum(price) from StockTickEvent#groupwin(symbol, feed)#length(100)

Note

A note on grouped time windows: when combining the grouped window with a time window, retaining 5 minutes of events and retaining 5 minutes of events per group produce the same result, because both policies retain, across all groups, the same set of events. Therefore, specify the time window alone (ungrouped).

For example:

// Use this:
select sum(price) from StockTickEvent#time(1 minute)

// is equivalent to (don't use this):
// select sum(price) from StockTickEvent#groupwin(symbol)#time(1 minute)

// Use the group-by clause for grouping aggregation by symbol.

For advanced users: the optional #merge declaration controls how the groupwin grouped-window is evaluated. A #merge can only occur after a groupwin grouped-window, and it marks the end of the grouped declaration.

Compare the following statements:

select * from Market#groupwin(ticker)#length(1000000)
    #weighted_avg(price, volume)#merge(ticker)
// ... and ...
select * from Market#groupwin(ticker)#length(1000000)#merge(ticker)
    #weighted_avg(price, volume)

If your statement does not specify the optional #merge, the semantics are the same as the first statement.

The first statement, in which #merge is added at the end (the same as specifying no merge), computes the weighted average per ticker, considering the last 1M Market events of each ticker. The second statement, in which #merge is added in the middle, also retains the last 1M Market events per ticker, but computes a single weighted average across all such events using one weighted_avg window rather than one window instance per ticker.

14.3.19. Sorted Window (sort or ext:sort)

This window sorts by the values returned by the specified expression or list of expressions and keeps only the top (or bottom) events up to the given size.

This window retains all events in the stream that fall into the sort range. To retain only the most recent event per unique key, in sorted order, use the ranked window described next.

The syntax is as follows:

sort(size_expression, 
    sort_criteria_expression [asc/desc][, sort_criteria_expression [asc/desc]...]) 

An expression may be followed by the optional asc or desc keywords to indicate that the values returned by that expression are sorted in ascending or descending sort order.

The window below retains the 10 events with the highest prices, considering all events (not only the last event per symbol; see rank below), and reports their total price:

select sum(price) from StockTickEvent#sort(10, price desc)

The following example sorts events first by price in descending order, and then by symbol name in ascending (alphabetical) order, keeping only the 10 events with the highest price (with ties resolved by alphabetical order of symbol).

select * from StockTickEvent#sort(10, price desc, symbol asc)
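A minimal Java sketch of the retention policy of #sort(10, price desc, symbol asc) follows, assuming a simplified Tick event with a symbol and a price. The class SortWindowSketch is hypothetical and models only which events the window keeps, not how the runtime stores them or posts insert and remove streams.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Conceptual model (hypothetical class, not Esper's implementation) of
// #sort(size, price desc, symbol asc): keep events in sort order and
// discard whatever falls beyond the given size.
final class SortWindowSketch {
    static final class Tick {
        final String symbol;
        final double price;

        Tick(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    // price descending, then symbol ascending (alphabetical) to resolve ties
    static final Comparator<Tick> ORDER =
        Comparator.comparingDouble((Tick t) -> t.price).reversed()
                  .thenComparing((Tick t) -> t.symbol);

    private final int size;
    final List<Tick> window = new ArrayList<>();

    SortWindowSketch(int size) {
        this.size = size;
    }

    void onEvent(Tick t) {
        window.add(t);
        window.sort(ORDER);
        if (window.size() > size) {
            window.remove(window.size() - 1); // drop the lowest-ranked event
        }
    }
}
```

With a size of 2, after IBM 10, MSFT 30, AAPL 30 and GE 5 arrive, the window holds AAPL 30 followed by MSFT 30: the price tie is resolved alphabetically by symbol, and the lower-priced IBM and GE events fall outside the sort range.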

The sorted window is often used with the prev, prevwindow or prevtail single-row functions to output properties of events at a certain position or to output the complete data window according to sort order.

Use the grouped window to retain a separate sort window for each group. For example, the windows groupwin(market)#sort(10, price desc) instruct the runtime to retain, per market, the highest 10 prices.

14.3.20. Ranked Window (rank or ext:rank)

This window retains only the most recent among events having the same value for the criteria expression(s), sorted by sort criteria expressions, and keeps only the top events up to the given size.

This window is similar to the sorted window in that it keeps only the top (or bottom) events up to the given size; however, it also retains only the most recent among events having the same value(s) for the specified uniqueness expression(s).

The syntax is as follows:

rank(unique_expression [, unique_expression ...],
    size_expression, 
    sort_criteria_expression [asc/desc][, sort_criteria_expression [asc/desc]...]) 

Specify the expressions returning unique key values first. Then specify a constant value that is the size of the ranked window. Then specify the expressions returning sort criteria values. The sort criteria expressions may be followed by the optional asc or desc keywords to indicate that the values returned by that expression are sorted in ascending or descending sort order.

The window below retains the 10 events with the highest prices, considering only the last event per symbol, and reports their total price:

select sum(price) from StockTickEvent#rank(symbol, 10, price desc)

The following example considers only the last event per combination of market and symbol, and retains those events that fall into the first 10 ranks when sorted by price and then quantity, both ascending:

select * from StockTickEvent#rank(market, symbol, 10, price, quantity)
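The difference from the sorted window is the replacement step for events that share a unique key. The Java sketch below models #rank(symbol, size, price desc) under simplified assumptions; the class RankWindowSketch and its Tick event type are hypothetical, and this is not Esper's implementation. In this sketch, an event that falls outside the given size is simply discarded.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Conceptual model (hypothetical class, not Esper's implementation) of
// #rank(symbol, size, price desc): a newer event replaces any retained event
// with the same unique key, then the window is sorted and trimmed.
final class RankWindowSketch {
    static final class Tick {
        final String symbol;
        final double price;

        Tick(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    private static final Comparator<Tick> PRICE_DESC =
        Comparator.comparingDouble((Tick t) -> t.price).reversed();

    private final int size;
    final List<Tick> window = new ArrayList<>();

    RankWindowSketch(int size) {
        this.size = size;
    }

    void onEvent(Tick t) {
        // Only the most recent event per symbol is retained
        window.removeIf(old -> old.symbol.equals(t.symbol));
        window.add(t);
        window.sort(PRICE_DESC);
        if (window.size() > size) {
            window.remove(window.size() - 1); // drop the lowest-ranked event
        }
    }

    double totalPrice() {
        double sum = 0;
        for (Tick t : window) {
            sum += t.price;
        }
        return sum;
    }
}
```

With a size of 2, the events IBM 50, MSFT 40, IBM 10 and GE 30 leave MSFT 40 and GE 30 in the window, for a total price of 70: the newer IBM 10 replaces IBM 50 and then ranks last. A sorted window of the same size would still hold IBM 50 and MSFT 40.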

The ranked window is often used with the prev, prevwindow or prevtail single-row functions to output properties of events at a certain position or to output the complete data window according to sort order.

This example outputs every 5 seconds the top 10 events according to price descending and considering only the last event per symbol:

select prevwindow(*) from StockTickEvent#rank(symbol, 10, price desc)
  output snapshot every 5 seconds limit 1  // need only 1 row

Use the grouped window to retain a separate rank for each group. For example, the windows groupwin(market)#rank(symbol, 10, price desc) instruct the runtime to retain, per market, the highest 10 prices considering the last event per symbol.

14.3.21. Time-Order Window (time_order or ext:time_order)

This window orders events that arrive out of order, using timestamp values provided by an expression and comparing each timestamp value to runtime time.

The syntax for this window is as follows.

time_order(timestamp_expression, time_period)
time_order(timestamp_expression, seconds_interval_expression)

The first parameter to the window is the expression that supplies timestamp values. The timestamp is expected to be a long-typed value that denotes the event's time for consideration by the window, which is typically the time of arrival. The second parameter is a time period or a number-of-seconds expression that specifies the maximum interval for which an arriving event is held, so that older events arriving later can still be ordered ahead of it.

Since the window compares timestamp values to runtime time, the window requires that the timestamp values and runtime time both follow the same clock. Therefore, to the extent that the clocks that originated the two kinds of timestamps differ, the window may produce inaccurate results.

As an example, the next statement uses the arrival_time property of MyTimestampedEvent events to order and release events by arrival time:

insert rstream into ArrivalTimeOrderedStream
select rstream * from MyTimestampedEvent#time_order(arrival_time, 10 sec)

In the example above, the arrival_time property holds a long-typed timestamp value. On arrival of an event, the runtime compares the event's timestamp value to the tail-time of the window. In this example, the tail-time is 10 seconds before runtime time (continuously sliding). If the timestamp value indicates that the event is older than the tail-time of the window, the runtime releases the event immediately into the remove stream. If the timestamp value indicates that the event is newer than the tail-time, the window retains the event until runtime time moves forward such that the event timestamp is older than the tail-time.

The example thus holds each arriving event in memory for anywhere from zero to 10 seconds, to allow older events (considering the arrival time timestamp) to arrive. In other words, the window holds an event whose arrival time equals runtime time for 10 seconds, and an event whose arrival time is 2 seconds older than runtime time for 8 seconds. The window holds an event whose arrival time is 10 or more seconds older than runtime time for zero seconds, releasing such (old) events immediately into the remove stream.

The insert stream of this sliding window consists of all arriving events. The remove stream of the window is ordered by timestamp value: the event with the oldest timestamp value is released first, followed by the next newer events. Note that the statement above uses the rstream keyword in both the insert into clause and the select clause to select only the ordered events. It uses the insert into clause to make the ordered stream available to subsequent statements.
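The release rule described above can be sketched in Java as follows. This is a conceptual model, not Esper's implementation: the class TimeOrderSketch is hypothetical, timestamps and runtime time are plain long millisecond values, runtime time advances only through an explicit advanceTime call, and an event whose timestamp is at or before the tail-time is released immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Conceptual model (hypothetical class, not Esper's implementation) of
// #time_order(timestamp_expression, interval), with times in milliseconds.
final class TimeOrderSketch {
    private final long intervalMs;
    // Retained timestamps, smallest (oldest) first
    private final PriorityQueue<Long> held = new PriorityQueue<>();
    // Timestamps in the order they leave the window
    final List<Long> removeStream = new ArrayList<>();
    private long now;

    TimeOrderSketch(long intervalMs, long startTime) {
        this.intervalMs = intervalMs;
        this.now = startTime;
    }

    void onEvent(long timestamp) {
        held.add(timestamp);
        release(); // an event at or before the tail-time leaves immediately
    }

    void advanceTime(long newNow) {
        now = newNow;
        release();
    }

    private void release() {
        long tail = now - intervalMs; // tail-time slides with runtime time
        while (!held.isEmpty() && held.peek() <= tail) {
            removeStream.add(held.poll()); // released in timestamp order
        }
    }
}
```

With a 10-second interval and runtime time at 100,000 ms, events stamped 100,000 and 98,000 are held, while an event stamped 85,000 is released at once. Advancing runtime time to 108,000 releases the 98,000 event (held for 8 seconds), and advancing to 110,000 releases the 100,000 event (held for 10 seconds), so the remove stream is ordered by timestamp.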

It is up to your application to populate the timestamp property into your events or use a sensible expression that returns timestamp values for consideration by the window. The window also works well if you use externally-provided time via timer events.

14.3.22. Time-To-Live Window (timetolive or ext:timetolive)

This window retains events until runtime time reaches the value returned by the given timestamp expression.

The syntax for this window is as follows:

timetolive(timestamp_expression)

The only parameter to the window is the expression that supplies timestamp values. The timestamp is expected to be a long-typed value that denotes an event's time-to-live.

Since the window compares timestamp values to runtime time, the window requires that the timestamp values and runtime time both follow the same clock.

On arrival of an event, the runtime evaluates the timestamp expression and obtains a long-typed timestamp. The runtime compares that timestamp value to runtime time:

  • If the timestamp is older than runtime time or the same as runtime time, the runtime releases the event immediately into the remove stream and does not retain the event at all.

  • If the timestamp value is newer than the runtime time, the data window retains the event until runtime time moves forward such that the timestamp is the same or older than runtime time.

As an example, the next statement uses the arrival_time property of MyTimestampedEvent events to release events by arrival time:

insert rstream into ArrivalTimeOrderedStream
select rstream * from MyTimestampedEvent#timetolive(arrival_time)

For example, assume runtime time is 8:00:00 (8 am).

  • If the arrival_time timestamp is 8:00:00 or older (such as 7:59:00), the data window does not retain the event at all, i.e. the runtime releases the event into the remove stream upon arrival.

  • If the arrival_time timestamp is after 8:00:00, the data window retains the event. For instance, if the arrival_time timestamp is 8:02:00, the runtime retains the event until runtime time is 8:02:00 or later.

The runtime evaluates the expression only once at the arrival of each event to determine that event's time-to-live.

The insert stream of this sliding window consists of all arriving events. The remove stream of the window is ordered by timestamp value: the event with the oldest timestamp value is released first, followed by the next newer events. Note that the statement above uses the rstream keyword in both the insert into clause and the select clause to select only the ordered events. It uses the insert into clause to make the ordered stream available to subsequent statements.

It is up to your application to populate the timestamp property into your events or use a sensible expression that returns timestamp values for consideration by the window. The window also works well if you use externally-provided time via timer events and if runtime time tracks watermarks.

The time-to-live data window is fully equivalent to the time-order data window with a zero value for the time period.
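Following that equivalence, the Java sketch below models the time-to-live window as a time-order window whose tail-time equals runtime time. It is a conceptual model under stated assumptions, not Esper's implementation: the class TimeToLiveSketch is hypothetical, and times are long millisecond values that advance only through advanceTime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Conceptual model (hypothetical class, not Esper's implementation) of
// #timetolive(timestamp_expression): an event is retained while its
// timestamp is newer than runtime time.
final class TimeToLiveSketch {
    private final PriorityQueue<Long> held = new PriorityQueue<>();
    // Timestamps in the order they leave the window
    final List<Long> removeStream = new ArrayList<>();
    private long now;

    TimeToLiveSketch(long startTime) {
        this.now = startTime;
    }

    // The timestamp is computed once, when the event arrives
    void onEvent(long timestamp) {
        held.add(timestamp);
        release();
    }

    void advanceTime(long newNow) {
        now = newNow;
        release();
    }

    private void release() {
        // Equivalent to a time-order window with a zero interval:
        // release every event whose timestamp is the same as or older than runtime time
        while (!held.isEmpty() && held.peek() <= now) {
            removeStream.add(held.poll());
        }
    }
}
```

Replaying the 8 am example, an event stamped 7:59:00 goes straight to the remove stream, and an event stamped 8:02:00 is released only once runtime time reaches 8:02:00.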

14.4. Special Derived-Value Windows

Derived-value windows can be combined with data windows or used alone. Much like aggregation functions, these windows aggregate or derive information from an event stream. Compared to aggregation functions, derived-value windows can post multiple derived fields at once, including properties of the last event received. The derived fields and event properties are available for querying in the where clause and are often compared to prior values using the prior function. Derived-value windows do not retain events.

14.4.1. Size Derived-Value Window (size or std:size)

This window posts the number of events received from a stream or window plus any additional event properties or expression values listed as parameters. The synopsis is:

size([expression, ...] [ * ])

The window posts a single long-typed property named size. The window posts the prior size as old data and the current size as new data to update listeners of the window. The size value can also be polled (read) via the iterator method of the statement. The window posts output events only when the size count changes, not when it stays the same.

As optional parameters, the window takes a list of expressions that it evaluates against the last arriving event and provides alongside the size field. You may also provide the * wildcard selector to have the window output all event properties.

An alternative to receiving a data window event count is the prevcount function. Compared to the size window the prevcount function requires a data window while the size window does not. The related count(...) aggregation function provides a count per group when used with group by.

When combined with a data window, the size window reports the current number of events in the data window in the insert stream and the prior number of events in the data window as the remove stream. This example reports the number of tick events within the last 1 minute:

select size from StockTickEvent#time(1 min)#size
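The Java sketch below models how the size window reacts to the insert and remove streams of a preceding data window. It is a conceptual illustration, not Esper's implementation; the class SizeSketch is hypothetical, and each update call stands for one delivery of new and removed events from the data window.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model (hypothetical class, not Esper's implementation) of
// #size placed after a data window: posts {old size, new size} only
// when the count actually changes.
final class SizeSketch {
    private long size;
    // Each entry is {priorSize, currentSize}: old data and new data for listeners
    final List<long[]> posted = new ArrayList<>();

    void update(int newEvents, int removedEvents) {
        long prior = size;
        size += newEvents - removedEvents;
        if (size != prior) {
            posted.add(new long[] {prior, size});
        }
    }
}
```

Two arrivals post {0, 1} and {1, 2}; an update with one new and one removed event, as a full length window would deliver, leaves the count at 2 and posts nothing; expiring both events then posts {2, 0}.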

To select additional event properties you may add each event property to output as a parameter to the window.

The next example selects the symbol and feed event properties in addition to the size property:

select size, symbol, feed from StockTickEvent#time(1 min)#size(symbol, feed)

This example selects all event properties in addition to the size property:

select * from StockTickEvent#time(1 min)#size(*)

The size window is also useful in conjunction with a groupwin grouped-window to count the number of events per group. The EPL below returns the number of events per symbol.

select size from StockTickEvent#groupwin(symbol)#size

When used without a data window, the window simply counts the number of events:

select size from StockTickEvent#size

All windows can be used with pattern statements as well. The next EPL snippet shows a pattern that looks for tick events followed by trade events for the same symbol. The size window counts the number of occurrences of the pattern.

select size from pattern[every s=StockTickEvent -> TradeEvent(symbol=s.symbol)]#size

14.4.2. Univariate Statistics Derived-Value Window (uni or stat:uni)

This window calculates univariate statistics on a numeric expression. The window takes a single value expression as a parameter plus any number of optional additional expressions to return properties of the last event. The value expression must return a numeric value:

uni(value_expression [,expression, ...] [ * ])

After the value expression you may optionally list additional expressions or event properties to evaluate for the stream and return their value based on the last arriving event. You may also provide the * wildcard selector to have the window output all event properties.


The example below selects the standard deviation of price over the last 10 stock tick events:

select stddev from StockTickEvent#length(10)#uni(price)
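As a worked illustration of the stddev value in the statement above, the Java sketch below recomputes it over a length window. It is not Esper's implementation; the class UniStddevSketch is hypothetical, it assumes stddev denotes the sample standard deviation (dividing by n - 1), and returning NaN for fewer than two data points is a choice made by the sketch only.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Conceptual model (hypothetical class, not Esper's implementation) of
//   select stddev from StockTickEvent#length(N)#uni(price)
// ASSUMPTION: stddev is the sample standard deviation (divide by n - 1).
final class UniStddevSketch {
    private final int length;
    private final Deque<Double> window = new ArrayDeque<>();

    UniStddevSketch(int length) {
        this.length = length;
    }

    // Returns the standard deviation of the prices in the length window;
    // NaN for fewer than two data points (a choice of this sketch only).
    double onEvent(double price) {
        window.addLast(price);
        if (window.size() > length) {
            window.removeFirst(); // the length window expires its oldest event
        }
        int n = window.size();
        if (n < 2) {
            return Double.NaN;
        }
        double mean = 0;
        for (double p : window) {
            mean += p;
        }
        mean /= n;
        double sumSq = 0;
        for (double p : window) {
            sumSq += (p - mean) * (p - mean);
        }
        return Math.sqrt(sumSq / (n - 1));
    }
}
```

With a length of 3, the prices 2, 4, 4 give sqrt(4/3), roughly 1.155, and a further price of 4 pushes out the 2 so that the window holds 4, 4, 4 with a standard deviation of 0.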

To add properties from the event stream you may simply add all additional properties as parameters to the window.

This example selects all of the derived values, based on the price property, plus the values of the symbol and feed event properties:

select * from StockTickEvent#length(10)#uni(price, symbol, feed)

The following example selects all of the derived values plus all event properties:

select * from StockTickEvent#length(10)#uni(price, symbol, *)

14.4.3. Regression Derived-Value Window (linest or stat:linest)

This window calculates regression and related intermediate results on the values returned by two expressions. The window takes two value expressions as parameters plus any number of optional additional expressions to return properties of the last event. The value expressions must return a numeric value:

linest(value_expression, value_expression [,expression, ...] [ * ])

After the two value expressions you may optionally list additional expressions or event properties to evaluate for the stream and return their value based on the last arriving event. You may also provide the * wildcard selector to have the window output all event properties.


The next example calculates regression and returns the slope and y-intercept on price and offer for all events in the last 10 seconds.

select slope, YIntercept from StockTickEvent#time(10 seconds)#linest(price, offer)
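Assuming a simple least-squares linear regression, the slope and YIntercept values describe the best-fit line through the pairs currently held by the data window. The Java sketch below computes them with the ordinary least-squares formulas. It is a conceptual model, not Esper's implementation: the class LinestSketch is hypothetical, and it assumes that the first parameter supplies the X values and the second the Y values.

```java
// Conceptual model (hypothetical class, not Esper's implementation) of the
// slope and YIntercept of #linest(x, y), using ordinary least squares.
// ASSUMPTION: the first parameter supplies X values, the second Y values.
final class LinestSketch {
    // Returns {slope, yIntercept} for equally sized x and y arrays
    static double[] fit(double[] x, double[] y) {
        int n = x.length;
        double sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;
        for (int i = 0; i < n; i++) {
            sumX += x[i];
            sumY += y[i];
            sumXY += x[i] * y[i];
            sumXX += x[i] * x[i];
        }
        // No guard: fewer than two distinct X values gives a zero denominator
        double slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
        double yIntercept = (sumY - slope * sumX) / n;
        return new double[] {slope, yIntercept};
    }
}
```

For the pairs (1, 3), (2, 5) and (3, 7), which lie on y = 2x + 1, the sketch returns a slope of 2 and a y-intercept of 1.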

To add properties from the event stream you may simply add all additional properties as parameters to the window.

This example selects all of the derived values, based on the price and offer properties, plus the values of the symbol and feed event properties:

select * from StockTickEvent#time(10 seconds)#linest(price, offer, symbol, feed)

The following example selects all of the derived values plus all event properties:

select * from StockTickEvent#time(10 seconds)#linest(price, offer, *)

14.4.5. Weighted Average Derived-Value Window (weighted_avg or stat:weighted_avg)

This window returns the weighted average, given one expression that returns the values to average and another that returns the weights. The window takes two value expressions as parameters plus any number of optional additional expressions to return properties of the last event. The value expressions must return numeric values:

weighted_avg(value_expression_field, value_expression_weight [,expression, ...] [ * ])

After the two value expressions you may optionally list additional expressions or event properties to evaluate for the stream and return their value based on the last arriving event. You may also provide the * wildcard selector to have the window output all event properties.


A statement that derives the volume-weighted average price for the last 3 seconds for a given symbol is shown below:

select average 
from StockTickEvent(symbol='GE')#time(3 seconds)#weighted_avg(price, volume)
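The weighted average equals sum(price*volume)/sum(volume). The Java sketch below keeps the two sums as running totals that the preceding data window adjusts as events enter and leave. It is a conceptual model, not Esper's implementation; the class WeightedAvgSketch is hypothetical.

```java
// Conceptual model (hypothetical class, not Esper's implementation) of
// #weighted_avg(price, volume): running sums updated as events enter and
// leave the preceding data window.
final class WeightedAvgSketch {
    private double sumValueTimesWeight; // sum(price * volume)
    private double sumWeight;           // sum(volume)

    void enter(double price, double volume) {
        sumValueTimesWeight += price * volume;
        sumWeight += volume;
    }

    void leave(double price, double volume) {
        sumValueTimesWeight -= price * volume;
        sumWeight -= volume;
    }

    // This sketch does not guard against a zero total weight (e.g. an empty window)
    double average() {
        return sumValueTimesWeight / sumWeight;
    }
}
```

Entering price 10 with volume 100 and price 20 with volume 300 gives 7000 / 400 = 17.5; once the first event leaves the time window, the average becomes 6000 / 300 = 20.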

To add properties from the event stream you may simply add all additional properties as parameters to the window.

This example selects all of the derived values, based on the price and volume properties, plus the values of the symbol and feed event properties:

select *
from StockTickEvent#time(3 seconds)#weighted_avg(price, volume, symbol, feed)

The next example selects all of the derived values plus the values of all event properties:

select *
from StockTickEvent#time(3 seconds)#weighted_avg(price, volume, *)

Aggregation functions can also compute the weighted average. The next example posts the weighted average per symbol considering the last 3 seconds of stock tick data:

select symbol, sum(price*volume)/sum(volume)
from StockTickEvent#time(3 seconds) group by symbol

The following example computes the weighted average, keeping a separate data window per symbol that holds the last 5 events of that symbol:

select symbol, average
from StockTickEvent#groupwin(symbol)#length(5)#weighted_avg(price, volume)