Unit Testing Esper Statements

Table of contents
Reading Time: 2 minutes

Esper Statements would allow the flow of events to pass through them. Usually what would you like to unit test for a basic scenario?

  1. The valid events are being passed to the listener
  2. The invalid events are filtered out and are not passed to the listener
  3. The batching time, if defined for the statement, is being adhered to.

Let us quickly look at what a simple unit test would look like to cover all the above testing needs.

This is what our statement looks like

[sourcecode language=”java”]
public void createStatement(EPAdministrator admin) {
statement = admin.createEPL("insert into CompanyNewsOutputAlert(companyId, elementType, news) "
+ "select InputEvent.companyId as companyId, InputEvent.elementType as elementType, InputEvent.news as news "
+ "from CompanyNewsInputEvent.win:time_batch(5 sec) as InputEvent "
+ "where InputEvent.elementType=com.inphina.cep.event.NewsElementType.NEWS");
statement.addListener(companyNewsEventListener);
}
[/sourcecode]

The main things to notice in this statement are that we are interested in only CompanyNewsInputEvent and that the event stream of valid events should be batched for a period of 5 sec. Ok, let us see what the test class looks like

[sourcecode language=”java”]
public class CompanyNewsStatementTest {

private EPServiceProvider epService;
private ApplicationContext ctx;
private CompanyNewsStatement stmt;
private FakeCompanyNewsEventListener listener;

@Before
public void setUp() {

// Get Beans from Spring container
ctx = new ClassPathXmlApplicationContext("testApplicationContext.xml");
startEsperEngine();
stmt = (CompanyNewsStatement) ctx.getBean("companyNewsStatement");
listener = (FakeCompanyNewsEventListener) stmt.getCompanyNewsEventListener();

stmt.createStatement(epService.getEPAdministrator());

// Use external clocking for the test
epService.getEPRuntime().sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
}

private void startEsperEngine() {
Configuration configuration = new Configuration();
configuration.addEventType("CompanyNewsInputEvent", CompanyNewsInputEvent.class.getName());
configuration.addEventType("CompanyScoreInputEvent", CompanyScoreInputEvent.class.getName());
epService = EPServiceProviderManager.getProvider("CompanyNewsStatementTest", configuration);

epService.initialize();
}
[/sourcecode]

As you would notice that in this test, we are injecting a fake listener. Since we are unit testing, we would be fine if we can test the statement. We do not want to test the listener logic. We use Spring to inject the FakeCompanyNewsEventListener into the Statement class. Our fake listener has utility methods to let us know if it is invoked and if it is then how many events does it have.

[sourcecode language=”java”]
public class FakeCompanyNewsEventListener implements UpdateListener {

private boolean isInvoked;
private EventBean[] newData = {};

@Override
public void update(EventBean[] newData, EventBean[] oldData) {
this.newData = newData;

isInvoked = true;
}

public boolean isInvoked() {
return isInvoked;
}

public int getSizeOfEvents() {
return newData.length;
}

}
[/sourcecode]

So we are all set, now let us look at the test

[sourcecode language=”java”]
@Test
public void flowStreamOfEvents() throws InterruptedException {
sendEvent(new CurrentTimeEvent(1000));

sendEvent(new CompanyNewsInputEvent(100, NewsElementType.NEWS, "bla"));
sendEvent(new CompanyNewsInputEvent(101, NewsElementType.NEWS, "bla"));
sendEvent(new CompanyNewsInputEvent(102, NewsElementType.NEWS, "bla"));
sendEvent(new CompanyNewsInputEvent(103, NewsElementType.NEWS, "bla"));
sendEvent(new CompanyNewsInputEvent(104, NewsElementType.NEWS, "bla"));
sendEvent(new CompanySMSInputEvent(104, SMSType.SHORT, "bla"));

sendEvent(new CurrentTimeEvent(5000));
Assert.assertFalse(listener.isInvoked());
Assert.assertEquals(0, listener.getSizeOfEvents());

sendEvent(new CurrentTimeEvent(6000));

Assert.assertTrue(listener.isInvoked());
Assert.assertEquals(5, listener.getSizeOfEvents());
}

private void sendEvent(Object event) {
epService.getEPRuntime().sendEvent(event);
}
[/sourcecode]

Here, amongst the news events, we are also sending the SMS event. Obviously our statement should not send that event to the listener and hence we have asserted for 5.
Also, if you have noticed the setup, you would notice another interesting thing. We are telling the Esper engine to use and external clock for clocking.

[sourcecode language=”java”]
// Use external clocking for the test
epService.getEPRuntime().sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
[/sourcecode]

Using this external clock ensures that we can tell the Esper engine how much time has elapsed. So instead of waiting for 5 Secs to let the batching happen, we can tell the Esper engine that the time now is 6s, using a statement like this

sendEvent(new CurrentTimeEvent(6000));

Imagine what would do without this if your batch period was 24 hours. Of course you could also inject the batch period from your spring context as a property. If that is not the case then this is pretty handy.

Written by 

Vikas is the CEO and Co-Founder of Knoldus Inc. Knoldus does niche Reactive and Big Data product development on Scala, Spark, and Functional Java. Knoldus has a strong focus on software craftsmanship which ensures high-quality software development. It partners with the best in the industry like Lightbend (Scala Ecosystem), Databricks (Spark Ecosystem), Confluent (Kafka) and Datastax (Cassandra). Vikas has been working in the cutting edge tech industry for 20+ years. He was an ardent fan of Java with multiple high load enterprise systems to boast of till he met Scala. His current passions include utilizing the power of Scala, Akka and Play to make Reactive and Big Data systems for niche startups and enterprises who would like to change the way software is developed. To know more, send a mail to hello@knoldus.com or visit www.knoldus.com

5 thoughts on “Unit Testing Esper Statements3 min read

  1. Hi
    What if one wants to test ESPER by changing at runtime not only the EPL/EQL statements, but also the Listener’s update without using Reflection? Is Esper prepared to be used in such way?
    Best regards
    Orlando

    1. Hi Orlando, not sure if I understood your question correctly but the listener would only listen once the statements pass on the data to the listener. So if you are controlling the statements passing on data by batching, then you are also controlling the update of the listener. Also, if you feel that the listener should process that data in a batch or after some time then that would be a unit test for the listener logic right? and this could easily be done with jUnit outside of Esper because we are testing the logic of the listener.

      Could you elaborate more on your question if I did not understand that correctly.

  2. Hi Vikas
    My question was on a different scope, but I think I already know the answer. You have made some quite useful remarks though, thanks a lot for the help and also for the quite interesting article.
    Best regards
    Orlando.

Comments are closed.

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading