www.espertech.com Documentation
The big O notation is used to classify algorithms according to how their running time grows as the input size grows. This chapter discusses the big O complexity of algorithms implemented by the EPL runtime.
For hash lookups the O-notation value is approximate. For logarithmic running-time functions we use O(log N) but mean O(log_t N) with an unspecified base t.
The big-O values provided here are general guidance. This section may not be entirely complete and may not list all exceptions.
The runtime determines, for each event, which EPL statements must process the event. For EPL statements that are context partitioned with multiple partitions, such as for overlapping, keyed, hash or category contexts (see Chapter 4, Context and Context Partitions), the runtime determines, for each event, which partitions of each statement must process the event.
This operation takes place when:
- An application calls one of the sendEvent methods of EPEventService. For example, the application invokes eventService.sendEventBean(new StockTickEvent(...), "StockTick") and the runtime determines which EPL statement and partition must process the StockTick event.
- The runtime evaluates a statement that uses insert into. For example, the application creates a statement insert into StockTickOverPrice100 select * from StockTick(price>100). After processing a StockTick event with a price greater than 100, the runtime allocates a new StockTickOverPrice100 event and determines which EPL statement and partition must process the new StockTickOverPrice100 event.
The parameter to the operation is the individual event, e.g. the StockTick or the StockTickOverPrice100 event.
The data structure is the filter indexes, a nestable tree of indexes organized by event type, see Section 2.18.2, “Filter Indexes”.
The algorithm takes the individual event and walks the filter indexes to determine which statements and context partitions must process the event.
The input is the presence and nature of filter expressions of EPL statements and the number of partitions, including where-clauses (see Section 5.5, “Specifying Search Conditions: The Where Clause” for where-clause rewrites).
The big-o complexity depends on the input. It always includes a hash lookup by event type that is constant time O(1).
Assume the input is N statements as follows.
select * from Event(property=value)
The complexity is constant time O(1), as the same property appears in all filter expressions with the equals operator (=), which allows a hash lookup.
As for example in this EPL:
select * from StockTick(symbol='A'); select * from StockTick(symbol='B');
The runtime obtains the symbol value of the stock tick event once and performs a single hash lookup.
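The following Java sketch illustrates why the equals case costs one hash lookup regardless of the number of statements. It assumes a simplified index that maps the constant of each symbol='...' filter to the names of the statements declaring it; the EqualsFilterIndex class and its methods are hypothetical and do not reflect Esper's internal filter index classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an equals-only filter index; not the Esper implementation.
class EqualsFilterIndex {
    // Maps the constant of a filter such as symbol='A' to the statements that declared it.
    private final Map<String, List<String>> bySymbol = new HashMap<>();

    void addStatement(String symbolConstant, String statementName) {
        bySymbol.computeIfAbsent(symbolConstant, k -> new ArrayList<>()).add(statementName);
    }

    // One read of the event's symbol value and one hash lookup, however many statements exist.
    List<String> match(String eventSymbol) {
        return bySymbol.getOrDefault(eventSymbol, List.of());
    }
}
```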
Assume the input is N statements as follows wherein each property name is a different event property name:
select * from Event(property_n=value)
The complexity is linear time O(N), as each filter expression uses a different property.
As for example in this EPL:
select * from StockTick(symbol='A'); select * from StockTick(feed='001');
Assume the input is N statements as follows.
select * from Event(property_1=constant and property_2>value)
The complexity is O(log N). The same property names appear in all filter expressions. The relational greater-than operator (>) is a btree lookup.
As for example in this EPL:
select * from StockTick(symbol='A', price>100); select * from StockTick(symbol='A', price>200);
The runtime obtains the symbol value of the stock tick event once and performs a single hash lookup. It obtains the price value of the stock tick event once and performs a single btree lookup.
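As a rough sketch of this combined case, the hypothetical RangeFilterIndex class below nests a hash map on symbol with a java.util.TreeMap (a red-black tree standing in for the btree) keyed by each statement's price threshold. It is an illustration under these assumptions, not Esper's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a hash index on symbol nested with a sorted (tree) index on the
// price threshold, modeling filters such as StockTick(symbol='A', price>100).
class RangeFilterIndex {
    private final Map<String, TreeMap<Double, List<String>>> index = new HashMap<>();

    void addStatement(String symbol, double priceThreshold, String statementName) {
        index.computeIfAbsent(symbol, k -> new TreeMap<>())
             .computeIfAbsent(priceThreshold, k -> new ArrayList<>())
             .add(statementName);
    }

    // Hash lookup on symbol, then a tree lookup for all thresholds strictly below the price.
    List<String> match(String symbol, double price) {
        List<String> result = new ArrayList<>();
        TreeMap<Double, List<String>> byThreshold = index.get(symbol);
        if (byThreshold == null) {
            return result;
        }
        // headMap(price, false) holds every threshold t with t < price, i.e. price > t is true
        for (List<String> statements : byThreshold.headMap(price, false).values()) {
            result.addAll(statements);
        }
        return result;
    }
}
```

The tree navigation is logarithmic; collecting the matching statements adds a cost proportional to the number of matches.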
The runtime determines, when time advances, which EPL statements must process the new runtime time. For EPL statements that are context partitioned with multiple partitions, such as for overlapping, keyed, hash or category contexts (see Chapter 4, Context and Context Partitions), the runtime determines which partitions of each statement must process the new runtime time.
This operation takes place when:
- An application calls the advanceTime or advanceTimeSpan method of EPEventService (when using the external timer). For example, the application invokes eventService.advanceTime(DateTime.parse("2002-05-30T09:01:02.003")) and the runtime determines which EPL statement and partition must process the new runtime time.
- The runtime uses the internal timer (a.k.a. system time) and the current system time becomes the current runtime time.
The parameter to the operation is the new runtime time.
The data structure is the schedule maintained internally by the runtime which is a data structure sorted by time.
The algorithm takes the new runtime time and performs a lookup.
The input is the presence and nature of time-related expressions of EPL statements and the number of partitions.
The big-o complexity is O(log N).
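A minimal sketch of a schedule sorted by time, assuming a TreeMap keyed by due time in milliseconds. The Schedule class is hypothetical and ignores the statement and partition bookkeeping the runtime performs; it only shows that finding the due entries is a logarithmic tree lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a time-sorted schedule; not Esper's internal schedule.
class Schedule {
    // Key: due time in milliseconds; value: callbacks due at that time.
    private final TreeMap<Long, List<Runnable>> entries = new TreeMap<>();

    void add(long dueTimeMillis, Runnable callback) {
        entries.computeIfAbsent(dueTimeMillis, k -> new ArrayList<>()).add(callback);
    }

    // Advances to the new time: locates due entries with an O(log N) tree lookup, then fires them.
    int advanceTime(long newTimeMillis) {
        Map<Long, List<Runnable>> due = entries.headMap(newTimeMillis, true);
        List<Runnable> toFire = new ArrayList<>();
        for (List<Runnable> callbacks : due.values()) {
            toFire.addAll(callbacks);
        }
        due.clear(); // removes the fired entries from the backing TreeMap
        for (Runnable r : toFire) {
            r.run(); // run after removal so callbacks may safely schedule new entries
        }
        return toFire.size();
    }
}
```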
The runtime performs a join, a subquery, an on-select, an on-merge, an on-update, an on-delete or a fire-and-forget query. The runtime determines the subset of events (or rows of a table) by performing an index lookup. It performs additional actions on the subset.
The query planner is responsible for determining the indexes to use. The query planner uses the where-clause (if any) and the on-clause (if any) to plan index use.
Use query plan logging to obtain information about the query plans.
This operation takes place when:
- It is a join and the runtime performs a lookup into a stream's events (or named window or table) to resolve the subset of events for that stream (or named window or table) to determine final join results.
- It is a subquery and the runtime performs a lookup into the subquery's events (or aggregation rows) to resolve the subset of subquery result events and to process these.
- It is an on-action statement such as on-select, on-merge, on-update or on-delete, and the runtime performs a lookup into named window events or table rows to resolve the subset and process these.
- It is a fire-and-forget select, update or delete query.
The parameters to the operation are the events of the from-clause and on-trigger. For fire-and-forget queries the parameters originate from the filter expressions and where-clause.
The data structure is the event index, see Section 2.18.3, “Event Indexes”.
The algorithm takes event data and performs an index lookup according to the chosen index organization, to determine and process the subset of events.
The input is the indexed events (or table rows).
The big-o complexity depends on the type of lookup operation.
For lookups that are hash only the complexity is constant time O(1).
For lookups that are btree or that combine hash and btree the complexity is O(log N).
Otherwise the complexity is O(N). Without indexes a scan has to inspect every event which means it will scale with the number of events.
This example is a subquery. When an RFIDEvent arrives the runtime finds Zone events for the same zone id:
select * from RFIDEvent as rfid where exists (select * from Zone#unique(zoneId) as zone where rfid.zoneId = zone.zoneId)
The where-clause uses equals ('=') and the query planner plans a hash index lookup. The operation is constant time O(1).
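To make the hash lookup concrete, the sketch below models the Zone#unique(zoneId) window as a HashMap keyed by zone id, so that the exists-subquery for an arriving RFIDEvent becomes a single containsKey call. ZoneIndex and its nested Zone class are hypothetical stand-ins, not Esper classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the Zone#unique(zoneId) window modeled as a hash map keyed by zoneId.
class ZoneIndex {
    // Hypothetical event class; only zoneId matters for the lookup.
    static final class Zone {
        final String zoneId;
        final String name;

        Zone(String zoneId, String name) {
            this.zoneId = zoneId;
            this.name = name;
        }
    }

    private final Map<String, Zone> byZoneId = new HashMap<>();

    // A newer Zone event with the same zoneId replaces the older one, as #unique(zoneId) does.
    void onZone(Zone zone) {
        byZoneId.put(zone.zoneId, zone);
    }

    // Models "exists (... where rfid.zoneId = zone.zoneId)": one O(1) hash lookup.
    boolean existsForRfid(String rfidZoneId) {
        return byZoneId.containsKey(rfidZoneId);
    }
}
```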
This example declares a TickWindow named window holding StockTick events. It uses on-select to select all stock ticks with a price greater than the price provided by PriceQuery:
create window TickWindow#time(10) from StockTick; on PriceQuery as priceQuery select * from TickWindow as ticks where ticks.price > priceQuery.price;
The where-clause uses the relational greater-than operator ('>') and the query planner plans a btree index lookup. The operation is O(log N) with N related to the number of TickWindow events.
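A comparable sketch for the btree case, assuming a TreeMap from price to the ids of TickWindow events at that price. PriceIndex is hypothetical and does not model the removal of events leaving the 10-second time window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of a sorted index over TickWindow events by price.
class PriceIndex {
    private final TreeMap<Double, List<String>> byPrice = new TreeMap<>();

    void add(double price, String tickId) {
        byPrice.computeIfAbsent(price, k -> new ArrayList<>()).add(tickId);
    }

    // Models "where ticks.price > priceQuery.price": the tree locates the start of the
    // range in O(log N); the remaining cost is proportional to the number of matches.
    List<String> selectAbove(double queryPrice) {
        List<String> result = new ArrayList<>();
        for (List<String> ids : byPrice.tailMap(queryPrice, false).values()) {
            result.addAll(ids);
        }
        return result;
    }
}
```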
This example is a unidirectional join. When an RFIDEvent arrives it outputs a row for each Zone event unique by zone id:
select * from RFIDEvent unidirectional, Zone#unique(zoneId)
There is no where-clause and the query planner cannot use an index. The operation is linear time O(N) with N related to the number of Zone events.
The runtime evaluates enumeration methods by applying an operation to each element in a collection.
This operation takes place for each enumeration method.
The parameters to the operation are the parameters passed to the enumeration method.
The algorithm iterates the collection and performs the operation.
The input is the collection of events or scalar values.
The big-o complexity depends on the enumeration method.
For take and takeLast the complexity is constant time O(1) when the size parameter is small, and linear time O(N) in the worst case, when the size parameter is large.
For countOf the complexity is constant time O(1), and can be linear time O(N) for data structures that don't provide a size.
For all other enumeration methods the complexity is linear time O(N).
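The following hypothetical helpers mirror the cost profile above using plain Java collections: take copies at most n elements, countOf on a Collection calls size(), and countOf on a bare Iterable has to visit every element. They are a sketch, not Esper's enumeration method implementations.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helpers that mirror the cost profile described above.
final class EnumerationCosts {
    private EnumerationCosts() {}

    // Like take(n): copies at most n elements, so the cost grows with n, not the collection size.
    static <T> List<T> take(List<T> source, int n) {
        int count = Math.min(Math.max(n, 0), source.size());
        return new ArrayList<>(source.subList(0, count));
    }

    // Like countOf() on a Collection: size() is O(1) for the standard collections.
    static int countOf(Collection<?> source) {
        return source.size();
    }

    // Like countOf() on a structure that doesn't provide a size: every element is visited, O(N).
    static int countOf(Iterable<?> source) {
        int count = 0;
        for (Object ignored : source) {
            count++;
        }
        return count;
    }
}
```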
The runtime evaluates aggregation methods by applying an operation to aggregation state.
This operation takes place for each aggregation method.
The parameters to the operation are the parameters passed to the aggregation method.
The algorithm evaluates parameters and queries aggregation state.
The input is the number of aggregated events held by the aggregation state.
The big-o complexity for sorted aggregations is:
For eventsBetween the complexity is O(log N) for small ranges and can be linear time O(N) in the worst case, for large ranges.
For all other aggregation methods on sorted aggregations the complexity is O(log N).
The big-o complexity for aggregation methods on window aggregations is constant time O(1).
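A sorted aggregation can be pictured as a sorted map from the sort key to events. The hypothetical SortedAggregationModel class below uses TreeMap.subMap to show why eventsBetween is logarithmic for locating the range but linear in the size of the range it returns, while a single-key lookup such as ceilingKey stays O(log N). It is a model under these assumptions, not Esper's aggregation state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of a sorted aggregation keyed by a sort value (e.g. a timestamp).
class SortedAggregationModel {
    private final TreeMap<Long, List<String>> sorted = new TreeMap<>();

    void enter(long key, String event) {
        sorted.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    // Range lookup similar in spirit to eventsBetween: locating the range is O(log N),
    // copying its contents costs as much as the range is large (O(N) in the worst case).
    List<String> eventsBetween(long from, boolean fromInclusive, long to, boolean toInclusive) {
        List<String> result = new ArrayList<>();
        for (List<String> events : sorted.subMap(from, fromInclusive, to, toInclusive).values()) {
            result.addAll(events);
        }
        return result;
    }

    // A single-key lookup, here the smallest key at or above the given value, is O(log N).
    Long ceilingKey(long key) {
        return sorted.ceilingKey(key);
    }
}
```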
The compiler and runtime run on a JVM and you need to be familiar with JVM tuning. Key parameters to consider include minimum and maximum heap memory and nursery heap sizes. Statements with time-based or length-based data windows can consume large amounts of memory as their size or length can be large.
For time-based data windows, one needs to be aware that the memory consumed depends on the actual event stream input throughput. Event pattern instances also consume memory, especially when using the "every" keyword in patterns to repeat pattern sub-expressions - which again will depend on the actual event stream input throughput.
Your application receives output events from statements through the UpdateListener interface or via the strongly-typed subscriber POJO object. Such output events are delivered by the application or timer thread(s) that send an input event into the runtime instance.
The processing of output events that your listener or subscriber performs temporarily blocks the thread until the processing completes, and may thus reduce throughput. It can therefore be beneficial for your application to process output events asynchronously and not block the runtime while an output event is being processed by your listener, especially if your listener code performs blocking IO operations.
For example, your application may want to send output events to a JMS destination or write output event data to a relational database. For optimal throughput, consider performing such blocking operations in a separate thread.
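One way to keep a blocking operation out of the delivering thread is to hand each output event to a separate worker thread. The sketch below assumes a generic Consumer as the blocking sink (for example code that writes to JMS or a database) and a single-thread executor, which keeps output events in delivery order. AsyncOutputHandler is a hypothetical name; your listener or subscriber would call onOutput from its update method.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: hands output events to one background thread so the thread that
// delivered the event (and sent the input event into the runtime) is not blocked by slow IO.
class AsyncOutputHandler<T> implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<T> blockingSink; // assumed: e.g. a JMS send or a JDBC insert

    AsyncOutputHandler(Consumer<T> blockingSink) {
        this.blockingSink = blockingSink;
    }

    // Call this from the listener or subscriber; it only enqueues the work and returns.
    void onOutput(T event) {
        worker.execute(() -> blockingSink.accept(event));
    }

    // Lets already-queued work finish, then stops the background thread.
    @Override
    public void close() throws InterruptedException {
        worker.shutdown();
        worker.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Executors.newSingleThreadExecutor uses an unbounded queue; if the sink stays slower than the output rate for long periods, a bounded queue with a rejection policy may be preferable.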
Additionally, when reading input events from a store or network in a performance test, you may find that the runtime processes events faster than you are able to feed events into the runtime. In such a case you may want to consider an in-memory driver for use in performance testing. Also consider decoupling your read operation from the event processing operation (sendEvent method) by having multiple readers or by pre-fetching your data from the store.
We recommend using multiple threads to send events into the runtime. There is a test class below. Our test class does not use a blocking queue and thread pool so as to avoid a point of contention.
The following sample code tests performance with multiple threads:
import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompileException;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPEventService;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import com.espertech.esper.runtime.client.EPStatement;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SampleClassThreading {
    public static void main(String[] args) throws InterruptedException {
        int numEvents = 1000000;
        int numThreads = 3;

        Configuration config = new Configuration();
        config.getRuntime().getThreading().setListenerDispatchPreserveOrder(false);
        // remove thread that handles time advancing
        config.getRuntime().getThreading().setInternalTimerEnabled(false);
        config.getCommon().addEventType(MyEvent.class);

        String epl = "create context MyContext coalesce by consistent_hash_crc32(id) " +
            "from MyEvent granularity 64 preallocate;\n" +
            "@name('result') context MyContext select count(*) from MyEvent group by id;\n";

        EPCompiled compiled;
        try {
            compiled = EPCompilerProvider.getCompiler().compile(epl, new CompilerArguments(config));
        } catch (EPCompileException ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        }

        EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(config);
        EPDeployment deployment;
        try {
            deployment = runtime.getDeploymentService().deploy(compiled);
        } catch (EPDeployException ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        }

        EPStatement stmt = runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), "result");
        stmt.setSubscriber(new MySubscriber());

        // each thread sends its share of the events; the latch signals completion
        Thread[] threads = new Thread[numThreads];
        CountDownLatch latch = new CountDownLatch(numThreads);
        int eventsPerThreads = numEvents / numThreads;
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(new MyRunnable(latch, eventsPerThreads, runtime.getEventService()));
        }
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < numThreads; i++) {
            threads[i].start();
        }

        latch.await(10, TimeUnit.MINUTES);
        if (latch.getCount() > 0) {
            throw new RuntimeException("Failed to complete in 10 minutes");
        }
        long delta = System.currentTimeMillis() - startTime;
        System.out.println("Took " + delta + " millis");
    }

    public static class MySubscriber {
        public void update(Object[] args) {
        }
    }

    public static class MyRunnable implements Runnable {
        private final CountDownLatch latch;
        private final int numEvents;
        private final EPEventService eventService;

        public MyRunnable(CountDownLatch latch, int numEvents, EPEventService eventService) {
            this.latch = latch;
            this.numEvents = numEvents;
            this.eventService = eventService;
        }

        public void run() {
            Random r = new Random();
            for (int i = 0; i < numEvents; i++) {
                // ids 0..511 spread over the 64 preallocated hash partitions
                eventService.sendEventBean(new MyEvent(r.nextInt(512)), "MyEvent");
            }
            latch.countDown();
        }
    }

    public static class MyEvent {
        private final int id;

        public MyEvent(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }
}
We recommend using Java threads as above, or a blocking queue and thread pool with sendEventType, or alternatively configuring inbound threading if your application does not already employ threading.
The runtime provides the configuration option to use runtime-level queues and threadpools for inbound, outbound and internal executions. See Section 16.8.1, “Advanced Threading” for more information.
We recommend the outbound threading if your listeners are blocking. For outbound threading also see the section below on tuning and disabling listener delivery guarantees.
If enabling advanced threading options, keep in mind that the runtime will maintain a queue and thread pool. There is additional overhead associated with entering work units into the queue, maintaining the queue and the hand-off between threads. The Java blocking queues are not necessarily fast on all JVMs. It is not necessarily true that your application will perform better with any of the advanced threading options.
We found scalability better on Linux systems when running Java with -server, pinning threads to exclusive CPUs, and making sure CPUs are available on your system.
We recommend looking at LMAX Disruptor, an inter-thread messaging library, for setting up processing stages. Disruptor, however, is reportedly less suitable for setting up a worker pool.
The sample code below may help you get started setting up a thread pool of workers with back pressure and consideration for IO threads and clean shutdown.
The sample code starts by setting up a thread factory:
private static class RuntimeThreadFactory implements ThreadFactory {
    private final AtomicInteger id = new AtomicInteger(0);

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "Event Runtime Thread #" + id.incrementAndGet());
        t.setDaemon(true);
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
The sample uses a fixed-size array blocking queue. To handle the situation where the queue is full and accepts no more messages, it uses a rejection handler that counts the number of rejections and retries:
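private class RuntimeRejectionHandler implements RejectedExecutionHandler {
    // AtomicLong because several IO threads may hit a full queue concurrently
    private final AtomicLong spinCount = new AtomicLong();

    public long getSpinCount() {
        return spinCount.get();
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        spinCount.incrementAndGet();
        try {
            // retry until the queue accepts the work unit, waiting up to 120 microseconds per attempt
            boolean isAccepted = false;
            while (!isAccepted) {
                isAccepted = executorQueue.offer(r, 120, TimeUnit.MICROSECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            log.warn("could not queue work entry");
        }
    }
}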
The Runnable that submits an event for processing could look like this:
class Holder implements Runnable {
    private final Object lm; // the event object to send

    Holder(Object lm) {
        this.lm = lm;
    }

    public void run() {
        // do any stuff needed to "prepare" event which doesn't involve IO
        runtime.getEventService().sendEventBean(lm, "LMEventType");
    }
}
Initialize the queue and worker pool as follows:
private final static int CAPACITY = 10000;
private final static int THREAD_COUNT = 4;

private static EPRuntime runtime;
private ThreadFactory threadFactory = new RuntimeThreadFactory();
private RuntimeRejectionHandler rejectionHandler = new RuntimeRejectionHandler();
private BlockingQueue<Runnable> executorQueue;
private ThreadPoolExecutor executor;

public void start() {
    executorQueue = new ArrayBlockingQueue<Runnable>(CAPACITY);
    executor = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT, 0, TimeUnit.SECONDS,
        executorQueue, threadFactory, rejectionHandler);
    executor.allowCoreThreadTimeOut(false);
    // start all worker threads up front
    while (executor.getPoolSize() < executor.getCorePoolSize()) {
        executor.prestartCoreThread();
    }
}
To shut down cleanly, and before destroying the runtime, the sample code is:
executor.shutdown();
while (!executor.isTerminated()) {
    Thread.sleep(100);
}
The next sample code goes into the IO or input thread(s) such as NIO mapped file, file channel, socket channel, or zmq / nanomsg etc., and submits a work unit to the queue:
while (programAlive) {
    // deserialize event to POJO, Map, Array, etc.,
    // pass along an event type name when needed
    executor.execute(new Holder(myeventobject));
}
You could periodically dump the spinCount variable to get an idea of queue depth.
You can tune the size of the executor's pool, and the wait time (the TimeUnit value) used inside the rejectedExecution method, until you 1) get stable performance at the highest level (determined by the optimal number of threads in the pool), and 2) avoid wasting CPU in the IO thread(s) (determined by the optimal waiting time between attempts to re-queue rejected events to the thread pool).
By selecting the underlying event in the select-clause you can reduce load on the runtime, since the runtime does not need to generate a new output event for each input event.
For example, the following statement returns the underlying event to update listeners:
// Better performance
select * from RFIDEvent
In comparison, the next statement selects individual properties. This statement requires the runtime to generate an output event that contains exactly the required properties:
// Less good performance
select assetId, zone, xlocation, ylocation from RFIDEvent
The runtime stream-level filtering is very well optimized, while filtering via the where-clause post any data windows is not optimized.
The same is true for named windows. If your application is only interested in a subset of named window data and such filters are not correlated to arriving events, place the filters into parentheses after the named window name.
Consider the example below, which performs stream-level filtering:
// Better performance : stream-level filtering
select * from MarketData(ticker = 'GOOG')
The example below is the equivalent (same semantics) statement and performs post-data-window filtering without a data window. The compiler does not optimize statements that filter in the where-clause for the reason that data windows are generally present.
// Less good performance : post-data-window filtering
select * from MarketData where ticker = 'GOOG'
Thus this optimization technique applies to statements without any data window.
When a data window is used, the semantics change. Let's look at an example to better understand the difference: In the next statement only GOOG market events enter the length window:
select avg(price) from MarketData(ticker = 'GOOG')#length(100)
The above statement computes the average price of GOOG market data events for the last 100 GOOG market data events.
Compare the filter position to a filter in the where clause. The following statement is NOT equivalent as all events enter the data window (not just GOOG events):
select avg(price) from MarketData#length(100) where ticker = 'GOOG'
The statement above computes the average price of all market data events for the last 100 market data events, and outputs results only for GOOG.
The next two example statements put the account number filter criteria directly into parentheses following the named window name:
// Better performance : stream-level filtering
select * from WithdrawalNamedWindow(accountNumber = '123')
// Better performance : example with subquery
select *, (select * from LoginSucceededWindow(accountNumber = '123'))
from WithdrawalNamedWindow(accountNumber = '123')
If you have a number of statements performing a given computation on incoming events, consider moving the computation from the where-clause to a plug-in user-defined function that is listed as part of stream-level filter criteria. The compiler optimizes evaluation of user-defined functions in filters such that an incoming event can undergo the computation just once even in the presence of N statements.
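For illustration, a plug-in single-row function for the vstCompare example could be an ordinary static Java method. The sketch assumes a MarketData class with VST and RT properties and assumes the wildcard passes the underlying MarketData event to the function; both classes are hypothetical, and the function would additionally need to be registered with the compiler configuration as a plug-in single-row function (registration is not shown).

```java
// Hypothetical event class exposing the VST and RT properties used in the statements below.
class MarketData {
    private final double vst;
    private final double rt;

    MarketData(double vst, double rt) {
        this.vst = vst;
        this.rt = rt;
    }

    public double getVST() { return vst; }
    public double getRT() { return rt; }
}

// Hypothetical holder class for the single-row function; class and method names are assumptions.
final class MarketFunctions {
    private MarketFunctions() {}

    // Same computation as the where-clause version: (VST * RT) - (VST / RT) > 1.
    public static boolean vstCompare(MarketData md) {
        return (md.getVST() * md.getRT()) - (md.getVST() / md.getRT()) > 1;
    }
}
```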
// Prefer stream-level filtering with a user-defined function
select * from MarketData(vstCompare(*))
// Less preferable when there are N similar statements:
// Move the computation in the where-clause to the "vstCompare" function.
select * from MarketData where (VST * RT) - (VST / RT) > 1
The compiler and runtime do not yet pre-evaluate arithmetic expressions that produce constant results. However, since the compiler generates byte code, JVM byte code optimization takes place and may pre-evaluate certain expressions.
Therefore, a filter expression as below is optimized:
// Better performance : no arithmetic
select * from MarketData(price>40)
While the compiler cannot currently optimize this expression:
// Less good performance : with arithmetic
select * from MarketData(price+10>50)
If your statement uses order by to order output events, consider removing order by unless your application does indeed require the events it receives to be ordered.
If your statement specifies group by but does not use aggregation functions, consider removing group by.
If your statement specifies group by but the filter criteria only allows one group, consider removing group by:
// Prefer:
select * from MarketData(symbol = 'GE') having sum(price) > 1000

// Don't use this since the filter specifies a single symbol:
select * from MarketData(symbol = 'GE') group by symbol having sum(price) > 1000
If your statement specifies the grouped data window #groupwin but the window being grouped retains the same set of events regardless of grouping, remove #groupwin, for example:
// Prefer:
create window MarketDataWindow#keepall as MarketDataEventType

// Don't use this, since keeping all events
// or keeping all events per symbol is the same thing:
create window MarketDataWindow#groupwin(symbol)#keepall as MarketDataEventType

// Don't use this, since keeping the last 1-minute of events
// or keeping 1-minute of events per symbol is the same thing:
create window MarketDataWindow#groupwin(symbol)#time(1 min) as MarketDataEventType
It is not necessary to specify a data window for each stream.
// Prefer:
select * from MarketDataWindow

// Don't have a data window if just listening to events, prefer the above
select * from MarketDataWindow#lastevent
If your statement specifies a unique data window but the filter criteria only allow a single value for the unique key, consider removing the unique data window:
// Prefer:
select * from MarketDataWindow(symbol = 'GE')#lastevent

// Don't have a unique-key data window if your filter specifies a single value
select * from MarketDataWindow(symbol = 'GE')#unique(symbol)
In patterns, the every keyword in conjunction with followed by (->) starts a new sub-expression per match.
For example, the following pattern starts a sub-expression looking for a B event for every A event that arrives.
every A -> B
Determine under what conditions a subexpression should end so the runtime can stop looking for a B event. Here are a few generic examples:
every A -> (B and not C)
every A -> B where timer:within(1 sec)
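The following hypothetical simulation, in plain Java, counts live sub-expression instances for every A -> B where timer:within(1 sec). Each A adds an instance, a B completes all live instances, and the time check removes instances older than one second. It is a simplified model (the handling of the exact 1-second boundary is an assumption), not how the runtime represents patterns.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of "every A -> B where timer:within(1 sec)": each A starts
// a sub-expression instance; an instance ends on a matching B or when its time runs out.
class EveryFollowedBySimulation {
    private static final long WITHIN_MILLIS = 1000;
    private final List<Long> activeStartTimes = new ArrayList<>(); // one entry per live instance

    void onA(long now) {
        activeStartTimes.add(now);
    }

    // A B event completes every live instance, so all of them produce a match and end.
    int onB(long now) {
        expire(now);
        int matches = activeStartTimes.size();
        activeStartTimes.clear();
        return matches;
    }

    // Without the within-guard nothing would remove instances, and they would pile up
    // for as long as A events keep arriving without a B.
    void expire(long now) {
        Iterator<Long> it = activeStartTimes.iterator();
        while (it.hasNext()) {
            if (now - it.next() >= WITHIN_MILLIS) {
                it.remove();
            }
        }
    }

    int liveInstances() {
        return activeStartTimes.size();
    }
}
```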
The EventPropertyGetter interface is useful for obtaining an event property value without property name table lookup given an EventBean instance that is of the same event type that the property getter was obtained from.
After deploying a statement, the EPStatement instance provides the statement's EventType via the getEventType() method. From the EventType you can obtain EventPropertyGetter instances for named event properties.
To demonstrate, consider the following simple statement:
select symbol, avg(price) from Market group by symbol
After compiling and deploying the module, obtain the EventType and pass the type to the listener:
EPStatement stmt = runtime.getDeploymentService().getStatement(deploymentId, statementName);
MyGetterUpdateListener listener = new MyGetterUpdateListener(stmt.getEventType());
The listener can use the type to obtain fast getters for property values of events for the same type:
public class MyGetterUpdateListener implements UpdateListener {
    private final EventPropertyGetter symbolGetter;
    private final EventPropertyGetter avgPriceGetter;

    public MyGetterUpdateListener(EventType eventType) {
        // resolve each getter once, outside the per-event path
        symbolGetter = eventType.getGetter("symbol");
        avgPriceGetter = eventType.getGetter("avg(price)");
    }
Last, the update method can invoke the getters to obtain event property values:
    public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPRuntime runtime) {
        String symbol = (String) symbolGetter.get(newEvents[0]);
        Double avgPrice = (Double) avgPriceGetter.get(newEvents[0]);
        // some more logic here
    }
}
When an application requires the value of most or all event properties, it can often be best to simply select the underlying event via wildcard and cast the received events.
Let's look at the sample statement:
select * from MarketData(symbol regexp 'E[a-z]')
An update listener to the statement may want to cast the received events to the expected underlying event class:
public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPRuntime runtime) {
    MarketData md = (MarketData) newEvents[0].getUnderlying();
    // some more logic here
}
Even if you don't have a log4j configuration file in place, the runtime will make sure to minimize execution path logging overhead. For prior versions, and to reduce logging overhead overall, we recommend the "WARN" log level or the "INFO" log level.
Please see the log4j configuration file in "etc/infoonly_log4j.xml" for example log4j settings.
EPL provides the @Audit annotation for statements. For performance testing and production deployment, we recommend removing @Audit.
If your application is not a multithreaded application, or your application is not sensitive to the order of delivery of result events to your application listeners, then consider disabling the delivery order guarantees the runtime makes towards ordered delivery of results to listeners:
Configuration config = new Configuration();
config.getRuntime().getThreading().setListenerDispatchPreserveOrder(false);
If your application is not a multithreaded application, or your application uses the insert into clause to make results of one statement available for further consuming statements but does not require ordered delivery of results from producing statements to consuming statements, you may disable delivery order guarantees between statements:
Configuration config = new Configuration();
config.getRuntime().getThreading().setInsertIntoDispatchPreserveOrder(false);
If your application declares only stateless statements then the settings described herein are not relevant.
Additional configuration options, described in the configuration section, specify timeout values and whether to spin or to use thread context switching.
The runtime logs the following informational message when guaranteed delivery order to listeners is enabled and spin lock times exceed the default or configured timeout: Spin wait timeout exceeded in listener dispatch.
The respective message for delivery from insert into statements to consuming statements is Spin wait timeout exceeded in insert-into dispatch.
If your application sees messages that spin lock times are exceeded, your application has several options: First, disabling preserve order is an option. Second, ensure your listener does not perform (long-running) blocking operations before returning, for example by performing output event processing in a separate thread. Third, change the timeout value to a larger number to block longer without logging the message.
The subscriber object is a technique to receive result data that has performance advantages over the UpdateListener interface. Please refer to Section 16.5.2, “Setting a Subscriber Object”.
Data flows offer a high-performance means to execute EPL select statements and use other built-in data flow operators. The data flow Emitter operator allows sending underlying event objects directly into a data flow. Thereby the runtime does not need to wrap each underlying event into an EventBean instance, and the runtime does not need to match events to statements. Instead, the underlying event applies directly to only the data flow instance that your application submits the event to, and no other statements or data flows see the same event.
Data flows are described in Chapter 21, EPL Reference: Data Flow.
A context partition is associated with context partition state that consists of current aggregation values, partial pattern matches and data windows, depending on whether your statement uses such constructs. When the runtime receives events it updates context partition state under locking, such that context partition state remains consistent under concurrent multi-threaded access.
For high-volume streams, the locking required to protect context partition state may slow down processing or introduce blocking for very high arrival rates of events that apply to the very same context partition and its state.
Your first choice should be to utilize a context that allows for multiple context partitions, such as the hash segmented context. The hash segmented context usually performs better compared to the keyed segmented context since in the keyed segmented context the runtime must check whether a partition exists or must be created for a given key.
Your second choice is to split the statement into multiple statements that each perform part of the intended function or that each look for a certain subset of the high-arrival-rate stream. There is very little cost in terms of memory or CPU resources per statement; the runtime can usually handle a larger number of statements as efficiently as a single statement.
For example, consider the following statement:
// less effective in a highly threaded environment
select venue, ccyPair, side, sum(qty) from CumulativePrice where side='O' group by venue, ccyPair, side
The runtime protects state of each context partition by a separate lock for each context partition, as discussed in the API section. In highly threaded applications threads may block on a specific context partition. You would therefore want to use multiple context partitions.
Consider creating either a hash segmented context or a keyed segmented context. In the hash segmented context incoming data is simply assigned to one of the buckets using a small computation. In the keyed segmented context the runtime must check keys to see if a partition already exists or whether a new partition must be allocated. We'll discuss both below. For both types of context, since locking is on the level of context partition, the locks taken by the runtime are very fine grained allowing for highly concurrent processing.
This sample EPL declares a hash segmented context. In a hash segmented context the runtime can pre-allocate context partitions and therefore does not need to check whether a partition exists already. In a hash segmented context the runtime simply assigns events to context partitions based on the result of a hash function and a modulo operation.
create context MyContext coalesce by consistent_hash_crc32(venue) from CumulativePrice(side='O') granularity 16 preallocate
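To make the difference concrete, here is a minimal Java sketch (a conceptual model, not Esper's internal code) of routing with a fixed number of pre-allocated partitions. It assumes the hash is CRC32 over the UTF-8 bytes of the venue value followed by a modulo by the granularity; the class and method names are hypothetical. Because every bucket exists from the start, routing is a pure computation with no existence check.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Conceptual sketch, not Esper's implementation: maps a key to one of a fixed
// number of pre-allocated context partitions using CRC32 and a modulo, in the
// spirit of "coalesce by consistent_hash_crc32(venue) ... granularity 16".
// Hashing the UTF-8 bytes of the key is an assumption made for illustration.
class HashSegmentSketch {
    static final int GRANULARITY = 16;

    static int partitionFor(String venue) {
        CRC32 crc = new CRC32();
        crc.update(venue.getBytes(StandardCharsets.UTF_8));
        // getValue() holds an unsigned 32-bit value in a long, so the result is in [0, 16)
        return (int) (crc.getValue() % GRANULARITY);
    }

    public static void main(String[] args) {
        // All 16 partitions exist up front, so routing never checks for existence.
        long[] eventsPerPartition = new long[GRANULARITY];
        for (String venue : new String[] {"NYSE", "LSE", "NYSE"}) {
            int partition = partitionFor(venue);
            eventsPerPartition[partition]++;
            System.out.println(venue + " -> partition " + partition);
        }
    }
}
```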
This sample EPL declares a keyed segmented context. The keyed segmented context instructs the runtime to employ a context partition per venue, ccyPair, side key combination. The runtime must check for each event whether a partition exists for that combination of venue, ccyPair and side:
create context MyContext partition by venue, ccyPair, side from CumulativePrice(side='O')
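For contrast, the following Java sketch (again a conceptual model with hypothetical names, using records from Java 16 or later) shows the extra step a keyed segmented context implies: each event's key combination is looked up in a map, and a new partition state is allocated when the key is seen for the first time.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch, not Esper's implementation: models the per-event
// "does a partition exist for this key?" check of a keyed segmented context
// such as "partition by venue, ccyPair, side". PartitionKey and PartitionState
// are hypothetical names.
class KeyedSegmentSketch {
    record PartitionKey(String venue, String ccyPair, String side) {}

    static final class PartitionState {
        long sumQty; // stands in for the sum(qty) aggregation state
    }

    private final Map<PartitionKey, PartitionState> partitions = new HashMap<>();

    // Every event pays for a lookup; the first event of a new key also pays for allocation.
    PartitionState route(String venue, String ccyPair, String side, long qty) {
        PartitionState state = partitions.computeIfAbsent(
                new PartitionKey(venue, ccyPair, side), key -> new PartitionState());
        state.sumQty += qty;
        return state;
    }

    int partitionCount() {
        return partitions.size();
    }

    public static void main(String[] args) {
        KeyedSegmentSketch ctx = new KeyedSegmentSketch();
        ctx.route("V1", "EURUSD", "O", 10);
        ctx.route("V1", "EURUSD", "O", 5);
        ctx.route("V2", "EURUSD", "O", 7);
        System.out.println("partitions: " + ctx.partitionCount());
    }
}
```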
After declaring the context using create context, make sure all your statements, including those statements that create named windows and tables, specify that context. This is done by prefixing each statement with context context_name ....
The new statement that refers to the context created above is shown below. Note the context MyContext prefix, which tells the runtime that this statement executes context partitioned. The prefix must be provided, otherwise the statement does not execute context partitioned.
context MyContext select venue, ccyPair, side, sum(qty) from CumulativePrice
For testing purposes or if your application controls concurrency, you may disable context partition locking, see Section 17.6.10.3, “Disable Locking”.
When joining streams the runtime builds a product of the joined data windows based on the where clause. It analyzes the where clause at the time of statement compilation and builds the appropriate indexes and query strategy. Avoid using expressions in the join where clause that require evaluation, such as user-defined functions or arithmetic expressions.
When joining streams and not providing a where clause, consider using the #unique data window or the #lastevent data window to join only the last event or the last event per unique key(s) of each stream.
The sample statement below can produce up to 5,000 rows when both data windows are filled and an event arrives for either stream:
// Avoid joins between streams with data windows without where-clause
select * from StreamA#length(100), StreamB#length(50)
Consider using a subquery, using separate statements with insert-into, or providing a where clause to limit the product of rows.
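As a rough illustration of why the where clause matters, the Java sketch below (hypothetical names, a model rather than the runtime's join implementation) compares the size of the full product of two filled windows with the result of an equi-join on symbol that uses a hash index on one side. The windows are filled with 10 repeating symbols, an assumption made only so the numbers are easy to check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch, not Esper's join code: a join without a where-clause
// pairs every row of one window with every row of the other, while
// "where a.symbol = b.symbol" allows a hash index lookup per row.
class JoinProductSketch {
    // Without a where-clause: every row of A pairs with every row of B.
    static long crossProductRows(List<String> aSymbols, List<String> bSymbols) {
        return (long) aSymbols.size() * bSymbols.size();
    }

    // With a.symbol = b.symbol: index B by symbol once, then one lookup per A row.
    static long equiJoinRows(List<String> aSymbols, List<String> bSymbols) {
        Map<String, Integer> bIndex = new HashMap<>();
        for (String symbol : bSymbols) {
            bIndex.merge(symbol, 1, Integer::sum);
        }
        long rows = 0;
        for (String symbol : aSymbols) {
            rows += bIndex.getOrDefault(symbol, 0);
        }
        return rows;
    }

    // Hypothetical window content: size rows cycling through distinctSymbols symbols.
    static List<String> window(int size, int distinctSymbols) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            rows.add("SYM" + (i % distinctSymbols));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> a = window(100, 10); // like StreamA#length(100)
        List<String> b = window(50, 10);  // like StreamB#length(50)
        System.out.println("cross product rows: " + crossProductRows(a, b));
        System.out.println("equi-join rows:     " + equiJoinRows(a, b));
    }
}
```

With 100 and 50 rows the full product has 5,000 rows, while the equi-join in this setup yields 500 and each lookup touches only matching rows.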
The examples below show different approaches, which are not semantically equivalent, assuming that MyEvent is defined with the properties symbol and value:
// Replace the following statement as it may not perform well
select a.symbol, avg(a.value), avg(b.value) from MyEvent#length(100) a, MyEvent#length(50) b

// Join with where-clause
select a.symbol, avg(a.value), avg(b.value) from MyEvent#length(100) a, MyEvent#length(50) b where a.symbol = b.symbol

// Unidirectional join with where-clause
select a.symbol, avg(b.value) from MyEvent as a unidirectional, MyEvent#length(50) as b where a.symbol = b.symbol

// Subquery
select (select avg(value) from MyEvent#length(100)) as avgA, (select avg(value) from MyEvent#length(50)) as avgB, a.symbol from MyEvent as a

// Since streams cost almost nothing, use insert-into to populate and a unidirectional join
insert into StreamAvgA select symbol, avg(value) as avgA from MyEvent#length(100)
insert into StreamAvgB select symbol, avg(value) as avgB from MyEvent#length(50)
select a.symbol, avgA, avgB from StreamAvgA as a unidirectional, StreamAvgB#unique(symbol) as b where a.symbol = b.symbol
A join is evaluated multidirectionally: when an event of any of the streams participating in the join arrives, the join gets evaluated, unless using the unidirectional keyword. Consider using a subquery instead when evaluation only needs to take place when a certain event arrives:
// Rewrite this join since you don't need to join when a LoginSucceededWindow arrives
// Also rewrite because the account number always is the value 123.
select * from LoginSucceededWindow as l, WithdrawalWindow as w where w.accountNumber = '123' and w.accountNumber = l.accountNumber

// Rewritten as a subquery
select *, (select * from LoginSucceededWindow where accountNumber='123') from WithdrawalWindow(accountNumber='123') as w
The every and repeat operators in patterns control the number of sub-expressions that are active. Each sub-expression can consume memory since, depending on the use of tags in the pattern, it may retain the matching events. A large number of active sub-expressions can reduce performance or lead to out-of-memory errors.
During the design of the pattern statement, consider the use of timer:within to reduce the amount of time a sub-expression lives, or consider the not operator to end a sub-expression.
The examples herein assume an AEvent and a BEvent event type that have an id property that may correlate between arriving events of the two event types.
In the following sample pattern the runtime starts, for each arriving AEvent, a new pattern sub-expression looking for a matching BEvent. Since the AEvent is tagged as a, the runtime retains each AEvent until a match is found for delivery to listeners or subscribers:
every a=AEvent -> b=BEvent(b.id = a.id)
One way to end a sub-expression is to attach a time limit on how long it may be active.
The next statement ends sub-expressions looking for a matching BEvent 10 seconds after arrival of the AEvent event that started the sub-expression:
every a=AEvent -> (b=BEvent(b.id = a.id) where timer:within(10 sec))
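The effect of timer:within on memory can be modeled roughly as follows. This Java sketch is a simplified stand-in for the pattern runtime, not its implementation: each AEvent adds an active sub-expression holding its id, a matching BEvent ends it, and an optional lifetime discards sub-expressions older than the within period. Expiring exactly at the 10-second boundary is an assumption of the sketch, and the class names are hypothetical (records need Java 16 or later).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model, not Esper's pattern engine:
// "every a=AEvent -> (b=BEvent(b.id = a.id) where timer:within(10 sec))".
class SubExpressionSketch {
    // An active sub-expression retains the tagged AEvent (here just its id) and start time.
    record Active(String aId, long startMillis) {}

    private final List<Active> active = new ArrayList<>();
    private final long withinMillis; // Long.MAX_VALUE models a pattern without timer:within

    SubExpressionSketch(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    // "every a=AEvent": each AEvent starts a new sub-expression.
    void onAEvent(String id, long nowMillis) {
        expire(nowMillis);
        active.add(new Active(id, nowMillis));
    }

    // A BEvent with a matching id completes (and ends) those sub-expressions.
    int onBEvent(String id, long nowMillis) {
        expire(nowMillis);
        int matches = 0;
        for (Iterator<Active> it = active.iterator(); it.hasNext(); ) {
            if (it.next().aId().equals(id)) {
                it.remove();
                matches++;
            }
        }
        return matches;
    }

    int activeCount(long nowMillis) {
        expire(nowMillis);
        return active.size();
    }

    // Assumption: a sub-expression ends once it has been alive for withinMillis.
    private void expire(long nowMillis) {
        active.removeIf(a -> nowMillis - a.startMillis() >= withinMillis);
    }

    public static void main(String[] args) {
        SubExpressionSketch bounded = new SubExpressionSketch(10_000);
        SubExpressionSketch unbounded = new SubExpressionSketch(Long.MAX_VALUE);
        for (int i = 0; i < 1_000; i++) {
            bounded.onAEvent("id" + i, i * 100L);   // one AEvent every 100 ms, never matched
            unbounded.onAEvent("id" + i, i * 100L);
        }
        System.out.println("with within:    " + bounded.activeCount(99_900));
        System.out.println("without within: " + unbounded.activeCount(99_900));
    }
}
```

In this run the bounded model ends with 100 active sub-expressions, while the unbounded one still retains all 1,000 AEvents.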
A second way to end a sub-expression is to use the not operator. You can use the not operator together with the and operator to end a sub-expression when a certain event arrives.
The next statement ends sub-expressions looking for a matching BEvent when, in the order of arrival, the next BEvent that arrives after the AEvent event that started the sub-expression does not match the id of the AEvent:
every a=AEvent -> (b=BEvent(b.id = a.id) and not BEvent(b.id != a.id))
The every-distinct operator can be used to keep one sub-expression alive per one or more keys. The next pattern demonstrates an alternative to every-distinct. It ends sub-expressions looking for a matching BEvent when an AEvent arrives that matches the id of the AEvent that started the sub-expression:
every a=AEvent -> (b=BEvent(b.id = a.id) and not AEvent(b.id = a.id))
For some use cases you can either specify one or more data windows as the solution, or you can specify a pattern that also solves your use case.
For patterns, you should understand that the runtime employs a dynamic state machine. For data windows, the runtime employs a delta network and collections. Generally you may find that patterns requiring a large number of sub-expression instances consume more memory and more CPU than data windows.
For example, consider the following statement that filters out duplicate transaction ids that occur within 20 seconds of each other:
select * from TxnEvent#firstunique(transactionId)#time(20 sec)
You could also address this solution using a pattern:
select * from pattern [every-distinct(a.transactionId) a=TxnEvent where timer:within(20 sec)]
If you have a fairly large number of different transaction ids to track, you may find that the pattern performs less well than the data window solution, as the pattern asks the runtime to manage a pattern sub-expression per transaction id. The data window solution asks the runtime to manage expiry, which can give better performance in many cases.
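The data window approach can be pictured with the Java sketch below, a simplified model (not Esper's data window code, hypothetical names) of #firstunique(transactionId)#time(20 sec): one map entry per retained transaction id plus a queue in arrival order, so expiry only inspects the oldest entries. Treating an event as expired exactly 20 seconds after arrival is an assumption.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Simplified model of "TxnEvent#firstunique(transactionId)#time(20 sec)",
// not Esper's implementation: keep the first event per id until it ages out.
class FirstUniqueTimeSketch {
    private final long windowMillis;
    private final Map<String, Long> firstArrival = new HashMap<>();
    // Retained ids in arrival order, so expiry only needs to look at the head.
    private final ArrayDeque<Map.Entry<String, Long>> byArrival = new ArrayDeque<>();

    FirstUniqueTimeSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if the event passes (is not a duplicate within the window).
    // Assumes nowMillis never decreases between calls.
    boolean accept(String transactionId, long nowMillis) {
        while (!byArrival.isEmpty() && nowMillis - byArrival.peekFirst().getValue() >= windowMillis) {
            firstArrival.remove(byArrival.pollFirst().getKey());
        }
        if (firstArrival.containsKey(transactionId)) {
            return false; // duplicate of a retained event
        }
        firstArrival.put(transactionId, nowMillis);
        byArrival.addLast(Map.entry(transactionId, nowMillis));
        return true;
    }

    int retained() {
        return firstArrival.size();
    }

    public static void main(String[] args) {
        FirstUniqueTimeSketch dedupe = new FirstUniqueTimeSketch(20_000);
        System.out.println("T1 at 0s:  " + dedupe.accept("T1", 0));
        System.out.println("T1 at 5s:  " + dedupe.accept("T1", 5_000));
        System.out.println("T1 at 20s: " + dedupe.accept("T1", 20_000));
    }
}
```

The cost per distinct id is a map entry and a queue entry, rather than a live pattern sub-expression.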
Similarly, it is generally preferable to use EPL join syntax over a pattern that cardinally detects relationships, i.e. pattern [every-distinct(...) ... -> every-distinct(...) ...]. Join query planning is a powerful compiler and runtime feature that implements fast relational joins.
The #keepall data window is a data window that retains all arriving events. The data window can be useful during the development phase and to implement a custom expiry policy using on-delete and named windows. Care should be taken, however, to remove events from the keep-all data window in a timely manner. Use on-select or fire-and-forget queries to count the number of rows currently held by a named window with a keep-all expiry policy.
This section describes common sources of out-of-memory problems.
If using the keep-all data window please consider the information above. If using pattern statements please consider pattern sub-expression instantiation and lifetime as discussed prior to this section.
When using the group-by clause or #groupwin grouped data windows, please consider the hints described below. Make sure your grouping criteria are fields that don't have an unlimited number of possible values, or specify hints otherwise.
The #unique unique data window can also be a source of error. If your uniqueness criteria include a field with an unbounded number of distinct values, the memory use of the data window can grow, unless your application deletes events.
When using the every-distinct pattern construct parameterized by distinct value expressions that generate an unlimited number of distinct values, consider specifying a time period as part of the parameters to indicate to the runtime how long a distinct value should be considered.
In a match-recognize pattern, consider limiting the number of optional events if optional events are part of the data reported in the measures clause. Also, when using the partition clause, if your partitioning criteria include a field with an unbounded number of distinct values, the memory use of the match-recognize runtime can grow.
A further source of memory use is when your application deploys modules but fails to undeploy modules when they are no longer needed.
In your application design you may also want to be aware of whether your listener or subscriber objects retain output data.
A runtime, uniquely identified by a runtime URI, is a relatively heavyweight object. Optimally, your application allocates fewer than one thousand (1000) runtime instances per JVM.
A statement instance is associated with one runtime instance, is uniquely identified by a statement name and is a medium-weight object. We have seen applications easily allocate 100,000 statements.
A statement's context partition instance is associated with one statement, is uniquely identified by a context partition id and is a lightweight object. We have seen applications easily allocate 5000 context partitions for each of 100 statements, i.e. 500,000 context partitions.
An aggregation row, data window row, pattern etc. is associated with a statement context partition and is itself a very lightweight object.
The prev, prevwindow and prevtail functions access a data window directly. The runtime does not need to maintain a separate data structure, and grouping is based on the use of the #groupwin grouped data window.
Compare this to the use of event aggregation functions such as first, window and last, which group according to the group by clause. If your statement utilizes both together, consider reformulating to use prev instead.
Performance will also depend on your JVM (Sun HotSpot, BEA JRockit, IBM J9), your operating system and your hardware. A JVM performance index such as specJBB at spec.org can be used. For memory-intensive statements, you may want to consider a 64bit architecture that can address more than 2GB or 3GB of memory, although a 64bit JVM usually comes with a slight performance penalty due to more complex pointer address management.
The choice of JVM, OS and hardware depends on a number of factors and therefore a definite suggestion is hard to make. The choice depends on the number of statements and the number of threads. A larger number of threads would benefit from more CPUs and cores. If you have very low latency requirements, you should consider getting more GHz per core, and possibly a soft real-time JVM to enforce GC determinism at the JVM level, or even consider dedicated hardware such as Azul. If your statements utilize large data windows, more RAM and heap space will be utilized, hence you should clearly plan and account for that and possibly consider 64bit architectures or consider EsperHA.
The number and type of statements is a factor that cannot be generically accounted for. The benchmark kit can help test out some requirements and establish baselines, and for more complex use cases a simulation or proof of concept would certainly work best. EsperTech's experts can be available to help write interfaces in a consulting relationship.
The @Hint annotation provides a single keyword or a comma-separated list of keywords that provide instructions to the compiler and runtime towards statement execution that affect runtime performance and memory-use of statements. Also see Section 5.2.7.9, “@Hint”.
The query planning in general is described in Section 24.2.31, “Notes on Query Planning”.
The hint for influencing query planning expression analysis is described at Section 24.2.32, “Query Planning Expression Analysis Hints”.
The hint for influencing query planning index choice is described at Section 24.2.33, “Query Planning Index Hints”.
Further hints, also related to query planning, for use with joins, outer joins, unidirectional joins, relational and non-relational joins are described in Section 5.12.6, “Hints Related to Joins”.
The hint for use with group by to specify how state for groups is reclaimed is described in Section 5.6.2.1, “Hints Pertaining to Group-By” and Section 14.3.15, “Grouped Data Window (groupwin or std:groupwin)”.
The hint for use with group by to specify aggregation state reclaim for unbound streams and timestamp groups is described in Section 5.6.2.1, “Hints Pertaining to Group-By”.
The hint for use with match_recognize to specify iterate-only is described in Section 8.4.7, “Eliminating Duplicate Matches”.
To tune subquery performance when your subquery selects from a named window, consider the hints discussed in Section 5.11.8, “Hints Related to Subqueries”.
The @NoLock hint to remove context partition locking (also read the caution note) is described in Section 16.8, “Runtime Threading and Concurrency”.
The hint to control expansion of filter expressions is described in Section 17.5.8.1, “Filter Service Max Filter Width”.
Assume your statement invokes a static method in the stream filter, as in the following example statement:
select * from MyEvent(MyHelperLibrary.filter(field1, field2, field3, field4*field5))
As a result of starting the above statement, the runtime must evaluate each MyEvent event by invoking the MyHelperLibrary.filter method and passing certain event properties. The same applies to pattern filters that specify functions to evaluate.
If possible, consider moving some of the checking performed by the function back into the filter, or consider splitting the function into two parts separated by an and conjunction. In general, for all expressions, the runtime evaluates expressions to the left of the and first and can skip evaluation of further expressions in the conjunction when the first expression returns false. In addition, the compiler can determine filter index fields and the runtime can build a filter index for fields provided in stream or pattern filters.
For example, the below statement could be faster to evaluate:
select * from MyEvent(field1="value" and MyHelperLibrary.filter(field1, field2, field3, field4*field5))
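The benefit of the added field1 equality can be sketched in Java as a filter index keyed by field1 value. This is a conceptual model with hypothetical names (MyEvent, and expensiveFilter standing in for MyHelperLibrary.filter), not the runtime's filter service: only events whose field1 matches an indexed value reach the expensive function. Records need Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Conceptual model of a filter index, not Esper's filter service.
class FilterIndexSketch {
    record MyEvent(String field1, int field2) {}

    static int expensiveCalls = 0;

    // Hypothetical stand-in for MyHelperLibrary.filter.
    static boolean expensiveFilter(MyEvent event) {
        expensiveCalls++; // count how often the costly check actually runs
        return event.field2() > 0;
    }

    // field1 value -> remaining conditions of the statements that filter on that value
    private final Map<String, List<Predicate<MyEvent>>> byField1 = new HashMap<>();

    void addStatement(String field1Value, Predicate<MyEvent> remainingCondition) {
        byField1.computeIfAbsent(field1Value, k -> new ArrayList<>()).add(remainingCondition);
    }

    // One hash lookup on field1; only the statements found there evaluate further.
    int matchCount(MyEvent event) {
        int matches = 0;
        for (Predicate<MyEvent> condition : byField1.getOrDefault(event.field1(), List.of())) {
            if (condition.test(event)) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        FilterIndexSketch index = new FilterIndexSketch();
        // like: select * from MyEvent(field1="value" and MyHelperLibrary.filter(...))
        index.addStatement("value", FilterIndexSketch::expensiveFilter);
        for (int i = 0; i < 1_000; i++) {
            index.matchCount(new MyEvent(i % 100 == 0 ? "value" : "other", i));
        }
        System.out.println("expensive calls: " + expensiveCalls);
    }
}
```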
You can use statement and runtime metric reporting as described in Section 16.12, “Runtime and Statement Metrics Reporting” to monitor performance or identify slow statements.
The term "early exit" or "short-circuit evaluation" refers to when the runtime can evaluate an expression without a complete evaluation of all sub-expressions.
Consider an expression such as follows:
where expr1 and expr2 and expr3
If expr1 is false the runtime does not need to evaluate expr2 and expr3. Therefore, when using the AND logical operator, consider reordering expressions, placing the most-selective expression first and less selective expressions thereafter.
The same is true for the OR logical operator: if expr1 is true the runtime does not need to evaluate expr2 and expr3. Therefore, when using the OR logical operator, consider reordering expressions, placing the least-selective expression first and more selective expressions thereafter.
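The Java sketch below counts sub-expression evaluations under Java's own && and || short-circuiting, as a model of the reordering advice above; the predicates and the input range 1 to 100 are hypothetical. Here x % 10 == 0 is the selective expression (true for 10 of 100 values) and x > 0 the non-selective one (always true). Keep in mind that the runtime may reorder where-clause or filter conditions itself, so treat this as an intuition aid only.

```java
import java.util.function.IntPredicate;

// Intuition aid only: counts evaluations under Java's && and || short-circuiting.
class ShortCircuitSketch {
    private static int evaluations;

    // Wraps a predicate so that every evaluation is counted.
    private static IntPredicate counted(IntPredicate p) {
        return x -> {
            evaluations++;
            return p.test(x);
        };
    }

    // Evaluations needed for "first and second" over x = 1..n.
    static int andEvaluations(IntPredicate first, IntPredicate second, int n) {
        evaluations = 0;
        IntPredicate f = counted(first);
        IntPredicate s = counted(second);
        for (int x = 1; x <= n; x++) {
            boolean matched = f.test(x) && s.test(x); // s runs only when f is true
        }
        return evaluations;
    }

    // Evaluations needed for "first or second" over x = 1..n.
    static int orEvaluations(IntPredicate first, IntPredicate second, int n) {
        evaluations = 0;
        IntPredicate f = counted(first);
        IntPredicate s = counted(second);
        for (int x = 1; x <= n; x++) {
            boolean matched = f.test(x) || s.test(x); // s runs only when f is false
        }
        return evaluations;
    }

    public static void main(String[] args) {
        IntPredicate selective = x -> x % 10 == 0; // true for 10 of 100 values
        IntPredicate broad = x -> x > 0;           // true for all 100 values
        System.out.println("AND, selective first: " + andEvaluations(selective, broad, 100));
        System.out.println("AND, broad first:     " + andEvaluations(broad, selective, 100));
        System.out.println("OR, broad first:      " + orEvaluations(broad, selective, 100));
        System.out.println("OR, selective first:  " + orEvaluations(selective, broad, 100));
    }
}
```

With AND, placing the selective expression first needs 110 evaluations instead of 200; with OR, placing the expression that is almost always true first needs 100 instead of 190.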
The order of expressions (here: expr1, expr2 and expr3) does not make a difference for the join and subquery query planner.
Note that the runtime does not guarantee short-circuit evaluation in all cases. The runtime may rewrite the where-clause or filter conditions into another order of evaluation so that it can perform index lookups.
When using a large number of threads with the runtime, such as more than 100 threads, you can provide a setting in the configuration that instructs the runtime to reduce the use of thread-local variables. Please see Section 17.6.10, “Runtime Settings Related to Execution of Statements” for more information.
We offer a switch for tuning evaluation of incoming events against filters. Please see Section 17.6.10, “Runtime Settings Related to Execution of Statements” for more information.
As the runtime locks on the level of context partition, high concurrency under threading can be achieved by using context partitions.
Generally, context partitions require more memory than the more fine-grained grouping that can be achieved by group by or #groupwin.
The create-variable syntax as well as the APIs can identify a variable as a constant value. When a variable's value is not intended to change it is best to declare the variable as constant.
For example, consider the following two statements that each declare a variable. The first statement declares a constant variable and the second statement declares a non-constant variable:
// declare a constant variable
create constant variable string CONST_DEPARTMENT = 'PURCHASING'

// declare a non-constant variable
create variable string VAR_DEPARTMENT = 'SALES'
When your application compiles a statement that has filters for events according to variable values, the compiler internally inspects such expressions and performs filter optimizations for constant variables that are more effective in evaluation.
For example, consider the following two statements that each look for events related to persons that belong to a given department:
// prefer the constant
select * from PersonEvent(department=CONST_DEPARTMENT)

// less efficient
select * from PersonEvent(department=VAR_DEPARTMENT)
The runtime can more efficiently evaluate the expression using a variable declared as constant. The same observation can be made for subquery and join query planning.
Object-array events offer the best read access performance for access to event property values. In addition, object-array events use much less memory than Map-type events. They also offer the best write access performance.
A comparison of different event representations is in Section 3.5, “Comparing Event Representations”.
First, we recommend that your application sends object-array events into the runtime, instead of Map-type events. See Appendix F, Event Representation: Object-Array (Object[]) Events for more information.
Second, we recommend that your application sets the compiler configuration of the default event representation to object array, as described in Section 17.4.9.1, “Default Event Representation”. Alternatively you can use the @EventRepresentation(objectarray) annotation with individual statements.
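One way to picture the read-performance difference is the Java sketch below, a simplification that does not reflect Esper's actual event classes: reading from a Map-type event hashes the property name on each access, whereas for an object-array event the property name can be resolved to an array index once and each read is then a plain array access. The property names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration, not Esper's event representation classes.
class EventRepresentationSketch {
    // Hypothetical property layout shared by all object-array events of one type.
    static final List<String> PROPERTY_NAMES = List.of("symbol", "price", "volume");

    // Resolved once (for example at compile time), then reused for every event.
    static int indexOf(String property) {
        int index = PROPERTY_NAMES.indexOf(property);
        if (index < 0) {
            throw new IllegalArgumentException("unknown property " + property);
        }
        return index;
    }

    static Object readFromMap(Map<String, Object> event, String property) {
        return event.get(property); // hashes the property name on every read
    }

    static Object readFromArray(Object[] event, int index) {
        return event[index]; // plain array access
    }

    public static void main(String[] args) {
        Map<String, Object> mapEvent = new HashMap<>();
        mapEvent.put("symbol", "IBM");
        mapEvent.put("price", 101.5);
        mapEvent.put("volume", 200L);
        Object[] arrayEvent = {"IBM", 101.5, 200L};

        int priceIndex = indexOf("price");
        System.out.println(readFromMap(mapEvent, "price") + " " + readFromArray(arrayEvent, priceIndex));
    }
}
```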
Query planning takes place for subqueries, joins (any type), named window and table on-actions (on-select, on-merge, on-insert, on-update, on-delete) and fire-and-forget queries. Query planning affects query execution speed. Enable query plan logging to output query plan information.
For query planning, the compiler draws information from:
The where-clauses, if any are specified. Where-clauses correlate streams, patterns, named windows, tables etc. with more streams, patterns, tables and named windows and are thus the main source of information for query planning.
The data window(s) declared on streams and named windows.
The #unique and the #firstunique data windows instruct the compiler to retain only the last event (#unique) or the first event (#firstunique) per unique criteria.
For named windows and tables, the explicit indexes created via create unique index or create index.
For named windows (and not tables), the previously created implicit indexes. The compiler can plan to create implicit indexes automatically if explicit indexes do not match correlation requirements.
Any hints specified for the statement in question, including hints specified during the creation of named windows with create window.
The compiler prefers unique indexes over non-unique indexes.
The compiler prefers hash-based lookups (equals) and combination hash-btree lookups (equals and relational-operator or range) over btree lookups (relational-operator or range) over in-keyword (single and multi-index) lookup plans. This behavior can be controlled by hints that are discussed next.
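To illustrate the lookup kinds named above, here is a Java sketch that uses HashMap for an equals (hash) index and TreeMap, a balanced tree, as a stand-in for a btree range index. These are standard JDK collections chosen for illustration, not the compiler's internal index classes, and the id/value fields are hypothetical. A hash lookup is expected constant time, while a range lookup walks the tree in logarithmic time plus the number of matches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustration only: JDK collections standing in for hash and btree indexes.
class IndexKindsSketch {
    // hash index: equals lookups, e.g. where a.id = b.id
    private final Map<String, List<Integer>> hashIndex = new HashMap<>();
    // sorted (tree) index: relational/range lookups, e.g. where b.value between 10 and 20
    private final NavigableMap<Integer, List<String>> btreeIndex = new TreeMap<>();

    void add(String id, int value) {
        hashIndex.computeIfAbsent(id, k -> new ArrayList<>()).add(value);
        btreeIndex.computeIfAbsent(value, k -> new ArrayList<>()).add(id);
    }

    List<Integer> equalsLookup(String id) {
        return hashIndex.getOrDefault(id, List.of());
    }

    // Returns ids whose value is in [low, high], in ascending value order.
    List<String> rangeLookup(int lowInclusive, int highInclusive) {
        List<String> ids = new ArrayList<>();
        btreeIndex.subMap(lowInclusive, true, highInclusive, true).values().forEach(ids::addAll);
        return ids;
    }

    public static void main(String[] args) {
        IndexKindsSketch index = new IndexKindsSketch();
        index.add("a", 5);
        index.add("b", 15);
        index.add("c", 25);
        index.add("a", 18);
        System.out.println("equals id=a:       " + index.equalsLookup("a"));
        System.out.println("range 10..20 ids:  " + index.rangeLookup(10, 20));
    }
}
```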
The expression analysis hints impact query planning for any statement and fire-and-forget query that performs a join or subquery. They also impact named window and table on-action statements.
This hint instructs the compiler which expressions, operators or streams should be excluded and therefore not considered for query planning.
The hint applies to the where
-clause and, for outer joins, to the on
-clause when present.
The hint takes a single expression as its sole parameter, which is placed in parentheses. The expression must return a boolean value.
When the provided expression returns true for a given combination, that combination will not be considered for the query plan. A combination consists of a from-stream (name or number), a to-stream (name or number), an operator (i.e. equals, relational, in-keyword) and a set of expressions.
Table 24.1. Built-In Properties of the Expression Analysis Hint
Name | Type | Description |
---|---|---|
exprs | string-array (String[] ) | Expression texts with minified whitespace. |
from_streamname | string | The stream name of the stream providing lookup values as provided by the as keyword. |
from_streamnum | int | The integer ordinal number of the stream providing lookup values as listed in the from-clause. |
opname | string | The operator name. Valid values are equals , relop (relational operators and ranges) and inkw (in -keyword). |
to_streamname | string | The stream name of the stream providing indexable values as provided by the as keyword. |
to_streamnum | int | The integer ordinal number of the stream providing indexable values as listed in the from-clause. |
Consider two event types A and B. Event type A has a property aprop and event type B has a property bprop. Let's assume A and B are related by aprop and bprop.
An inner join of all A and B events might look like this:
select * from A#keepall as a, B#keepall as b where aprop = bprop
In the default query plan, when an A event comes in, the runtime obtains the value of aprop and performs an index lookup against bprop values to obtain matching B events. Vice versa, when a B event comes in, the runtime obtains the value of bprop and performs an index lookup against aprop values to obtain matching A events.
The compiler evaluates the hint expression for each combination. The table below outlines the two rows provided to the hint expression:
Table 24.2. Built-In Properties of the Expression Analysis Hint
exprs | from_streamname | from_streamnum | opname | to_streamname | to_streamnum |
---|---|---|---|---|---|
["aprop", "bprop"] | a | 0 | equals | b | 1 |
["bprop", "aprop"] | b | 1 | equals | a | 0 |
The following statement with hint causes the analyzer to exclude all combinations since the expression passed in always returns true, in effect causing the query planner to always execute the statement as a full table scan.
@hint('exclude_plan(true)') select * from A#keepall as a, B#keepall as b where aprop = bprop
This hint instructs the compiler to ignore all equals-operators for query planning:
@hint('exclude_plan(opname="equals")') select ....
The next hint instructs the compiler to ignore the equals-operator for the direction of lookup from A to B:
@hint('exclude_plan(opname="equals" and from_streamname="a")') select ....
Conversely, this hint instructs the compiler to ignore the equals-operator for the direction of lookup from B to A:
@hint('exclude_plan(opname="equals" and from_streamname="b")') select ....
Use the exprs array of expression texts to exclude specific expressions:
@hint('exclude_plan(exprs[0]="aprop")') select ....
For subqueries the stream number zero is the subquery from-clause itself and 1 to N are the enclosing statement's from-clause streams. For named window and table on-action statements the stream number zero is the named window or table and stream number 1 refers to the triggering pattern or event.
To specify multiple expressions, please specify multiple hints. The compiler excludes a specific combination when any of the hint expressions returns true.
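The exclusion rule can be modeled in Java as below. The Combination record mirrors the built-in properties of Table 24.1 and the two candidates are the rows of Table 24.2; the record, the hint predicates and plannable are hypothetical names for illustration, not an Esper API (records need Java 16 or later). A combination stays plannable only when none of the hint expressions returns true.

```java
import java.util.List;
import java.util.function.Predicate;

// Conceptual model of exclude_plan, not Esper's query planner.
class ExcludePlanSketch {
    // Mirrors the built-in properties of Table 24.1.
    record Combination(List<String> exprs, String fromStreamName, int fromStreamNum,
                       String opName, String toStreamName, int toStreamNum) {}

    // The two rows of Table 24.2 for "where aprop = bprop".
    static final List<Combination> CANDIDATES = List.of(
            new Combination(List.of("aprop", "bprop"), "a", 0, "equals", "b", 1),
            new Combination(List.of("bprop", "aprop"), "b", 1, "equals", "a", 0));

    // A combination is excluded when any hint expression returns true for it.
    static List<Combination> plannable(List<Predicate<Combination>> hints) {
        return CANDIDATES.stream()
                .filter(c -> hints.stream().noneMatch(hint -> hint.test(c)))
                .toList();
    }

    public static void main(String[] args) {
        // like @hint('exclude_plan(opname="equals" and from_streamname="a")')
        Predicate<Combination> excludeAToB =
                c -> c.opName().equals("equals") && c.fromStreamName().equals("a");
        for (Combination c : plannable(List.of(excludeAToB))) {
            System.out.println("still planned: lookup from " + c.fromStreamName() + " to " + c.toStreamName());
        }
    }
}
```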
To inspect values passed to the hint expression, please enable query plan logging. To inspect expression evaluation, please use @Audit.
Currently index hints are only supported for the following types of statements:
Named window and table on-action statements (on-select, on-merge, on-insert, on-update, on-delete).
Statements that have subselects against named windows that have index sharing enabled (the default is disabled).
Statements that have subselects against tables.
Fire-and-forget queries.
For the above statements, you may dictate to the compiler which explicit index (created via create index syntax) to use.
Specify the name of the explicit index in parentheses following @Hint and the index literal.
The following example instructs the compiler to use the UserProfileIndex if possible:
@Hint('index(UserProfileIndex)')
Add the literal bust to instruct the compiler to use the index or, if the compiler cannot use the index, to fail query planning with an exception and therefore fail statement compilation.
The following example instructs the compiler to use the UserProfileIndex if possible, or fail with an exception if the index cannot be used:
@Hint('index(UserProfileIndex, bust)')
Multiple indexes can be listed separated by comma (,).
The next example instructs the compiler to consider the UserProfileIndex and the SessionIndex, or fail with an exception if either index cannot be used:
@Hint('index(UserProfileIndex, SessionIndex, bust)')
The literal explicit can be added to instruct the compiler to use only explicitly created indexes.
The final example instructs the compiler to consider any explicitly created index or fail with an exception if any of the explicitly created indexes cannot be used:
@Hint('index(explicit, bust)')
We recommend using System.nanoTime() to measure elapsed time when processing a batch of, for example, 1000 events.
Note that System.nanoTime() provides nanosecond precision, but not necessarily nanosecond resolution.
Therefore don't try to measure the time spent by the runtime processing a single event: the resolution of System.nanoTime() is not sufficient.
Also, there are reports that System.nanoTime() can actually go "backwards" and may not always behave as expected under threading.
Please check your JVM platform documentation.
In the default configuration, the best way to measure performance is to take nano time, send a large number of events, for example 10,000 events, and take nano time again, reporting on the difference between the two numbers.
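A minimal Java sketch of this batch approach follows. Only System.nanoTime() is a real JDK API being relied on; the sender is a hypothetical Consumer standing in for whatever call your application uses to send an event to the runtime, and the sketch assumes the default configuration without inbound threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of timing a whole batch with System.nanoTime() rather than single events.
// The sender is a hypothetical stand-in for sending an event to the runtime.
class BatchTimingSketch {
    static <E> long elapsedNanos(List<E> batch, Consumer<? super E> sender) {
        long start = System.nanoTime();
        for (E event : batch) {
            sender.accept(event);
        }
        return System.nanoTime() - start;
    }

    // Per-event cost is derived from the batch total, never measured per event.
    static double nanosPerEvent(long elapsedNanos, int eventCount) {
        return (double) elapsedNanos / eventCount;
    }

    public static void main(String[] args) {
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            batch.add(i);
        }
        long[] checksum = {0};
        long elapsed = elapsedNanos(batch, e -> checksum[0] += e);
        System.out.printf("%d events in %d ns (%.1f ns/event)%n",
                batch.size(), elapsed, nanosPerEvent(elapsed, batch.size()));
    }
}
```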
If your configuration has inbound threading or other threading options set, you should either monitor the queue depth to determine performance, or disable threading options when measuring performance, or have your application use multiple threads to send events instead.
It is vastly more efficient to create a statement once and attach multiple listeners than to create the same statement X times.
It is vastly more efficient to use context declarations to factor out commonalities between statements than to create X similar statements.
EPL, the compiler and runtime are optimized for low-latency and high-throughput execution. To accomplish that, the compiler analyzes and query-plans each statement. Certain information within each statement can effectively be shared in the runtime (indexes), so that the runtime can remove duplication of processing and thus achieve low latency and high throughput. The tradeoff is that the compiler must, for each statement, perform some upfront analysis.
Since your goal will be to make all test code as realistic, real-world and production-like as possible, we recommend against production code or test code deploying the same exact statement multiple times. Instead consider creating the same statement once and attaching multiple listeners. The compiler and runtime do not try to detect duplicate statements, since that can easily be done by your application.
Let's assume your test statement computes an aggregation over a 1-minute time window, for example select symbol, count(*) from StockTick#time(1 min) group by symbol.
If your code creates the same statement 100 times, the code instructs the runtime to track 100 logically independent time windows and to track aggregations for each group 100 times. Obviously, this is not a good use of EPL and the design of your statements and code may not be optimal.
Consider the world of relational databases. Your code could attach to a relational database, create the same table with a different name 100 times, and populate each of the 100 different tables with the same row data. A relational database administrator would probably recommend against creating 100 identical tables holding the same row data. Compare a statement to a relational database table in respect to how many there should be. In a good design there is a limited number of statements. The runtime is not specifically designed for a very large number of statements. Similarly, a relational database schema design that has 100,000 tables would be something one would seriously question. How many statements fit into memory depends on the statement itself, and there is no general guideline.
EPL allows you the freedom to design your EPL in a way that reuses state and processing. For example, your EPL design could utilize a named window instead of allocating 100 independent time windows. Since named windows are shared, the runtime only needs to track one time window instead of 100. And your EPL design could use an EPL table to maintain aggregations once and in a central place, so that tracking counts per symbol is done once instead of 100 times.
Context declarations can be an efficient way to take commonalities between statements (things that are similar between multiple statements) and factor them out into a context declaration. Instead of creating X similar statements, declare a context and attach one statement to the context, thus having X context partitions. This eliminates compiling and/or deploying X identical statements. Using a context, the compiler only needs to analyze the context declaration and the statement. Your application can send start and stop events to control which context partitions exist and what events each context partition analyzes. Use the context partition administrative API to browse or terminate context partitions.
For example, assume you need to create 100000 similar statements that all filter GeoEvent events:
create schema GeoEvent(id string, value int, marker string)
@name('statement-1') select * from GeoEvent(id = '0001', value between 10 and 20, marker in ('a', 'b'))
@name('statement-N') select * from GeoEvent(id = '0002', value between 20 and 30, marker in ('c', 'd'))
If your application compiles and deploys 100k statements as above, the compiler must analyze and query-plan each statement separately, and the runtime must enter each set of filter criteria into a shared filter index tree. The runtime can still process incoming events very fast, with low latency and high throughput, even for 100k statements. However, compiling and deploying 100k individual statements does take CPU time.
In this example, the statements have similar filters: id = an_id, value between start_range and end_range, and marker in (markers).
You could say that statements are similar and look like:
select * from GeoEvent(id=an_id, value between start_range and end_range, marker in (markers))
The an_id, start_range, end_range and markers values are the essential parameters of each instance of the filtering statement. Instances of statements are context partitions. Declare a context to refactor the design so that the common filters are in one place. This approach requires just two statements: the context declaration and the statement with the filters. You may declare two event types: one to allocate new context partitions and one to terminate context partitions.
Start by creating an event type that controls which instances of the filtering statement (the context partitions) are active:
create schema InitEvent(id string, startRange int, endRange int, markers string[])
Next, create an event type that controls when a context partition terminates:
create schema TermEvent(id string)
The context declaration tells the runtime that when an InitEvent arrives, you want a new instance that is parameterized by the InitEvent properties:
create context GeoEventFilterContext initiated by InitEvent as initevent terminated by TermEvent(id=initevent.id)
Define the statement that filters:
context GeoEventFilterContext select * from GeoEvent(id = context.initevent.id, value between context.initevent.startRange and context.initevent.endRange, marker in (context.initevent.markers))
Your application can now send InitEvent instances, for example (notation from the online EPL tool):
InitEvent={id='0001', startRange=10, endRange=20, markers={'a', 'b'}}
InitEvent={id='0002', startRange=20, endRange=30, markers={'c', 'd'}}
When the runtime receives an InitEvent instance, it takes the id, startRange, endRange and markers values, instantiates the EPL filter statement (that is, allocates a new context partition) and starts looking for matching GeoEvent events.
To stop looking for a given id, send a TermEvent, like so:
TermEvent={id='0001'}
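The effect of InitEvent and TermEvent on the set of context partitions can be modeled in a few lines of plain Java. This is a hypothetical simplification for illustration only: the runtime does not scan a list of partitions, it enters each partition's filter into its filter indexes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model only, with hypothetical names: how InitEvent and TermEvent decide which
// instances of the filtering statement (context partitions) exist. Esper does not scan a
// list like this; it enters each partition's filter into its shared filter indexes.
class ContextPartitionSketch {

    // One context partition, parameterized by the InitEvent that allocated it.
    static final class Partition {
        final String id;
        final int startRange;
        final int endRange;
        final List<String> markers;

        Partition(String id, int startRange, int endRange, List<String> markers) {
            this.id = id;
            this.startRange = startRange;
            this.endRange = endRange;
            this.markers = markers;
        }

        // id = ..., value between startRange and endRange (inclusive), marker in (...)
        boolean matches(String geoId, int value, String marker) {
            return id.equals(geoId)
                && value >= startRange && value <= endRange
                && markers.contains(marker);
        }
    }

    private final List<Partition> active = new ArrayList<>();

    // InitEvent: allocate a new partition from the event's properties.
    void onInitEvent(String id, int startRange, int endRange, String... markers) {
        active.add(new Partition(id, startRange, endRange, Arrays.asList(markers)));
    }

    // TermEvent(id=initevent.id): terminate every partition allocated for that id.
    void onTermEvent(String id) {
        active.removeIf(p -> p.id.equals(id));
    }

    // GeoEvent: return the ids of the partitions whose filter matches the event.
    List<String> onGeoEvent(String id, int value, String marker) {
        List<String> matched = new ArrayList<>();
        for (Partition p : active) {
            if (p.matches(id, value, marker)) {
                matched.add(p.id);
            }
        }
        return matched;
    }

    int activePartitions() {
        return active.size();
    }
}
```

After the two InitEvent instances shown above, a GeoEvent with id '0001', value 15 and marker 'a' matches the '0001' partition; after TermEvent={id='0001'} the same GeoEvent matches nothing.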
The Java Virtual Machine optimizes locks such that the time to obtain a read lock, for example, differs widely between single-threaded and multi-threaded applications.
We measured code that obtains an unfair ReentrantReadWriteLock read lock 100 million times, without any writer: it took 3 seconds in a single-threaded application and 15 seconds in an application with 2 threads.
Therefore, one cannot expect that scaling from a single thread to 2 threads will always double performance. There is a base cost for multiple threads to coordinate.
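A minimal sketch of this kind of measurement, assuming the 100 million acquisitions are split evenly across the threads (the original setup is not described in more detail). Absolute numbers depend heavily on the JVM, hardware and JIT behavior, so treat its output as indicative only.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Rough sketch, not the original benchmark: acquire and release an unfair read lock
// many times with no writer, from one or more threads sharing the same lock.
class ReadLockBenchmark {

    // Elapsed wall-clock nanoseconds for 'threads' threads that each perform
    // 'iterationsPerThread' lock/unlock pairs on one shared lock.
    static long run(int threads, long iterationsPerThread) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); // false = unfair
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (long i = 0; i < iterationsPerThread; i++) {
                    lock.readLock().lock();
                    lock.readLock().unlock();
                }
            });
        }
        long start = System.nanoTime();
        for (Thread w : workers) {
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long total = 100_000_000L;
        System.out.printf("1 thread : %d ms%n", run(1, total) / 1_000_000);
        System.out.printf("2 threads: %d ms%n", run(2, total / 2) / 1_000_000);
    }
}
```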
Whether aggregations of named window rows are computed incrementally or are recomputed from scratch depends on the type of statement.
When the runtime computes aggregation values incrementally, continuously updating each aggregation value as events enter and leave a named window, it internally subscribes to named window updates and applies these updates as they occur. For some applications this is the desired behavior.
For other applications, recomputing aggregation values from scratch when a certain condition occurs, for example when a triggering event arrives or time passes, is beneficial. Recomputing an aggregation can be less expensive if the number of rows to consider is small and/or if the triggering event or time condition fires infrequently.
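The trade-off can be sketched in plain Java (hypothetical names, not Esper internals): the incremental variant pays a small cost on every enter and leave but always has the current value, while the recompute variant does nothing until triggered and then scans all rows.

```java
import java.util.List;

// Conceptual contrast of the two strategies described above; not Esper internals.
class AggregationStrategies {

    static final class Row {
        final String symbol;
        final double price;

        Row(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    // Incremental: adjust a running sum as rows enter and leave (O(1) per change).
    // The caller feeds only rows that pass the statement's filter, e.g. symbol='GE'.
    // With doubles, many enter/leave pairs can accumulate rounding error.
    static final class IncrementalSum {
        private double sum;

        void onEnter(Row row) { sum += row.price; }
        void onLeave(Row row) { sum -= row.price; }
        double value() { return sum; }
    }

    // Recompute: scan all rows when the trigger fires (O(rows) per trigger,
    // but no work at all between triggers).
    static double recomputeSum(List<Row> rows, String symbol) {
        double sum = 0;
        for (Row row : rows) {
            if (row.symbol.equals(symbol)) {
                sum += row.price;
            }
        }
        return sum;
    }
}
```

If rows change often but the result is needed rarely, recomputing tends to be cheaper; if the result is needed after nearly every change, incremental maintenance tends to be cheaper.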
The following examples assume that a named window has been created to hold some historical financial data per symbol and minute:
create window HistoricalWindow#keepall as (symbol string, minute int, price double)
insert into HistoricalWindow select symbol, minute, price from HistoricalTick
For statements that simply select from a named window (excludes on-select) the runtime computes aggregation values incrementally, continuously updating the aggregation, as events enter and leave the named window.
For example, the below statement updates the total price incrementally as events enter and leave the named window. If events in the named window already exist at the time the statement gets created, the total price gets pre-computed once when the statement gets created and incrementally updated when events enter and leave the named window:
select sum(price) from HistoricalWindow(symbol='GE')
The same is true for uncorrelated subqueries. For statements that sub-select from a named window, the runtime computes aggregation values incrementally, continuously updating the aggregation, as events enter and leave the named window. This is only true for uncorrelated subqueries that don't have a where-clause.
For example, the statement below also updates the total price incrementally as events enter and leave the named window. If the named window already holds events when the statement is created, the total price is pre-computed once at creation time and then updated incrementally:
// Output GE symbol total price, incrementally computed
// Outputs every 15 minutes, on the quarter hour
select (select sum(price) from HistoricalWindow(symbol='GE')) from pattern [every timer:at([0, 15, 30, 45], *, *, *, *, 0)]
If instead your application uses on-select or a correlated subquery, the runtime recomputes aggregation values from scratch every time the triggering event fires.
For example, the statement below does not compute the total price incrementally (use a plain select or subselect as above for that). Instead, the runtime computes the total price from scratch based on the where-clause and matching rows:
// Output GE symbol total price (recomputed from scratch) every 15 minutes, on the quarter hour
on pattern [every timer:at([0, 15, 30, 45], *, *, *, *, 0)] select sum(price) from HistoricalWindow where symbol='GE'
Unidirectional joins against named windows also do not incrementally compute aggregation values.
Joins and outer joins that are not unidirectional compute aggregation values incrementally.
Java Virtual Machines (JVMs) release memory only when a garbage collection occurs. Depending on your JVM settings a garbage collection can occur frequently or infrequently and may consider all or only parts of heap memory.
The runtime is optimized for latency and throughput. It does not force garbage collection or otherwise interfere with it. In performance-sensitive code areas, the runtime uses thread-local buffers, such as arrays or ring buffers, that can retain small amounts of recently processed state. For performance reasons, the runtime does not clean such buffers after every event; it does clean them when the runtime is destroyed and upon undeploy. It is therefore normal for a small, non-increasing amount of memory to remain retained after processing events, which the garbage collector may not free immediately.
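The retention behavior described above can be illustrated with a small per-thread ring of recent items. This is a hypothetical illustration, not the runtime's actual buffers: items stay referenced until they are overwritten or the buffer is cleared, so up to capacity items per thread remain reachable after processing stops.

```java
import java.util.ArrayDeque;

// Illustration only (not Esper's buffers): a per-thread, bounded buffer holding references
// to the most recently processed items. It is not cleared per event, so up to 'capacity'
// items stay reachable until clear() is called.
class ThreadLocalRecentBuffer {
    private final int capacity;
    private final ThreadLocal<ArrayDeque<Object>> buffer = ThreadLocal.withInitial(ArrayDeque::new);

    ThreadLocalRecentBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Records an item for the calling thread, evicting the oldest once full.
    void process(Object event) {
        ArrayDeque<Object> recent = buffer.get();
        if (recent.size() == capacity) {
            recent.pollFirst();
        }
        recent.addLast(event);
    }

    // Number of items still referenced by the calling thread's buffer.
    int retained() {
        return buffer.get().size();
    }

    // Analogous to cleanup on shutdown: drop the references so they become collectable.
    void clear() {
        buffer.remove();
    }
}
```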
When an event comes in and does not match any statement, the runtime can discard that event as a non-match. When measuring throughput, we suggest including non-matching events: the fact that the runtime can discard non-matching events extremely fast is an important aspect of processing.
Many use cases look for a needle in a haystack or a rarely occurring pattern. For example, a use case looking for security breaches may analyze 10 million events and find only a single situation consisting of, say, 5 correlated events out of the 10 million input events. We recommend that your benchmark closely mimic or play back production data and reflect the expected ratio of input to output events. Reducing the number of output events generally increases performance.
For example, assume you have 10 statements:
select * from pattern[A -> B(id = 1)];
select * from pattern[A -> B(id = 2)];
...
select * from pattern[A -> B(id = 10)];
Each of the above patterns matches once, when an A event arrives followed by a B event with that statement's id (1 to 10).
We recommend also measuring throughput by sending B events that have a value of minus one (-1) for id, for example, to determine how fast such non-matching events are discarded.
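To see why non-matching events are cheap, consider a simplified model (an assumption, not the runtime's filter index implementation) in which the ten B(id = n) filters are registered in a hash index keyed by id. The model ignores the A -> part of the patterns. A B event with id -1 is then discarded after a single lookup, regardless of how many statements exist.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model: statements waiting for B(id = n) are registered under n in a hash
// index, so a B event whose id matches no statement is discarded after one lookup.
class DiscardBenchmarkSketch {
    private final Map<Integer, List<String>> statementsById = new HashMap<>();

    void register(String statementName, int id) {
        statementsById.computeIfAbsent(id, k -> new ArrayList<>()).add(statementName);
    }

    // Statements that must process a B event with this id; empty means discarded.
    List<String> route(int id) {
        return statementsById.getOrDefault(id, List.of());
    }

    public static void main(String[] args) {
        DiscardBenchmarkSketch index = new DiscardBenchmarkSketch();
        for (int id = 1; id <= 10; id++) {
            index.register("statement-" + id, id);
        }
        int events = 10_000_000;
        int discarded = 0;
        long start = System.nanoTime();
        for (int i = 0; i < events; i++) {
            if (index.route(-1).isEmpty()) { // id = -1 never matches
                discarded++;
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("%d of %d discarded in %d ms%n", discarded, events, elapsedMs);
    }
}
```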
We consider an event type with more than 1000 event properties to be an event type with a large number of properties. Here are some of the available options for handling large events (not in order of preference):
One can design the EPL so that only those event properties that matter for the specific use case are retained. The JVM can then garbage-collect the original event, while the Esper runtime retains only the few event properties that the specific use case needs.
For example, use insert into ReducedEvent select a,b from LargeEvent together with select ... from ReducedEvent....
One can design the EPL so that there is no need for the runtime to hold on to the large event objects themselves. This documentation and the FAQ page describe in detail when the runtime does and does not retain events.
Use EsperHA, since EsperHA can remove state from memory and swap it back into memory when needed.
In general, the JVM needs more memory for objects such as strings or arrays. Making sure that event properties have the smallest possible memory footprint can help; for example, a primitive int uses less memory than a string.
Instead of keeping all event properties on the event, reduce the event properties to only those properties that matter. The application code could lazy-fetch the remaining properties from an external source when they are needed.
Event properties could be stored binary-encoded which also uses less memory (use an external library or write your own binary encoding; Esper does not provide a binary encoding). The application code could for instance provide "getter"-methods that query the encoded data.
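A hypothetical sketch of the binary-encoding option: the event keeps its properties in one byte array and exposes getter methods that decode on demand. The class name and byte layout (int volume, double price, length-prefixed UTF-8 ticker) are arbitrary choices for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical example: properties packed into a single byte array and decoded by getters.
// Layout: [0..3] int volume, [4..11] double price, [12..15] ticker length, [16..] UTF-8 ticker.
final class EncodedMarketData {
    private final byte[] data;

    EncodedMarketData(String ticker, int volume, double price) {
        byte[] tickerBytes = ticker.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 4 + tickerBytes.length);
        buf.putInt(volume).putDouble(price).putInt(tickerBytes.length).put(tickerBytes);
        this.data = buf.array();
    }

    int getVolume() {
        return ByteBuffer.wrap(data).getInt(0);
    }

    double getPrice() {
        return ByteBuffer.wrap(data).getDouble(4);
    }

    String getTicker() {
        int length = ByteBuffer.wrap(data).getInt(12);
        return new String(data, 16, length, StandardCharsets.UTF_8);
    }

    // Bytes held by this event, excluding the array and object headers.
    int encodedSize() {
        return data.length;
    }
}
```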
The benchmark application is basically an event server built with the runtime that listens to remote clients over TCP. Remote clients send MarketData(ticker, price, volume) streams to the event server. Unless otherwise noted, the event server is started with 1000 statements of a single kind, one statement per ticker symbol, unless the statement kind does not depend on the symbol. The statement prototype is provided along with the results, with a '$' in place of the actual ticker symbol value. The event server is entirely multithreaded and can take full advantage of multi-processor, multi-core 32-bit or 64-bit hardware.
On startup, the kit also prints the event size and the theoretical maximum throughput you can get on a 100 Mbit/s and on a 1 Gbit/s network. Keep in mind that a 100 Mbit/s network will be overloaded at about 400 000 events/s when using our kit, despite the small size of the events.
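To a first approximation, the theoretical maximum is link bandwidth divided by the bits per event. The sketch below ignores TCP/IP framing and protocol overhead, and the 31-byte event size in main is our assumption, picked because it lands near the roughly 400 000 events/s figure above; the kit prints the real event size at startup and its exact formula may differ.

```java
// Back-of-the-envelope estimate of the network-bound event rate. Real throughput is lower
// because of TCP/IP framing and protocol overhead, which this sketch ignores.
class NetworkThroughputEstimate {

    static long maxEventsPerSecond(long linkBitsPerSecond, int eventSizeBytes) {
        if (eventSizeBytes <= 0) {
            throw new IllegalArgumentException("event size must be positive");
        }
        return linkBitsPerSecond / (eventSizeBytes * 8L);
    }

    public static void main(String[] args) {
        int eventSize = 31; // assumed encoded size in bytes; the kit prints the real value
        System.out.println("100 Mbit/s: " + maxEventsPerSecond(100_000_000L, eventSize));
        System.out.println("1 Gbit/s  : " + maxEventsPerSecond(1_000_000_000L, eventSize));
    }
}
```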
Results are posted on our Wiki page at Performance Wiki. The reported results are not the best results ever obtained. They may help you better compare Esper to other solutions (for latency, throughput and CPU utilization) and also assess your target hardware and JVMs.
The event server, client and statement prototypes are provided in the source repository esper/trunk/examples/benchmark/.
Refer to http://www.espertech.com/esper for source access.
If you use the kit you should:
Choose the statement you want to benchmark, add it to etc/statements.properties under your own KEY and use the -mode KEY option when you start the event server.
Prepare your runServer.sh/runServer.cmd and runClient.sh/runclient.cmd scripts. You'll need to drop the required jar libraries in lib/ and make sure the classpath configured in those scripts includes build and etc. The required libraries are Esper (any compatible version; we started testing with Esper 1.7.0) and its dependencies.
Note that ./etc and ./build have to be in the classpath.
At this stage you should also set the minimum and maximum JVM heap size. A good starting point is 1 GB, as in -Xms1g -Xmx1g.
Write the statement you want to benchmark, given that the client will send a stream MarketData(String ticker, int volume, double price), add it to etc/statements.properties under your own KEY and use the -mode KEY option when you start the event server.
Use '$' in the statement to create a prototype. For every symbol, a statement will get registered with all '$' replaced by the actual symbol value (e.g. 'GOOG').
Ensure client and server use the same -Desper.benchmark.symbol=1000 value. This sets the number of symbols to use (and thus may set the number of statements if you are using a statement prototype) and governs how MarketData events are represented over the network. All events have the same size over the network to ensure predictability: if you use 1000 as the value here, symbols range from S0AAA to S999A (prefixed with S and padded with A up to a fixed-length string). Volume and price attributes are randomized.
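The symbol naming and prototype expansion described in the last two steps can be sketched as follows. This is a hypothetical reconstruction, not the kit's source: it assumes the fixed width is one character for S plus the number of digits in the symbol count, which gives S999A as the last of 1000 symbols and S0AAA as the first. The prototype string in the usage below is also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of fixed-length symbol generation and '$' prototype expansion.
class SymbolPrototypeSketch {

    // "S" + index, padded with 'A' to the width of "S" + symbolCount (1000 -> 5 characters).
    static List<String> symbols(int symbolCount) {
        int width = 1 + String.valueOf(symbolCount).length();
        List<String> result = new ArrayList<>(symbolCount);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < symbolCount; i++) {
            sb.setLength(0);
            sb.append('S').append(i);
            while (sb.length() < width) {
                sb.append('A');
            }
            result.add(sb.toString());
        }
        return result;
    }

    // One statement per symbol, with every '$' in the prototype replaced by the symbol.
    static List<String> expand(String prototype, List<String> symbols) {
        List<String> statements = new ArrayList<>(symbols.size());
        for (String symbol : symbols) {
            statements.add(prototype.replace("$", symbol));
        }
        return statements;
    }
}
```

For example, expand("select * from MarketData(ticker='$')", symbols(1000)) yields 1000 statements, the first filtering on 'S0AAA'.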
By default the benchmark registers a subscriber to the statement(s). Use -Desper.benchmark.ul to use an UpdateListener instead. Note that the subscriber contains suitable update(..) methods for the default statement proposed in the etc/statements.properties file, but these might not be suitable if you change statements, due to the strong binding with statement results. Refer to Table 16.2, “Choices For Receiving Statement Results”.
Establish a performance baseline in simulation mode (without clients). Use the -rate 1x5000 option to simulate one client (one thread) sending 5000 evt/s. You can ramp up both the number of simulated client threads and their emission rate to maximize CPU utilization. The right number should mimic the client emission rate you will use in the client/server benchmark, and should thus be consistent with what your client machine and network will be able to send. On small hardware, having a lot of threads with a slow rate will not help you get high throughput in this simulation mode.
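A hypothetical helper that interprets an -rate NxM value, assuming N is the number of simulated client threads and M is the emission rate of each thread (consistent with 1x5000 meaning one thread at 5000 evt/s). It makes the pacing arithmetic visible: at 5000 evt/s, a thread emits one event every 200 microseconds.

```java
// Hypothetical helper, not part of the kit: N simulated client threads, each assumed to
// emit M events per second.
class RateOption {
    final int threads;
    final int eventsPerSecondPerThread;

    RateOption(String value) {
        String[] parts = value.split("x");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected NxM, got " + value);
        }
        threads = Integer.parseInt(parts[0]);
        eventsPerSecondPerThread = Integer.parseInt(parts[1]);
        if (threads <= 0 || eventsPerSecondPerThread <= 0) {
            throw new IllegalArgumentException("N and M must be positive");
        }
    }

    // Aggregate target rate across all simulated threads.
    long totalEventsPerSecond() {
        return (long) threads * eventsPerSecondPerThread;
    }

    // Pause between two consecutive events of one thread if it paces itself evenly.
    long nanosBetweenEvents() {
        return 1_000_000_000L / eventsPerSecondPerThread;
    }
}
```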
Run performance tests in client/server mode. Remove the -rate NxM option from the runServer script or Ant task.
Start the server with -help to display the possible server options (listen port, statistics, fan-out options, etc.).
On the remote machine, start one or more clients. Use -help to display the possible client options (remote port, host, emission rate). The client outputs the actual number of events it is sending to the server.
If the server gets overloaded (or if you turned on -queue options on the server), the client will likely not be able to reach its target rate.
Usually you will get better performance by using the server-side -queue -1 option, so that each client connection is handled by a single-thread pipeline. If you change it to 0 or more, intermediate structures pass the event stream on in an asynchronous fashion. This increases context switching, although if you are using many clients, or are using the -sleep xxx option (xxx in milliseconds) to simulate a listener delay, you may get better performance.
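The difference between the two modes can be sketched in plain Java (conceptual only, not the kit's code): in direct mode the connection thread does all the work, while in queued mode every event is handed to a worker thread through a blocking queue. The queue capacity is a parameter of this sketch and is not what the -queue value means.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Conceptual contrast of the two server-side modes; counting stands in for handing
// each event to the runtime.
class PipelineModes {

    // Like "-queue -1": the thread reading the connection processes each event itself.
    static long direct(int events) {
        long processed = 0;
        for (int i = 0; i < events; i++) {
            processed++;
        }
        return processed;
    }

    // Like a queue setting of 0 or more: the reading thread enqueues and a separate worker
    // dequeues, adding a hand-off (and possibly a context switch) per event.
    static long queued(int events, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        long[] processed = new long[1]; // written by the worker, read after join()
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < events; i++) {
                    queue.take();
                    processed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            for (int i = 0; i < events; i++) {
                queue.put(i); // blocks while the queue is full
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed[0];
    }
}
```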
The most important server-side option is -stat xxx (xxx in seconds), which prints throughput and latency statistics aggregated over the last xxx seconds (reset every time). It produces both internal latency (in nanoseconds) and end-to-end latency (in milliseconds, including network time). If you are measuring end-to-end latency, make sure your server and client machine(s) have synchronized clocks, for example via ntpd with sufficient precision.
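The latency statistics are bucketed histograms. The sketch below records latencies into the nanosecond buckets used by the runtime sample that follows and prints, per bucket, the share of samples, the cumulative share and the count. It is a hypothetical illustration; the kit's own implementation and exact formatting may differ.

```java
import java.util.Locale;

// Sketch of a bucketed latency histogram with output shaped like "---Stats - runtime".
class LatencyHistogram {
    private static final long[] LOWER_BOUNDS_NS = {0, 5_000, 10_000, 15_000, 20_000,
        25_000, 50_000, 100_000, 500_000, 1_000_000, 2_500_000, 5_000_000};

    private final long[] counts = new long[LOWER_BOUNDS_NS.length]; // last bucket is open-ended
    private long total;
    private long sumNs;

    // Adds one sample to the first bucket whose upper bound exceeds it.
    void record(long latencyNs) {
        int bucket = LOWER_BOUNDS_NS.length - 1;
        for (int i = 1; i < LOWER_BOUNDS_NS.length; i++) {
            if (latencyNs < LOWER_BOUNDS_NS[i]) {
                bucket = i - 1;
                break;
            }
        }
        counts[bucket]++;
        total++;
        sumNs += latencyNs;
    }

    long count(int bucket) {
        return counts[bucket];
    }

    // Per bucket: share of all samples, cumulative share so far, and sample count.
    String report() {
        StringBuilder sb = new StringBuilder("---Stats - runtime (unit: ns)\n");
        if (total == 0) {
            return sb.append("no samples\n").toString();
        }
        sb.append(String.format(Locale.ROOT, "Avg: %d #%d%n", sumNs / total, total));
        long cumulative = 0;
        for (int i = 0; i < counts.length; i++) {
            cumulative += counts[i];
            String upper = i + 1 < LOWER_BOUNDS_NS.length ? Long.toString(LOWER_BOUNDS_NS[i + 1]) : "more";
            sb.append(String.format(Locale.ROOT, "%d < %s: %.2f%% %.2f%% #%d%n",
                LOWER_BOUNDS_NS[i], upper, 100.0 * counts[i] / total, 100.0 * cumulative / total, counts[i]));
        }
        return sb.toString();
    }
}
```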
The stat format is like:
---Stats - runtime (unit: ns)
Avg: 2528 #4101107
0 < 5000: 97.01% 97.01% #3978672
5000 < 10000: 2.60% 99.62% #106669
10000 < 15000: 0.35% 99.97% #14337
15000 < 20000: 0.02% 99.99% #971
20000 < 25000: 0.00% 99.99% #177
25000 < 50000: 0.00% 100.00% #89
50000 < 100000: 0.00% 100.00% #41
100000 < 500000: 0.00% 100.00% #120
500000 < 1000000: 0.00% 100.00% #2
1000000 < 2500000: 0.00% 100.00% #7
2500000 < 5000000: 0.00% 100.00% #5
5000000 < more: 0.00% 100.00% #18
---Stats - endToEnd (unit: ms)
Avg: -2704829444341073400 #4101609
0 < 1: 75.01% 75.01% #3076609
1 < 5: 0.00% 75.01% #0
5 < 10: 0.00% 75.01% #0
10 < 50: 0.00% 75.01% #0
50 < 100: 0.00% 75.01% #0
100 < 250: 0.00% 75.01% #0
250 < 500: 0.00% 75.01% #0
500 < 1000: 0.00% 75.01% #0
1000 < more: 24.99% 100.00% #1025000
Throughput 412503 (active 0 pending 0 cnx 4)
This one reads as:
"Throughput is 412 503 events/s with 4 clients connected. No -queue option was used, thus no event is pending at the time the statistics are printed. Average latency is 2528 ns (that is, 2.5 us) for 4 101 107 events (which means these are roughly 10-second statistics). Latency below 10 us was achieved for 99.62% of the events (4 085 341 events). Latency between 5 us and 10 us was achieved for 2.60% of all the events in the interval (106 669 events)." "End-to-end latency was ... in this case, likely due to client clock differences, we ended up with unusable end-to-end statistics."
Now consider a different sample of the second output paragraph, on end-to-end latency:
---Stats - endToEnd (unit: ms)
Avg: 15 #863396
0 < 1: 0.75% 0.75% #6434
1 < 5: 0.99% 1.74% #8552
5 < 10: 2.12% 3.85% #18269
10 < 50: 91.27% 95.13% #788062
50 < 100: 0.10% 95.32% #827
100 < 250: 4.36% 99.58% #37634
250 < 500: 0.42% 100.00% #3618
500 < 1000: 0.00% 100.00% #0
1000 < more: 0.00% 100.00% #0
This would read:
"End-to-end latency average is 15 milliseconds for the 863 396 events considered in this statistics report. 95.13% of the events (821 317 events) were handled end to end in less than 50 ms, and 91.27% (788 062 events) were handled between 10 ms and 50 ms."