What is Complex Event Processing (and what is not)

On Lateness, Event Time vs. Processing Time and Watermarks, Order-Of-Arrival, Retraction

EPL is powerful, but this also means that when you design EPL statements you need to know how time advances and what event ordering you can rely on. You can run Esper in an environment in which time never advances and in which events arrive completely out of order.

There are many facilities that EPL offers that do not at all depend on event order or time passing. Some examples are...

  • A join select * from OrderEvent#unique(orderId) as ord, AccountEvent#unique(accountId) as act where ord.accountId = act.accountId joins the last order event per order id with the last account event per account id
  • An aggregation select count(*), ip_address from PortScanEvent group by ip_address counts the port scans per ip address
  • A complex filter create context DynamicFilterContext initiated by CriteriaEvent as criteria and context DynamicFilterContext select * from News(language IN (context.criteria.languageIds) AND companies.anyOf(x => x.company.id IN (context.criteria.companyIds))) allows you to dynamically filter news based on a criteria event (this makes use of Esper filter indexes for fast matching)

Many facilities that EPL offers, however, do depend on time passing and on event ordering. For example...

  • A pattern timer:interval(1 minute) and not SensorEvent alerts when an interval of one minute passes during which Esper receives no sensor event
  • A match-recognize pattern match_recognize ( pattern (A B) ) matches when there is an event A that is immediately followed by an event B

When you run Esper, you need to look at your requirements and decide, for each requirement, whether you need to wait for late-arriving events, how long to wait, and what to do with events that arrive later than that. You need to determine if and how you want to advance time. You need to determine whether to reorder events. Esper has a time-order window that can help with all of that; however, you still need to define these parameters.
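The decisions above (how long to wait, how to reorder, what to do with events that are too late) can be sketched in plain Python. This is only an illustration of the idea and not Esper's time-order window or its API: the class name TimeOrderBuffer, the allowed_lateness parameter and the rule that the watermark trails the highest timestamp seen are all assumptions made for this sketch.

```python
import heapq


class TimeOrderBuffer:
    """Hypothetical reorder buffer: holds events until the watermark passes
    them, then releases them in timestamp order."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")
        self._heap = []  # entries are (timestamp, sequence, payload)
        self._seq = 0    # tie-breaker so payloads are never compared

    def add(self, timestamp, payload):
        """Return (released, too_late) for this arrival."""
        if timestamp <= self.watermark:
            # Arrived after we stopped waiting: the application decides
            # what to do with it (drop, log, route elsewhere).
            return [], [(timestamp, payload)]
        heapq.heappush(self._heap, (timestamp, self._seq, payload))
        self._seq += 1
        # Assumption: the watermark trails the highest timestamp seen so far.
        self.watermark = max(self.watermark, timestamp - self.allowed_lateness)
        released = []
        while self._heap and self._heap[0][0] <= self.watermark:
            ts, _, item = heapq.heappop(self._heap)
            released.append((ts, item))
        return released, []


buf = TimeOrderBuffer(allowed_lateness=5)
print(buf.add(10, "a"))  # held back, nothing released yet
print(buf.add(8, "b"))   # out of order but within the allowed lateness
print(buf.add(16, "c"))  # watermark moves to 11 and releases 8, then 10
print(buf.add(9, "d"))   # behind the watermark, reported as too late
```

A larger allowed_lateness tolerates more disorder but delays every released event by the same amount, which is the trade-off you have to decide per requirement.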

By itself, the pattern timer:interval(1 minute) and not SensorEvent detects the absence of the event with near-zero latency. The actual latency depends on how time moves forward. For example, your design may use a watermark and advance time based on the allowed lateness. The 1-minute interval can then span any amount of wall-clock time, and so can the latency of detection.
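To make the dependence on time advancement concrete, here is a small Python simulation. It is not Esper code: AbsenceDetector and detection_latency are hypothetical names, time is a plain number of seconds supplied by the caller, and latency is measured in that supplied time rather than in wall-clock time.

```python
class AbsenceDetector:
    """Hypothetical stand-in for 'timer:interval(...) and not SensorEvent'."""

    def __init__(self, start_time, interval):
        self.deadline = start_time + interval
        self.active = True

    def on_sensor_event(self):
        # A sensor event arrived before the deadline: the pattern ends.
        self.active = False

    def advance_time(self, now):
        """Return the deadline if the absence is detected at this step."""
        if self.active and now >= self.deadline:
            self.active = False
            return self.deadline
        return None


def detection_latency(time_steps, interval=60):
    """Feed a sequence of time values and report how far past the deadline
    the clock was when the absence was detected (None if never detected)."""
    detector = AbsenceDetector(start_time=0, interval=interval)
    for now in time_steps:
        fired = detector.advance_time(now)
        if fired is not None:
            return now - fired
    return None


print(detection_latency(range(0, 121)))  # clock advanced every second
print(detection_latency([0, 45, 90]))    # clock advanced in coarse jumps
```

With one-second steps the absence is detected right at the deadline. With coarse jumps, such as a watermark that only moves when data arrives, the same pattern is detected only at the first step past the deadline.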

The pattern timer:interval(1 minute) and not SensorEvent outputs a result after 1 minute without a sensor event. Assume that we later discover that a sensor event did come in during that minute. This would mean the output needs to be retracted. Retraction is not something that Esper currently handles.

On Windows

This post explores what the term Window means.

Wikipedia under the topic of Data Stream Management System lists a nice definition of Window:

Instead of using synopses to compress the characteristics of the whole data streams, window techniques only look on a portion of the data. This approach is motivated by the idea that only the most recent data are relevant. Therefore, a window continuously cuts out a part of the data stream, e.g. the last ten data stream elements, and only considers these elements during the processing. There are different kinds of such windows like sliding windows that are similar to FIFO lists or tumbling windows that cut out disjoint parts. Furthermore, the windows can also be differentiated into element-based windows, e.g., to consider the last ten elements, or time-based windows, e.g., to consider the last ten seconds of data.

In Esper we have...

  • Data is the events that are arriving into Esper
  • A window that considers the last 10 elements is #length(10), a.k.a. length window
  • A window that considers the last 10 seconds of data is #time(10), a.k.a. time window

The smallest unit of change to a window is an individual event. A new event enters the window. An old event leaves the window. And thus events come and go. When such a change happens, Esper determines whether the change is meaningful. It does that by incrementally updating aggregations and match-recognize patterns each time an event enters or leaves. When, for example, a query compares an aggregation against a threshold value, Esper indicates the meaningful change to the application.

---> Esper evaluates windows continuously and incrementally on the level of individual events.
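As a rough illustration of per-event, incremental evaluation, the following Python sketch keeps a running sum over a length window and reports when the sum crosses a threshold. It is a simulation and not Esper's implementation: the class name LengthWindowSum, the choice of sum as the aggregation, and the rule of reporting only crossings are assumptions made for this example.

```python
from collections import deque


class LengthWindowSum:
    """Hypothetical length window with an incrementally maintained sum."""

    def __init__(self, size, threshold):
        self.size = size
        self.threshold = threshold
        self.events = deque()
        self.total = 0
        self.above = False

    def on_event(self, value):
        """Update the sum for one arrival; return 'above' or 'below' when
        the threshold is crossed, otherwise None."""
        self.events.append(value)
        self.total += value                       # new event enters
        if len(self.events) > self.size:
            self.total -= self.events.popleft()   # oldest event leaves
        now_above = self.total > self.threshold
        changed = now_above != self.above
        self.above = now_above
        if not changed:
            return None
        return "above" if now_above else "below"


window = LengthWindowSum(size=3, threshold=10)
for value in [1, 2, 3, 9, 1, 1, 1]:
    print(value, window.total, window.on_event(value))
```

Each arrival costs one addition and at most one subtraction; the window is never recomputed from scratch, and a result is available after every single event rather than once per batch.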

I have looked up Apache Flink, which has a write-up on windows in its documentation under Application Development->Streaming->Operators->Windows. The Flink documentation states that windows are at the heart of processing infinite streams and that windows split the stream into “buckets” of finite size, over which we can apply computations. Unless I'm mistaken, Flink forms a window, applies a computation only when the window is completely formed, and then forms a new window. This sounds a lot like batch processing to me. But batch != window.

In Esper a window can be an arbitrary subset of events. Here are two examples.

  • #length(10)#time(10) considers the last 10 elements that are not older than 10 seconds
  • create window MyWindow#keepall with on-merge, to insert and remove events according to any criteria
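The first example, a window holding the last 10 elements that are not older than 10 seconds, can be simulated in Python as follows. This is a sketch of the idea and not Esper's implementation: the class LengthAndTimeWindow is hypothetical, timestamps are plain seconds supplied by the caller, and events are assumed to arrive in timestamp order.

```python
from collections import deque


class LengthAndTimeWindow:
    """Hypothetical window: an event stays only while it is among the last
    max_length events AND younger than max_age seconds."""

    def __init__(self, max_length, max_age):
        self.max_length = max_length
        self.max_age = max_age
        self.events = deque()  # (timestamp, payload), oldest first

    def add(self, timestamp, payload):
        self.events.append((timestamp, payload))
        if len(self.events) > self.max_length:
            self.events.popleft()      # pushed out by the length limit
        self.advance_time(timestamp)

    def advance_time(self, now):
        # Expire events whose age has reached max_age.
        while self.events and now - self.events[0][0] >= self.max_age:
            self.events.popleft()

    def contents(self):
        return [payload for _, payload in self.events]


window = LengthAndTimeWindow(max_length=3, max_age=10)
for ts, name in [(0, "a"), (1, "b"), (2, "c"), (3, "d")]:
    window.add(ts, name)
print(window.contents())  # "a" was pushed out by the length limit
window.advance_time(11)
print(window.contents())  # "b" has expired by age
```

An event can leave for either reason, which is why the resulting subset is not simply a run of adjacent arrivals cut at fixed boundaries.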

Other systems seem to form a window only from data that arrives next to each other. It seems impossible in those systems to form a window across arbitrary data. As I understand it, they require inserting the data into a table of some kind instead.

In Esper the term window means a subset of events; in some other systems it means batch delineation.
