Combining CEP with Grid Computing

In the last few posts, we have talked about Complex Event Processing (CEP), have gone through a simple case of CEP and have also looked at Grid Computing with GridGain. In this post, we are going to see how you can integrate both the paradigms together.

There are 2 ways to look the combination of CEP and Grid.

1) CEP does its job and passes over the events to be further processed on the grid.

2) CEP Engines are deployed on a Grid and then they work together to solve a complex business case.

Let us start with the former and then get to the more complex scenario in another post.

So in the first scenario, suppose that there is a CEP engine which has a statement registered with it, something like

[sourcecode language=”java”]

public static void createLinkedInAggregatorStatement(EPAdministrator admin) {
EPStatement statement = admin.createEPL("select * from com.inphina.cep.event.LinkedInUpdateEvent.win:time_batch(24 hr)");
statement.addListener(new LinkedInUpdateListener());
}

[/sourcecode]

What we are conceptually trying to do here is the following scenario. As we know, there are a lot of updates happening on LinkedIn. Now, as a business case what we need to do is that after 24 hours, all users registered with LinkedIn, should get an aggregated email of all the updates which happen in their network. These include the change of status, uploaded a new photo etc etc. For the sake of simplicity, let us assume that all these are LinkedInUpdateEvent(s).

We register the above statement with the CEP Engine and ask it to batch the events for a period of 24 hours. Once the statement is registered with the Engine, it keeps looking for the events which match the criteria. The criteria that we define above is simple as it just keeps on batching all the events of the type LinkedInUpdateEvent.

So, the batching is easy. The complexity in processing is that out of these several thousand updates which happen in a period of 24 hours, the system should be able to find who are the interested users. And then for each interested user, there could be a number of updates which are applicable.

Let us take a small example. For simplicity, let us assume that there are 5 users in all. U1, U2, U3, U4 and U5

  • U1 has U2, U5 as a friends
  • U2 has U1, U3, U5
  • U3 has U2, U5
  • U4 has U5
  • and U5 has U1, U2, U3, U4

Now if U5 changes something then all U1, U2, U3 and U4 should get an update and if U1 changes something then U2 and U5 should be notified. Now, you can extrapolate this problem to thousands and millions of users for the social networking sites.

This is the area where a grid job could potentially help. The idea is as depicted in the diagram below.

Once we have the events from the engine passed on to the listener after a period of 24 hours, the listener has a method which is gridified.

[sourcecode language=”java”]

public class CompanyNewsEventListener implements UpdateListener {

private Collection<UpdateNotification> generateAggregatedNewsInformationForAllUsers(EventBean[] events, MasterRepository masterRepository) {
Collection<User> users = masterRepository.getAllUsers();
Collection updateEvents = fetchAllLinkedInUpdateEvents(events);
aggregateNewsNotificationsOnGrid(users, upadteEvents);

}

@Gridify(taskClass = AggregateUpdatesForUsersTask.class, timeout = 3000)
private Collection aggregateUpdateNotificationsOnGrid(Collection users, Collection linkedInUpdateEvent) {

}

[/sourcecode]

As you would recall from the earlier post, the AggregateUpdatesForUsersTask is as class where we write the map-reduce logic. The business logic still resides with the aggregateUpdateNotificationsOnGrid method, the AggregateUpdatesForUsersTask provides the infrastructure to Grid enable the processing.

The business logic is to identify the updates for each user by looking at a user watchList and all the events that might match the conditions is his watchlist.
Then all the updateNotifications are aggregated together for each user. When the job is grid enabled, the processing for each user happens on a different node.
Then in the reduce task, it gets a collection of LinkedInUpdateNotification(s) which it can either further process on doing a recursive map-reduce or simply pass them to an email client, which sends out the emails. The key here is that the processing for each individual user and the identification of events that are interesting to him happen on a separate node.

Let us look at the AggregateUpdatesForUsersTask.

[sourcecode language=”java”]
public class AggregateUpdatesForUsersTask extends GridifyTaskSplitAdapter> {

@Override
protected Collection split(int gridSize, GridifyArgument gridArguements) throws GridException {
List jobs = new ArrayList(((Collection) gridArguements.getMethodParameters()[1]).size());
processJobs(gridArguements, jobs);
return jobs;
}

private void processJobs(GridifyArgument gridifyArgument, List jobs) {
Collection newsEvents = (Collection) gridifyArgument.getMethodParameters()[0];
Collection<User> users = (Collection<User>) gridifyArgument.getMethodParameters()[1];
for (User user : users) {
LinkedInUpdateForUserJobAdaptor updateForUserJobAdaptor = new LinkedInUpdateForUserJobAdaptor();
updateForUserJobAdaptor.addArgument(user);
updateForUserJobAdaptor.addArgument((Serializable) newsEvents);
jobs.add(updateForUserJobAdaptor);
}
}

@Override
public Collection reduce(List<GridJobResult> results) throws GridException {
Collection newsNotificationsForAllUsers = new ArrayList();
for (GridJobResult res : results) {
UpdateNotification userAggregatedNewsInformation = (UpdateNotification) res.getData();
if (userAggregatedNewsInformation.hasNews()) {
newsNotificationsForAllUsers.add(userAggregatedNewsInformation);
}
}
return newsNotificationsForAllUsers;
}

[/sourcecode]

Here, if you would notice, we create as many jobs for different nodes as the number of users in the system.

[sourcecode language=”java”]
for (User user : users) {
LinkedInUpdateForUserJobAdaptor updateForUserJobAdaptor = new LinkedInUpdateForUserJobAdaptor();
[/sourcecode]

Now each job is of the type GridJobAdapter, which has a method called processJobs to do the actual processing. The reduce job gets a collection of UpdateNotification(s) which it can process further as needed.

Let us look what the JobAdapter looks like

[sourcecode language=”java”]

public class LinkedInUpdateForUserJobAdaptor extends GridJobAdapter {

private final Log logger = LogFactory.getLog(getClass());

private static final int NUM_ARGUMENTS = 2;

@SuppressWarnings("unchecked")
@Override
public Serializable execute() throws GridException {
List arguments = getAllArguments();
if (arguments != null && arguments.size() == NUM_ARGUMENTS) {
User user = (User) arguments.get(0);
Collection newsEvents = (Collection) arguments.get(1);
return processJob(user, newsEvents);
}
logger.warn("Expected " + NUM_ARGUMENTS + " arguments to be passed for job execution.");
return null;
}

private UpdateNotification processJob(User user, Collection newsEvents) {
logger.info("Aggregating Alerts for user : " + user.getName());
return new LinkedInUpdateEventListener().generateAggregatedNewsInformationForSingleUser(newsEvents, user);
}

}

[/sourcecode]

In the situation above, the CEP Event listener acts as a handshake between the CEP Engine and GridComputing framework. Our listener has the following imports

[sourcecode language=”java”]
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFactory;
import org.gridgain.grid.gridify.Gridify;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
[/sourcecode]

Thus, it is not difficult to integrate the output of a CEP Engine with the Grid framework and both of them can coexist with tremendous benefits. In another post we would try to address the complexity of “CEP Engines are deployed on a Grid and then they work together to solve a complex business case.” We would look at the problems with this scenario and possible solutions.

Written by 

Vikas is the CEO and Co-Founder of Knoldus Inc. Knoldus does niche Reactive and Big Data product development on Scala, Spark, and Functional Java. Knoldus has a strong focus on software craftsmanship which ensures high-quality software development. It partners with the best in the industry like Lightbend (Scala Ecosystem), Databricks (Spark Ecosystem), Confluent (Kafka) and Datastax (Cassandra). Vikas has been working in the cutting edge tech industry for 20+ years. He was an ardent fan of Java with multiple high load enterprise systems to boast of till he met Scala. His current passions include utilizing the power of Scala, Akka and Play to make Reactive and Big Data systems for niche startups and enterprises who would like to change the way software is developed. To know more, send a mail to hello@knoldus.com or visit www.knoldus.com

Leave a Reply

%d bloggers like this: