In the last post we looked at the basics of Complex Event Processing (CEP). In this post we try it out using the community edition of Esper, an open source engine for event stream processing (ESP) and event correlation (CEP).
Let us assume a simple scenario. Say you want to go from Foster City to Redwood City on US 101. There are two meters installed: one at the point where you join US 101 from Foster City (A) and the other at the exit for Redwood City (B). For the sake of simplicity, assume that an event is triggered every 5 seconds, reporting the average time a car takes to travel from A to B. Now assume that you want to make this trip and would like a quick peek at how the traffic is doing before you commit a time to your date. Luckily, there is a hosted system on the web which lets you find out the average time to complete the trip on the basis of data collected over the last 30 seconds. This is the sliding window we are interested in, so that the answer is as close to real time as possible.
Esper provides a rich Event Processing Language (EPL) to express filtering, aggregation, and joins, possibly over sliding windows of multiple event streams.
In our case, let us see what a traffic event would look like:
[sourcecode language=”java”]
package org.inphina.cep;

public class TrafficEvent {
    // Road on which the measurement was taken, e.g. "US 101"
    private String roadName;
    // Time taken to travel from meter A to meter B
    private double timeSpentToCoverTheDistance;

    public TrafficEvent(String roadName, double timeSpentToCoverTheDistance) {
        this.roadName = roadName;
        this.timeSpentToCoverTheDistance = timeSpentToCoverTheDistance;
    }

    public String getRoadName() {
        return roadName;
    }

    public void setRoadName(String roadName) {
        this.roadName = roadName;
    }

    public double getTimeSpentToCoverTheDistance() {
        return timeSpentToCoverTheDistance;
    }

    public void setTimeSpentToCoverTheDistance(double timeSpentToCoverTheDistance) {
        this.timeSpentToCoverTheDistance = timeSpentToCoverTheDistance;
    }
}
[/sourcecode]
Here, for now we are interested in the time taken to complete the trip from A to B, which is the property timeSpentToCoverTheDistance.
Since we are interested in a sliding window of 30 seconds, we need to gather the events from the last 30 seconds and compute over them. Hence, our statement looks like this:
[sourcecode language=”java”]
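private static final String EVENT_QUERY = "select avg(timeSpentToCoverTheDistance) from org.inphina.cep.TrafficEvent.win:time(30 sec)";
protected final Log logger = LogFactory.getLog(getClass());

public static void main(String[] args) throws InterruptedException {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPStatement statement = epService.getEPAdministrator().createEPL(EVENT_QUERY);
    // Register the listener that is called whenever the average changes
    statement.addListener(new TrafficEventListener());
    // Fire the sample events; Client is the class name that appears in the log output
    new Client().generateTrafficEvents(epService);
}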
[/sourcecode]
This statement is a continuous query. All the event data is passed through this statement.
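The statement above computes a single average over every TrafficEvent. As a rough sketch of the filtering and aggregation that EPL offers, the two statements below restrict the window to one road and compute a separate average per road. The class and constant names are hypothetical, the statements have not been run against this post's project, and they simply reuse the win:time view syntax of the statement above.

```java
/** Illustrative EPL statements held as Java constants; the names are made up for this sketch. */
final class TrafficQueries {
    private TrafficQueries() {
    }

    // Filter: only events for US 101 enter the 30 second window
    static final String AVG_FOR_US_101 =
            "select avg(timeSpentToCoverTheDistance) "
            + "from org.inphina.cep.TrafficEvent(roadName = 'US 101').win:time(30 sec)";

    // Aggregation: one average per road over the same 30 second window
    static final String AVG_PER_ROAD =
            "select roadName, avg(timeSpentToCoverTheDistance) as avgTime "
            + "from org.inphina.cep.TrafficEvent.win:time(30 sec) "
            + "group by roadName";

    public static void main(String[] args) {
        System.out.println(AVG_FOR_US_101);
        System.out.println(AVG_PER_ROAD);
    }
}
```

Either constant could be passed to createEPL in place of EVENT_QUERY. With the second statement, the listener would have to read the roadName and avgTime properties instead of avg(timeSpentToCoverTheDistance).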
We have also added a listener which is invoked whenever there is a change in the resultset of the statement. The listener is either invoked when the resultset changes or when the data expires with the sliding window. The listener in our case looks at the average time taken to commute and lets the consumer know whether it is good to travel at this time or not.
[sourcecode language=”java”]
public class TrafficEventListener implements UpdateListener {
    private static final int TRAFFIC_5SECONDS = 5;
    private static final int TRAFFIC_3SECONDS = 3;
    private static final String AVG_TRAFFIC_TIME_IS = "Avg traffic time is = ";
    private static final String HEAVY_TRAFFIC_MESSAGE = "Peak Traffic, Rather stay where you are! ";
    private static final String MODERATE_TRAFFIC_MESSAGE = "Slow Traffic, but still tolerable! ";
    private static final String EASY_TRAFFIC_MESSAGE = "You would fly! ";
    protected final Log logger = LogFactory.getLog(getClass());

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        long timeStart = System.currentTimeMillis();
        logger.info("In the event handler " + timeStart / 1000);
        if (newEvents == null || newEvents.length == 0) {
            return;
        }
        EventBean event = newEvents[0];
        Double averageTime = (Double) event.get("avg(timeSpentToCoverTheDistance)");
        // The average may be null once every event has left the 30 second window
        if (averageTime == null) {
            return;
        }
        showTrafficStatus(averageTime);
        logger.info("avg=" + averageTime);
    }

    private void showTrafficStatus(Double averageTime) {
        if (averageTime < TRAFFIC_3SECONDS) {
            displayMessage(EASY_TRAFFIC_MESSAGE, averageTime);
        } else if (averageTime >= TRAFFIC_3SECONDS && averageTime <= TRAFFIC_5SECONDS) {
            displayMessage(MODERATE_TRAFFIC_MESSAGE, averageTime);
        } else {
            displayMessage(HEAVY_TRAFFIC_MESSAGE, averageTime);
        }
    }

    private void displayMessage(String message, Double averageTime) {
        logger.info(message + AVG_TRAFFIC_TIME_IS + averageTime);
    }
}
[/sourcecode]
All right, now we have the infrastructure ready. Let us fire events. The traffic events in our case are fired like this:
[sourcecode language=”java”]
private void generateTrafficEvents(EPServiceProvider epService) throws InterruptedException {
    long timeStart = System.currentTimeMillis();
    logger.info("Start " + timeStart / 1000);
    // Trailing comments give the elapsed time at which the preceding event was sent
    TrafficEvent event = new TrafficEvent("US 101", 1);
    epService.getEPRuntime().sendEvent(event);
    Thread.sleep(5000); // 0s
    TrafficEvent event1 = new TrafficEvent("US 101", 2);
    epService.getEPRuntime().sendEvent(event1);
    Thread.sleep(5000); // 5s
    TrafficEvent event2 = new TrafficEvent("US 101", 3);
    epService.getEPRuntime().sendEvent(event2);
    Thread.sleep(5000); // 10s
    TrafficEvent event3 = new TrafficEvent("US 101", 4);
    epService.getEPRuntime().sendEvent(event3);
    Thread.sleep(5000); // 15s
    TrafficEvent event4 = new TrafficEvent("US 101", 5);
    epService.getEPRuntime().sendEvent(event4);
    Thread.sleep(5000); // 20s
    TrafficEvent event5 = new TrafficEvent("US 101", 6);
    epService.getEPRuntime().sendEvent(event5);
    Thread.sleep(40000); // 25s; the 40s pause lets every event leave the 30s window
    TrafficEvent event6 = new TrafficEvent("US 101", 7);
    epService.getEPRuntime().sendEvent(event6);
    Thread.sleep(5000); // 65s
    TrafficEvent event7 = new TrafficEvent("US 101", 8);
    epService.getEPRuntime().sendEvent(event7);
    Thread.sleep(5000); // 70s
    TrafficEvent event8 = new TrafficEvent("US 101", 9);
    epService.getEPRuntime().sendEvent(event8); // sent at 75s
    logger.info("thats all");
}
[/sourcecode]
For the purpose of demonstration, the event listener logs a message every time it is invoked. We could customize it to show the message only when someone logs into the application and wants to know whether “now” is a good time to travel. In our scenario, this is what the logs look like for the events above.
[sourcecode language=”text”]
11:37:01,256 INFO Client:30 – Start 1271657221
11:37:01,265 INFO TrafficEventListener:20 – In the event handler 1271657221
11:37:01,265 INFO TrafficEventListener:40 – You would fly! Avg traffic time is = 1.0
11:37:01,266 INFO TrafficEventListener:25 – avg=1.0
11:37:06,267 INFO TrafficEventListener:20 – In the event handler 1271657226
11:37:06,267 INFO TrafficEventListener:40 – You would fly! Avg traffic time is = 1.5
11:37:06,268 INFO TrafficEventListener:25 – avg=1.5
11:37:11,270 INFO TrafficEventListener:20 – In the event handler 1271657231
11:37:11,271 INFO TrafficEventListener:40 – You would fly! Avg traffic time is = 2.0
11:37:11,271 INFO TrafficEventListener:25 – avg=2.0
11:37:16,272 INFO TrafficEventListener:20 – In the event handler 1271657236
11:37:16,273 INFO TrafficEventListener:40 – You would fly! Avg traffic time is = 2.5
11:37:16,274 INFO TrafficEventListener:25 – avg=2.5
11:37:21,275 INFO TrafficEventListener:20 – In the event handler 1271657241
11:37:21,276 INFO TrafficEventListener:40 – Slow Traffic, but still tolerable! Avg traffic time is = 3.0
11:37:21,276 INFO TrafficEventListener:25 – avg=3.0
11:37:26,277 INFO TrafficEventListener:20 – In the event handler 1271657246
11:37:26,278 INFO TrafficEventListener:40 – Slow Traffic, but still tolerable! Avg traffic time is = 3.5
11:37:26,279 INFO TrafficEventListener:25 – avg=3.5
11:37:31,226 INFO TrafficEventListener:20 – In the event handler 1271657251
11:37:31,227 INFO TrafficEventListener:40 – Slow Traffic, but still tolerable! Avg traffic time is = 4.0
11:37:31,235 INFO TrafficEventListener:25 – avg=4.0
11:37:36,224 INFO TrafficEventListener:20 – In the event handler 1271657256
11:37:36,225 INFO TrafficEventListener:40 – Slow Traffic, but still tolerable! Avg traffic time is = 4.5
11:37:36,225 INFO TrafficEventListener:25 – avg=4.5
11:37:41,224 INFO TrafficEventListener:20 – In the event handler 1271657261
11:37:41,225 INFO TrafficEventListener:40 – Slow Traffic, but still tolerable! Avg traffic time is = 5.0
11:37:41,225 INFO TrafficEventListener:25 – avg=5.0
11:37:46,224 INFO TrafficEventListener:20 – In the event handler 1271657266
11:37:46,225 INFO TrafficEventListener:40 – Peak Traffic, Rather stay where you are! Avg traffic time is = 5.5
11:37:46,225 INFO TrafficEventListener:25 – avg=5.5
11:37:51,224 INFO TrafficEventListener:20 – In the event handler 1271657271
11:37:51,225 INFO TrafficEventListener:40 – Peak Traffic, Rather stay where you are! Avg traffic time is = 6.0
11:37:51,225 INFO TrafficEventListener:25 – avg=6.0
11:38:06,281 INFO TrafficEventListener:20 – In the event handler 1271657286
11:38:06,281 INFO TrafficEventListener:40 – Peak Traffic, Rather stay where you are! Avg traffic time is = 7.0
11:38:06,282 INFO TrafficEventListener:25 – avg=7.0
11:38:11,283 INFO TrafficEventListener:20 – In the event handler 1271657291
11:38:11,284 INFO TrafficEventListener:40 – Peak Traffic, Rather stay where you are! Avg traffic time is = 7.5
11:38:11,285 INFO TrafficEventListener:25 – avg=7.5
11:38:16,286 INFO TrafficEventListener:20 – In the event handler 1271657296
11:38:16,287 INFO TrafficEventListener:40 – Peak Traffic, Rather stay where you are! Avg traffic time is = 8.0
11:38:16,288 INFO TrafficEventListener:25 – avg=8.0
11:38:16,289 INFO Client:67 – thats all
[/sourcecode]
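In the log above, the average keeps changing between 11:37:31 and 11:37:51 even though the last event before the pause was sent at 11:37:26. Those updates come from older readings leaving the 30 second window. The following plain-Java sketch (not Esper code; the class SlidingWindowAverage and its methods are made up for illustration) replays the first six readings with explicit timestamps to show the same effect. It assumes that a reading leaves the window once it is exactly 30 seconds old, which is what the log timestamps suggest.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalDouble;

/** Plain-Java illustration of a time window average; timestamps must not go backwards. */
class SlidingWindowAverage {
    /** One reading plus the time (in seconds) at which it arrived. */
    private static final class Reading {
        final long arrivedAt;
        final double value;

        Reading(long arrivedAt, double value) {
            this.arrivedAt = arrivedAt;
            this.value = value;
        }
    }

    private final long windowSeconds;
    private final Deque<Reading> window = new ArrayDeque<>();
    private double sum;

    SlidingWindowAverage(long windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    /** Adds a reading that arrived at the given time. */
    void add(long nowSeconds, double value) {
        evict(nowSeconds);
        window.addLast(new Reading(nowSeconds, value));
        sum += value;
    }

    /** Average of the readings still in the window, or empty if none remain. */
    OptionalDouble averageAt(long nowSeconds) {
        evict(nowSeconds);
        return window.isEmpty() ? OptionalDouble.empty() : OptionalDouble.of(sum / window.size());
    }

    /** Drops readings that are windowSeconds old or older. */
    private void evict(long nowSeconds) {
        while (!window.isEmpty() && nowSeconds - window.peekFirst().arrivedAt >= windowSeconds) {
            sum -= window.removeFirst().value;
        }
    }

    public static void main(String[] args) {
        SlidingWindowAverage average = new SlidingWindowAverage(30);
        // Readings 1..6 arrive every 5 seconds, at t = 0, 5, ..., 25
        for (int i = 0; i < 6; i++) {
            average.add(i * 5L, i + 1);
        }
        System.out.println(average.averageAt(25)); // all six readings are in the window
        System.out.println(average.averageAt(30)); // the reading from t = 0 has expired
        System.out.println(average.averageAt(55)); // every reading has expired
    }
}
```

Asking for the average at 25, 30 and 55 seconds gives 3.5, 4.0 and an empty result. The first two match the log entries at 11:37:26 and 11:37:31, and the empty result corresponds to the gap in the log before the next event arrives at 11:38:06.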
Again, the events could be triggered at irregular intervals from multiple places. Esper would make all the event data pass through the statement. This is in line with what we mentioned in the last post: CEP is a database inverted. Instead of running queries on data which is already in the database, data is passed through the statements as and when it arrives.
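To make the inversion concrete, here is a toy sketch in plain Java. It does not use Esper and does not reflect how Esper is implemented; the StandingQuery class is hypothetical. The query is registered once, nothing is stored, and every value pushed in is tested against it immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.DoublePredicate;

/** Toy standing query: registered once, then evaluated against each value as it is pushed in. */
class StandingQuery {
    private final DoublePredicate condition;
    private final List<Consumer<Double>> listeners = new ArrayList<>();

    StandingQuery(DoublePredicate condition) {
        this.condition = condition;
    }

    void addListener(Consumer<Double> listener) {
        listeners.add(listener);
    }

    /** Pushes one value through the query; a match is forwarded to the listeners right away. */
    void send(double travelTime) {
        if (condition.test(travelTime)) {
            for (Consumer<Double> listener : listeners) {
                listener.accept(travelTime);
            }
        }
    }

    public static void main(String[] args) {
        // The query exists before any data does
        StandingQuery slowTrips = new StandingQuery(time -> time > 5);
        slowTrips.addListener(time -> System.out.println("Slow trip reported: " + time));
        for (double time : new double[] {1, 6, 3, 9}) {
            slowTrips.send(time);
        }
    }
}
```

A database would do the opposite: store all four values first and run the comparison over them only when someone asks.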
Thus, a CEP framework like Esper can be easily plugged in and made to process large amounts of incoming events to give a real-time push for action. The benchmarks favor Esper as a performant CEP engine.
[sourcecode language=”text”]
Esper exceeds over 500 000 event/s on a dual CPU 2GHz Intel based hardware, with engine latency below 3 microseconds average (below 10us with more than 99% predictability) on a VWAP (volume weighted average) benchmark with 1000 statements registered in the system – this tops at 70 Mbit/s at 85% CPU usage. Esper also demonstrates linear scalability from 100 000 to 500 000 event/s on this hardware, with consistent results across different statements.
[/sourcecode]