Event Bus in vert.x: How it works

Reading Time: 4 minutes

An event bus is a light-weight distributed messaging system. It allows different parts of your application, or different applications and services to communicate with each in a loosely coupled way.

A Vert.x application is made up of one or more verticles, and each verticle forms a unit for processing asynchronous events. When we need to work with multiple verticles in our applications then we also need communication between these verticles. Verticles communicate with each other by sending messages on the event bus.

Example

For instance, we have a verticle for dealing with HTTP requests, and a verticle for managing access to the database. The event bus allows the HTTP verticle to send a request to the database verticle that performs a SQL query, and responds back to the HTTP verticle.

The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.

Vert.x is Polygot:

Since Vert.x is polyglot, the event bus allows verticles written in different languages to communicate with each other without requiring any complex language interoperability layer, whether for communications inside the same JVM process or across the network.

Messages are sent and retrieved from destinations. A destination is simply a free-form string, such as incoming.message.event or incoming-message-event. Messages have a body and optional headers for storing metadata.

For example, we have created two verticles. One is the sender and the other is the receiver. 

The below code for SenderVerticle.

  @Override
  public void start() throws Exception {
    // Create a Router
    Router router = Router.router(vertx);
//    Mount the handler for incoming request
    router.get("/send/:message").handler(this::sendMessage);

    // ******************** Creating Server *********************
    HttpServer server = vertx.createHttpServer();
    // Handle every request using the router
    server.requestHandler(router)
    // start listening on port 8282
      .listen(8282).onSuccess(msg -> {
        System.out.println("*************** Server started on "
           + server.actualPort() + " *************");
      });
  }

  private void sendMessage(RoutingContext context) {
//    generating random id for message
    String uuid = UUID.randomUUID().toString();
//    create event bus object
    final EventBus eventBus = vertx.eventBus();
    final String message = context.request().getParam("message");
//    creating json object for message
    JsonObject entries = new JsonObject();
    entries.put("id", uuid);
    entries.put("message", message);
    entries.put("time", System.currentTimeMillis());
    eventBus.request("incoming.message.event", entries, reply -> {
      if (reply.succeeded()) {
        context.json(reply.result().body());
      } else {
        System.out.println("No reply");
      }
    });
  }

where:

  • “incoming.message.event” is the destination address that consumer use.
  • this:onMessage defines the function that will call on receiving a message.

Now, lets see the ReceiverVerticle code. It will consume the message and reply to the sender for confirmation of message. Here we are sending the received message in reply, that means the sender will receive the json body of the message with reference to the confirmation.

public class ReceiverVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    vertx.eventBus().consumer("incoming.message.event",this::onMessage);
  }
//  onMessage() will be called when a message is received.
  private <T> void onMessage(Message<T> tMessage) {
      JsonObject message = (JsonObject) tMessage.body();
      System.out.println("Message Received " + message);
      tMessage.reply(message);
  }
}

Output

When you start the application it will print the running port of the server to the console.

When you access the URL http://localhost:8282/send/:message, here message is the parameter that you are sending to verticle and getting the message body in reply. for example if we make a request on http://localhost:8282/send/Welcome to knoldus family. then the response will be like below pic:

Console output:

Note: You can get the access of this code from this repository.

The event bus supports the following communication patterns:

  • point-to-point messaging
  • request-response messaging
  • publish / subscribe for broadcasting messages.

Point-to-point messaging

Producers sent messages to destinations. Destination names are free-form strings, but the convention in the Vert.x community is to use separating dots.

Request-response messaging

Request-reply messaging communication pattern is a variation on point-to-point messaging. When a message will send in point-to-point messaging, it is possible to register a reply handler. When you do, the event bus generates a temporary destination name dedicated solely to communications between the request message producer that is expecting a reply, and the consumer that will eventually receive and process the message.

For example, an HTTP API verticle can send a request to a data store verticle to fetch some data, and the data store verticle eventually returns a reply message.

Publish / subscribe messages

In publish/subscribe communications, there is even more decoupling between producers and consumers. If a message sent to a destination, all subscribers receive it. for example –

Messages M1M2, and M3 are each sent by a different producer, and all subscribers receive the messages, unlike in the case of point-to-point messaging. It is not possible to specify reply handlers for publish/subscribe communications on the event bus.

If you need message consumers to get back to the entity that sent the event, go for request-reply.

knoldus

Written by 

Aasif Ali is a Software Consultant at Knoldus Inc. He has done Post Graduation from Quantum University Roorkee. He has the knowledge of various programming languages. He is passionate about Java development and curious to learn Java Technologies. He is always impatient and enthusiastic to learn new things. He is a quick learner, problem solver and always enjoy to help others. His hobbies are watching Sci-fi movies , Playing badminton and listening to songs.