esper.codehaus.org and espertech.comDocumentation

Chapter 21. Performance

21.1. Performance Results
21.2. Performance Tips
21.2.1. Understand how to tune your Java virtual machine
21.2.2. Input and Output Bottlenecks
21.2.3. Theading
21.2.4. Select the underlying event rather than individual fields
21.2.5. Prefer stream-level filtering over where-clause filtering
21.2.6. Reduce the use of arithmetic in expressions
21.2.7. Remove Unneccessary Constructs
21.2.8. End Pattern Sub-Expressions
21.2.9. Consider using EventPropertyGetter for fast access to event properties
21.2.10. Consider casting the underlying event
21.2.11. Turn off logging and audit
21.2.12. Disable view sharing
21.2.13. Tune or disable delivery order guarantees
21.2.14. Use a Subscriber Object to Receive Events
21.2.15. Consider Data Flows
21.2.16. High-Arrival-Rate Streams and Single Statements
21.2.17. Subqueries versus Joins And Where-clause And Data Windows
21.2.18. Patterns and Pattern Sub-Expression Instances
21.2.19. Pattern Sub-Expression Instance Versus Data Window Use
21.2.20. The Keep-All Data Window
21.2.21. Statement Design for Reduced Memory Consumption - Diagnosing OutOfMemoryError
21.2.22. Performance, JVM, OS and hardware
21.2.23. Consider using Hints
21.2.24. Optimizing Stream Filter Expressions
21.2.25. Statement and Engine Metric Reporting
21.2.26. Expression Evaluation Order and Early Exit
21.2.27. Large Number of Threads
21.2.28. Filter Evaluation Tuning
21.2.29. Context Partition Related Information
21.2.30. Prefer Constant Variables over Non-Constant Variables
21.2.31. Prefer Object-array Events
21.2.32. Composite or Compound Keys
21.2.33. Notes on Query Planning
21.2.34. Query Planning Expression Analysis Hints
21.2.35. Query Planning Index Hints
21.2.36. Measuring Throughput
21.2.37. Do not create the same EPL Statement X times
21.2.38. Comparing Single-Threaded and Multi-Threaded Performance
21.2.39. Incremental Versus Recomputed Aggregation for Named Window Events
21.2.40. When Does Memory Get Released
21.2.41. Measure throughput of non-matches as well as matches
21.3. Using the performance kit
21.3.1. How to use the performance kit
21.3.2. How we use the performance kit

Esper has been highly optimized to handle very high throughput streams with very little latency between event receipt and output result posting. It is also possible to use Esper on a soft-real-time or hard-real-time JVM to maximize predictability even further.

This section describes performance best practices and explains how to assess Esper performance by using our provided performance kit.

For a complete understanding of those results, consult the next sections.

Esper exceeds over 500 000 event/s on a dual CPU 2GHz Intel based hardware,
with engine latency below 3 microseconds average (below 10us with more than 
99% predictability) on a VWAP benchmark with 1000 statements registered in the system 
- this tops at 70 Mbit/s at 85% CPU usage.

Esper also demonstrates linear scalability from 100 000 to 500 000 event/s on this 
hardware, with consistent results accross different statements.

Other tests demonstrate equivalent performance results
(straight through processing, match all, match none, no statement registered,
VWAP with time based window or length based windows).
                
Tests on a laptop demonstrated about 5x time less performance - that is 
between 70 000 event/s and 200 000 event/s - which still gives room for easy 
testing on small configuration.

We recommend using multiple threads to send events into Esper. We provide a test class below. Our test class does not use a blocking queue and thread pool so as to avoid a point of contention.

A sample code for testing performance with multiple threads is provided:

public class SampleClassThreading {

    public static void main(String[] args) throws InterruptedException {

        int numEvents = 1000000;
        int numThreads = 3;

        Configuration config = new Configuration();
        config.getEngineDefaults().getThreading()
          .setListenerDispatchPreserveOrder(false);
        config.getEngineDefaults().getThreading()
          .setInternalTimerEnabled(false);   // remove thread that handles time advancing
        EPServiceProvider engine = EPServiceProviderManager
          .getDefaultProvider(config);
        engine.getEPAdministrator().getConfiguration().addEventType(MyEvent.class);

        engine.getEPAdministrator().createEPL(
          "create context MyContext coalesce by consistent_hash_crc32(id) " +
          "from MyEvent granularity 64 preallocate");
        String epl = "context MyContext select count(*) from MyEvent group by id";
        EPStatement stmt = engine.getEPAdministrator().createEPL(epl);
        stmt.setSubscriber(new MySubscriber());

        Thread[] threads = new Thread[numThreads];
        CountDownLatch latch = new CountDownLatch(numThreads);

        int eventsPerThreads = numEvents / numThreads;
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(
              new MyRunnable(latch, eventsPerThreads, engine.getEPRuntime()));
        }
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < numThreads; i++) {
            threads[i].start();
        }

        latch.await(10, TimeUnit.MINUTES);
        if (latch.getCount() > 0) {
            throw new RuntimeException("Failed to complete in 10 minute");
        }
        long delta = System.currentTimeMillis() - startTime;
        System.out.println("Took " + delta + " millis");
    }

    public static class MySubscriber {
        public void update(Object[] args) {
        }
    }

    public static class MyRunnable implements Runnable {
        private final CountDownLatch latch;
        private final int numEvents;
        private final EPRuntime runtime;

        public MyRunnable(CountDownLatch latch, int numEvents, EPRuntime runtime) {
            this.latch = latch;
            this.numEvents = numEvents;
            this.runtime = runtime;
        }

        public void run() {
            Random r = new Random();
            for (int i = 0; i < numEvents; i++) {
                runtime.sendEvent(new MyEvent(r.nextInt(512)));
            }
            latch.countDown();
        }
    }

    public static class MyEvent {
        private final int id;

        public MyEvent(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }
}

We recommend using Java threads as above, or a blocking queue and thread pool with sendEvent() or alternatively we recommend configuring inbound threading if your application does not already employ threading. Esper provides the configuration option to use engine-level queues and threadpools for inbound, outbound and internal executions. See Section 15.7.1, “Advanced Threading” for more information.

We recommend the outbound threading if your listeners are blocking. For outbound threading also see the section below on tuning and disabling listener delivery guarantees.

If enabling advanced threading options keep in mind that the engine will maintain a queue and thread pool. There is additional overhead associated with entering work units into the queue, maintaining the queue and the hand-off between threads. The Java blocking queues are not necessarily fast on all JVM. It is not necessarily true that your application will perform better with any of the advanced threading options.

We found scalability better on Linux systems and running Java with -server and pinning threads to exclusive CPUs and after making sure CPUs are available on your system.

We recommend looking at LMAX Disruptor, an inter-thread messaging library, for setting up processing stages. Disruptor, however, is reportedly less suitable for setting up a worker pool.

The sample code below may help you get started setting up a thread pool of workers with back pressure and consideration for IO threads and clean shutdown.

The sample code starts by setting up a thread factory:

private static class EngineThreadFactory implements ThreadFactory {
  private AtomicInteger id = new AtomicInteger(0);

  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, "Event Engine Thread #" + id.incrementAndGet());
    t.setDaemon(true);
    t.setPriority(Thread.NORM_PRIORITY);
    return t;
  }
}

The sample uses a fixed-size array blocking queue. To handle the situation where the queue is full and accepts no more messages, we use a rejection handler that counts the number of rejections and retries:

private class EngineRejectionHandler implements RejectedExecutionHandler {
  private volatile long spinCount = 0;
  
  public long getSpinCount() {
    return spinCount;
  }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    ++spinCount;

    try {
      boolean isAccepted = false;
      while (!isAccepted) {
        isAccepted = executorQueue.offer(r, 120, TimeUnit.MICROSECONDS);
      }
    }
    catch (InterruptedException e) {
      log.warn("could not queue work entry");
    }
  }
}

The Runnable that submits an event for processing could look like this:

class Holder implements Runnable {
  public void run() {
    // do any stuff needed to "prepare" event which doesn't involve IO
    esperService.sendEvent(lm);
  }
}

Initialize the queue and worker pool as follows:

  private final static int CAPACITY = 10000;
  private final static int THREAD_COUNT = 4;

  private static EPRuntime esperService;
  private ThreadFactory threadFactory = new EngineThreadFactory();
  private EngineRejectionHandler rejectionHandler = new EngineRejectionHandler();
  private BlockingQueue<Runnable> executorQueue;
  private ThreadPoolExecutor executor;

  public void start() {
    executorQueue = new ArrayBlockingQueue<Runnable>(CAPACITY);
    executor = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT, 0, TimeUnit.SECONDS,
    executorQueue, threadFactory, rejectionHandler);
    executor.allowCoreThreadTimeOut(false);
    while (executor.getPoolSize() < executor.getCorePoolSize()) {
      executor.prestartCoreThread();
    }
  }

To shut down cleanly, and before destroying the Esper engine instance, the sample code is:

  executor.shutdown();
  while (!executor.isTerminated()) {
    Thread.sleep(100);
  }

The next sample code goes into the IO or input thread(s) such as NIO mapped file, file channel, socket channel, or zmq / nanomsg etc., and submits a work unit to the queue:

  while (programAlive) {
    // deserialize event to POJO, Map, Array, etc.,
    // pass along an event type name when needed
    executor.execute(new Holder(myeventobject));
  }

You could periodically dump the spinCount variable to get an idea of queue depth. You can tune the size of the Executor's pool, and the size of the TimeUnit's of sleep used inside the rejectedExecution method, until you get 1) stable performance at highest level (determined by optimal number of threads in pool, 2) avoid wasting CPU in IO thread(s) (determined by optimal sleeping time between each attempt to re-queue rejected events to the thread pool).

Esper stream-level filtering is very well optimized, while filtering via the where-clause post any data windows is not optimized.

The same is true for named windows. If your application is only interested in a subset of named window data and such filters are not correlated to arriving events, place the filters into parenthesis after the named window name.

If your statement uses order by to order output events, consider removing order by unless your application does indeed require the events it receives to be ordered.

If your statement specifies group by but does not use aggregation functions, consider removing group by.

If your statement specifies group by but the filter criteria only allows one group, consider removing group by:

// Prefer:
select * from MarketData(symbol = 'GE') having sum(price) > 1000

// Don't use this since the filter specifies a single symbol:
select * from MarketData(symbol = 'GE') group by symbol having sum(price) > 1000

If your statement specifies the grouped data window std:groupwin but the window being grouped retains the same set of events regardless of grouping, remove std:groupwin:

// Prefer:
create window MarketDataWindow.win:keepall() as MarketDataEventType

// Don't use this, since keeping all events 
// or keeping all events per symbol is the same thing:
create window MarketDataWindow.std:groupwin(symbol).win:keepall() as MarketDataEventType

// Don't use this, since keeping the last 1-minute of events 
// or keeping 1-minute of events per symbol is the same thing:
create window MarketDataWindow.std:groupwin(symbol).win:time(1 min) as MarketDataEventType

It is not necessary to specify a data window for each stream.

// Prefer:
select * from MarketDataWindow

// Don't have a data window if just listening to events, prefer the above
select * from MarketDataWindow.std:lastevent()

If your statement specifies unique data window but the filter criteria only allows one unique criteria, consider removing the unique data window:

// Prefer:
select * from MarketDataWindow(symbol = 'GE').std:lastevent()

// Don't have a unique-key data window if your filter specifies a single value
select * from MarketDataWindow(symbol = 'GE').std:unique(symbol)

The EventPropertyGetter interface is useful for obtaining an event property value without property name table lookup given an EventBean instance that is of the same event type that the property getter was obtained from.

When compiling a statement, the EPStatement instance lets us know the EventType via the getEventType() method. From the EventType we can obtain EventPropertyGetter instances for named event properties.

To demonstrate, consider the following simple statement:

select symbol, avg(price) from Market group by symbol

After compiling the statement, obtain the EventType and pass the type to the listener:

EPStatement stmt = epService.getEPAdministrator().createEPL(stmtText);
MyGetterUpdateListener listener = new MyGetterUpdateListener(stmt.getEventType());

The listener can use the type to obtain fast getters for property values of events for the same type:

public class MyGetterUpdateListener implements StatementAwareUpdateListener {
    private final EventPropertyGetter symbolGetter;
    private final EventPropertyGetter avgPriceGetter;

    public MyGetterUpdateListener(EventType eventType) {
        symbolGetter = eventType.getGetter("symbol");
        avgPriceGetter = eventType.getGetter("avg(price)");
    }

Last, the update method can invoke the getters to obtain event property values:

    public void update(EventBean[] eventBeans, EventBean[] oldBeans, EPStatement epStatement, EPServiceProvider epServiceProvider) {
        String symbol = (String) symbolGetter.get(eventBeans[0]);
        long volume = (Long) volumeGetter.get(eventBeans[0]);
        // some more logic here
    }

If your application is not a multithreaded application, or you application is not sensitive to the order of delivery of result events to your application listeners, then consider disabling the delivery order guarantees the engine makes towards ordered delivery of results to listeners:

Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setListenerDispatchPreserveOrder(false);

If your application is not a multithreaded application, or your application uses the insert into clause to make results of one statement available for further consuming statements but does not require ordered delivery of results from producing statements to consuming statements, you may disable delivery order guarantees between statements:

Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setInsertIntoDispatchPreserveOrder(false);

Additional configuration options are available and described in the configuration section that specify timeout values and spin or thread context switching.

Esper logging will log the following informational message when guaranteed delivery order to listeners is enabled and spin lock times exceed the default or configured timeout : Spin wait timeout exceeded in listener dispatch. The respective message for delivery from insert into statements to consuming statements is Spin wait timeout exceeded in insert-into dispatch.

If your application sees messages that spin lock times are exceeded, your application has several options: First, disabling preserve order is an option. Second, ensure your listener does not perform (long-running) blocking operations before returning, for example by performing output event processing in a separate thread. Third, change the timeout value to a larger number to block longer without logging the message.

A context partition is associated with certain context partition state that consists of current aggregation values, partial pattern matches, data windows or other view state depending on whether your statement uses such constructs. When an engine receives events it updates context partition state under locking such that context partition state remains consistent under concurrent multi-threaded access.

For high-volume streams, the locking required to protected context partition state may slow down or introduce blocking for very high arrival rates of events that apply to the very same context partition and its state.

Your first choice should be to utilize a context that allows for multiple context partitions, such as the hash segmented context. The hash segmented context usually performs better compared to the keyed segmented context since in the keyed segmented context the engine must check whether a partition exists or must be created for a given key.

Your second choice is to split the statement into multiple statements that each perform part of the intended function or that each look for a certain subset of the high-arrival-rate stream. There is very little cost in terms of memory or CPU resources per statement, the engine can handle larger number of statements usually as efficiently as single statements.

For example, consider the following statement:

// less effective in a highly threaded environment 
select venue, ccyPair, side, sum(qty)
from CumulativePrice
where side='O'
group by venue, ccyPair, side

The engine protects state of each context partition by a separate lock for each context partition, as discussed in the API section. In highly threaded applications threads may block on a specific context partition. You would therefore want to use multiple context partitions.

Consider creating a keyed segmented context, for example:

create context MyContext partition by venue, ccyPair, side from CumulativePrice(side='O')

The keyed segmented context instructs the engine to employ a context partition per venue, ccyPair, side key combination. As locking is on the level of context partition, the locks taken by the engine are very fine grained allowing for highly concurrent processing.

The new statement that refers to the context as created above is:

context MyContext select venue, ccyPair, side, sum(qty) from CumulativePrice

For testing purposes or if your application controls concurrency, you may disable context partition locking, see Section 16.4.23.3, “Disable Locking”.

When joining streams the engine builds a product of the joined data windows based on the where clause. It analyzes the where clause at time of statement compilation and builds the appropriate indexes and query strategy. Avoid using expressions in the join where clause that require evaluation, such as user-defined functions or arithmatic expressions.

When joining streams and not providing a where clause, consider using the std:unique data window or std:lastevent data window to join only the last event or the last event per unique key(s) of each stream.

The sample query below can produce up to 5,000 rows when both data windows are filled and an event arrives for either stream:

// Avoid joins between streams with data windows without where-clause
select * from StreamA.win:length(100), StreamB.win:length(50)

Consider using a subquery, consider using separate statements with insert-into and consider providing a where clause to limit the product of rows.

Below examples show different approaches, that are not semantically equivalent, assuming that an MyEvent is defined with the properties symbol and value:

// Replace the following statement as it may not perform well
select a.symbol, avg(a.value), avg(b.value) 
from MyEvent.win:length(100) a, MyEvent.win:length(50) b

// Join with where-clause
select a.symbol, avg(a.value), avg(b.value) 
from MyEvent.win:length(100) a, MyEvent.win:length(50) b 
where a.symbol = b.symbol

// Unidirectional join with where-clause
select a.symbol, avg(b.value) 
from MyEvent unidirectional, MyEvent.win:length(50) b 
where a.symbol = b.symbol

// Subquery
select 
  (select avg(value) from MyEvent.win:length(100)) as avgA, 
  (select avg(value) from MyEvent.win:length(50)) as avgB,
  a.symbol
from MyEvent

// Since streams cost almost nothing, use insert-into to populate and a unidirectional join 
insert into StreamAvgA select symbol, avg(value) as avgA from MyEvent.win:length(100)
insert into StreamAvgB select symbol, avg(value) as avgB from MyEvent.win:length(50)
select a.symbol, avgA, avgB from StreamAvgA unidirectional, StreamAvgB.std:unique(symbol) b
where a.symbol = b.symbol

A join is multidirectionally evaluated: When an event of any of the streams participating in the join arrive, the join gets evaluated, unless using the unidirectional keyword. Consider using a subquery instead when evaluation only needs to take place when a certain event arrives:

// Rewrite this join since we don't need to join when a LoginSucceededWindow arrives
// Also rewrite because the account number always is the value 123.
select * from LoginSucceededWindow as l, WithdrawalWindow as w
where w.accountNumber = '123' and w.accountNumber = l.accountNumber

// Rewritten as a subquery, 
select *, (select * from LoginSucceededWindow where accountNumber=’123’) 
from WithdrawalWindow(accountNumber=’123’) as w

The every and repeat operators in patterns control the number of sub-expressions that are active. Each sub-expression can consume memory as it may retain, depending on the use of tags in the pattern, the matching events. A large number of active sub-expressions can reduce performance or lead to out-of-memory errors.

During the design of the pattern statement consider the use of timer:within to reduce the amount of time a sub-expression lives, or consider the not operator to end a sub-expression.

The examples herein assume an AEvent and a BEvent event type that have an id property that may correlate between arriving events of the two event types.

In the following sample pattern the engine starts, for each arriving AEvent, a new pattern sub-expression looking for a matching BEvent. Since the AEvent is tagged with a the engine retains each AEvent until a match is found for delivery to listeners or subscribers:

every a=AEvent -> b=BEvent(b.id = a.id)

One way to end a sub-expression is to attach a time how long it may be active.

The next statement ends sub-expressions looking for a matching BEvent 10 seconds after arrival of the AEvent event that started the sub-expression:

every a=AEvent -> (b=BEvent(b.id = a.id) where timer:within(10 sec))

A second way to end a sub-expression is to use the not operator. You can use the not operator together with the and operator to end a sub-expression when a certain event arrives.

The next statement ends sub-expressions looking for a matching BEvent when, in the order of arrival, the next BEvent that arrives after the AEvent event that started the sub-expression does not match the id of the AEvent:

every a=AEvent -> (b=BEvent(b.id = a.id) and not BEvent(b.id != a.id))

The every-distinct operator can be used to keep one sub-expression alive per one or more keys. The next pattern demonstrates an alternative to every-distinct. It ends sub-expressions looking for a matching BEvent when an AEvent arrives that matches the id of the AEvent that started the sub-expression:

every a=AEvent -> (b=BEvent(b.id = a.id) and not AEvent(b.id = a.id))

This section describes common sources of out-of-memory problems.

If using the keep-all data window please consider the information above. If using pattern statements please consider pattern sub-expression instantiation and lifetime as discussed prior to this section.

When using the group-by clause or std:groupwin grouped data windows please consider the hints as described below. Make sure your grouping criteria are fields that don't have an unlimited number of possible values or specify hints otherwise.

The std:unique unique data window can also be a source for error. If your uniqueness criteria include a field which is never unique the memory use of the data window can grow, unless your application deletes events.

When using the every-distinct pattern construct parameterized by distinct value expressions that generate an unlimited number of distinct values, consider specifying a time period as part of the parameters to indicate to the pattern engine how long a distinct value should be considered.

In a match-recognize pattern consider limiting the number of optional events if optional events are part of the data reported in the measures clause. Also when using the partition clause, if your partitioning criteria include a field which is never unique the memory use of the match-recognize pattern engine can grow.

A further source of memory use is when your application creates new statements but fails to destroy created statements when they are no longer needed.

In your application design you may also want to be conscious when the application listener or subscriber objects retain output data.

An engine instance, uniquely identified by an engine URI is a relatively heavyweight object. Optimally your application allocates only one or a few engine instances per JVM. A statement instance is associated to one engine instance, is uniquely identified by a statement name and is a medium weight object. We have seen applications allocate 100,000 statements easily. A statement's context partition instance is associated to one statement, is uniquely identified by a context partition id and is a light weight object. We have seen applications allocate 5000 context partitions for 100 statements easily, i.e. 5,000,000 context partitions. An aggregation row, data window row, pattern etc. is associated to a statement context partition and is a very lightweight object itself.

The prev, prevwindow and prevtail functions access a data window directly. The engine does not need to maintain a separate data structure and grouping is based on the use of the std:groupwin grouped data window. Compare this to the use of event aggregation functions such as first, window and last which group according to the group by clause. If your statement utilizes both together consider reformulating to use prev instead.

Performance will also depend on your JVM (Sun HotSpot, BEA JRockit, IBM J9), your operating system and your hardware. A JVM performance index such as specJBB at spec.org can be used. For memory intensive statement, you may want to consider 64bit architecture that can address more than 2GB or 3GB of memory, although a 64bit JVM usually comes with a slow performance penalty due to more complex pointer address management.

The choice of JVM, OS and hardware depends on a number of factors and therefore a definite suggestion is hard to make. The choice depends on the number of statements, and number of threads. A larger number of threads would benefit of more CPU and cores. If you have very low latency requirements, you should consider getting more GHz per core, and possibly soft real-time JVM to enforce GC determinism at the JVM level, or even consider dedicated hardware such as Azul. If your statements utilize large data windows, more RAM and heap space will be utilized hence you should clearly plan and account for that and possibly consider 64bit architectures or consider EsperHA.

The number and type of statements is a factor that cannot be generically accounted for. The benchmark kit can help test out some requirements and establish baselines, and for more complex use cases a simulation or proof of concept would certainly works best. EsperTech' experts can be available to help write interfaces in a consulting relationship.

The @Hint annotation provides a single keyword or a comma-separated list of keywords that provide instructions to the engine towards statement execution that affect runtime performance and memory-use of statements. Also see Section 5.2.7.8, “@Hint”.

The query planning in general is described in Section 21.2.33, “Notes on Query Planning”.

The hint for influencing query planning expression analysis is described at Section 21.2.34, “Query Planning Expression Analysis Hints”.

The hint for influencing query planning index choice is described at Section 21.2.35, “Query Planning Index Hints”.

Further hints, also related to query planning, for use with joins, outer joins, unidirectional joins, relational and non-relational joins are described in Section 5.12.5, “Hints Related to Joins”.

The hint for use with group by to specify how state for groups is reclaimed is described in Section 5.6.2.1, “Hints Pertaining to Group-By” and Section 13.3.2, “Grouped Data Window (std:groupwin)”.

The hint for use with group by to specify aggregation state reclaim for unbound streams and timestamp groups is described in Section 5.6.2.1, “Hints Pertaining to Group-By”.

The hint for use with match_recognize to specify iterate-only is described in Section 8.4.7, “Eliminating Duplicate Matches”.

To tune subquery performance when your subquery selects from a named window, consider the hints discussed in Section 5.11.8, “Hints Related to Subqueries”.

The @NoLock hint to remove context partition locking (also read caution note) is described at Section 15.7, “Engine Threading and Concurrency”.

The hint to control expansion of filter expressions, further described at Section 16.4.23.6, “Filter Service Max Filter Width”.

Object-array events offer the best read access performance for access to event property values. In addition, object-array events use much less memory then Map-type events. They also offer the best write access performance.

A comparison of different event representations is in Section 2.13, “Comparing Event Representations”.

First, we recommend that your application sends object-array events into the engine, instead of Map-type events. See Section 2.7, “Object-array (Object[]) Events” for more information.

Second, we recommend that your application sets the engine-wide configuration of the default event representation to object array, as described in Section 16.4.11.1, “Default Event Representation”. Alternatively you can use the @EventRepresentation(array=true) annotation with individual statements.

Query planning takes place for subqueries, joins (any type), named window and table on-actions (on-select, on-merge, on-insert, on-update, on-select) and fire-and-forget queries. Query planning affects query execution speed. Enable query plan logging to output query plan information.

For query planning, the engine draws information from:

The engine prefers unique indexes over non-unique indexes.

The engine prefers hash-based lookups (equals) and combination hash-btree lookups (equals and relational-operator or range) over btree lookups (relational-operator or range) over in-keyword (single and multi-index) lookup plans. This behavior can be controlled by hints that we discuss next.

The expression analysis hints impact query planning for any statement and fire-and-forget query that performs a join or subquery. They also impact named window and table on-action statements.

This hint instructs the engine which expressions, operators or streams should be excluded and therefore not considered for query planning. The hint applies to the where-clause and, for outer joins, to the on-clause when present.

The hint takes a single expression as its sole parameter, which is placed in parenthesis. The expression must return a boolean value.

When the provided expression returns true for a given combination, that combination will not be considered for the query plan. A combination consists of a from-stream (name or number), a to-stream (name or number), an operator (i.e. equals, relational, in-keyword) and a set of expressions.


Consider two event types A and B. Event type A has a property aprop and event type B has a property bprop. Let's assume A and B are related by aprop and bprop.

An inner join of all A and B events might look like this:

select * from A.win:keepall() as a, B.win:keepall() as b where aprop = bprop

In the default query plan, when an A event comes in, the engine obtains the value of aprop and performs an index lookup against bprop values to obtain matching B events. Vice versa, when a B event comes in, the engine obtains the value of bprop and performs an index lookup against aprop values to obtain matching A events.

The engine evaluates the hint expression for each combination. The table below outlines the two rows provided to the hint expression:


The following EPL statement with hint causes the analyzer to exclude all combinations since the expression passed in always returns true, in effect causing the query planner to always execute the statement as a full table scan.

@hint('exclude_plan(true)')
select * from A.win:keepall() as a, B.win:keepall() as b where aprop = bprop

This hint instructs the engine to ignore all equals-operators for query planning:

@hint('exclude_plan(opname="equals")') select ....

The next hint instructs the engine to ignore the equals-operator for the direction of lookup from A to B:

@hint('exclude_plan(opname="equals" and from_streamname="a")') select ....

Conversely, this hint instructs the engine to ignore the equals-operator for the direction of lookup from B to A:

@hint('exclude_plan(opname="equals" and from_streamname="b")') select ....

Use the exprs array of expression texts to exclude specific expressions:

@hint('exclude_plan(exprs[0]="aprop")') select ....

For subqueries the stream number zero is the subquery from-clause itself and 1 to N are the enclosing statement's from-clause streams. For named window and table on-action statements the stream number zero is the named window or table and stream number 1 refers to the triggering pattern or event.

To specify multiple expressions, please specify multiple hints. The engine excludes a specific combination when any of the hint expressions returns true.

To inspect values passed to the hint expression, please enable query plan logging. To inspect expression evaluation, please use @Audit.

Currently index hints are only supported for the following types of statements:

For the above statements, you may dictate to the engine which explicit index (created via create index syntax) to use.

Specify the name of the explicit index in parentheses following @Hint and the index literal.

The following example instructs the engine to use the UserProfileIndex if possible:

@Hint('index(UserProfileIndex)')

Add the literal bust to instruct the engine to use the index, or if the engine cannot use the index fail query planning with an exception and therefore fail statement creation.

The following example instructs the engine to use the UserProfileIndex if possible or fail with an exception if the index cannot be used:

@Hint('index(UserProfileIndex, bust)')

Multiple indexes can be listed separated by comma (,).

The next example instructs the engine to consider the UserProfileIndex and the SessionIndex or fail with an exception if either index cannot be used:

@Hint('index(UserProfileIndex, SessionIndex, bust)')

The literal explicit can be added to instruct the engine to use only explicitly created indexes.

The final example instructs the engine to consider any explicitly create index or fail with an exception if any of the explicitly created indexes cannot be used:

@Hint('index(explicit, bust)')

Whether aggregations of named window rows are computed incrementally or are recomputed from scratch depends on the type of query.

When the engine computes aggregation values incrementally, meaning it continuously updates the aggregation value as events enter and leave a named window, it means that the engine internally subscribes to named window updates and applies these updates as they occur. For some applications this is the desired behavior.

For some applications re-computing aggregation values from scratch when a certain condition occurs, for example when a triggering event arrives or time passes, is beneficial. Re-computing an aggregation can be less expensive if the number of rows to consider is small and/or when the triggering event or time condition triggers infrequently.

The next paragraph assumes that a named window has been created to hold some historical financial data per symbol and minute:

create window HistoricalWindow.win:keepall() as (symbol string, int minute, double price)
insert into HistoricalWindow select symbol, minute, price from HistoricalTick

For statements that simply select from a named window (excludes on-select) the engine computes aggregation values incrementally, continuously updating the aggregation, as events enter and leave the named window.

For example, the below statement updates the total price incrementally as events enter and leave the named window. If events in the named window already exist at the time the statement gets created, the total price gets pre-computed once when the statement gets created and incrementally updated when events enter and leave the named window:

select sum(price) from HistoricalWindow(symbol='GE')

The same is true for uncorrelated subqueries. For statements that sub-select from a named window, the engine computes aggregation values incrementally, continuously updating the aggregation, as events enter and leave the named window. This is only true for uncorrelated subqueries that don't have a where-clause.

For example, the below statement updates the total price incrementally as events enter and leave the named window. If events in the named window already exist at the time the statement gets created, the total price gets pre-computed once when the statement gets created and incrementally updated when events enter and leave the named window:

// Output GE symbol total price, incrementally computed
// Outputs every 15 minutes on the hour.
select (sum(price) from HistoricalWindow(symbol='GE')) 
from pattern [every timer:at(0, 15, 30, 45), *, *, *, *, 0)]

If instead your application uses on-select or a correlated subquery, the engine recomputes aggregation values from scratch every time the triggering event fires.

For example, the below statement does not incrementally compute the total price (use a plain select or subselect as above instead). Instead the engine computes the total price from scratch based on the where-clause and matching rows:

// Output GE symbol total price (recomputed from scratch) every 15 minutes on the hour
on pattern [every timer:at(0, 15, 30, 45), *, *, *, *, 0)]
select sum(price) from HistoricalWindow where symbol='GE'

Unidirectional joins against named windows also do not incrementally compute aggregation values.

Joins and outer joins, that are not unidirectional, compute aggregation values incrementally.

The benchmark application is basically an Esper event server build with Esper that listens to remote clients over TCP. Remote clients send MarketData(ticker, price, volume) streams to the event server. The Esper event server is started with 1000 statements of one single kind (unless otherwise written), with one statement per ticker symbol, unless the statement kind does not depend on the symbol. The statement prototype is provided along the results with a '$' instead of the actual ticker symbol value. The Esper event server is entirely multithreaded and can leverage the full power of 32bit or 64bit underlying hardware multi-processor multi-core architecture.

The kit also prints out when starting up the event size and the theoretical maximal throughput you can get on a 100 Mbit/s and 1 Gbit/s network. Keep in mind a 100 Mbit/s network will be overloaded at about 400 000 event/s when using our kit despite the small size of events.

Results are posted on our Wiki page at Performance Wiki. Reported results do not represent best ever obtained results. Reported results may help you better compare Esper to other solutions (for latency, throughput and CPU utilization) and also assess your target hardware and JVMs.

The Esper event server, client and statement prototypes are provided in the source repository esper/trunk/examples/benchmark/. Refer to http://xircles.codehaus.org/projects/esper/repo for source access.

A built is provided for convenience (without sources) as an attachment to the Wiki page at Performance Wiki. It contains Ant script to start client, server in simulation mode and server. For real measurement we advise to start from a shell script (because Ant is pipelining stdout/stderr when you invoke a JVM from Ant - which is costly). Sample scripts are provided for you to edit and customize.

If you use the kit you should:

  1. Choose the statement you want to benchmark, add it to etc/statements.properties under your own KEY and use the -mode KEY when you start the Esper event server.

  2. Prepare your runServer.sh/runServer.cmd and runClient.sh/runclient.cmd scripts. You'll need to drop required jar libraries in lib/ , make sure the classpath is configured in those script to include build and etc . The required libraries are Esper (any compatible version, we have tested started with Esper 1.7.0) and its dependencies as in the sample below (with Esper 2.1) :

    # classpath on Unix/Linux (on one single line)
    etc:build:lib/esper-5.2.0.jar:lib/commons-logging-1.1.3.jar:lib/cglib-nodep-3.1.jar
       :lib/antlr-runtime-4.1.jar:lib/log4j-1.2.17.jar
    @rem  classpath on Windows (on one single line)
    etc;build;lib\esper-5.2.0.jar;lib\commons-logging-1.1.3.jar;lib\cglib-nodep-3.1.jar
       ;lib\antlr-runtime-4.1.jar;lib\log4j-1.2.17.jar

    Note that ./etc and ./build have to be in the classpath. At that stage you should also start to set min and max JVM heap. A good start is 1GB as in -Xms1g -Xmx1g

  3. Write the statement you want to benchmark given that client will send a stream MarketData(String ticker, int volume, double price), add it to etc/statements.properties under your own KEY and use the -mode KEY when you start the Esper event server. Use '$' in the statement to create a prototype. For every symbol, a statement will get registered with all '$' replaced by the actual symbol value (f.e. 'GOOG')

  4. Ensure client and server are using the same -Desper.benchmark.symbol=1000 value. This sets the number of symbol to use (thus may set the number of statement if you are using a statement prototype, and governs how MarketData event are represented over the network. Basically all events will have the same size over the network to ensure predictability and will be ranging between S0AA and S999A if you use 1000 as a value here (prefix with S and padded with A up to a fixed length string. Volume and price attributes will be randomized.

  5. By default the benchmark registers a subscriber to the statement(s). Use -Desper.benchmark.ul to use an UpdateListener instead. Note that the subscriber contains suitable update(..) methods for the default proposed statement in the etc/statements.properties file but might not be suitable if you change statements due to the strong binding with statement results. Refer to Section 15.3.2, “Receiving Statement Results”.

  6. Establish a performance baseline in simulation mode (without clients). Use the -rate 1x5000 option to simulate one client (one thread) sending 5000 evt/s. You can ramp up both the number of client simulated thread and their emission rate to maximize CPU utilization. The right number should mimic the client emission rate you will use in the client/server benchmark and should thus be consistent with what your client machine and network will be able to send. On small hardware, having a lot of thread with slow rate will not help getting high throughput in this simulation mode.

  7. Do performance runs with client/server mode. Remove the -rate NxM option from the runServer script or Ant task. Start the server with -help to display the possible server options (listen port, statistics, fan out options etc). On the remote machine, start one or more client. Use -help to display the possible client options (remote port, host, emission rate). The client will output the actual number of event it is sending to the server. If the server gets overloaded (or if you turned on -queue options on the server) the client will likely not be able to reach its target rate.

    Usually you will get better performance by using server side -queue -1 option so as to have each client connection handled by a single thread pipeline. If you change to 0 or more, there will be intermediate structures to pass the event stream in an asynchronous fashion. This will increase context switching, although if you are using many clients, or are using the -sleep xxx (xxx in milliseconds) to simulate a listener delay you may get better performance.

    The most important server side option is -stat xxx (xxx in seconds) to print out throughput and latency statistics aggregated over the last xxx seconds (and reset every time). It will produce both internal Esper latency (in nanosecond) and also end to end latency (in millisecond, including network time). If you are measuring end to end latency you should make sure your server and client machine(s) are having the same time with f.e. ntpd with a good enough precision. The stat format is like:

    ---Stats - engine (unit: ns)
      Avg: 2528 #4101107
            0 <    5000:  97.01%  97.01% #3978672
         5000 <   10000:   2.60%  99.62% #106669
        10000 <   15000:   0.35%  99.97% #14337
        15000 <   20000:   0.02%  99.99% #971
        20000 <   25000:   0.00%  99.99% #177
        25000 <   50000:   0.00% 100.00% #89
        50000 <  100000:   0.00% 100.00% #41
       100000 <  500000:   0.00% 100.00% #120
       500000 < 1000000:   0.00% 100.00% #2
      1000000 < 2500000:   0.00% 100.00% #7
      2500000 < 5000000:   0.00% 100.00% #5
      5000000 <    more:   0.00% 100.00% #18
    ---Stats - endToEnd (unit: ms)
      Avg: -2704829444341073400 #4101609
            0 <       1:  75.01%  75.01% #3076609
            1 <       5:   0.00%  75.01% #0
            5 <      10:   0.00%  75.01% #0
           10 <      50:   0.00%  75.01% #0
           50 <     100:   0.00%  75.01% #0
          100 <     250:   0.00%  75.01% #0
          250 <     500:   0.00%  75.01% #0
          500 <    1000:   0.00%  75.01% #0
         1000 <    more:  24.99% 100.00% #1025000
    Throughput 412503 (active 0 pending 0 cnx 4)

    This one reads as:

    "Throughput is 412 503 event/s with 4 client connected. No -queue options 
    was used thus no event is pending at the time the statistics are printed. 
    Esper latency average is at 2528 ns (that is 2.5 us) for 4 101 107 events 
    (which means we have 10 seconds stats here). Less than 10us latency 
    was achieved for 106 669 events that is 99.62%. Latency between 5us 
    and 10us was achieved for those 2.60% of all the events in the interval."
    
    "End to end latency was ... in this case likely due to client clock difference
    we ended up with unusable end to end statistics."

    Consider the second output paragraph on end-to-end latency:

    ---Stats - endToEnd (unit: ms)
      Avg: 15 #863396
            0 <       1:   0.75%   0.75% #6434
            1 <       5:   0.99%   1.74% #8552
            5 <      10:   2.12%   3.85% #18269
           10 <      50:  91.27%  95.13% #788062
           50 <     100:   0.10%  95.22% #827
          100 <     250:   4.36%  99.58% #37634
          250 <     500:   0.42% 100.00% #3618
          500 <    1000:   0.00% 100.00% #0
         1000 <    more:   0.00% 100.00% #0

    This would read:

    "End to end latency average is at 15 milliseconds for the 863 396 events 
    considered for this statistic report. 95.13% ie 788 062 events were handled 
    (end to end) below 50ms, and 91.27% were handled between 10ms and 50ms."