Key Concepts

  1. What is CEP?
  2. What is an event?
  3. What is an event stream or simply "stream"?
  4. What is an event cloud compared to a stream?

Introductory

  1. How do I look for specific events on a stream, dropping the unwanted events?
  2. How do I aggregate several (simple) events from a stream into a single new (complex) event summarizing event properties of the simple events?
  3. How do I limit the unbounded events of a stream to a defined timespan or number of events?
  4. How do I correlate events from one or several streams on a common set of values?

Troubleshooting

  1. My query presents incorrect or incomplete results? What can I do?
  2. My query does not compile?
  3. How do I design for reduced memory use? I have an out-of-memory error and don't know its origin?
  4. I'm getting a stack overflow error?

Design Tips

  1. Simple Statements
  2. EPL Editor
  3. Few General Design Tips
  4. Understand Subqueries and Joins
  5. Performance
  6. Offline Analysis, Analyzing Historical Data, Priming from Recorded Events, Preloading

Diagramming Tips

  1. Diagramming Tips

EPL Questions

  1. How do I detect N events within X seconds?
  2. How do I compute a percentage or ratio?
  3. How do I measure the rate of arrival of events in a given time period? Per category?
  4. I want to get average, count, min and max for a 60-second time window?
  5. How to continually maintain customer ids that represent the top X customers with total purchases across a sliding time window?
  6. I want to compute and compare the maximum of all events since start and the maximum for the last 10 seconds?
  7. How can I aggregate large sliding windows and/or aggregate without retaining incoming events?
  8. How to calculate aggregations per partition and control output externally?
  9. How can I alert if a message came 5 times in 10 minute buckets, not sliding window but fixed to the start of hour, but my events can be out-of-order and can arrive late? Is there event time support?
  10. How do I output and compare totals of events over differently-sized windows?
  11. How do I detect N events meeting a condition of which M events meet a sub-condition within 10 minutes?
  12. How do I detect when over the course of 1 second there are at least 2 distinct field1 values, and for every one of at least 2 distinct field1 values, there are at least 3 distinct field2 values?
  13. How do I correlate events arriving in 2 or more streams?
  14. How do I correlate events arriving out-of-order?
  15. How do I use patterns to correlate events arriving in-order or out-of-order?
  16. How to detect events between other events using a pattern?
  17. How do I look for pairs of immediately-followed events?
  18. How do I correlate 3 events in a time window in which events have similar properties?
  19. How do I find three consecutive events with the same attribute values?
  20. How do I find at least two 'foo' events followed by two 'bar' events, all within 2 seconds?
  21. How can I drop events from pattern matches?
  22. How do I remove all events from a window and start over?
  23. How do I combine data windows and their expiry polices? Or define custom logic for removing events?
  24. How do I seed an empty data window from a filled data window?
  25. How do I keep a separate window of events per category and compute aggregates for each category's window?
  26. How do I use results of one statement in another statement?
  27. How do I put common filtering and normalization into a central place?
  28. How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?
  29. How do I delay data? How do I compare against previous events?
  30. How do I detect the absence of an event?
  31. How do I detect the absence of an event and the presence of an event arriving too late?
  32. How do I report at a regular interval without any incoming events?
  33. How do I find missing events arriving in 2 or more streams that are correlated?
  34. How do I look into the past and consider missing events? How do I do a snapshot, fire-and-forget or on-demand query?
  35. How do I detect the absence of multiple unrelated events that are known in advance? Is there a way to simplify hundreds or thousands of similar statements into one global statement? How do I "prime" or initialize a large number of similar patterns without creating a pattern statement for each pattern instance?
  36. How to filter events based on parent-child relationship? How to detect events that are neither preceded nor followed by a related event?
  37. How do I detect when a sensor exceeds a threshold and remains above the threshold for X seconds?
  38. How do I detect when a sensor value holds a threshold for X seconds?
  39. How to match when we see N events with "alert=true" over an hour, provided no events with "alert=false" came in?
  40. How to match when we see N events with "alert=true" over a second, provided no events with "alert=false" came in, per partition that comes and goes?
  41. I have an online-offline use case; I need to detect the presence of a first event and the absence of an event after some quiet time?
  42. How do I detect something really complex, like a triple-bottom pattern?
  43. How do I stop an insert after a period of time?
  44. Can I use a regular expression (regexp) within a filter?
  45. How can I remove duplicates?
  46. Within 5 minutes I want to collect 5 repeated and correlated events that each do not contain the same attribute(s) as previous repeated events, removing any overlapping matches?
  47. What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?
  48. How do I remove or drop events from a stream? I have a dynamic set of negative filters?
  49. How do I detect a specific sequence of events and get all events that take part in this sequence?
  50. How to implement a sell trailing stop order?
  51. I have one listener for multiple statements, how do I track which statement generated a result?
  52. Is there a way to receive the list of all events that participated in a window? I'm looking for a way to show the user the cause of the alert.
  53. We have our own math library, what are the options of utilizing it? How can I make calls out of the EPL into our own existing code?
  54. Can I use SOAP, WS-*, RESTful, JMS, RPC or other remote calls?
  55. What to do if my events are not JavaBeans, not well-defined, their properties are not known in advance and may differ wildly, and are nested?
  56. How do I query nested indexed events? Or more generally, event objects contained in event objects?
  57. How to calculate a weighted average over multiple time ranges?
  58. How to compute for 5-minute buckets? How to aggregate (e.g. VWAP) n buckets of k minutes each?
  59. How can I execute a query only every X seconds?
  60. How do I create a recursive query? That is a query that feeds to itself and defines its own input stream as output?
  61. We have an event stream for cars entering and leaving streets and want a car count per street? How do I insert or update and keep a count?
  62. I would like to trigger whenever the event contains a new and distinct security id and the source is one of the {A,B,C}?
  63. How to aggregate values between events?
  64. Start a window for each unique id when it arrives, wait 10 seconds for that id and output the last event for that window, starting a new window when the next event arrives for that id?
  65. How to perform an aggregation over a "semantic" window, i.e. a window that opens upon the arrival of a given event and closes upon the arrival of another event?
  66. How do I control start and end of matching considering a certain amount of time passed or a certain number of events arrived?
  67. How do I combine results from two distinct events and send out a single update? How do I process large events that contain further rows of information and need mapping information?
  68. I have two streams A and B that I want to join based on an "id" value that occurs exactly once per event type, and they can come in any order?
  69. I have a port scan detector and would like to create a new event based on multiple events occurring over a given period of time?
  70. How to get notified each time a certain value has increased by a specified amount, i.e. each time the value is greater than the previously-notified value + x?
  71. I need to compare objects within an event? I also plan to use JavaScript scripting for custom logic?
  72. Notify the user when an observed data field is three standard deviations above or below its mean.
  73. Notify the user when two out of three successive data points lie more than two standard deviations from the mean on the same side of the mean line.
  74. List the items that receive bids outside of the period of their auction.
  75. How to detect when vehicles enter a region and exit a region using geo-fencing and spatial?
  76. How Do I Handle Graph Data, Vertices, Edges, Graph Nodes, Linked Data, Relationships?

Common API Questions

  1. I want to know what streams an EPL statement references but don't want to parse the EPL string? I want to programmatically inspect and change the select clause columns when a user enters an EPL query, how to do this?
  2. How do I store statements in a file? Is there a standard file storage?
  3. How can I parse and validate an EPL statement?
  4. Can I create a statement in "stopped" state, attach listeners or subscribers and start it?
  5. Can I use an UpdateListener to listen to events and the iterator-based pull-API together?
  6. I want to know if an event hits at least one registered query?
  7. How to get a single callback with the list of matching subscriptions?
  8. When to use a plug-in aggregation function and when to use a plug-in custom data window?
  9. When to use on-demand fire-and-forget queries versus on-select statements?
  10. How do I integrate with the Spring framework? How to use Spring support for Groovy or other scripting languages with EPL?
  11. How to change statements at runtime?
  12. I'm considering creating a very large number of statements (such as 100k+ statements)?

Key Concepts

What is CEP?

Complex Event Processing, or CEP, is primarily an event processing concept that deals with the task of processing multiple events with the goal of identifying the meaningful events within the event cloud. CEP employs techniques such as detection of complex patterns of many events, event correlation and abstraction, event hierarchies, and relationships between events such as causality, membership, and timing, and event-driven processes (source: wikipedia.org).

 

[top]


What is an event?

An event is an immutable record of a past occurrence of an action or state change. Event properties capture the useful information for an event.
Or... "something that happens" (source: webster.com). Typically, the following is true for an event: it's anchored in time; it's not under your control; an event can itself contain further events; and event properties may contain rich and nested domain-specific information.

 

[top]


What is an event stream or simply "stream"?

A time-ordered sequence of events.

 

A stream is append-only: conceptually one cannot remove events, only add them to the sequence.

A stream is unbounded, i.e. there is no end to the sequence {event1, event2, event3, event4, ..., eventN}.

Streams are the basic building blocks of event stream processing. Much like a functional programming language treats computation as the evaluation of functions, a CEP engine treats computation as the evaluation of streaming data.

Streams in Esper have zero cost in terms of memory or processing. You could register millions of streams.

[top]


What is an event cloud compared to a stream?

A stream is a time-ordered sequence of events, while a cloud is unordered.

 

For example, a valid stream is {{1s, event1}, {2s, event2}, {4s, event3}}.

A cloud is unordered e.g. {{1s, event1}, {4s, event2}, {2s, event3}}.

[top]

Introductory

How do I look for specific events on a stream, dropping the unwanted events?

Consider a stream of temperature sensor events that provide a temperature value. This query looks for all sensor events (named SensorEvent) where the temperature is greater than 90:

 

select * from SensorEvent(temperature > 90)

[top]


How do I aggregate several (simple) events from a stream into a single new (complex) event summarizing event properties of the simple events?

This sample outputs the average temperature of all sensor events received from the start of the query:

 

select avg(temperature) from SensorEvent

[top]


How do I limit the unbounded events of a stream to a defined timespan or number of events?

There are many different flavors of data windows, which limit the subset of events to analyze on the unbounded stream of data.

 

The next sample outputs the average temperature of all sensor events received within the last 1 minute:

select avg(temperature) from SensorEvent#time(1 min)

[top]


How do I correlate events from one or several streams on a common set of values?

Consider the stream of sensor events and a stream of sensor location events, assuming our sensors can
move around geographically. This sample query joins the last sensor event per sensor id with the last sensor location (SensorLocationEvent) for each sensor identified by a sensor id (sensorId):

 

select temperature, coordinates
from SensorEvent#unique(sensorId) as sensor,
     SensorLocationEvent#unique(sensorId) as loc
where sensor.sensorId = loc.sensorId

You may ask why the "#unique(sensorId)" is needed here: when joining two streams, as in this example and as is often the case, we are looking to join only the last event per key of each stream.

[top]

Troubleshooting

My query presents incorrect or incomplete results? What can I do?

  1. Use @Audit to turn on statement-level processing debug output, which is logged at informational level; please see the documentation for more details. For example: "@Audit select * from MyEvent". @Audit also accepts a number of keywords to control which aspects of processing are output and which are not (see the example after this list).
  2. Make sure each event object is its own object. Esper cannot detect when your application changes the properties of an event object, so it is best not to change the properties of an existing event object and not to reuse an existing event object when sending an event into the runtime.
  3. The UpdateListener interface presents an array of output events (and not just one event). Make sure to inspect all events in the output array.
  4. Note that the runtime does not process statements in the order they were created. Instead the runtime processes events as they come in, and therefore the statements they apply to, in any order. If you want to dictate a particular order of execution to the runtime, use @Priority.
  5. Consider adding expressions to the select-clause so that you can inspect the expression results.
  6. If you have listeners that send events into the runtime upon receipt of events from the runtime, those listeners must use the "route" method to send events.
  7. Turn on logging as described in the documentation to see warning messages the runtime may log, to see exceptions raised by the runtime or your code, or to trace event processing. Be sure to turn off logging when deploying to production.
  8. Consider adding a user-defined function to your EPL statement to output application information or to set a breakpoint.
  9. If you want to see remove stream events posted to listeners, the "irstream" and "rstream" keywords in the select clause select the remove stream.
  10. For aggregation statements, note that the presence or absence of an event property in the select clause, and the presence or absence of a group-by clause, can change what the runtime outputs. See "Output for Aggregation and Group-By" in the documentation.
  11. The Reporting Issues page on this site provides further information for resolving issues. When submitting code to the forums, please submit the smallest possible test class, remove any randomness and send only the smallest number of events needed to reproduce the problem.
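
As an example of narrowing the audit output, here is a sketch; the category names ('stream', 'property') and the OrderEvent type are illustrative and should be checked against the @Audit documentation:

@Audit('stream,property') select * from OrderEvent(amount > 1000)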

 

[top]


My query does not compile?

For syntax errors, the EPL compiler provides line and column numbers as part of the error message.

 

You could try a simpler query first and then add to the query.
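
For example, a sketch of compiling EPL and inspecting the compiler message, assuming the Esper 8 compiler API ("epl" is your statement text and "config" your configuration):

try {
    EPCompilerProvider.getCompiler().compile(epl, new CompilerArguments(config));
}
catch (EPCompileException ex) {
    // for syntax errors the message includes line and column information
    System.err.println("EPL does not compile: " + ex.getMessage());
}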

[top]


How do I design for reduced memory use? I have an out-of-memory error and don't know its origin?

The documentation in the performance section includes information on designing statements for reduced memory use and things to look for to prevent memory "leaks". EsperHA virtually eliminates the chance of an out-of-memory error, because EsperHA can release state from memory and bring it back when needed.

 

[top]


I'm getting a stack overflow error?

This can happen if your listener or subscriber code sends in an event via "sendEvent" method and that event triggers a listener again which may send in another event (applies to default threading model). Use the "route" method to send events into the runtime from a listener.
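
A minimal sketch of such a listener, assuming the Esper 8 runtime API; DerivedEvent and its registered event type name are illustrative:

import com.espertech.esper.common.client.EventBean;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPStatement;
import com.espertech.esper.runtime.client.UpdateListener;

public class RoutingListener implements UpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents,
                       EPStatement statement, EPRuntime runtime) {
        if (newEvents == null) {
            return;
        }
        for (EventBean event : newEvents) {
            // DerivedEvent is a hypothetical application event class
            DerivedEvent derived = new DerivedEvent(event.get("sensorId"));
            // route, don't send, when feeding events back from a listener
            runtime.getEventService().routeEventBean(derived, "DerivedEvent");
        }
    }
}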

 

[top]

Design Tips

Simple Statements

We recommend keeping your EPL statements simple and easy-to-read: camelCase or CamelCase for property names works best, and event type names usually are CamelCase as well.

 

Consider splitting a single, complex statement into multiple EPL statements, where the first statement produces data via insert-into into a stream that a second statement consumes.

Consider using expression declarations, which allow factoring out common expressions into an "expression abc {...}" declaration.
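
For example (SensorEvent with sensorId and temperature properties is illustrative), the first statement publishes an intermediate stream the second consumes, and an expression declaration factors out a shared condition:

// split: filter and publish an intermediate stream ...
insert into HotSensorStream select sensorId, temperature from SensorEvent(temperature > 90);

// ... and consume it in a second, simpler statement
select sensorId, avg(temperature) as avgTemp from HotSensorStream#time(5 min) group by sensorId;

// factor out a common condition with an expression declaration
expression isHot { e => e.temperature > 90 }
select * from SensorEvent as e where isHot(e);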

[top]


EPL Editor

We provide a free Visual Studio Code extension for EPL editing and an EPL language server. Please visit the Visual Studio Marketplace for the download.

 

The following plugins are available for the NetBeans IDE: EPL ErrorChecking, EPL FileSupport, EPL SyntaxHighlighter and EPL dependency visualiser. They were created for the company Mycroft Mind by students of Masaryk University and are free to use.

For Eclipse, the Eclipse SQL Editor is also quite usable for EPL statements. To use it, do the following:

  1. Open Eclipse.
  2. Preferences / Install Software.
  3. Work with: http://download.eclipse.org/datatools/updates
  4. Select Eclipse Data Tools Platform (latest version).
  5. Windows / Preferences / General / Editors / File Associations -> assign SQL Editor as default to *.epl.

Syntax highlighting works quite well, but there is no content assist.

[top]


Few General Design Tips

For many use cases and requirements there are typically multiple solutions in EPL. This discussion provides useful hints for newcomers. It is not comprehensive. You will find more information on each idea discussed herein in the documentation. We recommend looking at the examples and coming up with your own EPL statements and testing them out.

 

For example, consider sensor stream data produced by devices such as temperature sensors, pressure sensors or valve movement sensors for customers and regions.

One aspect of statement design is the specificity of EPL statements: what should an EPL statement be specific to, i.e. which sub-stream of all the sensor data should it focus on? In the sensor stream example, you may decide to create an EPL statement per device specifying the device id, an EPL statement for a group of devices by listing multiple device ids, an EPL statement for a device type (i.e. valve sensors), an EPL statement for a given customer id that owns the sensor, an EPL statement that analyzes all devices, or combinations of these. You typically solve specificity by using filters such as "TemperatureSensor(customerId=10)" or "TemperatureSensor(deviceId='A0001')" as part of statements. In that respect, consider using constant variables (declared as const) so that certain constants are defined only once and need not occur multiple times.
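
A sketch of the constant-variable idea (the names are illustrative):

create constant variable int IMPORTANT_CUSTOMER = 10;

select * from TemperatureSensor(customerId = IMPORTANT_CUSTOMER);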

From a performance perspective:

  1. The least expensive way to analyze by a given dimension or multiple dimensions is by using the "group-by/rollup" clause (this groups aggregations)
    or "partition-by" clause (this groups match-recognize pattern matches) or "#groupwin" (this groups data windows). This should be applied when similar things need to be analyzed for many users/devices/customers i.e. one EPL statement can handle very many.
  2. The next higher and slightly more expensive level of analyzing by one or more dimensions is partitioning via context partitions, i.e. a keyed segmented context or an overlapping or non-overlapping context can be useful. This operates at a higher level than "group-by/partition-by/groupwin". You can still have one EPL statement for many partitions. This should be applied when similar things need to be analyzed for multiple users/devices/customers wherein the use case requires separate but similar state for each.
  3. The next higher level would be a separate statement. This should be used when there is specific non-reusable analysis. Each statement requires some memory for metadata.
  4. The next higher level is the runtime and one can always allocate multiple runtimes.

Also see the discussion on very large number of statements.

You may search for the right EPL construct to use. Should I use an EPL subquery, join or outer join? Or an EPL pattern? Or combine a join and a pattern? While there is no general answer, since you are likely processing high-arrival-rate streams it's best to understand, at least at a high
level, what the runtime does for each such construct. Joins use relational algebra and query planning, so they are very suited to one-to-many relationships and fast index-based lookup. Outer joins can handle the absence of events. Joins also handle out-of-order arrival well, and time-based relationship testing can be accomplished with Allen's interval algebra in the where-clause (a.k.a. date-time methods). EPL patterns are dynamically-managed state trees. They are very suitable for detecting time-based relationships with the presence or absence of events and other complex situations based on event arrival order (reorder if needed using a time-order window). EPL match-recognize pattern matching is based on NFA and is most suitable for finding patterns among ordered events.

You may consider whether to use a data window, what the right data window to use is and whether to use named windows or not. The most frequently used data windows are unique (#unique), last event (#lastevent) and time window (#time). Use named windows when you need custom merge, update or expiry logic or when the same data window is used in multiple EPL statements. You can also use no data window, so the runtime does not retain any events. Context partitions are often useful here to control when state should be discarded.

Use a data window if you want to retain events. Don't specify a data window unless you need to retain events.

You can aggregate with and without data windows. Use aggregation with data windows if you can and want to afford to retain events. Use aggregation with context partitions if you do not want to retain events.

We recommend using filters for filtering out events where possible, for example "StockTick(symbol='IBM')". We recommend using the where-clause only for correlation criteria.
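
For example (StockTick and NewsEvent are illustrative types), the filter narrows the stream while the where-clause carries only the correlation:

select * from StockTick(symbol='IBM')#time(30 sec) as t, NewsEvent#time(30 sec) as n
where t.symbol = n.symbol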

[top]


Understand Subqueries and Joins

Joins are multidirectional: When new data arrives on any of the joined streams the join (Cartesian) product of the data windows is output (joined or limited by the where-clause if present) and aggregated (if using aggregations).

 

You can make joins unidirectional: Thus the join only outputs data when an event arrives for the single stream marked as unidirectional and no output occurs for other streams. Unidirectional is also useful when you want to trigger a join from a time-based pattern, for example "select sum(xyz) from pattern[every timer:interval(10)] unidirectional, MyEvent#time(100), ... where ...".

Consider replacing the join with one or more sub-queries. Subqueries are often easier to read and conceptually clearer.
Subqueries don't have multidirectional evaluation. The query planning and indexing for fast lookup can be very different between subqueries and joins. Please see the performance tips section in the docs for a comparison of subqueries and joins.

When using joins with aggregations, consider splitting the statement into multiple simpler statements: Aggregate and output to a stream in one statement and use the output stream(s) in a second statement that joins or sub-queries without also aggregating. Using separate streams gives you the flexibility to report or debug each individually, do the output rate controlling you prefer and reuse the streams for other statements.
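
A sketch of this split, using illustrative event types:

// aggregate first and publish the totals as a stream
insert into AccountTotals
select accountNumber, sum(amount) as total
from WithdrawalEvent#time(30 sec)
group by accountNumber;

// then correlate the aggregated stream without aggregating again
select t.accountNumber, t.total, f.severity
from AccountTotals#unique(accountNumber) as t,
     FraudWarningEvent#unique(accountNumber) as f
where t.accountNumber = f.accountNumber;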

[top]


Performance

To obtain the best possible startup and runtime performance for your design, we have compiled valuable performance tips in a section on performance in the documentation. Please also see the "On Performance" page under "Esper for Java" on the left side page menu.

 

[top]


Offline Analysis, Analyzing Historical Data, Priming from Recorded Events, Preloading

We have seen quite a few use cases around analyzing data offline or loading recorded events. One example is to detect fraud in health care by replaying recorded past claims and prescriptions. Or detect fraud by replaying bank transactions at the end of a business day since they were not available real-time. Another is fleet tracking to determine when trucks are not on schedule considering up to 2 weeks of past data. Or when you have a database of events and need to load them before processing streaming data.

 

Esper EPL can detect complex events regardless of whether the data is offline and historical, or whether the data is online and currently arriving (or a mix thereof). Trying to do this analysis using a relational database and SQL can be difficult when relationships become complex and may include the time dimension. Since the runtime is fast it is often feasible to replay large numbers of events.

Start by disabling the internal timer in the configuration passed when obtaining a runtime instance:

Configuration config = new Configuration();
config.getRuntime().getThreading().setInternalTimerEnabled(false);

Set time to zero or to the oldest event's timestamp:

runtime.getEventService().advanceTime(0);

Next create a bunch of EPL statements.
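
A sketch of compiling and deploying one such statement, assuming the Esper 8 compile-and-deploy API and the "config" and "runtime" objects from above (the EPL text is illustrative; exception handling is omitted):

EPCompiled compiled = EPCompilerProvider.getCompiler().compile(
    "@name('avg-value') select avg(value) as avgValue from MyEvent#time(1 min)",
    new CompilerArguments(config));
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
deployment.getStatements()[0].addListener((newEvents, oldEvents, statement, rt) -> {
    // inspect newEvents, e.g. write the aggregated values to a report
});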

From your event source, read events and send events into the runtime advancing time (here entry is your event object):

runtime.getEventService().advanceTime(entry.timestamp);
runtime.getEventService().sendEventBean(entry, "MyEvent");

It is not necessary to advance time for every event that your application sends in. Instead and as an optimization you could decide on a time resolution and advance time according to that resolution (i.e. every 100 milliseconds).
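
A sketch of such a replay loop at a 100-millisecond resolution; the "events" collection, the MyEventEntry class and its timestamp field are illustrative:

long lastAdvanced = 0; // runtime time was set to zero above
for (MyEventEntry entry : events) {
    if (entry.timestamp - lastAdvanced >= 100) {
        // advance time only when at least 100 milliseconds have passed
        runtime.getEventService().advanceTime(entry.timestamp);
        lastAdvanced = entry.timestamp;
    }
    runtime.getEventService().sendEventBean(entry, "MyEvent");
}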

Replay events from multiple threads to gain the best performance, given your EPL statements are well laid out using context partitions for example. It is not necessary to synchronize application events with timer control events (CurrentTimeEvent or time span event instances, see doc).
If you are concerned with a defined order of processing application and timer control events, consider using a CountDownLatch to trigger timer processing. We would recommend reordering events that are not in time order before sending events into the runtime.

Also, when your application is done replaying events, it could switch on the internal timer again using an Esper class called TimerControlEvent and start sending online currently-arriving events. See the API chapter in the doc for additional information.

All this works for EsperHA the same as for Esper. With EsperHA your application can persist and recover state at any time and continue from that state.

[top]

Diagramming Tips

Diagramming Tips

EPL is a lot about event streams. For diagramming streams, EPL, sources and sinks, we would use an arrowhead line (a line with one end being an arrow) for each stream. The lines connect incoming event feeds (sources), outgoing event feeds (sinks) and EPL statements. For each EPL statement that processes a stream, according to the from-clauses of the EPL statement, we would use a box. For feeds (sources and sinks) we could use a stick figure or a queue.

Inside each box that represents an EPL statement one could simply put a short text that describes the EPL statement, so the diagram doesn't need to show details. Or you could put some or all of the EPL into the box.

For the drawing tool we have used draw.io. When creating a new diagram in draw.io we used the "Basic" and "Blank Diagram" category.

Here is an example diagram. This one has a single source that sends buy-events.

EPL Sample Diagram

[top]

EPL Questions

How do I detect N events within X seconds?

The next statement outputs the count of events within the last 3 minutes:

 

select count(*) from MyEvent#time(3 min)

By adding a having-clause the query only outputs a result when at least 5 events occur within 3 minutes:

select count(*) from MyEvent#time(3 min) having count(*) >= 5

By adding a filter the runtime looks for events that match a certain criteria:

select count(*) from MyEvent(somefield = 10)#time(3 min) having count(*) >= 5

This query also suppresses output for 3 minutes after a first match occurs:

select count(*) from MyEvent(somefield = 10)#time(3 min) having count(*) >= 5 output first every 3 minutes

This query also outputs the events themselves:

select count(*), window(*) from MyEvent(somefield = 10)#time(3 min) having count(*) >= 5 output first every 3 minutes

The query could also have a group-by clause to count per group, as shown next. As always, in multithreaded use it is ensured that the event entering the window and the count(*) firing are atomic; the count cannot jump, i.e. the count is incremented one event at a time.
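
For example, a per-group variant that counts per IP address (the ipAddress property is illustrative):

select ipAddress, count(*) from MyEvent(somefield = 10)#time(3 min) group by ipAddress having count(*) >= 5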

[top]


How do I compute a percentage or ratio?

Let's assume we have an event that indicates whether an item is black or white. Assume the boolean-type "black" property is true when the item is black.

 

Most aggregation functions allow passing a filter expression as a parameter to the aggregation function. This solution passes the boolean "black" property as the filter expression to the count aggregation function to compute the running percentage of items that are black compared to all items:

select count(*, black)/count(*) as pct from BlackWhiteItem

One could also formulate the same query as follows:

select count(*, black = true)/count(*) as pct from BlackWhiteItem

[top]


How do I measure the rate of arrival of events in a given time period? Per category?

The "rate" built-in aggregation function computes arrival rate. It can also be used when timestamp values are provided by your event.

 

This computes the per-second rate using runtime current-time timestamps by averaging 5 seconds of events, and outputs the rate every second:

select rate(5) from MarketDataEvent output snapshot every 1 sec

This computes the per-second rate using event timestamps (timestamp being a property of your event), also considering the last 5 seconds of events:

select rate(timestamp) from MarketDataEvent#time(5 sec) output snapshot every 1 sec

For per-category computation of a rate, let's assume the category is the "feed" field of the MarketDataEvent.

Create a keyed segmented context, instructing the runtime that your dimension of analysis is the "feed" value:

create context SegmentedByFeed partition by feed from MarketDataEvent

The runtime keeps a context partition per "feed" value; as locking is per context partition, this achieves the highest concurrency.

Select the rate:

context SegmentedByFeed select feed, rate(timestamp) from MarketDataEvent#time(5 sec) output snapshot every 1 sec

You may alternatively use the rate aggregation function with group-by as follows:

select feed, rate(timestamp) from MarketDataEvent#time(5 sec) group by feed output snapshot every 1 sec

[top]


I want to get average, count, min and max for a 60-second time window?

There are a few solutions to consider, mostly depending on how often you want to compute aggregations (continuously or once on output) and output a result (continuously or once after 60 sec) and whether you want the runtime to retain events in memory or not.

 

Let's assume we want to output only once after 60 seconds, start fresh every 60 seconds, compute aggregations continuously and not retain any events in memory for those 60 seconds (only retain aggregated values in memory). These are the two statements that accomplish this:

// Activate now and every 1-minute, terminating after 1 minute
create context CtxEachMinute initiated @now and pattern [every timer:interval(1 min)] terminated after 1 minute
context CtxEachMinute select avg(value) as avgValue, count(value) as countValue,
min(value) as minValue, max(value) as maxValue from MyEvent
output snapshot when terminated

Another possible solution is to use a batch window like the following EPL shows. This EPL asks the runtime to keep 60 seconds of
events in memory, perform the aggregation computation once (not continuously) for all events at the end of 60 seconds, and then output the result.

select avg(value) as avgValue, count(value) as countValue,
min(value) as minValue, max(value) as maxValue from MyEvent#time_batch(60 sec)

A third possible solution uses a rolling time window as shown below. The runtime always retains 60 seconds of events and outputs every time the aggregations change either when new events arrive or when events leave the time window. This output is continuous and aggregation
computation is continuous as well.

select avg(value) as avgValue, count(value) as countValue,
min(value) as minValue, max(value) as maxValue from MyEvent#time(60 sec)

The first solution (the one with CtxEachMinute) may be preferable for many cases; however, solutions two and three can have an advantage depending on what output you are looking for and when you want to compute aggregated values.
In case output should be aligned to fixed wall-clock intervals and we want to force output for all "symbol" values regardless of whether events arrived for a symbol in the current interval, use this:

create context BatchEvery5Second
  context PartitionBySymbol partition by symbol from StockTick,
  context Batch start @now end pattern[timer:schedule(date:'1970-01-01T00:00:00.0Z', period: 5 seconds, repetitions: -1)];

context BatchEvery5Second 
select context.PartitionBySymbol.key1 as symbol, avg(price) as avgPrice, current_timestamp() as cepTime
from StockTick output snapshot when terminated;

[top]


How to continually maintain customer ids that represent the top X customers with total purchases across a sliding time window?

I have a single stream of events that contain 3 properties, a Sale ID, a Customer ID, and a sale amount. I want to continually maintain a set of Customer IDs that represent the top X customers with total purchases across a sliding time window.

 

For example, I'd like to know who my top 30 purchasers based off of sales for the last 7 days. I'm trying to find a solution that provides me with a place to put an UpdateListener that will process the insert and remove streams maintaining a high-speed cache (like memcached, but not necessarily) other programs can leverage.

The solution utilizes two statements, one to compute the total per customer over a time window, and the second to rank the output of the first statement.

// define input event
create map schema BuyEvent as (saleid int, customerid int, amount double);

// compute current total per customer
insert into CustomerTotals
select customerid, sum(amount) as total
from BuyEvent#time(7 days) cbe
group by customerid;

// output insert and remove stream of top 30 customers.
@Name('TopCustomers')
select irstream customerid, total
from CustomerTotals#rank(customerid, 30, total desc);

[top]


I want to compute and compare the maximum of all events since start and the maximum for the last 10 seconds?

Compute the maximum of all the values over the total time, output at a 1-minute resolution:

 

insert into HistoricStream select max(e) as maxE from MyEvent output last every 1 minute

Compute the maximum for the last 10 seconds:

insert into Last10Sec select max(e) as maxE from MyEvent#time(10)

Compare, for example using a subquery:

select * from Last10Sec where maxE > (select maxE from HistoricStream#lastevent())

Alternatively, you could put all in one statement as shown next. This sample EPL doesn't compare but also outputs average and the value itself:

select e, max(e), avg(e), (select max(e) as localMax, avg(e) as localAvg from MyEvent#time(10)) from MyEvent

[top]


How can I aggregate large sliding windows and/or aggregate without retaining incoming events?

EsperHA can handle any size of sliding window since EsperHA can release state from memory, and bring state back when needed.

 

You can use a strategy called 2-level aggregation. The first level of aggregation forms 1-second buckets of counts and totals. The second level aggregation considers only the buckets provided by the first level (and not the original event data points).

Let's start with the first-level aggregation that computes 1-second totals. We use a context declaration that instructs the runtime to start counting now and restart every 1 second. This way the runtime aggregates for 1 second, and after the second passes it outputs the aggregated values, discards the existing aggregations and aggregates for another second, and so on.

create context PerSecond start @now end after 1 second

The context above controls our aggregation lifecycle. We need a second statement that computes the aggregation and outputs a stream of aggregated totals.

The next statement is still part of the first level aggregation and outputs aggregated buckets into a stream called OneSecondBucket:

// First level aggregation
context PerSecond
insert into OneSecondBucket
select type, sum(value) as secTotal
from MyEvent group by type output snapshot when terminated

The EPL above outputs a total per type every second into the OneSecondBucket stream. The OneSecondBucket stream is now available to any other statement that is interested in per-second totals. Note that it retains none of the MyEvent events in memory since it does not declare a data window.
We could instead have used a time-batch window of 1-second but that would retain events in memory which we don't want.

We can now have a statement that takes the OneSecondBucket totals and reports 1-hour totals:

// Second level aggregation
select type, sum(secTotal) from OneSecondBucket#time(1 hour) group by type

The above statement outputs a total per type every second, summing up 1 hour of MyEvent events. The max number of OneSecondBucket events retained is 60*60 per type. The resolution is 1 second.

If you have a large number of different dimensions to report, here is some additional advice. The example above only has a single "type" dimension, to keep the example simple. If you have X different dimensions, consider using a named window instead of "OneSecondBucket#time(1 hour)" so that the data points are shared between statements computing different dimensions. Use the group-by with rollup feature to compact multiple dimensions into a single statement.
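
A sketch of that combination, assuming the buckets also carry a second "customerId" dimension:

// share the one-second buckets between statements via a named window
create window OneSecondBucketWindow#time(1 hour) as OneSecondBucket;
insert into OneSecondBucketWindow select * from OneSecondBucket;

// second-level aggregation over the shared buckets, rolled up across both dimensions
select type, customerId, sum(secTotal) as total
from OneSecondBucketWindow
group by rollup(type, customerId);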

[top]


How to calculate aggregations per partition and control output externally?

We have input events that have fields called key1 and key2 which logically partition events. We want to calculate aggregates for each partition (combination of key1 and key2) and control externally when that specific partition should end.

 

The sample statements here compute average temperature and count for sensor events. A nested context is useful in that it can partition by keys and, when a control event arrives, stop counting and trigger output. The control event that ends the counting and triggers output in the EPL below is MyEndEvent.

create context CtxPerKeysAndExternallyControlled
  context PartitionedByKeys
    partition by
      key1, key2 from SensorEvent
  context StopWhenEndArrives
    start SensorEvent as e1
    end MyEndEvent(key1=e1.key1 and key2=e1.key2)
context CtxPerKeysAndExternallyControlled
select key1, key2, avg(temp) as avgTemp, count(*) as cnt
from SensorEvent
output snapshot when terminated
// note: no group-by needed since partitioned by keys

You could also maintain multiple overlapping context partitions for each key1 and key2 value. The previous example is non-overlapping, while this example allows overlapping context partitions (which here maintain simple counts). This example statement initiates a new context partition
when a MyInitEvent arrives and terminates all context partitions for a given key1 and key2 value when a MyTermEvent arrives.

create context CtxPerKeysAndExternallyControlled
        context PartitionedByKeys
          partition by
            key1, key2 from MyInitEvent,
            key1, key2 from SensorEvent
        context InitiateAndTerm
          initiated by MyInitEvent as e1
          terminated by MyTermEvent(key1=e1.key1 and key2=e1.key2)

(the counting statement is the same as above)

[top]


How can I alert if a message came 5 times in 10 minute buckets, not sliding window but fixed to the start of hour, but my events can be out-of-order and can arrive late? Is there event time support?

Say I want to aggregate events in buckets from 8:00:00 to 8:09:59, and 8:10:00 to 8:19:59, and 8:20:00 to 8:29:59 and so on. So my time bucket starts at the one-hour mark and each bucket lasts for 10 minutes.

 

Each event can arrive late. I want to use the event time that comes with the event to determine the bucket it belongs to and count it as part of the bucket it belongs to. This is because I have network glitches and events are sometimes severely out of order.

Lets assume the event type looks like this:

create schema MyEvent(eventtime java.util.Date)

One solution is to use the event time in the group-by clause. The simplified example is:

select count(*) from MyEvent group by roundTime(eventtime) having count(*) >= 5

The roundTime function above is a function that rounds down to the nearest 10-minute timestamp. We can compute that rounded-down (floor) time in EPL or with a user-defined external function, for example. A sample EPL statement is below:

expression getFloor10Min { v=>
  cast(v.eventtime.get('minute') / 10, int) * 10
}

expression roundedToFloor10Min { v=>
  v.eventtime.set('minute', getFloor10Min(v))
}

@Name('Out')
@Hint('reclaim_group_aged=3600')
select count(*), roundedToFloor10Min(e)
from MyEvent as e
group by roundedToFloor10Min(e)
having count(*) >= 5;

Send events like this (syntax is from EPL Online Tool). Note how the last event is late:

MyEvent={eventtime="2001-01-01 08:00:00.000"}
MyEvent={eventtime="2001-01-01 08:01:00.000"}
MyEvent={eventtime="2001-01-01 08:02:00.000"}
MyEvent={eventtime="2001-01-01 08:09:00.000"}
MyEvent={eventtime="2001-01-01 08:20:00.000"}
MyEvent={eventtime="2001-01-01 08:21:00.000"}
MyEvent={eventtime="2001-01-01 08:09:00.000"}

Use the @Hint('reclaim_group_aged=3600') to tell the runtime to throw away aggregations when they get older than 1 hour. This is to make sure groups are removed at some time to prevent a possible memory leak.

As an alternative, please see controlling time-keeping in the documentation for information on moving runtime time forward based on event time.

How do I output and compare totals of events over differently-sized windows?

Given an event with an integer "size" property I need to output totals over a 1-minute-window and a 2-minute-window when the 1-minute-aggregate-size is greater than 100. I want no output for another minute once the total is greater than 100.

 

Later, I want additional window sizes, i.e. 3-minute-window, 4-minute-window, 5-minute-window and so on. I also may need to compare these totals in other ways.

The event type is:

create schema MyEvent(size int)

This statement satisfies the first question:

select sum(size) as sumOneMin, (select sum(size) from MyEvent#time(2 min)) as sumTwoMin
from MyEvent#time(1 min)
having sum(size) > 100
output first every 1 minute

A nice approach can be to aggregate into an EPL table, keeping each X-minute aggregation as a column of the same EPL table row:

create table MyTable(sum1Min sum(int), sum2Min sum(int), sum3Min sum(int));

// @Priority makes sure the table gets updated before the on-select below in case the same arriving event queries the table
// (requires the "prioritized execution" configuration flag)
@Priority(1) into table MyTable select sum(size) as sum1Min from MyEvent#time(1 min);
@Priority(1) into table MyTable select sum(size) as sum2Min from MyEvent#time(2 min);
@Priority(1) into table MyTable select sum(size) as sum3Min from MyEvent#time(3 min);
on MyEvent select * from MyTable where sum1Min > sum2Min;

How do I detect N events meeting a condition of which M events meet a sub-condition within 10 minutes?

My events have ports 80, 81, 82, 9000, 9001, 9002 and I'm looking for 10 entries for any of these ports and at least 4 entries for 9000-and-up ports,
in the last 10 minutes, grouped by ip-source. The event type is:

 

create schema MyEvent(port int, ip_src string)

All aggregation functions can have filter conditions, therefore the EPL design is pretty simple.

select * from MyEvent(port IN (80, 81, 82, 9000, 9001, 9002))#time(10 minutes)
group by ip_src
having count(*) = 10 and count(*, port IN (9000, 9001, 9002)) >= 4

Sample events for the EPL online editor are next.

MyEvent={port=9001, ip_src='1.1.1.1'}
MyEvent={port=9001, ip_src='1.1.1.1'}
MyEvent={port=9001, ip_src='1.1.1.1'}
MyEvent={port=80, ip_src='1.1.1.1'}
MyEvent={port=80, ip_src='1.1.1.1'}
MyEvent={port=80, ip_src='1.1.1.1'}
MyEvent={port=80, ip_src='1.1.1.1'}
MyEvent={port=80, ip_src='1.1.1.1'}
MyEvent={port=80, ip_src='1.1.1.1'}
MyEvent={port=9001, ip_src='1.1.1.1'}

How do I detect when over the course of 1 second there are at least 2 distinct field1 values, and for every one of at least 2 distinct field1 values, there are at least 3 distinct field2 values?

My events have field1 and field2 values. I need to detect when there are two distinct field1 values and each of those have three distinct field2 values.

 

The events follow this schema.

create schema MyEvent(id long, field1 string, field2 string)

For example:

Event={id=0, field1='foo', field2='c'}
Event={id=1, field1='bar', field2='a'}
Event={id=2, field1='foo', field2='a'}
Event={id=3, field1='baz', field2='a'}
Event={id=4, field1='baz', field2='b'}
Event={id=5, field1='baz', field2='b'}
Event={id=6, field1='baz', field2='c'}
Event={id=7, field1='bar', field2='b'}
Event={id=8, field1='foo', field2='b'}

The last event with id 8 should trigger the output since at that point for 'baz' we have 3 distinct values 'a', 'b' and 'c' and
for 'foo' we have 3 distinct values 'a', 'b' and 'c'. The output must include the last event per combined field1 and field2.

The idea behind this design is to keep a current distinct count in a summary table and use a detail table to keep the last events so they can be part of the output.

create schema Event(id long, field1 string, field2 string);

// Keep a summary table, keyed by field1, that holds the distinct count.
create table SummaryTable(field1 string primary key, cntDistinct count(distinct string));

// Populate the summary table
@Priority(1) into table SummaryTable select count(distinct field2) as cntDistinct from Event#time(1 second) group by field1;

// Keep a detail table, keyed by field1 and field2, to keep the last event for output
create table DetailTable(field1 string primary key, field2 string primary key, lst lastever(*) @type(Event));

// Populate the detail table
@Priority(1) into table DetailTable select lastever(*) as lst from Event#time(1 second) group by field1, field2;

// Detection
insert into Detected select (select * from SummaryTable).where(v => v.cntDistinct >= 3).selectFrom(v => v.field1) as field1Vals
from Event
where (select * from SummaryTable).countOf(v => v.cntDistinct >= 3) >= 2;

// Upon detection, output the detail
@name('alert') select (select * from DetailTable).where(d => d.field1 in (field1Vals)).selectFrom(v => v.lst) from Detected;

How do I correlate events arriving in 2 or more streams?

The join of event streams looks very similar to joins in SQL. To bind data in the streams together, across streams, we identify keys to join on.

 

The below example specifies the 'accountNumber' field as the only join key. In this example we hold the last 30 seconds of events for each stream.

select fraud.accountNumber as accntNum, withdraw.amount as amount
from FraudWarningEvent#time(30 sec) as fraud,
     WithdrawalEvent#time(30 sec) as withdraw
where fraud.accountNumber = withdraw.accountNumber

[top]


How do I correlate events arriving out-of-order?

Let's assume we have three different types of events, all having a common attribute 'exchangeId'. Let's call the events start, finished and aborted.

 

Let's expect exactly one start event and multiple finished or aborted events for every exchangeId. The start event may happen after the
finished or aborted events, but they all happen within, say, 30 seconds per exchangeId.

There are multiple possible answers to this problem. One solution can be an outer join using time windows, and looking at the remove stream since we care about the composite events when a start event leaves the time window after 30 sec, when the other events for the same exchange id have accumulated.

select rstream * from
  StartEvent#time(30 sec) start
    left outer join
  AbortedEvent#time(30 sec) abort
    on abort.exchangeId = start.exchangeId
    left outer join
  FinishedEvent#time(30 sec) finished
    on finished.exchangeId = start.exchangeId

In the example above, every time a StartEvent leaves the time window it takes with it all aborted and finished events. The abort property will be null if no abort occurred.

Use an inner join instead, like below, if all events are mandatory but can come in any order. This query joins by "id" keeping each last event per id for up to 5 minutes.

select * from
Event(conditions)#unique(id)#time(5 min) as s1,
Event(conditions)#unique(id)#time(5 min) as s2,
Event(conditions)#unique(id)#time(5 min) as s3
where s1.id = s2.id and s1.id = s3.id

Another solution is shown next using patterns to detect out-of-order events.

[top]


How do I use patterns to correlate events arriving in-order or out-of-order?

The prior discussion focused on 3 kinds of events: start, finished and aborted.

 

A second possible solution can be found in using patterns. If one doesn't really care about processing the multiple aborted events and simply wants a failed or finished indication when the first aborted or finished event is encountered, then a simpler approach is specifying a pattern for each interesting combination, some of which are shown below.

select * from pattern [every s=StartEvent ->
  a=AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec)]

The above pattern simply looks for aborted transactions and reacts to the first aborted event coming in after a start event. The pattern to detect finished transactions, i.e. where no abort occurred, should look about like this:

select * from pattern [every s=StartEvent ->
  (f=FinishedEvent(exchangeId = s.exchangeId)
    and not AbortedEvent(exchangeId = s.exchangeId)) where timer:within(30 sec)]

To detect out-of-order events, the pattern can certainly be reversed:

select * from pattern [every a=AbortedEvent ->
  s=StartEvent(exchangeId = a.exchangeId) where timer:within(30 sec)]

[top]


How to detect events between other events using a pattern?

A use case wants to select all tuples that are between two tuples. For example, assume that I want all tuples between the first tuple with PARAM1=2 and the first tuple after this one with PARAM2=0. This would select all tuples with time between 3 and 8 in the example below.

 

TIME | PARAM1 | PARAM2
  1  |   1    |   0
  2  |   1    |   1
  3  |   2    |   2    <== ON
  4  |   2    |   3
  5  |   3    |   5
  6  |   3    |   4
  7  |   2    |   3
  8  |   2    |   0    <== OFF
  9  |   3    |   1

This seems best solved with a pattern with followed-by and unbound repeat, such as:

select * from pattern [
  beginevent=Event(param1 = 2, param2 = 2)
    -> middleevent=Event(param2 != 0)
         until endevent=Event(param1 = beginevent.param1, param2 = 0)
  ]

[top]


How do I look for pairs of immediately-followed events?

While one could use an EPL pattern statement to look for the absence of any other event between two events, i.e. (A -> (B and not C)), the match-recognize regular-expression pattern matching (proposed for SQL standard) provides a good solution. The next statement utilizes match-recognize to look for two immediately-followed events that both have the same origin (partition-clause) and where the first event has a picked flag value of false and the second event has a picked flag value of true.

 

select * from AlertEventStream
  match_recognize (
    partition by origin
    measures a1.alarmNumber as alarmNumber1, a2.alarmNumber as alarmNumber2, a1.origin as origin
    pattern (a1 a2)
    define
      a1 as a1.picked = false,
      a2 as a2.picked = true
)

[top]


How do I correlate 3 events in a time window in which events have similar properties?

My application needs to match 3 events which occur within a time window where 3 different users submit trades with similar properties. The properties that must have the same value for each of the 3 events matched is currency and direction. The pattern is to match only if all 3 events have a different user. The time window should be 10 minutes long.

 

The pattern that solves this problem is shown below. It uses the timer:within pattern guard to limit the lifetime of each active sub-expression to 10 minutes.

every trade1=Trade(userId in ('U1000','U1001','U1002') ) ->
  (trade2=Trade(userId in ('U1000','U1001','U1002') and
     userId != trade1.userId and ccypair = trade1.ccypair
     and direction = trade1.direction) ->
   trade3=Trade(userId in ('U1000','U1001','U1002') and
     userId not in (trade1.userId, trade2.userId) and
     ccypair = trade1.ccypair and direction = trade1.direction)
  ) where timer:within(10 min)

[top]


How do I find three consecutive events with the same attribute values?

My use case needs to find three consecutive events with the same attribute values. Let's say we are looking for the same "mytimestamp" value. We are not interested in the actual value of "mytimestamp"; we only need the three events to have the same timestamp attribute value, whatever it is. The three events must also match a fixed value; in this example the events have a "myeventcode" of 1.

 

Since all events must have "myeventcode" of 1 this solution specifies a filter. We use match-recognize to look for the sequence of immediately-following (consecutive) events.

select * from Event(myeventcode=1)
match_recognize (
  measures A as a, B as b
  pattern (A B{2} )
  define
    B as B.mytimestamp = A.mytimestamp
)

[top]


How do I find at least two 'foo' events followed by two 'bar' events, all within 2 seconds?

We can use match-recognize to look for the sequence of immediately-following (consecutive) events, limiting matches to 2 seconds of events.

 

In this solution example, events are stock tick events. We have 'foo' as 'symbol' being 'GE'. We have 'bar' as 'symbol' being 'YHOO'.

create schema StockTick(symbol string, price double);

select * from StockTick#time(2) match_recognize (
   measures g as g, y as y
   pattern (g{2,} y{2})
   define
       g as g.symbol = 'GE',
       y as y.symbol = 'YHOO'
);

[top]


How can I drop events from pattern matches?

I have alarm events and each event has a 'new' and 'clear' flag. I need to detect a sequence of three 'new' alarms and when a 'clear' alarm comes in, the 'new' sequence detection can be dropped/deleted/removed as it is no longer relevant. Matches should be within 60 seconds. I'm only interested in alarms of symbol 'AL1'.

 

// Define the event type information.
create schema Alarm(type string, sometext string, symbol string, id string);

// Define a 60-second window of alarms
create window AlarmWindow#time(60) as Alarm;

// We are only interested in 'AL1' alarms and matches are for 'new' alarms
insert into AlarmWindow select * from Alarm(symbol='AL1' and type='new');

// When a 'clear' alarm comes in, we can remove the corresponding 'new' alarms.
// Deleting alarm events from the named window tells the runtime to drop partially-completed patterns of 'new' alarms
on Alarm(type='clear') as cleared delete from AlarmWindow aw where aw.id = cleared.id;

// Match-recognize finds 3 consecutive events
@name('output')
select * from AlarmWindow
match_recognize (
  measures A as a, B as b, C as c
  pattern (A B C)
)

The sample events are:

Alarm={type='new', symbol='AL1', id='A1'}
Alarm={type='new', symbol='AL1', id='A2'}
Alarm={type='clear', symbol='AL1', id='A1'}
Alarm={type='clear', symbol='AL1', id='A2'}
Alarm={type='new', symbol='AL1', id='A3'}
Alarm={type='new', symbol='AL1', id='A4'}
Alarm={type='new', symbol='AL1', id='A5'}

[top]


How do I remove all events from a window and start over?

You have a need for a data window that can detect a certain situation, and if that situation occurs you want to start fresh and remove all events?

 

Named windows and the on-delete clause address this need. This sample declares a named window to hold MyEvent events:

create window MyWindow#keepall() as select * from MyEvent

Populate the named window from all arriving MyEvent events:

insert into MyWindow select * from MyEvent

Detect the situation; in the example below the query looks at the average wait time per train station:

insert into WarningStream
select trainStation, avg(waitTime) as avgWait
from MyWindow
group by trainStation
having avg(waitTime) > 60

Use the WarningStream events to remove from the named window:

on WarningStream delete from MyWindow

[top]


How do I combine data windows and their expiry polices? Or define custom logic for removing events?

The documentation outlines the built-in data windows, some of which combine length and time based expiry.

 

Another good place to look at is a named window. Named windows provide an on-delete clause that helps to build or combine a custom strategy for when to remove events.

In addition, multiple data windows can also be combined via the retain-union and retain-intersection keywords.
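
To illustrate, a minimal sketch of the two keywords, assuming an event type MyEvent (the window sizes are arbitrary):

// Keep events that are in EITHER the last 10 seconds OR among the last 100 events (union of expiry policies)
select * from MyEvent#time(10 sec)#length(100) retain-union

// Keep only events that are in BOTH windows (intersection; this is also the default when simply chaining data windows)
select * from MyEvent#time(10 sec)#length(100) retain-intersection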

Next, we'll show the named window and on-delete option. Let's start with a named window that keeps the last 1 minute of events:

create window MyWindow#time(1 min) as select * from MyEvent

This example EPL removes from the named window those events that have the same id as the arriving MyDeleteEvent:

on MyDeleteEvent as d delete from MyWindow as w where w.id = d.id

This example EPL removes non-unique rows by category, so that only the last event for each category stays in the named window. It therefore selects the remove stream (rstream) of the unique window (2 statements):

insert rstream into MyNonUnique select rstream id from MyEvent#unique(category)
on MyNonUnique as d delete from MyWindow as w where w.id = d.id

Variables can also be a useful way to parameterize an expiry policy. The next sample EPL assumes that a variable by name CURRENT_THRESHOLD was declared and employs a pattern to execute every 20 seconds:

on pattern[every timer:interval(20 sec)]
delete from MyWindow where threshold > CURRENT_THRESHOLD

Last, a plug-in data window implementation may be the right way to go if you want to parameterize it in special ways, need integration into the EPL language, or want to use the Esper scheduling APIs.

[top]


How do I seed an empty data window from a filled data window?

This is a feature of named windows. When a named window is already filled and a new statement gets created on that named window, that statement's aggregation does not start empty. Also, a named window may be initialized from another named window; look up the "insert" keyword in the create window clause.
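
A short sketch, assuming an existing and already-populated named window AllTradesWindow with an amount property (the names are made up for illustration):

// Seed a new named window from an existing, already-filled named window of the same type
create window BigTradesWindow#keepall() as AllTradesWindow insert where amount > 1000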

 

[top]


How do I keep a separate window of events per category and compute aggregates for each category's window?

I have one or more categories and for each of these categories I need to keep a separate window of events.

We present two approaches here: the first uses a partitioned context, and the second is based on grouped data windows.

The First Approach uses a partitioned context. The first statement declares a partitioned context wherein each context partition carries events for a
different symbol value. The second statement outputs the average price for the last 10 events, separately for each symbol.

create context PerSymbolContext partition by symbol on StockTick
context PerSymbolContext select symbol, avg(price) as avgPrice from StockTick#length(10)

You could also use a category context to categorize events based on some expression, or a hash context to assign events based on consistent hash code.
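
As a rough sketch (the price bands and the granularity of 16 are made up for illustration):

// Category context: one context partition per price band; context.label yields the category name
create context CategoryByPrice
  group price < 100 as small,
  group price >= 100 as large
  from StockTick
context CategoryByPrice select context.label as band, avg(price) as avgPrice from StockTick#length(10)

// Hash context: distribute events by a consistent hash of the symbol into 16 partitions
create context HashBySymbol coalesce by consistent_hash_crc32(symbol) from StockTick granularity 16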

The Second Approach uses grouped data windows.

In the statement below we have stock tick events for which we want to compute the average price of the last 10 stock tick events per symbol. Notice we are not using the last 10 events overall, we are looking at the last 10 events per symbol.

select symbol, avg(price) as avgPrice
from StockTick#groupwin(symbol)#length(10)
group by symbol

We can also specify multiple categories:

select symbol, location, avg(price) as avgPrice
from StockTick#groupwin(symbol,location)#length(10)
group by symbol, location

Let's consider another possible way of using a separate window of events per category. In some use cases we may need to compute not an average per group, but an average over all groups that consider only the last N events per group. This can be accomplished by leaving the group-by clause off. Now the runtime computes the average price over all symbols, considering only the last 10 events per symbol:

select symbol, avg(price) as avgPrice
from StockTick#groupwin(symbol)#length(10)

[top]


How do I use results of one statement in another statement?

Use the insert into syntax to use the events generated by one statement as input to another statement.

We can first compute the number of events arriving within 1 second, then use that number to perform additional aggregation. Here we compute for the last 30 seconds the maximum and minimum rate per feed (2 statements).

insert into TicksPerSecond select feed, rate(1) as cnt
from MarketDataEvent
group by feed
output snapshot every 1 sec
select feed, max(cnt) as maxCount, min(cnt) as minCount
from TicksPerSecond#time(30 sec)
group by feed

[top]


How do I put common filtering and normalization into a central place?

I have some upfront event filtering to do. I also want to attach a marker to my events depending on combinations of values. My events have these properties: ip (string), port (int), deviceType (string), deviceClass (string), service (int).

The sample schema is:

create schema Event(ip string, port int, deviceType string, deviceClass string, service int)

We'd recommend using insert into. The sample EPL below filters out any events with an ip-value of null and always assigns a marker value:

insert into FilteredNormalizedStream
select *, case
    when deviceClass IN ('ips', 'firewall') and port = 4431 then "m1"
    when service != 50 and port = 443 then "m2"
    when deviceType = 'ecut' then "m3"
    else "m4"
    end
  as marker
from Event(ip is not null) // <-- put filters here

For each incoming event, the above EPL outputs into FilteredNormalizedStream an event whose marker value is one of "m1", "m2", "m3" or "m4".

The syntax for splitting and merging streams can come in handy for removing events that don't match any conditions. For example, if we don't care about marker "m4" we could use this instead:

// This drops any events that don't match any of the where-clauses
on Event(ip is not null) // <-- common filter criteria are placed here
insert into FilteredNormalizedStream select *, "m1" as marker where deviceClass IN ('ips', 'firewall') and port = 4431
insert into FilteredNormalizedStream select *, "m2" as marker where service != 50 and port = 443
insert into FilteredNormalizedStream select *, "m2" as marker where deviceType = 'ecut'

Finally, use the FilteredNormalizedStream, for example for counting per marker:

select marker, count(*) from FilteredNormalizedStream group by marker

[top]


How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?

Use output rate limiting to stabilize or reduce the rate at which rows are output from a query, by outputting rows at a specified time or row-based interval.

The example below limits the otherwise continuous output to an output row every 5 seconds. The output contains the feed and average volume per feed of the last 60 seconds of market data events.

select feed, avg(volume) as cnt from MarketDataEvent#time(60 sec)
group by feed
output every 5 seconds

[top]


How do I delay data? How do I compare against previous events?

There are a few different approaches that this section outlines.

Your application may need to delay events for a certain time. A simple way to delay data is to enter the data into a time window and select the remove stream which is the data leaving the window:

insert rstream into DelayedStream select rstream task, measurement, rate from CurrentStream#time(10 min)

In order to compare current data with delayed data, one possible way is to join delayed data and current data. For example:

select d.task, d.measurement, d.rate - c.rate as delta
from CurrentStream as c unidirectional, DelayedStream#lastevent() as d
where d.task = c.task and d.measurement = c.measurement

This example uses the "unidirectional" keyword. The keyword indicates that results are only output when events of one stream arrive, and not the other; in this example, when events arrive in the DelayedStream there is no output.

Here is an alternative way using the "output snapshot" keywords instead. This example executes a join and posts results only every 1 minute:

select d.task, d.measurement, d.rate - c.rate as delta
from CurrentStream#lastevent() as c, DelayedStream#lastevent() as d
where d.task = c.task and d.measurement = c.measurement
output snapshot every 1 minute

Instead of the join, the "prev" previous-event function could be used to fetch and compare data from previous rows. This is useful if the arrival intervals
of the stream are known:

select task, measurement, rate - prev(4, rate) as delta
from CurrentStream#time(5 min)

The "prev" previous-event function also works well with the "#groupwin" in that is operates per-group when used with this data window, for example:

select rate, prev(1, rate) from MyStream#groupwin(task)#length(2)

A pattern statement can also be a great approach to form pairs or triplets (or any other combination of old and current events) and insert the pattern-matched events into a new stream for further processing, or use the select-clause to determine deltas as this sample statement shows:

select a.qty - b.qty, a.acct, a.instr from pattern  [
  every a=OrderStatus -> b=OrderStatus(acct=a.acct, instr=a.instr)
]

...or...

insert into MyStream select a, b from pattern [every a=OrderStatus -> b=OrderStatus(acct=a.acct, instr=a.instr)]
select a.qty - b.qty from MyStream

[top]


How do I detect the absence of an event?

Use a pattern to detect the absence of an event. The below pattern fires if an event A is not followed by an event B within 10 seconds (alerts once).

select * from pattern [every EventA -> (timer:interval(10 sec) and not EventB)]

The next example adds an "every" to keep alerting. This example pretends there is an event "Message" that has an event property "hostname" that should match.

select * from pattern [every a=Message -> every (timer:interval(10 sec) and not Message(hostname=a.hostname))]

Outer joins are also a good way to detect missing events. A solution with an outer join was discussed above.

[top]


How do I detect the absence of an event and the presence of an event arriving late?

Let's say we want to detect 2 situations:
a) A Down event is not followed by an Up event, i.e. the Up event for the same equipment id does not come in within 1 minute
b) A Down event is followed by an Up event for the same equipment id 30 seconds or more after the Down event

select * from pattern [
  every down=MyEvent(text='down') ->
  (
    (timer:interval(1 min) and not up=MyEvent(text='Up', equipmentId=down.equipmentId))
      or
    ( (timer:interval(30 sec) and not MyEvent(text='Up', equipmentId=down.equipmentId))
        ->
      up=MyEvent(text='Up', equipmentId=down.equipmentId) where timer:within(30 seconds)
    )
  )]

[top]


How do I report at a regular interval without any incoming events?

Let's say we want to have our listener get invoked every 5 seconds, and select the last value, if any, from a stream.

select (select price from MarketData#lastevent()) as price
from pattern [every timer:interval(5 sec)]

The pattern fires every 5 seconds causing the sub-select to take place, returning null if no MarketData events have come in, or returning the price column of the last MarketData event.

[top]


How do I find missing events arriving in 2 or more streams that are correlated?

As in SQL we can use outer joins to generate a result even if one or more of the correlated events are not found in a stream. Usually we want to generate the result after a certain time or after a certain number of events have been received, indicating that a correlated event is truly missing.

In this example we are looking for a withdrawal event without a login event for the same account number after 60 seconds.

We join withdrawal events with login events looking for login events that do not exist (account number is null). We want to get notified as these events leave the 60-second time window.

select withdraw.accountNumber as accntNum, withdraw.amount as amount
from WithdrawalEvent#time(60 sec) as withdraw
     left outer join
     LoginEvent#time(60 sec) as login
on login.accountNumber = withdraw.accountNumber
where login.accountNumber is null

[top]


How do I look into the past and consider missing events? How do I do a snapshot, fire-and-forget or on-demand query?

I have the good old StockTrades stream with two fields: symbol and price. I'm trying to answer the following question "Were Company X stock actions priced above $70 during any moment of the last 5 minutes?".

Unfortunately, using a simple time-based sliding window won't work. To see why, imagine there were only two price updates: the first, at 10:54 am stated that stocks were at $71. The second, 2 minutes later, notified that the stocks went down to $69. Now imagine that the above question was posed at 11:00 am (of the same day). We, humans, know that the answer is "yes" because the price was $71 between 10:54 and 10:56. But the first event is outside the 5 minutes window and will thus be ignored by the system.

Since the question is posed at 11:00am and the question result is expected to be aware of the events that arrived before 11:00am,
the solution seems to require that some events or perhaps only some aggregated information about events must be retained from before 11:00am.

Also, the "were" in the question "Were Company X stock actions priced above $70 during any moment of the last 5 minutes?" indicates that this
is a snapshot on-demand query that should return results just once, i.e. the result in not expected to be continuous but a simple tuple as a result of fire-and-forget.

In Esper the solution could be a named window that is created in the morning, say 9am. Esper also supports snapshot on-demand (fire-and-forget) queries against named windows through the API and an inward-facing JDBC driver for reporting. The named window would need to hold the price and also the prior price to ensure that it does not miss the drop from $71 to $69. The Esper EPL statements would be something like below:

This set of statements would be created at 9am (2 statements):

create window TickWindow#time(5 min) (price double, priorprice double)
insert into TickWindow select price, prior(1, price) as priorprice from StockTickEvent

This question is posed at 11:00am via a snapshot EPL query against the named window:

select * from TickWindow where price > 70 or priorprice > 70

Alternative solutions are as follows. One could use the on-select instead of a fire-and-forget query, as on-select is able to compile and maintain the proper index to speed up repeated execution. A second solution may change the statements to keep only the maximum, instead of each data point, however then the fire-and-forget queries must be limited to questions towards the max value.

Here is the syntax to use a predefined query via on-select instead of a fire-and-forget query:

on MyMaxQueryEvent as q select * from TickWindow where price > q.pricelimit or priorprice > q.pricelimit

When the question is posed at 11am one can send in a MyMaxQueryEvent with the pricelimit property set to 70 and the listener to the on-select statement gets the result.

[top]


How do I detect the absence of multiple unrelated events that are known in advance? Is there a way to simplify hundreds or thousands of similar statements into one global statement? How do I "prime" or initialize a large number of similar patterns without creating a pattern statement for each pattern instance?

We would like to detect the absence of certain events in the event stream. All possible tuples are known beforehand. Let's say a tuple simply consists of an IP address. The possible tuples might look like the following:

(ip=192.168.1.1), (ip=192.168.1.2), ... many more..., (ip=192.168.1.3)

Every hour, we would like to know when one of those known tuples has not been present in the event stream. The pattern EPL might look like:

select * from pattern [every (timer:interval(1 hours) and not IPEvent(ip='192.168.1.1'))]

We would like to simplify this so hundreds or thousands of similar statements are reduced to one global statement, and send in a primer event to initialize each IP address looked for.

The single pattern that reacts to primer events:

select * from pattern [every p=PrimerEvent -> (every (timer:interval(1 hours) and not IPEvent(ip=p.ip)))]

Starting from this simple pattern, one could also add additional control events, such as an event to indicate when to stop looking for a given IP.

Another possible solution may utilize a named window to hold the last event per IP, and a second named window to hold only those IPs to report on. Then, in intervals, one could select those entries whose timestamp is older than two hours and that exist in the second named window, via on-select for example.
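
A rough sketch of that alternative, assuming both IPEvent and PrimerEvent carry an "ip" property and using an illustrative 2-hour threshold checked every hour:

// Hold the last-seen time per IP
create window LastSeenWindow#unique(ip) as (ip string, ts long);
insert into LastSeenWindow select ip, current_timestamp() as ts from IPEvent;

// Hold the set of IPs to report on, populated by primer events
create window WatchListWindow#unique(ip) as (ip string);
insert into WatchListWindow select ip from PrimerEvent;

// In intervals, report watched IPs that were never seen or not seen for over 2 hours
on pattern[every timer:interval(1 hour)]
select w.ip from WatchListWindow as w
where (select ts from LastSeenWindow as l where l.ip = w.ip) is null
   or (select ts from LastSeenWindow as l where l.ip = w.ip) < current_timestamp() - 7200000;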

[top]


How to filter events based on parent-child relationship? How to detect events that are neither preceded nor followed by a related event?

I have a network scenario in which ports are entities of a device. For instance, if I am receiving a port down event followed by a device down event, or a device down event followed by a port down event, I should drop the port down event from reporting, since the parent (device) itself went down before or after some period of time. A solution was to delay events and then outer join, something like:

select * from pattern [every ine=StatusEvent -> timer:interval(2 sec)] as dse
    unidirectional left outer join
  StatusEvent(status="down")#time(4 sec) as wsde
    on dse.ine.parent=wsde.id
  where dse.ine.status="up" or wsde.id is null

This approach uses "unidirectional" to state that only the pattern drives the join, and delays events using a pattern and "timer-interval" as part of the join from-clause.

Note the check "where ... is null" in the where-clause that ensures that, if the device id is not null i.e. a matching "down" event is found, the event is suppressed.

[top]


How do I detect when a sensor exceeds a threshold and remains above the threshold for X seconds?

I need to detect a scenario when the heart rate is above 160 for 5 minutes. We can use a pattern, like so.

// Read:
// Output when a heart rate is above 160 followed by a time interval of 5 minutes during which the heartrate stays above 160.
select * from pattern[ every (HeartRate(heartrate>160)
  -> (timer:interval(5 min) and not HeartRate(heartrate<=160)))]

In addition, I want to run the pattern detection separately for each "macAddress". Any separation like this can be addressed with a keyed segmented context, like so (two separate EPL statements, use a module if needed):

create context PartitionByMac partition by macAddress from HeartRate
// same as above, just added the "context PartitionByMac"
context PartitionByMac select * from pattern[ every (HeartRate(heartrate>160)
  -> (timer:interval(5 min) and not HeartRate(heartrate<=160)))]

[top]


How do I detect when a sensor value holds a threshold for X seconds?

Use a pattern to detect the absence of an event indicating that the sensor value falls below the threshold.

select * from pattern [ [1] every (timer:interval(10 sec) and not Sensor(temp < 100)) ]

Using the "[1]" repeat operator we can tell the pattern runtime to look for the first match. Using "every" the pattern runtime restarts the interval when the temperature sensor value falls below 100. Therefore the pattern can handle a dip or multiple dips where the temperature falls below 100. The pattern fires exactly once and only when the temperature stays at 100 or more for at least 10 seconds.

[top]


How to match when we see N events with "alert=true" over an hour, provided no events where "alert=false" came in?

Suppose we have an event with a boolean-type field 'alert.' We want to emit a match every time we see 100 events where 'alert' is true in the course of an hour, provided no events where 'alert' is false were interspersed into the events where 'alert' is true. We also want to capture all event ID's comprising the match.

We are matching within a sliding 1-hour window and we want to retain only those events that have 'alert' as true. When an event comes in with 'alert' as false we can forget about previous events that have 'alert' as true.

We use a named window for this design, because EPL patterns do not operate on sliding windows, and while match-recognize can match within a sliding window, in this case we really only need to count rather than pattern-match.

create schema Event(event_id string, alert boolean);

create window EventWindow#time(1 hour) as Event;

// we keep the events that have 'alert' as true
insert into EventWindow select * from Event(alert);

// we delete when an event comes in with 'alert' as false
on Event(not alert) delete from EventWindow;

// output when there are 100 events
select window(event_id) as event_ids from EventWindow having count(*) >= 100;

Sample events are:

Event={event_id='001', alert=true}
Event={event_id='002', alert=true}

This solution keeps alerting for the same events. That may be desired; if it is not, depending on your use case details, we can change it to reset. Here is the solution that resets when there was a match.

create schema Event(event_id string, alert boolean);

create window EventWindow#time(1 hour) as Event;

// we keep the events that have 'alert' as true
insert into EventWindow select * from Event(alert);

// we delete when an event comes in with 'alert' as false
on Event(not alert) delete from EventWindow;

// output when there are 100 events
insert into ResetTrigger select window(event_id) as event_ids from EventWindow having count(*) >= 100;

// when we have a match do a reset
on ResetTrigger delete from EventWindow;

[top]

How to match when we see N events with "alert=true" over a second, provided no events where "alert=false" came in, per partition that comes and goes?

This is the same use case as discussed immediately before this entry, with an additional requirement. Events now also have a partition-name ("partition_name") and we want to detect within the same partition-name. New partition-name values come and go; when there are no more events for a partition-name for 1 second, we want to forget that partition-name.

Since there are multiple partitions active at a given point in time we use the EPL overlapping contexts.

// Events have a "partition_name" and belong to different partitions
create schema Event(id long, partition_name string, alert boolean);

// Define an overlapping context that allocates a new context partition for each distinct "partition_name" value.
// We want to forget partitions when there is no event for a given partition for 1 second, so we use "terminated by".
create context ContextPerPName
  initiated by distinct(partition_name) Event as initiator
  terminated by pattern [timer:interval(1 second) and not Event(partition_name = initiator.partition_name)];

// Window to hold the last 1-second of events per partition
context ContextPerPName create window EventWindow#time(1 second) as Event;

// Insert events into the respective window for that partition
context ContextPerPName on Event(partition_name = context.initiator.partition_name, alert = true) as arriving
  merge EventWindow as win where arriving.id = win.id when not matched then insert select *;

// Delete all events from the respective window for that partition
context ContextPerPName on Event(partition_name = context.initiator.partition_name, alert = false) delete from EventWindow;

// Output when any partition's window has 3 events
@Name('final') context ContextPerPName insert into ThresholdReachedTrigger
  select first(partition_name) as partition_name, window(id) as ids from EventWindow having count(*) >= 3;

// When there was output, empty the respective window for that partition
context ContextPerPName on ThresholdReachedTrigger(partition_name = context.initiator.partition_name) delete from EventWindow;

Sample events are:

Event={id=1000, partition_name="alpha", alert=true}
t=t.plus(200 milliseconds)
Event={id=1200, partition_name="alpha", alert=false}
Event={id=1200, partition_name="bravo", alert=true}
t=t.plus(200 milliseconds)
Event={id=1400, partition_name="alpha", alert=true}
t=t.plus(100 milliseconds)
Event={id=1500, partition_name="bravo", alert=true}
t=t.plus(100 milliseconds)
Event={id=1600, partition_name="alpha", alert=true}
t=t.plus(150 milliseconds)
Event={id=1750, partition_name="bravo", alert=true}
t=t.plus(50 milliseconds)
Event={id=1800, partition_name="bravo", alert=true}
t=t.plus(100 milliseconds)
Event={id=1900, partition_name="bravo", alert=true}
t=t.plus(100 milliseconds)
Event={id=2000, partition_name="bravo", alert=false}
t=t.plus(100 milliseconds)
Event={id=2100, partition_name="bravo", alert=true}
t=t.plus(301 milliseconds)
Event={id=2401, partition_name="alpha", alert=true}
t=t.plus(99 milliseconds)
Event={id=2500, partition_name="alpha", alert=true}
t=t.plus(50 milliseconds)
Event={id=2550, partition_name="bravo", alert=true}
t=t.plus(50 milliseconds)
Event={id=2600, partition_name="alpha", alert=true}
t=t.plus(100 milliseconds)
Event={id=2700, partition_name="bravo", alert=true}
t=t.plus(100 milliseconds)
Event={id=2800, partition_name="alpha", alert=true}
t=t.plus(200 milliseconds)
Event={id=3000, partition_name="alpha", alert=true}

[top]

I have an online-offline use case; I need to detect the presence of a first event and the absence of an event after some quiet time?

I'm tracking cars that have a car id and a tracked flag. I only track cars that have the tracked-flag set and ignore all others.
I need to detect when a tracked car has the first event which means the car comes online.
I need to detect when there is no event for a tracked car for 1 minute which means the car goes offline.
After going offline, I need to detect when a tracked car has an event again, which means the car has come online again after going offline.
The output should include the car id and the current or last event.

The next solution uses a table to track the current status by car id. The car id will be the primary key for the table. In the table the solution keeps the last event since the use case wants that as output.
When there is no row for a car id in the table the status is offline. When there is a row for a car id in the table the status is online.

The solution uses on-merge to merge car events with the status table. When the status table has no row for the car id, this means a car comes online for the first time or after being offline and this produces an
output event with status online (see CarOnlineStream).

The solution uses a pattern to track an exactly 1 minute interval per car id. This is similar to "#unique(carid)#time(1 minute)" which could also be used.
When the pattern detects that no events arrived for a given car id (see CarTimeoutStream) it uses on-merge again to delete the row from the status table and produce output with status offline (see CarOutputStream).

create schema CarEvent(carId string, tracked boolean);

// Create a table to hold the last car event per car-id
create table StatusTable(carId string primary key, lastevent CarEvent);

// When a tracked car arrives, update the status table for the same car id.
// When a row exists for the car, update the last event column with the current event.
// When a row does not exist for the car, insert a row and also populate the output stream CarOnlineStream
on CarEvent(tracked=true) as ce merge StatusTable as st where ce.carId = st.carId 
  when matched 
    then update set lastevent = ce 
  when not matched 
    then insert(carId, lastevent) select ce.carId, ce 
    then insert into CarOnlineStream select ce as outputevent;
  
// Track an exactly 1 minute timeout for each car id.
// This could be done more efficiently using a #unique(carid)#time(1 minute) but let's use a pattern since we don't track that many intervals.
// The CarTimeoutStream indicates that we didn't receive a ping from a car.
insert into CarTimeoutStream select e.* 
  from pattern[every e=CarEvent(tracked=true) -> (timer:interval(1 minutes) and not CarEvent(carId = e.carId, tracked=true))];

// Handle the car timeout, by deleting the row in the status table and by producing an output event to CarOfflineStream.
on CarTimeoutStream as cts merge StatusTable as st where cts.carId = st.carId 
  when matched 
    then delete 
    then insert into CarOfflineStream select lastevent as outputevent;
    
@Name('output online') select * from CarOnlineStream;
@Name('output offline') select * from CarOfflineStream;

[top]

How do I detect something really complex, like a triple-bottom pattern?

The triple-bottom pattern comes out of the world of stock trading and is described in detail in the literature on the triple-bottom chart pattern.

The problem can be broken down: First, how does one identify bottom price points among a stream of market data events? Second, once the individual bottom price points are identified, how does one detect an occurrence of 3 bottom price points, whose value is within approximation of each other, and that are spaced out over time in a pattern?

The first problem is an event streaming processing problem, I believe. The stream of events is market data that contains price points for the NIFTY index over time. I'll attempt to define a bottom price point as follows: If the average price for the last 5 days is 15% lower than the average price over a period of say 60 days, then the minimum price during that 5 days is a single bottom price point. Of course the number of days and percentages are parameters to figure out and get right.

-- The query to determine the average price for the last 60 days:
insert into AvgPriceLast60Days
select avg(price) as avgPrice
from MarketData#time(60 days)
output every 10 minutes
-- The query to determine the average price for the last 5 days:
insert into AvgPriceLast5Days
select avg(price) as avgPrice, min(price) as minPrice
from MarketData#time(5 days)
output every 10 minutes
-- Compare the last average prices for each:
insert into BottomPriceEvent
select minPrice as bottomPrice
from AvgPriceLast60Days#lastevent() as LastAvg60Days,
     AvgPriceLast5Days#lastevent() as LastAvg5Days
where LastAvg60Days.avgPrice * 0.85 > LastAvg5Days.avgPrice
output first every 1 day

The last statement populates the "BottomPriceEvent" event stream as a stream of higher-level events in which each event represents a bottom price point.

The second part of the problem requires detecting 3 bottom price points whose values are within a given range of each other, and that have a certain temporal relationship with each other. Let's assume that the bottom price points should be within 5% each other. Let's also assume we are looking for bottom price points spaced at least 10 days apart from each other, but within 30 days of the prior bottom price point.

-- The pattern to detect the triple-bottom:
insert into TripleBottomPattern
select * from pattern [every a=BottomPriceEvent
  -> timer:interval(10 days)
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)
  -> timer:interval(10 days)
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)]

Finally, the resulting TripleBottomPattern event stream created by the last statement provides the higher-level complex events representing that a triple-bottom pattern has been detected.

An additional interesting problem is that the stream and pattern statements are rather long-running continuous queries, since they need to run over days and months. That may require persisting events, and/or using simulated time by playing back past events into the runtime.

[top]


How do I stop an insert after a period of time?

Assume we only want to receive the first 10 minutes of an incoming event stream and then stop receiving data from that stream.

The timer:within pattern guard function can serve here to stop the stream after a given amount of time, as the next statement shows:

insert into PackageNotifyEvent
select myevent.uid as uid, 'HOME' as loc, 'ARRIVED' as status
from pattern[every myevent=TrackingEvent where timer:within(10 min)]

[top]


Can I use a regular expression (regexp) within a filter?

Yes, a regular expression can be used as part of a filter expression. Pretty much any expression is allowed within event filter expressions other than aggregation functions and the previous or prior function.

select * from pattern[every myevent=TrackingEvent(event.uid regexp '^a-b-.*'
  and event.lat in [40.1:40.2] and event.lon in [-74.1:-74.0])]

[top]


How can I remove duplicates?

One construct EPL exposes to remove duplicates in a stream of events is the first-unique data window. Another is the pattern "every-distinct".

The next example statement reports only the first sensor event per device:

select * from Sensor#firstunique(device)

The next example statement reports only the first sensor event per device and suppresses subsequent sensor events from the same device for 30 seconds:

// Retain only the intersection of unique-by-device and 30 seconds
// Does not retain 30 seconds of events, only 30-seconds of events unique by device
select * from Sensor#firstunique(device)#time(30)

Other related constructs are the "prior" and "prev" functions, the "unique" data windows, the group-by and output-rate-limiting clauses, the "distinct" keyword within the select clause, and subqueries. Apart from the brief select-distinct sketch below, we only discuss the pattern "every-distinct" here; please see the documentation for additional examples.
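
A minimal sketch of the select-clause distinct keyword (the window size and output interval are arbitrary):

// Every 10 seconds, output the distinct set of devices seen in the last 30 seconds
select distinct device from Sensor#time(30 sec) output snapshot every 10 seconds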

The next example utilizes a pattern which could be extended looking for additional events, and reports only the first sensor event per device and suppresses subsequent sensor events from the same device:

select * from pattern[every-distinct(s.device) s=Sensor]

When your distinct-value key is a timestamp or other ever-increasing or non-unique value, you should specify a time period indicating how long the key is valid and after which a duplicate can be reported again.

The next example utilizes a pattern that reports only the first sensor event per device and for 10 seconds suppresses subsequent sensor events from the same device:

select * from pattern[every-distinct(s.device, 10 sec) s=Sensor]

[top]


Within 5 minutes I want to collect 5 repeated and correlated events that each do not contain the same attribute(s) as previous repeated events, removing any overlapping matches?

The use case is to collect 5 events that are correlated wherein each event should have the same value for the field "source". And we want each of the 5 events to not contain the same value for the "auditrecord" field as any of the previous events, without overlapping matches i.e. any given event should not be part of any other match. All that must be within 5 minutes from the arrival of the first event.

This could be done via pattern or match-recognize. The solution here uses a pattern with @SuppressOverlappingMatches so the runtime knows to remove partially-completed patterns if there is a match.

create schema Event(source string, auditrecord string);
select * from pattern @SuppressOverlappingMatches
  [every a=Event ->
    (
      [4] every-distinct(b.auditrecord) b=Event(source=a.source and auditrecord != a.auditrecord)
    ) where timer:within(5 minutes)
  ];

You may cut & paste the above statements and the events below into the online tool and test. Here are the input events:

Event={source="id1", auditrecord="v1"}
Event={source="id1", auditrecord="v2"}
Event={source="id1", auditrecord="v3"}
Event={source="id1", auditrecord="v4"}
Event={source="id1", auditrecord="v5"}

[top]


What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?

I'm trying to detect pairs of events correlated by a "type" value and a unique "device" value. Then based on this pair of events, I'd like to find the maximum "measurement" value and the corresponding "confidence" for the one with the max "measurement" value. Here's my event object:

class Sensor {
	long id;
	String type;
	String device;
	Double measurement;
	Double confidence;
}

I'll know in advance the set of possible device values, but the Sensor events can happen in any order, and two or more Sensor events for the same device might occur before a Sensor event for the other device occurs. Thus if a Sensor event for the same device occurs before a Sensor event for the other device, then the second Sensor event replaces the first Sensor event for that device of the same type. In other words, the last event for a particular device of a given type is the one that should be used in the calculation of the maximum.

The computation would be the maximum value of the 'measurement' property between A and B. Also, the 'confidence' value would correspond to the value from the event with the maximum 'measurement' property.

A sample input and output:

Sensor[id=1,type='Temperature',device='A',measurement=51,confidence=94.5]
Sensor[id=2,type='Temperature',device='A',measurement=57,confidence=95.5]
Sensor[id=3,type='Humidity',device='B',measurement=29,confidence=67.5]
Sensor[id=4,type='Temperature',device='B',measurement=55,confidence=88.0]
Sensor[id=5,type='Temperature',device='B',measurement=65,confidence=85.0]
Sensor[id=6,type='Temperature',device='B',measurement=49,confidence=87.0]
Sensor[id=7,type='Temperature',device='A',measurement=51,confidence=99.5]

For output, one would expect the following:

// First output event pairs events with id=2 and id=4 and chooses the values from id=2
MaxReading[type='Temperature',device='A',measurement=57,confidence=95.5]
// Second output event pairs events with id=6 and id=7, since the event with id=6
// replaces the one with id=5, the event with id=5 is never compared against the
// event with id=7
MaxReading[type='Temperature',device='A',measurement=51,confidence=99.5]

One possible solution builds pairs of events using a join:

// Create pairs of device A and B events
insert into Pair
select * from Sensor(device='A')#lastevent() as a, Sensor(device='B')#lastevent() as b
where a.type = b.type

From the resulting stream we remove those pairs in which either event was seen before, leaving unique pairs:

// Declaring stream type
create schema PairDuplicatesRemoved(a Sensor, b Sensor)
// Remove duplicate pairs where either sensor event is from the prior pair
insert into PairDuplicatesRemoved
select * from Pair
where a.id != coalesce((select a.id from PairDuplicatesRemoved#lastevent()), -1)
	and b.id != coalesce((select b.id from PairDuplicatesRemoved#lastevent()), -1)

The example uses the "coalesce" function to return -1 when there is no last event for PairDuplicatesRemoved, so that the first pair matches as well.

Last, select the maximum measurement value between the pair of sensor events and the corresponding confidence and device:

select a.type,
       max(a.measurement, b.measurement) as measurement,
       case when a.measurement > b.measurement then a.confidence else b.confidence end as confidence,
       case when a.measurement > b.measurement then a.device else b.device end as device
       from PairDuplicatesRemoved

[top]


How do I remove or drop events from a stream? I have a dynamic set of negative filters?

Let's assume you have a stream of events and you want to remove events from the stream before further processing of the remaining events by other statements.

The @Drop annotation marks statements that preempt further processing of an event when the event matches multiple statements. The @Priority annotation is also useful to specify, when an event matches multiple statements, which statements to process first. Note that @Drop and @Priority require a runtime-level configuration setting that is off by default; please see the documentation for further details.
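
A minimal sketch of @Drop and @Priority, assuming the prioritized-execution configuration setting is enabled and an event type MyEvent with a symbol property:

// This statement matches unwanted events first and drops them from further processing
@Drop @Priority(10) select * from MyEvent(symbol = 'IGNORE')

// Downstream statements no longer receive the dropped events
select count(*) from MyEvent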

Other ways of solving this use case could be to use the UnmatchedListener to catch unmatched events or to use the EPL split-stream syntax.

[top]


How do I detect a specific sequence of events and get all events that take part in this sequence?

I have events coming from different sources. They have 2 states: success and failure, and they have a source. I would like to create a query to know when there are, for example, for a specific source 5 failure events followed by a success one. Of course as soon as there's a success event, the previous failure events shouldn't count anymore.

There needs to be a maximum time to wait for a success event to arrive, since we don't want to keep looking for a matching success event without end. Let's say that maximum is 5 minutes, so we simply drop failure events after 5 minutes.

Let's look at some samples: F5 means 5th failure event, S3 means 3rd success event. Also let's say we only need 5 failure events before a success one to have an alert.

Case1 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 F5 S1

then I want to throw an alert. The alert must know about those events meaning I would like the listener to
get those events.

Case2 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 F5 F6 F7 F8 S1

then I would have an alert knowing about F4 to F8 and S1.

Case3 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 S1 F5 F6 S2

then no alert would be emitted since once S1 arrives there were only 4 failure events.

Case4 - still from the same source

F1 F2 F3 F4 (then 10 minutes later) F5 S1

then of course no alert as all the events aren't within the 5 minutes window.

Case5 - If within 5 minutes (this time we have the sources a and b)

F1a F1b F2a F3a F2b F4a S1b F5a F3b S1a

No alert will be created when S1b arrives because there are only 2 failures for b. When S1a arrives an alert is created because we have F1a to F5a before, with no success concerning a in between.

Solution: Since we are looking for a very specific sequence of events, a pattern is the best way to go. We want to make sure the pattern subexpressions end when a success event arrives, and this can be accomplished via the not-operator. We also want to limit the expression to live 5 minutes from the first
failure event:

every a=F -> (
        (b=F(src=a.src) and not S(src=a.src)) ->
        (c=F(src=a.src) and not S(src=a.src)) ->
        (d=F(src=a.src) and not S(src=a.src)) ->
        (e=F(src=a.src) and not S(src=a.src)) ->
        (f=S(src=a.src) and not F(src=a.src))
     ) where timer:within(5 min)

This solution works for all cases, including case 2. Even though the pattern looks for only 5 events in a row, it looks at any 5 subsequent events for the same source, matching case 2 for events F4 to F8 and S1 (the active expressions that include F1, F2 and F3 end when F6, F7 and F8 arrive).

[top]


How to implement a sell trailing stop order?

This is an example out of the stock trading domain, in which incoming events are market prices. A sell trailing stop order is a technique that is designed to allow an investor to specify a limit on the maximum possible loss, without setting a limit on the maximum possible gain.

A sell trailing stop order sets the lower boundary (stop) price at a fixed amount below the current market price with an attached "trailing" amount.
As the market price rises, the stop price rises by the trail amount, but if the stock price falls, the stop price doesn't change, and a market order is submitted when the stop price (lower boundary) is hit.

Assume that the market price is 700 at the time of placing the trailing stop order. Assume that the stop price is 600. If the price goes to 703, the stop price must be updated to 603. If the price drops to 682, the trigger is still 603.

The solution considers the maximum market price since statement start time, compared against the current market price:

// since release 2.0
select * from Quote(symbol='GOOGL')
where price <= max((select max(lastPx) from Quote(symbol='GOOGL')#keepall()) - 100, 600)

[top]


I have one listener for multiple statements, how do I track which statement generated a result?

Your listener can implement the UpdateListener interface and get passed the statement and runtime instance for each result.
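
For example, a sketch of such a listener using the UpdateListener signature that receives the statement and runtime:

UpdateListener listener = new UpdateListener() {
  public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPRuntime runtime) {
    // The statement instance identifies which statement produced this result
    System.out.println("Result from statement '" + statement.getName() + "'");
  }
};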

For some use cases it can also come in handy to simply add a constant to each statement to identify the statement producing a result, for example:

select 120 as strategyId, * from Tick

[top]


Is there a way to receive the list of all events that participated in a window? I'm looking for a way to show the user the cause of the alert.

The data window aggregations offer a way to select the window, for example:

select window(*) from MyAlerts having count(*) > 10

The pull API is also a convenient way to retrieve data from a data window. The safeIterator method on EPStatement provides the events in a data window.
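
A sketch of the pull API, assuming "stmt" is an already-deployed EPStatement and the events have a "someProperty" property:

SafeIterator<EventBean> it = stmt.safeIterator();
try {
  while (it.hasNext()) {
    EventBean row = it.next();
    System.out.println(row.get("someProperty"));
  }
} finally {
  it.close();  // the safe iterator holds locks; always close it
}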

[top]


We have our own math library, what are the options of utilizing it? How can I make calls out of the EPL into our own existing code?

There are several options. The best choice among the options depends on what you want to accomplish, and how the existing library, function or other system exposes its functionality (static methods, service or POJO etc.).

The first option is the user-defined method. You can invoke a user-defined method in any expression directly without any configuration. You
can import a class via configuration to avoid package names in EPL. For example, assuming that the "com.mycompany.MyLibrary" class provides a static
method by name "computeDistance":

select com.mycompany.MyLibrary.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent
// ... or after MyLibrary is imported via configuration
select MyLibrary.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent

The second option is to invoke a method on your event object itself. This works only if your event representation is a Java object. An example, assuming that the "MyCoordinateEvent" event underlying class provides a method by name "computeDistance":

select myevent.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent as myevent

The third option is to provide a custom aggregation function via the extension API. A custom aggregation function can take many parameters but returns only a single value; it cannot return multiple values, however the value returned can be any object. Please consult the documentation for examples. A sample
EPL statement is as follows, assuming that the "myTrendFunction" custom aggregation function has been created and configured:

select myTrendFunction(price) from OrderEvent group by productId

The fourth option is to provide a custom data window via the extension API. A custom data window takes parameters as well as an input stream of events and generates a result stream of events. Please consult the documentation for examples. A sample EPL statement is as follows, assuming that the "mathlib:volatility" custom data window has been created and configured:

select * from OrderEvent(symbol='IBM').mathlib:volatility()

The fifth option is to use a method invocation. A method invocation is a function that acts alone or in a join and returns rows. Please consult the documentation for examples. Here is a sample EPL that utilizes a join, assuming that the "cacheLookup" function is provided by class "com.mycompany.MyLibrary":

select * from RFIDEvent, method:com.mycompany.MyLibrary.cacheLookup(assetId)

The last option is to have your listener or subscriber code invoke the library. Results can be sent back into the runtime by your listener via a further event, if needed.
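
A sketch of a listener handing a result back into the runtime as a new event; the ResultEvent type, the MyLibrary call and the "price" property are assumptions for illustration:

UpdateListener listener = (newEvents, oldEvents, statement, runtime) -> {
  // call into your own library with a value from the result event
  double result = MyLibrary.compute((Double) newEvents[0].get("price"));
  // feed the computed result back into the runtime as a further event
  runtime.getEventService().sendEventBean(new ResultEvent(result), "ResultEvent");
};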

[top]


Can I use SOAP, WS-*, RESTful, JMS, RPC or other remote calls?

The previous FAQ outlined how to invoke an external function. Your external function may invoke internal or external resources as part of its evaluation using any of these standards.

Also, Esper provides certain input and output adapters as described in the EsperIO documentation.

You may also want to consider creating your own event representation via the extension API if your transport or event repository already has event metadata available that you want to reuse.

In the design you should keep in mind that blocking calls may reduce throughput.

[top]


What to do if my events are not JavaBeans, not well-defined, their properties are not known in advance and may differ wildly, and are nested?

Here is an actual user question:


Each data item implements an interface, but the properties available on the concrete objects differ wildly. Also, each data item can be considered to be composed of multiple levels of these data items. One of the fields on the interface is a getParent field, which returns the data item one level up. For example: If X is the object which we have a direct handle to, X.parent = Y, and Y.parent = Z, we
often want to look at Z.field as part of our filter. Also, my events do not describe the property set of the event (not a
JavaBean). Thus, I don't directly know that X's parent is Y, whose parent is of type Z, which then has a property
named 'foo'. (Beans here are essentially user-defined structures for contributed code, and thus we have no real way
of knowing what properties are on a bean until we receive the bean. When the users use the beans, they obviously know which bean
they're working with. For us, we're getting it downstream, and that bean could be any of a dynamically growing variety.)

Dynamic properties are weakly-typed dynamically-resolved properties that use a '?' syntax to denote the dynamic part of a property expression, for example:

select parent?.parent.foo from XEvent

The above query returns the value of the "foo" property of the object provided by the "parent" property (if present) and its parent property (if present) of an XEvent. The value returned is of type Object and probably requires the use of the EPL "cast" or "instanceof" functions depending on what to do with the value.

By moving the '?' operator into a different position the query can indicate which properties in the nesting level must exist. The next query checks that the "parent" property exists upon compilation:

select parent.parent.foo? from XEvent

Dynamic properties can be used in combination with indexed and mapped properties as well.
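
For example, dynamic indexed and dynamic mapped properties use the same '?' marker (property names are made up for illustration):

select children[0]?.name, attributes('color')? from XEvent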

Another approach is to map such Java objects to Object-array, Map, Avro or XML event types: these also allow inheritance, nesting and dynamic properties, are easy to generate programmatically, and make it easier to declare the event type at runtime based on the available metadata or the actually arriving events, through EPL create-schema.

Another possible approach is to create a custom event representation plug-in.

The best approach generally is to use event inheritance (Java object, Object-Array and Map event representations) when possible, nested properties (all event representations) when possible, and strongly-typed properties to keep statements simple and easy to read.

There a number of options available in the configuration to handle Java classes that may not adhere to JavaBean conventions.

[top]


How do I query nested indexed events? Or more generally, event objects contained in event objects?

Your application may have a parent event that contains multiple subevents. You want to perform aggregation or pattern matching on the subevents plus information on the parent event.

Under the term contained-event selection the Esper runtime can handle events that contain properties that are themselves events. This is useful, for example, when application events are coarse-grained structures and you need to perform bulk operations on the rows of the property graph in an event, with any number of nesting levels.

In this example, which a user has provided, a parent ResponseEvent contains multiple subevents that each measure an individual operation (a database or JMS operation, for example) that is part of a larger operation represented by the ResponseEvent. Each ResponseEvent has one or more SubEvent objects that provide a subevent type and the number of milliseconds for the operation.

The example here uses Java objects. The example works the same for XML or Map-based events, we are picking a Java event object here for demonstration. See the docs for further examples.

The sample ResponseEvent event and SubEvent definitions are:

public class ResponseEvent {
  private String category;
  private SubEvent[] subEvents;

  public ResponseEvent(String category, SubEvent[] subEvents) {
    this.category = category;
    this.subEvents = subEvents;
  }

  public String getCategory() {
    return category;
  }

  public SubEvent[] getSubEvents() {
    return subEvents;
  }
}

public class SubEvent {
  private long responseTimeMillis;
  private String subEventType;

  public SubEvent(long responseTimeMillis, String subEventType) {
    this.responseTimeMillis = responseTimeMillis;
    this.subEventType = subEventType;
  }

  public long getResponseTimeMillis() {
    return responseTimeMillis;
  }

  public String getSubEventType() {
    return subEventType;
  }
}

The next sample code snippet adds the parent event type to the known types using configuration:

Configuration configuration = new Configuration();
configuration.getCommon().addEventType("ResponseEvent", ResponseEvent.class);

This is a sample query to continuously output the average response time per category and subevent-type:

select category, subEventType, avg(responseTimeMillis) as avgTime
from ResponseEvent[select category, * from subEvents]#time(1 min)
group by category, subEventType

[top]


How to calculate a weighted average over multiple time ranges?

For example: avgArrival = 0.6 * avg(t-1) + 0.3 * avg(t-2) + 0.1 * avg(t-3)

The "t-1" means a range of 1-second size, for example when the current time is 14:00:

t-1: 13:59-14:00     -> avg(Arrival_13:59-14:00) = 12
t-2: 13:58-13:59     -> avg(Arrival_13:58-13:59) = 10
t-3: 13:57-13:58     -> avg(Arrival_13:57-13:58) = 15
avgArrival = 0.6*12 + 0.3 * 10 + 0.1 * 15 = 11.7

Here is a sample solution consisting of 2 statements:

// Output the average every 1 second to another stream
insert into MyAverages select avg(...) as myavg from MyEvent#time(1 sec) output snapshot every 1 sec
// Compute a weighted average based on the averages provided by the MyAverages stream
select 0.6 * prior(1, myavg) + 0.3 * prior(2, myavg) + 0.1 * prior(3, myavg) from MyAverages

[top]


How to compute for 5-minute buckets? How to aggregate (e.g. vwap) n buckets of k minutes each?

In other words, my buckets look as follows:
t---(bucket1)---t-n---(bucket2)---t-2n---(bucket3)---t-3n---(bucket4)---t-4n. One solution is to make a staggered set of named windows, where the remove stream of the first named window feeds the second, each named window being a k-minute time window, and the second feeds the third and so on. Here is a sample of multiple statements:

create window BucketWindow1#time(5 min) as select * from MyEvent
create window BucketWindow2#time(5 min) as select * from MyEvent
create window BucketWindow3#time(5 min) as select * from MyEvent

insert into BucketWindow1 select * from MyEvent
insert rstream into BucketWindow2 select rstream * from BucketWindow1
insert rstream into BucketWindow3 select rstream * from BucketWindow2

select sum(price*volume) from BucketWindow1
select sum(price*volume) from BucketWindow2
select sum(price*volume) from BucketWindow3

[top]


How can I execute a query only every X seconds?

Let's assume I only want to output every 20 seconds the last value, and forget the rest of the data. This would do it:

select sum(price) from Marketdata#unique(ticker) group by ticker output snapshot every 20 seconds

If using a named window, on-select is a good way to fire at any schedule that a pattern defines. Here the example fires the query every 20 seconds if the hour is between 9am and 9:59am:

on pattern[every timer:at(*, 9, *, *, *, */20)] select * from MyNamedWindow

You could also join a pattern to historical data, thereby polling repeatedly; optionally parameterize with variables (not shown below) to do incremental polling. A sample is here:

select * from pattern[every timer:interval(20)], sql:db1["select * from MySQLTable"]

Or join the pattern to a stream. If joining to a stream then mark the join direction as unidirectional, thereby having the join execute only when the pattern fires:

select userId, sum(qty) from pattern[every timer:interval(10)] unidirectional, MyOrderEvent#time(10 min)

Last, the next example defines a variable and sets the variable to "on" or "off" to control output of statements. Variables can also be set programmatically
via runtime API (we show multiple individual statements below).

create variable boolean var_on_off
on pattern[timer:at(*, 9, *, *, *)] set var_on_off = true
on pattern[timer:at(*, 10, *, *, *)] set var_on_off = false
insert into nextStream select * from xyz(var_on_off)
..or..
select * from xyz output when var_on_off
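
As mentioned, the variable can also be set programmatically via the runtime API. A hedged sketch, where "deploymentId" is assumed to be the id of the deployment that declared var_on_off:

// hypothetical toggling of the variable declared above
runtime.getVariableService().setVariableValue(deploymentId, "var_on_off", true);   // enable output
runtime.getVariableService().setVariableValue(deploymentId, "var_on_off", false);  // disable output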

[top]


How do I create a recursive query? That is, a query that feeds into itself and defines its own input stream as output?

When creating your recursive query, you would want to ensure the event type is defined in advance: the runtime cannot check the type at the time of statement creation because the statement itself creates the type. Therefore, a good way is to define the type first:

Map<String, Object> mapType = new HashMap<String, Object>();
mapType.put("x", long.class);
mapType.put("id", String.class);
mapType.put("vol", double.class);
configuration.getCommon().addEventType("Volatility", mapType);

Then create the query:

insert into Volatility
select id, x + prev(1, vol) as vol
from Volatility#groupwin(id)#length(2)

[top]


We have an event stream for cars entering and leaving streets and want a car count per street? How do I insert or update and keep a count?

A solution is to have a named window holding the count per street and use the on-merge clause to atomically merge the indicator stream.

Define the schema to hold the car count:

create schema StreetCarCountSchema (streetid string, carcount int)

Define the schema for the arriving events indicating whether cars enter or leave a street:

create schema StreetChangeEvent (streetid string, action string);

Define a named window based on the schema to hold the count per street:

create window StreetCarCountWindow#unique(streetid) as StreetCarCountSchema

Merge the arriving data with a named window entry:

on StreetChangeEvent ce merge StreetCarCountWindow w where ce.streetid = w.streetid
when not matched and ce.action = 'ENTER' then insert select streetid, 1 as carcount
when matched and ce.action = 'ENTER' then update set carcount = carcount + 1
when matched and ce.action = 'LEAVE' then update set carcount = carcount - 1

Output the current count when it changes:

select * from StreetCarCountWindow
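
For trying this out in the online tool, hypothetical sample events could be:

StreetChangeEvent={streetid='S1', action='ENTER'}
StreetChangeEvent={streetid='S1', action='ENTER'}
StreetChangeEvent={streetid='S1', action='LEAVE'}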

[top]


I would like to trigger whenever the event contains a new and distinct security id and the source is one of the {A,B,C}?

The firstunique data window outputs only the first unique event per criteria.

select * from Event(source in ('A','B','C'))#firstunique(securityId)

If you would like to delete from the first-unique data window, use a named window and on-delete.

[top]


How to aggregate values between events?

I need to aggregate values between events based on some tag value in the events. For the sake of example I have an event with the following schema:

create schema MyEvent(tag int, value int);

I want to count and sum the values of zero or more events with tag != 1 between two enclosing events with tag = 1, and return the count, the sum, and the value from the last event. Consider the following stream of events:

tag   value

1      10
2      20
3      25
3      15
2      25
1      8

I need to return the following results:

countValuesForTagNot1=4, sumValuesForTagNot1=85, lastValueForTagEq1=8

Contexts are designed to handle this type of analysis, where the analysis has a lifecycle, also called a "semantic window" or "session window".

The solution for cutting & pasting into the EPL Online Tool is:

create schema MyEvent(tag int, value int);

create context TagValueContext start MyEvent(tag=1) end MyEvent(tag=1) as theLast;

context TagValueContext
select
  count(value, filter:tag != 1) as countValuesForTagNot1,
  sum(value, filter:tag != 1) as sumValuesForTagNot1,
  context.theLast.value as lastValueForTagEq1
from MyEvent
output last when terminated;

And the data:

MyEvent={tag=1, value=10}
MyEvent={tag=2, value=20}
MyEvent={tag=3, value=25}
MyEvent={tag=3, value=15}
MyEvent={tag=2, value=25}
MyEvent={tag=1, value=8}

[top]


Start a window for each unique id when it arrives, wait 10 seconds for that id and output the last event for that window, starting a new window when the next event arrives for the id?

Let's say we have a stream of ChangeEvent objects that are coming in at a rate of a few thousand per second. The ChangeEvent has an 'id' field that identifies the type of event. Over the course of 10 seconds, we could receive several ChangeEvent 'ticks' for the same id but we really only need to process the last one.

The logic needs to start a separate bucket for each ChangeEvent tick with a unique id and replace any future ticks for that same id. 10 seconds after the first tick for that id is received the last event with that id is delivered to the listener and then the window for that id is no longer available until another ChangeEvent with that id is received.

Say ChangeEvent is the tuple (id, quantity, price). For example the sequence and output is (input events):

ChangeEvent(1, 500, 99.75) t=0 secs
ChangeEvent(2, 200, 98.10) t=1 secs
...
ChangeEvent(1, 400, 99.81) t=8 secs
ChangeEvent(2, 190, 98.50) t=10.1 secs
...
ChangeEvent(1, 375, 99.82) t=10.1 secs

So my desired output to the listener is this:

ChangeEvent(1, 400, 99.81) at t=10 secs
ChangeEvent(2, 190, 98.50) at t=11 secs
ChangeEvent(1, 375, 99.82) at t=20.1 secs

Controlling the lifecycle of a data window, or any analysis, is a feature of context declarations.

// Declare a context that allocates a context partition when a unique id arrives
// and terminates the context partition after 10 seconds.
create context CtxPerIdAnd10Sec initiated by distinct(id) ChangeEvent as a terminated after 10 sec
// Within the context, output the last event upon context partition termination
context CtxPerIdAnd10Sec select last(*), first(*) from ChangeEvent(id=context.a.id) output snapshot when terminated

We could also use a pattern for the context declaration (first statement), like shown here (second statement stays the same):

// Declare a context that allocates a context partition when a unique id arrives
// and terminates the context partition after 10 seconds.
// Specify @inclusive to have the ChangeEvent that triggers the pattern be included in the next statement's evaluation.
create context CtxPerIdAnd10Sec initiated by pattern [every-distinct(a.id, 10 sec) a=ChangeEvent] @inclusive terminated after 10 sec

Here is another solution that applies if, for all ids, the 10-second time bucket should start and end at the same time.

create context TenSecondBucket start @now end after 10 seconds;
context TenSecondBucket select first(*), last(*) from ChangeEvent group by id output snapshot when terminated;

[top]


How to perform an aggregation over a "semantic" window, i.e. a window that opens upon the arrival of a given event and closes upon the arrival of another event?

The overlapping or non-overlapping contexts address this use case. In addition named windows can also be used. Both solutions are discussed here.

An example use case for this is: Give me the average CPU utilization of machine "X" while user "Y" was logged onto it. So the "semantic" window opens when the event "user_Y_logged_In" arrives and closes when another event "user_Y_logged_Out" arrives.

This first solution declares a context that initiates when a user logs in and terminates when a user logs out.

// declare context
create context WhileUserLoggedInCtx as initiated by UserLoginEvent as ule terminated by UserLogoutEvent(userId=ule.userId)
// perform some analysis for that user
context WhileUserLoggedInCtx select sum(cpuTime) from CPUReportEvent(userId = context.ule.userId)

This second solution uses two named windows. Named windows offer a custom expiry policy, through the use of on-merge and on-delete.

Hold CPU utilization:

create window CPUUtilizationWin#time(1 day) as CPUUtilization
insert into CPUUtilizationWin select * from CPUUtilization

Hold users that are logged in:

create window UserWin#unique(userId) as (userId string, loginTime long)
on LogInEvent li merge UserWin uw where li.userId = uw.userId when not matched then insert select li.userId as userId, current_timestamp() as loginTime
on LogOutEvent lo delete from UserWin uw where uw.userId = lo.userId

Output average utilization every 1 minute:

select userId, (select avg(cpu) from CPUUtilizationWin where timestamp between uw.loginTime and current_timestamp()) as avgUserCPU
from pattern[every timer:interval(1 min)] unidirectional, UserWin uw group by userId

[top]


How do I control start and end of matching considering a certain amount of time passed or a certain number of events arrived?

For additional control over how an analysis starts and ends you can define event types and use insert-into.
For example, assume we want to start an analysis when either a stock tick event or a news event comes in for a given symbol. Assume we want to end the analysis and output when either 10 events have arrived or 5 seconds have passed.

// create some event types for events that may arrive
create schema StockTick(symbol string, price double);
create schema NewsEvent(symbol string);

// create an event type to indicate that an analysis lifecycle starts
create schema AnalysisStartTrigger();

// create an event type to indicate that an analysis lifecycle ends
create schema AnalysisEndTrigger();

// create a non-overlapping context that starts and ends with the respective trigger events
// use "initiated" and "terminated" for overlapping contexts.
create context StockTickSessionContext start AnalysisStartTrigger end AnalysisEndTrigger;

// Produce a start-analysis trigger event when a stock tick event comes in
insert into AnalysisStartTrigger select null from StockTick(symbol='GE', price>20);

// Produce a start-analysis trigger event when a news event comes in
insert into AnalysisStartTrigger select null from NewsEvent(symbol='GE');

// Produce an end-analysis trigger event after 10 events have come in after the start
context StockTickSessionContext insert into AnalysisEndTrigger select null from StockTick(symbol='GE') having count(*) = 10;

// Produce an end-analysis trigger event 5 seconds after the start
context StockTickSessionContext insert into AnalysisEndTrigger select * from pattern[timer:interval(5)];

// Do some analysis between start and end
context StockTickSessionContext select count(*) as countNews from NewsEvent output when terminated;

Sample Events:

StockTick={symbol='GE', price=20.5}

t=t.plus(5 seconds)

[top]


I have two streams A and B that I want to join based on an "id" value that occurs exactly once per event type, and the events can come in any order.

For example, if the input sequence is { A(id=1), B(id=1) } the join should match. Same for { B(id=1), A(id=1) }. The order could be chaotic, for example the sequence { A(id=1), B(id=2), B(id=4), B(id=2) } should not match and { A(id=1), B(id=2), B(id=4), A(id=2) } should match on "id=2".

This solution uses match-recognize for pattern detection for two reasons. First, the "id" value can be seen as a partition and the pattern matching should take place within each individual "id" partition. Second, match-recognize allows us to specify a data window within which to match patterns, so when events leave the data window the runtime can forget the partition.

Sample statement:

select * from MyEvent#length(10)
match_recognize (
  partition by value
  measures E1.value as value
  pattern (E1 E2 | E2 E1)
  define
    E1 as E1.string = 'A',
    E2 as E2.string = 'B'
)

Note the length-window: when a given event does not match, that event will eventually leave the window and the partition for that "id" value gets removed from memory by the runtime. Also, when a match occurs in a partition and no remaining matches are active for the same partition the runtime removes that partition, reducing memory use.
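
For illustration, hedged sample events (assuming a MyEvent schema with a 'string' and a 'value' property matching the statement above); the last event completes a match on value=2:

MyEvent={string='A', value=1}
MyEvent={string='B', value=2}
MyEvent={string='B', value=4}
MyEvent={string='A', value=2}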

[top]


I have a port scan detector and would like to create a new event based on multiple events occurring over a given period of time?

Events are port scan events and have the following fields:

create schema PortScanEvent(src string, dst string, port int, marker string)

A sample event may look like this:

PortScanEvent = {src = '10.0.0.1', dst = '10.0.0.2', port = 16, marker = 'm1'}

We want to detect and report a port scan situation.

A port scan situation starts when for 30 seconds there are 20 or more events unique by port for a given combination of {src, dst}. The output event should have {type='DETECTED'} and include a marker list and ports. After the port scan situation has been detected, there are additional output events that should occur every minute, on the minute.

  1. Send the current count for that {src, dst} with {type='UPDATE'} and include a marker list
  2. When the count falls below 10, send one last count and {type='DONE'} and stop sending the current count.
  3. When the count stays over 10 for 12 hours, send one last count with {type='EXPIRED'} then stop sending a count.
    Only detect the situation again for that {src, dst} when the count drops below 10.

Solution:

// Define port scan event event type.
// The most efficient representation of an event is an object array.
create objectarray schema PortScanEvent(src string, dst string, port int, marker string);

// Hold the current aggregation state (counts, grouped data) per key in a central place.
create table ScanCountTable(src string primary key, dst string primary key, cnt count(*), win window(*) @type(PortScanEvent));

// Aggregate, keeping state in the central place.
// Populate a stream of count per {src, dst} pair.
into table ScanCountTable
insert into CountStream
select src, dst, count(*) as cnt, window(*) as win
from PortScanEvent#unique(src, dst, port)#time(30 sec) group by src,dst;

// Define a named window to hold the situations currently detected.
create window SituationsWindow#keepall() (src string, dst string, detectionTime long);

// Insert newly-detected situations into the named window.
on CountStream(cnt >= 20) as cs
merge SituationsWindow sw
where cs.src = sw.src and cs.dst = sw.dst
when not matched
  then insert select src, dst, current_timestamp as detectionTime
  then insert into OutputAlerts select 'DETECTED' as type, cs.cnt as cnt, cs.win as contributors;

// Every 1-minute output an update for all currently-detected situations.
on pattern [every timer:at(*, *, *, *, *)]
insert into OutputAlerts
select 'UPDATE' as type, ScanCountTable[src, dst].cnt as cnt, ScanCountTable[src, dst].win as contributors
from SituationsWindow sc;

// Every 1-minute remove expired or completed situations.
on pattern [every timer:at(*, *, *, *, *)]
merge SituationsWindow sw
when matched and (select cnt from ScanCountTable where src = sw.src and dst = sw.dst) < 10
  then delete
  then insert into OutputAlerts select 'DONE' as type, ScanCountTable[src, dst].cnt as cnt, null as contributors
when matched and detectionTime.after(current_timestamp, 12 hours)
  then delete
  then insert into OutputAlerts select 'EXPIRED' as type, -1L as cnt, null as contributors;

// Listen to output alerts.
@name('output') select * from OutputAlerts;

[top]

How do I combine results from two distinct events and send out a single update? How do I process large events that contain further rows of information and need mapping information?

This example supposes there are two streams of inputs, a stream of foreign symbols and a stream of local symbols.

The stream of foreign symbols has a value for each foreign symbol. In JSON the foreign symbol events look like this:

{"companies": [ 
  {"foreignSymbol":"ABC", "value":500},
  {"foreignSymbol":"DEF", "value":300},
  {"foreignSymbol":"JKL", "value":400} ] }

The stream of local symbols also has a value for each local symbol. In JSON the local symbol events look like this:

{"companies": [ 
  {"localSymbol":"123", "value":600},
  {"localSymbol":"456", "value":100},
  {"localSymbol":"789", "value":200} ] }

There exists a mapping between foreignSymbol and localSymbol. In JSON the mapping looks like this:

{"mappings": [
  {"foreignSymbol": "ABC", "localSymbol": "123" },
  {"foreignSymbol": "DEF", "localSymbol": "456" },
  {"foreignSymbol": "GHI", "localSymbol": "789" },
  {"foreignSymbol": "JKL", "localSymbol": "666" } ] }

When we receive the two inputs we need to combine data from both these streams and send out just one event which contains a union of rows from both the events. When the same row exists in both the input streams, compared using the mapping of symbols, the output should pick up the row whose value is larger. Therefore the desired output in JSON looks like this:

{"companies": [
  {"foreignSymbol": "ABC", "localSymbol": "123", "value": 600 },
  {"foreignSymbol": "DEF", "localSymbol": "456", "value": 300 },
  {"foreignSymbol": "GHI", "localSymbol": "789", "value": 200 },
  {"foreignSymbol": "JKL", "localSymbol": "666", "value": 400 } ] }

This calls for some splitting and combining. The incoming events get "split" into a row-per-symbol, then a processing step occurs on each row-per-symbol, and then a "combine" step puts together the output event from the results of the row-per-symbol processing. In EPL a split-combine is done with on-insert. In the documentation the chapter is "Splitting and Duplicating Streams".

Each incoming event has nested rows. To arrive at a row-per-symbol and process each nested row in EPL there is contained-event selection. See "Contained-Event Selection" in the docs. To mark the beginning and end of the split-combine, the on-insert can produce marker events.

To process the row-per-symbol, the design uses on-merge with a result table and a subquery against the mapping table. The design stores the result in a result table.

The design forms pairs of foreign-symbol events and local-symbol events by simply inner-joining the two streams.

When a new pair of foreign and local events needs to be processed, the design uses a context so that the runtime allocates a table at the start of processing a pair (SymbolsPairBeginEvent) and destroys the table at the end of processing a pair (SymbolsPairEndEvent).

The processing order follows the on-insert that drives the split and combining into a result. The runtime processes event-by-event and therefore the runtime executes these steps:

  1. A new foreign-symbol event or a new local-symbol event arrives, and the runtime generates a SymbolsPair event that has both the foreign-symbol event and the local-symbol event.
  2. The on-insert processes the SymbolsPair and kicks off processing for (3) to (7)
  3. The runtime processes the SymbolsPairBeginEvent marker event which allocates the table instance
  4. For the foreign-symbols event of the SymbolsPair, the runtime processes each symbol row (ForeignSymbolRow) and inserts or updates the result table
  5. For the local-symbols event of the SymbolsPair, the runtime processes each symbol row (LocalSymbolRow) and inserts or updates the result table
  6. The runtime processes the SymbolsPairOutputEvent which outputs the result rows
  7. The runtime processes the end marker event (i.e. SymbolsPairEndEvent) which terminates the context partition and thereby discards the table

The EPL is...

// The combination of symbol and value
create schema Symbol(symbol string, value double);

// The ForeignSymbols event type has a bunch of companies i.e. a bunch of symbols and values
@public @buseventtype create schema ForeignSymbols(companies Symbol[]);

// The LocalSymbols event type has a bunch of companies i.e. a bunch of symbols and values
@public @buseventtype create schema LocalSymbols(companies Symbol[]);

// The mapping table keeps the mapping of foreign symbols to local symbols
create table Mapping(foreignSymbol string primary key, localSymbol string primary key);

// The MappingEvent will be used to provide the mapping values
@public @buseventtype create schema MappingEvent(foreignSymbol string, localSymbol string);
on MappingEvent merge Mapping insert select foreignSymbol, localSymbol;

// We index the mapping table for fast lookup on foreign and local symbol
create index MappingIndexForeignSymbol on Mapping(foreignSymbol);
create index MappingIndexLocalSymbol on Mapping(localSymbol);

// This forms a pair of the last ForeignSymbols and the last LocalSymbols event.
// The output stream SymbolsPair has "foreign" and "local" properties that hold the arriving event.
insert into SymbolsPair select * from ForeignSymbols#lastevent as foreign, LocalSymbols#lastevent as local;

// This is the Splitting. From each SymbolsPair event it generates one SymbolsPairBeginEvent that will be processed first.
// It generates ForeignSymbolRow events for all companies in the "foreign" property (the event type of ForeignSymbols). The runtime will process these after.
// It generates LocalSymbolRow events for all companies in the "local" property (the event of type LocalSymbols). The runtime will process these after.
// It generates one SymbolsPairOutputEvent which shall trigger the combined output. The runtime will process this after.
// It generates one SymbolsPairEndEvent which shall discard the accumulated state so memory gets freed. The runtime will process this last.
on SymbolsPair
  insert into SymbolsPairBeginEvent select null
  insert into ForeignSymbolRow select * from [foreign.companies]
  insert into LocalSymbolRow select * from [local.companies]
  insert into SymbolsPairOutputEvent select null  
  insert into SymbolsPairEndEvent select null  output all;

// The context serves to allocate the result table and discard the result table when we are done with a SymbolsPair
create context SymbolsPairContext start SymbolsPairBeginEvent end SymbolsPairEndEvent;

// This is the result table. The context makes sure it gets destroyed after processing each SymbolsPair.
context SymbolsPairContext create table Result(foreignSymbol string primary key, localSymbol string primary key, value double);

// Merge each foreign symbol row with the result table
context SymbolsPairContext on ForeignSymbolRow as fsr merge Result as result where result.foreignSymbol = fsr.symbol
  when not matched then insert select fsr.symbol as foreignSymbol,
    (select localSymbol from Mapping as mapping where mapping.foreignSymbol = fsr.symbol) as localSymbol, fsr.value as value
  when matched and fsr.value > result.value then update set value = fsr.value;

// Merge each local symbol row with the result table
context SymbolsPairContext on LocalSymbolRow as lsr merge Result as result where result.localSymbol = lsr.symbol
  when not matched then insert select (select foreignSymbol from Mapping as mapping where mapping.localSymbol = lsr.symbol) as foreignSymbol,
    lsr.symbol as localSymbol, lsr.value as value
  when matched and lsr.value > result.value then update set value = lsr.value;

// Produce the output
@name('out') context SymbolsPairContext on SymbolsPairOutputEvent select foreignSymbol, localSymbol, value from Result order by foreignSymbol asc;

The events are, for use with the online tool, ...

MappingEvent={foreignSymbol="ABC", localSymbol="123"}
MappingEvent={foreignSymbol="DEF", localSymbol="456"}
MappingEvent={foreignSymbol="GHI", localSymbol="789"}
MappingEvent={foreignSymbol="JKL", localSymbol="666"}

ForeignSymbols={companies={ {symbol='ABC', value=500}, {symbol='DEF', value=300}, {symbol='JKL', value=400}}}
LocalSymbols={companies={ {symbol='123', value=600}, {symbol='456', value=100}, {symbol='789', value=200}}}

[top]

How to get notified each time a certain value has increased by a specified amount, i.e. each time the value is greater than the previously-notified value + x?

Either a table or a named window could be used to keep the current threshold. This solution uses a table. We define a reset event type that sets the initial threshold.

create schema ValueEvent(value long);
create schema ResetEvent(startThreshold long);
create table CurrentMaxTable(currentThreshold long);
@name('trigger') insert into ThresholdTriggered select * from ValueEvent(value >= CurrentMaxTable.currentThreshold);
on ResetEvent merge CurrentMaxTable when matched then update set currentThreshold = startThreshold when not matched then insert select startThreshold as currentThreshold;
on ThresholdTriggered update CurrentMaxTable set currentThreshold = value + 100;

The complete code with sending events and asserting the output is below.

String epl =
		"create schema ValueEvent(value long);\n" +
		"create schema ResetEvent(startThreshold long);\n" +
		"create table CurrentMaxTable(currentThreshold long);\n" +
		"@name('trigger') insert into ThresholdTriggered select * from ValueEvent(value >= CurrentMaxTable.currentThreshold);\n" +
		"on ResetEvent merge CurrentMaxTable when matched then update set currentThreshold = startThreshold when not matched then insert select startThreshold as currentThreshold;\n" +
		"on ThresholdTriggered update CurrentMaxTable set currentThreshold = value + 100;\n";
EPDeployment deployment = compileDeploy(runtime, epl);

SupportUpdateListener listener = new SupportUpdateListener();
runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), "trigger").addListener(listener);

runtime.getEventService().sendEventMap(Collections.singletonMap("startThreshold", 100L), "ResetEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 30L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 99L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 100L), "ValueEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "value".split(","), new Object[]{100L});

runtime.getEventService().sendEventMap(Collections.singletonMap("value", 101L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 103L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 130L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 199L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 200L), "ValueEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "value".split(","), new Object[]{200L});

runtime.getEventService().sendEventMap(Collections.singletonMap("value", 201L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 260L), "ValueEvent");
runtime.getEventService().sendEventMap(Collections.singletonMap("value", 301L), "ValueEvent");
EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "value".split(","), new Object[]{301L});

One could instead use the following to have the first-arriving value event provide the initial current threshold:

insert into ResetEvent select value+100 as startThreshold from ValueEvent#firstevent

[top]

I need to compare objects within an event? I also plan to use JavaScript scripting for custom logic?

I have an event that has nested objects that I need to compare.

Let's say my event is a predefined class like this:

public class MyEvent {
  Calendar transmissionTime;
  List workingDays;
...}

And each working day is a predefined class:

public class WorkingDay {
  String dayName;
  Calendar startTimeOfDay;
  Calendar endTimeOfDay;
...}

I want to create a statement that selects the events for which the following things are true:

  • The "transmissionTime" field has the same day name as the "dayName" fields of all of the "workingDay" objects (extract day name using some code)
  • The "transmissionTime" field time of day is outside of the range of the "startTimeOfDay" and "endTimeOfDay" fields of all of the "workingDay" objects

For getting a day name from a Calendar field there are various ways. This solution calls a JavaScript script so that any custom logic for getting special day names can be added without having to write a user-defined function or another callout to code.

expression string getCalendarDayName(ts) [
  getCalendarDayName(ts);
  function getCalendarDayName(ts) {
    return new java.text.SimpleDateFormat("EEEE").format(ts.getTime());
  }
]
select * from MyEventPOJO(
  workingDays.allOf(v => dayName = getCalendarDayName(transmissionTime))
  and
  workingDays.allOf(v => transmissionTime.before(startTimeOfDay) or transmissionTime.after(endTimeOfDay))
)

[top]

Notify the user when an observed data field is three standard deviations above or below its mean.

The having-clause is the right place for comparing:

// This is the input event type.
create schema MyEvent(id string, value double);

// Compare mean and standard deviation
@name('alarm') select *, avg(value) as mean, stddev(value) as sdev from MyEvent
 having Math.abs(value - avg(value)) > 3 * stddev(value);

You can cut&paste the above and the below into the online tool. Sample events are:

MyEvent={id='E1', value=5}
MyEvent={id='E1', value=5}
MyEvent={id='E2', value=5}
MyEvent={id='E4', value=5}
MyEvent={id='E5', value=5}
MyEvent={id='E6', value=5}
MyEvent={id='E7', value=5}
MyEvent={id='E8', value=5}
MyEvent={id='E10', value=5}
MyEvent={id='E11', value=5}
MyEvent={id='E12', value=10000000}

[top]

Notify the user when two out of three successive data points lie more than two standard deviations from the mean on the same side of the mean line.

When an event arrives, we need to take the current mean and current standard deviation and compare it to the event value. The output is an enriched row that we then analyze.

// This is the input event type.
create schema MyEvent(id string, value double);

// This event type holds the enriched event.
create schema Enriched(e MyEvent,
  currentMean double,
  currentStddev double,
  distanceMean double,
  exceedFlag boolean);

// Compute the enriched event: this computes the flag that a data point lies more than two standard deviations from the mean
insert into Enriched select e,
  avg(value) as currentMean,
  stddev(value) as currentStddev,
  value - avg(value) as distanceMean,
  Math.abs(value - avg(value)) > 2 * stddev(value) as exceedFlag
from MyEvent as e;

// Finally alarm when the history tells us that we exceeded
@name('alarm') select * from Enriched#length(3)
having exceedFlag and
  window(*).countOf(v=>v.exceedFlag) >= 2 and
  Math.abs(window(*).sumOf(v=>Math.signum(v.distanceMean))) >= 2;

You can cut&paste the above and the below into the online tool. Sample events are:

MyEvent={id='E1', value=5}
MyEvent={id='E2', value=6}
MyEvent={id='E3', value=4}
MyEvent={id='E4', value=5}
MyEvent={id='E5', value=5}
MyEvent={id='E6', value=5}
MyEvent={id='E7', value=5}
MyEvent={id='E8', value=5}
MyEvent={id='E9', value=5}
MyEvent={id='E10', value=20}
MyEvent={id='E11', value=1000}

Just for reference, the solution below is very different in that it compares the prior events' values against the current mean and deviation; this would not solve the use case.

// NOTE:
// The below is added as an example of a possibly incorrect solution that may NOT solve the use case, depending on exact requirements
create schema MyEvent(id string, value double);

expression distances {e => new {
  avg = avg(value),
  sdev = stddev(value),
  d0 = value - avg(value),
  m0 = Math.abs(value - avg(value)) > 2 * stddev(value),
  s0 = Math.signum(value - avg(value)),
  p1 = prior(1, value),
  d1 = prior(1, value) - avg(value),
  m1 = Math.abs(prior(1, value) - avg(value)) > 2 * stddev(value),
  s1 = Math.signum(prior(1, value) - avg(value)),
  p2 = prior(2, value),
  d2 = prior(2, value) - avg(value),
  m2 = Math.abs(prior(2, value) - avg(value)) > 2 * stddev(value),
  s2 = Math.signum(prior(2, value) - avg(value))
}}
insert into MyEventWithDistances select e, distances(e) as d from MyEvent as e;

@name('alarm') select * from MyEventWithDistances(
  {d.m0, d.m1, d.m2}.countOf(v => v) >= 2 and
  Math.abs({d.s0, d.s1, d.s2}.sumof()) >= 2
);

[top]

List the items that receive bids outside of the period of their auction.

This problem is a detection of the relative positioning of multiple events in the stream: an alarm should be raised if, for example, a bid for some item is seen before the start event for that same item. Similarly, an occurrence of a bid event is also invalid if it takes place (n) EndOfDay events after its opening, with (n) being the duration attribute of the corresponding start event.

create schema StartAuction(auctionId string, durationInDays int);

create schema Bid(auctionId string, bidId string);

create schema EndOfDay(auctionId string);

create table LiveAuction(auctionId string primary key, remainingDays int);

on StartAuction as aa merge LiveAuction as la where la.auctionId=aa.auctionId
when not matched then insert select aa.auctionId as auctionId, durationInDays as remainingDays;

on EndOfDay as eod merge LiveAuction as la where la.auctionId=eod.auctionId
when matched and remainingDays > 1 then update set remainingDays = remainingDays - 1
when matched and remainingDays = 1 then delete;

@name('alarm') select * from Bid(LiveAuction[auctionId] is null);

This approach uses a table to keep track of active auctions and their remaining days, which is a fairly simple solution.

Sample events are:

Bid={auctionId='A1', bidId='B1'}
StartAuction={auctionId='A1', durationInDays=3}
Bid={auctionId='A1', bidId='B2'}
EndOfDay={auctionId='A1'}
EndOfDay={auctionId='A1'}
EndOfDay={auctionId='A1'}
Bid={auctionId='A1', bidId='B3'}

[top]


How to detect when vehicles enter a region and exit a region using geo-fencing and spatial?

I have cities in a country and each region around a city is a shape represented by a circle. The circle has GPS coordinates of the circle center and has a radius. An example is circle-center point '41.90636, 12.49694' and radius '10000'.

I have vehicles that enter and exit regions. Each vehicle has vehicle identification (vin) and current GPS coordinates. The schema for vehicles will be the following.

  create schema Vehicle(x double, y double, w double, h double, vin string, coordinates string)

I want to know when vehicles enter a region and I want to know when vehicles exit a region. I have a lot of regions.

The design for this use case uses the Esper MXCIF quadtree index for keeping the regions. The index enables finding, for each vehicle event, the matching regions by a very well-performing index lookup. MXCIF quadtrees require the bounding rectangle for each region. Computing a rectangle from the circle is not done by Esper as it is trivial math and is not part of this design.

This design uses a table to keep the regions.

  create table Regions(regionId string primary key, rx double, ry double, rwidth double, rheight double, circle string, circleRadius string)

The next EPL declares the MXCIF quadtree index for the rectangle representing each region. This allows matching incoming vehicle coordinates to rectangles (one rectangle per region).

  create index RegionIndex on Regions((rx, ry, rwidth, rheight) mxcifquadtree(36.5265, 6.51496, 10, 12))

The next EPL looks up the region. Its output is a stream of events named VehicleWithRegion. Each event in VehicleWithRegion contains the vehicle number and the region id, or a null value for the region id if the vehicle is not in any of the regions. The "pointContainedInCircle" function further checks against the circle itself. This is necessary as the quadtree index is based on rectangles and would otherwise give false positives.

insert into VehicleWithRegion 
select vin, 
  (select regionId 
    from Regions 
    where rectangle(rx, ry, rwidth, rheight).intersects(rectangle(vin.x, vin.y, vin.w, vin.h)) and 
      pointContainedInCircle(vin.coordinates, circle, circleRadius)).firstOf() as regionId 
  from Vehicle as vin

To detect that a vehicle enters a region the EPL below uses a pattern that matches when the region id transitions from null to non-null.

insert into VehicleEnteredRegion select * from VehicleWithRegion match_recognize (
  partition by vin
  measures e1 as e1, e2 as e2
  pattern (e1 e2)
  define
    e1 as e1.regionId is null,
    e2 as e2.regionId is not null)

To detect that a vehicle exits a region, simply switch the define-clause expressions.

[top]

Common API Questions

I want to know what streams an EPL statement references but don't want to parse the EPL string? I want to programmatically inspect and change the select-clause columns when a user enters an EPL query; how do I do this?

The statement object model is designed to handle this case. Your application could parse each EPL module (EPCompiler#parseModule) or parse an individual statement (EPCompiler#eplToModel) to obtain an EPStatementObjectModel representation of the statement and then interrogate the from-clause or select-clause. The statement object model also allows an EPL query to be composed from scratch via normal Java POJO objects. A statement object model can be rendered back into an EPL string, and it is also possible to create a statement directly from a statement object model.
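
Below is a hedged sketch of inspecting and changing a statement via the object model; the exact EPCompiler#eplToModel signature and the sample statement text are assumptions:

import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.common.client.soda.*;
import com.espertech.esper.compiler.client.EPCompiler;
import com.espertech.esper.compiler.client.EPCompilerProvider;

public class InspectStatement {
  public static void main(String[] args) throws Exception {
    EPCompiler compiler = EPCompilerProvider.getCompiler();
    Configuration configuration = new Configuration();

    // Parse a statement into its object model representation
    EPStatementObjectModel model =
      compiler.eplToModel("select symbol, price from StockTick#time(30 sec)", configuration);

    // Interrogate the from-clause for the streams the statement references
    for (Stream stream : model.getFromClause().getStreams()) {
      if (stream instanceof FilterStream) {
        System.out.println("References: " + ((FilterStream) stream).getFilter().getEventTypeName());
      }
    }

    // Change the select-clause programmatically and render the statement back to EPL
    model.setSelectClause(SelectClause.create("symbol"));
    System.out.println(model.toEPL());
  }
}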

[top]


How Do I Handle Graph Data, Vertices, Edges, Graph Nodes, Linked Data, Relationships

The information herein does not apply to Object Graphs or Events-Within-Events. See the documentation for "Event Representations" and "Contained-Event Selection" for object or event graphs.

Instead this section is about graphs such as when the relationship between data is more than 2 levels deep and when the graph consists of edges and vertices. Examples of such graphs are social graphs that model relationships between people and groups, or a network topology graph such as representing a server A that is connected to switch B connected to router C connected to switch D connected to server E.

Esper does not "elegantly" handle traversing a graph and does not support "recursive" or "recursive common table expressions (CTEs)". SQL in general does not "elegantly" handle graphs.

The common approaches to handling graphs with Esper are discussed here and in no particular order.

You could use an external graph database. Esper can query external databases using "method:"-joins or using a user-defined function. See the doc for "Accessing Non-Relational Data via Method, Script or UDF Invocation".

You could use an external graph library. There is an example for integrating a graph library among the Esper examples and the example’s name is "cycledetect". The graph resides in an extension aggregation function.

Esper can do self-joins. Self-joins can handle a given depth. You would want to use the "unidirectional" keyword and would probably want to use "left outer join" if the depth of the graph varies. This approach cannot handle unlimited-depth graphs.

Your application could flatten the graph and keep a row that connects each node to each other node that is directly or indirectly connected. This probably works well for small graphs. Esper can keep a flattened graph in a table or named window. You can use joins and subqueries or just table-access expressions to query a flattened graph. You would want to design the table(s) and table indexes for fast lookup since there may be many relationships.

Some of the SQL databases have graph extensions and have support for the SQL "recursive" keyword. Esper can query SQL databases. See the doc for "Joining SQL Query Results".


How do I store statements in a file? Is there a standard file storage?

Esper does not prescribe any particular way of storing EPL modules or statements. Some applications prefer XML files and some prefer properties files. Your application may use a table in a relational database to store statements if so desired.

 

We present below an example of storing statements in a property file. This is not a recommendation and is merely one possible way among many of storing EPL modules in a file.

Here is the example:

#Alert12
esper.statement.alert12.0=create window Alert12Window#time(1 hour) as select ticker as alertId, this from LastTickEvent
esper.statement.alert12.0.ha.prefix=resilient
esper.statement.alert12.1=insert into MaxPrice:alertId:(lastPx) select max(lastPx) as lastPx from LastTickEvent(ticker=':ticker:')
esper.statement.alert12.1.ha.prefix=resilient
esper.statement.alert12.2=insert into Alert:alertId:Event select * from LastTickEvent(ticker=':ticker:') where lastPx <= (1-(:value:/100.0))*(select lastPx from MaxPrice:alertId:#lastevent())
esper.statement.alert12.2.ha.prefix=durable
esper.statement.alert12.3=insert into Alert12Window select ':alertId:' as alertId, quote.this as this from pattern [quote=Alert:alertId:Event]
esper.statement.alert12.3.ha.prefix=durable

This way of storing EPL assigns an alert name and a suffix that represents the statement number for all EPL statements for the alert. It uses the

:replaceme:

format as a parameter placeholder that the application itself replaces with a value before creating a statement. This example does not use prepared statements.

[top]


How can I parse and validate an EPL statement?

For just parsing an EPL statement or module and performing syntax validation you could use the compiler.
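
A hedged sketch, relying on EPCompiler#parseModule and EPCompiler#eplToModel as mentioned earlier; the sample EPL text is an assumption:

EPCompiler compiler = EPCompilerProvider.getCompiler();
Configuration configuration = new Configuration();
try {
    // parsing the module text reports syntax errors without compiling or deploying anything
    compiler.parseModule("create schema MyEvent(id string);\n" +
                         "select id from MyEvent#time(10 sec);");
    // translating a single statement to the object model also performs a syntax check
    compiler.eplToModel("select id from MyEvent#time(10 sec)", configuration);
    System.out.println("EPL is syntactically valid");
} catch (Exception ex) {
    System.out.println("Invalid EPL: " + ex.getMessage());
}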

 

[top]


Can I create a statement in "stopped" state, attach listeners or subscribers and start it?

Use the EPRuntime#getRuntimeInstanceWideLock to obtain the lock for managing multiple deployments transactionally.

[top]


Can I use an UpdateListener to listen to events and the iterator-based pull-API
together?

UpdateListener and iterator can be used together. An update listener implementation can also itself query the same statement or another statement's iterator, as the runtime guarantees the iterators are up-to-date before calling update listeners even across statements.

The iterator can also be used to query for no matches. You should find the iterator to have minimal overhead depending on the type of statement iterated on; the overhead for pattern statements specifically is negligible.
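
A hedged snippet for pulling current results, assuming "statement" holds a deployed EPStatement:

// iterate the statement's current results (use safeIterator() if other threads may be sending events)
Iterator<EventBean> it = statement.iterator();
while (it.hasNext()) {
    EventBean row = it.next();
    System.out.println(row.getUnderlying());
}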

[top]


I want to know if an event hits at least one registered query?

Esper has an UnmatchedListener interface that one can register with the runtime via runtime.getEventService().setUnmatchedListener(UnmatchedListener).
The UnmatchedListener receives any event that has not been processed by any statement, i.e. events for which no statement's stream filter matches. Note that only stream filters count: an event that passes a statement's stream filter but is then discarded by that statement's where-clause still counts as matched, since the event did enter the statement.
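
A hedged registration example, assuming UnmatchedListener declares a single update(EventBean) callback so that a lambda can be used:

runtime.getEventService().setUnmatchedListener(event ->
    System.out.println("Unmatched event: " + event.getUnderlying()));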

[top]


How to get a single callback with the list of matching subscriptions?

Question Detail: I need to have thousands of active filters in a single Esper runtime instance all listening to the same stream. When an event gets posted into this stream and is matched with N of these filters, I want to get a single callback rather than N of such callbacks. I know there is a statement-aware listener which makes it possible to have one handler for all the N statements, but that is still N update calls. I need something like "EPStatement[] statements" in my UpdateListener. Is it possible?

Answer: The runtime does not have an option to bundle deliveries of multiple statements to a single listener into a single invocation to that listener. However, this seems easiest solved by using a single UpdateListener or StatementAwareUpdateListener that accumulates the received events.
From a threading perspective, in the default configuration you may simply ask that listener to return the accumulated events in the same thread that sends the events. This works well since by default the thread that sends an event processes it end-to-end, so you have a guarantee when the thread returns that everything has been delivered to the listener.

[top]


When to use a plug-in aggregation function and when to use a plug-in custom data window?

If you are not sure how to choose between a custom plug-in aggregation function and a custom plug-in data window, this entry explains the differences and the advantages of one over the other in more detail.

A plug-in custom aggregation function works like other aggregation functions such as count, sum, average or standard deviation and may appear in the select-clause and in the having-clause in EPL.

A plug-in custom data window can be a data window providing an expiry policy like a time window or length window, for example. Or instead a custom data window can derive new information from a stream of events such as the results of a linear regression function (aka. derived-value data window).

A plug-in data window is always attached to a certain type of event that is provided by a filtered event stream, a pattern or another data window. Plug-in data windows can receive only one type of input event (input stream). If the plug-in data window provides an expiry policy, the output event type is always the same event type as the input event type. For derived-value data windows the output event type can be an entirely different type of event with new and often computed property values and types, including events from a different event representation such as XML-DOM.

If your application wants to provide an expiry policy then use a plug-in data window. If it needs to provide multiple computed values for each row of output,
such as the slope and y-intercept values of a linear regression function for example, also use a plug-in (derived-value) data window.

The input values to a plug-in aggregation function are the result of one or more expressions (as compared to data windows, which have events as input),
and the output value of a plug-in aggregation function is always a single value, usually a primitive value such as a double-typed value, but it can be any object. If your application only needs to return a single value, an aggregation function is more likely appropriate.

The group-by clause acts only on aggregation functions (and not data window output), providing an aggregation value per-group. Also output-rate-limiting can
provide the last value per such group when the output condition occurs.

Data windows can also compute one or more output values per group by means of the "#groupwin()". These data window output events are not grouped by the group-by or output-rate clauses, if present.

A data window's output event properties become available in the select-clause, where-clause, group-by-clause, having-clause and order-by clause, while
aggregation functions output values are only available in the select-clause and having-clause.

[top]


When to use on-demand fire-and-forget queries versus on-select statements?

Sometimes user requirements are such that a query against data maintained by the runtime must be fired. Sometimes such intra-day queries are well-defined and known upfront, sometimes not.

Via named windows Esper allows predefined statements based on the on-select clause.

Via named windows Esper also allows fire-and-forget queries that leave no trace. Fire-and-forget queries can also be compiled for repeated execution. Here is a sample code snippet to prepare and call a fire-and-forget query:

String stmtText = "select * from SensorWindow where temperature = 80";
EPCompiled compiled = compileFAF(stmtText); // see doc for how to compile a fire-and-forget query
EPFireAndForgetPreparedQuery onDemandQuery = runtime.getFireAndForgetService().prepareQuery(compiled);
EPFireAndForgetQueryResult result = onDemandQuery.execute();
System.out.println(result.getArray()[0].get("sensor"));

An on-demand fire-and-forget query has the penalty of compiling the query and executing it against an un-indexed data set, making the query slower to execute compared to pre-defined statements. The advantage is that it allows any type of query. Compare this to a predefined query based on the on-select clause. The next code snippet creates and executes a pre-defined query:

String stmtText = "on SensorQueryEvent select sensor from SensorWindow where temperature = querytemp";
EPDeployment deployment = compileDeploy(runtime, stmtText); // make sure you compile with the allow-subscriber flag on
deployment.getStatements()[0].setSubscriber(this);	// make sure you have an update(String sensor) method for the class

// Execute query, results are delivered via call to the update method.
// The SensorQueryEvent is expected to have a "querytemp" property as used in the on-select.
runtime.getEventService().sendEventBean(new SensorQueryEvent(80), "SensorQueryEvent");

A predefined query allows the Esper runtime to inspect the query conditions and thus maintain a proper index on named window contents to evaluate each query in a very efficient fashion. Thereby a predefined query can exhibit much better performance than a fire-and-forget query. See also the named window query benchmark for performance tests of both approaches.

[top]


How do I integrate with the Spring framework? How to use Spring support for Groovy or other scripting languages with EPL?

The Spring Framework (or Spring for short) is an open source application framework. This FAQ entry describes how a Spring XML file can hold EPL statements and inject listeners. It also shows how the Groovy dynamic scripting language can provide inlined scripts that act as listeners to EPL continuous-query statements. The Spring XML wiring relies on two classes in your classpath:
EsperBean and StatementBean. These classes are NOT part of the Esper distribution. They are instead listed below as examples. A sample Groovy listener is:

package org.springframework.scripting.groovy;

import com.espertech.esper.client.UpdateListener
import com.espertech.esper.client.EventBean

class GroovyMessenger implements UpdateListener {
    public void update(EventBean[] eventBeans, EventBean[] eventBeans1) {
        System.out.println(Arrays.toString(eventBeans) + "from groovy");
    }
}

The EsperBean class below represents a thin wrapper for an EPRuntime:

public class EsperBean implements BeanNameAware, InitializingBean, DisposableBean {
    private EPRuntime epRuntime;
    private String name;
    private Set<StatementBean> statementBeans = new LinkedHashSet<>();

    public void setStatements(StatementBean... statementBeans) {
        for (StatementBean statementBean : statementBeans) {
            addStatement(statementBean);
        }
    }

    public void addStatement(StatementBean statementBean) {
        statementBeans.add(statementBean);
    }

    public void sendEvent(Object event) {
        epRuntime.getEventService().sendEventBean(event, event.getClass().getSimpleName());
    }

    public void setBeanName(String name) {
        this.name = name;
    }

    public void afterPropertiesSet() throws Exception {
        epRuntime = EPRuntimeProvider.getRuntime(name);
        for (StatementBean statementBean : statementBeans) {
            EPDeployment deployment = compileDeploy(epRuntime, statementBean.getEPL());
            EPStatement epStatement = deployment.getStatements()[0];
            statementBean.setEPStatement(epStatement);
        }
    }

    public void destroy() throws Exception {
        epRuntime.destroy();
    }
}

The StatementBean class is a thin wrapper for an EPStatement, and is also required for the example:

public class StatementBean {
        private String epl;
        private EPStatement epStatement;
        private Set listeners = new LinkedHashSet();

        public StatementBean(String epl) {
            this.epl = epl;
        }

        public String getEPL(){
            return epl;
        }

        public void setListeners(UpdateListener... listeners) {
            for (UpdateListener listener : listeners) {
                addListener(listener);
            }
        }
        public void addListener(UpdateListener listener) {
            listeners.add(listener);
            if (epStatement != null) {
                epStatement.addListener(listener);
            }
        }

        void setEPStatement(EPStatement epStatement) {
            this.epStatement = epStatement;
            for (UpdateListener listener : listeners) {
                epStatement.addListener(listener);
            }
        }
    }

Finally, next is a sample code snippet for loading the XML file in Spring, which will automatically hook up the statements and listeners as defined in the XML:

ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(new String[]{"esperspring.xml"});
EsperBean esperBean = (EsperBean) appContext.getBean("esperBean", EsperBean.class);
esperBean.sendEvent(testEvent);
// ...when done, destroy the context...
appContext.destroy();

[top]


How to change statements at runtime?

The services of the runtime available from EPRuntime allow you to add, set and remove variables and to deploy and un-deploy compiled modules.

Variables are the preferred way to introduce dynamic thresholds, change filters on the fly or generally parameterize a statement. Variables can be scalar, of object (any) type, can be classes with properties, and can be event-typed to hold events.

Consider using subqueries and named windows as a second and alternative approach. Named windows are similar to a relational database table, automatically indexed based on the standing statements against them or explicitly indexed for use in on-demand fire-and-forget queries.

Alternatively you may use any of the extension points such as user-defined function, custom aggregation functions (these are often useful in subqueries returning multiple rows) or custom data windows.

[top]


I'm considering creating a very large number of statements (such as 100k+ statements)?

I'm thinking of creating a large number of similar statements. I have a large number of customers and each customer can select values and alert names.
But I know that creating a statement for each combination of customer, values and alert names may not be the best approach, since the number of EPL statements could become 100000 or more (100k+) statements. Customers can change these at runtime and create new combinations.
Here is an example of the type of thing I need:

select "alert-1" as alertName, * from MyEvent(customer='A' and value = 1);
select "alert-2" as alertName, * from MyEvent(customer='B' and value = 20);
.....
select "alert-N" as alertName, * from MyEvent(customer='N' and value = 123);
// N could go up to 100000 or more

Esper has a concept called overlapping contexts. Contexts allow instantiating, parameterizing (allocating) and destroying an analysis, or more generally controlling the analysis lifecycle. This is useful here since your analysis is the same, just parameterized differently.
There is no need to create 100k+ statements: just create the two statements below (excluding type definitions) and send a special start-event to start analyzing for a specific customer, alert name and value.
Send a special stop-event to stop analyzing. Below is the EPL that you can cut&paste into the online tool.

create schema MyEvent(customer string, value int);

create schema AllocateAnalysis(customer string, value int, alertName string);

create schema DestroyAnalysis(customer string, value int, alertName string);

create context AnalysisContext
  initiated by AllocateAnalysis as aa
  terminated by DestroyAnalysis(customer = aa.customer and value = aa.value and alertName = aa.alertName);

context AnalysisContext
  select context.aa.alertName as alertName, *
  from MyEvent(customer = context.aa.customer and value = context.aa.value);

Every time that you want to start analyzing, send an AllocateAnalysis event into the runtime.
Below is the syntax for the EPL online tool (matches the example rows above).

AllocateAnalysis={customer='A', value=1, alertName='alert-1'}
AllocateAnalysis={customer='B', value=20, alertName='alert-2'}
AllocateAnalysis={customer='N', value=123, alertName='alert-N'}

To destroy a specific analysis send a DestroyAnalysis event into the runtime.

DestroyAnalysis={customer='A', value=1, alertName='alert-1'}

You should only insist on creating 100k+ individual EPL statements if each of these statements is indeed very different from the others. Most likely the 100k+ statements have much in common and therefore the approach above should be used. If the statements really cannot be expressed with the approach above, note that when injecting a large number of statements the compilation can be done in parallel by more than one thread.

[top]

