The Event Processing Language (EPL) is a SQL-like language with SELECT, FROM, WHERE, GROUP BY, HAVING and ORDER BY clauses. Streams replace tables as the source of data, with events replacing rows as the basic unit of data. Since events are composed of data, the SQL concepts of correlation through joins, filtering and aggregation through grouping can be effectively leveraged.
The INSERT INTO clause is recast as a means of forwarding events to other streams for further downstream processing. External data accessible through JDBC may be queried and joined with the stream data. Additional clauses such as the PATTERN and OUTPUT clauses are also available to provide the missing SQL language constructs specific to event processing.
The purpose of the UPDATE clause is to update event properties. Update takes place before an event applies to any selecting statements or pattern statements.
EPL statements are used to derive and aggregate information from one or more streams of events, and to join or merge event streams. This section outlines EPL syntax. It also outlines the built-in views, which are the building blocks for deriving and aggregating information from event streams.
EPL statements contain definitions of one or more views. Similar to tables in a SQL statement, views define the data available for querying and filtering. Some views represent windows over a stream of events. Other views derive statistics from event properties, group events or handle unique event property values. Views can be staggered onto each other to build a chain of views. The Esper engine makes sure that views are reused among EPL statements for efficiency.
The built-in set of views is:
Data window views: win:length, win:length_batch, win:time, win:time_batch, win:time_length_batch, win:time_accum, win:ext_timed, win:ext_timed_batch, ext:sort, ext:rank, ext:time_order, std:unique, std:groupwin, std:lastevent, std:firstevent, std:firstunique, win:firstlength, win:firsttime.
Views that derive statistics: std:size, stat:uni, stat:linest, stat:correl, stat:weighted_avg.
EPL provides the concept of a named window. Named windows are data windows that can be inserted into and deleted from by one or more statements, and that can be queried by one or more statements. Named windows have a global character, being visible and shared across an engine instance beyond a single statement. Use the CREATE WINDOW clause to create named windows. Use the ON MERGE clause to atomically merge events into named window state, the INSERT INTO clause to insert data into a named window, the ON DELETE clause to remove events from a named window, the ON UPDATE clause to update events held by a named window and the ON SELECT clause to perform a query triggered by a pattern or arriving event on a named window. Finally, the name of the named window can occur in a statement's FROM clause to query a named window or include the named window in a join or subquery.
EPL allows execution of on-demand (fire-and-forget, non-continuous, triggered by API) queries against named windows through the runtime API. The query engine automatically indexes named window data for fast access by ON SELECT/UPDATE/INSERT/DELETE without the need to create an index explicitly. For fast on-demand query execution via the runtime API, use the CREATE INDEX syntax to create an explicit index.
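As a sketch, an explicit index can be created on a named window property; the window name OrdersWindow and the property productId below are hypothetical:
create index OrdersWindowByProduct on OrdersWindow(productId)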
Use CREATE SCHEMA to declare an event type.
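A minimal sketch of such a declaration, using hypothetical type and property names:
create schema OrderEvent(orderId string, productId string, price double)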
Variables can come in handy to parameterize statements and change parameters on-the-fly and in response to events. Variables can be used in an expression anywhere in a statement as well as in the output clause for dynamic control of output rates.
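As a sketch, assuming the variable name var_threshold and an OrderEvent type (both hypothetical), a variable can be declared, referenced in a filter and changed at runtime via the API:
create variable integer var_threshold = 100
select * from OrderEvent(price > var_threshold)
// change the value on-the-fly through the runtime API
epService.getEPRuntime().setVariableValue("var_threshold", 200);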
Esper can be extended by plugging-in custom developed views and aggregation functions.
EPL queries are created and stored in the engine, and publish results to listeners as events are received by the engine or timer events occur that match the criteria specified in the query. Events can also be obtained from running EPL queries via the safeIterator and iterator methods that provide a pull-data API.
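The following Java sketch shows the pull-data API, assuming an engine instance epService, a configured StockTick event type and the client classes from com.espertech.esper.client:
EPStatement stmt = epService.getEPAdministrator().createEPL(
    "select symbol, price from StockTick.win:time(30 sec)");
// safeIterator() acquires the necessary locks; always close it in a finally block
SafeIterator<EventBean> it = stmt.safeIterator();
try {
  while (it.hasNext()) {
    EventBean row = it.next();
    System.out.println(row.get("symbol") + " " + row.get("price"));
  }
} finally {
  it.close();
}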
The select clause in an EPL query specifies the event properties or events to retrieve. The from clause in an EPL query specifies the event stream definitions and stream names to use. The where clause in an EPL query specifies search conditions that specify which event or event combination to search for. For example, the following statement returns the average price for IBM stock ticks in the last 30 seconds.
select avg(price) from StockTick.win:time(30 sec) where symbol='IBM'
EPL queries follow the syntax below. EPL queries can be simple queries or more complex queries. A simple select contains only a select clause and a single stream definition. Complex EPL queries can be built that feature a more elaborate select list utilizing expressions, may join multiple streams, may contain a where clause with search conditions, and so on.
[annotations]
[expression_declarations]
[context context_name]
[insert into insert_into_def]
select select_list
from stream_def [as name] [, stream_def [as name]] [,...]
[where search_conditions]
[group by grouping_expression_list]
[having grouping_search_conditions]
[output output_specification]
[order by order_by_expression_list]
[limit num_rows]
Time-based windows as well as pattern observers and guards take a time period as a parameter. Time periods follow the syntax below.
time-period : [year-part] [month-part] [week-part] [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part]
year-part : (number|variable_name) ("years" | "year")
month-part : (number|variable_name) ("months" | "month")
week-part : (number|variable_name) ("weeks" | "week")
day-part : (number|variable_name) ("days" | "day")
hour-part : (number|variable_name) ("hours" | "hour")
minute-part : (number|variable_name) ("minutes" | "minute" | "min")
seconds-part : (number|variable_name) ("seconds" | "second" | "sec")
milliseconds-part : (number|variable_name) ("milliseconds" | "millisecond" | "msec")
Some examples of time periods are:
10 seconds
10 minutes 30 seconds
20 sec 100 msec
1 day 2 hours 20 minutes 15 seconds 110 milliseconds
0.5 minutes
1 year
1 year 1 month
Variable names and substitution parameters '?' for prepared statements are also allowed as part of a time period expression.
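For example, assuming a variable var_window_sec has been declared elsewhere, it can stand in for the number in a time period (a sketch):
select avg(price) from StockTick.win:time(var_window_sec seconds)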
A unit in the month part is equivalent to 30 days.
Comments can appear anywhere in the EPL or pattern statement text where whitespace is allowed. Comments can be written in two ways: slash-slash (// ...) comments and slash-star (/* ... */) comments.
Slash-slash comments extend to the end of the line:
// This comment extends to the end of the line.
// Two forward slashes with no whitespace between them begin such comments.
select * from MyEvent // this is a slash-slash comment
// All of this text together is a valid statement.
Slash-star comments can span multiple lines:
/* This comment is a "slash-star" comment that spans multiple lines.
 * It begins with the slash-star sequence with no space between the '/' and '*' characters.
 * By convention, subsequent lines can begin with a star and are aligned, but this is
 * not required.
 */
select * from MyEvent /* this also works */
Comment styles can also be mixed:
select field1, // first comment
  /* second comment */ field2
from MyEvent
Certain words such as select, delete or set are reserved and may not be used as identifiers. Please consult Appendix B, Reserved Keywords for the list of reserved keywords and permitted keywords.
Names of built-in functions and certain auxiliary keywords are permitted as event property names and in the rename syntax of the select clause. For example, count is acceptable.
Consider the example below, which assumes that 'last' is an event property of MyEvent:
// valid
select last, count(*) as count from MyEvent
This example shows an incorrect use of a reserved keyword:
// invalid
select insert from MyEvent
EPL offers an escape syntax for reserved keywords: Event properties as well as event or stream names may be escaped via the backwards apostrophe ` (ASCII 96) character.
The next example queries an event type by name Order (a reserved keyword) that provides a property by name insert (a reserved keyword):
// valid
select `insert` from `Order`
You may surround string values by either double-quotes (") or single-quotes ('). When your string constant in an EPL statement itself contains double quotes or single quotes, you must escape the quotes.
Double and single quotes may be escaped by the backslash (\) character or by unicode notation. Unicode 0027 is a single quote (') and 0022 is a double quote (").
Escaping event property names is described in Section 2.2.1, “Escape Characters”.
The sample EPL below escapes the single quote in the string constant John's, and filters out order events where the name value matches:
select * from OrderEvent(name='John\'s')
// ...equivalent to...
select * from OrderEvent(name='John\u0027s')
The next EPL escapes the string constant Quote "Hello":
select * from OrderEvent(description like "Quote \"Hello\"")
// is equivalent to
select * from OrderEvent(description like "Quote \u0022Hello\u0022")
When building an escape string via the API, escape the backslash, as shown in the code snippet below:
epService.getEPAdministrator().createEPL("select * from OrderEvent(name='John\\'s')");
// ... and for double quotes...
epService.getEPAdministrator().createEPL("select * from OrderEvent(description like \"Quote \\\"Hello\\\"\")");
EPL honors all Java built-in primitive and boxed types, including java.math.BigInteger and java.math.BigDecimal.
EPL also follows Java standards in terms of widening, performing widening automatically in cases where widening type conversion is allowed without loss of precision, for both boxed and primitive types and including BigInteger and BigDecimal:
byte to short, int, long, float, double, BigInteger or BigDecimal
short to int, long, float, double, BigInteger or BigDecimal
char to int, long, float, double, BigInteger or BigDecimal
int to long, float, double, BigInteger or BigDecimal
long to float, double, BigInteger or BigDecimal
float to double or BigDecimal
double to BigDecimal
In cases where loss of precision is possible because narrowing would be required, EPL compilation outputs a compilation error.
EPL supports casting via the cast function.
EPL returns double-type values for division regardless of operand type. EPL can also be configured to follow Java rules for integer arithmetic instead as described in Section 15.4.22, “Engine Settings related to Expression Evaluation”.
Division by zero returns positive or negative infinity. Division by zero can be configured to return null instead.
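For example, under the default settings integer operands still produce a double result (MyEvent is a hypothetical type here):
select 7 / 2 as ratio from MyEvent // ratio is 3.5 (a double), not 3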
An EPL constant is a number or a character string that indicates a fixed value. Constants can be used as expressions in many EPL statements, including variable assignment and case-when statements. They can also be used as parameter values for many built-in objects and clauses. Constants are also called literals.
EPL supports the standard SQL constant notation as well as Java data type literals.
The following are types of EPL constants:
Table 5.1. Types of EPL constants
Type | Description | Examples |
---|---|---|
string | A single character to an unlimited number of characters. Valid delimiters are the single quote (') or double quote ("). | select 'volume' as field1, "sleep" as field2, "\u0041" as unicodeA |
boolean | A boolean value. | select true as field1, false as field2 |
integer | An integer value (4 byte). | select 1 as field1, -1 as field2, 1e2 as field3 |
long | A long value (8 byte). Use the "L" or "l" (lowercase L) suffix. | select 1L as field1, 1l as field2 |
double | A double-precision 64-bit IEEE 754 floating point. | select 1.67 as field1, 167e-2 as field2, 1.67d as field3 |
float | A single-precision 32-bit IEEE 754 floating point. Use the "f" suffix. | select 1.2f as field1, 1.2F as field2 |
byte | An 8-bit signed two's complement integer. | select 0x10 as field1 |
EPL does not have a single-byte character data type for its literals. Single character literals are treated as string.
Internal byte representation and boundary values of constants follow the Java standard.
EPL automatically performs widening of numbers to BigInteger and BigDecimal as required, and employs the respective equals, compareTo and arithmetic methods provided by BigInteger and BigDecimal.
To explicitly create BigInteger and BigDecimal constants in EPL, please use the cast syntax: cast(value, BigInteger).
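For example, assuming an OrderEvent with quantity and price properties (hypothetical names):
select cast(quantity, BigInteger) as qty, cast(price, BigDecimal) as exactPrice from OrderEvent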
Note that since BigDecimal.valueOf(1.0) is not the same as BigDecimal.valueOf(1) (in terms of equality through equals), care should be taken towards the consistent use of scale.
When using aggregation functions for BigInteger and BigDecimal values, please note these limitations:
The median, stddev and avedev aggregation functions operate on the double value of the object and return a double value.
All other aggregation functions return BigDecimal or BigInteger values (except count).
For BigDecimal precision and rounding, please see Section 15.4.22.6, “Math Context”.
This chapter is about Java language constants and enum types and their use in EPL expressions.
Java language constants are public static final fields in Java that may participate in expressions of all kinds, as this example shows:
select * from MyEvent where property = MyConstantClass.FIELD_VALUE
Event properties that are enumeration values can be compared by their enum type value:
select * from MyEvent where enumProp = EnumClass.ENUM_VALUE_1
Event properties can also be passed to enum type functions or compared to an enum type method result:
select * from MyEvent where somevalue = EnumClass.ENUM_VALUE_1.getSomeValue() or EnumClass.ENUM_VALUE_2.analyze(someothervalue)
Enum types have a valueOf method that returns the enum type value:
select * from MyEvent where enumProp = EnumClass.valueOf('ENUM_VALUE_1')
If your application does not import, through configuration, the package that contains the enumeration class, then it must also specify the package name of the class. Enum types that are inner classes must be qualified with $ following Java conventions.
For example, the Color enum type as an inner class to MyEvent in package org.myorg can be referenced as shown:
select * from MyEvent(enumProp=org.myorg.MyEvent$Color.GREEN).std:firstevent()
Instance methods may also be invoked on event instances by specifying a stream name, as shown below:
select myevent.computeSomething() as result from MyEvent as myevent
Chaining instance methods is supported as this example shows:
select myevent.getComputerFor('books', 'movies').calculate() as result from MyEvent as myevent
An annotation is an addition made to information in a statement. Esper provides certain built-in annotations for defining a statement name, adding a statement description or for tagging statements, such as for managing statements or directing statement output. Other than the built-in annotations, applications can provide their own annotation classes that the EPL compiler can populate.
An annotation is part of the statement text and precedes the EPL select or pattern statement. Annotations are therefore part of the EPL grammar. The syntax for annotations follows the host language (Java, .NET) annotation syntax:
@annotation_name [(annotation_parameters)]
An annotation consists of the annotation name and optional annotation parameters. The annotation_name is the simple class name or fully-qualified class name of the annotation class. The optional annotation_parameters are a list of key-value pairs following the syntax:
@annotation_name (attribute_name = attribute_value, [name=value, ...])
The attribute_name is an identifier that must match the attributes defined by the annotation class. An attribute_value is a constant of any of the primitive types or string, an array, an enum type value or another (nested) annotation. Null values are not allowed as annotation attribute values. Enumeration values are supported in EPL statements and not supported in statements created via the createPattern method.
Use the getAnnotations method of EPStatement to obtain annotations provided via statement text.
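A Java sketch of reading back annotations, assuming an engine instance epService and a MyEvent type:
EPStatement stmt = epService.getEPAdministrator().createEPL(
    "@Name('MyStatementName') @Tag(name='author', value='Jim') select * from MyEvent");
for (java.lang.annotation.Annotation anno : stmt.getAnnotations()) {
  System.out.println(anno); // prints the populated @Name and @Tag instances
}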
Your application may provide its own annotation classes. The engine detects and populates annotation instances for application annotation classes.
The name of application-provided annotations is case-sensitive.
To enable the engine to recognize application annotation classes, your annotation name must include the package name (i.e. be fully-qualified) or your engine configuration must import the annotation class or package via the configuration API.
For example, assume that your application defines an annotation in its application code as follows:
public @interface ProcessMonitor {
  String processName();
  boolean isLongRunning() default false;
  int[] subProcessIds();
}
Shown next is an EPL statement text that utilizes the annotation class defined earlier:
@ProcessMonitor(processName='CreditApproval', isLongRunning=true, subProcessIds={1, 2, 3})
select count(*) from ProcessEvent(processId in (1, 2, 3)).win:time(30)
The above example assumes the ProcessMonitor annotation class is imported via configuration XML or API. Here is an example API call to import annotations provided by a package com.mycompany.myannotations:
epService.getEPAdministrator().getConfiguration().addImport("com.mycompany.myannotations.*");
The name of built-in annotations is not case-sensitive, allowing both @NAME or @name, for example.
The list of built-in EPL annotations is:
Table 5.2. Built-In EPL Annotations
Name | Purpose and Attributes | Example |
---|---|---|
Name | Provides a statement name. Attributes are: value : Statement name. | @Name("MyStatementName") |
Description | Provides a statement textual description. Attributes are: value : Statement description. | @Description("Place statement description here.") |
Tag | For tagging a statement with additional information. Attributes are: name : Tag name. value : Tag value. | @Tag(name="MyTagName", value="MyTagValue") |
Priority | Applicable when an event (or schedule) matches filter criteria for multiple statements: Defines the order of statement processing (requires an engine-level setting). Attributes are: value : Priority value. | @Priority(10) |
Drop | Applicable when an event (or schedule) matches filter criteria for multiple statements; drops the event after processing the statement (requires an engine-level setting). No attributes. | @Drop |
Hint | For providing one or more hints towards how the engine should execute a statement. Attributes are: value : A comma-separated list of one or more case-insensitive keywords. | @Hint('ITERATE_ONLY') |
Hook | Use this annotation to register one or more statement-specific hooks, providing a hook type for each individual hook, such as for SQL parameter, column or row conversion. Attributes are the hook type and the hook itself. | @Hook(type=HookType.SQLCOL, hook='MyDBTypeConvertor') |
Audit | Causes the engine to output detailed processing information for a statement. Optional attribute value : A comma-separated list of one or more case-insensitive keywords. | @Audit |
EventRepresentation | Causes the engine to use object-array event representation, if possible, for output and internal event types. | @EventRepresentation(array=true) |
The following example statement text specifies some of the built-in annotations in combination:
@Name("RevenuePerCustomer") @Description("Outputs revenue per customer considering all events encountered so far.") @Tag(name="grouping", value="customer") select customerId, sum(revenue) from CustomerRevenueEvent
Use the @Name EPL annotation to specify a statement name within the EPL statement itself, as an alternative to specifying the statement name via API.
If your application is also providing a statement name through the API, the statement name provided through the API overrides the annotation-provided statement name.
Example:
@Name("SecurityFilter1") select * from SecurityFilter(ip="127.0.0.1")
Use the @Description EPL annotation to add a statement textual description.
Example:
@Description('This statement filters localhost.') select * from SecurityFilter(ip="127.0.0.1")
Use the @Tag EPL annotation to tag statements with name-value pairs, effectively adding a property to the statement. The attributes name and value are of type string.
Example:
@Tag(name='ip_sensitive', value='Y') @Tag(name='author', value='Jim') select * from SecurityFilter(ip="127.0.0.1")
This annotation only takes effect if the engine-level setting for prioritized execution is set via configuration, as described in Section 15.4.23, “Engine Settings related to Execution of Statements”.
Use the @Priority EPL annotation to tag statements with a priority value. The default priority value is zero (0) for all statements. When an event (or single timer execution) requires processing the event for multiple statements, processing begins with the highest priority statement and ends with the lowest-priority statement.
Example:
@Priority(10) select * from SecurityFilter(ip="127.0.0.1")
This annotation only takes effect if the engine-level setting for prioritized execution is set via configuration, as described in Section 15.4.23, “Engine Settings related to Execution of Statements”.
Use the @Drop EPL annotation to tag statements that preempt all other same or lower-priority statements. When an event (or single timer execution) requires processing the event for multiple statements, processing begins with the highest priority statement and ends with the first statement marked with @Drop, which becomes the last statement to process that event.
Unless a different priority is specified, the statement with the @Drop EPL annotation executes at priority 1. Thereby @Drop alone is an effective means to remove events from a stream.
Example:
@Drop select * from SecurityFilter(ip="127.0.0.1")
A hint can be used to provide tips for the engine to affect statement execution. Hints change performance or memory-use of a statement but generally do not change its output.
The string value of a Hint annotation contains a keyword or a comma-separated list of multiple keywords. Hint keywords are case-insensitive. A list of hints is available in Section 20.2.23, “Consider using Hints”.
Example:
@Hint('disable_reclaim_group') select ipaddress, count(*) from SecurityFilter.win:time(60 sec) group by ipaddress
A hook is for attaching a callback to a statement.
The type value of a @Hook annotation defines the type of hook and the hook value is an imported or fully-qualified class name providing the callback implementation.
The @Audit annotation causes the engine to output detailed information about a statement's processing. It is described in more detail in Section 16.3.1, “@Audit Annotation”.
Use the @EventRepresentation annotation with create schema and create window statements to instruct the engine to use a specific event representation for the schema or named window.
Use the @EventRepresentation annotation with select statements to instruct the engine to use a specific event representation for output events.
When no @EventRepresentation annotation is specified, the engine uses the default event representation as configured, see Section 15.4.11.1, “Default Event Representation”.
Use @EventRepresentation(array=true) to instruct the engine to use object-array events.
Use @EventRepresentation(array=false) to instruct the engine to use Map events.
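For example, the sketch below declares a schema backed by object-array events and a consuming statement that outputs Map events (the schema and property names are hypothetical):
@EventRepresentation(array=true) create schema OrderSummary(productId string, total double)
@EventRepresentation(array=false) select productId, total from OrderSummary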
An EPL statement can contain expression declarations. Expressions that are common to multiple places in the same EPL statement can be moved to a named expression declaration and reused within the same statement without duplicating the expression itself.
For declaring expressions that are visible across multiple EPL statements, i.e. globally visible expressions, please consult Section 5.19.1, “Declaring a Global Expression”, which explains the create expression clause.
An expression declaration follows the lambda-style expression syntax. This syntax was chosen as it typically allows for a shorter and more concise expression body that can be easier to read than most procedural code.
The syntax for an expression declaration is:
expression expression_name { expression_body }
An expression declaration consists of the expression name and an expression body. The expression_name is any identifier. The expression_body contains optional parameters and the expression. The parameter types and the return type of the expression are determined by the engine and do not need to be specified.
Parameters to a declared expression can be a stream name, pattern tag name or wildcard (*). Use wildcard to pass the event itself to the expression. In a join or subquery, or more generally in an expression where multiple streams or pattern tags are available, the EPL must specify the stream name or pattern tag name and cannot use wildcard.
In the expression body the => lambda operator reads as "goes to". The left side of the lambda operator specifies the input parameters (if any) and the right side holds the expression. The lambda expression x => x * x is read "x goes to x times x".
In the expression body, if your expression takes no parameters, you may simply specify the expression and do not need the => lambda operator.
If your expression takes one parameter, specify the input parameter name followed by the => lambda operator and followed by the expression. The synopsis for use with a single input parameter is:
expression_body: input_param_name => expression
If your expression takes two or more parameters, specify the input parameter names in parentheses followed by the => lambda operator followed by the expression. The synopsis for use with multiple input parameters is:
expression_body: (input_param [,input_param [,...]]) => expression
The following example declares an expression that returns two times PI (ratio of the circumference of a circle to its diameter) and demonstrates its use in a select-clause:
expression twoPI { Math.PI * 2} select twoPI() from SampleEvent
The next example declares an expression that accepts one parameter: a MarketData event. The expression computes a new "mid" price based on the buy and sell price:
expression midPrice { x => (x.buy + x.sell) / 2 } select midPrice(md) from MarketDataEvent as md
The variable name can be left off if event property names resolve without ambiguity.
This example EPL removes the variable name x:
expression midPrice { x => (buy + sell) / 2 } select midPrice(md) from MarketDataEvent as md
The next example EPL specifies wildcard instead:
expression midPrice { x => (buy + sell) / 2 } select midPrice(*) from MarketDataEvent
A further example that demonstrates two parameters is listed next. The example joins two streams and uses the price value from MarketDataEvent and the sentiment value of NewsEvent to compute a weighted sentiment:
expression weightedSentiment { (x, y) => x.price * y.sentiment } select weightedSentiment(md, news) from MarketDataEvent.std:lastevent() as md, NewsEvent.std:lastevent() news
Any expression can be used in the expression body including aggregations, variables, subqueries or further declared expressions. Subqueries, when used without in or exists, must be placed within parentheses.
An example subquery within a declared expression is shown next:
expression newsSubq(md) { (select sentiment from NewsEvent.std:unique(symbol) where symbol = md.symbol) } select newsSubq(mdstream) from MarketDataEvent mdstream
When using declared expressions please note these limitations:
Parameters to a declared expression can only be a stream name, pattern tag name or wildcard (*).
The following scope rules apply for declared expressions:
The scope of the expression body of a declared expression only includes the parameters explicitly listed.
Esper allows the use of scripting languages within EPL. Any scripting language that supports JSR 223, as well as the MVEL scripting language, can be used in EPL.
Please see Chapter 18, Script Support for more information.
You may refer to a context in the EPL text by specifying the context keyword followed by a context name. Contexts are described in more detail in Chapter 4, Context and Context Partitions.
The effect of referring to a context is that your statement operates according to the context dimensional information as declared for the context.
The synopsis is:
... context context_name ...
You may refer to a context in all statements except for the following types of statements:
create schema for declaring event types.
create variable for declaring a variable.
create index for creating an index on a named window.
update istream for updating insert stream events.
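As a sketch, assuming a keyed context has been declared elsewhere, a statement refers to it so that its aggregation is kept per context partition (the context, event type and property names below are hypothetical):
// assumes: create context SegmentedByCustomer partition by custId from OrderEvent
context SegmentedByCustomer select custId, sum(amount) from OrderEvent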
The select clause is required in all EPL statements. The select clause can be used to select all properties via the wildcard *, or to specify a list of event properties and expressions. The select clause defines the event type (event property names and types) of the resulting events published by the statement, or pulled from the statement via the iterator methods.
The select clause also offers optional istream, irstream and rstream keywords to control whether insert stream, remove stream or insert and remove stream events are posted to UpdateListener instances and observers to a statement. By default, the engine provides only the insert stream to listeners and observers. See Section 15.4.17, “Engine Settings related to Stream Selection” on how to change the default.
The syntax for the select clause is summarized below.
select [istream | irstream | rstream] [distinct] * | expression_list ...
The istream keyword is the default, and indicates that the engine only delivers insert stream events to listeners and observers. The irstream keyword indicates that the engine delivers both insert and remove stream. Finally, the rstream keyword tells the engine to deliver only the remove stream.
The distinct keyword outputs only unique rows depending on the column list you have specified after it. It must occur after the select and after the optional stream keywords, as described in more detail below.
The syntax for selecting all event properties in a stream is:
select * from stream_def
The following statement selects StockTick events for the last 30 seconds of IBM stock ticks.
select * from StockTick(symbol='IBM').win:time(30 sec)
You may well be asking: Why does the statement specify a time window here? First, the statement is meant to demonstrate the use of the * wildcard.
When the engine pushes statement results to your listener and as the statement does not select remove stream events via the rstream keyword, the listener receives only new events and the time window could be left off.
By adding the time window the pull API (iterator API or JDBC driver) returns the last 30 seconds of events.
The * wildcard and expressions can also be combined in a select clause. The combination selects all event properties and in addition the computed values as specified by any additional expressions that are part of the select clause. Here is an example that selects all properties of stock tick events plus a computed product of price and volume that the statement names 'pricevolume':
select *, price * volume as pricevolume from StockTick
When using wildcard (*), Esper does not actually read or copy your event properties out of your event or events, nor does it copy the event object. It simply wraps your native type in an EventBean interface. Your application has access to the underlying event object through the getUnderlying method and has access to the property values through the get method.
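A Java sketch of an UpdateListener accessing both the wrapped property values and the underlying event object (the StockTick class and symbol property are assumptions here):
UpdateListener listener = new UpdateListener() {
  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    EventBean event = newEvents[0];
    Object symbol = event.get("symbol"); // property access via the EventBean interface
    StockTick tick = (StockTick) event.getUnderlying(); // the original event object, not a copy
  }
};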
In a join statement, using the select * syntax selects one event property per stream to hold the event for that stream. The property name is the stream name in the from clause.
To choose the particular event properties to return:
select event_property [, event_property] [, ...] from stream_def
The following statement simply selects the symbol and price properties of stock ticks, and the total volume for stock tick events in a 60-second time window.
select symbol, price, sum(volume) from StockTick(symbol='IBM').win:time(60 sec)
The following statement declares a further view onto the event stream of stock ticks: the univariate statistics view (stat:uni). The statement selects the properties that this view derives from the stream, for the last 100 events of IBM stock ticks in the length window.
select datapoints, total, average, variance, stddev, stddevpa from StockTick(symbol='IBM').win:length(100).stat:uni(volume)
The select clause can contain one or more expressions.
select expression [, expression] [, ...] from stream_def
The following statement selects the volume multiplied by price for a time batch of the last 30 seconds of stock tick events.
select volume * price from StockTick.win:time_batch(30 sec)
Event properties and expressions can be renamed using the syntax below.
select [event_property | expression] as identifier [, ...]
The following statement selects volume multiplied by price and specifies the name volPrice for the resulting column.
select volume * price as volPrice from StockTick
Identifiers cannot contain the "." (dot) character, i.e. "vol.price" is not a valid identifier for the rename syntax.
If your statement is joining multiple streams, you may specify property names that are unique among the joined streams, or use wildcard (*) as explained earlier.
In case the property name in your select or other clauses is not unique considering all joined streams, you will need to use the name of the stream as a prefix to the property.
This example is a join between the two streams StockTick and News, respectively named as 'tick' and 'news'. The example selects from the StockTick event the symbol value using the 'tick' stream name as a prefix:
select tick.symbol from StockTick.win:time(10) as tick, News.win:time(10) as news where news.symbol = tick.symbol
Use the wildcard (*) selector in a join to generate a property for each stream, with the property value being the event itself. The output events of the statement below have two properties: the 'tick' property holds the StockTick event and the 'news' property holds the News event:
select * from StockTick.win:time(10) as tick, News.win:time(10) as news
The following syntax can also be used to specify what stream's properties to select:
select stream_name.* [as name] from ...
The selection of tick.* selects the StockTick stream events only:
select tick.* from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
The next example uses the as keyword to name each stream's joined events. This instructs the engine to create a property for each named event:
select tick.* as stocktick, news.* as news from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
The output events of the above example have two properties 'stocktick' and 'news' that are the StockTick and News events.
The stream name itself, as further described in Section 5.4.5, “Using the Stream Name”, may be used within expressions or alone.
This example passes events to a user-defined function named compute and also shows insert-into to populate an event stream of combined events:
insert into TickNewStream select tick, news, MyLib.compute(news, tick) as result from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
// second statement that uses the TickNewStream stream
select tick.price, news.text, result from TickNewStream
In summary, the stream_name.* stream wildcard syntax can be used to select a stream as the underlying event or as a property, but cannot appear within an expression. The stream_name syntax (without wildcard) always selects a property (and not an underlying event), and can occur anywhere within an expression.
If your statement employs pattern expressions, then your pattern expression tags events with a tag name. Each tag name becomes available for use as a property in the select clause and all other clauses.
For example, here is a very simple pattern that matches on every StockTick event received within 10 seconds after start of the statement. The sample selects the symbol and price properties of the matching events:
select tick.symbol as symbol, tick.price as price from pattern[every tick=StockTick where timer:within(10 sec)]
The use of the wildcard selector, as shown in the next statement, creates a property for each tagged event in the output. The next statement outputs events that hold a single 'tick' property whose value is the event itself:
select * from pattern[every tick=StockTick where timer:within(10 sec)]
You may also select the matching event itself using the tick.* syntax. The engine outputs the StockTick event itself to listeners:
select tick.* from pattern[every tick=StockTick where timer:within(10 sec)]
A tag name as specified in a pattern is a valid expression itself. This example uses the insert into clause to make available the events matched by a pattern to further statements:
// make a new stream of ticks and news available
insert into StockTickAndNews
select tick, news from pattern [every tick=StockTick -> news=News(symbol=tick.symbol)]
// second statement to select from the stream of ticks and news
select tick.symbol, tick.price, news.text from StockTickAndNews
The optional istream, irstream and rstream keywords in the select clause control the event streams posted to listeners and observers to a statement.
If neither keyword is specified, and in the default engine configuration, the engine posts only insert stream events via the newEvents parameter to the update method of UpdateListener instances listening to the statement. The engine does not post remove stream events, by default.
The insert stream consists of the events entering the respective window(s) or stream(s) or aggregations, while the remove stream consists of the events leaving the respective window(s) or the changed aggregation result. See Chapter 3, Processing Model for more information on insert and remove streams.
The engine posts remove stream events to the oldEvents parameter of the update method only if the irstream keyword occurs in the select clause. This behavior can be changed via engine-wide configuration as described in Section 15.4.17, “Engine Settings related to Stream Selection”.
By specifying the istream keyword you can instruct the engine to only post insert stream events via the newEvents parameter to the update method on listeners. The engine will then not post any remove stream events, and the oldEvents parameter is always a null value.
By specifying the irstream keyword you can instruct the engine to post both insert stream and remove stream events.
By specifying the rstream keyword you can instruct the engine to only post remove stream events via the newEvents parameter to the update method on listeners. The engine will then not post any insert stream events, and the oldEvents parameter is also always a null value.
The following statement selects only the events that are leaving the 30 second time window.
select rstream * from StockTick.win:time(30 sec)
The istream and rstream keywords in the select clause are matched by same-name keywords available in the insert into clause. While the keywords in the select clause control the event stream posted to listeners to the statement, the same keywords in the insert into clause specify the event stream that the engine makes available to other statements.
Property or column names can optionally be qualified by a stream name and the provider URI. The syntax is:
[[provider_URI.]stream_name.]property_name
The provider_URI is the URI supplied to the EPServiceProviderManager class, or the string default for the default provider.
This example assumes the provider is the default provider:
select MyEvent.myProperty from MyEvent
// ... equivalent to ...
select default.MyEvent.myProperty from MyEvent
Stream names can also be qualified by the provider URI. The syntax is:
[provider_URI.]stream_name
The next example assumes a provider URI by name of Processor:
select Processor.MyEvent.myProperty from Processor.MyEvent
The optional distinct keyword removes duplicate output events from output. The keyword must occur after the select keyword and after the optional irstream keyword.
The distinct keyword in your select instructs the engine to consolidate, at time of output, the output event(s) and remove output events with identical property values.
Duplicate removal only takes place when two or more events are output together at any one time, therefore distinct is typically used with a batch data window, output rate limiting, on-demand queries, on-select or the iterator pull API.
If two or more output event objects have the same property values for all properties of the event, distinct removes all but one duplicated event before outputting events to listeners. Indexed, nested and mapped properties are considered in the comparison, if present in the output event.
The next example outputs sensor ids of temperature sensor events, but only every 10 seconds and only unique sensor id values during the 10 seconds:
select distinct sensorId from TemperatureSensorEvent output every 10 seconds
Use distinct with wildcard (*) to remove duplicate output events considering all properties of an event.
This example statement outputs all distinct events either when 100 events arrive or when 10 seconds passed, whichever occurs first:
select distinct * from TemperatureSensorEvent.win:time_length_batch(10, 100)
When selecting nested, indexed, mapped or dynamic properties in a select clause with distinct, it is relevant to know that the comparison uses hash code and the Java equals semantics.
For transposing an instance of a Java object returned by an expression to a stream use the transpose function as described in Section 9.4, “Select-Clause transpose Function”.
By default, for certain select-clause expressions that output events or a collection of events, the engine outputs the underlying event objects. With outputs we refer to the data passed to listeners, subscribers and inserted into another stream via insert-into.
The select-clause expressions for which underlying event objects are output by default are: the previous family of single-row functions.
To have the engine output EventBean instance(s) instead, add @eventbean to the relevant expressions of the select clause.
The sample EPL shown below outputs current data window contents as EventBean instances into the stream OutStream, thereby statements consuming the stream may operate on such instances:
insert into OutStream select prevwindow(s0) @eventbean as win from MyEvent.win:length(2) as s0
The next EPL consumes the stream and selects the last event:
select win.lastOf() from OutStream
It is not necessary to use @eventbean if an event type by the same name (OutStream in the example) is already declared and a property exists on the type by the same name (win in this example) and the type of the property is the event type (MyEvent in the example) returned by the expression. This is further described in Section 5.10.8, “Select-Clause Expression And Inserted-Into Column Event Type”.
The from clause is required in all EPL statements. It specifies one or more event streams or named windows. Each event stream or named window can optionally be given a name by means of the as keyword.
from stream_def [as name] [unidirectional] [retain-union | retain-intersection] [, stream_def [as stream_name]] [, ...]
The event stream definition stream_def as shown in the syntax above can consist of either a filter-based event stream definition or a pattern-based event stream definition.
For joins and outer joins, specify two or more event streams. Joins between pattern-based and filter-based event streams are also supported. Joins and the unidirectional keyword are described in more detail in Section 5.12, “Joining Event Streams”.
Esper supports joins against relational databases for access to historical or reference data as explained in Section 5.13, “Accessing Relational Data via SQL”. Esper can also join results returned by an arbitrary method invocation, as discussed in Section 5.14, “Accessing Non-Relational Data via Method Invocation”.
The stream_name is an optional identifier assigned to the stream. The stream name can itself occur in any expression and provides access to the event itself from the named stream. Also, a stream name may be combined with a method name to invoke instance methods on events of that stream.
For all streams with the exception of historical sources your query may employ data window views as outlined below. The retain-intersection (the default) and retain-union keywords build a union or intersection of two or more data windows as described in Section 5.4.4, “Multiple Data Window Views”.
The stream_def syntax for a filter-based event stream is as below:
event_stream_name [(filter_criteria)] [contained_selection] [.view_spec] [.view_spec] [...]
The event_stream_name is either the name of an event type or name of an event stream populated by an insert into statement or the name of a named window.
The filter_criteria is optional and consists of a list of expressions filtering the events of the event stream, within parentheses after the event stream name.
The contained_selection is optional and is for use with coarse-grained events that have properties that are themselves one or more events, see Section 5.20, “Contained-Event Selection” for the synopsis and examples.
The view_spec are optional view specifications, which are combinable definitions for retaining events and for deriving information from events.
The following EPL statement shows event type, filter criteria and views combined in one statement. It selects all event properties for the last 100 events of IBM stock ticks for volume. In the example, the event type is the fully qualified Java class name org.esper.example.StockTick. The expression filters for events where the property symbol has a value of "IBM". The optional view specifications for deriving data from the StockTick events are a length window and a view for computing statistics on volume. The name for the event stream is "volumeStats".
select * from org.esper.example.StockTick(symbol='IBM').win:length(100).stat:uni(volume) as volumeStats
Esper filters out events in an event stream as defined by filter criteria before it sends events to subsequent views. Thus, compared to search conditions in a where clause, filter criteria remove unneeded events early. In the above example, events with a symbol other than IBM do not enter the time window.
The simplest form of filter is a filter for events of a given type without any conditions on the event property values. This filter matches any event of that type regardless of the event's properties. The example below is such a filter.
select * from com.mypackage.myevents.RfidEvent
Instead of the fully-qualified Java class name any other event name can be mapped via Configuration to a Java class, making the resulting statement more readable:
select * from RfidEvent
Interfaces and superclasses are also supported as event types. In the example below IRfidReadable is an interface class.
select * from org.myorg.rfid.IRfidReadable
The filtering criteria to filter for events with certain event property values are placed within parentheses after the event type name:
select * from RfidEvent(category="Perishable")
All expressions can be used in filters, including static methods that return a boolean value:
select * from com.mycompany.RfidEvent(MyRFIDLib.isInRange(x, y) or (x < 0 and y < 0))
Filter expressions can be separated via a single comma ','. The comma represents a logical AND between filter expressions:
select * from RfidEvent(zone=1, category=10)
...is equivalent to...
select * from RfidEvent(zone=1 and category=10)
The following operators are highly optimized through indexing and are the preferred means of filtering in high-volume event streams and especially in the presence of a larger number of filters or statements:
equals =
not equals !=
comparison operators < , > , >=, <=
ranges:
  use the between keyword for a closed range where both endpoints are included
  use the in keyword and round () or square brackets [] to control how endpoints are included
  for inverted ranges use the not keyword and the between or in keywords
list-of-values checks using the in keyword or the not in keywords followed by a comma-separated list of values
single-row functions that have been registered and are invoked via function name (see user-defined functions) and that either return a boolean value or that have their return value compared to a constant
At compile time as well as at run time, the engine scans new filter expressions for sub-expressions that can be indexed. Indexing filter values to match event properties of incoming events enables the engine to match incoming events faster, especially if your application creates a large number of statements or requires many similar filters. The above list of operators represents the set of operators that the engine can best convert into indexes. The use of comma or logical and in filter expressions does not impact optimizations by the engine.
Ranges come in the following 4 varieties. The use of round () or square [] brackets dictates whether an endpoint is included or excluded. The low point and the high point of the range are separated by the colon : character.
Open ranges that contain neither endpoint (low:high)
Closed ranges that contain both endpoints [low:high]. The equivalent 'between' keyword also defines a closed range.
Half-open ranges that contain the low endpoint but not the high endpoint [low:high)
Half-closed ranges that contain the high endpoint but not the low endpoint (low:high]
The next statement shows a filter specifying a range for x and y values of RFID events. The range includes both endpoints and therefore uses [] hard brackets.
mypackage.RfidEvent(x in [100:200], y in [0:100])
The between keyword is equivalent for closed ranges. The same filter using the between keyword is:
mypackage.RfidEvent(x between 100 and 200, y between 0 and 100)
The not keyword can be used to determine if a value falls outside a given range:
mypackage.RfidEvent(x not in [0:100])
The equivalent statement using the between keyword is:
mypackage.RfidEvent(x not between 0 and 100)
The in keyword for filter criteria determines if a given value matches any value in a list of values.
In this example we are interested in RFID events where the category matches any of the given values:
mypackage.RfidEvent(category in ('Perishable', 'Container'))
By using the not in keywords we can filter events with a property value that does not match any of the values in a list of values:
mypackage.RfidEvent(category not in ('Household', 'Electrical'))
The following restrictions apply to filter criteria:
Range and comparison operators require the event property to be of a numeric or string type.
Aggregation functions are not allowed within filter expressions.
The prev previous-event function and the prior prior-event function cannot be used in filter expressions.
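Such expressions can instead be applied after a data window is declared on the stream, for example in the where clause (a sketch; the prev function requires a data window):
select * from StockTick.win:length(10) where prev(1, price) < price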
Event pattern expressions can also be used to specify one or more event streams in an EPL statement.
For pattern-based event streams, the event stream definition stream_def consists of the keyword pattern and a pattern expression in brackets []. The syntax for an event stream definition using a pattern expression is below. As in filter-based event streams, an optional list of views that derive data from the stream can be supplied.
pattern [pattern_expression] [.view_spec] [.view_spec] [...]
The next statement specifies an event stream that consists of both stock tick events and trade events. The example tags stock tick events with the name "tick" and trade events with the name "trade".
select * from pattern [every tick=StockTickEvent or every trade=TradeEvent]
This statement generates an event every time the engine receives either one of the event types. The generated events resemble a map with "tick" and "trade" keys. For stock tick events, the "tick" key value is the underlying stock tick event, and the "trade" key value is a null value. For trade events, the "trade" key value is the underlying trade event, and the "tick" key value is a null value.
Let's further refine this statement by adding a view that gives us the last 30 seconds of either stock tick or trade events. Let's also select prices and a price total.
select tick.price as tickPrice, trade.price as tradePrice, sum(tick.price) + sum(trade.price) as total from pattern [every tick=StockTickEvent or every trade=TradeEvent].win:time(30 sec)
Note that in the statement above tickPrice and tradePrice can each be null values depending on the event processed. Therefore, an aggregation function such as sum(tick.price + trade.price) would always return null values as either of the two price properties is always a null value for any event matching the pattern. Use the coalesce function to handle null values, for example: sum(coalesce(tick.price, 0) + coalesce(trade.price, 0)).
Views are used to specify an expiry policy for events (data window views) and also to derive data. Views can be staggered onto each other. See Chapter 12, EPL Reference: Views for the available views; that chapter also outlines the different types of views: data window views and derived-value views.
Views can optionally take one or more parameters. These parameters are expressions themselves that may consist of any combination of variables, arithmetic, user-defined function or substitution parameters for prepared statements, for example.
The example statement below outputs a count per expressway for car location events (contains information about the location of a car on a highway) of the last 60 seconds:
select expressway, count(*) from CarLocEvent.win:time(60) group by expressway
The next example serves to show staggering of views. It uses the std:groupwin view to create a separate length window per car id:
select carId, expressway, direction, segment, count(*) from CarLocEvent.std:groupwin(carId).win:length(4) group by carId, expressway, direction, segment
The first view std:groupwin(carId) groups car location events by car id. The second view win:length(4) keeps a length window of the 4 last events, with one separate length window for each car id. The example reports the number of events per car id and per expressway, direction and segment considering the last 4 events for each car id only.
Note that the group by syntax is generally preferable over std:groupwin for grouping information as it is SQL-compliant, easier to read and does not create a separate data window per group. The std:groupwin in the above example creates a separate data window (length window in the example) per group, demonstrating staggering views.
When views are staggered onto each other as a chain of views, then the insert and remove stream received by each view is the insert and remove stream made available by the view (or stream) earlier in the chain.
The special keep-all view keeps all events: It does not provide a remove stream, i.e. events are not removed from the keep-all view unless by means of the on-delete syntax or by revision events.
Data window views provide an expiry policy that indicates when to remove events from the data window, with the exception of the keep-all data window which has no expiry policy and the std:groupwin grouped-window view for allocating a new data window per group.
EPL allows the freedom to use multiple data window views onto a stream and thus combine expiry policies. Combining data windows into an intersection (the default) or a union can achieve a useful strategy for retaining events and expiring events that are no longer of interest. Named windows and the on-delete syntax provide an additional degree of freedom.
No keyword is required in order to combine two or more data window views: the retain-intersection keyword is the default, and the retain-union keyword may instead be provided for a stream.
The concepts of union and intersection come from set mathematics. In the language of set mathematics, two sets A and B can be "added" together: The intersection of A and B is the set of all things which are members of both A and B, i.e. the members the two sets have "in common". The union of A and B is the set of all things which are members of either A or B.
Use the retain-intersection (the default) keyword to retain an intersection of all events as defined by two or more data windows. All events removed from any of the intersected data windows are entered into the remove stream. This is the default behavior if neither retain keyword is specified.
Use the retain-union keyword to retain a union of all events as defined by two or more data windows. Only events removed from all data windows are entered into the remove stream.
The next example statement totals the price of OrderEvent events in a union of the last 30 seconds and unique by product name:
select sum(price) from OrderEvent.win:time(30 sec).std:unique(productName) retain-union
In the above statement, all OrderEvent events that are either less than 30 seconds old or that are the last event for their product name are considered.
Here is an example statement that totals the price of OrderEvent events in an intersection of the last 30 seconds and unique by product name:
select sum(price) from OrderEvent.win:time(30 sec).std:unique(productName) retain-intersection
In the above statement, only those OrderEvent events that are both less than 30 seconds old and are the last event for their product name are considered. The number of events that the engine retains is the number of unique events per product name in the last 30 seconds (and not the number of events in the last 30 seconds).
For an intersection the engine retains the minimal number of events representing that intersection. Thus when combining a time window of 30 seconds and a last-event window, for example, the number of events retained at any time is zero or one event (and not 30 seconds of events).
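For example (a minimal sketch, reusing the StockTickEvent stream from other examples in this chapter), the following statement intersects a 30-second time window with a last-event window; at most one event, which must also be less than 30 seconds old, is retained at any time:
select * from StockTickEvent.win:time(30 sec).std:lastevent()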
When combining a batch window into an intersection with another data window the combined data window gains batching semantics: Only when the batch criteria is fulfilled does the engine provide the batch of intersecting insert stream events. Multiple batch data windows may not be combined into an intersection.
The table below provides additional examples for data window intersections:
Table 5.3. Intersection Data Window Examples
Example | Description |
---|---|
win:time(30).std:firstunique(keys) | Retains 30 seconds of events unique per keys value (first event per value). |
win:firstlength(3).std:firstunique(keys) | Retains the first 3 events that are also unique per keys value. |
win:time_batch(N seconds).std:unique(keys) | Posts a batch every N seconds that contains the last of each unique event per keys value. |
win:time_batch(N seconds).std:firstunique(keys) | Posts a batch every N seconds that contains the first of each unique event per keys value. |
win:length_batch(N).std:unique(keys) | Posts a batch of unique events (last event per value) when N unique events per keys value are encountered. |
win:length_batch(N).std:firstunique(keys) | Posts a batch of unique events (first event per value) when N unique events per keys value are encountered. |
For advanced users and for backward compatibility, it is possible to configure Esper to allow multiple data window views without either of the retain
keywords, as described in Section 15.4.12.2, “Configuring Multi-Expiry Policy Defaults”.
Your from
clause may assign a name to each stream. This assigned stream name can serve any of the following purposes.
First, the stream name can be used to disambiguate property names. The stream_name.property_name
syntax uniquely identifies which property to select if property names overlap between streams. Here is an example:
select prod.productId, ord.productId from ProductEvent as prod, OrderEvent as ord
Second, the stream name can be used with a wildcard (*) character to select events in a join, or assign new names to the streams in a join:
// Select ProductEvent only
select prod.* from ProductEvent as prod, OrderEvent
// Assign column names 'product' and 'order' to each event
select prod.* as product, ord.* as order from ProductEvent as prod, OrderEvent as ord
Further, the stream name by itself can occur in any expression: The engine passes the event itself to that expression. For example, the engine passes the ProductEvent and the OrderEvent to the user-defined function 'checkOrder':
select prod.productId, MyFunc.checkOrder(prod, ord) from ProductEvent as prod, OrderEvent as ord
Last, you may invoke an instance method on each event of a stream, and pass parameters to the instance method as well. Instance method calls are allowed anywhere in an expression.
The next statement demonstrates this capability by invoking a method 'computeTotal' on OrderEvent events and a method 'getMultiplier' on ProductEvent events:
select ord.computeTotal(prod.getMultiplier()) from ProductEvent as prod, OrderEvent as ord
Instance methods may also be chained: Your EPL may invoke a method on the result returned by a method invocation.
Assume that your product event exposes a method getZone
which returns a zone object. Assume that the Zone class declares a method checkZone
.
This example statement invokes a method chain:
select prod.getZone().checkZone("zone 1") from ProductEvent as prod
The where
clause is an optional clause in EPL statements. Via the where
clause event streams can be joined and events can be filtered.
Comparison operators =, <, >, >=, <=, !=, <>, is null, is not null
and logical combinations via and
and or
are supported in the where
clause. The where
clause can also introduce join conditions as outlined in Section 5.12, “Joining Event Streams”. where
clauses can also contain expressions. Some examples are listed below.
...where fraud.severity = 5 and amount > 500
...where (orderItem.orderId is null) or (orderItem.class != 10)
...where (orderItem.orderId = null) or (orderItem.class <> 10)
...where itemCount / packageCount > 10
The aggregate functions are further documented in Section 9.2, “Aggregation Functions”. You can use aggregate functions to calculate and summarize data from event properties. For example, to find out the total price for all stock tick events in the last 30 seconds, type:
select sum(price) from StockTickEvent.win:time(30 sec)
Here is the syntax for aggregate functions:
aggregate_function( [all | distinct] expression)
You can apply aggregate functions to all events in an event stream window or other view, or to one or more groups of events. From each set of events to which an aggregate function is applied, Esper generates a single value.
The expression is usually an event property name. However, it can also be a constant, function, or any combination of event property names, constants, and functions connected by arithmetic operators.
For example, to find out the average price for all stock tick events in the last 30 seconds if the price was doubled:
select avg(price * 2) from StockTickEvent.win:time(30 seconds)
You can use the optional keyword distinct
with all aggregate functions to eliminate duplicate values before the aggregate function is applied. The optional keyword all, which performs the operation on all events, is the default.
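For example, the next sketch uses distinct to count the number of distinct symbol values observed in the last 30 seconds (reusing the StockTickEvent stream from earlier examples):
select count(distinct symbol) from StockTickEvent.win:time(30 sec)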
You can use aggregation functions in a select
clause and in a having
clause. You cannot use aggregate functions in a where
clause, but you can use the where
clause to restrict the events to which the aggregate is applied. The next query computes the average and sum of the price of stock tick events for the symbol IBM only, for the last 10 stock tick events regardless of their symbol.
select 'IBM stats' as title, avg(price) as avgPrice, sum(price) as sumPrice from StockTickEvent.win:length(10) where symbol='IBM'
In the above example the length window of 10 elements is not affected by the where
clause, i.e. all events enter and leave the length window regardless of their symbol. If we only care about the last 10 IBM events, we need to add filter criteria as below.
select 'IBM stats' as title, avg(price) as avgPrice, sum(price) as sumPrice from StockTickEvent(symbol='IBM').win:length(10) where symbol='IBM'
You can use aggregate functions with any type of event property or expression, with the following exceptions:
You can use sum, avg, median, stddev, avedev
with numeric event properties only
Esper ignores any null values returned by the event property or expression on which the aggregate function is operating, except for the count(*)
function, which counts null values as well. All aggregate functions return null if the data set contains no events, or if all events in the data set contain only null values for the aggregated expression.
The group by
clause is optional in all EPL statements. The group by
clause divides the output of an EPL statement into groups. You can group by one or more event property names, or by the result of computed expressions. When used with aggregate functions, group by
retrieves the calculations in each subgroup. You can use group by
without aggregate functions, but generally that can produce confusing results.
For example, the below statement returns the total price per symbol for all stock tick events in the last 30 seconds:
select symbol, sum(price) from StockTickEvent.win:time(30 sec) group by symbol
The syntax of the group by
clause is:
group by aggregate_free_expression [, aggregate_free_expression] [, ...]
Esper places the following restrictions on expressions in the group by
clause:
Expressions in the group by
cannot contain aggregate functions.
When grouping an unbound stream, i.e. no data window is specified onto the stream providing groups, or when using output rate limiting with the ALL keyword, you should ensure your group-by expression does not return an unlimited number of values. If, for example, your group-by expression is a fine-grained timestamp, group state that accumulates for an unlimited number of groups potentially reduces available memory significantly. Use a @Hint as described below to instruct the engine when to discard group state.
You can list more than one expression in the group by clause to nest groups. Once the sets are established with group by, the aggregation functions are applied. This statement posts the median volume for all stock tick events in the last 30 seconds per symbol and tick data feed. Esper posts one event for each group to statement listeners:
select symbol, tickDataFeed, median(volume) from StockTickEvent.win:time(30 sec) group by symbol, tickDataFeed
In the statement above the event properties in the select
list (symbol, tickDataFeed) are also listed in the group by
clause.
The statement thus follows the SQL standard which prescribes that non-aggregated event properties in the select
list must match the
group by
columns.
Esper also supports statements in which one or more event properties in the select
list are not listed in the group by
clause.
The statement below demonstrates this case. It calculates the standard deviation for the last 30 seconds of stock ticks aggregating by symbol and posting for
each event the symbol, tickDataFeed and the standard deviation on price.
select symbol, tickDataFeed, stddev(price) from StockTickEvent.win:time(30 sec) group by symbol
The above example still aggregates the price
event property based on the symbol
, but produces one event per incoming event, not one
event per group.
Additionally, Esper supports statements in which one or more event properties in the group by
clause are not listed in the select
list.
This is an example that calculates the mean deviation per symbol
and tickDataFeed
and posts one event per group with symbol
and mean deviation of price in the generated events. Since tickDataFeed is not in the posted results, this can potentially be confusing.
select symbol, avedev(price) from StockTickEvent.win:time(30 sec) group by symbol, tickDataFeed
Expressions are also allowed in the group by
list:
select symbol * price, count(*) from StockTickEvent.win:time(30 sec) group by symbol * price
If the group by
expression results in a null value, the null value becomes its own group. All null values are aggregated into the same group. If you are using the count(expression) aggregate function, which does not count null values, the count returns zero if only null values are encountered.
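For instance, assuming the tickDataFeed property may be null for some events, the following sketch groups all such events into a single null group:
select tickDataFeed, count(*) from StockTickEvent.win:time(30 sec) group by tickDataFeed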
You can use a where
clause in a statement with group by
. Events that do not satisfy the conditions in the where
clause are eliminated before any grouping is done. For example, the statement below posts the number of stock ticks in the last 30 seconds with a volume larger than 100, posting one event per group (symbol).
select symbol, count(*) from StockTickEvent.win:time(30 sec) where volume > 100 group by symbol
The Esper engine reclaims aggregation state aggressively when it determines that a group has no data points, based on the data in the data windows. When your application data creates a large number of groups with a small or zero number of data points then performance may suffer as state is reclaimed and created anew. Esper provides the @Hint('disable_reclaim_group')
hint that you can specify as part of an EPL statement text to avoid group reclaim.
When aggregating values over an unbound stream (i.e. no data window is specified onto the stream) and when your group-by expression returns an unlimited number of values, for example when a timestamp expression is used, then please note the next hint.
A sample statement that aggregates stock tick events by timestamp, assuming the event type offers a property by name timestamp
that reflects time in high resolution, for example arrival or system time:
// Note the below statement could lead to an out-of-memory problem:
select symbol, sum(price) from StockTickEvent group by timestamp
As the engine has no means of detecting when aggregation state (sums per symbol) can be discarded, you may use the following hints to control aggregation state lifetime.
The @Hint("reclaim_group_aged=
age_in_seconds") hint instructs the engine to discard aggregation state that has not been
updated for age_in_seconds seconds.
The optional @Hint("reclaim_group_freq=
sweep_frequency_in_seconds") can be used in addition to control the frequency at which the engine sweeps aggregation
state to determine aggregation state age and remove state that is older than age_in_seconds seconds.
If the hint is not specified, the frequency defaults to the same value as age_in_seconds.
The updated sample statement with both hints:
// Instruct engine to remove state older than 10 seconds and sweep every 5 seconds
@Hint('reclaim_group_aged=10,reclaim_group_freq=5') select symbol, sum(price) from StockTickEvent group by timestamp
Variables may also be used to provide values for age_in_seconds and sweep_frequency_in_seconds.
This example statement uses a variable named varAge
to control how long aggregation state remains in memory, and the engine defaults the sweep frequency to the same value as the variable provides:
@Hint('reclaim_group_aged=varAge') select symbol, sum(price) from StockTickEvent group by timestamp
Use the having
clause to pass or reject events defined by the group-by
clause. The having
clause sets conditions for the group by
clause in the same way where
sets conditions for the select
clause, except where
cannot include aggregate functions, while having
often does.
This statement is an example of a having
clause with an aggregate function. It posts the total price per symbol for the last 30 seconds of stock tick events for only those symbols in which the total price exceeds 1000. The having
clause eliminates all symbols where the total price is equal to or less than 1000.
select symbol, sum(price) from StockTickEvent.win:time(30 sec) group by symbol having sum(price) > 1000
To include more than one condition in the having
clause combine the conditions with and
, or
or not
.
This is shown in the statement below, which selects only groups with a total price greater than 1000 and an average volume less than 500.
select symbol, sum(price), avg(volume) from StockTickEvent.win:time(30 sec) group by symbol having sum(price) > 1000 and avg(volume) < 500
A statement with the having
clause should also have a group by
clause. If you omit group-by
, all the events not excluded
by the where
clause return as a single group. In that case having
acts like a where
except that having
can have aggregate functions.
The having
clause can also be used without group by
clause, as the example below shows. The example posts events where the price is less than the current running average price of all stock tick events in the last 30 seconds.
select symbol, price, avg(price) from StockTickEvent.win:time(30 sec) having price < avg(price)
When you include filters, the where
condition, the group by
clause and the having
condition in an EPL statement, the sequence in which each clause affects events determines the final result:
The event stream's filter condition, if present, dictates which events enter a window (if one is used). The filter discards any events not meeting filter criteria.
The where
clause excludes events that do not meet its search condition.
Aggregate functions in the select list calculate summary values for each group.
The having
clause excludes events from the final results that do not meet its search condition.
The following query illustrates the use of filter, where
, group by
and having
clauses in one statement with
a select
clause containing an aggregate function.
select tickDataFeed, stddev(price) from StockTickEvent(symbol='IBM').win:length(10) where volume > 1000 group by tickDataFeed having stddev(price) > 0.8
Esper filters events using the filter criteria for the event stream StockTickEvent
. In the example above only events with symbol IBM enter the length window over the last 10 events; all other events are simply discarded. The where clause removes any events posted by the length window (events entering the window and events leaving the window) that do not match the condition of volume greater than 1000. Remaining events are applied to the stddev
standard deviation aggregate function for each tick data feed as specified in the group by
clause. Each tickDataFeed
value generates one event. Esper applies the having
clause and only lets events pass for tickDataFeed
groups with a standard deviation of price greater than 0.8.
The keyed segmented context (create context ... partition by), the group by clause and the built-in std:groupwin view are similar in their ability to group events but very different in their semantics. This section explains the key differences in their behavior and use.
The keyed segmented context as declared with create context ... partition by and context ... select ... creates a new context partition per key value(s). The engine maintains separate data window views as well as separate aggregations per context partition; thereby the keyed segmented context applies to both. See Section 4.2.2, “Keyed Segmented Context” for additional examples.
The group by clause works together with aggregation functions in your statement to produce an aggregation result per group. In greater detail, this means that when a new event arrives, the engine applies the expressions in the group by clause to determine a grouping key. If the engine has not encountered that grouping key before (a new group), the engine creates a set of new aggregation results for that grouping key and performs the aggregation changing that new set of aggregation results. If the grouping key points to an existing set of prior aggregation results (an existing group), the engine performs the aggregation changing the prior set of aggregation results for that group.
The std:groupwin view is a built-in view that groups events into data windows. The view is described in greater detail in Section 12.3.2, “Grouped Data Window (std:groupwin)”. Its primary use is to create a separate data window per group, or more generally to create separate instances of all its sub-views for each grouping key encountered.
The table below summarizes the point:
Table 5.4. Grouping Options
Option | Description |
---|---|
Keyed Segmented Context | Separate context partition per key value. Affects all of data windows, aggregations, patterns, etc. (except variables which are global). |
Grouped Data Window (std:groupwin) | Separate data window per key value. Affects only the data window that is declared next to it. |
Group By Clause (group by) | Separate aggregation values per key value. Affects only aggregation values. |
Please review the performance section for advice related to performance or memory-use.
The next example shows queries that produce equivalent results. The query using the group by clause is generally preferable as it is easier to read. The second form introduces the stat:uni
view which computes univariate statistics for a given property:
select symbol, avg(price) from StockTickEvent group by symbol
// ... is equivalent to ...
select symbol, average from StockTickEvent.std:groupwin(symbol).stat:uni(price)
The next example shows two queries that are NOT equivalent as the length window is ungrouped in the first query, and grouped in the second query:
select symbol, sum(price) from StockTickEvent.win:length(10) group by symbol
// ... NOT equivalent to ...
select symbol, sum(price) from StockTickEvent.std:groupwin(symbol).win:length(10)
The key difference between the two statements is that in the first statement the length window is ungrouped and applies to all events regardless of group, while in the second query each group gets its own instance of a length window. For example, in the second query events arriving for symbol "ABC" get a length window of 10 events, and events arriving for symbol "DEF" get their own length window of 10 events.
The output
clause is optional in Esper and is used to control or stabilize the rate at which events are output and to suppress output events. The EPL language provides for several different ways to control output rate.
Here is the syntax for the output
clause that specifies a rate in time interval or number of events:
output [after suppression_def] [[all | first | last | snapshot] every output_rate [seconds | events]] [and when terminated]
An alternate syntax specifies the time period between output as outlined in Section 5.2.1, “Specifying Time Periods” :
output [after suppression_def] [[all | first | last | snapshot] every time_period] [and when terminated]
A crontab-like schedule can also be specified. The schedule parameters follow the pattern observer parameters and are further described in Section 6.6.2.2, “timer:at” :
output [after suppression_def] [[all | first | last | snapshot] at (minutes, hours, days of month, months, days of week [, seconds])] [and when terminated]
For use with contexts, in order to trigger output only when a context partition terminates, specify when terminated
as further described in Section 4.5, “Output When Context Partition Ends”:
output [after suppression_def] [[all | first | last | snapshot] when terminated [and termination_expression] [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]] ]
Last, output can be controlled by an expression that may contain variables, user-defined functions and information about the number of collected events. Output that is controlled by an expression is discussed in detail below.
The after
keyword and suppression_def can appear alone or together with further output conditions and suppresses output events.
For example, the following statement outputs, every 60 seconds, the total price for all orders in the 30-minute time window:
select sum(price) from OrderEvent.win:time(30 min) output snapshot every 60 seconds
The all
keyword is the default and specifies that all events in a batch should be output, each incoming row in the batch producing an output row.
Note that for statements that group via the group by
clause, the all
keyword provides special behavior as below.
The first
keyword specifies that only the first event in an output batch is to be output.
Using the first
keyword instructs the engine to output the first matching event as soon as it arrives, and then ignores matching events for the time interval or number of events specified.
After the time interval has elapsed, or the number of matching events has been reached, the next first matching event is output again and during the following interval the engine again ignores matching events.
For statements that group via the group by
clause, the first
keyword provides special behavior as described below.
The last
keyword specifies to only output the last event at the end of the given time interval or after the given number of matching events
have been accumulated. Again, for statements that group via the group by
clause the last
keyword provides special behavior as below.
The snapshot
keyword indicates that the engine outputs the current computation results considering all events as per the views specified and/or the current aggregation results. While the other keywords control how a batch of events between output intervals is considered, the snapshot
keyword outputs all current state of a statement independent of the last batch. Its output is equivalent to the iterator
method provided by a statement. The snapshot
keyword requires a data window declaration if not used within a join and outputs only the last event if used without a data window.
The output_rate is the frequency at which the engine outputs events. It can be specified in terms of time or number of events. The value can be a number to denote a fixed output rate, or the name of a variable whose value is the output rate. By means of a variable the output rate can be controlled externally and changed dynamically at runtime.
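For example, assuming your application has declared an integer-type variable named outputRateVar, the following sketch outputs a batch each time that many events have been collected; updating the variable changes the output rate at runtime:
// outputRateVar is an assumed, application-declared integer variable
select * from StockTickEvent output every outputRateVar events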
Please consult the Appendix A, Output Reference and Samples for detailed information on insert and remove stream output for the various output
clause keywords.
For use with contexts you may append the keywords and when terminated
to trigger output at the rate defined and in addition trigger output when the context partition terminates. Please see Section 4.5, “Output When Context Partition Ends” for details.
The time interval can also be specified in terms of minutes; the following statement outputs events every 1.5 minutes (90 seconds):
select * from StockTickEvent output every 1.5 minutes
A second way that output can be stabilized is by batching events until a certain number of events have been collected:
select * from StockTickEvent output every 5 events
Additionally, event output can be further modified by the optional last
keyword, which causes output of only the last event to arrive into an output batch.
select * from StockTickEvent output last every 5 events
Using the first
keyword you can be notified at the start of the interval. This allows you to watch for situations such as a rate falling below a threshold, and to be informed the moment it first happens while only being informed again after the specified output interval has passed.
select * from TickRate where rate<100 output first every 60 seconds
A sample statement using the Unix "crontab"-command schedule is shown next. See Section 6.6.2.2, “timer:at” for details on schedule syntax. Here, output occurs every 15 minutes from 8am to 5:45pm (hours 8 to 17 at 0, 15, 30 and 45 minutes past the hour):
select symbol, sum(price) from StockTickEvent group by symbol output at (*/15, 8:17, *, *, *)
Output can also be controlled by an expression that may check variable values, use user-defined functions and query built-in properties that provide additional information. The synopsis is as follows:
output [after suppression_def] [[all | first | last | snapshot] when trigger_expression [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]] [and when terminated [and termination_expression] [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]] ]
The when
keyword must be followed by a trigger expression returning a boolean value of true or false, indicating whether to output.
Use the optional then
keyword to change variable values after the trigger expression evaluates to true. An assignment expression assigns a new value to variable(s).
For use with contexts you may append the keywords and when terminated
to also trigger output when the context partition terminates. Please see Section 4.5, “Output When Context Partition Ends” for details. You may optionally specify a termination expression. If that expression is provided the
engine evaluates the expression when the context partition terminates: The evaluation result of true
means output occurs when the context partition terminates, false
means no output occurs when the context partition terminates.
You may specify then set
followed by a list of assignments to assign variables. Assignments are executed on context partition termination regardless of the termination expression, if present.
Let's consider an example. The next statement assumes that your application has defined a variable by name OutputTriggerVar of boolean type. The statement outputs rows only when the OutputTriggerVar variable has a boolean value of true:
select sum(price) from StockTickEvent output when OutputTriggerVar = true
The engine evaluates the trigger expression when streams and data views post one or more insert or remove stream events after considering the where
clause, if present. It also evaluates the trigger expression when any of the variables used in the trigger expression, if any, changes value. Thus output occurs as follows:
When there are insert or remove stream events and the when
trigger expression evaluates to true, the engine outputs the resulting rows.
When any of the variables in the when
trigger expression changes value, the engine evaluates the expression and outputs results. Result output occurs within the minimum time interval of timer resolution (100 milliseconds).
By adding a then
part to the EPL, we can reset any variables after the trigger expression evaluated to true:
select sum(price) from StockTickEvent output when OutputTriggerVar = true then set OutputTriggerVar = false
Expressions in the when
and then
may, for example, use variables, user defined functions or any of the built-in named properties that are described in the below list.
The following built-in properties are available for use:
Table 5.5. Built-In Properties for Use with Output When
Built-In Property Name | Description |
---|---|
last_output_timestamp | Timestamp when the last output occurred for the statement; Initially set to time of statement creation |
count_insert | Number of insert stream events |
count_insert_total | Number of insert stream events in total (not reset when output occurs). |
count_remove | Number of remove stream events |
count_remove_total | Number of remove stream events in total (not reset when output occurs). |
The values provided by count_insert
and count_remove
are non-continuous: The number returned for these properties may 'jump' up rather than count up by 1.
The counts reset to zero upon output.
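As a sketch that uses these built-in properties, the next statement outputs whenever at least 100 insert stream events have accumulated since the last output, relying on count_insert resetting to zero upon output:
select sum(price) from StockTickEvent.win:time(30 sec) output when count_insert >= 100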
The following restrictions apply to expressions used in the output rate clause:
Event property names cannot be used in the output clause.
Aggregation functions cannot be used in the output clause.
The prev
previous event function and the prior
prior event function cannot be used in the output clause.
The after
keyword and its time period or number of events parameters is optional and can occur after the output
keyword, either alone or with output conditions as listed above.
The synopsis of after
is as follows:
output after time_period | number events [...]
When using after
either alone or together with further output conditions, the engine discards all output events until the time period has passed, as measured from the start of the statement, or until the number of output events has been reached. The discarded events are not output and do not count towards any further output conditions, if any are specified.
For example, the following statement outputs every minute the total price for all orders in the 30-minute time window but only after 30 minutes have passed:
select sum(price) from OrderEvent.win:time(30 min) output after 30 min snapshot every 1 min
An example in which after
occurs alone is below, in a statement that outputs the total price for all orders in the last minute, but only after 1 minute has passed, each time an event arrives or leaves the data window:
select sum(price) from OrderEvent.win:time(1 min) output after 1 min
To demonstrate after
when used with an event count, this statement finds pairs of orders with the same id but suppresses output for the first 5 pairs:
select * from pattern[every o=OrderEvent->p=OrderEvent(id=o.id)] output after 5 events
Remove stream events can also be useful in conjunction with aggregation and the output
clause: When the engine posts remove stream events for fully-aggregated queries, it presents the aggregation state before the expiring event leaves the data window. Your application can thus easily obtain a delta between the new aggregation value and the prior aggregation value.
The engine evaluates the having-clause at the granularity of the data posted by views. That is, if you utilize a time window and output every 10 events, the having
clause applies to each individual event or events entering and leaving the time window (and not once per batch of 10 events).
The output
clause interacts in two ways with the group by
and having
clauses. First, in the output every n events
case, the number n
refers to the number of events arriving into the group by clause
. That is, if the group by
clause outputs only 1 event per group, or if the arriving events don't satisfy the having
clause, then the actual number of events output by the statement could be fewer than n
.
Second, the last
, all
and first
keywords have special meanings when used in a statement with aggregate functions and the group by
clause:
When no keyword is specified, the engine produces an output row for each row in the batch or when using group-by then an output per group only for those groups present in the batch, following Section 3.7.2, “Output for Aggregation and Group-By”.
The all
keyword (the default) specifies that the most recent data for all groups seen so far should be output, whether or not these groups' aggregate values have just been updated.
The last
keyword specifies that only groups whose aggregate values have been updated with the most recent batch of events should be output.
The first
keyword specifies that only groups whose aggregate values have been updated with the most recent batch of events should be output following the defined frequency, keeping frequency state for each group.
Please consult the Appendix A, Output Reference and Samples for detailed information on insert and remove stream output for aggregation and group-by.
By adding an output rate limiting clause to a statement that contains a group by clause we can control output of groups to obtain one row for each group, generating an event per group at the given output frequency.
The next statement outputs total price per symbol cumulatively (no data window was used here). As it specifies the all
keyword, the statement outputs the current value for all groups seen so far, regardless of whether the group was updated in the last interval. Output occurs after an interval of 5 seconds passed and at the end of each subsequent interval:
select symbol, sum(price) from StockTickEvent group by symbol output all every 5 seconds
The below statement outputs total price per symbol considering events in the last 3 minutes. When events leave the 3-minute data window output also occurs as new aggregation values are computed. The last
keyword instructs the engine to output only those groups that had changes. Output occurs after an interval of 10 seconds passed and at the end of each subsequent interval:
select symbol, sum(price) from StockTickEvent.win:time(3 min) group by symbol output last every 10 seconds
This statement also outputs total price per symbol considering events in the last 3 minutes. The first
keyword instructs the engine to output as soon as there is a new value for a group. After output for a given group the engine suppresses output for the same group for 10 seconds and does not suppress output for other groups. Output occurs again for that group after the interval when the group has new value(s):
select symbol, sum(price) from StockTickEvent.win:time(3 min) group by symbol output first every 10 seconds
Output rate limiting provides output events to your application in regular intervals. Between intervals, the engine uses a buffer to hold events until the output condition is reached. If your application has high-volume streams, you may need to be mindful of the memory needs for output rates.
The output
clause with the snapshot
keyword does not require a buffer, all other output keywords do consume memory until the output condition is reached.
The order by
clause is optional. It is used for ordering output events by their properties, or by expressions involving those properties.
For example, the following statement outputs batches of 5 or more stock tick events that are sorted first by price ascending and then by volume ascending:
select symbol from StockTickEvent.win:time(60 sec) output every 5 events order by price, volume
Here is the syntax for the order by
clause:
order by expression [asc | desc] [, expression [asc | desc]] [, ...]
If the order by
clause is absent then the engine still makes certain guarantees about the ordering of output:
If the statement is not a join, does not group via group by
clause and does not declare grouped data windows via std:groupwin
view, the order in which events are delivered to listeners and through the iterator
pull API is the order of event arrival.
If the statement is a join or outer join, or groups, then the order in which events are delivered to listeners and through the iterator
pull API is not well-defined. Use the order by
clause if your application requires events to be delivered in a well-defined order.
Esper places the following restrictions on the expressions in the order by
clause:
All aggregate functions that appear in the order by
clause must also appear in the select
expression.
Otherwise, any kind of expression that can appear in the select
clause,
as well as any name defined in the select
clause, is also valid in the order by clause.
By default all sort operations on string values are performed via the compare
method and are thus not locale dependent. To account for differences in language or locale, see Section 15.4.21, “Engine Settings related to Language and Locale” to change this setting.
The limit
clause is typically used together with the order by
and output
clause to limit your query results to those that fall within a specified range. You can use it to receive the first given number of result rows, or to receive a range of result rows.
There are two syntaxes for the limit
clause, each can be parameterized by integer constants or by variable names. The first syntax is shown below:
limit row_count [offset offset_count]
The required row_count parameter specifies the number of rows to output. The row_count can be an integer constant or the name of an integer-type variable to evaluate at runtime.
The optional offset_count parameter specifies the number of rows that should be skipped (offset) at the beginning of the result set. A variable can also be used for this parameter.
The next sample EPL query outputs the top 10 counts per property 'uri' every 1 minute.
select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit 10
The next statement demonstrates the use of the offset
keyword. It outputs ranks 3 to 10 per property 'uri' every 1 minute:
select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit 8 offset 2
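Variables may be used in place of the constants. Assuming your application has declared integer-type variables named rowCountVar and offsetVar, a sketch of a parameterized limit clause is:
// rowCountVar and offsetVar are assumed, application-declared integer variables
select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit rowCountVar offset offsetVar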
The second syntax for the limit
clause is for SQL standard compatibility and specifies the offset first, followed by the row count:
limit offset_count[, row_count]
The following are equivalent:
limit 8 offset 2
// ...equivalent to
limit 2, 8
A negative value for row_count returns an unlimited number of rows, and a zero value returns no rows. If variables are used, then the current variable value at the time of output dictates the row count and offset. A variable returning a null value for row_count also returns an unlimited number of rows.
A negative value for offset is not allowed. If your variable returns a negative or null value for offset then the value is assumed to be zero (i.e. no offset).
The iterator
pull API also honors the limit
clause, if present.
The insert into
clause is optional in Esper. The clause can be specified to make the results of a statement available as an event stream for use
in further statements, or to insert events into a named window. The clause can also be used to merge multiple event streams to form a single stream of events.
The syntax for the insert into
clause is as follows:
insert [istream | irstream | rstream] into event_stream_name [ (property_name [, property_name] ) ]
The istream
(default) and rstream
keywords are optional. If no keyword or the istream
keyword is specified, the engine supplies the insert stream events generated by the statement. The insert stream consists of the events entering the respective window(s) or stream(s). If the rstream
keyword is specified, the engine supplies the remove stream events generated by the statement. The remove stream consists of the events leaving the respective window(s).
If your application specifies irstream
, the engine inserts into the new stream both the insert and remove stream. This is often useful in connection with the istream
built-in function that returns an inserted/removed boolean indicator for each event, see Section 9.1.7, “The Istream Function”.
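As a sketch, the next statement forwards both streams into a stream named OrderStreamWithFlag (a name chosen for this example) and uses the istream() function to flag each row as insert (true) or remove (false) stream:
// OrderStreamWithFlag and the isInsert column name are assumptions for this example
insert irstream into OrderStreamWithFlag select istream() as isInsert, price from OrderEvent.win:time(30 min)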
The event_stream_name
is an identifier that names the event stream (and also implicitly names the types of events in the stream) generated by the engine. The identifier can be used in further statements to filter and
process events of that event stream. The insert into
clause can consist of just an event stream name, or an event stream name and one or more property names.
The engine also allows listeners to be attached to a statement that contain an insert into
clause. Listeners receive all events posted to the event stream.
To merge event streams, simply use the same event_stream_name
identifier in all EPL statements that merge their result event streams. Make sure to use the
same number and names of event properties and event property types match up.
Esper places the following restrictions on the insert into
clause:
The number of elements in the select
clause must match the number of elements in the insert into
clause if the clause specifies a list of event property names
If the event stream name has already been defined by a prior statement or configuration, and the event property names and/or event types do not match, an exception is thrown at statement creation time.
The following sample inserts into an event stream by name CombinedEvent:
insert into CombinedEvent select A.customerId as custId, A.timestamp - B.timestamp as latency from EventA.win:time(30 min) A, EventB.win:time(30 min) B where A.txnId = B.txnId
Each event in the CombinedEvent
event stream has two event properties named "custId" and "latency". The events generated by the above statement can be used in further statements, such as shown in the next statement:
select custId, sum(latency) from CombinedEvent.win:time(30 min) group by custId
The example statement below shows the alternative form of the insert into
clause that explicitly defines the property names to use.
insert into CombinedEvent (custId, latency) select A.customerId, A.timestamp - B.timestamp ...
The rstream
keyword can be useful to indicate to the engine to generate only remove stream events. This can be useful if we want to trigger
actions when events leave a window rather than when events enter a window. The statement below generates CombinedEvent
events when
EventA and EventB leave the window after 30 minutes.
insert rstream into CombinedEvent select A.customerId as custId, A.timestamp - B.timestamp as latency from EventA.win:time(30 min) A, EventB.win:time(30 min) B where A.txnId = B.txnId
The insert into
clause can be used in connection with patterns to provide pattern results to further statements for analysis:
insert into ReUpEvent select linkUp.ip as ip from pattern [every linkDown=LinkDownEvent -> linkUp=LinkUpEvent(ip=linkDown.ip)]
Sometimes your events may carry properties that are themselves event objects. Therefore EPL offers a special syntax to insert the value of a property itself as an event into a stream:
insert into stream_name select property_name.* from ...
This feature is only supported for JavaBean events and for Map
and Object-array (Object[]
) event types that associate an event type name with the property type. It is not supported for XML
events. Nested property names are also not supported.
In this example, the class Summary
with properties bid
and ask
that are of type Quote
is:
public class Summary {
    private Quote bid;
    private Quote ask;
    ...
The statement to populate a stream of Quote
events is thus:
insert into MyBidStream select bid.* from Summary
The insert into
clause allows merging multiple event streams into a single event stream.
The clause names an event stream to insert into by specifying an event_stream_name. The first statement that inserts into the named stream defines the stream's event types. Further statements that
insert into the same event stream must match the type of events inserted into the stream as declared by the first statement.
One approach to merging event streams specifies individual column names either in the select
clause or in the insert into
clause of the statement. This approach has been shown in earlier examples.
Another approach to merging event streams specifies the wildcard (*) in the select
clause (or the stream wildcard) to select the underlying event. The events in the event stream must then
have the same event type as generated by the from
clause.
Assume a statement creates an event stream named MergedStream by selecting OrderEvent events:
insert into MergedStream select * from OrderEvent
A statement can use the stream wildcard selector to select only OrderEvent events in a join:
insert into MergedStream select ord.* from ItemScanEvent, OrderEvent as ord
And a statement may also use an application-supplied user-defined function to convert events to OrderEvent instances:
insert into MergedStream select MyLib.convert(item) from ItemScanEvent as item
Esper specifically recognizes a conversion function as follows: A conversion function must be the only selected column, and it must return either a Java object or java.util.Map
or Object[]
(object array).
Your EPL should not use the as
keyword to assign a column name.
A variant stream is a predefined stream into which events of multiple disparate event types can be inserted.
A variant stream name may appear anywhere in a pattern or from
clause. In a pattern, a filter against a variant stream matches any events of any of the event types inserted into the variant stream.
In a from
clause including for named windows, views declared onto a variant stream may hold events of any of the event types inserted into the variant stream.
A variant stream is thus useful in problems that require different types of event to be treated the same.
Variant streams can be declared by means of create variant schema
or can be predefined via runtime or initialization-time configuration as described in Section 15.4.27, “Variant Stream”. Your application may declare or predefine variant streams to carry events of a limited set of event types, or you may choose the variant stream to carry any and all types of events. This choice affects what event properties are available for consuming statements or patterns of the variant stream.
Assume that an application predefined a variant stream named OrderStream
to carry only ServiceOrder
and ProductOrder
events. An insert into
clause inserts events into
the variant stream:
insert into OrderStream select * from ServiceOrder
insert into OrderStream select * from ProductOrder
Here is a sample statement that consumes the variant stream and outputs a total price per customer id for the last 30 seconds of ServiceOrder
and ProductOrder
events:
select customerId, sum(price) from OrderStream.win:time(30 sec) group by customerId
If your application predefines the variant stream to hold specific type of events, as the sample above did, then all event properties that are common to all specified types are visible on the variant stream, including nested, indexed and mapped properties. For access to properties that are only available on one of the types, the dynamic property syntax must be used. In the example above, the customerId
and price
were properties common to both ServiceOrder
and ProductOrder
events.
For example, here is a consuming statement that selects a service duration property that only ServiceOrder events have, and that must therefore be cast to double, with null values removed, in order to aggregate:
select customerId, sum(coalesce(cast(serviceDuration?, double), 0)) from OrderStream.win:time(30 sec) group by customerId
If your application predefines a variant stream to hold any type of events (the any
type variance), then all event properties of the variant stream are effectively dynamic properties.
For example, an application may define an OutgoingEvents
variant stream to hold any type of event. The next statement is a sample consumer of the OutgoingEvents
variant stream that looks for the destination
property and fires for each event in which the property exists with a value of 'email'
:
select * from OutgoingEvents(destination = 'email')
Your select
clause may use the '*' wildcard together with further expressions to populate a stream of events. A sample statement is:
insert into OrderStream select *, price*units as linePrice from PurchaseOrder
When using wildcard and selecting additional expression results, the engine produces what is called decorating events for the resulting stream. Decorating events add additional property values to an underlying event.
In the above example the resulting OrderStream consists of underlying PurchaseOrder events decorated by a linePrice
property that is a result of the price*units
expression.
In order to use insert into
to insert into an existing stream of decorated events, your underlying event type must match, and all additional decorating property names and types of the select
clause must also match.
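For example, a second statement could insert further decorated PurchaseOrder events into the same OrderStream, provided the underlying type and the decorating linePrice property match; the filter condition shown is an assumption for this example:
// the 'vip' filter property is an assumption for this example
insert into OrderStream select *, price*units as linePrice from PurchaseOrder(vip=true)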
Your select
clause may use the stream name to populate a stream of events in which each event has properties that are themselves events. A sample statement is:
insert into CompositeStream select order, service, order.price+service.price as totalPrice from PurchaseOrder.std:lastevent() as order, ServiceEvent.std:lastevent() as service
When using the stream name (or tag in patterns) in the select-clause, the engine produces composite events: One or more of the properties of the composite event are events themselves.
In the above example the resulting CompositeStream consists of 3 columns: the PurchaseOrder event, the ServiceEvent event and the totalPrice
property that is a result of the order.price+service.price
expression.
In order to use insert into
to insert into an existing stream of events in which properties are themselves events, each event column's event type must match, and all additional property names and types of the select
clause must also match.
Your insert into
clause may also directly instantiate and populate application underlying event objects or Map
or Object[]
event objects. This is described in greater detail in Section 2.12, “Event Objects Instantiated and Populated by Insert Into”.
If instead you have an expression that returns an event object, please read on to the next section.
You can transpose an object returned as an expression result into a stream using the transpose
function as described further in Section 9.4, “Select-Clause transpose Function”.
When you declare the inserted-into event type in advance to the statement that inserts, the engine compares the inserted-into event type information to the return type of expressions in the select-clause.
The comparison uses the column alias assigned to each select-clause expression using the as
keyword.
When the inserted-into column type is an event type and when using a subquery or the new
operator,
the engine compares column names assigned to subquery columns or new
operator columns.
For example, assume a PurchaseOrder
event type that has a property called items
that consists of Item
rows:
create schema Item(name string, price double)
create schema PurchaseOrder(orderId string, items Item[])
Declare a statement that inserts into the PurchaseOrder
stream:
insert into PurchaseOrder select '001' as orderId, new {name='i1', price=10} as items from TriggerEvent
The aliases assigned to the first and second expressions in the select-clause, namely orderId and items, both match the event property names of the PurchaseOrder event type. The column names provided to the new
operator
also both match the event property names of the Item
event type.
When the event type declares the column as a single value (and not an array) and the select-clause expression produces multiple rows, the engine populates only the first row.
Consider a PurchaseOrder
event type that has a property called items
that consists of a single Item
event:
create schema PurchaseOrder(orderId string, items Item)
The sample subquery below populates only the very first event, discarding remaining subquery result events, since the items
property above is declared as holding a single Item
-typed event only (versus Item[]
to hold multiple Item
-typed events).
insert into PurchaseOrder select (select 'i1' as name, 10 as price from HistoryEvent.win:length(2)) as items from TriggerEvent
Consider using a subquery with filter, or one of the enumeration methods to select a specific subquery result row.
A subquery is a select
within another statement. Esper supports subqueries in the select
clause, where
clause, having
clause and in stream and pattern filter expressions. Subqueries provide an alternative way to perform operations that would otherwise require complex joins. Subqueries can also make statements more readable than complex joins.
Esper supports both simple subqueries as well as correlated subqueries. In a simple subquery, the inner query is not correlated to the outer query. Here is an example simple subquery within a select
clause:
select assetId, (select zone from ZoneClosed.std:lastevent()) as lastClosed from RFIDEvent
If the inner query is dependent on the outer query, we will have a correlated subquery. An example of a correlated subquery is shown below. Notice the where
clause in the inner query, where the condition involves a stream from the outer query:
select * from RfidEvent as RFID where 'Dock 1' = (select name from Zones.std:unique(zoneId) where zoneId = RFID.zoneId)
The example above shows a subquery in the where
clause. The statement selects RFID events in which the zone name matches a string constant based on zone id. The statement uses the view std:unique
to guarantee that only the last event per zone id is retained for processing by the subquery.
The next example is a correlated subquery within a select
clause. In this statement the select
clause retrieves the zone name by means of a subquery against the Zones set of events correlated by zone id:
select zoneId, (select name from Zones.std:unique(zoneId) where zoneId = RFID.zoneId) as name from RFIDEvent
Note that when a simple or correlated subquery returns multiple rows, the engine returns a null
value as the subquery result. To limit the number of events returned by a subquery consider using one of the views std:lastevent
, std:unique
and std:groupwin
or aggregation functions or the multi-row and multi-column selects as described below.
The select
clause of a subquery also allows wildcard selects, which return as an event property the underlying event object of the event type as defined in the from
clause. An example:
select (select * from MarketData.std:lastevent()) as md from pattern [every timer:interval(10 sec)]
The output events to the statement above contain the underlying MarketData event in a property named "md". The statement populates the last MarketData event into a property named "md" every 10 seconds following the pattern definition, or populates a null
value if no MarketData event has been encountered so far.
Aggregation functions may be used in the select
clause of the subselect as this example outlines:
select * from MarketData where price > (select max(price) from MarketData(symbol='GOOG').std:lastevent())
As the sub-select expression is evaluated first (by default), the query above actually never fires for the GOOG symbol, only for other symbols that have a price higher than the current maximum for GOOG. As a side note, the insert into
clause can also be handy to compute aggregation results for use in multiple subqueries.
When using aggregation functions in a correlated subselect the engine computes the aggregation based on data window or named window contents matching the where-clause.
The following example compares the quantity value provided by the current order event against the total quantity of all order events in the last 1 hour for the same client.
select * from OrderEvent oe where qty > (select sum(qty) from OrderEvent.win:time(1 hour) pd where pd.client = oe.client)
Filter expressions in a pattern or stream may also employ subqueries. Subqueries can be uncorrelated or can be correlated to properties of the stream or to properties of tagged events in a pattern. Subqueries may reference named windows as well.
The following example filters BarData
events that have a close price less than the last moving average (field movAgv
) as provided by stream SMA20Stream
(an uncorrelated subquery):
select * from BarData(ticker='MSFT', closePrice < (select movAgv from SMA20Stream(ticker='MSFT').std:lastevent()))
A few generic examples follow to demonstrate the point. The examples use short event and property names so they are easy to read. Assume A
and B
are streams and DNamedWindow
is a named window, and properties a_id, b_id, d_id, a_val, b_val, d_val
respectively:
// Sample correlated subquery as part of stream filter criteria
select * from A(a_val in (select b_val from B.std:unique(b_val) as b where a.a_id = b.b_id)) as a
// Sample correlated subquery against a named window
select * from A(a_val in (select b_val from DNamedWindow as d where a.a_id = d.d_id)) as a
// Sample correlated subquery in the filter criteria as part of a pattern, querying a named window
select * from pattern [ a=A -> b=B(bvalue = (select d_val from DNamedWindow as d where d.d_id = b.b_id and d.d_id = a.a_id)) ]
Subquery state starts to accumulate as soon as a statement starts (and not only when a pattern-subexpression activates).
The following restrictions apply to subqueries:
The subquery stream definition must define a data window or other view to limit subquery results, reducing the number of events held for subquery execution
Subqueries can only consist of a select
clause, a from
clause, a where
clause and a group by
clause. The having
clause, as well as joins, outer-joins and output rate limiting are not permitted within subqueries.
If using aggregation functions in a subquery, note these limitations:
None of the properties of the correlated stream(s) can be used within aggregation functions.
The properties of the subselect stream must all be within aggregation functions.
The order of evaluation of subqueries relative to the containing statement is guaranteed: If the containing statement and its subqueries are reacting to the same type of event, the subquery will receive the event first before the containing statement's clauses are evaluated. This behavior can be changed via configuration. The order of evaluation of subqueries is not guaranteed between subqueries.
Performance of your statement containing one or more subqueries principally depends on two parameters. First, if your subquery correlates one or more columns in the subquery stream with the enclosing statement's streams, the engine automatically builds the appropriate indexes for fast row retrieval based on the key values correlated (joined). The second parameter is the number of rows found in the subquery stream and the complexity of the filter criteria (where
clause), as each row in the subquery stream must evaluate against the where
clause filter.
The exists
condition is considered "to be met" if the subquery returns at least one row. The not exists
condition is considered true if the subquery returns no rows.
The synopsis for the exists
keyword is as follows:
exists (subquery)
Let's take a look at a simple example. The following is an EPL statement that uses the exists
condition:
select assetId from RFIDEvent as RFID where exists (select * from Asset.std:unique(assetId) where assetId = RFID.assetId)
This select statement will return all RFID events where there is at least one event in Assets unique by asset id with the same asset id.
The in
subquery condition is true if the value of an expression matches one or more of the values returned by the subquery. Consequently, the not in
condition is true if the value of an expression matches none of the values returned by the subquery.
The synopsis for the in
keyword is as follows:
expression in (subquery)
The right-hand side subquery must return exactly one column.
The next statement demonstrates the use of the in
subquery condition:
select assetId from RFIDEvent where zone in (select zone from ZoneUpdate(status = 'closed').win:time(10 min))
The above statement demonstrates the in
subquery to select RFID events for which the zone status is in a closed state.
Note that if the left-hand expression yields null, or if there are no equal right-hand values and at least one right-hand row yields null, the result of the in
construct will be null, not false (or true for not-in
). This is in accordance with SQL's normal rules for Boolean combinations of null values.
The any
subquery condition is true if the expression returns true for one or more of the values returned by the subquery.
The synopsis for the any
keyword is as follows:
expression operator any (subquery)
expression operator some (subquery)
The left-hand expression is evaluated and compared to each row of the subquery result using the given operator, which must yield a Boolean result. The result of any
is "true" if any true result is obtained. The result is "false" if no true result is found (including the special case where the subquery returns no rows).
The operator can be any of the following values: =, !=, <>, <, <=, >, >=
.
The some
keyword is a synonym for any
. The in
construct is equivalent to = any
.
The right-hand side subquery must return exactly one column.
The next statement demonstrates the use of the any
subquery condition:
select * from ProductOrder as ord where quantity < any (select minimumQuantity from MinimumQuantity.win:keepall())
The above query compares the ProductOrder event's quantity value with all rows from the MinimumQuantity stream of events and returns only those ProductOrder events that have a quantity that is less than any of the minimum quantity values of the MinimumQuantity events.
Note that if there are no successes and at least one right-hand row yields null for the operator's result, the result of the any
construct will be null, not false. This is in accordance with SQL's normal rules for Boolean combinations of null values.
The all
subquery condition is true if the expression returns true for all of the values returned by the subquery.
The synopsis for the all
keyword is as follows:
expression operator all (subquery)
The left-hand expression is evaluated and compared to each row of the subquery result using the given operator, which must yield a Boolean result. The result of all
is "true" if all rows yield true (including the special case where the subquery returns no rows). The result is "false" if any false result is found. The result is null
if the comparison does not return false for any row, and it returns null
for at least one row.
The operator can be any of the following values: =, !=, <>, <, <=, >, >=
.
The not in
construct is equivalent to != all
.
The right-hand side subquery must return exactly one column.
The next statement demonstrates the use of the all
subquery condition:
select * from ProductOrder as ord where quantity < all (select minimumQuantity from MinimumQuantity.win:keepall())
The above query compares the ProductOrder event's quantity value with all rows from the MinimumQuantity stream of events and returns only those ProductOrder events that have a quantity that is less than all of the minimum quantity values of the MinimumQuantity events.
The optional group by
clause in subqueries works the same way as the group-by clause outside of subqueries,
except that it impacts only those aggregations within the subquery.
The following restrictions apply:
Expressions in the group-by clause cannot contain aggregate functions, subqueries or the prev
and prior
functions.
Subqueries only support the fully-aggregated case when using group-by: All non-aggregated properties in the select clause must be listed in the group by clause.
The group-by expressions cannot be correlated. All properties in the group by
must be provided by the subselect stream.
Your subquery may select multiple columns in the select
clause including multiple aggregated values from a data window or named window.
The following example is a correlated subquery that selects wildcard and in addition selects the bid
and offer
properties of the last MarketData
event for the same symbol as the arriving OrderEvent
:
select *, (select bid, offer from MarketData.std:unique(symbol) as md where md.symbol = oe.symbol) as bidoffer from OrderEvent oe
Output events for the above query contain all properties of the original OrderEvent
event. In addition each output event contains a bidoffer
nested property that itself contains the bid
and offer
properties. You may retrieve the bid and offer from output events directly via the bidoffer.bid
property name syntax for nested properties.
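For example, a listener attached to the statement could read the nested values as follows (a minimal sketch; the listener class name is hypothetical):
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class BidOfferListener implements UpdateListener {
  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    for (EventBean event : newEvents) {
      // nested property syntax: subquery columns live inside the 'bidoffer' property
      Object bid = event.get("bidoffer.bid");
      Object offer = event.get("bidoffer.offer");
      System.out.println("bid=" + bid + ", offer=" + offer);
    }
  }
}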
The next example is similar to the above query but instead selects aggregations and selects from a named window by name OrderNamedWindow
(creation not shown here). For each arriving OrderEvent
it selects the total quantity and count of all order events for the same client, as currently held by the named window:
select *, (select sum(qty) as sumPrice, count(*) as countRows from OrderNamedWindow as onw where onw.client = oe.client) as pastOrderTotals from OrderEvent as oe
Output events for the above query contain all properties of the original OrderEvent event. In addition each output event contains a pastOrderTotals nested property that itself contains the sumPrice and countRows properties.
The next EPL statement computes a prorated quantity considering the maximum and minimum quantity for the last 1 minute of order events:
expression subq { (select max(quantity) as maxq, min(quantity) as minq from OrderEvent.win:time(1 min)) }
select (quantity - subq().minq) / (subq().maxq - subq().minq) as prorated from OrderEvent
While a subquery cannot change the cardinality of the selected stream, a subquery can return multiple values from the selected data window or named window. This section shows examples of the window
aggregation function as well as the use of enumeration methods with subselects.
Consider using an inner join, outer join or unidirectional join instead to achieve a 1-to-many cardinality in the number of output events.
The next example is an uncorrelated subquery that selects all current ZoneEvent
events considering the last ZoneEvent
per zone for each arriving RFIDEvent
.
select assetId, (select window(z.*) as winzones from ZoneEvent.std:unique(zone) as z) as zones from RFIDEvent
Output events for the above query contain two properties: the assetId
property and the zones
property. The latter property is a nested property that contains the winzones
property. You may retrieve the zones from output events directly via the zones.winzones
property name syntax for nested properties.
In this example for a correlated subquery against a named window we assume that the OrderNamedWindow
has been created and contains order events. The query returns for each MarketData
event
the list of order ids for orders with the same symbol:
select price, (select window(orderId) as winorders from OrderNamedWindow onw where onw.symbol = md.symbol) as orderIds from MarketData md
Output events for the above query contain two properties: the price
property and the orderIds
property. The latter property is a nested property that contains the winorders
property of type array.
Another option to reduce selected rows to a single value is through the use of enumeration methods.
select price, (select * from OrderNamedWindow onw where onw.symbol = md.symbol).selectFrom(v => v) as ordersSymbol from MarketData md
Output events for the above query also contain a Collection of underlying events in the ordersSymbol
property.
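A listener could read that collection as shown in the following sketch (the class name is hypothetical; the cast assumes the collection of underlying events described above):
import java.util.Collection;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class OrdersBySymbolListener implements UpdateListener {
  public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    for (EventBean event : newEvents) {
      // 'ordersSymbol' holds the collection of underlying order events, or null
      Collection<?> orders = (Collection<?>) event.get("ordersSymbol");
      System.out.println("matching orders: " + (orders == null ? 0 : orders.size()));
    }
  }
}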
The following hints are available to tune performance and memory use of subqueries.
Use the @Hint('set_noindex')
hint for a statement that utilizes one or more subqueries. It instructs the engine to always perform a full table scan. The engine does not build an implicit index or use an explicitly-created index when this hint is provided. Use of the hint may result in reduced memory use but poor statement performance.
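For instance, applying the hint to the earlier correlated order-quantity subquery might look as follows (a sketch only; it assumes an EPServiceProvider instance named epService):
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;

public class NoIndexHintExample {
  // The hint instructs the engine to use a full table scan for the subquery lookup.
  public static EPStatement create(EPServiceProvider epService) {
    return epService.getEPAdministrator().createEPL(
      "@Hint('set_noindex') select * from OrderEvent oe where qty > " +
      "(select sum(qty) from OrderEvent.win:time(1 hour) pd where pd.client = oe.client)");
  }
}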
The following hints are available to tune performance and memory use of subqueries that select from named windows.
Named windows are globally-visible data windows. As such an application may create explicit indexes as discussed in Section 5.15.13, “Explicitly Indexing Named Windows”. The engine may also elect to create implicit indexes (no create-index EPL required) for index-based lookup of rows when executing on-select
, on-merge
, on-update
and on-delete
statements and for statements that subquery a named window.
By default and without specifying a hint, each statement that subqueries a named window also maintains its own index for looking up events held by the named window. The engine maintains the index by consuming the named window insert and remove stream. When the statement is destroyed it releases that index.
Specify the @Hint('enable_window_subquery_indexshare')
hint to enable subquery index sharing for named windows. When using this hint, indexes for subqueries are maintained by the named window itself (and not each statement context partition), are shared between one or more statements and may also utilize explicit indexes. Specify the hint once as part of the create window
statement.
This sample EPL statement creates a named window with subquery index sharing enabled:
@Hint('enable_window_subquery_indexshare') create window AllOrdersNamedWindow.win:keepall() as OrderMapEventType
When subquery index sharing is enabled, performance may increase as named window stream consumption is no longer needed for correlated subqueries. You may also expect reduced memory use especially if a large number of EPL statements perform similar subqueries against a named window. Subquery index sharing may require additional short-lived object creation and may slightly increase lock held time for named windows.
The following statement performs a correlated subquery against the named window above. When a settlement event arrives it selects the order detail for the same order id as provided by the settlement event:
select (select * from AllOrdersNamedWindow as onw where onw.orderId = se.orderId) as orderDetail from SettlementEvent as se
With subquery index sharing enabled the engine maintains an index of order events by order id for the named window, and shares that index between additional statements until the time all utilizing statements are destroyed.
You may disable subquery index sharing for a specific statement by specifying the @Hint('disable_window_subquery_indexshare')
hint, as this example shows, causing the statement to maintain its own index:
@Hint('disable_window_subquery_indexshare') select (select * from AllOrdersNamedWindow as onw where onw.orderId = se.orderId) as orderDetail from SettlementEvent as se
Two or more event streams can be part of the from
-clause and thus both (all) streams determine the resulting events. This section summarizes the important concepts. The sections that follow present more detail on each topic.
The default join is an inner join which produces output events only when there is at least one match in all streams.
Consider the sample statement shown next:
select * from TickEvent.std:lastevent(), NewsEvent.std:lastevent()
The above statement outputs the last TickEvent and the last NewsEvent in one output event when either a TickEvent or a NewsEvent arrives. If no TickEvent was received before a NewsEvent arrives, no output occurs. Similarly when no NewsEvent was received before a TickEvent arrives, no output occurs.
The where
-clause lists the join conditions that Esper uses to relate events in the two or more streams.
The next example statement retains the last TickEvent and last NewsEvent per symbol, and joins the two streams based on their symbol value:
select * from TickEvent.std:unique(symbol) as t, NewsEvent.std:unique(symbol) as n where t.symbol = n.symbol
As before, when a TickEvent arrives for a symbol that has no matching NewsEvent then there is no output event.
An outer join does not require each event in either stream to have a matching event. The full outer join is useful when output is desired when no match is found. The different outer join types (full, left, right) are explained in more detail below.
This example statement is an outer-join and also returns the last TickEvent and last NewsEvent per symbol:
select * from TickEvent.std:unique(symbol) as t full outer join NewsEvent.std:unique(symbol) as n on t.symbol = n.symbol
In the sample statement above, when a TickEvent arrives for a symbol that has no matching NewsEvent, or when a NewsEvent arrives for a symbol that has no matching TickEvent, the statement still produces an output event with a null column value for the missing event.
Note that each of the sample queries above defines a data window. The sample queries above use the last-event data window (std:lastevent) or the unique data window (std:unique). A data window serves to indicate the subset of events to join from each stream and may be required depending on the join.
In above queries, when either a TickEvent arrives or when a NewsEvent arrives then the query evaluates and there is output. The same holds true if additional streams are added to the from
-clause: Each of the streams in the from
-clause trigger the join to evaluate.
The unidirectional
keyword instructs the engine to evaluate the join only when an event arrives from the single stream that was marked with the unidirectional
keyword. In this case no data window should be specified for the stream marked as unidirectional
since the keyword implies that the current event of that stream triggers the join.
Here is the sample statement above with unidirectional
keyword, so that output occurs only when a TickEvent arrives and not when a NewsEvent arrives:
select * from TickEvent as t unidirectional, NewsEvent.std:unique(symbol) as n where t.symbol = n.symbol
It is oftentimes the case that an aggregation (count, sum, average) only needs to be calculated in the context of an arriving event or timer. Consider using the unidirectional
keyword when aggregating over joined streams.
An EPL pattern is a stream like any other and provides a stream of data consisting of pattern matches. A time pattern, for example, can be useful to evaluate a join and produce output upon each interval.
This sample statement includes a pattern that fires every 5 seconds and thus triggers the join to evaluate and produce output, computing an aggregated total quantity per symbol every 5 seconds:
select symbol, sum(qty) from pattern[every timer:interval(5 sec)] unidirectional, TickEvent.std:unique(symbol) t, NewsEvent.std:unique(symbol) as n where t.symbol = n.symbol group by symbol
Named windows as well as reference and historical data such as stored in your relational database, and data returned by a method invocation, can also be included in joins as discussed in Section 5.13, “Accessing Relational Data via SQL” and Section 5.14, “Accessing Non-Relational Data via Method Invocation”.
Related to joins are subqueries: A subquery is a select
within another statement, see Section 5.11, “Subqueries”
The engine performs extensive query analysis and planning, building internal indexes and strategies as required to allow fast evaluation of many types of queries.
Each point in time that an event arrives to one of the event streams, the two event streams are joined and output events are produced according to the where
clause when matching events are found for all joined streams.
This example joins 2 event streams. The first event stream consists of fraud warning events for which we keep the last 30 minutes. The second stream is withdrawal events for which we consider the last 30 seconds. The streams are joined on account number.
select fraud.accountNumber as accntNum, fraud.warning as warn, withdraw.amount as amount, max(fraud.timestamp, withdraw.timestamp) as timestamp, 'withdrawlFraud' as desc from com.espertech.esper.example.atm.FraudWarningEvent.win:time(30 min) as fraud, com.espertech.esper.example.atm.WithdrawalEvent.win:time(30 sec) as withdraw where fraud.accountNumber = withdraw.accountNumber
Joins can also include one or more pattern statements as the next example shows:
select * from FraudWarningEvent.win:time(30 min) as fraud, pattern [every w=WithdrawalEvent -> PINChangeEvent(acct=w.acct)].std:lastevent() as withdraw where fraud.accountNumber = withdraw.w.accountNumber
The statement above joins the last 30 minutes of fraud warnings with a pattern. The pattern consists of every withdrawal event that is followed by a PIN change event for the same account number. It joins the two event streams on account number. The last-event view instructs the join to only consider the last pattern match.
In a join and outer join, your statement must declare a data window view or other view onto each stream. Streams that are marked as unidirectional and named windows as well as database or methods in a join are an exception and do not require a view to be specified. If you are joining an event to itself via contained-event selection, views also do not need to be specified. The reason that a data window must be declared is that a data window specifies which events are considered for the join (i.e. last event, last 10 events, all events, last 1 second of events etc.).
The next example joins all FraudWarningEvent events that arrived since the statement was started, with the last 20 seconds of PINChangeEvent events:
select * from FraudWarningEvent.win:keepall() as fraud, PINChangeEvent.win:time(20 sec) as pin where fraud.accountNumber = pin.accountNumber
The above example employed the special keep-all view that retains all events.
Esper supports left outer joins, right outer joins, full outer joins and inner joins in any combination between an unlimited number of event streams. Outer and inner joins can also join reference and historical data as explained in Section 5.13, “Accessing Relational Data via SQL”, as well as join data returned by a method invocation as outlined in Section 5.14, “Accessing Non-Relational Data via Method Invocation”.
The keywords left, right, full
and inner
control the type of the join between two streams. The optional on
clause specifies one or more properties that join each stream. The synopsis is as follows:
...from stream_def [as name] ((left|right|full outer) | inner) join stream_def [on property = property [and property = property ...] ] [ ((left|right|full outer) | inner) join stream_def [on ...]]...
If the outer join is a left outer join, there will be an output event for each event of the stream on the left-hand side of the clause. For example, in the left outer join shown below we will get output for each event in the stream RfidEvent, even if the event does not match any event in the event stream OrderList.
select * from RfidEvent.win:time(30 sec) as rfid left outer join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId
Similarly, if the join is a Right Outer Join, then there will be an output event for each event of the stream on the right-hand side of the clause. For example, in the right outer join shown below we will get output for each event in the stream OrderList, even if the event does not match any event in the event stream RfidEvent.
select * from RfidEvent.win:time(30 sec) as rfid right outer join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId
For all types of outer joins, if the join condition is not met, the select list is computed with the event properties of the arrived event while all other event properties are considered to be null.
The next type of outer join is a full outer join. In a full outer join, each point in time that an event arrives to one of the event streams, one or more output events are produced. In the example below, when either an RfidEvent or an OrderList event arrives, one or more output events are produced. The next example shows a full outer join that joins on multiple properties:
select * from RfidEvent.win:time(30 sec) as rfid full outer join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId and rfid.assetId = orderList.assetId
The last type of join is an inner join. In an inner join, the engine produces an output event for each event of the stream on the left-hand side that matches at least one event on the right hand side considering the join properties. For example, in the inner join shown below we will get output for each event in the RfidEvent stream that matches one or more events in the OrderList data window:
select * from RfidEvent.win:time(30 sec) as rfid inner join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId and rfid.assetId = orderList.assetId
Patterns as streams in a join follow this rule: If no data window view is declared for the pattern then the pattern stream retains the last match. Thus a pattern must have matched at least once for the last row to become available in a join. Multiple rows from a pattern stream may be retained by declaring a data window view onto a pattern using the pattern [...].view_specification syntax.
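As a sketch of that syntax (reusing the fraud-warning example from above and assuming an EPServiceProvider named epService), the statement below retains the last 10 pattern matches instead of only the last one:
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;

public class PatternWindowJoinExample {
  // win:length(10) declared onto the pattern retains the last 10 pattern matches for the join.
  public static EPStatement create(EPServiceProvider epService) {
    return epService.getEPAdministrator().createEPL(
      "select * from FraudWarningEvent.win:time(30 min) as fraud, " +
      "pattern [every w=WithdrawalEvent -> PINChangeEvent(acct=w.acct)].win:length(10) as withdraw " +
      "where fraud.accountNumber = withdraw.w.accountNumber");
  }
}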
This example outer joins multiple streams. Here the RfidEvent stream is outer joined to both ProductName and LocationDescription via left outer join:
select * from RfidEvent.win:time(30 sec) as rfid left outer join ProductName.win:keepall() as refprod on rfid.productId = refprod.prodId left outer join LocationDescription.win:keepall() as refdesc on rfid.location = refdesc.locId
If the optional on
clause is specified, it may only employ the =
equals operator and property names. Any other operators must be placed in the where
-clause.
The stream names that appear in the on
clause may refer to any stream in the from
-clause.
Your EPL may also provide no on
clause. This is useful when the streams that are joined do not provide any properties to join on, for example when joining with a time-based pattern.
The next example employs a unidirectional left outer join such that the engine, every second, outputs a count of the number of RfidEvent events in the 60-second time window.
select count(*) from pattern[every timer:interval(1)] unidirectional left outer join RfidEvent.win:time(60 sec)
In a join or outer join your statement lists multiple event streams, views and/or patterns in the from
clause. As events arrive into the engine, each of the streams (views, patterns) provides insert and remove stream events. The engine evaluates each insert and remove stream event provided by each stream, and joins or outer joins each event against data window contents of each stream, and thus generates insert and remove stream join results.
The direction of the join execution depends on which stream or streams are currently providing an insert or remove stream event for executing the join. A join is thus multidirectional, or bidirectional when only two streams are joined. A join can be made unidirectional if your application does not want new results when events arrive on a given stream or streams.
The unidirectional
keyword can be used in the from
clause to identify a single stream that provides the events to execute the join. If the keyword is present for a stream, all other streams in the from
clause become passive streams. When events arrive or leave a data window of a passive stream then the join does not generate join results.
For example, consider a use case that requires us to join stock tick events (TickEvent) and news events (NewsEvent). The unidirectional
keyword allows to generate results only when TickEvent events arrive, and not when NewsEvent arrive or leave the 10-second time window:
select * from TickEvent as tick unidirectional, NewsEvent.win:time(10 sec) as news where tick.symbol = news.symbol
Aggregation functions in a unidirectional
join aggregate within the context of each unidirectional event evaluation and are not cumulative. Thereby aggregation functions when used with unidirectional
may evaluate faster as they do not need to consider a remove stream (data removed from data windows or named windows).
The count function in the next query returns, for each TickEvent, the number of matching NewsEvent events:
select count(*) from TickEvent as tick unidirectional, NewsEvent.win:time(10 sec) as news where tick.symbol = news.symbol
The following restrictions apply to unidirectional joins:
The unidirectional
keyword can only be specified for a single stream in the from
clause.
Receiving data from a unidirectional join via the pull API (iterator
method) is not allowed. This is because the engine holds no state for the single stream that provides the events to execute the join.
The stream that declares the unidirectional
keyword cannot declare a data window view or other view for that stream, since remove stream events are not processed for the single stream.
When joining 3 or more streams (including any relational or non-relational sources as below) it can sometimes help to provide the query planner with instructions on how to best execute the join. The engine compiles a query plan for the EPL statement at statement creation time. You can output the query plan to logging (see configuration).
An outer join that specifies only inner
keywords for all streams is equivalent to a default (inner) join. The following two statements are equivalent:
select * from TickEvent.std:lastevent() as tick, NewsEvent.std:lastevent() as news where tick.symbol = news.symbol
Equivalent to:
select * from TickEvent.std:lastevent() as tick inner join NewsEvent.std:lastevent() as news on tick.symbol = news.symbol
For all types of joins, the query planner determines a query graph: The term is used here for all the information regarding what properties or expressions are used to join the streams. The query graph thus includes the where-clause expressions as well as outer-join on-clauses if this statement is an outer join. The query planner also computes a dependency graph which includes information about all historical data streams (relational and non-relational as below) and their input needs.
For default (inner) joins the query planner first attempts to find a path of execution as a nested iteration. For each stream the query planner selects the best order of streams available for the nested iteration considering the query graph and dependency graph. If the full depth of the join is achievable via nested iteration for all streams without full table scan then the query planner uses that nested iteration plan. If not, then the query planner re-plans considering a merge join (Cartesian) approach instead.
Specify the @Hint('PREFER_MERGE_JOIN') to instruct the query planner to prefer a merge join plan instead of a nested iteration plan. Specify the @Hint('FORCE_NESTED_ITER') to instruct the query planner to always use a nested iteration plan.
For example, consider the below statement. Depending on the number of matching rows in OrderBookOne and OrderBookTwo (named windows in this example, and assumed to be defined elsewhere) the performance of the join may be better using the merge join plan.
@Hint('PREFER_MERGE_JOIN') select * from TickEvent.std:lastevent() t, OrderBookOne ob1, OrderBookTwo ob2 where ob1.symbol = t.symbol and ob2.symbol = t.symbol and ob1.price between t.buy and t.sell and ob2.price between t.buy and t.sell
For outer joins the query planner considers nested iteration and merge join (Cartesian) equally and above hints don't apply.
This chapter outlines how reference data and historical data that are stored in a relational database can be queried via SQL within EPL statements.
Esper can join and outer join all types of event streams to stored data, and can also access stored data via the iterator (poll) API. In order for such data sources to become accessible to Esper, some configuration is required. Section 15.4.9, “Relational Database Access” explains the required configuration for database access in greater detail, and includes information on configuring a query result cache.
Esper does not parse or otherwise inspect your SQL query. Therefore your SQL can make use of any database-specific SQL language extensions or features that your database provides.
If you have enabled query result caching in your Esper database configuration, Esper retains SQL query results in cache following the configured cache eviction policy.
Also if you have enabled query result caching in your Esper database configuration and provide EPL where
clause and/or on
clause (outer join) expressions, then
Esper builds indexes on the SQL query results to enable fast lookup. This is especially useful if your queries return a large number of rows. For building the proper indexes, Esper inspects the expression found in your EPL query where
clause, if present. For outer joins, Esper also inspects your EPL query on
clause. Esper analyzes the EPL on
clause and where
clause expressions, if present, looking for property comparison with or without logical AND-relationships between properties. When a SQL query returns rows for caching, Esper builds and caches the appropriate index and lookup strategies for fast row matching against indexes.
Joins or outer joins in which only SQL statements or method invocations are listed in the from
clause and no other event streams are termed passive joins. A passive join does not produce an insert or remove stream and therefore does not invoke statement listeners with results. A passive join can be iterated on (polled) using a statement's safeIterator
and iterator
methods.
There are no restrictions to the number of SQL statements or types of streams joined. The following restrictions currently apply:
Sub-views on an SQL query are not allowed; that is, one cannot create a time or length window on an SQL query. However, one can use the insert into
syntax to make join results available to a further statement.
Your database software must support JDBC prepared statements that provide statement meta data at compilation time. Most major databases provide this function. A workaround is available for databases that do not provide this function.
JDBC drivers must support the getMetadata feature. A workaround is available as below for JDBC drivers that don't support getting metadata.
The next sections assume basic knowledge of SQL (Structured Query Language).
To join an event stream against stored data, specify the sql
keyword followed by the name of the database and a parameterized SQL query. The syntax to use in the from
clause of an EPL statement is:
sql:database_name [" parameterized_sql_query "]
The engine uses the database_name identifier to obtain configuration information in order to establish a database connection, as well as settings that control connection creation and removal. Please see Section 15.4.9, “Relational Database Access” to configure an engine for database access.
Following the database name is the SQL query to execute. The SQL query can contain one or more substitution parameters. The SQL query string is placed in square brackets [ and ]. The SQL query can be placed in either single quotes (') or double quotes ("). The SQL query grammar is passed to your database software unchanged, allowing you to write any SQL query syntax that your database understands, including stored procedure calls.
Substitution parameters in the SQL query string take the form ${
expression}
. The engine resolves the expression at statement execution time to its actual result by evaluating the events in the joined event stream or current variable values, if any event property references or variables occur in the expression. An expression may not contain EPL substitution parameters.
The engine determines the type of the SQL query output columns by means of the result set metadata that your database software returns for the statement. The actual
query results are obtained via the getObject
on java.sql.ResultSet
.
The sample EPL statement below joins an event stream consisting of CustomerCallEvent
events with the results of an SQL query against the database named MyCustomerDB
and table Customer
:
select custId, cust_name from CustomerCallEvent, sql:MyCustomerDB [' select cust_name from Customer where cust_id = ${custId} ']
The example above assumes that CustomerCallEvent
supplies an event property named custId
. The SQL query selects the customer name from the Customer table. The where
clause in the SQL matches the Customer table column cust_id
with the value of custId
in each CustomerCallEvent
event. The engine executes the SQL query for each new CustomerCallEvent
encountered.
If the SQL query returns no rows for a given customer id, the engine generates no output event. Otherwise, the engine generates one output event for each row returned by the SQL query. An outer join as described in the next section can be used to control whether the engine should generate output events even when the SQL query returns no rows.
The next example adds a time window of 30 seconds to the event stream CustomerCallEvent
. It also renames the selected properties to customerName and customerId to demonstrate how the naming of columns in an SQL query can be used in the select
clause in the EPL query. And the example uses explicit stream names via the as
keyword.
select customerId, customerName from CustomerCallEvent.win:time(30 sec) as cce, sql:MyCustomerDB ["select cust_id as customerId, cust_name as customerName from Customer where cust_id = ${cce.custId}"] as cq
Any window, such as the time window, generates insert stream (istream) events as events enter the window, and remove stream (rstream) events as events leave the window. The engine executes the given SQL query for each CustomerCallEvent
in both the insert stream and the remove stream. As a performance optimization, the istream
or rstream
keywords in the select
clause can be used to instruct the engine to only join insert stream or remove stream events, reducing the number of SQL query executions.
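For example, limiting the SQL query execution to insert stream events only could look like this (a sketch assuming an EPServiceProvider named epService and the same MyCustomerDB configuration as above):
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;

public class IstreamSqlJoinExample {
  // 'istream' restricts the join, and therefore the SQL query execution,
  // to events entering the 30-second time window.
  public static EPStatement create(EPServiceProvider epService) {
    return epService.getEPAdministrator().createEPL(
      "select istream customerId, customerName " +
      "from CustomerCallEvent.win:time(30 sec) as cce, " +
      "sql:MyCustomerDB [\"select cust_id as customerId, cust_name as customerName " +
      "from Customer where cust_id = ${cce.custId}\"] as cq");
  }
}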
Since any expression may be placed within the ${...}
syntax, you may use variables or user-defined functions as well.
The next example assumes that a variable by name varLowerLimit
is defined and that a user-defined function getLimit
exists on the MyLib
imported class that takes a LimitEvent
as a parameter:
select * from LimitEvent le, sql:MyCustomerDB [' select cust_name from Customer where amount > ${max(varLowerLimit, MyLib.getLimit(le))} ']
The example above takes the higher of the current variable value or the value returned by the user-defined function to return only those customer names where the amount exceeds the computed limit.
Consider using the EPL where
clause to join the SQL query result to your event stream. Similar to EPL joins and outer-joins that join event streams or patterns, the EPL where
clause provides join criteria between the SQL query results and the event stream (as a side note, an SQL where
clause is a filter of rows executed by your database on your database server before returning SQL query results).
Esper analyzes the expression in the EPL where
clause and outer-join on
clause, if present, and builds the appropriate indexes from that information at runtime, to ensure fast matching of event stream events to SQL query results, even if your SQL query returns a large number of rows. Your application must configure a cache for your database using Esper configuration, as such indexes are held with regular data in the cache. If your application does not enable caching of SQL query results, the engine does not build indexes on cached data.
The sample EPL statement below joins an event stream consisting of OrderEvent
events with the results of an SQL query against the database named MyRefDB
and table SymbolReference
:
select symbol, symbolDesc from OrderEvent as orders, sql:MyRefDB ['select symbolDesc from SymbolReference'] as reference where reference.symbol = orders.symbol
Notice how the EPL where
clause joins the OrderEvent
stream to the SymbolReference
table. In this example, the SQL query itself does not have a SQL where
clause
and therefore returns all rows from table SymbolReference
.
If your application enables caching, the SQL query fires only at the arrival of the first OrderEvent
event. When the second OrderEvent
arrives, the join execution uses the cached query result. If the caching policy that you specified in the Esper database configuration evicts the SQL query result from cache, then the engine fires the SQL query again to obtain a new result and places the result in cache.
If SQL result caching is enabled and your EPL where
clause, as shown in the above example, provides the properties to join, then the engine indexes the SQL query results in cache and retains the index together with the query result in cache. Thus your application can benefit from high performance index-based lookups as long as the SQL query results are found in cache.
The SQL result caches operate on the level of all result rows for a given parameter set. For example, if your query returns 10 rows for a certain set of parameter values then the cache treats all 10 rows as a single entry keyed by the parameter values, and the expiry policy applies to all 10 rows and not to each individual row.
It is also possible to join multiple autonomous database systems in a single query, for example:
select symbol, symbolDesc from OrderEvent as orders, sql:My_Oracle_DB ['select symbolDesc from SymbolReference'] as reference, sql:My_MySQL_DB ['select orderList from orderHistory'] as history where reference.symbol = orders.symbol and history.symbol = orders.symbol
You can use outer joins to join data obtained from an SQL query and control when an event is produced. Use a left outer join, such as in the next statement, if you need an output event for each event regardless of whether or not the SQL query returns rows. If the SQL query returns no rows, the join result populates null values into the selected properties.
select custId, custName from CustomerCallEvent as cce left outer join sql:MyCustomerDB ["select cust_id, cust_name as custName from Customer where cust_id = ${cce.custId}"] as cq on cce.custId = cq.cust_id
The statement above always generates at least one output event for each CustomerCallEvent
, containing all columns selected by the SQL query, even if the SQL query does not return any rows. Note the on
expression that is required for outer joins. The on
acts as an additional filter to rows returned by the SQL query.
Pattern statements and SQL queries can also be applied together in useful ways. One such use is to poll or request data from a database at regular intervals or following the schedule of the crontab-like timer:at
.
The next statement is an example that shows a pattern that fires every 5 seconds to query the NewOrder table for new orders:
insert into NewOrders select orderId, orderAmount from pattern [every timer:interval(5 sec)], sql:MyCustomerDB ['select orderId, orderAmount from NewOrders']
Usually your SQL query will take part in a join and thus be triggered by an event or pattern occurrence. Instead, your application may need to poll a SQL query and thus use Esper query execution and caching facilities and obtain event data and metadata.
Your EPL statement can specify an SQL statement without a join. Such a stand-alone SQL statement does not post new events, and may only be queried via the iterator
poll API. Your EPL and SQL statement may still use variables.
The next statement assumes that a price_var
variable has been declared. It selects from the relational database table named NewOrder
all rows in which the price
column is greater
than the current value of the price_var
EPL variable:
select * from sql:MyCustomerDB ['select * from NewOrder where ${price_var} > price']
Use the iterator
and safeIterator
methods on EPStatement
to obtain results. The statement does not post events to listeners, it is strictly passive in that sense.
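A polling sketch (assuming an EPServiceProvider named epService and the MyCustomerDB configuration and price_var variable described above) could look like this:
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.SafeIterator;

public class NewOrderPoller {
  public static void poll(EPServiceProvider epService) {
    EPStatement statement = epService.getEPAdministrator().createEPL(
      "select * from sql:MyCustomerDB ['select * from NewOrder where ${price_var} > price']");
    SafeIterator<EventBean> it = statement.safeIterator();
    try {
      while (it.hasNext()) {
        EventBean row = it.next();
        System.out.println("order row: " + row.getUnderlying());
      }
    } finally {
      it.close(); // releases the statement lock held by safeIterator()
    }
  }
}
Using safeIterator() rather than iterator() holds a statement lock for the duration of the iteration, which is why the close() call in the finally block matters.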
The engine translates SQL queries into JDBC java.sql.PreparedStatement
statements by replacing ${name} parameters with '?' placeholders. It obtains name and type of result columns from the compiled PreparedStatement
meta data when the EPL statement is created.
The engine supplies parameters to the compiled statement via the setObject
method on PreparedStatement
. The engine uses the getObject
method on the compiled statement PreparedStatement
to obtain column values.
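Conceptually, and leaving out the engine's caching and indexing, the translation of the earlier customer lookup resembles the following plain JDBC sketch (illustrative only, not the engine's actual code):
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class PreparedStatementSketch {
  // '${custId}' becomes a '?' placeholder; the event's value is bound with setObject
  // and each output column is read with getObject.
  public static void lookup(Connection connection, Object custId) throws SQLException {
    PreparedStatement stmt = connection.prepareStatement(
      "select cust_name from Customer where cust_id = ?");
    stmt.setObject(1, custId);
    ResultSet rs = stmt.executeQuery();
    while (rs.next()) {
      System.out.println(rs.getObject("cust_name"));
    }
    rs.close();
    stmt.close();
  }
}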
Certain JDBC database drivers are known to not return metadata for precompiled prepared SQL statements. This can be a problem as Esper requires result set metadata to validate an EPL statement and to provide column types for output events. JDBC drivers that do not provide metadata for precompiled SQL statements generally do provide metadata for executed SQL statements, and therefore require the workaround described below.
Please consult the Chapter 15, Configuration for the configuration options available in relation to metadata retrieval.
To obtain metadata for an SQL statement, Esper can alternatively fire an SQL statement that returns the same column names and types as the actual SQL statement but does not return any rows. This kind of SQL statement is referred to as a sample statement in the workaround description below. The engine can then use the sample SQL statement to retrieve metadata for the column names and types returned by the actual SQL statement.
Applications can provide a sample SQL statement to retrieve metadata via the metadatasql
keyword:
sql:database_name ["parameterized_sql_query" metadatasql "sql_meta_query"]
The sql_meta_query must be an SQL statement that returns the same number of columns, the same type of columns and the same column names as the parameterized_sql_query, and does not return any rows.
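For example (a sketch; the AssetTable table and column names are made up for illustration, and an EPServiceProvider named epService is assumed), a statement providing its own sample query could look like this:
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;

public class MetadataSqlExample {
  // The sample query after 'metadatasql' returns the same column as the actual query but no rows.
  public static EPStatement create(EPServiceProvider epService) {
    return epService.getEPAdministrator().createEPL(
      "select assetId, assetDesc from AssetMoveEvent, " +
      "sql:MyRefDB ['select asset_desc as assetDesc from AssetTable where asset_id = ${assetId}' " +
      "metadatasql 'select asset_desc as assetDesc from AssetTable where 1=0']");
  }
}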
Alternatively, applications can choose not to provide an explicit sample SQL statement. If the EPL statement does not use the metadatasql
syntax, the engine applies lexical analysis to the SQL statement. From the lexical analysis Esper generates a sample SQL statement adding a restrictive clause "where 1=0" to the SQL statement.
Alternatively, you can add the following tag to the SQL statement: ${$ESPER-SAMPLE-WHERE}
. If the tag exists in the SQL statement, the engine does not perform lexical analysis and simply replaces the
tag with the SQL where
clause "where 1=0". Therefore this workaround is applicable to SQL statements that cannot be correctly lexically analyzed. The SQL text after the placeholder is not part of the sample query. For example:
select mycol from sql:myDB [ 'select mycol from mytesttable ${$ESPER-SAMPLE-WHERE} where ....'], ...
If your parameterized_sql_query SQL query contains vendor-specific SQL syntax, generation of the metadata query may fail to produce a valid SQL statement. If you experience an SQL error while fetching metadata, use any of the above workarounds with the Oracle JDBC driver.
As part of database access configuration you may optionally specify SQL type mappings. These mappings apply to all queries against the same database identified by name.
If your application must perform SQL-query-specific or EPL-statement-specific mapping or conversion between types, the facility to register a conversion callback exists as follows.
Use the @Hook
instruction and HookType.SQLCOL
as part of your EPL statement text to register a statement SQL parameter or column conversion hook.
Implement the interface com.espertech.esper.client.hook.SQLColumnTypeConversion
to perform the input parameter or column value conversion.
A sample statement with annotation is shown:
@Hook(type=HookType.SQLCOL, hook='MyDBTypeConvertor') select * from sql:MyDB ['select * from MyEventTable']
The engine expects MyDBTypeConvertor
to resolve to a class (considering engine imports) and instantiates one instance of MyDBTypeConvertor for each statement.
Your application may also directly convert a SQL result row into a Java class which is an opportunity for your application to interrogate and transform the SQL row result data freely before packing the data into a Java class. Your application can additionally indicate to skip SQL result rows.
Use the @Hook
instruction and HookType.SQLROW
as part of your EPL statement text to register a statement SQL output row conversion hook.
Implement the interface com.espertech.esper.client.hook.SQLOutputRowConversion
to perform the output row conversion.
A sample statement with annotation is shown:
@Hook(type=HookType.SQLROW, hook='MyDBRowConvertor') select * from sql:MyDB ['select * from MyEventTable']
The engine expects MyDBRowConvertor
to resolve to a class (considering engine imports) and instantiates one MyDBRowConvertor instance for each statement.
Your application may need to join data that originates from a web service, a distributed cache, an object-oriented database or simply data held in memory by your application. Esper accommodates this need by allowing
a method invocation (or procedure call or function) in the from
clause of a statement.
The results of such a method invocation in the from
clause play the same role as a relational database table in an inner and outer join in SQL. The role is thus dissimilar to the role of a user-defined function, which may occur in any expression such as in the select
clause or the where
clause. Both are backed by one or more static methods provided by your class library.
Esper can join and outer join an unlimited number and all types of event streams to the data returned by your method invocation. In addition, Esper can be configured to cache the data returned by your method invocations.
Joins or outer joins in which only SQL statements or method invocations are listed in the from
clause and no other event streams are termed passive joins. A passive join does not produce an insert or remove stream and therefore does not invoke statement listeners with results. A passive join can be iterated on (polled) using a statement's safeIterator
and iterator
methods.
The following restrictions currently apply:
Sub-views on a method invocation are not allowed; that is, one cannot create a time or length window on a method invocation. However, one can use the insert into
syntax to make join results available to a further statement.
The syntax for a method invocation in the from
clause of an EPL statement is:
method:class_name.method_name[(parameter_expressions)]
The method
keyword denotes a method invocation. It is followed by a class name and a method name separated by a dot (.) character. If you have parameters to your method invocation, these are placed in
round brackets after the method name. Any expression is allowed as a parameter, and individual parameter expressions are separated by a comma. Expressions may also use event properties of the joined stream.
In the sample join statement shown next, the method 'lookupAsset' provided by class 'MyLookupLib' returns one or more rows based on the asset id (a property of the AssetMoveEvent) that is passed to the method:
select * from AssetMoveEvent, method:MyLookupLib.lookupAsset(assetId)
The following statement demonstrates the use of the where
clause to join events to the rows returned by a method invocation, which in this example does not take parameters:
select assetId, assetDesc from AssetMoveEvent as asset, method:MyLookupLib.getAssetDescriptions() as desc where asset.assetid = desc.assetid
Your method invocation may return zero, one or many rows for each method invocation. If you have caching enabled through configuration, then Esper can avoid the method invocation and instead use cached results. Similar to SQL joins, Esper also indexes cached result rows such that join operations based on the where
clause or outer-join on
clause can be very efficient, especially if your method invocation returns a large number of rows.
If the time taken by method invocations is critical to your application, you may configure local caches as Section 15.4.7, “Cache Settings for From-Clause Method Invocations” describes.
Esper analyzes the expression in the EPL where
clause and outer-join on
clause, if present, and builds the appropriate indexes from that information at runtime, to ensure fast matching of event stream events to method invocation results, even if your method invocation returns a large number of rows. Your application must configure a cache for your method invocation using Esper configuration, as such indexes are held with regular data in the cache. If your application does not enable caching of method invocation results, the engine does not build indexes on cached data.
Usually your method invocation will take part in a join and thus be triggered by an event or pattern occurrence. Instead, your application may need to poll a method invocation and thus use Esper query execution and caching facilities and obtain event data and metadata.
Your EPL statement can specify a method invocation in the from
clause without a join. Such a stand-alone method invocation does not post new events, and may only be queried via the iterator
poll API. Your EPL statement may still use variables.
The next statement assumes that a category_var
variable has been declared. It polls the getAssetDescriptions
method passing the current value of the category_var
EPL variable:
select * from method:MyLookupLib.getAssetDescriptions(category_var)
Use the iterator
and safeIterator
methods on EPStatement
to obtain results. The statement does not post events to listeners, it is strictly passive in that sense.
Your application must provide a Java class that exposes a public static method. The method must accept the same number and type of parameters as listed in the parameter expression list.
If your method invocation returns either no row or only one row, then the return type of the method can be a Java class or a java.util.Map
. If your method invocation can return more than one row, then the return type of the method must be an array of Java class or an array of Map
.
If you are using a Java class or an array of Java class as the return type, then the class must adhere to JavaBean conventions: it must expose properties through getter methods.
If you are using java.util.Map
as the return type or an array of Map
, then the map should have String
-type keys and object values (Map<String, Object>
). When using Map
as the return type, your application must provide a second method that returns property metadata, as the next section outlines.
Your application method must return either of the following:
A null
value or an empty array to indicate an empty result (no rows).
A Java object or Map
to indicate a one-row result, or an array that consists of a single Java object or Map
.
An array of Java objects or Map
instances to return multiple result rows.
As an example, consider the method 'getAssetDescriptions' provided by class 'MyLookupLib' as discussed earlier:
select assetId, assetDesc from AssetMoveEvent as asset, method:com.mypackage.MyLookupLib.getAssetDescriptions() as desc where asset.assetid = desc.assetid
The 'getAssetDescriptions' method may return multiple rows and is therefore declared to return an array of the class 'AssetDesc'. The class AssetDesc is a POJO class (not shown here):
public class MyLookupLib {
  ...
  public static AssetDesc[] getAssetDescriptions() {
    ...
    return new AssetDesc[] {...};
  }
}
The example above specifies the full Java class name of the class 'MyLookupLib' class in the EPL statement. The package name does not need to be part of the EPL if your application imports the package using the auto-import configuration through the API or XML, as outlined in Section 15.4.6, “Class and package imports”.
Your application may return java.util.Map
or an array of Map
from method invocations. If doing so, your application must provide metadata about each row: it must declare the property name and property type of each Map
entry of a row. This information allows the engine to perform type checking of expressions used within the statement.
You declare the property names and types of each row by providing a method that returns property metadata. The metadata method must follow these conventions:
The method providing the property metadata must have the same name as the invoked method, appended with the literal Metadata
.
The method must have an empty parameter list and must be declared public and static.
The method providing the metadata must return a Map
of String
property name keys and java.lang.Class
property name types (Map<String, Class>
).
In the following example, a class 'MyLookupLib' provides a method to return historical data based on asset id and asset code:
select assetId, location, x_coord, y_coord from AssetMoveEvent as asset, method:com.mypackage.MyLookupLib.getAssetHistory(assetId, assetCode) as history
A sample implementation of the class 'MyLookupLib' is shown below.
public class MyLookupLib {
  ...
  // For each column in a row, provide the property name and type
  public static Map<String, Class> getAssetHistoryMetadata() {
    Map<String, Class> propertyNames = new HashMap<String, Class>();
    propertyNames.put("location", String.class);
    propertyNames.put("x_coord", Integer.class);
    propertyNames.put("y_coord", Integer.class);
    return propertyNames;
  }
  ...
  // Lookup rows based on assetId and assetCode
  public static Map<String, Object>[] getAssetHistory(String assetId, String assetCode) {
    Map[] rows = new Map[2]; // this sample returns 2 rows
    for (int i = 0; i < 2; i++) {
      rows[i] = new HashMap();
      rows[i].put("location", "somevalue");
      rows[i].put("x_coord", 100);
      // ... set more values for each row
    }
    return rows;
  }
}
In the example above, the 'getAssetHistoryMetadata' method provides the property metadata: the names and types of properties in each row. The engine calls this method once per statement to determine event typing information.
The 'getAssetHistory' method returns an array of Map
objects representing two rows. The implementation shown above is a simple example. The parameters to the method are the assetId and assetCode properties of the AssetMoveEvent joined to the method. The engine calls this method for each insert and remove stream event in AssetMoveEvent.
To indicate that no rows are found in a join, your application method may return either a null
value or an array of size zero.
A named window is a global data window that can take part in many statement queries, and that can be inserted-into and deleted-from by multiple statements. A named window holds events of the same type or supertype, unless used with a variant stream.
The create window
clause declares a new named window. The named window starts up empty unless populated from an existing named window at time of creation. Events must be inserted into the named window using the insert into
clause. Events can also be deleted from a named window via the on delete
clause.
Events enter the named window by means of insert into
clause of a select
statement. Events leave a named window either because the expiry policy of the declared data window removes events from the named window, or through statements that use the on delete
clause to explicitly delete from a named window.
To query a named window, simply use the window name in the from
clause of your statement, including statements that contain subqueries, joins and outer-joins.
A named window may also decorate an event to preserve original events as described in Section 5.10.4, “Decorated Events” and Section 5.15.2.1, “Named Windows Holding Decorated Events”. In addition, columns of a named window are allowed to hold events themselves, as further explained in Section 5.10.5, “Event as a Property” and Section 5.15.2.2, “Named Windows Holding Events As Property”.
To tune subquery performance when the subquery selects from a named window, consider the hints discussed in Section 5.11.8, “Hints Related to Subqueries”.
The create window
statement creates a named window by specifying a window name and one or more data window views, as well as the type of event to hold in the named window.
There are two syntaxes for creating a named window: The first syntax allows to model a named window after an existing event type or an existing named window. The second syntax is similar to the SQL create-table syntax and provides a list of column names and column types.
A new named window starts up empty. It must be explicitly inserted into by one or more statements, as discussed below. A named window can also be populated at time of creation from an existing named window.
If your application stops or destroys the statement that creates the named window, any consuming statements no longer receive insert or remove stream events. Events can also no longer be deleted from the named window after the creating statement was stopped or destroyed.
The create window
statement posts to listeners any events that are inserted into the named window as new data. The statement posts all deleted events or events that expire out of the data window to listeners as the remove stream (old data). The named window contents can also be iterated on via the pull API to obtain the current contents of a named window.
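As a sketch of both delivery paths, assuming an engine instance named epService and the configured event type OrderMapEventType used in the example further below (the listener body and iteration are illustrative only):
EPStatement windowStmt = epService.getEPAdministrator().createEPL(
    "create window AllOrdersNamedWindow.win:keepall() as OrderMapEventType");
// listener receives inserted events as new data and deleted/expired events as old data
windowStmt.addListener(new UpdateListener() {
  public void update(EventBean[] newData, EventBean[] oldData) {
    // process inserts (newData) and removals (oldData) here
  }
});
// pull API: iterate the current contents of the named window (java.util.Iterator)
Iterator<EventBean> it = windowStmt.iterator();
while (it.hasNext()) {
  EventBean row = it.next();
}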
The benefit of modelling a named window after an existing event type is that event properties can be nested, indexed, mapped or of any other type that your event objects provide as properties, including the type of the underlying event itself. Also, using the wildcard (*) operator means your EPL does not need to list each individual property explicitly.
The syntax for creating a named window by modelling the named window after an existing event type, is as follows:
[context context_name] create window window_name.view_specifications [as] [select list_of_properties from] event_type_or_windowname [insert [where filter_expression]]
The window_name you assign to the named window can be any identifier. The name should not already be in use as an event type or stream name.
The view_specifications are one or more data window views that define the expiry policy for removing events from the data window. Named windows must explicitly declare a data window view. This is required to ensure that the policy for retaining events in the data window is well defined. To keep all events, use the keep-all view: It indicates that the named window should keep all events and only remove events from the named window that are deleted via the on delete
clause. The view specification can only list data window views, derived-value views are not allowed since these don't represent an expiry policy. Data window views are listed in Chapter 12, EPL Reference: Views. View parameterization and staggering are described in Section 5.4.3, “Specifying Views”.
The select
clause and list_of_properties are optional. If present, they specify the column names and, implicitly by definition of the event type, the column types of events held by the named window. Expressions other than column names are not allowed in the select
list of properties. Wildcards (*) and wildcards with additional properties can also be used.
The event_type_or_windowname is required if using the model-after syntax. It provides the name of the event type of events held in the data window, unless column names and types have been explicitly selected via select
. The name of an (existing) other named window is also allowed here. Please find more details in Section 5.15.7, “Populating a Named Window from an Existing Named Window”.
Finally, the insert
clause and optional filter_expression are used if the new named window is modelled after an existing named window, and the data of the new named window is to be populated from the existing named window upon creation. The optional filter_expression can be used to exclude events.
You may refer to a context by specifying the context
keyword followed by a context name. Contexts are described in more detail in Chapter 4, Context and Context Partitions. The effect of referring to a context is that your named window operates according to the context dimensional information as declared for the context. For usage and limitations please see the respective chapter.
The next statement creates a named window 'AllOrdersNamedWindow' for which the expiry policy is simply to keep all events. Assume that the event type 'OrderMapEventType' has been configured. The named window is to hold events of type 'OrderMapEventType':
create window AllOrdersNamedWindow.win:keepall() as OrderMapEventType
The below sample statement demonstrates the select
syntax. It defines a named window in which each row has the three properties 'symbol', 'volume' and 'price'. This named window actively removes events from the window that are older than 30 seconds.
create window OrdersTimeWindow.win:time(30 sec) as select symbol, volume, price from OrderEvent
In an alternate form, the as
keyword can be used to rename columns, and constants may occur in the select-clause as well:
create window OrdersTimeWindow.win:time(30 sec) as select symbol as sym, volume as vol, price, 1 as alertId from OrderEvent
The second syntax for creating a named window is by supplying column names and types:
[context context_name] create window window_name.view_specifications [as] (column_name column_type [, column_name column_type [,...]])
The column_name is an identifier providing the event property name. The column_type is also required for each column. Valid column types are listed in Section 5.18.1, “Creating Variables: the Create Variable clause” and are the same as for variable types.
For array-type columns, append [] (left and right square brackets) to the column type.
The next statement creates a named window:
create window SecurityEvent.win:time(30 sec) (ipAddress string, userId String, numAttempts int, properties String[])
Named window columns can hold events by declaring the column type as the event type name. Array-type in combination with event-type is also supported.
The next two statements declare an event type and create a named window with a column of the defined event type:
create schema SecurityData (name String, roles String[])
create window SecurityEvent.win:time(30 sec) (ipAddress string, userId String, secData SecurityData, historySecData SecurityData[])
Whether the named window uses a Map or Object-array event representation for the rows can be specified as follows. If the create-window statement provides the @EventRepresentation(array=true)
annotation the engine maintains named window rows as object array. If the statement provides the @EventRepresentation(array=false)
annotation the engine maintains named window rows using Map objects. If neither annotation is provided, the engine uses the configured default event representation as discussed in Section 15.4.11.1, “Default Event Representation”.
The following EPL statement instructs the engine to represent FooWindow rows as object arrays:
@EventRepresentation(array=true) create window FooWindow.win:time(5 sec) as (string prop1)
There is no syntax to drop or remove a named window.
The destroy
method on the EPStatement
that created the named window removes the named window. However the implicit event type associated with the named window remains active since further statements may continue to use that type. Therefore a named window of the same name can only be created again if the type information matches the prior declaration for a named window.
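A minimal sketch of removing a named window by destroying its create-window statement (assuming an engine instance named epService; the window and event type names follow later examples):
// create the named window and keep a handle to the statement
EPStatement createStmt = epService.getEPAdministrator().createEPL(
    "create window OrdersWindow.win:keepall() as OrderEvent");
// ... later: remove the named window; its implicit event type remains known to the engine
createStmt.destroy();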
The insert into
clause inserts events into named windows. Your application must ensure that the column names and types match the declared column names and types of the named window to be inserted into.
For inserting into a named window and simultaneously checking whether the inserted row already exists in the named window, or for an atomic update-insert operation on a named window, consider using on-merge
as described in Section 5.15.12, “Triggered Upsert using the On-Merge Clause”. On-merge
is similar to the SQL merge
clause and provides what is known as an "Upsert" operation: Update existing events or if no existing event(s) are found then insert a new event, all in one atomic operation provided by a single EPL statement.
In this example we first create a named window using some of the columns of an OrderEvent event type:
create window OrdersWindow.win:keepall() as select symbol, volume, price from OrderEvent
The insert into the named window selects individual columns to be inserted:
insert into OrdersWindow(symbol, volume, price) select name, count, price from FXOrderEvent
An alternative form is shown next:
insert into OrdersWindow select name as symbol, vol as volume, price from FXOrderEvent
Given the above statement, the engine enters every arriving FXOrderEvent into the named window 'OrdersWindow'.
The following EPL statements create a named window for an event type backed by a Java class and insert into the window any 'OrderEvent' where the symbol value is IBM:
create window OrdersWindow.win:time(30) as com.mycompany.OrderEvent
insert into OrdersWindow select * from com.mycompany.OrderEvent(symbol='IBM')
The last example adds one column named 'derivedPrice' to the 'OrderEvent' type by specifying a wildcard, and uses a user-defined function to populate the column:
create window OrdersWindow.win:time(30) as select *, price as derivedPrice from OrderEvent
insert into OrdersWindow select *, MyFunc.func(price, percent) as derivedPrice from OrderEvent
Event representations based on Java base classes or interfaces, and subclasses or implementing classes, are compatible as these statements show:
// create a named window for the base class
create window OrdersWindow.std:unique(name) as select * from ProductBaseEvent

// The ServiceProductEvent class subclasses the ProductBaseEvent class
insert into OrdersWindow select * from ServiceProductEvent

// The MerchandiseProductEvent class subclasses the ProductBaseEvent class
insert into OrdersWindow select * from MerchandiseProductEvent
To avoid duplicate events inserted in a named window and atomically check if a row already exists, use on-merge
as outlined in Section 5.15.12, “Triggered Upsert using the On-Merge Clause”. An example:
on ServiceProductEvent as spe merge OrdersWindow as win where win.id = spe.id when not matched then insert select *
Decorated events hold an underlying event and add additional properties to the underlying event, as described further in Section 5.10.4, “Decorated Events”.
Here we create a named window that decorates OrderEvent events by adding an additional property named priceTotal
to each OrderEvent. A matching insert into
statement is also part of the sample:
create window OrdersWindow.win:time(30) as select *, price as priceTotal from OrderEvent
insert into OrdersWindow select *, price * unit as priceTotal from ServiceOrderEvent
The property type of the additional priceTotal
column is the property type of the existing price
property of OrderEvent.
Columns in a named window may also hold an event itself. More information on the insert into
clause providing event columns is in Section 5.10.5, “Event as a Property”.
The next sample creates a named window that specifies two columns: A column that holds an OrderEvent, and a column by name priceTotal
. A matching insert into
statement is also part of the sample:
create window OrdersWindow.win:time(30) as select this, price as priceTotal from OrderEvent
insert into OrdersWindow select order, price * unit as priceTotal from ServiceOrderEvent as order
Note that the this
property must exist on the event and must return the event class itself (JavaBean events only). The property type of the additional priceTotal
column is the property type of the existing price
property.
Your application can insert rows into a named window using on-demand (fire-and-forget, non-continuous) queries as described in Section 14.5, “On-Demand Fire-And-Forget Query Execution”.
The syntax for the insert into
clause is as follows:
insert into window_name [(property_names)] select value_expressions
The window_name is the name of the named window to insert events into.
After the named window name you can optionally provide a comma-separated list of event property names. When providing property names, the order of expressions in the select-clause must match the order of property names specified (select-clause property names are ignored). When not providing property names, the select-clause expressions must name the event properties to be inserted into by assigning a property name.
Next follows the select keyword and a list of value expressions that provide values for the event properties of the event to be inserted into the named window.
The example code snippet inserts a new order event into the OrdersWindow
named window:
String query = "insert into OrdersWindow(orderId, symbol, price) select '001', 'GE', 100"; epService.getEPRuntime().executeQuery(query);
If you do not specify event property names, the select-clause expressions must name the event property names.
The following EPL inserts the same values as above but specifies property names as part of the select-clause expressions:
insert into OrdersWindow select '001' as orderId, 'GE' as symbol, 100 as price
A named window can be referred to by any statement in the from
clause of the statement. Filter criteria can also be specified. Additional views may be declared onto named windows; however, such views cannot include data window views.
A statement selecting all events from a named window 'AllOrdersNamedWindow' is shown next. The named window must first be created via the create window
clause before use.
select * from AllOrdersNamedWindow
The statement as above simply receives the unfiltered insert stream of the named window and reports that stream to its listeners. The iterator
method returns all events in the named window, if any.
If your application desires to obtain the events removed from the named window, use the rstream
keyword as this statement shows:
select rstream * from AllOrdersNamedWindow
The next statement derives an average price per symbol for the events held by the named window:
select symbol, avg(price) from AllOrdersNamedWindow group by symbol
A statement that consumes from a named window, like the one above,
receives the insert and remove stream of the named window. The insert stream represents the events inserted into the named window. The remove stream represents the events expired from
the named window's data window and the events explicitly deleted via the on-delete clause or via on-demand (fire-and-forget) delete.
Your application may create a consuming statement such as above on an empty named window, or your application may create the above statement on an already filled named window. The engine provides correct results in either case: At the time of statement creation the Esper engine internally initializes the consuming statement from the current named window, also taking your declared filters into consideration. Thus, your statement deriving data from a named window does not start empty if the named window already holds one or more events. A consuming statement also sees the remove stream of an already populated named window, if any.
If you require a subset of the data in the named window, you can specify one or more filter expressions onto the named window as shown here:
select symbol, avg(price) from AllOrdersNamedWindow(sector='energy') group by symbol
By adding a filter to the named window, the aggregation and grouping as well as any views that may be declared onto the named window receive a filtered insert and remove stream. The above statement thus outputs, continuously, the average price per symbol for all orders in the named window that belong to a certain sector.
A side note on variables in filters that select events from named windows: The engine initializes consuming statements at statement creation time and changes aggregation state continuously as events arrive. If the filter criteria contain variables and the variable values change, the engine does not re-evaluate or re-build aggregation state. In such a case you may want to place variables in the having
clause which evaluates on already-built aggregation state.
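A sketch of this approach, assuming a numeric variable hypothetically named avg_threshold: the variable is referenced in the having-clause so that value changes apply against the already-built aggregation state:
epService.getEPAdministrator().createEPL("create variable double avg_threshold = 100");
epService.getEPAdministrator().createEPL(
    "select symbol, avg(price) from AllOrdersNamedWindow(sector='energy') " +
    "group by symbol having avg(price) > avg_threshold");
// changing the variable value takes effect against the existing aggregation state
epService.getEPRuntime().setVariableValue("avg_threshold", 200d);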
The following example further declares a view into the named window. Such a view can be a plug-in view or one of the built-in views, but cannot be a data window view (with the exception of the std:groupwin
grouped-window view which is allowed).
select * from AllOrdersNamedWindow(volume>0, price>0).mycompany:mypluginview()
Data window views cannot be used onto named windows since named windows post insert and remove streams for the events entering and leaving the named window, thus the expiry policy and batch behavior are well defined by the data window declared for the named window. For example, the following is not allowed and fails at time of statement creation:
// not a valid statement
select * from AllOrdersNamedWindow.win:time(30 sec)
The on select
clause performs a one-time, non-continuous query on a named window every time a triggering event arrives or a triggering pattern matches. The query can consider all events in the named window, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
The syntax for the on select
clause is as follows:
on event_type[(filter_criteria)] [as stream_name]
[insert into insert_into_def]
select select_list
from window_name [as stream_name]
[where criteria_expression]
[group by grouping_expression_list]
[having grouping_search_conditions]
[order by order_by_expression_list]
The event_type is the name of the type of events that trigger the query against the named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as
keyword can be used to assign a stream name. Patterns or named windows can also be specified in the on
clause, see the samples in Section 5.15.10.1, “Using Patterns in the On Delete Clause” (for named windows only insert stream events trigger actions).
The insert into clause works as described in Section 5.10, “Merging Streams and Continuous Insertion: the Insert Into Clause”. The select clause is described in Section 5.3, “Choosing Event Properties And Events: the Select Clause”. For all clauses the semantics are equivalent to a join operation: The properties of the triggering event or events are available in the select
clause and all other clauses.
The window_name in the from
clause is the name of the named window to select events from. The as
keyword is also available to assign a stream name to the named window. The as
keyword is helpful in conjunction with wildcard in the select
clause to select named window events via the syntax select streamname.*
.
The optional where
clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be considered from the named window. The criteria_expression may also simply filter for events in the named window to be considered by the query.
The group by
clause, the having
clause and the order by
clause are all optional and work as described in earlier chapters.
The similarities and differences between an on select
clause and a regular or outer join are as follows:
A join is evaluated when any of the streams participating in the join have new events (insert stream) or events leaving data windows (remove stream). A join is therefore bi-directional or multi-directional. However, the on select
statement has one triggering event or pattern that causes the query to be evaluated and is thus uni-directional.
The query within the on select
statement is not continuous: It executes only when a triggering event or pattern occurs. Aggregation and groups are computed anew considering the contents of the named window at the time the triggering event arrives.
The iterator
of the EPStatement
object representing the on select
clause returns the last batch of selected events in response to the last triggering event, or null if the last triggering event did not select any rows.
For correlated queries that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance querying of events. It analyzes the where
clause to build one or more indexes for fast lookup in the named window based on the properties of the triggering event.
The next statement demonstrates the concept. Upon arrival of a QueryEvent event the statement selects all events in the 'OrdersNamedWindow' named window:
on QueryEvent select win.* from OrdersNamedWindow as win
The engine executes the query on arrival of a triggering event, in this case a QueryEvent. It posts the query results to any listeners to the statement, in a single invocation, as the new data array. By prefixing the wildcard (*) selector with the stream name, the select
clause returns only events of the named window and does not also return triggering events.
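As an illustrative sketch, the statement above could be exercised from Java as follows (the QueryEvent class and the engine instance epService are assumptions):
EPStatement onSelect = epService.getEPAdministrator().createEPL(
    "on QueryEvent select win.* from OrdersNamedWindow as win");
// each arriving QueryEvent triggers one execution; results arrive in a single listener invocation
epService.getEPRuntime().sendEvent(new QueryEvent());
// the statement iterator returns the last batch of selected rows for the last triggering event
Iterator<EventBean> lastBatch = onSelect.iterator();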
The where
clause filters and correlates events in the named window with the triggering event, as shown next:
on QueryEvent(volume>0) as query select query.symbol, query.volume, win.symbol from OrdersNamedWindow as win where win.symbol = query.symbol
Upon arrival of a QueryEvent, if that event has a value for the volume property that is greater than zero, the engine executes the query. The query considers all events currently held by the 'OrdersNamedWindow' that match the symbol property value of the triggering QueryEvent event. The engine then posts query results to the statement's listeners.
Aggregation, grouping and ordering of results are possible as this example shows:
on QueryEvent as queryEvent select symbol, sum(volume) from OrdersNamedWindow as win group by symbol having volume > 0 order by symbol
The above statement outputs the total volume per symbol for those groups where the sum of the volume is greater than zero, ordered by symbol ascending. The engine computes and posts the output based on the current contents of the 'OrdersNamedWindow' named window considering all events in the named window, since the query does not have a where
clause.
When using wildcard (*) to select from streams in an on-select clause, each stream, that is the triggering stream and the selected-upon named window, is selected, similar to a join. Therefore your wildcard select returns two columns: the triggering event and the selection result event, for each row.
on QueryEvent as queryEvent select * from OrdersNamedWindow as win
The query above returns a queryEvent
column and a win
column for each event. If only a single stream's event is desired in the result, use select win.*
instead.
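For example, a listener on the wildcard statement above might read both columns from each result row (the listener handling is illustrative; epService is an assumed engine instance):
EPStatement stmt = epService.getEPAdministrator().createEPL(
    "on QueryEvent as queryEvent select * from OrdersNamedWindow as win");
stmt.addListener(new UpdateListener() {
  public void update(EventBean[] newData, EventBean[] oldData) {
    for (EventBean row : newData) {
      Object trigger = row.get("queryEvent"); // the triggering QueryEvent
      Object winEvent = row.get("win");       // the named-window event
    }
  }
});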
To trigger an on-select when an update to the selected named window occurs or when the triggering event is the same event that is being inserted into the named window, specify the named window name as the event type.
The next query fires the select for every change to the named window OrdersNamedWindow:
on OrdersNamedWindow as trig select onw.symbol, sum(onw.volume) from OrdersNamedWindow as onw where onw.symbol = trig.symbol
On-select
and the unidirectional join can be compared as follows.
On-select
, on-merge
, on-insert
, on-delete
, on-update
and on-select-and-delete
operate only on named windows. Unidirectional joins however can operate on any stream.
If the unidirectional join is between a single named window and a triggering event or pattern and that triggering event or pattern is marked unidirectional, the unidirectional join is equivalent to on-select
.
Execution aspects differ in terms of locking. An on-select
statement executes under a shareable named window context partition lock.
A unidirectional join does not execute under such named window context partition lock and instead is a consumer relationship to the named window (see above).
The on select delete
clause performs a one-time, non-continuous query on a named window every time a triggering event arrives or a triggering pattern matches, similar to on-select
as described in the previous section. In addition, any selected rows are also deleted.
The syntax for the on select delete
clause is as follows:
on trigger
select [and] delete select_list...
... (please see on-select for insert into, from, group by, having, order by) ...
The syntax follows the syntax of on-select
as described earlier. The select
clause follows the optional and
keyword and the delete
keyword.
The example statement below selects and deletes all rows from OrdersNamedWindow when a QueryEvent arrives:
on QueryEvent select and delete window(win.*) as rows from OrdersNamedWindow as win
The sample EPL above also shows the use of the window aggregation function: it instructs the engine to output a single event, regardless of the number of rows in the named window, containing a column rows that holds a collection of the selected events' underlying objects.
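A sketch of a listener reading that column, assuming stmt is the EPStatement created for the statement above (the property access is illustrative):
stmt.addListener(new UpdateListener() {
  public void update(EventBean[] newData, EventBean[] oldData) {
    // one output event per triggering QueryEvent; "rows" holds the underlying objects
    // of the events that were selected and deleted from OrdersNamedWindow
    Object rows = newData[0].get("rows");
  }
});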
Your EPL statement may specify the name of an existing named window when creating a new named window, and may use the insert
keyword to indicate that the new named window is to be populated from
the events currently held by the existing named window.
For example, and assuming the named window OrdersNamedWindow
already exists, this statement creates a new named window ScratchOrders
and populates all orders in OrdersNamedWindow
into the new named window:
create window ScratchOrders.win:keepall() as OrdersNamedWindow insert
The where
keyword is also available to perform filtering, for example:
create window ScratchBuyOrders.win:time(10) as OrdersNamedWindow insert where side = 'buy'
An on update
clause updates events held by a named window. The clause can be used to update all events, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
For updating a named window and simultaneously checking whether the updated row exists in the named window, or for an atomic update-insert operation on a named window, consider using on-merge
as described in Section 5.15.12, “Triggered Upsert using the On-Merge Clause”. On-merge
is similar to the SQL merge
clause and provides what is known as an "Upsert" operation: Update existing events or if no existing event(s) are found then insert a new event, all in one atomic operation provided by a single EPL statement.
The syntax for the on update
clause is as follows:
on event_type[(filter_criteria)] [as stream_name]
update window_name [as stream_name]
set mutation_expression [, mutation_expression [,...]]
[where criteria_expression]
The event_type is the name of the type of events that trigger an update of rows in a named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as
keyword can be used to assign a name for use in expressions and the where
clause. Patterns and named windows can also be specified in the on
clause.
The window_name is the name of the named window in which to update events. The as
keyword is also available to assign a name to the named window.
After the set
keyword follows a list of comma-separated mutation_expression expressions. A mutation expression is any valid EPL expression.
Subqueries may be part of expressions; however, aggregation functions and the prev
or prior
function may not be used in expressions.
The table below shows some typical mutation expressions:
Table 5.6. Mutation Expressions in Named Window Update And Merge
Description | Syntax and Examples
---|---
Assignment | property_name = value_expression (e.g. price = 10, side = 'BUY')
Event Method Invocation | alias_or_windowname.methodname(...) (e.g. orderWindow.clear())
User-Defined Function Call | functionname(...) (e.g. clearQuantities(orderRow))
The optional where
clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be updated in the named window. The criteria_expression may also simply filter for events in the named window to be updated.
The iterator
of the EPStatement
object representing the on update
clause can also be helpful: It returns the last batch of updated events in response to the last triggering event, in any order, or null if the last triggering event did not update any rows.
Statements that reference the named window receive the new event in the insert stream and the event prior to the update in the remove stream.
Let's look at a couple of examples. In the simplest form, this statement updates all events in the named window 'AllOrdersNamedWindow' when any 'UpdateOrderEvent' event arrives, setting the price property to zero for all events currently held by the named window:
on UpdateOrderEvent update AllOrdersNamedWindow set price = 0
This example adds a where
clause to the example above. Upon arrival of a triggering 'ZeroVolumeEvent', the statement updates prices on any orders that have a volume of zero or less:
on ZeroVolumeEvent update AllOrdersNamedWindow set price = 0 where volume <= 0
The next example shows a more complete use of the syntax, and correlates the triggering event with events held by the named window:
on NewOrderEvent(volume>0) as myNewOrders update AllOrdersNamedWindow as myNamedWindow set price = myNewOrders.price where myNamedWindow.symbol = myNewOrders.symbol
In the above sample statement, only if a 'NewOrderEvent' event with a volume greater than zero arrives does the statement trigger. Upon triggering, all events in the named window that have the same value for the symbol property as the triggering 'NewOrderEvent' event are then updated (their price property is set to that of the arriving event). The statement also showcases the as
keyword to assign a name for use in the where
expression.
For correlated queries (as above) that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance update of events.
Your application can subscribe a listener to your on update
statements to determine updated events. The statement posts any events that are updated to all listeners attached to the statement as new data, and the events prior to the update as old data. Upon iteration, the statement provides the last update event, if any.
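A sketch of such a listener on the first on-update example below (illustrative only; epService is an assumed engine instance): the new data carries the updated versions and the old data the versions prior to the update:
EPStatement onUpdate = epService.getEPAdministrator().createEPL(
    "on UpdateOrderEvent update AllOrdersNamedWindow set price = 0");
onUpdate.addListener(new UpdateListener() {
  public void update(EventBean[] newData, EventBean[] oldData) {
    // newData: the rows after the update; oldData: the same rows before the update
  }
});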
The following example shows the use of tags and a pattern. It sets the price value of orders to that of either a 'FlushOrderEvent' or 'OrderUpdateEvent' depending on which arrived:
on pattern [every ord=OrderUpdateEvent(volume>0) or every flush=FlushOrderEvent]
update AllOrdersNamedWindow as win
set price = case when ord.price is null then flush.price else ord.price end
where ord.id = win.id or flush.id = win.id
When updating indexed properties use the syntax propertyName[index] = value with the index value being an integer number.
When updating mapped properties use the syntax propertyName(key) = value with the key being a string value.
The engine executes assignments in the order they are listed. When performing multiple assignments, the engine takes the most recent named window event property value according to the last assignment, if any. To instruct the engine to use the initial property value before update, prefix the event property name with the literal initial
.
The following statement illustrates:
on UpdateEvent as upd
update MyWindow as win
set field_a = 1,
  field_b = win.field_a,      // assigns the value 1
  field_c = initial.field_a   // assigns the field_a original value before update
The next example assumes that your application provides a user-defined function copyFields
that receives 3 parameters:
The update event, the new row and the initial state before-update row.
on UpdateEvent as upd update MyWindow as win set copyFields(win, upd, initial)
The following example assumes that your event type provides a method by name populateFrom
that receives the update event as a parameter:
on UpdateEvent as upd update MyWindow as win set win.populateFrom(upd)
The following restrictions apply:
Each property to be updated via assignment must be writable.
For underlying event representations that are Java objects, an event object class must implement the java.io.Serializable interface as discussed in Section 5.21.1, “Immutability and Updates” and must provide setter methods for updated properties.
When using an XML underlying event type, event properties in the XML document representation are not available for update.
Nested properties are not supported for update. Revision event types and variant streams may also not be updated.
Your application can update named window rows using on-demand (fire-and-forget, non-continuous) queries as described in Section 14.5, “On-Demand Fire-And-Forget Query Execution”.
The syntax for the update
clause is as follows:
update window_name [as stream_name] set mutation_expression [, mutation_expression [,...]] [where criteria_expression]
The window_name is the name of the named window in which to update events. The as
keyword is also available to assign a name to the named window.
After the set
keyword follows a comma-separated list of mutation expressions. For fire-and-forget queries the following restriction applies: Subqueries, aggregation functions and the prev
or prior
function may not be used in expressions. Mutation expressions are detailed in Section 5.15.8, “Updating Named Windows: the On Update clause”.
The optional where
clause contains a criteria_expression that identifies events to be updated.
The example code snippet updates those rows of the named window that have a negative value for volume:
String query = "update AllOrdersNamedWindow set volume = 0 where volumne = 0"; epService.getEPRuntime().executeQuery(query);
To instruct the engine to use the initial property value before update, prefix the event property name with the literal initial
.
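For example, a fire-and-forget update referencing the pre-update value might look as follows (the property names are illustrative and follow the earlier examples):
String query = "update AllOrdersNamedWindow " +
    "set volume = 0, price = price * initial.volume " + // initial.volume is the pre-update volume, not the just-assigned 0
    "where symbol = 'IBM'";
epService.getEPRuntime().executeQuery(query);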
An on delete
clause removes events from a named window. The clause can be used to remove all events, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
The syntax for the on delete
clause is as follows:
on event_type[(filter_criteria)] [as stream_name]
delete from window_name [as stream_name]
[where criteria_expression]
The event_type is the name of the type of events that trigger removal from the named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as
keyword can be used to assign a name for use in the where
clause. Patterns and named windows can also be specified in the on
clause as described in the next section.
The window_name is the name of the named window to delete events from. The as
keyword is also available to assign a name to the named window.
The optional where
clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be removed from the named window. The criteria_expression may also simply filter for events in the named window to be removed from the named window.
The iterator
of the EPStatement
object representing the on delete
clause can also be helpful: It returns the last batch of deleted events in response to the last triggering event, in any order, or null if the last triggering event did not remove any rows.
Let's look at a couple of examples. In the simplest form, this statement deletes all events from the named window 'AllOrdersNamedWindow' when any 'FlushOrderEvent' arrives:
on FlushOrderEvent delete from AllOrdersNamedWindow
This example adds a where
clause to the example above. Upon arrival of a triggering 'ZeroVolumeEvent', the statement removes from the named window any orders that have a volume of zero or less:
on ZeroVolumeEvent delete from AllOrdersNamedWindow where volume <= 0
The next example shows a more complete use of the syntax, and correlates the triggering event with events held by the named window:
on NewOrderEvent(volume>0) as myNewOrders delete from AllOrdersNamedWindow as myNamedWindow where myNamedWindow.symbol = myNewOrders.symbol
In the above sample statement, only if a 'NewOrderEvent' event with a volume greater than zero arrives does the statement trigger. Upon triggering, all events in the named window that have the same value for the symbol property as the triggering 'NewOrderEvent' event are then removed from the named window. The statement also showcases the as
keyword to assign a name for use in the where
expression.
For correlated queries (as above) that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance removal of events especially from named windows that hold large numbers of events.
Your application can subscribe a listener to your on delete
statements to determine removed events. The statement posts any events that are deleted from a named window to all listeners attached to the statement as new data. Upon iteration, the statement provides the last deleted event, if any.
By means of patterns the on delete
clause and on select
clause (described below) can look for more complex conditions to occur, possibly involving multiple events or the passing of time. The syntax for on delete
with a pattern expression is show next:
on pattern [pattern_expression] [as stream_name]
delete from window_name [as stream_name]
[where criteria_expression]
The pattern_expression is any pattern that matches zero or more arriving events. Tags can be used to name events in the pattern and can occur in the optional where
clause to correlate to events to be removed from a named window.
In the next example the triggering pattern fires every 10 seconds. The effect is that every 10 seconds the statement removes from 'MyNamedWindow' all rows:
on pattern [every timer:interval(10 sec)] delete from MyNamedWindow
The following example shows the use of tags in a pattern:
on pattern [every ord=OrderEvent(volume>0) or every flush=FlushOrderEvent] delete from OrderWindow as win where ord.id = win.id or flush.id = win.id
The pattern above looks for OrderEvent events with a volume value greater than zero and tags such events as 'ord'.
The pattern also looks for FlushOrderEvent events and tags such events as 'flush'. The where
clause deletes from the 'OrderWindow' named window any events that match in the value of the 'id' property either of the arriving events.
Your application can delete rows from a named window using on-demand (fire-and-forget, non-continuous) queries as described in Section 14.5, “On-Demand Fire-And-Forget Query Execution”.
The syntax for the delete
clause is as follows:
delete from window_name [as stream_name] [where criteria_expression]
The window_name is the name of the named window to delete events from. The as
keyword is also available to assign a name to the named window.
The optional where
clause contains a criteria_expression that identifies events to be removed from the named window.
The example code snippet deletes from a named window all rows that have a negative value for volume:
String query = "delete from AllOrdersNamedWindow where volume <= 0"; epService.getEPRuntime().executeQuery(query);
The on merge
clause is similar to the SQL merge
clause. It provides what is known as an "Upsert" operation: Update existing events or if no existing event(s) are found then insert a new event, all in an atomic operation provided by a single EPL statement.
The syntax for the on merge
clause is as follows:
on event_type[(filter_criteria)] [as stream_name]
  merge [into] window_name [as stream_name]
  [where criteria_expression]
  when [not] matched [and search_condition]
    then [
      insert [into streamname]
        [ (property_name [, property_name] [,...]) ]
        select select_expression [, select_expression[,...]]
        [where filter_expression]
      |
      update set mutation_expression [, mutation_expression [,...]]
        [where filter_expression]
      |
      delete
        [where filter_expression]
    ]
    [then [insert|update|delete]] [,then ...]
  [when ... then ... [...]]
The event_type is the name of the type of events that trigger the merge. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as
keyword can be used to assign a name for use in the where
clause. Patterns and named windows can also be specified in the on
clause as described in prior sections.
The window_name is the name of the named window to insert, update or delete events. The as
keyword is also available to assign a name to the named window.
The optional where
clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be considered of the named window.
We recommend specifying a criteria expression that is as specific as possible.
Following the where
clause is one or more when matched
or when not matched
clauses in any order. Each may have an additional search condition associated.
After each when [not] matched
follow one or more then
clauses that each contain the action to take: Either an insert
, update
or delete
keyword.
After when not matched
only insert
action(s) are available. After when matched
any insert
, update
and delete
action(s) are available.
After insert
follows, optionally, the into
keyword followed by the stream name or named window to insert-into. If no into
and stream name is specified, the insert applies to the current named window. Next follows an optional list of columns to insert, then the required select keyword and one or more select-clause expressions. The wildcard (*) is available in the select-clause as well. Finally there is an optional where-clause that may return Boolean false to indicate that the action should not be applied.
After update
follows the set
keyword and one or more mutation expressions. For mutation expressions please see Section 5.15.8, “Updating Named Windows: the On Update clause”.
Next follows an optional where-clause that may return Boolean false to indicate that the action should not be applied.
After delete
follows an optional where-clause that may return Boolean false to indicate that the action should not be applied.
When, according to the where-clause criteria_expression, the engine finds no events in the named window that match the condition, it evaluates each when not matched clause. If the optional search condition returns true or no search condition was provided, the engine performs all of the actions listed after each then.
When, according to the where-clause criteria_expression, the engine finds one or more events in the named window that match the condition, it evaluates each when matched clause. If the optional search condition returns true or no search condition was provided, the engine performs all of the actions listed after each then.
The engine executes when matched
and when not matched
in the order specified. If the optional search condition returns true or no search condition was specified then the engine takes the associated action (or multiple actions for multiple then
keywords). When the block of actions has completed, the engine proceeds to the next matching event, if any. After completing all matching events, the engine continues to the next triggering event, if any.
In the first example we declare a schema that provides a product id and that holds a total price:
create schema ProductTotalRec as (productId string, totalPrice double)
We create a named window that holds a row for each unique product:
create window ProductWindow.std:unique(productId) as ProductTotalRec
The events for this example are order events that hold an order id, product id, price, quantity and deleted-flag declared by the next schema:
create schema OrderEvent as (orderId string, productId string, price double, quantity int, deletedFlag boolean)
The following EPL statement utilizes on-merge
to total up the price for each product based on arriving order events:
on OrderEvent oe
merge ProductWindow pw
where pw.productId = oe.productId
when matched
  then update set totalPrice = totalPrice + oe.price
when not matched
  then insert select productId, price as totalPrice
In the above example, when an order event arrives, the engine looks up in the product named window the matching row or rows for the same product id as the arriving event. In this example the engine always finds either no row or one row, as the product named window is declared with a unique data window based on product id.
If the engine finds a row in the named window, it performs the update action adding up the price as defined under when matched
. If the engine does not find a row in the named window it performs the insert action as defined under when not matched
, inserting a new row.
The insert
keyword may be followed by a list of columns as shown in this EPL snippet:
// equivalent to the insert shown in the last 2 lines in above EPL
...when not matched
  then insert(productId, totalPrice) select productId, price
The second example demonstrates the use of a select-clause with wildcard, a search condition and the delete
keyword. It creates a named window that holds order events and employs on-merge to insert order events for which no corresponding order id was found, update quantity to the quantity provided by the last arriving event and delete order events that are marked as deleted:
create window OrderWindow.win:keepall() as OrderEvent
on OrderEvent oe
merge OrderWindow pw
where pw.orderId = oe.orderId
when not matched
  then insert select *
when matched and oe.deletedFlag=true
  then delete
when matched
  then update set pw.quantity = oe.quantity, pw.price = oe.price
In the above example the oe.deletedFlag=true
search condition instructs the engine to take the delete action only if the deleted-flag is set.
You may specify multiple actions by providing multiple then
keywords each followed by an action. Each of the insert
, update
and delete
actions can itself have a where-clause as well.
If a where-clause exists for an action, the engine evaluates the where-clause and applies the action only if the where-clause returns Boolean true.
This example specifies two update actions and uses the where-clause to trigger different update behavior depending on whether the order event price is less than zero.
This example assumes that the host application defined a clearorder
user-defined function, to demonstrate calling a user-defined function as part of the update mutation expressions:
on OrderEvent oe
merge OrderWindow pw
where pw.orderId = oe.orderId
when matched
  then update set clearorder(pw) where oe.price < 0
  then update set pw.quantity = oe.quantity, pw.price = oe.price where oe.price >= 0
To insert events into another stream and not the named window, use insert
into
streamname.
In the next example each matched-clause contains two actions, one action to insert a log event and a second action to insert, delete or update:
on OrderEvent oe
merge OrderWindow pw
where pw.orderId = oe.orderId
when not matched
  then insert into LogEvent select 'this is an insert' as name
  then insert select *
when matched and oe.deletedFlag=true
  then insert into LogEvent select 'this is a delete' as name
  then delete
when matched
  then insert into LogEvent select 'this is a update' as name
  then update set pw.quantity = oe.quantity, pw.price = oe.price
While the engine evaluates and executes all actions listed under the same matched-clause in order, you may not rely on updated field values of an earlier action to trigger the where-clause of a later action. Similarly you should avoid simultaneous update and delete actions for the same match: the engine does not guarantee whether the update or the delete takes final effect.
For correlated queries (as above) that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance update and removal of events especially from named windows that hold large numbers of events.
Your application can subscribe a listener to on merge
statements to determine inserted, updated and removed events. Statements post any events that are inserted to, updated or deleted from a named window to all listeners attached to the statement as new data and removed data. Upon iteration, the statement provides the last inserted events, if any.
The following limitations apply to on-merge statements:
Aggregation functions and the prev
and prior
operators are not available in conditions and the select
-clause.
You may explicitly create an index on a named window. The engine considers explicitly-created as well as implicitly-allocated indexes in query planning and execution of the following types of usages of named windows:
On-demand (fire-and-forget, non-continuous) queries as described in Section 14.5, “On-Demand Fire-And-Forget Query Execution”.
On-select
, on-merge
, on-update
, on-delete
and on-insert
.
Subqueries against named windows.
For joins (including outer joins) with named windows, the engine considers the filter criteria listed in parentheses using the syntax named_window_name(filter_criteria) for index access.
Please use the following syntax to create an explicit index on a named window:
create [unique] index index_name on named_window_name (property [hash|btree] [, property [hash|btree]] [,...])
The optional unique keyword indicates that the property or properties uniquely identify rows. If unique is not specified the index allows duplicate rows.
The index_name is the name assigned to the index. The name uniquely identifies the index and is used in engine query plan logging.
The named_window_name is the name of an existing named window. If the named window has data already, the engine builds an index for the data in the named window.
The list of property names are the properties of events within the named window to include in the index. Following each property name you may specify the optional hash
or btree
keyword.
If you specify no keyword or the hash
keyword for a property, the index will be a hash-based (unsorted) index in respect to that property. If you specify the btree
keyword, the index will be a binary-tree-based sorted index in respect to that property.
You may combine hash
and btree
properties for the same index.
Specify btree
for a property if you expect to perform numerical or string comparison using relational operators (<, >, >=, <=), the between
or the in
keyword for ranges and inverted ranges. Use hash
(the default) instead of btree
if you expect to perform exact comparison using =
.
We list a few example EPL statements next that create a named window and create a single index:
// create a named window
create window UserProfileWindow.win:time(1 hour) select * from UserProfile

// create a non-unique index (duplicates allowed) for the user id property only
create index UserProfileIndex on UserProfileWindow(userId)
Next, execute an on-demand fire-and-forget query as shown below, herein we use the prepared version to demonstrate:
String query = "select * from UserProfileWindow where userId='Joe'"; EPOnDemandPreparedQuery prepared = epRuntime.prepareQuery(query); // query performance excellent in the face of large number of rows EPOnDemandQueryResult result = prepared.execute(); // ...later ... prepared.execute(); // execute a second time
A unique index is generally preferable over non-unique indexes. If your named window declares a unique data window (std:unique, std:firstunique, including intersections and grouped unique data windows) it is not necessary to create a unique index unless index sharing is enabled, since the engine considers the unique data window declaration in query planning.
The engine enforces uniqueness (e.g. unique constraint) for unique indexes. If your application inserts a duplicate row the engine raises a runtime exception when processing the statement and discards the row. The default error handler logs such an exception and continues.
For example, if the user id together with the profile id uniquely identifies an entry into the named window, your application can create a unique index as shown below:
// create a unique index on user id and profile id
create unique index UserProfileIndex on UserProfileWindow(userId, profileId)
By default, the engine builds a hash-code-based index useful for direct comparison via equals (=). Filter expressions that look for ranges or use the in or between keywords do not benefit from the hash-based index and should use the btree keyword. For direct comparison via equals (=) the engine does not use btree indexes.
The next example creates a composite index over two fields symbol
and buyPrice
:
// create a named window
create window TickEventWindow.win:time(1 hour) as (symbol string, buyPrice double)

// create a non-unique index
create index idx1 on TickEventWindow(symbol hash, buyPrice btree)
A sample fire-and-forget query is shown below (this time the API calls are not shown):
// query performance excellent in the face of large number of rows
select * from TickEventWindow where symbol='GE' and buyPrice between 10 and 20
As outlined in Section 2.10, “Updating, Merging and Versioning Events”, revision event types process updates or new versions of events held by a named window.
A revision event type is simply one or more existing pre-configured event types whose events are related, as configured by static configuration, by event properties that provide the same key values. The purpose of key values is to indicate that arriving events are related: An event amends, updates or adds properties to an earlier event that shares the same key values. No additional EPL is needed when using revision event types for merging event data.
Revision event types can be useful in these situations:
Some of your events carry only partial information that is related to a prior event and must be merged together.
Events arrive that add additional properties or change existing properties of prior events.
Events may carry properties that have null values or properties that do not exist (for example events backed by Map or XML), and for such properties the earlier value must be used instead.
To better illustrate, consider a revision event type that represents events for creation and updates to user profiles. Let's assume the user profile creation events carry the user id and a full profile. The profile update events indicate only the user id and the individual properties that actually changed. The user id property shall serve as a key value relating profile creation events and update events.
A revision event type must be configured to instruct the engine which event types participate and what their key properties are. Configuration is described in Section 15.4.26, “Revision Event Type” and is not shown here.
Assume that an event type UserProfileRevisions
has been configured to hold profile events, i.e. creation and update events related by user id. This statement creates a named window to hold the last 1 hour of current profiles per user id:
create window UserProfileWindow.win:time(1 hour) select * from UserProfileRevisions
insert into UserProfileWindow select * from UserProfileCreation
insert into UserProfileWindow select * from UserProfileUpdate
In revision event types, the term base event is used to describe events that are subject to update. Events that update, amend or add additional properties to base events are termed delta events. In the example, base events are profile creation events and delta events are profile update events.
Base events are expected to arrive before delta events. In the case where a delta event arrives and is not related by key value to a base event or a revision of the base event currently held by the named window the engine ignores the delta event. Thus, considering the example, profile update events for a user id that does not have an existing profile in the named window are not applied.
When a base or delta event arrives, the insert and remove stream output by the named window are the current and the prior version of the event. Let's come back to the example. As creation events arrive that are followed by update events or more creation events for the same user id, the engine posts the current version of the profile as insert stream (new data) and the prior version of the profile as remove stream (old data).
Base events are also implicitly delta events. That is, if multiple base events of the same key property values arrive, then each base event provides a new version. In the example, if multiple profile creation events arrive for the same user id then new versions of the current profile for that user id are output by the engine for each base event, as it does for delta events.
The expiry policy as specified by view definitions applies to each distinct key value, or multiple distinct key values for composite keys. An expiry policy re-evaluates when new versions arrive. In the example, user profile events expire from the time window when no creation or update event for a given user id has been received for 1 hour.
To retain only the current version per key value, consider using a unique data window (std:unique) or a unique data window in intersection with other data windows instead (i.e. std:unique(field).win:time(1 hour)).
Several strategies are available for merging or overlaying events as the configuration chapter describes in greater detail.
Any of the Map, XML and JavaBean event representations as well as plug-in event representations may participate in a revision event type. For example, profile creation events could be JavaBean events, while profile update events could be java.util.Map
events.
Delta events may also add properties to the revision event type. For example, one could add a new event type with security information to the revision event type and such security-related properties become available on the resulting revision event type.
The following restrictions apply to revision event types:
Nested properties are only supported for the JavaBean event representation. Nested properties are not individually versioned; they are instead versioned by the containing property.
Dynamic, indexed and mapped properties are only supported for nested properties and not as properties of the revision event type itself.
EPL allows declaring an event type via the create schema
clause and also by means of the static or runtime configuration API addEventType
functions. The terms schema and event type have the same meaning in EPL.
Your application can declare an event type by providing the property names and types or by providing a class name. Your application may also declare a variant stream schema.
When using the create schema
syntax to declare an event type, the engine automatically removes the event type when there are no started statements referencing the event type, including the statement that declared the event type. When using
the configuration API, the event type stays cached even if there are no statements that refer to the event type and until explicitly removed via the runtime configuration API.
The synopsis of the create schema
syntax providing property names and types is:
create [map | objectarray] schema schema_name [as] (property_name property_type [, property_name property_type [,...]]) [inherits inherited_event_type [, inherited_event_type] [,...]] [starttimestamp timestamp_property_name] [endtimestamp timestamp_property_name] [copyfrom copy_type_name [, copy_type_name] [,...]]
The create
keyword can be followed by map
to instruct the engine to represent events of that type by the Map event representation, or objectarray
to denote an Object-array event type. If neither the map nor the objectarray keyword is provided, the engine-wide default event representation applies.
After create schema
follows a schema_name. The schema name is the event type name.
The property_name is an identifier providing the event property name. The property_type is also required for each property. Valid property types are listed in Section 5.18.1, “Creating Variables: the Create Variable clause” and in addition include:
Any Java class name, fully-qualified or the simple class name if imports are configured.
Add left and right square brackets []
to any type to denote an array-type event property.
Use an event type name as a property type.
The optional inherits
keyword is followed by a comma-separated list of event type names that are the supertypes to the declared type.
The optional starttimestamp
keyword is followed by a property name. Use this to tell the engine that your event has a timestamp. The engine checks that the property name exists on the declared type and returns a date-time value.
Declare a timestamp property if you want your events to implicitly carry a timestamp value for convenient use with interval algebra methods as a start timestamp.
The optional endtimestamp
keyword is followed by a property name. Use this together with starttimestamp to tell the engine that your event has a duration. The engine checks that the property name exists on the declared type and returns a date-time value.
Declare an endtimestamp property if you want your events to implicitly carry a duration value for convenient use with interval algebra methods.
The optional copyfrom
keyword is followed by a comma-separated list of event type names. For each event type listed, the engine looks up that type and adds all event property definitions to the newly-defined type, in addition to those listed explicitly (if any).
A few example event type declarations follow:
// Declare type SecurityEvent
create schema SecurityEvent as (ipAddress string, userId String, numAttempts int)

// Declare type AuthorizationEvent with the roles property being an array of String
// and the hostinfo property being a POJO object
create schema AuthorizationEvent(group String, roles String[], hostinfo com.mycompany.HostNameInfo)

// Declare type CompositeEvent in which the innerEvents property is an array of SecurityEvent
create schema CompositeEvent(group String, innerEvents SecurityEvent[])

// Declare type WebPageVisitEvent that inherits all properties from PageHitEvent
create schema WebPageVisitEvent(userId String) inherits PageHitEvent

// Declare a type with start and end timestamp (i.e. event with duration).
create schema RoboticArmMovement (robotId string, startts long, endts long) starttimestamp startts endtimestamp endts

// Create a type that has all properties of SecurityEvent plus a userName property
create schema ExtendedSecurityEvent (userName string) copyfrom SecurityEvent

// Create a type that has all properties of SecurityEvent
create schema SimilarSecurityEvent () copyfrom SecurityEvent

// Create a type that has all properties of SecurityEvent and WebPageVisitEvent plus a userName property
create schema WebSecurityEvent (userName string) copyfrom SecurityEvent, WebPageVisitEvent
To elaborate on the inherits
keyword, consider the following two schema definitions:
create schema Foo as (string prop1)
create schema Bar() inherits Foo
Following the above schema, Foo is a supertype of Bar and therefore any Bar event also fulfills Foo and matches where Foo matches. An EPL statement such as select * from Foo
returns any Foo event as well as any event that is a subtype of Foo such as all Bar events. When your EPL queries don't use any Foo events there is no cost, thus inherits
is
generally an effective way to share properties between types. The start and end timestamp are also inherited from any supertype that has the timestamp property names defined.
The optional copyfrom
keyword is for defining a schema based on another schema. This keyword causes the engine to copy property definitions: There is no inherits, extends, supertype or subtype relationship between the types listed.
To define an event type Bar
that has the same properties as Foo
:
create schema Foo as (string prop1)
create schema Bar() copyfrom Foo
To define an event type Bar
that has the same properties as Foo
and that adds its own property prop2
:
create schema Foo as (string prop1)
create schema Bar(string prop2) copyfrom Foo
If neither the map nor the objectarray keyword is provided, and the create-schema statement provides the @EventRepresentation(array=true) annotation, the engine expects object-array events. If the statement provides the @EventRepresentation(array=false) annotation, the engine expects Map objects as events. If neither annotation is provided, the engine uses the configured default event representation as discussed in Section 15.4.11.1, “Default Event Representation”.
The following two EPL statements both instruct the engine to represent Foo events as object arrays. When sending Foo events into the engine use the sendEvent(Object[] data, String typeName)
footprint.
create objectarray schema Foo as (string prop1)
@EventRepresentation(array=true) create schema Foo as (string prop1)
The next two EPL statements both instruct the engine to represent Foo events as Maps. When sending Foo events into the engine use the sendEvent(Map data, String typeName)
footprint.
create map schema Foo as (string prop1)
@EventRepresentation(array=false) create schema Foo as (string prop1)
When using Java classes as the underlying event representation your application may simply provide the class name:
create schema schema_name [as] class_name [starttimestamp timestamp_property_name] [endtimestamp timestamp_property_name]
The class_name must be a fully-qualified class name (including the package name) if imports are not configured. If your application configures imports then the simple class name suffices without the package name.
The optional starttimestamp
and endtimestamp
keywords have a meaning as defined earlier.
The next example statements declare an event type based on a class:
// Shows the use of a fully-qualified class name to declare the LoginEvent event type
create schema LoginEvent as com.mycompany.LoginValue

// When the configuration includes imports, the declaration does not need a package name
create schema LogoutEvent as SignoffValue
A variant stream is a predefined stream into which events of multiple disparate event types can be inserted. Please see Section 5.10.3, “Merging Disparate Types of Events: Variant Streams” for rules regarding property visibility and additional information.
The synopsis is:
create variant schema schema_name [as] eventtype_name|* [, eventtype_name|*] [,...]
Provide the variant
keyword to declare a variant stream.
The '*
' wildcard character declares a variant stream that accepts any type of event inserted into the variant stream.
Provide eventtype_name if the variant stream should hold events of the given type only. When using insert into
to
insert into the variant stream the engine checks to ensure the inserted event type or its supertypes match the required event type.
A few examples are shown below:
// Create a variant stream that accepts only LoginEvent and LogoutEvent event types
create variant schema SecurityVariant as LoginEvent, LogoutEvent

// Create a variant stream that accepts any event type
create variant schema AnyEvent as *
EPL offers a convenient syntax for splitting, routing or duplicating events into multiple streams, and for receiving unmatched events among a set of filter criteria.
For splitting a single event that acts as a container and exposes child events as a property of itself, consider the contained-event syntax as described in Section 5.20, “Contained-Event Selection”.
You may define a triggering event or pattern in the on
-part of the statement followed by multiple insert into
, select
and where
clauses.
The synopsis is:
[context context_name] on event_type[(filter_criteria)] [as stream_name] insert into insert_into_def select select_list [where condition] [insert into insert_into_def select select_list [where condition]] [insert into...] [output first | all]
The event_type is the name of the type of events that trigger the split stream. It is optionally
followed by filter_criteria which are filter expressions to apply to arriving events. The optional as
keyword
can be used to assign a stream name. Patterns and named windows can also be specified in the on
clause.
Following the on
-clause is one or more insert into clauses as described in Section 5.10, “Merging Streams and Continuous Insertion: the Insert Into Clause” and select clauses as described in Section 5.3, “Choosing Event Properties And Events: the Select Clause”.
Each select
clause may be followed by a where
clause containing a condition. If the condition is true for the event, the engine transforms the event according to the select
clause and inserts it into the corresponding stream.
At the end of the statement can be an optional output
clause. By default the engine inserts into the first stream for which the where
clause condition matches if one was specified, starting from the top. If you specify the output all
keywords, then the engine inserts into each stream (not only the first stream) for which the where
clause condition matches or that do not have a where
clause.
If, for a given event, none of the where
clause conditions match, the statement listener receives the unmatched event. The statement listener only receives unmatched events and does not receive any transformed or inserted events. The iterator
method to the statement returns no events.
You may specify an optional context name to the effect that the split-stream operates according to the context dimensional information as declared for the context. See Chapter 4, Context and Context Partitions for more information.
In the below sample statement, the engine inserts each OrderEvent
into the LargeOrders
stream if the order quantity is 100 or larger, or into the SmallOrders
stream if the order quantity is smaller than 100:
on OrderEvent insert into LargeOrders select * where orderQty >= 100 insert into SmallOrders select *
The next example statement adds a new stream for medium-sized orders. The new stream receives orders that have an order quantity between 20 and 100:
on OrderEvent insert into LargeOrders select orderId, customer where orderQty >= 100 insert into MediumOrders select orderId, customer where orderQty between 20 and 100 insert into SmallOrders select orderId, customer where orderQty > 0
As you may have noticed in the above statement, orders that have an order quantity of zero don't match any of the conditions. The engine does not insert such order events into any stream and the listener to the statement receives these unmatched events.
By default the engine inserts into the first insert into
stream without a where
clause or for which the where
clause condition matches. To change the default behavior and insert into all matching streams instead (including those without a where
clause), the output all
keywords may be added to the statement.
The sample statement below shows the use of the output all
keywords. The statement populates both the LargeOrders
stream with large orders as well as the VIPCustomerOrders
stream with orders for certain customers based on customer id:
on OrderEvent insert into LargeOrders select * where orderQty >= 100 insert into VIPCustomerOrders select * where customerId in (1001, 1002) output all
Since the output all
keywords are present, the above statement inserts each order event into either both streams or only one stream or none of the streams, depending on order quantity and customer id of the order event. The statement delivers order events not inserted into any of the streams to the listeners and/or subscriber to the statement.
The following limitations apply to split-stream statements:
Aggregation functions and the prev
and prior
operators are not available in conditions and the select
-clause.
A variable is a scalar, object or event value that is available for use in all statements including patterns. Variables can be used in an expression anywhere in a statement as well as in the output
clause for output rate limiting.
Variables must first be declared or configured before use, by defining each variable's type and name. Variables can be created via the create variable
syntax or declared by runtime or static configuration. Variables can be assigned new values by using the on set
syntax or via the setVariableValue
methods on EPRuntime
. The EPRuntime
also provides methods to read variable values.
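For illustration, reading and writing a variable through the runtime API could look as follows (a minimal sketch; epRuntime denotes the EPRuntime instance and var_threshold is assumed to be an existing long-type variable):

// read the current value of the variable
Object current = epRuntime.getVariableValue("var_threshold");
// assign a new value to the variable
epRuntime.setVariableValue("var_threshold", 1000L);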
A variable can be declared constant. A constant variable always has the initial value and cannot be assigned a new value. A constant variable can be used like any other variable and can be used wherever a constant is required. By declaring a variable constant you enable the Esper engine to optimize and perform query planning knowing that the variable value cannot change.
When declaring a class-type or an event type variable you may read or set individual properties within the same variable.
The engine guarantees consistency and atomicity of variable reads and writes on the level of context partition (this is a soft guarantee, see below). Variables are optimized for fast read access and are also multithread-safe.
Variables can also be removed, at runtime, by destroying all referencing statements including the statement that created the variable, or by means of the runtime configuration API.
The create variable
syntax creates a new variable by defining the variable type and name. As an alternative to the syntax, variables can also be declared via the runtime and engine configuration options.
The synopsis for creating a variable is as follows:
create [constant] variable variable_type [[]] variable_name [ = assignment_expression ]
Specify the optional constant
keyword when the variable is a constant whose associated value cannot be altered. Your EPL design should prefer constant variables over non-constant variables.
The variable_type can be any of the following:
variable_type : string | char | character | bool | boolean | byte | short | int | integer | long | double | float | object | enum_class | class_name | event_type_name
All variable types accept null values. The object
type is for an untyped variable that can be assigned any value. You can provide a class name (use imports) or a fully-qualified class name to declare a variable of that Java class type including an enumeration class. You can also supply the name of an event type to declare a variable that holds an event of that type.
Append []
to the variable type to declare an array variable. A limitation is that if your variable type is an event type then array is not allowed.
The variable_name is an identifier that names the variable. The variable name should not already be in use by another variable.
The assignment_expression
is optional. Without an assignment expression the initial value for the variable is null
. If present, it supplies the initial value for the variable.
The EPStatement
object of the create variable
statement provides access to variable values. The pull API methods iterator
and safeIterator
return the current variable value. Listeners to the
create variable
statement subscribe to changes in variable value: the engine posts new and old value of the variable to all listeners when the variable value is updated by an on set
statement.
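For illustration, a minimal sketch of reading the current value through the statement's pull API follows (epAdministrator denotes the EPAdministrator instance; the variable matches the var_threshold example shown next):

EPStatement stmt = epAdministrator.createEPL("create variable long var_threshold");
Iterator<EventBean> it = stmt.iterator();
if (it.hasNext()) {
    // the returned event carries the variable value under the variable name
    Object value = it.next().get("var_threshold");
}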
The example below creates a variable that provides a threshold value. The name of the variable is var_threshold
and its type is long
. The variable's initial value is null
as no other value has been assigned:
create variable long var_threshold
This statement creates an integer-type variable named var_output_rate
and initializes it to the value ten (10):
create variable integer var_output_rate = 10
The next statement declares a constant string-type variable:
create constant variable string const_filter_symbol = 'GE'
In addition to creating a variable via the create variable
syntax, the runtime and engine configuration API also allows adding variables. The next code snippet illustrates the
use of the runtime configuration API to create a string-typed variable:
epService.getEPAdministrator().getConfiguration() .addVariable("myVar", String.class, "init value");
The following example declares a constant that is an array of string:
create constant variable string[] const_filters = {'GE', 'MSFT'}
The next example declares a constant that is an array of enumeration values. It assumes the Color
enumeration class was imported:
create constant variable Color[] const_colors = {Color.RED, Color.BLUE}
The engine removes the variable if the statement that created the variable is destroyed and all statements that reference the variable are also destroyed.
The getVariableNameUsedBy
and the removeVariable
methods, both part of the runtime ConfigurationOperations
API, provide use information and can remove a variable. If the variable was added via configuration, it can only be removed via the configuration API.
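A minimal sketch of their use (engine denotes the EPServiceProvider instance; the variable name is the one added above):

ConfigurationOperations ops = engine.getEPAdministrator().getConfiguration();
// determine which statements still use the variable
Set<String> usedBy = ops.getVariableNameUsedBy("myVar");
if (usedBy.isEmpty()) {
    // second parameter is the force flag
    ops.removeVariable("myVar", false);
}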
The on set
statement assigns a new value to one or more variables when a triggering event arrives or a triggering pattern occurs. Use the setVariableValue
methods on EPRuntime
to assign variable values programmatically.
The synopsis for setting variable values is:
on event_type[(filter_criteria)] [as stream_name] set variable_name = expression [, variable_name = expression [,...]]
The event_type is the name of the type of events that trigger the variable assignments. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as
keyword can be used to assign a stream name. Patterns and named windows can also be specified in the on
clause.
The comma-separated list of variable names and expressions set the value of one or more variables. Subqueries may be part of expressions; however, aggregation functions and the prev
or prior
function may not be used in expressions.
All new variable values are applied atomically: the changes to variable values by the on set
statement
become visible to other statements all at the same time. No changes are visible to other processing threads until the on set
statement has completed processing, at which time all changes become visible at once.
The EPStatement
object provides access to variable values. The pull API methods iterator
and safeIterator
return the current variable values for each of the variables set by the statement.
Listeners to the statement subscribe to changes in variable values: the engine posts new variable values of all variables to any listeners.
In the following example, a variable by name var_output_rate
has been declared previously. When a NewOutputRateEvent event arrives, the variable is updated to a new value supplied by the event property 'rate':
on NewOutputRateEvent set var_output_rate = rate
The next example shows two variables that are updated when a ThresholdUpdateEvent arrives:
on ThresholdUpdateEvent as t set var_threshold_lower = t.lower, var_threshold_higher = t.higher
The sample statement shown next counts the number of pattern matches using a variable. The pattern looks for OrderEvent events that are followed by CancelEvent events for the same order id within 10 seconds of the OrderEvent:
on pattern[every a=OrderEvent -> (CancelEvent(orderId=a.orderId) where timer:within(10 sec))] set var_counter = var_counter + 1
A variable name can be used in any expression and can also occur in an output rate limiting clause. This section presents examples and discusses performance, consistency and atomicity attributes of variables.
The next statement assumes that a variable named 'var_threshold' was created to hold a total price threshold value. The statement outputs an event when the total price for a symbol is greater than the current threshold value:
select symbol, sum(price) from TickEvent group by symbol having sum(price) > var_threshold
In this example we use a variable to dynamically change the output rate on-the-fly. The variable 'var_output_rate' holds the current rate at which the statement posts a current count to listeners:
select count(*) from TickEvent output every var_output_rate seconds
Variables are optimized towards high read frequency and lower write frequency. Variable reads do not incur locking overhead (99% of the time) while variable writes do incur locking overhead.
The engine softly guarantees consistency and atomicity of variables when your statement executes in response to an event or timer invocation. Variables acquire a stable value (implemented by versioning) when your statement starts executing in response to an event or timer invocation, and variables do not change value during execution. When one or more variable values are updated via on set
statements, the changes to all updated variables become visible to statements as one unit and only when the on set
statement completes successfully.
The atomicity and consistency guarantee is a soft guarantee. If any of your application statements, in response to an event or timer invocation, execute for a time interval longer than 15 seconds (the default interval length), then the engine may use current variable values after 15 seconds have passed, rather than the then-current variable values at the time the statement started executing in response to an event or timer invocation.
The length of the time interval that variable values are held stable for the duration of execution of a given statement is by default 15 seconds, but can be configured via engine default settings.
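For example, the interval could be raised as follows (a sketch, assuming the variables version-release setting of the engine defaults; config denotes the Configuration object used when obtaining the engine instance):

Configuration config = new Configuration();
// hold variable versions stable for up to 30 seconds per statement execution
config.getEngineDefaults().getVariables().setMsecVersionRelease(30000);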
A variable of type object
(or java.lang.Object
via the API) can be assigned any value including null. When using an object-type variable in an expression, your statement may need to cast the
value to the desired type.
The following sample EPL creates a variable by name varobj
of type object:
create variable object varobj
The create variable
syntax and the API accept a fully-qualified class name or alternatively the name of an event type. This is useful when you want a single variable to have multiple property values to read or set.
The next statement assumes that the event type PageHitEvent
is declared:
create variable PageHitEvent varPageHitZero
These example statements show two ways of assigning to the variable:
// You may assign the complete event on PageHitEvent(ip='0.0.0.0') pagehit set varPageHitZero = pagehit
// Or assign individual properties of the event on PageHitEvent(ip='0.0.0.0') pagehit set varPageHitZero.userId = pagehit.userId
When using class or event-type variables, in order for the engine to assign property values, the underlying event type must allow writing property values. If using JavaBean event classes the class must have setter methods and a default constructor. The underlying event type must also be copy-able i.e. implement Serializable
or configure a copy method.
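For illustration, a JavaBean event class that satisfies these requirements could look like the following sketch (a hypothetical PageHitEvent; the property names are assumptions matching the example below):

import java.io.Serializable;

public class PageHitEvent implements Serializable {
    private String ip;
    private String userId;

    public PageHitEvent() {}  // default constructor required

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }  // setters allow the engine to write properties

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
}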
Similarly statements may use properties of class or event-type variables as this example shows:
select * from FirewallEvent(userId=varPageHitZero.userId)
Instance methods can also be invoked:
create variable com.example.StateChecker stateChecker
select * from TestEvent as e where stateChecker.checkState(e)
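The StateChecker class is supplied by the application and is not part of the EPL; a hypothetical sketch of such a class could be:

package com.example;

import java.io.Serializable;

public class StateChecker implements Serializable {
    private volatile boolean active = true;  // application-controlled state

    public void setActive(boolean active) { this.active = active; }

    // invoked by the filter expression for each TestEvent; the stream alias passes the underlying event
    public boolean checkState(Object event) {
        return active;
    }
}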
Your application can declare an expression or script using the create expression
clause. Such expressions or scripts become available
globally to any EPL statement.
The synopsis of the create expression
syntax is:
create expression expression_or_script
Use the create expression
keywords and append the expression or scripts.
At the time your application creates the create expression
statement the expression or script becomes globally visible.
At the time your application destroys the create expression
statement the expression or script is no longer visible.
Existing statements that use the global expression or script are unaffected.
The syntax and additional examples for declaring an expression is outlined in Section 5.2.8, “Expression Declaration”, which discusses declaring expressions that are visible within the same EPL statement i.e. visible locally only.
When using the create expression
syntax to declare an expression the engine remembers the expression
and allows the expression to be referenced in all other EPL statements.
The below EPL declares a globally visible expression that computes a mid-price:
create expression midPrice { in => (buy + sell) / 2 }
The next EPL returns mid-price for each event:
select midPrice(md) from MarketDataEvent as md
The expression name must be unique for global expressions. It is not possible to declare the same global expression twice with the same name.
Your application can declare an expression of the same name local to a given EPL statement as well as globally using create expression
.
The locally-declared expression overrides the globally declared expression.
The engine validates globally declared expressions at the time your application creates a statement that references the global expression. When a statement references a global expression, the engine uses that statement's type information to validate the global expressions. Global expressions can therefore be dynamically typed and type information does not need to be the same for all statements that reference the global expression.
This example shows a sequence of EPL statements, created in the order shown, that demonstrates expression validation at the time of referral:
create expression minPrice {(select min(price) from OrderWindow)}
create window OrderWindow.win:time(30) as OrderEvent
insert into OrderWindow select * from OrderEvent
// Validates and incorporates the declared global expression
select minPrice() as minprice from MarketData
The syntax and additional examples for declaring scripts is outlined in Chapter 18, Script Support, which discusses declaring scripts that are visible within the same EPL statement i.e. visible locally only.
When using the create expression
syntax to declare a script the engine remembers the script
and allows the script to be referenced in all other EPL statements.
The below EPL declares a globally visible script in the JavaScript dialect that computes a mid-price:
create expression midPrice(buy, sell) [ (buy + sell) / 2 ]
The next EPL returns mid-price for each event:
select midPrice(buy, sell) from MarketDataEvent
The engine validates globally declared scripts at the time your application creates a statement that references the global script. When a statement references a global script, the engine uses that statement's type information to determine parameter types. Global scripts can therefore be dynamically typed and type information does not need to be the same for all statements that reference the global script.
The script name in combination with the number of parameters must be unique for global scripts. It is not possible to declare the same global script twice with the same name and number of parameters.
Your application can declare a script of the same name and number of parameters that is local to a given EPL statement as well as globally using create expression
.
The locally-declared script overrides the globally declared script.
Contained-event selection is for use when an event contains properties that are themselves events, or more generally when your application needs to split an event into multiple events. One example is when application events are coarse-grained structures and you need to perform bulk operations on the rows of the property graph in an event.
Use the contained-event selection syntax in a filter expression such as in a pattern, from
clause, subselect, on-select and on-delete. This section provides the synopsis and examples.
To review, in the from
clause a contained_selection may appear after the event stream name and filter criteria, and before any view specifications.
The synopsis for contained_selection is as follows:
[select select_expressions from] contained_expression [@type(eventtype_name)] [as alias_name] [where filter_expression]
The select
clause and select_expressions are optional and may be used to select specific properties of contained events.
The contained_expression is required and returns individual events. The expression can, for example, be an event property name that returns an event fragment, i.e. a property that can itself be represented as an event by the underlying event representation. Simple values such as integer or string are not fragments. The expression can also be any other expression such as a single-row function or a script that returns either an array or a java.util.Collection
of events.
Provide the @type(name)
annotation after the contained expression to name the event type of events returned by the expression. The annotation is optional and not needed when the contained-expression is an event property that returns a class or other event fragment.
The alias_name can be provided to assign a name to the expression result value rows.
The where
clause and filter_expression is optional and may be used to filter out properties.
As an example event, consider a media order. A media order consists of order items as well as product descriptions. A media order event can be represented as an object graph (POJO event representation), or a structure of nested Maps (Map event representation) or a XML document (XML DOM or Axiom event representation) or other custom plug-in event representation.
To illustrate, a sample media order event in XML event representation is shown below. Also, an XML event type can optionally be strongly-typed with an explicit XML XSD schema that we don't show here. Note that Map and POJO representation can be considered equivalent for the purpose of this example.
Let us now assume that we have declared the event type MediaOrder
as being represented by the root node <mediaorder>
of such XML snip:
<mediaorder>
  <orderId>PO200901</orderId>
  <items>
    <item>
      <itemId>100001</itemId>
      <productId>B001</productId>
      <amount>10</amount>
      <price>11.95</price>
    </item>
  </items>
  <books>
    <book>
      <bookId>B001</bookId>
      <author>Heinlein</author>
      <review>
        <reviewId>1</reviewId>
        <comment>best book ever</comment>
      </review>
    </book>
    <book>
      <bookId>B002</bookId>
      <author>Isaac Asimov</author>
    </book>
  </books>
</mediaorder>
The next query utilizes the contained-event selection syntax to return each book:
select * from MediaOrder[books.book]
The result of the above query is one event per book. Output events contain only the book properties and not any of the mediaorder-level properties.
Note that, when using listeners, the engine delivers multiple results in one invocation of each listener. Therefore listeners to the above statement can expect a single invocation passing all book events within one media order event as an array.
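For illustration, a listener for the statement above could look like this minimal sketch (stmt denotes the EPStatement for the query):

stmt.addListener(new UpdateListener() {
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // one invocation delivers all books of a single MediaOrder event
        for (EventBean book : newEvents) {
            System.out.println("bookId=" + book.get("bookId"));
        }
    }
});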
To better illustrate the position of the contained-event selection syntax in a statement, consider the next two queries:
select * from MediaOrder(orderId='PO200901')[books.book]
The above query returns each book only for media orders with a given order id. This query illustrates a contained-event selection and a view:
select count(*) from MediaOrder[books.book].std:unique(bookId)
The sample above counts each book unique by book id.
Contained-event selection can be staggered. When staggering multiple contained-event selections the staggered contained-event selection is relative to its parent.
This example demonstrates staggering contained-event selections by selecting each review of each book:
select * from MediaOrder[books.book][review]
Listeners to the query above receive a row for each review of each book. Output events contain only the review properties and not the book or media order properties.
The following is not valid:
// not valid
select * from MediaOrder[books.book.review]
The book
property is an indexed property (an array or collection) and thereby requires an index in order to determine which book to use. The expression books.book[1].review
is valid and means all reviews of the second (index 1) book.
The contained-event selection syntax is part of the filter expression and may therefore occur in patterns and anywhere a filter expression is valid.
A pattern example is below. The example assumes that a Cancel
event type has been defined that also has an orderId
property:
select * from pattern [c=Cancel -> books=MediaOrder(orderId = c.orderId)[books.book] ]
When used in a pattern, a filter with a contained-event selection returns an array of events, similar to the match-until clause in patterns. The above statement returns, in the books
property, an
array of book events.
The optional select
clause provides control over which fields are available in output events. The expressions in the select-clause apply only to the properties available underneath the property in the from
clause, and the properties of the enclosing event.
When no select
is specified, only the properties underneath the selected property are available in output events.
In summary, the select
clause may contain:
Any expressions, wherein properties are resolved relative to the property in the from
clause.
Use the wildcard (*
) to provide all properties that exist under the property in the from
clause.
Use the alias_name.*
syntax to provide all properties that exist under a property in the from
clause.
The next query's select
clause selects each review for each book, and the order id as well as the book id of each book:
select * from MediaOrder[select orderId, bookId from books.book][select * from review]

// ... equivalent to ...
select * from MediaOrder[select orderId, bookId from books.book][review]
Listeners to the statement above receive an event for each review of each book. Each output event has all properties of the review row, and in addition the bookId
of each book and the orderId
of the order.
Thus bookId
and orderId
are found in each result event, duplicated when there are multiple reviews per book and order.
The above query uses wildcard (*)
to select all properties from reviews. As has been discussed as part of the select
clause, the wildcard (*
) and property_alias.*
do not copy properties for performance reasons. The wildcard syntax instead specifies the underlying type, and additional properties are added onto that underlying type if required. Only one wildcard (*
) and property_alias.*
(unless used with a column rename) may therefore occur in the select
clause list of expressions.
All the following queries produce an output event for each review of each book. The next sample queries illustrate the options available to control the fields of output events.
The output events produced by the next query have all properties of each review and no other properties available:
select * from MediaOrder[books.book][review]
The following query is not a valid query, since the order id and book id are not part of the contained-event selection:
// Invalid select-clause: orderId and bookId not produced.
select orderId, bookId from MediaOrder[books.book][review]
This query is valid. Note that output events carry only the orderId
and bookId
properties and no other data:
select orderId, bookId from MediaOrder[books.book][select orderId, bookId from review]

//... equivalent to ...
select * from MediaOrder[select orderId, bookId from books.book][review]
This variation produces output events that have all properties of each book and only reviewId
and comment
for each review:
select * from MediaOrder[select * from books.book][select reviewId, comment from review]

// ... equivalent to ...
select * from MediaOrder[books.book as book][select book.*, reviewId, comment from review]
The output events of the next EPL have all properties of the order and only bookId
and reviewId
for each review:
select * from MediaOrder[books.book as book] [select mediaOrder.*, bookId, reviewId from review] as mediaOrder
This EPL produces output events with 3 columns: a column named mediaOrder
that is the order itself, a column named book
for each book and a column named review
that holds each review:
insert into ReviewStream select * from MediaOrder[books.book as book] [select mo.* as mediaOrder, book.* as book, review.* as review from review as review] as mo
// .. and a sample consumer of ReviewStream...
select mediaOrder.orderId, book.bookId, review.reviewId from ReviewStream
Please note these limitations:
Sub-selects, aggregation functions and the prev
and prior
operators are not available in contained-event selection.
Expressions in the select
and where
clause of a contained-event selection can only reference properties relative to the current event and property.
The optional where
clause may be used to filter out properties at the same level that the where-clause occurs.
The properties in the filter expression must be relative to the property in the from
clause or the enclosing event.
This query outputs all books with a given author:
select * from MediaOrder[books.book where author = 'Heinlein']
This query outputs each review of each book where a review comment contains the word 'good':
select * from MediaOrder[books.book][review where comment like '%good%']
This section discusses contained-event selection in joins.
When joining within the same event it is not required that views are specified. Recall, in a join or outer join there must be views specified that hold the data to be joined. For self-joins, no views are required and the join executes against the data returned by the same event.
This query inner-joins items to books where book id matches the product id:
select book.bookId, item.itemId from MediaOrder[books.book] as book, MediaOrder[items.item] as item where productId = bookId
Query results for the above query when sending the media order event as shown earlier are:
book.bookId | item.itemId
------------|------------
B001        | 100001
The next example query is a left outer join. It returns all books and their items, and for books without item it returns the book and a null
value:
select book.bookId, item.itemId from MediaOrder[books.book] as book left outer join MediaOrder[items.item] as item on productId = bookId
Query results for the above query when sending the media order event as shown earlier are:
book.bookId | item.itemId
------------|------------
B001        | 100001
B002        | null
A full outer join combines the results of both left and right outer joins. The joined table will contain all records from both tables, and fill in null
values for missing matches on either side.
This example query is a full outer join, returning all books as well as all items, and filling in null
values for book id or item id if no match is found:
select orderId, book.bookId,item.itemId from MediaOrder[books.book] as book full outer join MediaOrder[select orderId, * from items.item] as item on productId = bookId order by bookId, item.itemId asc
As in all other continuous queries, aggregation results are cumulative from the time the statement was created.
The following query counts the cumulative number of items in which the product id matches a book id:
select count(*) from MediaOrder[books.book] as book, MediaOrder[items.item] as item where productId = bookId
The unidirectional
keyword in a join indicates to the query engine that aggregation state is not cumulative. The next query counts the number of items in which the product id matches a book id for each event:
select count(*) from MediaOrder[books.book] as book unidirectional, MediaOrder[items.item] as item where productId = bookId
The next example splits an event representing a sentence into multiple events in which each event represents a word. It represents all events and the logic to split events into contained events as Java code. The next chapter has additional examples that use Map-type events and put contained-event logic into a separate expression or script.
The sentence event in this example is represented by a class declared as follows:
public class SentenceEvent {
    private final String sentence;

    public SentenceEvent(String sentence) {
        this.sentence = sentence;
    }

    public WordEvent[] getWords() {
        String[] split = sentence.split(" ");
        WordEvent[] words = new WordEvent[split.length];
        for (int i = 0; i < split.length; i++) {
            words[i] = new WordEvent(split[i]);
        }
        return words;
    }
}
The sentence event as above provides an event property words
that returns each word event.
The declaration of word event is also a class:
public class WordEvent {
    private final String word;

    public WordEvent(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }
}
The EPL statement to populate a stream of words from a sentence event is:
insert into WordStream select * from SentenceEvent[words]
Finally, the API call to send a sentence event to the engine is shown here:
epService.getEPRuntime().sendEvent(new SentenceEvent("Hello Word Contained Events"));
The examples herein are not based on the POJO events of the prior example. They are meant to demonstrate different types of contained-event expressions and the use of @type(
type_name)
to identify the event type of the return values of the contained-event expression.
The example first defines a few sample event types:
create schema SentenceEvent(sentence String)
create schema WordEvent(word String)
create schema CharacterEvent(char String)
The following EPL assumes that your application defined a plug-in single-row function by name splitSentence
that returns an array of Map, producing output events that are WordEvent
events:
insert into WordStream select * from SentenceEvent[splitSentence(sentence)@type(WordEvent)]
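The splitSentence single-row function itself is supplied by the application; a hypothetical implementation and registration could look as follows (the class and method names are assumptions; config denotes the Configuration instance used when obtaining the engine):

import java.util.Collections;
import java.util.Map;

public class SplitSentenceUtil {
    // returns one Map per word; each Map matches the declared WordEvent schema
    public static Map[] splitSentence(String sentence) {
        String[] split = sentence.split(" ");
        Map[] words = new Map[split.length];
        for (int i = 0; i < split.length; i++) {
            words[i] = Collections.singletonMap("word", split[i]);
        }
        return words;
    }
}

// register the function with the engine configuration
config.addPlugInSingleRowFunction("splitSentence", SplitSentenceUtil.class.getName(), "splitSentence");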
The example EPL shown next invokes a JavaScript function which returns some events of type WordEvent
:
expression Collection js:splitSentenceJS(sentence) [
  importPackage(java.util);
  var words = new ArrayList();
  words.add(Collections.singletonMap('word', 'wordOne'));
  words.add(Collections.singletonMap('word', 'wordTwo'));
  words;
]
select * from SentenceEvent[splitSentenceJS(sentence)@type(WordEvent)]
In the next example the sentence event first gets split into words and then each word event gets split into character events via an additional splitWord
single-row function,
producing events of type CharacterEvent
:
select * from SentenceEvent [splitSentence(sentence)@type(WordEvent)] [splitWord(word)@type(CharacterEvent)]
The update istream
statement allows declarative modification of event properties of events entering a stream. Update is a pre-processing step to each new event, modifying an event before the event applies to any statements.
The synopsis of update istream
is as follows:
update istream event_type [as stream_name] set property_name = set_expression [, property_name = set_expression] [,...] [where where_expression]
The event_type is the name of the type of events that the update
applies to.
The optional as
keyword can be used to assign a name to the event type for use with subqueries, for example.
Following the set
keyword is a comma-separated list of property names and expressions that provide the event properties to change and values to set.
The optional where
clause and expression can be used to filter out events to which to apply updates.
Listeners to an update
statement receive the updated event in the insert stream (new data) and the event prior to the update in the remove stream (old data). Note that if there are multiple update statements that all apply to the same event then the engine will ensure that the output events delivered to listeners or subscribers are consistent with the then-current updated properties of the event (if necessary making event copies, as described below, in the case that listeners are attached to update statements). Iterating over an update statement returns no events.
As an example, the below statement assumes an AlertEvent
event type that has properties named severity
and reason
:
update istream AlertEvent set severity = 'High' where severity = 'Medium' and reason like '%withdrawal limit%'
The statement above changes the value of the severity
property to "High" for AlertEvent
events that have a medium severity and contain a specific reason text.
Update statements apply the changes to event properties before other statements receive the event(s) for processing, e.g. "select * from AlertEvent
" receives the updated AlertEvent
. This is true regardless of the order in which your application creates statements.
When multiple update statements apply to the same event, the engine executes updates in the order in which update statements are created. We recommend the @Priority
EPL annotation to define a deterministic order of processing updates, especially in the case where update statements get created and destroyed dynamically or multiple update statements update the same fields. The update statement with the highest @Priority
value applies last.
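For example, two update statements could be ordered deterministically as follows (a sketch; note that honoring @Priority requires prioritized execution to be enabled in the engine configuration):

// applies first
@Priority(1) update istream AlertEvent set severity = 'Medium' where reason like '%withdrawal limit%'

// applies last, since it carries the highest @Priority value
@Priority(10) update istream AlertEvent set severity = 'High' where severity = 'Medium'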
The update
clause can be used on streams populated via insert into
, as this example utilizing a pattern demonstrates:
insert into DoubleWithdrawalStream select a.id, b.id, a.account as account, 0 as minimum from pattern [a=Withdrawal -> b=Withdrawal(id = a.id)]
update istream DoubleWithdrawalStream set minimum = 1000 where account in (10002, 10003)
When using update
with named windows, any changes to event properties apply before an event enters the named window.
Consider the next example (shown here with statement names in @Name EPL annotation, multiple EPL statements):
@Name("CreateWindow") create window MyWindow.win:time(30 sec) as AlertEvent @Name("UpdateStream") update istream MyWindow set severity = 'Low' where reason = '%out of paper%' @Name("InsertWindow") insert into MyWindow select * from AlertEvent @Name("SelectWindow") select * from MyWindow
The UpdateStream
statement specifies an update
clause that applies to all events entering the named window. Note that update
does not apply to events already in the named window at the time an application creates the UpdateStream
statement, it only applies to new events entering the named window (after an application created the update
statement).
Therefore, in the above example listeners to the SelectWindow
statement as well as the CreateWindow
statement receive the updated event,
while listeners to the InsertWindow
statement receive the original AlertEvent
event (and not the updated event).
Subqueries can also be used in all expressions including the optional where
clause.
This example demonstrates a correlated subquery in an assignment expression and also demonstrates the optional as
keyword. It assigns the phone
property of an AlertEvent
event a new value based on the lookup within all unique PhoneEvent
events (according to an empid
property) correlating the AlertEvent
property reporter
with the empid
property of PhoneEvent
:
update istream AlertEvent as ae set phone = (select phone from PhoneEvent.std:unique(empid) where empid = ae.reporter)
When updating indexed properties use the syntax propertyName[
index] =
value with the index value being an integer number.
When updating mapped properties use the syntax propertyName(
key) =
value with the key being a string value.
When using update
, please note these limitations:
Expressions may not use aggregation functions.
The prev
and prior
functions may not be used.
For underlying event representations that are Java objects, an event object class must implement the java.io.Serializable
interface as discussed below.
When using an XML underlying event type, event properties in the XML document representation are not available for update.
Nested properties are not supported for update. Revision event types and variant streams may also not be updated.
When updating event objects the engine maintains consistency across statements. The engine ensures that an update to an event does not impact the results of statements that look for or retain the original un-updated event. As a result the engine may need to copy an event object to maintain consistency.
In the case your application utilizes Java objects as the underlying event representation and an update
statement updates properties on an object, then in order to maintain consistency across statements
it is necessary for the engine to copy the object before changing properties (and thus not change the original object).
For Java application objects, the copy operation is implemented by serialization. Your event object must therefore implement the java.io.Serializable
interface to become eligible for update. As an alternative to serialization, you may instead configure a copy method as part of the event type configuration via ConfigurationEventTypeLegacy
.
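For example, configuring a copy method instead of relying on serialization could look like the following sketch (the event class and its copy-method name are assumptions):

ConfigurationEventTypeLegacy legacyDef = new ConfigurationEventTypeLegacy();
legacyDef.setCopyMethod("copyAlert");  // public method on the event class that returns a copy of the event
Configuration config = new Configuration();
config.addEventType("AlertEvent", com.mycompany.AlertEvent.class.getName(), legacyDef);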
The engine delivers all result events of a given statement to the statement's listeners and subscriber (if any) in a single invocation of each listener and subscriber's update
method passing an array of result events. For example, a statement using a time-batch view may provide many result events after a time period passes, a pattern may provide multiple matching events or in a join the join cardinality could be multiple rows.
For statements that typically post multiple result events to listeners the for
keyword controls the number of invocations of the engine to listeners and subscribers and the subset of all result events delivered by each invocation.
This can be useful when your application listener or subscriber code expects multiple invocations or expects that invocations only receive events that belong together by some additional criteria.
The for
keyword is a reserved keyword. It is followed by either the grouped_delivery
keyword for grouped delivery or the discrete_delivery
keyword for discrete delivery. The for
clause is valid after any EPL select statement.
The synopsis for grouped delivery is as follows:
... for grouped_delivery (group_expression [, group_expression] [,...])
The group_expression expression list provides one or more expressions to apply to result events. The engine invokes listeners and subscribers once for each distinct set of values returned by group_expression expressions passing only the events for that group.
The synopsis for discrete delivery is as follows:
... for discrete_delivery
With discrete delivery the engine invokes listeners and subscribers once for each result event passing a single result event in each invocation.
Consider the following example without for
-clause. The time batch data view collects RFIDEvent events for 10 seconds and posts an array of result events:
select * from RFIDEvent.win:time_batch(10 sec)
Let's consider an example event sequence as follows:
Table 5.7. Sample Sequence of Events for For Keyword

Event
-----
RFIDEvent(id:1, zone:'A')
RFIDEvent(id:2, zone:'B')
RFIDEvent(id:3, zone:'A')
Without for
-clause and after the 10-second time period passes, the engine delivers an array of 3 events in a single invocation to listeners and the subscriber.
The next example specifies the for
-clause and grouped delivery by zone:
select * from RFIDEvent.win:time_batch(10 sec) for grouped_delivery (zone)
With grouped delivery and after the 10-second time period passes, the above statement delivers result events in two invocations to listeners and the subscriber: The first invocation delivers an array of two events that contains zone A events with id 1 and 3. The second invocation delivers an array of 1 event that contains a zone B event with id 2.
The next example specifies the for
-clause and discrete delivery:
select * from RFIDEvent.win:time_batch(10 sec) for discrete_delivery
With discrete delivery and after the 10-second time period passes, the above statement delivers result events in three invocations to listeners and the subscriber: The first invocation delivers an array of 1 event that contains the event with id 1, the second invocation delivers an array of 1 event that contains the event with id 2 and the third invocation delivers an array of 1 event that contains the event with id 3.
Remove stream events are also delivered in multiple invocations, one for each group, if your statement selects remove stream events explicitly via irstream
or rstream
keywords.
The insert into
for inserting events into a stream is not affected by the for
-clause.
The delivery order respects the natural sort order or the explicit sort order as provided by the order by
clause, if present.
The following are known limitations:
The engine validates group_expression expressions against the output event type, therefore all properties specified in group_expression expressions must occur in the select
clause.