Solution Patterns

Key Concepts

  1. What is CEP?
  2. What is an event?
  3. What is an event stream or simply "stream"?
  4. What is an event cloud compared to a stream?

Introductory

  1. How do I look for specific events on a stream, dropping the unwanted events?
  2. How do I aggregate several (simple) events from a stream into a single new (complex) event summarizing event properties of the simple events?
  3. How do I limit the unbounded events of a stream to a defined timespan or number of events?
  4. How do I correlate events from one or several streams on a common set of values?

Troubleshooting

  1. My query presents incorrect or incomplete results? What can I do?
  2. My query does not compile?
  3. How do I design for reduced memory use? I have an out-of-memory error and don't know its origin?
  4. I'm getting a stack overflow error?

Design Tips

  1. Simple Statements
  2. EPL Editor
  3. Few General Design Tips
  4. Understand Subqueries and Joins
  5. Performance
  6. Offline Analysis - Analyzing Historical Data

EPL Questions

  1. How do I detect N events within X seconds?
  2. How do I compute a percentage or ratio?
  3. How do I measure the rate of arrival of events in a given time period? Per category?
  4. I want to get average, count, min and max for a 60-second time window?
  5. How to continually maintain customer ids that represent the top X customers with total purchases across a sliding time window?
  6. I want to compute and compare the maximum of all events since start and the maximum for the last 10 seconds?
  7. How can I aggregate large sliding windows and/or aggregate without retaining incoming events?
  8. How to calculate aggregations per partition and control output externally?
  9. How do I correlate events arriving in 2 or more streams?
  10. How do I correlate events arriving out-of-order?
  11. How do I use patterns to correlate events arriving in-order or out-of-order?
  12. How to implement an On-Off window? How to detect events between other events?
  13. How do I look for pairs of immediately-followed events?
  14. How do I correlate 3 events in a time window in which events have similar properties?
  15. How do I remove all events from a window and start over?
  16. How do I combine data windows and their expiry polices? Or define custom logic for removing events?
  17. How do I seed an empty data window from a filled data window?
  18. How do I keep a separate window of events per category and compute aggregates for each category's window?
  19. How do I use results of one statement in another statement?
  20. How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?
  21. How do I delay data? How do I compare against previous events?
  22. How do I detect the absence of an event?
  23. How do I detect the absence of an event and the presence of an event arriving too late?
  24. How do I report at a regular interval without any incoming events?
  25. How do I find missing events arriving in 2 or more streams that are correlated?
  26. How do I look into the past and consider missing events? How do I do a snapshot, fire-and-forget or on-demand query?
  27. How do I detect the absence of multiple unrelated events that are known in advance? Is there a way to simplify hundreds or thousands of similar statements into one global statement? How do I "prime" or initialize a large number of similar patterns without creating a pattern statement for each pattern instance?
  28. How to filter events based on parent-child relationship? How to detect events that are neither preceded nor followed by a related event?
  29. How do I detect when a sensor value holds a threshold for X seconds?
  30. How do I detect something really complex, like a triple-bottom pattern?
  31. How do I stop an insert after a period of time?
  32. Can I use a regular expression (regexp) within a filter?
  33. How can I remove duplicates?
  34. What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?
  35. How do I remove or drop events from a stream? I have a dynamic set of negative filters?
  36. How do I detect a specific sequence of events and get all events that take part in this sequence?
  37. How to implement a sell trailing stop order?
  38. I have one listener for multiple statements, how do I track which statement generated a result?
  39. Is there a way to receive the list of all events that participated in a window? I'm looking for a way to show the user the cause of the alert.
  40. We have our own math library, what are the options of utilizing it? How can I make calls out of the EPL into our own existing code?
  41. Can I use SOAP, WS-*, RESTful, JMS, RPC or other remote calls?
  42. What to do if my events are not JavaBeans, not well-defined, their properties are not known in advance and may differ wildly, and are nested?
  43. How do I query nested indexed events? Or more generally, event objects contained in event objects?
  44. How to calculate a weighted average over multiple time ranges?
  45. How to compute for 5-minute buckets? How to aggregate (e.g. vwap) n buckets of k minutes each?
  46. How can I execute a query only every X seconds?
  47. How do I create a recursive query? That is a query that feeds to itself and defines its own input stream as output?
  48. We have an event stream for cars entering and leaving streets and want a car count per street? How do I insert or update and keep a count?
  49. I would like to trigger whenever the event contains a new and distinct security id and the source is one of the {A,B,C}?
  50. Start a window for each unique id when it arrives, wait 10 seconds for that id and output the last event for that window, starting a new window when the next event arrives for that id?
  51. How to perform an aggregation over a "semantic" window, i.e. a window that opens upon the arrival of a given event and closes upon the arrival of another event?
  52. I have two streams A and B that I want to join based on an "id" value that occurs exactly once per event type, and they can come in any order.
  53. I have a port scan detector and would like to create a new event based on multiple events occurring over a given period of time?
  54. How to get notified each time a certain value has increased by a specified amount i.e. each time the value is greater than the value + x?

Common API Questions

  1. I want to know what streams an EPL statement references but don't want to parse the EPL string? I want to programmatically inspect and change the select clause columns when a user enters an EPL query, how to do this?
  2. How do I store statements in a file? Is there a standard file storage?
  3. How can I parse and validate an EPL statement?
  4. Can I create a statement in "stopped" state, attach listeners or subscribers and start it?
  5. Can I use an UpdateListener to listen to events and the iterator-based pull-API together?
  6. I want to know if an event hits at least one registered query?
  7. How to get a single callback with the list of matching subscriptions?
  8. When to use a plug-in aggregation function and when to use a plug-in custom view?
  9. When to use on-demand fire-and-forget queries versus on-select predefined queries?
  10. How do I integrate with the Spring framework? How to use Spring support for Groovy or other scripting languages with EPL?
  11. How to change statements at runtime?
  12. Can Esper support a distributed cache or data grid?

Key Concepts

What is CEP?

Complex Event Processing, or CEP, is primarily an event processing concept that deals with the task of processing multiple events with the goal of identifying the meaningful events within the event cloud.

CEP employs techniques such as detection of complex patterns of many events, event correlation and abstraction, event hierarchies, and relationships between events such as causality, membership, and timing, and event-driven processes.

(source: wikipedia.org).

[top]


What is an event?

An event is an immutable record of a past occurrence of an action or state change. Event properties capture the useful information for an event.

Or.... "Something that happens" (source: webster.com).

Typically, the following is true for an event:

It's anchored in time. It's not under your control.

An event can itself contain further events. Event properties may contain rich and nested domain-specific information.

[top]


What is an event stream or simply "stream"?

A time-ordered sequence of events.

A stream is append-only, one cannot remove events (conceptually), one can just add them to the sequence.

A stream is unbounded, i.e. there is no end to the sequence {event1, event2, event3, event4, ..., eventN}.

Streams are the basic building blocks of event stream processing. Similar to functions in a functional programming language, a CEP engine treats computation as the evaluation of streaming data.

Streams in Esper have zero cost in terms of memory or processing. You could register millions of streams.

[top]


What is an event cloud compared to a stream?

A stream is a time-ordered sequence of events, while a cloud is unordered.

For example, a valid stream is {{1s, event1}, {2s, event2}, {4s, event3}}.

A cloud is unordered e.g. {{1s, event1}, {4s, event2}, {2s, event3}}.

[top]

Introductory

How do I look for specific events on a stream, dropping the unwanted events?

Consider a stream of temperature sensor events that provide a temperature value. This query looks for all sensor events (named SensorEvent) where the temperature is greater than 90:

select * from SensorEvent where temperature > 90

[top]


How do I aggregate several (simple) events from a stream into a single new (complex) event summarizing event properties of the simple events?

This sample outputs the average temperature of all sensor events received from the start of the query:

select avg(temperature) from SensorEvent

[top]


How do I limit the unbounded events of a stream to a defined timespan or number of events?

There are many different flavors of data windows, which serve to limit the view onto the unbounded stream of data.

The next sample outputs the average temperature of all sensor events received within the last 1 minute:

select avg(temperature) from SensorEvent.win:time(1 min)

[top]


How do I correlate events from one or several streams on a common set of values?

Consider the stream of sensor events and a stream of sensor location events, assuming our sensors can move around geographically.

This sample query joins the last sensor event per sensor id with the last sensor location (SensorLocationEvent) for each sensor identified by a sensor id (sensorId):

select temperature, coordinates
from SensorEvent.std:unique(sensorId) as sensor,
     SensorLocationEvent.std:unique(sensorId) as loc
where sensor.sensorId = loc.sensorId

You may ask why the "std:unique(sensorId)" is needed here. When joining two streams we are, in this example and as is often the case, looking to join the last event per key of each stream.

[top]

Troubleshooting

My query presents incorrect or incomplete results? What can I do?

Use @Audit to turn on statement-level processing debug output which is logged under informational level to your output, please see the documentation for more details. For example: "@Audit select * from MyEvent". @Audit also accepts a number of keywords to control which aspects of processing are output and which are not.

Make sure each event object is its own object. As Esper cannot detect when properties of an event object are changed by your application, it is best not to change event properties of an existing event object, and it is best not to reuse an existing event object when sending an event into the engine.

The UpdateListener interface presents an array of output events (and not just one event). Make sure to inspect all events in the array output.

Note that the engine does not process statements in the order they are created. Instead the engine processes events as they come in, and therefore the statements they apply to, in any order. If you do want to dictate to the engine a particular order of execution, use @Priority.
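
For example, @Priority takes an integer and, among statements triggered by the same event, higher values execute first (statement and event names below are illustrative; note that prioritized execution may need to be enabled in the engine configuration):

@Priority(10) @Name('RunsFirst') select * from OrderEvent
@Priority(1) @Name('RunsSecond') select * from OrderEvent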

Consider adding expressions to the select-clause so that you can inspect what the expression result is.

If you have listeners that send events into the engine upon receipt of events from the engine, those listeners must use the "route" method to send events.

Turn on logging as described in the documentation to see warning messages that the engine may log, or exceptions raised by the engine or your code, or to trace event processing. Be sure to turn off logging when deploying to production.

Consider adding a user-defined function to your EPL statement to output application information or to set a breakpoint.

If you want to see remove stream events posted to listeners, the "irstream" and "rstream" keywords in the select clause select the remove stream.

For aggregation queries, note that the presence or absence of an event property in the select clause and with or without group-by clause can change what is output by the engine. See "3.7.2. Output for Aggregation and Group-By" in the doc.

The Reporting Issues page on this site provides further information for resolving issues. When submitting code to the forums please submit the smallest test class possible, remove any randomness and send only the smallest number of events needed to reproduce the problem.

[top]


My query does not compile?

For syntax errors, the query compiler provides line and column numbers as part of the error message.

You could try a simpler query first and then add to the query.

[top]


How do I design for reduced memory use? I have an out-of-memory error and don't know its origin?

The documentation in the performance section includes information on designing statements for reduced memory use and things to look for to prevent memory "leaks".

[top]


I'm getting a stack overflow error?

This can happen if your listener or subscriber code sends in an event via "sendEvent" method and that event triggers a listener again which may send in another event (applies to default threading model). Use the "route" method to send events into the engine from a listener.
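
A minimal sketch of such a listener, assuming a map-based event type named "DerivedAlert" has been configured and the engine runtime is passed in (all names here are made up):

import java.util.Collections;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class ReroutingListener implements UpdateListener {
    private final EPRuntime runtime;

    public ReroutingListener(EPRuntime runtime) {
        this.runtime = runtime;
    }

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // route() enqueues the event for processing after the current event
        // completes, avoiding the recursion that sendEvent() would cause here
        runtime.route(Collections.singletonMap("origin", "listener"), "DerivedAlert");
    }
}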

[top]

Design Tips

Simple Statements

We recommend keeping your EPL statements simple and easy-to-read: camelCase or CamelCase for property names works best, and event type names usually are CamelCase as well.

Consider splitting a single, complex statement into multiple EPL statements, where the first statement produces data via insert-into into a stream that a second statement consumes.

Consider using expression declarations, which allow factoring out common expressions into a named "expression abc { ... }" declaration.
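
A short sketch of both ideas, reusing the SensorEvent stream from the introductory examples (the HighTemperature stream name and the threshold are made up):

// split into two statements via insert-into
insert into HighTemperature select * from SensorEvent(temperature > 90)
select count(*) as cnt from HighTemperature.win:time(5 min)

// factor out a common expression with an expression declaration
expression isHot { s => s.temperature > 90 }
select * from SensorEvent as s where isHot(s)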

[top]


EPL Editor

While Esper does not ship with an EPL editor, there are a few choices available.

Esper Enterprise Edition from EsperTech offers an editor.

For the NetBeans IDE the following plugins are available: EPL ErrorChecking, EPL FileSupport, EPL SyntaxHighlighter and EPL dependency visualiser. They were created for the company Mycroft Mind by students of Masaryk University and are free to use.

For Eclipse, the Eclipse SQL Editor is also quite usable for EPL statements. To use it, do the following:

  1. Open Eclipse.
  2. Preferences / Install Software.
  3. Work with: http://download.eclipse.org/datatools/updates
  4. Select Eclipse Data Tools Platform (latest version).
  5. Window / Preferences / General / Editors / File Associations -> assign SQL Editor as default to *.epl

Syntax highlighting works quite well, but of course there is no content assist.

[top]


Few General Design Tips

For many use cases and requirements there are typically multiple solutions in EPL. This discussion provides useful hints for newcomers. It is not comprehensive. You will find more information on each idea discussed herein in the documentation. We recommend looking at the examples and coming up with your own EPL statements and testing them out.

For example, consider sensor stream data produced by devices such as temperature sensors, pressure sensors or valve movement sensors for customers and regions.

One aspect of statement design will need to consider the specificity of EPL statements. What should the EPL statement be specific to, i.e. which sub-stream of all the sensor data should an EPL statement focus on? In relation to the sensor stream example, you may decide to create an EPL statement per device specifying the device id, an EPL statement for a group of devices by listing multiple device ids, an EPL statement for a device type (e.g. valve sensors), an EPL statement for a given customer id that owns the sensor, or an EPL statement that analyzes all devices, or combinations of these. You typically solve specificity by using filters such as "TemperatureSensor(customerId=10)" or "TemperatureSensor(deviceId='A0001')" as part of statements. In that respect consider using constant variables (declared as const) so that certain constants are defined only once and need not occur multiple times.
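
For example, a constant variable used in a filter might look like this (names and values are illustrative):

create constant variable int PREMIUM_CUSTOMER_ID = 10
select * from TemperatureSensor(customerId = PREMIUM_CUSTOMER_ID)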

You may search for the right EPL construct to use. Should I use an EPL subquery, join or outer join? Or an EPL pattern? Or combine a join and a pattern? While there is no general answer, since you are likely processing high-arrival-rate streams, it's best to understand, at least at a high level, what the engine does for each such construct. Joins use relational algebra and query planning, so they are well suited to one-to-many relationships and fast index-based lookup. Outer joins can handle the absence of events. Joins also handle out-of-order arrival well, and time-based relationship testing can be accomplished with Allen's interval algebra in the where-clause. EPL patterns are dynamically-managed state trees. They are very suitable for detecting time-based relationships with presence or absence of events and other complex situations based on event arrival order (reorder if needed using the time-order window). EPL match-recognize pattern matching is based on NFA and is most suitable for finding patterns among ordered events.

You may consider whether to use a data window, what the right data window to use is and whether to use named windows or not. The most frequently used data windows are unique (std:unique), last event (std:lastevent) and time window (win:time). Use named windows when you need custom merge, update or expiry logic or when the same data window is used in multiple EPL statements. You can also use no data window, so the engine does not retain any events. Context partitions are often useful here to control when state should be discarded.

Use a data window if you want to retain events. Don't specify a data window unless you need to retain events.

You can aggregate with and without data windows. Use aggregation with data windows if you can and want to afford to retain events. Use aggregation with context partitions if you do not want to retain events.

We recommend using filters where possible for filtering out events, such as for example "StockTick(symbol='IBM')". We recommend using the where-clause only for correlation criteria.

[top]


Understand Subqueries and Joins

Joins are multidirectional: When new data arrives on any of the joined streams the join (Cartesian) product of the data windows is output (joined or limited by the where-clause if present) and aggregated (if using aggregations).

You can make joins unidirectional: Thus the join only outputs data when an event arrives for the single stream marked as unidirectional and no output occurs for other streams. Unidirectional is also useful when you want to trigger a join from a time-based pattern, for example "select sum(xyz) from pattern[every timer:interval(10)] unidirectional, MyEvent.win:time(100), ... where ...".

Consider replacing the join with one or more sub-queries. Subqueries are often easier to read and conceptually clearer. Subqueries don't have multidirectional evaluation. The query planning and indexing for fast lookup can be very different between subqueries and joins. Please see the performance tips section in the docs for a comparison of subqueries and joins.
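
As an illustration, the sensor/location join from the introductory section could be written with a correlated subquery instead (a sketch reusing the SensorEvent and SensorLocationEvent types from above):

select temperature,
       (select coordinates from SensorLocationEvent.std:unique(sensorId) as loc
        where loc.sensorId = sensor.sensorId) as coordinates
from SensorEvent as sensor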

When using joins with aggregations, consider splitting the statement into multiple simpler statements: aggregate and output to a stream in one statement and use the output stream(s) in a second statement that joins or sub-queries without also aggregating. Using separate streams gives you the flexibility to report or debug each individually, apply the output rate limiting you prefer and reuse the streams for other purposes.
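
A hedged sketch of such a split, with made-up OrderEvent and CustomerEvent streams: the first statement aggregates and publishes totals, the second correlates the totals without aggregating.

insert into OrderTotals
select customerId, sum(amount) as total
from OrderEvent.win:time(30 min)
group by customerId

select t.customerId, t.total, c.name
from OrderTotals.std:unique(customerId) as t,
     CustomerEvent.std:unique(customerId) as c
where t.customerId = c.customerId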

[top]


Performance

To obtain the best possible startup and runtime performance for your design, we have compiled valuable performance tips in a section on performance in the documentation. Please also see the "On Performance" page under "Esper for Java" on the left side page menu.

[top]


Offline Analysis - Analyzing Historical Data

We have seen quite a few use cases around analyzing data offline. One example is to detect fraud in health care by replaying recorded past claims and prescriptions. Or detect fraud by replaying bank transactions at the end of a business day since they were not available real-time. Another is fleet tracking to determine when trucks are not on schedule considering up to 2 weeks of past data.

Esper EPL can detect complex events regardless of whether the data is offline and historical, or whether the data is online and currently arriving (or a mix thereof). Trying to do this analysis using a relational database and SQL can be difficult when relationships become complex and may include the time dimension. Since the engine is fast it is often feasible to replay large numbers of events.

Start by disabling the internal timer in the configuration passed when obtaining an engine instance:

Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
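
Then obtain the engine and its runtime with that configuration (a sketch using the default provider; your application may use a named provider URI instead):

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
EPRuntime runtime = epService.getEPRuntime();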

Set time to zero or to the oldest event's timestamp:

runtime.sendEvent(new CurrentTimeEvent(0));

Next create a bunch of EPL statements.

From your event source, read events and send events into the engine advancing time (here entry is your event object):

CurrentTimeEvent timeEvent = new CurrentTimeEvent(entry.timestamp);
runtime.sendEvent(timeEvent);
runtime.sendEvent(entry);

It is not necessary to advance time for every event that your application sends in. Instead, as an optimization, you could decide on a time resolution and advance time according to that resolution (e.g. every 100 milliseconds).

Replay events from multiple threads to gain the best performance, given your EPL statements are well laid out using context partitions for example. It is not necessary to synchronize application events with timer control events (CurrentTimeEvent or time span event instances, see doc). If you are concerned with a defined order of processing application and timer control events, consider using a CountDownLatch to trigger timer processing. We would recommend reordering events that are not in time order before sending events into the engine.

Also, when your application is done replaying events, it could switch on the internal timer again using an Esper class called TimerControlEvent and start sending online currently-arriving events. See the API chapter in the doc for additional information.
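
For example, switching the internal timer back on could look like the following sketch (the TimerControlEvent class is part of the client API; see the documentation for the exact package):

runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_INTERNAL));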

All this works for EsperHA the same as for Esper. With EsperHA your application can snapshot state at any time and continue from snapshot state.

[top]

EPL Questions

How do I detect N events within X seconds?

The next statement outputs the count of events within the last 3 minutes:

select count(*) from MyEvent.win:time(3 min)

By adding a having-clause the query only outputs a result when at least 5 events occur within 3 minutes:

select count(*) from MyEvent.win:time(3 min) having count(*) >= 5

By adding a filter the engine looks for events that match a certain criteria:

select count(*) from MyEvent(somefield = 10).win:time(3 min) having count(*) >= 5

This query also suppresses output for 3 minutes after a first match occurs:

select count(*) from MyEvent(somefield = 10).win:time(3 min) having count(*) >= 5 output first every 3 minutes

This query also outputs the events themselves:

select count(*), window(*) from MyEvent(somefield = 10).win:time(3 min) having count(*) >= 5 output first every 3 minutes

The query could also have a group-by clause to count per group. Of course, and as always in multithreaded use, it is ensured that an event entering the window and the count(*) firing are atomic. The count cannot jump, i.e. the count is incremental.
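
For example, counting per a hypothetical "category" property and alerting per category:

select category, count(*) from MyEvent(somefield = 10).win:time(3 min) group by category having count(*) >= 5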

[top]


How do I compute a percentage or ratio?

Let's assume we have an event that indicates whether an item is black or white. Assume the boolean-type "black" property is true when the item is black.

Most aggregation functions allow passing a filter expression as a parameter to the aggregation function. This solution passes the boolean "black" property as the filter expression to the count aggregation function to compute the running percentage of items that are black compared to all items:

select count(*, black)/count(*) as pct from BlackWhiteItem

One could also formulate the same query as follows:

select count(*, black = true)/count(*) as pct from BlackWhiteItem

[top]


How do I measure the rate of arrival of events in a given time period? Per category?

The "rate" built-in aggregation function computes arrival rate. It can also be used when timestamp values are provided by your event.

This computes the per-second rate using engine timestamps by averaging 5 seconds of events, and outputs the rate every second:

select rate(5) from MarketDataEvent output snapshot every 1 sec

This computes the per-second rate using event timestamps (timestamp being a property of your event), also considering the last 5 seconds of events:

select rate(timestamp) from MarketDataEvent.win:time(5 sec) output snapshot every 1 sec

For per-category computation of a rate, let's assume the category is the "feed" field of the MarketDataEvent.

Create a keyed segmented context, instructing the engine that your dimension of analysis is the "feed" value:

create context SegmentedByFeed partition by feed from MarketDataEvent

The engine keeps a context partition per "feed" value, as locking is per context partition this achieves the highest concurrency.

Select the rate:

context SegmentedByFeed select feed, rate(timestamp) from MarketDataEvent.win:time(5 sec) output snapshot every 1 sec

You may alternatively use the rate aggregation function with group-by as follows:

select feed, rate(timestamp) from MarketDataEvent.win:time(5 sec) group by feed output snapshot every 1 sec

[top]


I want to get average, count, min and max for a 60-second time window?

There are a few solutions to consider, mostly depending on how often you want to compute aggregations (continuously or once on output) and output a result (continuously or once after 60 sec) and whether you want the engine to retain events in memory or not.

Let's assume we want to output only once after 60 seconds, start fresh every 60 seconds, compute aggregations continuously and don't retain any events in memory for those 60 seconds (only retain aggregated values in memory). These are the two statements that accomplish this:

// Activate now and every 1-minute, terminating after 1 minute
create context CtxEachMinute initiated @now and pattern [every timer:interval(1 min)] terminated after 1 minute
context CtxEachMinute select avg(value) as avgValue, count(value) as countValue,
min(value) as minValue, max(value) as maxValue from MyEvent
output snapshot when terminated

Another possible solution is to use a batch window like the following EPL shows. This EPL asks the engine to keep 60 seconds of events in memory, perform the aggregation computation once (not continuously) for all events at the end of 60 seconds, and then output the result.

select avg(value) as avgValue, count(value) as countValue,
min(value) as minValue, max(value) as maxValue from MyEvent.win:time_batch(60 sec)

A third possible solution uses a rolling time window as shown below. The engine always retains 60 seconds of events and outputs every time the aggregations change either when new events arrive or when events leave the time window. This output is continuous and aggregation computation is continuous as well.

select avg(value) as avgValue, count(value) as countValue,
min(value) as minValue, max(value) as maxValue from MyEvent.win:time(60 sec)

The first solution (the one with CtxEachMinute) may be preferable for many cases, however solutions two and three can have an advantage depending on what output you are looking for and when you want to compute aggregated values.

[top]


How to continually maintain customer ids that represent the top X customers with total purchases across a sliding time window?

I have a single stream of events that contain 3 properties, a Sale ID, a Customer ID, and a sale amount. I want to continually maintain a set of Customer IDs that represent the top X customers with total purchases across a sliding time window.

For example, I'd like to know who my top 30 purchasers based off of sales for the last 7 days. I'm trying to find a solution that provides me with a place to put an UpdateListener that will process the insert and remove streams maintaining a highspeed cache (like memcached, but not necessarily) other programs can leverage.

The solution utilizes two statements, one to compute the total per customer over a time window, and the second to rank the output of the first statement.

// define input event
create map schema BuyEvent as (saleid int, customerid int, amount double);

// compute current total per customer
insert into CustomerTotals
select customerid, sum(amount) as total
from BuyEvent.win:time(7 days) cbe
group by customerid;

// output insert and remove stream of top 30 customers.
@Name('TopCustomers')
select irstream customerid, total
from CustomerTotals.ext:rank(customerid, 30, total desc);

[top]


I want to compute and compare the maximum of all events since start and the maximum for the last 10 seconds?

Compute the maximum of all the values over the total time, output at a 1-minute resolution:

insert into HistoricStream select max(e) as maxE from MyEvent output last every 1 minute

Compute the maximum for the last 10 seconds:

insert into Last10Sec select max(e) as maxE from MyEvent.win:time(10)

Compare, for example using a subquery:

select * from Last10Sec where maxE > (select maxE from HistoricStream.std:lastevent())

Alternatively, you could put it all in one statement as shown next. This sample EPL doesn't perform the comparison but additionally outputs the average and the value itself:

select e, max(e), avg(e), (select max(local.e) as localMax, avg(local.e) as localAvg from MyEvent.win:time(10) as local) from MyEvent

[top]


How can I aggregate large sliding windows and/or aggregate without retaining incoming events?

You can use a strategy called 2-level aggregation. The first level of aggregation forms 1-second buckets of counts and totals. The second level aggregation considers only the buckets provided by the first level (and not the original event data points).

Let's start with the first level aggregation that computes 1-second totals. We use a context declaration that instructs the engine to start counting now and restart every 1 second. This way the engine aggregates for 1 second and then, after the second passes, it outputs the aggregated values, discards existing aggregations and aggregates for another second, and so on.

create context PerSecond start @now end after 1 second

The context above controls our aggregation lifecycle. We need a second statement that computes the aggregation and outputs a stream of aggregated totals.

The next statement is still part of the first level aggregation and outputs aggregated buckets into a stream called OneSecondBucket:

// First level aggregation
context PerSecond
insert into OneSecondBucket
select type, sum(value) as secTotal
from MyEvent group by type output snapshot when terminated

The EPL above outputs a total per type every second into the OneSecondBucket stream. The OneSecondBucket stream is now available to any other statement that is interested in per-second totals. Note that it retains none of the MyEvent events in memory since it does not declare a data window. We could instead have used a time-batch window of 1-second but that would retain events in memory which we don't want.

We can now have a statement that takes the OneSecondBucket totals and reports 1-hour totals:

// Second level aggregation
select type, sum(secTotal) from OneSecondBucket.win:time(1 hour) group by type

The above statement outputs a total per type every second, summing up 1 hour of MyEvent events. The max number of OneSecondBucket events retained is 60*60 per type. The resolution is 1 second.

If you have a large number of different dimensions to report, here is some additional advice. The example above only has a single "type" dimension, to keep the example simple. If you have X different dimensions, consider using a named window instead of "OneSecondBucket.win:time(1 hour)" so that the data points are shared between statements computing different dimensions. Use the group-by with rollup feature to compact multiple dimensions into a single statement.
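
A sketch of the rollup variant, assuming a second, made-up "region" dimension had also been carried into the OneSecondBucket stream:

select type, region, sum(secTotal)
from OneSecondBucket.win:time(1 hour)
group by rollup(type, region)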

[top]


How to calculate aggregations per partition and control output externally?

We have input events that have fields called key1 and key2 which logically partition events. We want to calculate aggregates for each partition (combination of key1 and key2) and control externally when that specific partition should end.

The sample statements here compute average temperature and count for sensor events. A nested context is useful in that it can partition by keys and, when a control event arrives, stop counting and trigger output. The control event that ends the counting and triggers output in the EPL below is MyEndEvent.

create context CtxPerKeysAndExternallyControlled
  context PartitionedByKeys
    partition by
      key1, key2 from SensorEvent
  context StopWhenEndArrives
    start SensorEvent as e1
    end MyEndEvent(key1=e1.key1 and key2=e1.key2)
context CtxPerKeysAndExternallyControlled
select key1, key2, avg(temp) as avgTemp, count(*) as cnt
from SensorEvent
output snapshot when terminated
// note: no group-by needed since partitioned by keys

You could also maintain multiple overlapping context partitions for each key1 and key2 value. The previous example is non-overlapping while this example allows overlapping context partitions (i.e. simple counts in this example). This example statement initiates a new context partition when a MyInitEvent arrives and terminates all context partitions for a given key1 and key2 value when a MyTermEvent arrives.

create context CtxPerKeysAndExternallyControlled
        context PartitionedByKeys
          partition by
            key1, key2 from MyInitEvent,
            key1, key2 from SensorEvent
        context InitiateAndTerm
          initiated by MyInitEvent as e1
          terminated by MyTermEvent(key1=e1.key1 and key2=e1.key2)

(the counting statement is the same as above)

[top]


How do I correlate events arriving in 2 or more streams?

The join of event streams looks very similar to joins in SQL. To bind data in the streams together, across streams, we identify keys to join on.

The below example specifies the 'accountNumber' field as the only join key. In this example we hold the last 30 seconds of events for each stream.

select fraud.accountNumber as accntNum, withdraw.amount as amount
from FraudWarningEvent.win:time(30 sec) as fraud,
     WithdrawalEvent.win:time(30 sec) as withdraw
where fraud.accountNumber = withdraw.accountNumber

[top]


How do I correlate events arriving out-of-order?

Let's assume we have three different types of events, all having a common attribute 'exchangeId'. Let's call the events start, finished and aborted.

Let's expect exactly one start event and multiple finished or aborted events for every exchangeId. The start event may happen after the finished or aborted events, but they all happen within say 30 seconds per exchangeId.

There are multiple possible answers to this problem. One solution can be an outer join using time windows, and looking at the remove stream since we care about the composite events when a start event leaves the time window after 30 sec, when the other events for the same exchange id have accumulated.

select rstream * from
  StartEvent.win:time(30 sec) start
    left outer join
  AbortedEvent.win:time(30 sec) abort
    on abort.exchangeId = start.exchangeId
    left outer join
  FinishedEvent.win:time(30 sec) finished
    on finished.exchangeId = start.exchangeId

In the example above, every time a StartEvent leaves the time window it takes with it all aborted and finished events. The abort property will be null if no abort occurred.

Use an inner join instead, like below, if all events are mandatory but can come in any order. This query joins by "id" keeping each last event per id for up to 5 minutes.

select * from
Event(conditions).std:unique(id).win:time(5 min) as s1,
Event(conditions).std:unique(id).win:time(5 min) as s2,
Event(conditions).std:unique(id).win:time(5 min) as s3
where s1.id = s2.id and s1.id = s3.id

Another solution is shown next using patterns to detect out-of-order events.

[top]


How do I use patterns to correlate events arriving in-order or out-of-order?

The prior discussion focused on 3 kinds of events: start, finished and aborted.

A second possible solution can be found in using patterns. If one doesn't really care about processing the multiple aborted events and simply wants to get a failed or finished indication when the first aborted event or finished event is encountered, then a simpler approach can be specifying a pattern for each interesting combination, some of which are shown below.

select * from pattern [every s=StartEvent ->
  a=AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec)]

The above pattern simply looks for aborted transactions and reacts to the first aborted event coming in after a start event. The pattern to detect finished transactions, i.e. where no abort occurred, should look about like this:

select * from pattern [every s=StartEvent ->
  (f=FinishedEvent(exchangeId = s.exchangeId) where timer:within(30 sec)
    and not AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec))]

To detect out-of-order events, the pattern can certainly be reversed:

select * from pattern [every a=AbortedEvent ->
  s=StartEvent(exchangeId = a.exchangeId) where timer:within(30 sec)]

[top]


How to implement an On-Off window? How to detect events between other events?

A use case wants to select all tuples that are between two tuples. For example, assume that I want all tuples between the first tuple with PARAM1=2 and the first tuple after this one with PARAM2=0. This would select all tuples with time between 3 and 8 in the example below.

TIME | PARAM1 | PARAM2
   1 |      1 |      0
   2 |      1 |      1
   3 |      2 |      2   <== ON
   4 |      2 |      3
   5 |      3 |      5
   6 |      3 |      4
   7 |      2 |      3
   8 |      2 |      0   <== OFF
   9 |      3 |      1

This seems best solved with a pattern with followed-by and unbound repeat, such as:

select * from pattern [
  beginevent=Event(param1 = 2, param2 = 2)
    -> middleevent=Event(param1 != beginevent.param1, param2 != 0)
         until endevent=Event(param1 = beginevent.param1, param2 = 0)
  ]

[top]


How do I look for pairs of immediately-followed events?

While one could use an EPL pattern statement to look for the absence of any other event between two events, i.e. (A -> (B and not C)), the match-recognize regular-expression pattern matching (proposed for SQL standard) provides a good solution.

The next statement utilizes match-recognize to look for two immediately-followed events that both have the same origin (partition-clause) and where the first event has a picked flag value of false and the second event has a picked flag value of true.

select * from AlertEventStream
  match_recognize (
    partition by origin
    measures a1.alarmNumber as alarmNumber1, a2.alarmNumber as alarmNumber2, a1.origin as origin
    pattern (a1 a2)
    define
      a1 as a1.picked = false,
      a2 as a2.picked = true
)

[top]


How do I correlate 3 events in a time window in which events have similar properties?

My application needs to match 3 events which occur within a time window where 3 different users submit trades with similar properties. The properties that must have the same value for each of the 3 events matched is currency and direction. The pattern is to match only if all 3 events have a different user. The time window should be 10 minutes long.

The pattern that solves this problem is shown below. It uses the timer:within pattern guard to limit the lifetime of each active sub-expression to 10 minutes.

every trade1=Trade(userId in ('U1000','U1001','U1002') ) ->
  (trade2=Trade(userId in ('U1000','U1001','U1002') and
     userId != trade1.userId and ccypair = trade1.ccypair
     and direction = trade1.direction) ->
   trade3=Trade(userId in ('U1000','U1001','U1002') and
     userId not in (trade1.userId, trade2.userId) and
     ccypair = trade1.ccypair and direction = trade1.direction))
  where timer:within(10 min)

[top]


How do I remove all events from a window and start over?

You have a need for a data window that can detect a certain situation, and if that situation occurs you want to start fresh and remove all events?

Named windows and the on-delete clause address this need. This sample declares a named window to hold MyEvent events:

create window MyWindow.win:keepall() as select * from MyEvent

Populate the named window from all arriving MyEvent events:

insert into MyWindow select * from MyEvent

Detect the situation, in the below example the query looks at the average wait time per train station:

insert into WarningStream
select trainStation, avg(waitTime) as avgWait
from MyWindow
group by trainStation
having avg(waitTime) > 60

Use the WarningStream events to remove from the named window:

on WarningStream delete from MyWindow

[top]


How do I combine data windows and their expiry polices? Or define custom logic for removing events?

The documentation outlines the built-in views, some of which combine length and time based expiry.

Another good place to look at is a named window. Named windows provide an on-delete clause that helps to build or combine a custom strategy for when to remove events.

In addition, multiple data windows can also be combined via the retain-union and retain-intersection keywords.

Next, we'll show the named window and on-delete option. Let's start with a named window that keeps the last 1 minute of events:

create window MyWindow.win:time(1 min) as select * from MyEvent

This example EPL removes from the named window those events that have the same id as an arriving MyDeleteEvent:

on MyDeleteEvent as d delete from MyWindow as w where w.id = d.id

This example EPL removes non-unique rows by category, so that only the last event for each category stays in the named window. It therefore selects the remove stream (rstream) of the unique window (2 statements):

insert rstream into MyNonUnique select rstream id from MyEvent.std:unique(category)
on MyNonUnique as d delete from MyWindow as w where w.id = d.id

Variables can also be a useful way to parameterize an expiry policy. The next sample EPL assumes that a variable by name CURRENT_THRESHOLD was declared and employs a pattern to execute every 20 seconds:

on pattern[every timer:interval(20 sec)]
delete from MyWindow where threshold > CURRENT_THRESHOLD

Last, a plug-in view implementation may be the right way to go if you want to parameterize it special ways or need integration into the EPL language or want to use the Esper scheduling APIs.

[top]


How do I seed an empty data window from a filled data window?

This is a feature of named windows. When a named window is filled already, and a new statement gets created on a filled named window, that statement's aggregation does not start empty.

Also, a named window may be initialized from another named window. Look up the "insert" keyword in the create window clause.
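
A hedged sketch of seeding a new named window from an existing one, reusing the MyWindow declaration from the earlier example (the copy's name is made up; check the create window documentation for the exact syntax):

create window MyWindowCopy.win:keepall() as MyWindow insert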

[top]


How do I keep a separate window of events per category and compute aggregates for each category's window?

I have one or more categories and for each of these categories I need to keep a separate window of events.

We present two approaches here: The first approach shown here uses a partitioned context, and the second approach is based on the grouped data windows.

The First Approach uses a partitioned context. The first statement declares a partitioned context wherein each context partition carries events for a different symbol value. The second statement outputs the average price for the last 10 events, separately for each symbol.

create context PerSymbolContext partition by symbol from StockTick
context PerSymbolContext select symbol, avg(price) as avgPrice from StockTick.win:length(10)

You could also use a category context to categorize events based on some expression, or a hash context to assign events based on consistent hash code.

The Second Approach uses grouped data windows.

In the statement below we have stock tick events for which we want to compute the average price of the last 10 stock tick events per symbol. Notice we are not using the last 10 events overall, we are looking at the last 10 events per symbol.

select symbol, avg(price) as avgPrice
from StockTick.std:groupwin(symbol).win:length(10)
group by symbol

We can also specify multiple categories:

select symbol, location, avg(price) as avgPrice
from StockTick.std:groupwin(symbol,location).win:length(10)
group by symbol, location

Let's consider another possible way of using a separate window of events per category. In some use cases we may need to compute not an average per group, but an average over all groups that consider only the last N events per group. This can be accomplished by leaving the group-by clause off. Now the engine computes the average price over all symbols, considering only the last 10 events per symbol:

select symbol, avg(price) as avgPrice
from StockTick.std:groupwin(symbol).win:length(10)

[top]


How do I use results of one statement in another statement?

Use the insert into syntax to use the events generated by one statement as input to another statement.

We can first compute the number of events arriving within 1 second, then use that number to perform additional aggregation. Here we compute for the last 30 seconds the maximum and minimum rate per feed (2 statements).

insert into TicksPerSecond select feed, rate(1) as cnt
from MarketDataEvent
group by feed
output snapshot every 1 sec
select feed, max(cnt) as maxCount, min(cnt) as minCount
from TicksPerSecond.win:time(30 sec)
group by feed

[top]


How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?

Use output rate limiting to stabilize or reduce the rate at which rows are output from a query, by outputting rows at a specified time or row-based interval.

The example below limits the otherwise continuous output to an output row every 5 seconds. The output contains the feed and average volume per feed of the last 60 seconds of market data events.

select feed, avg(volume) as cnt from MarketDataEvent.win:time(60 sec)
group by feed
output every 5 seconds

[top]


How do I delay data? How do I compare against previous events?

There are a few different approaches that this section outlines.

Your application may need to delay events for a certain time. A simple way to delay data is to enter the data into a time window and select the remove stream which is the data leaving the window:

insert rstream into DelayedStream select rstream task, measurement, rate from CurrentStream.win:time(10 min)

In order to compare current data with delayed data, one possible way is to join delayed data and current data. For example:

select d.task, d.measurement, d.rate - c.rate as delta
from CurrentStream as c unidirectional, DelayedStream.std:lastevent() as d
where d.task = c.task and d.measurement = c.measurement

This example uses the "unidirectional" keyword. The keyword is useful to indicate that results are only output when events of one stream arrive, and not the other. In this example, when events arrive on the DelayedStream there is no output.

Here is an alternative way using the "output snapshot" keywords instead. This example executes a join and posts results only every 1 minute:

select d.task, d.measurement, d.rate - c.rate as delta
from CurrentStream.std:lastevent() as c, DelayedStream.std:lastevent() as d
where d.task = c.task and d.measurement = c.measurement
output snapshot every 1 minute

Instead of the join, the "prev" previous-event function could be used to fetch and compare data from previous rows. This is useful if the arrival intervals of the stream are known:

select task, measurement, rate - prev(4, rate) as delta
from CurrentStream.win:time(5 min)

The "prev" previous-event function also works well with the "std:groupwin" view in that is operates per-group when used with this view, for example:

select rate, prev(1, rate) from MyStream.std:groupwin(task).win:length(2)

A pattern statement can also be a great approach to form pairs or triplets (or any other combination of old and current events) and insert the pattern-matched events into a new stream for further processing, or use the select-clause to determine deltas as this sample statement shows:

select a.qty - b.qty, a.acct, a.instr from pattern  [
  every a=OrderStatus -> b=OrderStatus(acct=a.acct, instr=a.instr)
]

...or...

insert into MyStream select a, b from pattern [every a=OrderStatus -> b=OrderStatus(acct=a.acct, instr=a.instr)]
select a.qty - b.qty from MyStream

[top]


How do I detect the absence of an event?

Use a pattern to detect the absence of an event. The below pattern fires if an event A is not followed by an event B within 10 seconds.

select * from pattern [every EventA -> (timer:interval(10 sec) and not EventB)]

Outer joins are also a good way to detect missing events. A solution with an outer join was discussed above.

[top]


How do I detect the absence of an event and the presence of an event arriving too late?

Let's say we want to detect 2 situations: a) a Down event is not followed by an Up event, i.e. the Up event for the same equipment id does not come in within 1 minute; b) a Down event is followed by an Up event 30 seconds or more after the Down event, for the same equipment id as the Up event.

select * from pattern [
  every down=MyEvent(text='down') ->
  (
    (timer:interval(1 min) and not up=MyEvent(text='Up', equipmentId=down.equipmentId))
      or
    ( (timer:interval(30 sec) and not MyEvent(text='Up', equipmentId=down.equipmentId))
        ->
      up=MyEvent(text='Up', equipmentId=down.equipmentId) where timer:within(30 seconds)
    )
  )]

[top]


How do I report at a regular interval without any incoming events?

Let's say we want to have our listener get invoked every 5 seconds, and select the last value, if any, from a stream.

select (select price from MarketData.std:lastevent()) as price
from pattern [every timer:interval(5 sec)]

The pattern fires every 5 seconds causing the sub-select to take place, returning null if no MarketData events have come in, or returning the price column of the last MarketData event.

[top]


How do I find missing events arriving in 2 or more streams that are correlated?

As in SQL we can use outer joins to generate a result even if one or more of the correlated events are not found in a stream. Usually we want to generate the result after a certain time or after a certain number of events have been received, indicating that a correlated event is truly missing.

In this example we are looking for a withdrawal event without a login event for the same account number after 60 seconds.

We join withdrawal events with login events looking for login events that do not exist (account number is null). We want to get notified as these events leave the 60-second time window.

select withdraw.accountNumber as accntNum, withdraw.amount as amount
from WithdrawalEvent.win:time(60 sec) as withdraw
     left outer join
     LoginEvent.win:time(60 sec) as login
on login.accountNumber = withdraw.accountNumber
where login.accountNumber is null

[top]


How do I look into the past and consider missing events? How do I do a snapshot, fire-and-forget or on-demand query?

I have the good old StockTrades stream with two fields: symbol and price. I'm trying to answer the following question "Were Company X stock actions priced above $70 during any moment of the last 5 minutes?".

Unfortunately, using a simple time-based sliding window won't work. To see why, imagine there were only two price updates: the first, at 10:54 am stated that stocks were at $71. The second, 2 minutes later, notified that the stocks went down to $69. Now imagine that the above question was posed at 11:00 am (of the same day). We, humans, know that the answer is "yes" because the price was $71 between 10:54 and 10:56. But the first event is outside the 5 minutes window and will thus be ignored by the system.

Since the question is posed at 11:00am and the question result is expected to be aware of the events that arrived before 11:00am, the solution seems to require that some events or perhaps only some aggregated information about events must be retained from before 11:00am.

Also, the "were" in the question "Were Company X stock actions priced above $70 during any moment of the last 5 minutes?" indicates that this is a snapshot on-demand query that should return results just once, i.e. the result in not expected to be continuous but a simple tuple as a result of fire-and-forget.

In Esper the solution could be a named window that is created in the morning, say 9am. Esper also supports snapshot on-demand (fire-and-forget) queries against named windows through the API and a JDBC inward-facing driver for reporting. The named window would need to hold the price and also the prior price to ensure that it's not missing the drop from $71 to $69. The Esper EPL queries would be something like below:

This set of statements would be created at 9am (2 statements):

create window TickWindow.win:time(5 min) as (price double, priorprice double)
insert into TickWindow select price, prior(1, price) as priorprice from StockTickEvent

The question posed at 11:00am, as a snapshot EPL query against the named window:

select * from TickWindow where price > 70 or priorprice > 70

Alternative solutions are as follows. One could use the on-select instead of a fire-and-forget query, as on-select is able to compile and maintain the proper index to speed up repeated execution. A second solution may change the queries to keep only the maximum, instead of each datapoint, however then the on-demand queries must be limited to questions towards the max value.

Here is the syntax to use a predefined query via on-select instead of a fire-and-forget query:

on MyMaxQueryEvent as maxQuery select * from TickWindow where price > maxQuery.pricelimit or priorprice > maxQuery.pricelimit

When the question is posed at 11am one can send in a MyMaxQueryEvent with the pricelimit property set to 70 and the listener to the on-select statement gets the result.

[top]


How do I detect the absence of multiple unrelated events that are known in advance? Is there a way to simplify hundreds or thousands of similar statements into one global statement? How do I "prime" or initialize a large number of similar patterns without creating a pattern statement for each pattern instance?

We would like to detect the absence of certain events in the event stream. All possible tuples are known beforehand. Let's say a tuple simply consists of the IP address. The possible tuples might look like the following:

(ip=192.168.1.1), (ip=192.168.1.2), ... many more..., (ip=192.168.1.3)

Every hour, we would like to know when one of those known tuples is not present in the event stream. The pattern EPL might look like:

select * from pattern [every (timer:interval(1 hours) and not IPEvent(ip='192.168.1.1'))]

We would like to simplify this so hundreds or thousands of similar statements are reduced to one global statement, and send in a primer event to initialize each IP address looked for.

The single pattern that reacts to primer events:

select * from pattern [every p=PrimerEvent -> (every (timer:interval(1 hours) and not IPEvent(ip=p.ip)))]

Starting from this simple pattern, one could also add additional control events, such as to indicate when to stop looking for an IP.

Another possible solution may utilize a named window to hold the last event per IP address, and a second named window to hold only those IPs to report on. Then, at intervals, one could select those entries whose timestamp is older than the reporting interval and whose IP exists in the second named window, via on-select for example.
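
A rough sketch of that alternative is below. It is only an outline and assumes IPEvent carries a timestamp property in milliseconds; all stream and window names are illustrative:

create window LastPerIPWindow.std:unique(ip) as (ip string, ts long)
insert into LastPerIPWindow select ip, timestamp as ts from IPEvent
create window WatchedIPWindow.std:unique(ip) as (ip string)
insert into WatchedIPWindow select ip from PrimerEvent
// every hour, report the watched IPs not seen within the last hour (or never seen at all)
on pattern [every timer:interval(1 hour)]
select w.ip from WatchedIPWindow as w
where not exists (select * from LastPerIPWindow as l where l.ip = w.ip and l.ts > current_timestamp() - 3600000)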

[top]


How to filter events based on parent-child relationship? How to detect events that are neither preceded nor followed by a related event?

I have a network scenario in which ports are entities of a device. For instance, if I receive a port-down event followed by a device-down event, or a device-down event followed by a port-down event, I should drop the port-down event from reporting, since the parent (device) itself went down shortly before or after.

A solution was to delay events and then outer join, something like:

select * from pattern [every ine=StatusEvent -> timer:interval(2 sec)] as dse
    unidirectional left outer join
  StatusEvent(status="down").win:time(4 sec) as wsde
    on dse.ine.parent=wsde.id
  where dse.ine.status="up" or wsde.id is null

This approach uses "unidirectional" to state that only the pattern drives the join, and delays events using a pattern and "timer-interval" as part of the join from-clause.

Note the "where ... is null" check in the where-clause: when the device id is not null, i.e. a matching "down" event was found, the port event is suppressed.

[top]


How do I detect when a sensor value holds a threshold for X seconds?

Use a pattern to detect the absence of an event indicating that the sensor value falls below the threshold.

select * from pattern [ [1] every (timer:interval(10 sec) and not Sensor(temp < 100)) ]

Using the "[1]" repeat operator we can tell the pattern engine to look for the first match. Using "every" the pattern engine restarts the interval when the temperature sensor value falls below 100. Therefore the pattern can handle a dip or multiple dips where the temperature falls below 100. The pattern fires exactly once and only when the temperature stays at 100 or more for at least 10 seconds.

[top]


How do I detect something really complex, like a triple-bottom pattern?

The triple-bottom pattern comes from the world of stock trading and is described in detail under Triple-Bottom Pattern.

The problem can be broken down: First, how does one identify bottom price points among a stream of market data events? Second, once the individual bottom price points are identified, how does one detect an occurrence of 3 bottom price points, whose value is within approximation of each other, and that are spaced out over time in a pattern?

The first problem is an event stream processing problem, I believe. The stream of events is market data that contains price points for the NIFTY index over time. I'll attempt to define a bottom price point as follows: if the average price for the last 5 days is 15% lower than the average price over a period of, say, 60 days, then the minimum price during those 5 days is a single bottom price point. Of course the number of days and the percentages are parameters to figure out and get right.

// The query to determine the average price for the last 60 days:
insert into AvgPriceLast60Days
select avg(price) as avgPrice
from MarketData.win:time(60 days)
output every 10 minutes
// The query to determine the average price for the last 5 days:
insert into AvgPriceLast5Days
select avg(price) as avgPrice, min(price) as minPrice
from MarketData.win:time(5 days)
output every 10 minutes
// Compare the last average prices for each:
insert into BottomPriceEvent
select minPrice as bottomPrice
from AvgPriceLast60Days.std:lastevent() as LastAvg60Days,
     AvgPriceLast5Days.std:lastevent() as LastAvg5Days
where LastAvg60Days.avgPrice * 0.85 > LastAvg5Days.avgPrice
output first every 1 day

The last statement populates the "BottomPriceEvent" event stream as a stream of higher-level events in which each event represents a bottom price point.

The second part of the problem requires detecting 3 bottom price points whose values are within a given range of each other, and that have a certain temporal relationship with each other. Let's assume that the bottom price points should be within 5% each other. Let's also assume we are looking for bottom price points spaced at least 10 days apart from each other, but within 30 days of the prior bottom price point.

// The pattern to detect the triple-bottom:
insert into TripleBottomPattern
select * from pattern [every a=BottomPriceEvent
  -> timer:interval(10 days)
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)
  -> timer:interval(10 days)
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)]

Finally, the resulting TripleBottomPattern event stream created by the last statement contains the higher-level complex events representing that a triple-bottom pattern has been detected.

An additional interesting consideration is that the stream and pattern queries are rather long-running continuous queries, since they need to run over days and months. That may require persisting events, and/or using simulated time by playing back past events into the engine.
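
When playing back past events, a common approach is to disable the internal timer and advance engine time from the data itself. A minimal sketch, assuming the engine is configured for externally-controlled time and that eventTimeMillis and marketDataEvent are supplied by the playback code:

// advance engine time to the event's timestamp, then send the event
epService.getEPRuntime().sendEvent(new CurrentTimeEvent(eventTimeMillis));
epService.getEPRuntime().sendEvent(marketDataEvent);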

[top]


How do I stop an insert after a period of time?

Assume we only want to receive the first 10 minutes of an incoming event stream and then stop receiving data from that stream.

The timer:within pattern guard function can serve here to stop the stream after a given amount of time, as the next statement shows:

insert into PackageNotifyEvent
select myevent.uid as uid, 'HOME' as loc, 'ARRIVED' as status
from pattern[every myevent=TrackingEvent where timer:within(10 min)]

[top]


Can I use a regular expression (regexp) within a filter?

Yes, a regular expression can be used as part of a filter expression. Pretty much any expression is allowed within event filter expressions other than aggregation functions and the prev (previous) and prior functions.

select * from pattern[every myevent=TrackingEvent(event.uid regexp '^a-b-.*'
  and event.lat in [40.1:40.2] and event.lon in [-74.1:-74.0])]

[top]


How can I remove duplicates?

One construct EPL exposes to remove duplicates in a stream of events is the first-unique data window. Another is the pattern "every-distinct".

The next example statement reports only the first sensor event per device:

select * from Sensor.std:firstunique(device)

The next example statement reports only the first sensor event per device and suppresses subsequent sensor events from the same device for 30 seconds:

// Retain only the intersection of unique-by-device and 30 seconds
// Does not retain 30 seconds of events, only 30-seconds of events unique by device
select * from Sensor.std:firstunique(device).win:time(30)

Other related constructs are the "prior" and "prev" functions, the "unique" data windows, the group-by and output-rate-limiting clauses, the "distinct" keyword within the select clause, and also subqueries. We only discuss the pattern "every-distinct" here; please see the documentation for additional examples.

The next example utilizes a pattern which could be extended looking for additional events, and reports only the first sensor event per device and suppresses subsequent sensor events from the same device:

select * from pattern[every-distinct(s.device) s=Sensor]

When your distinct-value key is a timestamp or other ever-increasing or non-unique value, you should specify a time period indicating how long the key is valid and after which a duplicate can be reported again.

The next example utilizes a pattern that reports only the first sensor event per device and for 10 seconds suppresses subsequent sensor events from the same device:

select * from pattern[every-distinct(s.device, 10 sec) s=Sensor]

[top]


What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?

I'm trying to detect pairs of events correlated by a "type" value and a unique "device" value. Then based on this pair of events, I'd like to find the maximum "measurement" value and the corresponding "confidence" for the one with the max "measurement" value. Here's my event object:

class Sensor {
	long id;
	String type;
	String device;
	Double measurement;
	Double confidence;
}

I'll know in advance the set of possible device values, but the Sensor events can happen in any order, and two or more Sensor events for the same device might occur before a Sensor event for the other device occurs. Thus, if a Sensor event for a device occurs before a Sensor event for the other device, the second Sensor event replaces the first Sensor event for that device and type. In other words, the last event for a particular device of a given type is the one that should be used in the calculation of the maximum.

The computation would be the maximum value of the 'measurement' property between A and B. Also, the 'confidence' value would correspond to the value from the event with the maximum 'measurement' property.

A sample input and output:

Sensor[id=1,type='Temperature',device='A',measurement=51,confidence=94.5]
Sensor[id=2,type='Temperature',device='A',measurement=57,confidence=95.5]
Sensor[id=3,type='Humidity',device='B',measurement=29,confidence=67.5]
Sensor[id=4,type='Temperature',device='B',measurement=55,confidence=88.0]
Sensor[id=5,type='Temperature',device='B',measurement=65,confidence=85.0]
Sensor[id=6,type='Temperature',device='B',measurement=49,confidence=87.0]
Sensor[id=7,type='Temperature',device='A',measurement=51,confidence=99.5]

For output, one would expect the following:

// First output event pairs events with id=2 and id=4 and chooses the values from id=2
MaxReading[type='Temperature',device='A',measurement=57,confidence=95.5]
// Second output event pairs events with id=6 and id=7, since the event with id=6
// replaces the one with id=5, the event with id=5 is never compared against the
// event with id=7
MaxReading[type='Temperature',device='A',measurement=51,confidence=99.5]

One possible solution builds pairs of events using a join:

// Create pairs of device A and B events
insert into Pair
select * from Sensor(device='A').std:lastevent() as a, Sensor(device='B').std:lastevent() as b
where a.type = b.type

From the resulting stream we remove those pairs in which either event was seen before, leaving unique pairs:

// Declaring stream type
create schema PairDuplicatesRemoved(a Sensor, b Sensor)
// Remove duplicate pairs where either sensor event is from the prior pair
insert into PairDuplicatesRemoved
select * from Pair
where a.id != coalesce((select a.id from PairDuplicatesRemoved.std:lastevent()), -1)
	and b.id != coalesce((select b.id from PairDuplicatesRemoved.std:lastevent()), -1)

The example uses the "coalesce" function to return -1 when there is no last event for PairDuplicatesRemoved, so that the first pair matches as well.

Last, select the maximum measurement value between the pair of sensor events and the corresponding confidence and device:

select a.type,
       max(a.measurement, b.measurement) as measurement,
       case when a.measurement > b.measurement then a.confidence else b.confidence end as confidence,
       case when a.measurement > b.measurement then a.device else b.device end as device
       from PairDuplicatesRemoved

[top]


How do I remove or drop events from a stream? I have a dynamic set of negative filters?

Let's assume you have a stream of events and you want to remove events from the stream before further processing of the remaining events by other statements.

The @Drop annotation marks statements that preempt further processing of an event, if an event matches multiple statements. The @Priority annotation is useful for specifying, when an event matches multiple statements, which statements process the event first. Note that @Drop and @Priority require an engine-level configuration setting that is off by default; please see the documentation for further details.
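
For illustration only, a hedged sketch of the annotations (the event type and filter are made up; the prioritized-execution configuration must be enabled):

// consume and drop suspect events before any other statement sees them
@Drop @Priority(10) select * from MyEvent(suspect = true)
// statements such as this one then only receive events not consumed above
select count(*) from MyEvent.win:time(1 min)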

Other ways of solving this use case could be to use the UnmatchedListener to catch unmatched events or to use the EPL split-stream syntax.
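
A sketch of the split-stream alternative, with illustrative stream names, might look roughly like this:

// route suspect events away; all remaining events go to CleanStream for further processing
on MyEvent
insert into DiscardedStream select * where suspect = true
insert into CleanStream select *
output first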

[top]


How do I detect a specific sequence of events and get all events that take part in this sequence?

I have events coming from different sources. They have 2 states, success and failure, and they have a source. I would like to create a query that alerts me when, for a specific source, there are for example 5 failure events followed by a success event. Of course, as soon as there's a success event, the previous failure events shouldn't count anymore.

There needs to be a maximum time to wait for a success event to arrive, since we don't want to keep looking without end for a matching success event. Let's say the maximum is 5 minutes; failure events are simply dropped after 5 minutes.

Let's look at some samples: F5 means 5th failure event, S3 means 3rd success event. Also let's say we only need 5 failure events before a success one to have an alert.

Case1 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 F5 S1
then I want to throw an alert. The alert must know about those events, meaning I would like the listener to receive those events.

Case2 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 F5 F6 F7 F8 S1
then I would have an alert knowing about F4 to F8 and S1.

Case3 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 S1 F5 F6 S2
then no alert would be emitted since once S1 arrives there were only 4 failure events.

Case4 - still from the same source

F1 F2 F3 F4 (then 10 minutes later) F5 S1
then of course no alert as all the events aren't within the 5 minutes window.

Case5 - If within 5 minutes (this time we have the sources a and b)

F1a F1b F2a F3a F2b F4a S1b F5a F3b S1a
No alert will be created when S1b arrives because there are only 2 failures for b. When S1a arrives, an alert is created because we have F1a to F5a before it, with no success concerning a in between.

Solution: Since we are looking for a very specific sequence of events, a pattern is the best way to go. We want to make sure the pattern subexpressions end when a success event arrives, and this can be accomplished via the not-operator. We also want to limit the expression to live 5 minutes from the first failure event:

every a=F -> (
        (b=F(src=a.src) and not S(src=a.src)) ->
        (c=F(src=a.src) and not S(src=a.src)) ->
        (d=F(src=a.src) and not S(src=a.src)) ->
        (e=F(src=a.src) and not S(src=a.src)) ->
        (f=S(src=a.src) and not F(src=a.src))
     ) where timer:within(5 min)

This solution works for all cases, including case 2. Even though the pattern looks for only 5 events in a row, it looks at any 5 subsequent events for the same source, matching case 2 for events F4 to F8 and S1 (the active expressions that include F1, F2 and F3 end when F6, F7 and F8 arrive).

[top]


How to implement a sell trailing stop order?

This is an example out of the stock trading domain, in which incoming events are market prices. A sell trailing stop order is a technique that is designed to allow an investor to specify a limit on the maximum possible loss, without setting a limit on the maximum possible gain.

A sell trailing stop order sets the lower boundary (stop) price at a fixed amount below the current market price with an attached "trailing" amount. As the market price rises, the stop price rises by the trail amount, but if the stock price falls, the stop price doesn't change, and a market order is submitted when the stop price (lower boundary) is hit.

Assume that the market price is 700 at the time of placing the trailing stop order. Assume that the stop price is 600. If the price goes to 703, the stop price must be updated to 603. If the price drops to 682, the trigger is still 603.

The solution considers the maximum market price since statement start time, compared against the current market price:

// since release 2.0
select * from Quote(symbol='GOOG')
where price <= max((select max(lastPx) from Quote(symbol='GOOG').win:keepall()) - 100, 600)

[top]


I have one listener for multiple statements, how do I track which statement generated a result?

Your listener can implement the StatementAwareUpdateListener interface and get passed the statement and engine instance for each result.
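
A minimal sketch of such a listener (the class name is illustrative):

public class MyStatementAwareListener implements StatementAwareUpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents,
                       EPStatement statement, EPServiceProvider epServiceProvider) {
        // the statement reference identifies which statement produced this result
        System.out.println("Result from statement " + statement.getName());
    }
}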

For some use cases it can also come in handy to simply add a constant to each statement to identify the statement producing a result, for example:

select 120 as strategyId, * from Tick

[top]


Is there a way to receive the list of all events that participated in a window? I'm looking for a way to show the user the cause of the alert.

The data window aggregation functions offer a way to select the contents of the window, for example:

select window(*) from MyAlerts having count(*) > 10

The pull API is also a convenient way to retrieve data from a data window. The safeIterator method on EPStatement provides the events in a data window.
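
A short sketch of the pull API; the statement text and property name are illustrative:

EPStatement stmt = epService.getEPAdministrator().createEPL("select * from MyAlerts.win:time(1 min)");
SafeIterator<EventBean> it = stmt.safeIterator();
try {
    while (it.hasNext()) {
        EventBean alert = it.next();
        // inspect alert properties here, e.g. alert.get("alertId")
    }
} finally {
    it.close();   // the safe iterator holds a lock and must be closed
}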

[top]


We have our own math library, what are the options of utilizing it? How can I make calls out of the EPL into our own existing code?

There are several options. The best choice among the options depends on what you want to accomplish, and how the existing library, function or other system exposes its functionality (static methods, service or POJO etc.).

The first option is the user-defined method. You can invoke a user-defined method in any expression directly without any configuration. You can import a class via configuration to avoid package names in EPL. For example, assuming that the "com.mycompany.MyLibrary" class provides a static method by name "computeDistance":

select com.mycompany.MyLibrary.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent
// ... or after MyLibrary is imported via configuration
select MyLibrary.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent

The second option is to invoke a method on your event object itself. This works only if your event representation is a Java object. An example, assuming that the underlying class of the "MyCoordinateEvent" event provides a method by name "computeDistance":

select myevent.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent as myevent

The third option is to provide a custom aggregation function via the extension API. A custom aggregation function can take many parameters but returns only a single value, i.e. it cannot return multiple values; however, the value returned can be any object. Please consult the documentation for examples. A sample EPL statement is as follows, assuming that the "myTrendFunction" custom aggregation function has been created and configured:

select myTrendFunction(price) from OrderEvent group by productId

The fourth option is to provide a custom view via the extension API. A custom view takes parameters as well as an input stream of events and generates a result stream of events. Please consult the documentation for examples. A sample EPL statement is as follows, assuming that the "mathlib:volatility" custom view has been created and configured:

select * from OrderEvent(symbol='IBM').mathlib:volatility()

The fifth option is to use a method invocation. A method invocation is a function that acts alone or in a join and returns rows. Please consult the documentation for examples. Here is a sample EPL that utilizes a join, assuming that the "cacheLookup" function is provided by class "com.mycompany.MyLibrary":

select * from RFIDEvent, method:com.mycompany.MyLibrary.cacheLookup(assetId)

The last option is to have your listener or subscriber code invoke the library. Results can be sent back into the engine by your listener via a further event, if needed.

[top]


Can I use SOAP, WS-*, RESTful, JMS, RPC or other remote calls?

The previous FAQ outlined how to invoke an external function. Your external function may invoke internal or external resources as part of its evaluation using any of these standards.

Also, Esper provides certain input and output adapters as described in the EsperIO documentation.

You may also want to consider creating your own event representation via the extension API if your transport or event repository already has event metadata available that you want to reuse.

In the design you should keep in mind that blocking calls may reduce throughput.

[top]


What to do if my events are not JavaBeans, not well-defined, their properties are not known in advance and may differ wildly, and are nested?

Here is an actual user question:

Each data item implements an interface, but the properties available on the concrete objects differ wildly. Also, each data item can be considered to be composed of multiple levels of these data items. One of the fields on the interface is a getParent field, which returns the data item one level up. For example: if X is the object which we have a direct handle to, X.parent = Y, and Y.parent = Z, we often want to look at Z.field as part of our filter. Also, my events do not describe the property set of the event (not a JavaBean). Thus, I don't directly know that X's parent is Y, whose parent is of type Z, which then has a property named 'foo'. (Beans here are essentially user-defined structures for contributed code, and thus we have no real way of knowing what properties are on a bean until we receive the bean. When the users use the beans, they obviously know which bean they're working with. For us, we're getting it downstream, and that bean could be any of a dynamically growing variety.)

Dynamic properties are weakly-typed dynamically-resolved properties that use a '?' syntax to denote the dynamic part of a property expression, for example:

select parent?.parent.foo from XEvent

The above query returns the value of the "foo" property of the object provided by the "parent" property (if present) and its parent property (if present) of an XEvent. The value returned is of type Object and probably requires the use of the EPL "cast" or "instanceof" functions, depending on what to do with the value.

By moving the '?' operator into a different position, the query can indicate which properties in the nesting levels must exist. The next query checks that the "parent" property exists upon compilation:

select parent.parent.foo? from XEvent

Dynamic properties can be used in combination with indexed and mapped properties as well.

Another approach is to map such Java objects to Map event types: these also allow inheritance, nesting and dynamic properties, are easy to generate programmatically, and make it easier to change the event type at runtime based on the available metadata or the actually arriving events, through the runtime configuration API.

Another possible approach is to create a custom event representation plug-in.

The best approach generally is to use event inheritance (Java object and Map event representations) when possible, nested properties (all event representations) when possible, and strongly-typed properties to keep statements simple and easy to read.

There are a number of options available in the configuration to handle Java classes that do not adhere to JavaBean conventions.

[top]


How do I query nested indexed events? Or more generally, event objects contained in event objects?

Your application may have a parent event that contains multiple subevents. You want to perform aggregation or pattern matching on the subevents plus information on the parent event.

Under the term contained-event selection, the Esper engine can handle events that contain properties that are themselves events; for example, when application events are coarse-grained structures and you need to perform bulk operations on the rows of the property graph in an event, with any number of nesting levels.

In this example that a user has provided, a parent ResponseEvent contains multiple subevents that each measure an individual operation (a database or JMS operation, for example) that is part of the larger operation represented by the ResponseEvent. Each ResponseEvent has one or more SubEvent objects that provide a subevent type and the number of milliseconds for the operation.

The example here uses Java objects. The example works the same for XML or Map-based events, we are picking a Java event object here for demonstration. See the docs for further examples.

The sample ResponseEvent event and SubEvent definitions are:

public class ResponseEvent {
  private String category;
  private SubEvent[] subEvents;

  public ResponseEvent(String category, SubEvent[] subEvents) {
    this.category = category;
    this.subEvents = subEvents;
  }

  public String getCategory() {
    return category;
  }

  public SubEvent[] getSubEvents() {
    return subEvents;
  }
}

public class SubEvent {
  private long responseTimeMillis;
  private String subEventType;

  public SubEvent(long responseTimeMillis, String subEventType) {
    this.responseTimeMillis = responseTimeMillis;
    this.subEventType = subEventType;
  }

  public long getResponseTimeMillis() {
    return responseTimeMillis;
  }

  public String getSubEventType() {
    return subEventType;
  }
}

The next sample code snip adds the parent event type to the known types, here via configuration API but configuration XML would work just as well:

epService.getEPAdministrator().getConfiguration().addEventType("ResponseEvent", ResponseEvent.class);

This is a sample query to continuously output the average response time per category and subevent-type:

select category, subEventType, avg(responseTimeMillis) as avgTime
from ResponseEvent[select category, * from subEvents].win:time(1 min)
group by category, subEventType

[top]


How to calculate a weighted average over multiple time ranges?

For example: avgArrival = 0.6 * avg(t-1) + 0.3 * avg(t-2) + 0.1 * avg(t-3)

The "t-1" means a range of 1-second size, for example when the current time is 14:00:

t-1: 13:59-14:00     -> avg(Arrival_13:59-14:00) = 12
t-2: 13:58-13:59     -> avg(Arrival_13:58-13:59) = 10
t-3: 13:57-13:58     -> avg(Arrival_13:57-13:58) = 15
avgArrival = 0.6*12 + 0.3 * 10 + 0.1 * 15 = 11.7

Here is a sample solution consisting of 2 statements:

// Output the average every 1 second to another stream
insert into MyAverages select avg(...) as myavg from MyEvent.win:time(1 sec) output snapshot every 1 sec
// Compute a weighted average based on the averages provided by the MyAverages stream
select 0.6 * prior(1, myavg) + 0.3 * prior(2, myavg) + 0.1 * prior(3, myavg) from MyAverages

[top]


How to compute for 5-minute buckets? How to aggregate (e.g. vwap) n buckets of k minutes each?

In other words, my buckets look as follows: t---(bucket1)---t-n---(bucket2)---t-2n---(bucket3)---t-3n---(bucket4)---t-4n

One solution is to make a staggered set of named windows, where the remove stream of the first named window feeds the second, the second feeds the third and so on, with each named window being a k-minute time window.

Here is a sample of multiple statements:

create window BucketWindow1.win:time(5 min) as select * from MyEvent
create window BucketWindow2.win:time(5 min) as select * from MyEvent
create window BucketWindow3.win:time(5 min) as select * from MyEvent

insert into BucketWindow1 select * from MyEvent
insert rstream into BucketWindow2 select rstream * from BucketWindow1
insert rstream into BucketWindow3 select rstream * from BucketWindow2

select sum(price*volume) from BucketWindow1
select sum(price*volume) from BucketWindow2
select sum(price*volume) from BucketWindow3

[top]


How can I execute a query only every X seconds?

Let's assume I only want to output every 20 seconds the last value, and forget the rest of the data. This would do it:

select sum(price) from Marketdata.std:unique(ticker) group by ticker output snapshot every 20 seconds

                

If using a named window, on-select is a good way to fire at any schedule a pattern defines. Here the example fires the query every 20 seconds when the hour is between 9am and 9:59am:

on pattern[every timer:at(*, 9, *, *, *, */20)] select * from MyNamedWindow

                

You could also join a pattern to historical data, thereby polling repeatedly; optionally parameterize the query with variables (not shown below) to do incremental polling. A sample is here:

select * from pattern[every timer:interval(20)], sql:db1["select * from MySQLTable"]

Or join the pattern to a stream. If joining to a stream then mark the join direction as unidirectional, thereby having the join execute only when the pattern fires:

select userId, sum(qty) from pattern[every timer:interval(10)] unidirectional, MyOrderEvent.win:time(10 min)

Last, the next example defines a variable and sets the variable to "on" or "off" to control output of statements. Variables can also be set programmatically via runtime API (we show multiple individual statements below).

create variable boolean var_on_off
on pattern[timer:at(*, 9, *, *, *)] set var_on_off = true
on pattern[timer:at(*, 10, *, *, *)] set var_on_off = false
insert into nextStream select * from xyz(var_on_off)
..or..
select * from xyz output when var_on_off

[top]


How do I create a recursive query? That is, a query that feeds into itself and defines its own input stream as output?

When creating your recursive query, you would want to ensure the event type is defined in advance, because the runtime will not be able to check the type at the time of statement creation, as the statement itself creates the type.

Therefore, a good way is to define the type:

Map<String, Object> mapType = new HashMap<String, Object>();
mapType.put("x", long.class);
mapType.put("id", String.class);
mapType.put("vol", double.class);
epService.getEPAdministrator().getConfiguration().addEventType("Volatility", mapType);

Then create the query:

insert into Volatility
select id, x + prev(1, vol) as vol
from Volatility.std:groupwin(id).win:length(2)

[top]


We have an event stream for cars entering and leaving streets and want a car count per street. How do I insert or update and keep a count?

A solution is to have a named window holding the count per street and use the on-merge clause to atomically merge the indicator stream.

Define the schema to hold the car count:

create schema StreetCarCountSchema (streetid string, carcount int)

Define the schema for the arriving events indicating whether cars enter or leave a street:

create schema StreetChangeEvent (streetid string, action string);

Define a named window based on the schema to hold the count per street:

create window StreetCarCountWindow.std:unique(streetid) as StreetCarCountSchema

Merge the arriving data with a named window entry:

on StreetChangeEvent ce merge StreetCarCountWindow w where ce.streetid = w.streetid
when not matched and ce.action = 'ENTER' then insert select streetid, 1 as carcount
when matched and ce.action = 'ENTER' then update set carcount = carcount + 1
when matched and ce.action = 'LEAVE' then update set carcount = carcount - 1

Output the current count when it changes:

select * from StreetCarCountWindow

[top]


I would like to trigger whenever the event contains a new and distinct security id and the source is one of {A,B,C}?

The firstunique data window outputs only the first unique event per criteria.

select * from Event(string in ('A','B','C')).std:firstunique(securityId)

If you would like to delete from the first-unique data window, use a named window and on-delete, as sketched below.
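
A hedged sketch of that variant; the reset event and its property are made up for illustration:

create window FirstPerSecurityWindow.std:firstunique(securityId) as Event
insert into FirstPerSecurityWindow select * from Event(string in ('A','B','C'))
// the insert stream of the named window provides the trigger
select * from FirstPerSecurityWindow
// removing an entry allows that security id to trigger again
on SecurityResetEvent re delete from FirstPerSecurityWindow w where w.securityId = re.securityId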

[top]


Start a window for each unique id when it arrives, wait 10 seconds for that id and output the last event for that window, starting a new window when the next event arrives for that id?

Let's say we have a stream of ChangeEvent objects that are coming in at a rate of a few thousand per second. The ChangeEvent has an 'id' field that identifies the type of event. Over the course of 10 seconds, we could receive several ChangeEvent 'ticks' for the same id but we really only need to process the last one.

The logic needs to start a separate bucket for each ChangeEvent tick with a unique id and replace any future ticks for that same id. 10 seconds after the first tick for that id is received the last event with that id is delivered to the listener and then the window for that id is no longer available until another ChangeEvent with that id is received.

Say ChangeEvent is the tuple (id, quantity, price). For example, the input event sequence is:

ChangeEvent(1, 500, 99.75) t=0 secs
ChangeEvent(2, 200, 98.10) t=1 secs
...
ChangeEvent(1, 400, 99.81) t=8 secs
ChangeEvent(2, 190, 98.50) t=10.1 secs
...
ChangeEvent(1, 375, 99.82) t=10.1 secs
	

So my desired output to the listener is this:

ChangeEvent(1, 400, 99.81) at t=10 secs
ChangeEvent(2, 190, 98.50) at t=11 secs
ChangeEvent(1, 375, 99.82) at t=20.1 secs
	

Controlling the lifecycle of a data window, or any analysis, is a feature of context declarations.

// Declare a context that allocates a context partition when a unique id arrives
// and terminates the context partition after 10 seconds.
create context CtxPerIdAnd10Sec initiated by distinct(id) ChangeEvent as a terminated after 10 sec
// Within the context, output the last event upon context partition termination
context CtxPerIdAnd10Sec select last(*), first(*) from ChangeEvent(id=context.a.id) output snapshot when terminated

We could also use a pattern for the context declaration (first statement), like shown here (second statement stays the same):

// Declare a context that allocates a context partition when a unique id arrives
// and terminates the context partition after 10 seconds.
// Specify @inclusive to have the ChangeEvent that triggers the pattern be included in the next statement's evaluation.
create context CtxPerIdAnd10Sec initiated pattern[every-distinct(a.id, 10 sec) a=ChangeEvent]@inclusive terminated after 10 sec

Here is another solution that applies if, for all ids, the 10-second time bucket should start and end at the same time.

create context TenSecondBucket start @now end after 10 seconds;
context TenSecondBucket select first(*), last(*) from ChangeEvent group by id output snapshot when terminated;

[top]


How to perform an aggregation over a "semantic" window, i.e. a window that opens upon the arrival of a given event and closes upon the arrival of another event?

The overlapping or non-overlapping contexts address this use case. In addition named windows can also be used. Both solutions are discussed here.

An example use case for this is: Give me the average CPU utilization of machine "X" while user "Y" was logged onto it. So the "semantic" window opens when the event "user_Y_logged_In" arrives and closes when another event "user_Y_logged_Out" arrives.

This first solution declares a context that initiates when a user logs in and terminates when a user logs out.

// declare context
create context WhileUserLoggedInCtx as initiated by UserLoginEvent as ule terminated by UserLogoutEvent(userId=ule.userId)
// perform some analysis for that user
context WhileUserLoggedInCtx select sum(cpuTime) from CPUReportEvent(userId = context.ule.userId)

This second solution uses two named windows. Named windows offer a custom expiry policy, through the use of on-merge and on-delete.

Hold CPU utilization:

create window CPUUtilizationWin.win:time(1 day) as CPUUtilization
insert into CPUUtilizationWin select * from CPUUtilization

Hold users that are logged in:

create window UserWin.std:unique(userId) as (userId string, loginTime long)
on LogInEvent li merge UserWin uw where li.userId = uw.userId when not matched then insert select li.userId as userId, current_timestamp() as loginTime
on LogOutEvent lo delete from UserWin uw where uw.userId = lo.userId

Output average utilization every 1 minute:

select userId, (select avg(cpu) from CPUUtilizationWin where timestamp between uw.loginTime and current_timestamp()) as avgUserCPU
from pattern[every timer:interval(1 min)] unidirectional, UserWin uw group by userId

[top]


I have two streams A and B that I want to join based on an "id" value that occurs exactly once per event type, and the events can arrive in any order.

For example, if the input sequence is { A(id=1), B(id=1) } the join should match. Same for { B(id=1), A(id=1) }. The order could be chaotic, for example the sequence { A(id=1), B(id=2), B(id=4), B(id=2) } should not match and { A(id=1), B(id=2), B(id=4), A(id=2) } should match on "id=2".

This solution uses match-recognize for pattern detection for two reasons. One, the "id" value could be seen as a partition and the pattern matching should take place within each individual "id" partition. Second, match-recognize allows us to specify a data window within which to match patterns. So when events leave the data window the engine can forget the partition.

Sample statement:

select * from MyEvent.win:length(10)
match_recognize (
  partition by value
  measures E1.value as value
  pattern (E1 E2 | E2 E1)
  define
    E1 as E1.string = 'A',
    E2 as E2.string = 'B'
)

Note the length-window: when a given event does not match, that event will eventually leave the window and the partition for that "id" value gets removed from memory by the engine. Also, when a match occurs in a partition and no remaining matches are active for the same partition the engine removes that partition, reducing memory use.

[top]


I have a port scan detector and would like to create a new event based on multiple events occurring over a given period of time?

Events are port scan events and have the following fields:

create schema PortScanEvent(src string, dst string, port int, marker string)

A sample event may look like this:

PortScanEvent = {src = '10.0.0.1', dst = '10.0.0.2', port = 16, marker = 'm1'}

We want to detect and report a port scan situation.

A port scan situation starts when for 30 seconds there are 20 or more events unique by port for a given combination of {src, dst}. The output event should have {type='DETECTED'} and include a marker list and ports.

After the port scan situation has been detected, there are additional output events that should occur every minute, on the minute.

  1. Send the current count for that {src, dst} with {type='UPDATE'} and include a marker list
  2. When the count falls below 10, send one last count and {type='DONE'} and stop sending the current count.
  3. When the count stays over 10 for 12 hours, send one last count with {type='EXPIRED'} then stop sending a count. Only detect the situation again for that {src, dst} when the count drops below 10.

Solution:

// Define the port scan event type.
// The most efficient representation of an event is an object array.
create objectarray schema PortScanEvent(src string, dst string, port int, marker string);

// Hold the current aggregation state (counts, grouped data) per key in a central place.
create table ScanCountTable(src string primary key, dst string primary key, cnt count(*), win window(*) @type(PortScanEvent));

// Aggregate, keeping state in the central place.
// Populate a stream of count per {src, dst} pair.
into table ScanCountTable
insert into CountStream
select src, dst, count(*) as cnt, window(*) as win
from PortScanEvent.std:unique(src, dst, port).win:time(30 sec) group by src,dst;

// Define a named window to hold the situations currently detected.
create window SituationsWindow.win:keepall() (src string, dst string, detectionTime long);

// Insert newly-detected situations into the named window.
on CountStream(cnt >= 20) as cs
merge SituationsWindow sw
where cs.src = sw.src and cs.dst = sw.dst
when not matched
  then insert select src, dst, current_timestamp as detectionTime
  then insert into OutputAlerts select 'DETECTED' as type, cs.cnt as cnt, cs.win as contributors;

// Every 1-minute output an update for all currently-detected situations.
on pattern [every timer:at(*, *, *, *, *)]
insert into OutputAlerts
select 'UPDATE' as type, ScanCountTable[src, dst].cnt as cnt, ScanCountTable[src, dst].win as contributors
from SituationsWindow sc;

// Every 1-minute remove expired or completed situations.
on pattern [every timer:at(*, *, *, *, *)]
merge SituationsWindow sw
when matched and (select cnt from ScanCountTable where src = sw.src and dst = sw.dst) < 10
  then delete
  then insert into OutputAlerts select 'DONE' as type, ScanCountTable[src, dst].cnt as cnt, null as contributors
when matched and detectionTime.after(current_timestamp, 12 hours)
  then delete
  then insert into OutputAlerts select 'EXPIRED' as type, -1L as cnt, null as contributors;

// Listen to output alerts.
@name('output') select * from OutputAlerts;

[top]

How to get notified each time a certain value has increased by a specified amount, i.e. each time the value is greater than the previously-notified value + x?

A table or named window could both be used to keep the current threshold. This solution uses a table. We define a reset event type to set an initial threshold.

create schema ValueEvent(value long);
create schema ResetEvent(startThreshold long);
create table CurrentMaxTable(currentThreshold long);
@name('trigger') insert into ThresholdTriggered select * from ValueEvent(value >= CurrentMaxTable.currentThreshold);
on ResetEvent merge CurrentMaxTable when matched then update set currentThreshold = startThreshold when not matched then insert select startThreshold as currentThreshold;
on ThresholdTriggered update CurrentMaxTable set currentThreshold = value + 100;
                    

The complete code with sending events and asserting the output is below.

String epl =
		"create schema ValueEvent(value long);\n" +
		"create schema ResetEvent(startThreshold long);\n" +
		"create table CurrentMaxTable(currentThreshold long);\n" +
		"@name('trigger') insert into ThresholdTriggered select * from ValueEvent(value >= CurrentMaxTable.currentThreshold);\n" +
		"on ResetEvent merge CurrentMaxTable when matched then update set currentThreshold = startThreshold when not matched then insert select startThreshold as currentThreshold;\n" +
		"on ThresholdTriggered update CurrentMaxTable set currentThreshold = value + 100;\n";
epService.getEPAdministrator().getDeploymentAdmin().parseDeploy(epl);

SupportUpdateListener listener = new SupportUpdateListener();
epService.getEPAdministrator().getStatement("trigger").addListener(listener);

epService.getEPRuntime().sendEvent(Collections.singletonMap("startThreshold", 100L), "ResetEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 30L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 99L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 100L), "ValueEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "value".split(","), new Object[]{100L});

epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 101L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 103L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 130L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 199L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 200L), "ValueEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "value".split(","), new Object[]{200L});

epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 201L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 260L), "ValueEvent");
epService.getEPRuntime().sendEvent(Collections.singletonMap("value", 301L), "ValueEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "value".split(","), new Object[]{301L});
                    

One could instead use the following to have the first-arriving value event provide the initial current threshold:

insert into ResetEvent select value+100 as startThreshold from ValueEvent.std:firstevent()

[top]

Common API Questions

I want to know what streams an EPL statement references but don't want to parse the EPL string? I want to programmatically inspect and change the select clause columns when a user enters an EPL query, how to do this?

The statement object model is designed to handle this case. Your application can compile each statement via epAdministrator.compileEPL, obtain an EPStatementObjectModel representation of the statement, and then interrogate the from-clause or select-clause.

The statement object model also allows an EPL query to be composed from scratch via normal Java objects. A statement object model can be rendered back into an EPL string, and it is also possible to create a statement directly from a statement object model.
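
A rough sketch of both directions; the EPL text is illustrative:

EPStatementObjectModel model = epService.getEPAdministrator().compileEPL(
    "select price, volume from OrderEvent.win:time(30 sec)");
// interrogate the clauses
SelectClause selectClause = model.getSelectClause();
FromClause fromClause = model.getFromClause();
// optionally modify the model, render it back to EPL, or create a statement from it
String rewritten = model.toEPL();
EPStatement stmt = epService.getEPAdministrator().create(model);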

[top]


How do I store statements in a file? Is there a standard file storage?

Esper does not prescribe any particular way of storing EPL queries. Some applications prefer XML files and some prefer properties files. Your application may use a table in a relational database to store queries if so desired.

We present below an example of storing statements in a properties file. This is not a recommendation but merely one possible way among many of storing EPL queries in a file.

Here is the example:

#Alert12
esper.statement.alert12.0=create window Alert12Window.win:time(1 hour) as select ticker as alertId, this from LastTickEvent
esper.statement.alert12.0.ha.prefix=resilient
esper.statement.alert12.1=insert into MaxPrice:alertId:(lastPx) select max(lastPx) as lastPx from LastTickEvent(ticker=':ticker:')
esper.statement.alert12.1.ha.prefix=resilient
esper.statement.alert12.2=insert into Alert:alertId:Event select * from LastTickEvent(ticker=':ticker:') where lastPx <= (1-(:value:/100.0))*(select lastPx from MaxPrice:alertId:.std:lastevent())
esper.statement.alert12.2.ha.prefix=durable
esper.statement.alert12.3=insert into Alert12Window select ':alertId:' as alertId, quote.this as this from pattern [quote=Alert:alertId:Event]
esper.statement.alert12.3.ha.prefix=durable

This way of storing EPL assigns an alert name and a suffix that represents the statement number for all EPL statements of the alert. It uses the :replaceme: format as a parameter placeholder that the application itself replaces with a value before creating a statement. This example does not use prepared statements.

[top]


How can I parse and validate an EPL statement?

For just parsing an EPL statement and performing syntax validation, you can use the "compileEPL" method of the administrative interface, which parses the EPL query text and creates an object model from it.

Create the statement using an isolated service provider to validate the statement with respect to the currently-declared event types and variables; see the sketch below.
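
A sketch of both steps; the eplText variable and the isolated service name are illustrative:

// syntax check only: throws an exception if the EPL cannot be parsed
EPStatementObjectModel model = epService.getEPAdministrator().compileEPL(eplText);

// validation against the currently-known event types and variables, without affecting the main runtime
EPServiceProviderIsolated isolated = epService.getEPServiceIsolated("validation");
EPStatement stmt = isolated.getEPAdministrator().createEPL(eplText, null, null);
stmt.destroy();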

[top]


Can I create a statement in "stopped" state, attach listeners or subscribers and start it?

Statements can only be created in "started" state. This removes uncertainty around what it means to declare a schema or variable or create an index in a stopped state.

Create the statement using an isolated service provider and attach listeners or subscribers as needed. Once happy with the statement move the statement to the main runtime.

[top]


Can I use an UpdateListener to listen to events and the iterator-based pull-API together?

UpdateListener and iterator can be used together. An update listener implementation can also itself query the same statement or another statement's iterator, as the engine guarantees the iterators are up-to-date before calling update listeners even across statements.

The iterator can also be used to query for no matches. You should find the iterator to have minimal overhead, depending on the type of statement iterated on; the overhead for pattern statements specifically is negligible.

[top]


I want to know if an event hits at least one registered query?

Esper has an UnmatchedListener interface that one can register with the engine via epRuntime.setUnmatchedListener(UnmatchedListener). The UnmatchedListener receives any event that has not been processed by any statement, i.e. events for which no statement's stream filter matches (a where-clause, however, counts as a match since the event still enters the statement's data window; only stream filters count).
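
A minimal sketch of registering such a listener:

epService.getEPRuntime().setUnmatchedListener(new UnmatchedListener() {
    public void update(EventBean theEvent) {
        // this event did not match any statement's stream filter
        System.out.println("Unmatched event of type " + theEvent.getEventType().getName());
    }
});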

[top]


How to get a single callback with the list of matching subscriptions?

Question Detail: I need to have thousands of active filters in a single Esper engine instance all listening to the same stream. When an event gets posted into this stream and is matched with N of these filters, I want to get a single callback rather than N of such callbacks. I know there is a statement-aware listener which makes it possible to have one handler for all the N statements, but that is still N update calls. I need something like "EPStatement[] statements" in my UpdateListener? Is it possible?

Answer: The engine does not have an option to bundle deliveries of multiple statements to a single listener into a single invocation to that listener.

However, this seems easiest solved by using a single UpdateListener or StatementAwareUpdateListener that accumulates the received events. From a threading perspective, in the default configuration you may simply ask that listener to return the accumulated events in the same thread that sends the events. This works well since, by default, the thread that sends an event processes it end-to-end, so when the thread returns you have a guarantee that everything has been delivered to the listener.

[top]


When to use a plug-in aggregation function and when to use a plug-in custom view?

If you are not sure how to choose between custom plug-in aggregation functions and custom plug-in views, this entry explains the differences or advantages of one over the other in more detail.

A plug-in custom aggregation function works like other aggregation functions such as count, sum, average or standard deviation and may appear in the select-clause and in the having-clause in EPL.

A plug-in custom view can be a data window providing an expiry policy like a time window or length window, for example. Or instead a custom view can derive new information from a stream of events such as the results of a linear regression function (aka. derived-value view).

A plug-in view is always attached to a certain type of event that is provided by a filtered event stream or a pattern or by another view. Plug-in views can receive only one type of input event (input stream). If the view is a data window view, the output event type is always the same event type as the input event type. For derived-value views the output event type can be an entirely different type of events with new and often computed property values and types, including events that are from a different event representation such as for example XML-DOM.

If your application wants to provide a data window then use a plug-in view. If it needs to provide multiple computed values for each row of output, such as the slope-value and y-intercept value for a linear regression function for example, use a plug-in view.

The input values to a plug-in aggregation function are the result of one or more expressions (as compared to views which have events as input), and the output value of a plug-in aggregation function is always a single value, usually a primitive value such as a double-typed value but can be any object. If your application only needs to return a single value, more likely an aggregation function is appropriate.

The group-by clause acts only on aggregation functions (and not view output), providing an aggregation value per-group. Also output-rate-limiting can provide the last value per such group when the output condition occurs.

Views can also compute one or more output values per group by means of the "std:groupwin()" view. These view output events are not grouped by the group-by or output-rate clauses, if present.

A view's output event properties become available in the select-clause, where-clause, group-by-clause, having-clause and order-by clause, while aggregation functions output values are only available in the select-clause and having-clause.

[top]


When to use on-demand fire-and-forget queries versus on-select predefined queries?

Sometimes user requirements are such that a query against data maintained by the engine must be fired. Sometimes such intra-day queries are well-defined and known upfront, sometimes not.

Via named windows Esper allows predefined queries based on the on-select clause.

Via named windows Esper also allows fire-and-forget queries that leave no trace. Fire-and-forget queries can also be compiled for repeated execution.

Here is a sample code snippet to prepare and call a fire-and-forget query:

String stmtText = "select * from SensorWindow where temperature = 80";
    EPOnDemandPreparedQuery onDemandQuery = epService.getEPRuntime().prepareQuery(stmtText);
    EPOnDemandQueryResult result = onDemandQuery.execute();
    System.out.println(result.getArray()[0].get("sensor"));

An on-demand fire-and-forget query has the penalty of compiling the query and executing it against an un-indexed data set, making the query slower to execute compared to predefined queries. The advantage is that it allows any type of query.

Compare this to a predefined query based on the on-select clause. The next code snippet creates and executes a pre-defined query:

String stmtText = "on SensorQueryEvent select sensor from SensorWindow where temperature = querytemp";
    EPStatement onSelectStmt = epService.getEPAdministrator().createEPL(stmtText);
    onSelectStmt.setSubscriber(this);	// make sure you have an update(String sensor) method for the class

    // Execute query, results are delivered via call to the update method.
    // The SensorQueryEvent is expected to have a "querytemp" property as used in the on-select.
    epService.getEPRuntime().sendEvent(new SensorQueryEvent(80));

A predefined query allows the Esper engine to inspect the query conditions and thus maintain a proper index on named window contents to evaluate each query in a very efficient fashion. Thereby a predefined query can exhibit much better performance than a fire-and-forget query. See also the named window query benchmark for performance tests of both approaches.

[top]


How do I integrate with the Spring framework? How to use Spring support for Groovy or other scripting languages with EPL?

The Spring Framework (or Spring for short) is an open source application framework. This FAQ entry describes how a Spring XML file can hold EPL statements and inject listeners. It also shows how the Groovy dynamic scripting language can provide inlined scripts that acts as listeners to EPL continuous-query statements.

This solution requires Spring and Java 6.

A sample XML file for use with Spring follows. The XML relies on two classes in your classpath: EsperBean and StatementBean. These classes are NOT part of the Esper distribution. They are instead listed below as examples.

<beans>
        <bean id="esperBean" class="EsperBean">
    	<property name="statements">
    	    <bean class="StatementBean">
    		<constructor-arg value="select * from java.lang.String"/>
    		<property name="listeners">
    		    <list>
    			<bean class="MyUpdateListener"/>
    			<ref bean="groovyListener"/>
    		    </list>
    		</property>
    	    </bean>
    	</property>
        </bean>

        <!--sample groovy listener-->
        <lang:groovy id="groovyListener">
    	<lang:inline-script>
    	    package org.springframework.scripting.groovy;
    	    import com.espertech.esper.client.UpdateListener
    	    import com.espertech.esper.client.EventBean;

    	    class GroovyMessenger implements UpdateListener {
    		public void update(EventBean[] eventBeans, EventBean[] eventBeans1) {
    		    System.out.println(Arrays.toString(eventBeans) + "from groovy");
    		}
    	    }
    	</lang:inline-script>
        </lang:groovy>

    </beans>

The EsperBean class below represents a thin wrapper for an EPServiceProvider:

public class EsperBean implements BeanNameAware, InitializingBean, DisposableBean {
        private EPServiceProvider epServiceProvider;
        private EPRuntime epRuntime;
        private String name;
        private Set<StatementBean> statementBeans = new LinkedHashSet<StatementBean>();

        public void setStatements(StatementBean... statementBeans) {
    	for (StatementBean statementBean : statementBeans) {
    	    addStatement(statementBean);
    	}
        }

        public void addStatement(StatementBean statementBean) {
    	statementBeans.add(statementBean);
        }

        public void sendEvent(Object event) {
    	epRuntime.sendEvent(event);
        }

        public void setBeanName(String name) {
    	this.name = name;
        }

        public void afterPropertiesSet() throws Exception {
    	epServiceProvider = EPServiceProviderManager.getProvider(name);
    	epRuntime = epServiceProvider.getEPRuntime();
    	for (StatementBean statementBean : statementBeans) {
    	    EPStatement epStatement = epServiceProvider.getEPAdministrator().createEPL(statementBean.getEPL());
    	    statementBean.setEPStatement(epStatement);
    	}
        }

        public void destroy() throws Exception {
    	epServiceProvider.destroy();
        }
    }

The StatementBean class is a thin wrapper for an EPStatement, and is also required for the example:

public class StatementBean {
        private String epl;
        private EPStatement epStatement;
        private Set<UpdateListener> listeners = new LinkedHashSet<UpdateListener>();

        public StatementBean(String epl) {
            this.epl = epl;
        }

        public String getEPL(){
            return epl;
        }

        public void setListeners(UpdateListener... listeners) {
            for (UpdateListener listener : listeners) {
                addListener(listener);
            }
        }
        public void addListener(UpdateListener listener) {
            listeners.add(listener);
            if (epStatement != null) {
                epStatement.addListener(listener);
            }
        }

        void setEPStatement(EPStatement epStatement) {
            this.epStatement = epStatement;
            for (UpdateListener listener : listeners) {
                epStatement.addListener(listener);
            }
        }
    }

Finally, next is a sample code snippet for loading the XML file in Spring, which will automatically hook up the statements and listeners as defined in the XML:

ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(new String[]{"esperspring.xml"});
    EsperBean esperBean = (EsperBean) appContext.getBean("esperBean", EsperBean.class);
    esperBean.sendEvent("Test Event");
    // ...when done, destroy the context...
    appContext.destroy();

[top]


How to change statements at runtime?

The runtime configuration API, available from epAdministrator.getConfiguration(), allows adding, setting and removing variables, adding, removing and changing event types, and offers other useful runtime functions.

Variables are the preferred way to introduce dynamic thresholds, change filters on the fly or generally parameterize a statement. Variables can be of scalar, object (any) or class-with-properties type, and can also be event-typed to hold events.
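
For example, a hedged sketch of a variable-driven filter (event type, property and variable names are illustrative):

create variable double alert_threshold = 100
select * from OrderEvent(price > alert_threshold)
// change the threshold from EPL
on ThresholdUpdateEvent set alert_threshold = newThreshold

The same variable can also be changed programmatically, for example via epService.getEPRuntime().setVariableValue("alert_threshold", 150.0).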

Consider using subqueries and named windows as a second and alternative approach. Named windows are similar to a relational database table, automatically indexed based on the standing queries against them or explicitly indexed for use in on-demand fire-and-forget queries.

Alternatively you may use any of the extension points such as user-defined function, custom aggregation functions (these are often useful in subqueries returning multiple rows) or custom views.

[top]


Can Esper support a distributed cache or data grid?

Distributed cache integration is available as part of Esper Enterprise Edition, a commercial product by EsperTech Inc (http://www.espertech.com).

[top]