The aggregation function “avg” returns the average of numeric values. The runtime maintains a count and a total for each average that it must compute.
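
A minimal sketch of the count-and-total technique in plain Java follows. It assumes int inputs and uses a hypothetical `RunningAverage` class that is not part of Esper's API. The `leave` method covers values that go out again, for example when events expire from a data window. Returning null for an empty average is an assumption chosen here to match SQL-style avg.

```java
// Hypothetical sketch (not Esper code): an average kept as a running count and total,
// updated incrementally as values enter and leave.
class RunningAverage {
    private long count; // number of values currently included
    private long total; // sum of the values currently included

    // A value arrives, e.g. a new event.
    void enter(int value) {
        count++;
        total += value;
    }

    // A value goes out, e.g. an event expires from a window.
    void leave(int value) {
        count--;
        total -= value;
    }

    // Assumption: null when nothing is included (SQL-style semantics).
    Double getValue() {
        return count == 0 ? null : (double) total / count;
    }

    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage();
        avg.enter(4);
        avg.enter(8);
        System.out.println(avg.getValue()); // prints 6.0
        avg.leave(4);
        System.out.println(avg.getValue()); // prints 8.0
    }
}
```

Only two numbers per average are stored, so each update and each query costs constant time regardless of how many values have been seen.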

To illustrate, imagine events that have a key and 10 numeric values:

create objectarray schema SampleEvent (key int, v0 int, v1 int, v2 int, v3 int, v4 int, v5 int, v6 int, v7 int, v8 int, v9 int);

The following EPL statement outputs, for each key, the average of each value across all events that have arrived so far:

select key, avg(v0), avg(v1), avg(v2), avg(v3), avg(v4), avg(v5), avg(v6), avg(v7), avg(v8), avg(v9) from SampleEvent group by key;
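
In plain Java, the grouping in this statement can be pictured as a map from each distinct `key` to its own counts and totals. The `GroupedAverages` class below is a hypothetical illustration of the semantics only. It says nothing about how Esper stores groups internally.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "group by key" semantics: one independent set of
// count/total pairs per distinct key value. Not Esper's internal design.
class GroupedAverages {
    static final int NUM_VALUES = 10;

    // Per-group state: counts[i] and totals[i] back avg(v<i>).
    static final class Group {
        final long[] counts = new long[NUM_VALUES];
        final long[] totals = new long[NUM_VALUES];
    }

    final Map<Integer, Group> groups = new HashMap<>();

    // event = {key, v0, ..., v9}, the same layout as the SampleEvent object array
    void onEvent(int[] event) {
        Group g = groups.computeIfAbsent(event[0], k -> new Group());
        for (int i = 0; i < NUM_VALUES; i++) {
            g.counts[i]++;
            g.totals[i] += event[i + 1];
        }
    }

    // Average of column v<column> for the given key.
    double average(int key, int column) {
        Group g = groups.get(key);
        return (double) g.totals[column] / g.counts[column];
    }

    public static void main(String[] args) {
        GroupedAverages agg = new GroupedAverages();
        agg.onEvent(new int[]{1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2});
        agg.onEvent(new int[]{1, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4});
        agg.onEvent(new int[]{2, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9});
        System.out.println(agg.average(1, 0)); // prints 3.0
        System.out.println(agg.average(2, 9)); // prints 9.0
        System.out.println(agg.groups.size()); // prints 2
    }
}
```

Events that share a key update the same group; every new key adds another group with its own 10 averages.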

In version 8 of Esper, the compiler generates a class that represents the 10 averages. This class becomes visible when we turn on logging of generated classes or have the compiler write the generated code into a separate directory (see the documentation).

The Esper 8 compiler generates a class similar to this:

static class AggRow implements AggregationRow {
  long cnt0;
  int sum0;
  long cnt1;
  int sum1;
  long cnt2;
  int sum2;
  long cnt3;
  int sum3;
  long cnt4;
  int sum4;
  long cnt5;
  int sum5;
  long cnt6;
  int sum6;
  long cnt7;
  int sum7;
  long cnt8;
  int sum8;
  long cnt9;
  int sum9;
  // methods omitted for brevity
}

The compiler also generates code that updates the averages as events come in and go out, and code that queries the averages. The runtime uses all of this internally.
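
The class above leaves its methods out. What follows is a hypothetical sketch of what enter, leave and query methods on such a flattened row might look like. It covers three averages instead of ten and is not Esper's actual generated code. Every update is a direct field access on a single object.

```java
// Hypothetical sketch, NOT Esper's generated code: a flattened row for three
// averages, with field types mirroring the AggRow class shown above.
class FlatAggRow {
    long cnt0; int sum0;
    long cnt1; int sum1;
    long cnt2; int sum2;

    // An event comes in: plain field updates, no per-aggregator objects,
    // no interface calls, no casts.
    void applyEnter(int v0, int v1, int v2) {
        cnt0++; sum0 += v0;
        cnt1++; sum1 += v1;
        cnt2++; sum2 += v2;
    }

    // An event goes out, e.g. when it expires from a data window.
    void applyLeave(int v0, int v1, int v2) {
        cnt0--; sum0 -= v0;
        cnt1--; sum1 -= v1;
        cnt2--; sum2 -= v2;
    }

    // Query one average by column index; null when the count is zero.
    Double getValue(int column) {
        switch (column) {
            case 0: return cnt0 == 0 ? null : (double) sum0 / cnt0;
            case 1: return cnt1 == 0 ? null : (double) sum1 / cnt1;
            case 2: return cnt2 == 0 ? null : (double) sum2 / cnt2;
            default: throw new IllegalArgumentException("no column " + column);
        }
    }

    public static void main(String[] args) {
        FlatAggRow row = new FlatAggRow();
        row.applyEnter(1, 10, 100);
        row.applyEnter(3, 20, 300);
        System.out.println(row.getValue(0)); // prints 2.0
        System.out.println(row.getValue(2)); // prints 200.0
        row.applyLeave(1, 10, 100);
        System.out.println(row.getValue(1)); // prints 20.0
    }
}
```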

The compiler generates this class because keeping all aggregation state together in the fields of one object, without allocating any additional objects, is optimal for heap memory use. It also has performance advantages when adding and removing events and when querying, as the generated code eliminates the virtual calls, down-casts and crossings of object boundaries that would otherwise be necessary.
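
For contrast, here is a hypothetical sketch of the generic layout that a flattened row avoids. It keeps one aggregator object per average behind an interface and passes values as boxed `Object`s. All names are invented for illustration. For 10 averages, a row needs an array plus 10 aggregator objects instead of a single object, and each update goes through an interface call and a down-cast.

```java
// Hypothetical sketch of a generic, object-per-aggregator row layout.
// Invented names; shown only to contrast with the flattened AggRow.
class GenericAggRowDemo {
    interface Aggregator {
        void enter(Object value);
        void leave(Object value);
        Object getValue();
    }

    static final class AvgAggregator implements Aggregator {
        private long cnt;
        private long sum;

        @Override public void enter(Object value) { cnt++; sum += (Integer) value; } // down-cast per update
        @Override public void leave(Object value) { cnt--; sum -= (Integer) value; }
        @Override public Object getValue() { return cnt == 0 ? null : (double) sum / cnt; }
    }

    // One row = 1 array object + one aggregator object per average.
    static Aggregator[] newRow(int numAverages) {
        Aggregator[] row = new Aggregator[numAverages];
        for (int i = 0; i < numAverages; i++) {
            row[i] = new AvgAggregator();
        }
        return row;
    }

    // Each value passes through an interface call and arrives as a boxed Object.
    static void enter(Aggregator[] row, Object[] values) {
        for (int i = 0; i < row.length; i++) {
            row[i].enter(values[i]);
        }
    }

    public static void main(String[] args) {
        Aggregator[] row = newRow(10);
        enter(row, new Object[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 1});
        enter(row, new Object[]{3, 3, 3, 3, 3, 3, 3, 3, 3, 3});
        System.out.println(row[0].getValue()); // prints 2.0
    }
}
```

The JIT compiler can inline some interface calls when a call site only ever sees one implementation, so the cost of the extra indirection depends on the workload. The per-object headers and references on the heap remain in this layout.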

We tested memory use and performance by adding 10 million rows: each event carries a distinct key, so every event creates a new group. The test setup is JVM 1.8.0_192, heap 4 GB (-Xms4g -Xmx4g), Intel i7-7700HQ @ 2.80 GHz, 16 GB system memory. The code for Esper 8.1 and Esper 7.1 is below. Memory use was measured with jmap, jconsole and a memory-measuring agent (Jamm); we used multiple tools for comparison.
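
As a rough programmatic cross-check alongside jmap, jconsole and Jamm, used heap can be read through the standard `java.lang.management` API after requesting a garbage collection. This is a hedged sketch. The GC request is only a hint, so figures can vary between runs. The `long[2]` arrays are a hypothetical stand-in for per-group state, and `HeapUsage` is an invented name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

// Hypothetical helper: approximate retained heap via the platform MemoryMXBean.
class HeapUsage {
    // Requests a GC (a hint the JVM may ignore) and returns used heap bytes.
    static long usedHeapAfterGc() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        memory.gc(); // equivalent to System.gc()
        return memory.getHeapMemoryUsage().getUsed();
    }

    public static void main(String[] args) {
        long before = usedHeapAfterGc();
        long[][] retained = new long[1_000_000][];
        for (int i = 0; i < retained.length; i++) {
            retained[i] = new long[2]; // stand-in for one count/total pair per group
        }
        long after = usedHeapAfterGc();
        System.out.printf("approx. %.1f MB retained%n", (after - before) / (1024.0 * 1024.0));
        System.out.println(retained.length); // keeps the data reachable until after measuring
    }
}
```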

Here is the table comparing the versions:

Esper Version (10 million events)    Memory Use    Load Time
Version 8.1                          1.91 GB       7300 msec
Version 7.1                          1.99 GB       8327 msec
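
The table can be turned into per-group figures with simple arithmetic. This sketch assumes that "GB" means 2^30 bytes. It divides total heap use by the 10 million groups, so the result covers everything on the heap, not only the aggregation rows. `ResultMath` is an invented name.

```java
import java.util.Locale;

// Hypothetical arithmetic helper for the table above.
class ResultMath {
    static final double BYTES_PER_GB = 1024.0 * 1024.0 * 1024.0; // assumption: GB = 2^30 bytes
    static final long GROUPS = 10_000_000L;                        // one group per distinct key

    // Total heap use spread evenly across all groups.
    static double bytesPerGroup(double gb) {
        return gb * BYTES_PER_GB / GROUPS;
    }

    // Fraction by which the new load time is lower than the old one.
    static double loadTimeReduction(long newMillis, long oldMillis) {
        return (oldMillis - newMillis) / (double) oldMillis;
    }

    // Events processed per second for a given load time.
    static double eventsPerSecond(long millis) {
        return GROUPS * 1000.0 / millis;
    }

    public static void main(String[] args) {
        System.out.printf(Locale.ROOT, "Esper 8.1: %.0f bytes per group%n", bytesPerGroup(1.91)); // Esper 8.1: 205 bytes per group
        System.out.printf(Locale.ROOT, "Esper 7.1: %.0f bytes per group%n", bytesPerGroup(1.99)); // Esper 7.1: 214 bytes per group
        System.out.printf(Locale.ROOT, "Load time reduction: %.1f%%%n", 100 * loadTimeReduction(7300, 8327)); // Load time reduction: 12.3%
        System.out.printf(Locale.ROOT, "Esper 8.1: %.0f events/sec%n", eventsPerSecond(7300));
        System.out.printf(Locale.ROOT, "Esper 7.1: %.0f events/sec%n", eventsPerSecond(8327));
    }
}
```

Under these assumptions, Esper 8.1 uses about 8.6 bytes less per group and has a load time about 12% lower. If GB is read as 10^9 bytes instead, the per-group figures become 191 and 199 bytes.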

The Esper 8.1 code is:

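import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import com.espertech.esper.runtime.client.EventSender;

// wrapper class name is arbitrary
public class LoadTestEsper8 {
    public static void main(String[] args) throws Exception {
        // @public and @buseventtype make SampleEvent available to an EventSender
        String epl = "@public @buseventtype create objectarray schema SampleEvent (key int, v0 int, v1 int, v2 int, v3 int, v4 int, v5 int, v6 int, v7 int, v8 int, v9 int);\n" +
            "@name('out') select key, avg(v0), avg(v1), avg(v2), avg(v3), avg(v4), avg(v5), avg(v6), avg(v7), avg(v8), avg(v9) from SampleEvent group by key;\n";
        EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, new CompilerArguments(new Configuration()));

        EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime();
        EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);

        // we add a listener to the select statement since realistically there would be one
        runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), "out")
            .addListener((newEvents, oldEvents, stmt, rt) -> {});

        EventSender sender = runtime.getEventService().getEventSender("SampleEvent");

        // load events: each event has a distinct key, so each one creates a new group
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000000; i++) {
            sender.sendEvent(new Object[]{i, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1});
        }
        long delta = System.currentTimeMillis() - start;
        System.out.println("Loaded groups, done in " + delta + " milliseconds");
    }
}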

The Esper 7.1 code is:

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EventSender;

// wrapper class name is arbitrary
public class LoadTestEsper7 {
    public static void main(String[] args) throws Exception {
        String epl = "create objectarray schema SampleEvent (key int, v0 int, v1 int, v2 int, v3 int, v4 int, v5 int, v6 int, v7 int, v8 int, v9 int);\n" +
            "@name('out') select key, avg(v0), avg(v1), avg(v2), avg(v3), avg(v4), avg(v5), avg(v6), avg(v7), avg(v8), avg(v9) from SampleEvent group by key;\n";
        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
        epService.getEPAdministrator().getDeploymentAdmin().parseDeploy(epl);

        // we add a listener since realistically there would be one
        epService.getEPAdministrator().getStatement("out").addListener((newEvents, oldEvents) -> {});

        EventSender sender = epService.getEPRuntime().getEventSender("SampleEvent");

        // load events: each event has a distinct key, so each one creates a new group
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000000; i++) {
            sender.sendEvent(new Object[]{i, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1});
        }
        long delta = System.currentTimeMillis() - start;
        System.out.println("Loaded groups, done in " + delta + " milliseconds");
    }
}