The Problem

The use case involves tracking three components of a transaction. Each component comes to the engine as an event with the following fields:

  • Transaction ID
  • Time stamp

In addition, we have the following extra fields.

In event A:

  • Customer ID

In event C:

  • Supplier ID (the ID of the supplier that the order was filled through)

We need to take in events A, B and C and produce a single, combined event with the following fields:

  • Transaction ID
  • Customer ID
  • Time stamp from event A
  • Time stamp from event B
  • Time stamp from event C

What we are doing here is matching the transaction IDs on each event, to form an aggregate event. If all these events were in a relational database, this could be done as a simple SQL join except that with 10,000 events per second, you will need some serious database hardware to do it.

Real-Time Summary Data

Further, we need to produce the following:

  • Min,Max,Average total latency from the events (difference in time between A and C) over the past 30 minutes.
  • Min,Max,Average latency grouped by (a) customer ID and (b) supplier ID. In other words, metrics on the the latency of the orders coming from each customer and going to each supplier.

Find Missing Events

We need to detect a transaction that did not make it through all three events. In other words, a transaction with events A or B, but not C. Note that, in this case, what we care about is event C. The lack of events A or B could indicate a failure in the event transport and should be ignored. Although the lack of an event C could also be a transport failure, it merits looking into.

Solution

Event Definitions

Events in Esper are represented via regular Java classes that expose JavaBean-style "getter" methods for access to event properties. Since all components of a transaction have a transaction id and timestamp field, we create a base event class with the common fields as below.

public class TxnEventBase {
    private String transactionId;
    private long timestamp;

    public TxnEventBase(String transactionId, long timestamp) {
            this.transactionId = transactionId;
            this.timestamp = timestamp;
    }

    public String getTransactionId() {
            return transactionId;
    }

    public long getTimestamp() {
            return timestamp;
    }
}

Event A in the 3-component transaction can now simply extend the TxnEventBase class and add the customer id field.

public class TxnEventA extends TxnEventBase {
    private String customerId;

    public TxnEventA(String transactionId, long timestamp, String customerId) {
        super(transactionId, timestamp);
        this.customerId = customerId;
    }

    public String getCustomerId() {
        return customerId;
    }
}

Detecting Combined Events

Now that we have defined the event classes, we can compose an EPL statement to detect combined events in which each component of the transaction is present. We restrict the event matching to the events that arrived within the last 30 minutes.

insert into CombinedEvent(transactionId, customerId, supplierId, latencyAC, latencyBC, latencyAB)
select C.transactionId, customerId, supplierId,
       C.timestamp - A.timestamp, C.timestamp - B.timestamp, B.timestamp - A.timestamp
  from TxnEventA#time(30 minutes) A,
       TxnEventB#time(30 minutes) B,
       TxnEventC#time(30 minutes) C
 where A.transactionId = B.transactionId and B.transactionId = C.transactionId

This statement uses the insert into syntax to generate a CombinedEvent event stream. The next section uses that stream to derive realtime data on latencies between events.

Compiling Realtime Summary Data

We can now use the CombinedEvent stream to derive realtime summary data.

To derive the minimum, maximum and average total latency from the events (difference in time between A and C) over the past 30 minutes we can use the EPL below.

select min(latencyAC) as minLatencyAC, max(latencyAC) as maxLatencyAC, avg(latencyAC) as avgLatencyAC
  from CombinedEvent#time(30 minutes)

We can derive this data per customer and supplier id by specifying a group by clause.

select customerId, min(latencyAC) as minLatencyAC, max(latencyAC) as maxLatencyAC, avg(latencyAC) as avgLatencyAC
  from CombinedEvent#time(30 minutes)
 group by customerId

Finding Missing Events

An outer join allows us to detect a transaction that did not make it through all three events (see solution patterns for further ways to detect missing events).

select rstream *
  from TxnEventA#time(30 minutes) A
       full outer join TxnEventC#time(60 minutes) C on A.transactionId = C.transactionId
       full outer join TxnEventB#time(30 minutes) B on B.transactionId = C.transactionId
 where C.transactionId is null

When TxnEventA or TxnEventB events leave their respective time windows consisting of the last 30 minutes of events, Esper filters out rows in which no EventC row was found. The "rstream" keyword selects the remove stream, meaning events that leave the window (by default and without the rstream keyword a listener receives only insert stream events).

Using the API

Setting up the Compiler and Runtime

The Java code snippet below shows how to compile and deploy the EPL. The Configuration is used to make the event classes known to the compiler and the runtime. This is optional and makes the EPL statements easier to read.

Configuration configuration = new Configuration();
configuration.getCommon().addEventType("TxnEventA", TxnEventA.class);
configuration.getCommon().addEventType("TxnEventB", TxnEventB.class);
configuration.getCommon().addEventType("TxnEventC", TxnEventC.class);

Compiling the EPL

The compiler returns the byte code for the EPL:

String stmtSupplier =
  "select supplierId, min(latencyAC) as minLatency, max(latencyAC) as maxLatency, avg(latencyAC) as avgLatency" +
  "from CombinedEvent#time(30 minutes) " +
  "group by supplierId";

EPCompiled compiled;
try {
   CompilerArguments args = new CompilerArguments(configuration);
   compiled = EPCompilerProvider.getCompiler().compile(stmtSupplier, args); 
} catch (EPCompileException ex) {
  throw new RuntimeException("Compilation failed: " + ex.getMessage(), ex);
}

Allocate a Runtime and Deploy Compiled EPL

The code to get a new runtime and deploy the compiled EPL is:

EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);

EPDeployment deployment;
try {
   deployment = runtime.getDeploymentService().deploy(compiled); 
} catch (EPDeployException ex) {
  throw new RuntimeException("Dpeloyment failed: " + ex.getMessage(), ex);
}

Attaching Listeners

Listeners receive events published by statements and can query the received events. The listener interface receives event entering window(s) as new events (aka. istream or insert stream) and events as they are leaving window(s) as old events (aka. rstream or remove stream).

public class RealtimeSummaryTotalsListener implements UpdateListener {

    public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPRuntime runtime) {

        EventBean event = newEvents[0];
        log.debug(" Totals minAC=" + event.get("minLatencyAC") +
                  " maxAC=" + event.get("maxLatencyAC") +
                  " avgAC=" + event.get("avgLatencyAC"));
    }
}

And this is the code to attach the listener:

deployment.getStatements()[0].addListener(new RealtimeSummaryTotalsListener());

Sending Events

Finally, events are sent for processing by the engine via the runtime interface:

TxnEventA event = new TxnEventA("txnid1", 1, "customerId1");
runtime.getEventService().sendEventBean(event, "TxnEventA");

Running the Example

The complete example code can be found in the "examples" folder of the distribution in package com.espertech.esper.example.transaction. The example contains a transaction simulator that can be invoked from the command line. The readme file in the "examples/etc" folder contains build instructions and command line parameters to run the transaction simulator from the command line. You do not need to perform a build of the compiler or runtime to run the example - the jar files are all included in the distribution.

Please consult the examples section in the reference documentation for more information.