Starting with Esper version 8, the compiler generates byte code, and the runtime loads that byte code and hosts its execution. The version 8 compiler dynamically creates a class to represent the aggregation row, which speeds up processing further compared to version 7.x. This technique leads to higher performance because it eliminates branches and virtual calls and allows the JVM's JIT compiler to perform further optimizations. Version 7.x already included optimizations and generated some byte code for expression evaluation, but version 8 is all byte code.
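
To make the idea concrete, here is a minimal hand-written sketch that contrasts a generic aggregation row, which goes through an interface call and a type check for every update, with a row class specialized for sum(p1). All class and method names in it (AggregationRowSketch, Aggregator, GenericRow, SumP1Row and so on) are hypothetical illustrations; this is not the code the Esper compiler actually generates.

```java
import java.util.Collections;
import java.util.List;

public class AggregationRowSketch {

    /** Generic contract: each update goes through an interface (virtual) call. */
    interface Aggregator {
        void enter(Object value);
        Object getValue();
    }

    /** Sum written against the generic contract: it has to type-check and box. */
    static final class GenericSum implements Aggregator {
        private long sum;

        @Override
        public void enter(Object value) {
            if (value instanceof Number) { // branch: the value type is not known up front
                sum += ((Number) value).longValue();
            }
        }

        @Override
        public Object getValue() {
            return sum; // boxes the long on every read
        }
    }

    /** Generic row: loops over whatever aggregators the query happens to need. */
    static final class GenericRow {
        private final List<Aggregator> aggregators;

        GenericRow(List<Aggregator> aggregators) {
            this.aggregators = aggregators;
        }

        void enter(Object[] event) {
            for (Aggregator aggregator : aggregators) {
                aggregator.enter(event[1]);
            }
        }

        long sum() {
            return (Long) aggregators.get(0).getValue();
        }
    }

    /** Row specialized for sum(p1): one primitive field, no loop, no interface call. */
    static final class SumP1Row {
        private long sumP1;

        void enter(Object[] event) {
            sumP1 += (Long) event[1];
        }

        long sum() {
            return sumP1;
        }
    }

    static long sumGeneric(int events) {
        GenericRow row = new GenericRow(Collections.<Aggregator>singletonList(new GenericSum()));
        for (int i = 0; i < events; i++) {
            row.enter(new Object[]{"G1", 1L});
        }
        return row.sum();
    }

    static long sumSpecialized(int events) {
        SumP1Row row = new SumP1Row();
        for (int i = 0; i < events; i++) {
            row.enter(new Object[]{"G1", 1L});
        }
        return row.sum();
    }

    public static void main(String[] args) {
        // Both rows compute the same total; only the shape of the code differs.
        System.out.println(sumGeneric(1000) + " " + sumSpecialized(1000)); // prints: 1000 1000
    }
}
```

The specialized row gives the JIT a single small method over a primitive field, which is the kind of code it can inline and optimize most easily.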

We measured one query and found that Esper version 8.1.0 reaches about 7.1 million events per second. This compares to about 6.1 million events per second for Esper version 7.1.0 and about 4 million events per second for Esper version 6.1.0.

The measurement is shown in the table below. Read on for the explanation and code.

The EPL query is this one.

  select p0, sum(p1) from SampleEvent group by p0 having sum(p1) >= 100000000

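This query totals the p1 field for each p0 group and outputs a row once a group's total reaches 100 million. The test code processes 100 million events in its measured loop, each with p1 set to 1, so the condition is first met on the last event.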
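
For readers less familiar with EPL, the following plain-Java sketch mimics what the query computes: a running sum per group and a check of the having-clause after each event. It only illustrates the semantics and has nothing to do with how Esper implements the query; the class GroupByHavingSketch and its methods are hypothetical names, and the event counts are scaled down by a factor of 1000 to keep the run short.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupByHavingSketch {
    private final Map<String, Long> sumByGroup = new HashMap<>();
    private final long threshold;

    GroupByHavingSketch(long threshold) {
        this.threshold = threshold;
    }

    /** Adds p1 to the running sum of group p0; returns true when the having-clause passes. */
    boolean onEvent(String p0, long p1) {
        long sum = sumByGroup.merge(p0, p1, Long::sum);
        return sum >= threshold;
    }

    /** Sends warm-up events to group G1 and measured events to group G2, counting output rows. */
    static int countMatches(long threshold, int warmupEvents, int measuredEvents) {
        GroupByHavingSketch query = new GroupByHavingSketch(threshold);
        int matches = 0;
        for (int i = 0; i < warmupEvents; i++) {
            if (query.onEvent("G1", 1L)) {
                matches++;
            }
        }
        for (int i = 0; i < measuredEvents; i++) {
            if (query.onEvent("G2", 1L)) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Benchmark values divided by 1000: threshold 100,000; 100 warm-up and 100,000 measured events.
        System.out.println(countMatches(100_000, 100, 100_000)); // prints: 1
    }
}
```

Because the warm-up events go to a different group than the measured events, the warm-up never contributes to the sum that is compared against the threshold.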

The test setup is JVM 1.8.0_121, heap 256M (-Xms256m -Xmx256m), Intel i7 7700HQ@2.80 GHz, 16 GB system memory.

The complete test code is short. It is safe to measure total elapsed time because sendEvent is not an asynchronous operation. The test is not written to use multiple threads, so it drives one CPU to 100% utilization.

The code below tests version 8.

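// Module with two statements: the schema declaration and the query under test
String epl = "@public @buseventtype create objectarray schema SampleEvent (p0 string, p1 long);\n" +
    "select p0, sum(p1) from SampleEvent group by p0 having sum(p1) >= 100000000;\n";
// Compile the module to byte code
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, null);

// Deploy the compiled module into the default runtime
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);

// Statement 0 is the create-schema, statement 1 is the select; listen to the select
SupportUpdateListener listener = new SupportUpdateListener();
deployment.getStatements()[1].addListener(listener);
// Sender bound to the SampleEvent type
EventSender sender = runtime.getEventService().getEventSender("SampleEvent");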

// warm-up
for (int i = 0; i < 100000; i++) {
    sender.sendEvent(new Object[]{"G1", 1L});
}

// measure
long start = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
    sender.sendEvent(new Object[]{"G2", 1L});
}
long delta = System.currentTimeMillis() - start;

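// Multiply before dividing so that integer division does not truncate the rate to whole thousands
System.out.println("Delta " + delta + " milliseconds at " + (100000000L * 1000 / delta) + " per second");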
System.out.println("Listener received group: " + listener.assertOneGetNewAndReset().get("p0"));
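
A per-second rate computed with long arithmetic depends on the order of operations, because integer division truncates. The small sketch below shows the difference. The class ThroughputSketch and its method names are hypothetical, and the elapsed time of 14,000 milliseconds is an arbitrary illustrative input, not a measured result.

```java
public class ThroughputSketch {

    /** Divides first: the quotient is truncated before scaling, so the result is a multiple of 1000. */
    static long divideFirst(long events, long elapsedMillis) {
        return (events / elapsedMillis) * 1000;
    }

    /** Multiplies first: truncation happens once, at the very end. */
    static long multiplyFirst(long events, long elapsedMillis) {
        return events * 1000 / elapsedMillis;
    }

    public static void main(String[] args) {
        long events = 100_000_000L;
        long elapsedMillis = 14_000L; // illustrative value only, not a measurement

        System.out.println(divideFirst(events, elapsedMillis));   // prints: 7142000
        System.out.println(multiplyFirst(events, elapsedMillis)); // prints: 7142857
    }
}
```

With 100 million events the intermediate product is 10^11, which fits comfortably in a long.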

The code below tests version 7.x and version 6.x (the API changed between version 7.x and version 8.x).

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
epService.getEPAdministrator().createEPL("create objectarray schema SampleEvent (p0 string, p1 long)");
EPStatement stmt = epService.getEPAdministrator().createEPL("select p0, sum(p1) from SampleEvent group by p0 having sum(p1) >= 100000000");
SupportUpdateListener listener = new SupportUpdateListener();
stmt.addListener(listener);
EventSender sender = epService.getEPRuntime().getEventSender("SampleEvent");

// warm-up
for (int i = 0; i < 100000; i++) {
    sender.sendEvent(new Object[]{"G1", 1L});
}

// measure
long start = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
    sender.sendEvent(new Object[]{"G2", 1L});
}
long delta = System.currentTimeMillis() - start;

System.out.println("Delta: " + delta);
System.out.println("Listener received group: " + listener.assertOneGetNewAndReset().get("p0"));