bernhardttom@yahoo.com

/bernhardttom@yahoo.com

About bernhardttom@yahoo.com

This author has not yet filled in any details.
So far bernhardttom@yahoo.com has created 4 blog entries.

On Lateness, Event Time vs. Processing Time and Watermarks, Order-Of-Arrival, Retraction

EPL is powerful, but unfortunately this also means that when you design EPL you need to know how time passes and what event ordering is available or not. You can run Esper in an environment that doesn't pass time and that has completely disordered events.

There are many facilities that EPL offers that do not at all depend on event order or time passing. Some examples are...

  • A join select * from OrderEvent#unique(orderId) as ord, AccountEvent#unique(accountId) as act where ord.accountId = act.accountId  joins the last order event per order id with the last account event per account id
  • An aggregation select count(*), ip_address from PortScanEvent group by ip_address counts the port scans per ip address
  • A complex filter create context DynamicFilterContext initiated by CriteriaEvent as criteria and context DynamicFilterContext select * from News(language IN (context.criteria.languageIds) AND companies.anyOf(x=> x.company.id IN (context.criteria.companyIds)) allows you to dynamically filter news based on a criteria event (this sort of thing makes use of Esper filter indexes to fast-match)

Many facilities that EPL offers however do depend on time passing and do depend on event ordering. For example...

  • A pattern timer:interval(1 minute) and not SensorEvent alerts when an interval of one minute passes during which Esper receives no sensor event
  • A pattern match-recognize ( pattern (A B) ) matches when there is an event A that is immediately followed by an event B

When you run Esper, you need to look at your requirements and decide for each requirement whether you need to wait for late arriving events, and for how long to wait, and what to do for events that arrive later than that. You need to determine if and how you want to advance time. You need to determine whether to reorder events. Esper has a time-order window that can help with all that however you still need to define these parameters.

The pattern timer:interval(1 minute) and not SensorEvent detects the absence of the event with near-zero latency by itself. The actual latency depends on how time moves forward. For example your design may use a watermark and move its time based on allowed lateness. The 1 minute interval can span any amount of actual natural time, and so can the latency of detection when using watermarks.

The pattern timer:interval(1 minute) and not SensorEvent outputs a result after 1 minute without a sensor event. Assume that at a later time we do discover that a sensor event has come in. This would mean the output needs to be retracted. Retraction is not something that Esper handles currently.

6+ Million Events-Per-Second

There is a big change and a smaller targeted change in release 7 for increasing performance. In my test release 7 hits 6.29 million of events per second on a decent CPU.

The big change is that release 7 employs byte code generation. This technique leads to higher performance as it eliminates branches and virtual calls and allows the JVM's JIT to perform further optimizations.

The small change that I'm talking about is targeted at the specific case of fully-aggregated and grouped queries. These now have a shortcut evaluation path that kicks in when computing for a single event rather than any number of events in the current processing unit.

The changes together bring a measurable improvement FOR THIS PARTICULAR TEST. This blog post is testing a simple EPL query that detects when a total hits a given number for a given group.

The measurement is shown in the below table. Read on for the explanation and code.

The EPL query is this one.

  select p0, sum(p1) from SampleEvent group by p0 having sum(p1) >= 100000000

This query totals up a field and compares it against 100 million. The code processes 100 million events. While release 6.1.0 turns out more than 4 million events per second the speed of release 7 comes in at more than 6 million events per second on the test machine.

The test setup is JVM 1.8.0_121, heap 256M (-Xms256m -Xmx256m), Intel i7 7700HQ@2.80 GHz, 16 GB system memory.

The complete test code is not that much. It's safe to measure total time since sendEvent is not an asynchronous operation. CPU utilization is 1 CPU to 100%. This test is not written to use multiple threads.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
epService.getEPAdministrator().createEPL("create objectarray schema SampleEvent (p0 string, p1 long)");
EPStatement stmt = epService.getEPAdministrator().createEPL("select p0, sum(p1) from SampleEvent group by p0 having sum(p1) >= 100000000");
SupportUpdateListener listener = new SupportUpdateListener();
stmt.addListener(listener);
EventSender sender = epService.getEPRuntime().getEventSender("SampleEvent");

// warm-up
for (int i = 0; i < 100000; i++) {
    sender.sendEvent(new Object[]{"G1", 1L});
}

// measure
long start = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
    sender.sendEvent(new Object[]{"G2", 1L});
}
long delta = System.currentTimeMillis() - start;

System.out.println("Delta: " + delta);
System.out.println("Listener received group: " + listener.assertOneGetNewAndReset().get("p0"));
2017-10-30T16:40:24+00:00 Performance|

Architecture Diagram

This diagram organizes the various components that are needed to fulfill event and time analysis requirements. The diagram displays the components in levels. The positioning of a component helps describe the role that the component plays in the logical architecture. For example, Filter Service is a component that is used by the EPL pattern engine, and in turn relies on Expression Evaluation and the Type System.

Like most every diagram, don't take the diagram literally. It's merely meant as a kind of overview.

2017-10-30T16:36:46+00:00 Architecture|

On Windows

This post explores what the term Window means.

Wikipedia under the topic of Data Stream Management System lists a nice definition of Window:

Instead of using synopses to compress the characteristics of the whole data streams, window techniques only look on a portion of the data. This approach is motivated by the idea that only the most recent data are relevant. Therefore, a window continuously cuts out a part of the data stream, e.g. the last ten data stream elements, and only considers these elements during the processing. There are different kinds of such windows like sliding windows that are similar to FIFO lists or tumbling windows that cut out disjoint parts. Furthermore, the windows can also be differentiated into element-based windows, e.g., to consider the last ten elements, or time-based windows, e.g., to consider the last ten seconds of data.

In Esper we have...

  • Data is the events that are arriving into Esper
  • A window that considers the last 10 elements is #length(10), aka. length window
  • A window that considers the last 10 seconds of data is #time(10), aka. time window.

The smallest unit of change to a window is an individual event. A new event goes inside the window. The old event escapes the window. And thus events come and go. When such a change happens Esper determines if this change is meaningful. It does that by incrementally updating aggregations and match-recognize patterns each time an event comes and goes. When for example a query compares an aggregation against a threshold value it indicates this meaningful change to the application.

---> Esper evaluates windows continuously and incrementally on the level of individual events.

I have looked up Apache Flink which has a write-up on windows among its documentation in Application Development->Streaming->Operators->Windows. In Flink windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.. Flink forms a window and only when the window is completely formed does it apply a computation and then form a new window, unless I'm mistaken. This sounds a lot like batch processing to me. But batch != window.

In Esper a window can be an arbitrary subset of events. Here are two examples.

  • #length(10)#time(10) considers the last 10 elements that are not older than 10 seconds
  • create window MyWindow#keepall with on-merge, to insert and remove events according to any criteria

Other systems seem to form a window only from the data that arrives next to each other. It seems impossible in other systems to form a window across arbitrary data. They seem to require inserting into a table of some kind as I understand.

The term window in Esper means subset of events and in some systems means batch-delineation.