In the last few posts, we have talked about Complex Event Processing (CEP), have gone through a simple case of CEP and have also looked at Grid Computing with GridGain. In this post, we are going to see how you can integrate both the paradigms together.

There are 2 ways to look the combination of CEP and Grid.

1) CEP does its job and passes over the events to be further processed on the grid.

2) CEP Engines are deployed on a Grid and then they work together to solve a complex business case.

Let us start with the former and then get to the more complex scenario in another post.

So in the first scenario, suppose that there is a CEP engine which has a statement registered with it, something like

public static void createLinkedInAggregatorStatement(EPAdministrator admin) {
 EPStatement statement = admin.createEPL("select * from hr)");
 statement.addListener(new LinkedInUpdateListener());

What we are conceptually trying to do here is the following scenario. As we know, there are a lot of updates happening on LinkedIn. Now, as a business case what we need to do is that after 24 hours, all users registered with LinkedIn, should get an aggregated email of all the updates which happen in their network. These include the change of status, uploaded a new photo etc etc. For the sake of simplicity, let us assume that all these are LinkedInUpdateEvent(s).

We register the above statement with the CEP Engine and ask it to batch the events for a period of 24 hours. Once the statement is registered with the Engine, it keeps looking for the events which match the criteria. The criteria that we define above is simple as it just keeps on batching all the events of the type LinkedInUpdateEvent.

So, the batching is easy. The complexity in processing is that out of these several thousand updates which happen in a period of 24 hours, the system should be able to find who are the interested users. And then for each interested user, there could be a number of updates which are applicable.

Let us take a small example. For simplicity, let us assume that there are 5 users in all. U1, U2, U3, U4 and U5

  • U1 has U2, U5 as a friends
  • U2 has U1, U3, U5
  • U3 has U2, U5
  • U4 has U5
  • and U5 has U1, U2, U3, U4

Now if U5 changes something then all U1, U2, U3 and U4 should get an update and if U1 changes something then U2 and U5 should be notified. Now, you can extrapolate this problem to thousands and millions of users for the social networking sites.

This is the area where a grid job could potentially help. The idea is as depicted in the diagram below.

Once we have the events from the engine passed on to the listener after a period of 24 hours, the listener has a method which is gridified.

public class CompanyNewsEventListener implements UpdateListener {


private Collection<UpdateNotification> generateAggregatedNewsInformationForAllUsers(EventBean[] events, MasterRepository masterRepository) {
 Collection<User> users = masterRepository.getAllUsers();
 Collection updateEvents = fetchAllLinkedInUpdateEvents(events);
 aggregateNewsNotificationsOnGrid(users, upadteEvents);

@Gridify(taskClass = AggregateUpdatesForUsersTask.class, timeout = 3000)
 private Collection aggregateUpdateNotificationsOnGrid(Collection users, Collection linkedInUpdateEvent) {



As you would recall from the earlier post, the AggregateUpdatesForUsersTask is as class where we write the map-reduce logic. The business logic still resides with the aggregateUpdateNotificationsOnGrid method, the AggregateUpdatesForUsersTask provides the infrastructure to Grid enable the processing.

The business logic is to identify the updates for each user by looking at a user watchList and all the events that might match the conditions is his watchlist.
Then all the updateNotifications are aggregated together for each user. When the job is grid enabled, the processing for each user happens on a different node.
Then in the reduce task, it gets a collection of LinkedInUpdateNotification(s) which it can either further process on doing a recursive map-reduce or simply pass them to an email client, which sends out the emails. The key here is that the processing for each individual user and the identification of events that are interesting to him happen on a separate node.

Let us look at the AggregateUpdatesForUsersTask.

public class AggregateUpdatesForUsersTask extends GridifyTaskSplitAdapter> {

	protected Collection split(int gridSize, GridifyArgument gridArguements) throws GridException {
		List jobs = new ArrayList(((Collection) gridArguements.getMethodParameters()[1]).size());
		processJobs(gridArguements, jobs);
		return jobs;

	private void processJobs(GridifyArgument gridifyArgument, List jobs) {
		Collection newsEvents = (Collection) gridifyArgument.getMethodParameters()[0];
		Collection<User> users = (Collection<User>) gridifyArgument.getMethodParameters()[1];
		for (User user : users) {
			LinkedInUpdateForUserJobAdaptor updateForUserJobAdaptor = new LinkedInUpdateForUserJobAdaptor();
			updateForUserJobAdaptor.addArgument((Serializable) newsEvents);

	public Collection reduce(List<GridJobResult> results) throws GridException {
		Collection newsNotificationsForAllUsers = new ArrayList();
		for (GridJobResult res : results) {
			UpdateNotification userAggregatedNewsInformation = (UpdateNotification) res.getData();
			if (userAggregatedNewsInformation.hasNews()) {
		return newsNotificationsForAllUsers;

Here, if you would notice, we create as many jobs for different nodes as the number of users in the system.

for (User user : users) {
			LinkedInUpdateForUserJobAdaptor updateForUserJobAdaptor = new LinkedInUpdateForUserJobAdaptor();

Now each job is of the type GridJobAdapter, which has a method called processJobs to do the actual processing. The reduce job gets a collection of UpdateNotification(s) which it can process further as needed.

Let us look what the JobAdapter looks like

public class LinkedInUpdateForUserJobAdaptor extends GridJobAdapter {

	private final Log logger = LogFactory.getLog(getClass());

	private static final int NUM_ARGUMENTS = 2;

	public Serializable execute() throws GridException {
		List arguments = getAllArguments();
		if (arguments != null && arguments.size() == NUM_ARGUMENTS) {
			User user = (User) arguments.get(0);
			Collection newsEvents = (Collection) arguments.get(1);
			return processJob(user, newsEvents);
		logger.warn("Expected " + NUM_ARGUMENTS + " arguments to be passed for job execution.");
		return null;

	private UpdateNotification processJob(User user, Collection newsEvents) {"Aggregating Alerts for user : " + user.getName());
		return new LinkedInUpdateEventListener().generateAggregatedNewsInformationForSingleUser(newsEvents, user);


In the situation above, the CEP Event listener acts as a handshake between the CEP Engine and GridComputing framework. Our listener has the following imports

import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFactory;
import org.gridgain.grid.gridify.Gridify;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

Thus, it is not difficult to integrate the output of a CEP Engine with the Grid framework and both of them can coexist with tremendous benefits. In another post we would try to address the complexity of “CEP Engines are deployed on a Grid and then they work together to solve a complex business case.” We would look at the problems with this scenario and possible solutions.

About the Author: Vikas Hazrati

Vikas is the Founding Partner @ Knoldus which is a group of software industry veterans who have joined hands to add value to the art of software development. Knoldus does niche Reactive and Big Data product development on Scala, Spark and Functional Java. Knoldus has a strong focus on software craftsmanship which ensures high-quality software development. It partners with the best in the industry like Lightbend (Scala Ecosystem), Databricks (Spark Ecosystem), Confluent (Kafka) and Datastax (Cassandra). To know more, send a mail to or visit

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s