One-way & two-way streaming in a Lagom application


Now a days streaming word is a buzz word and you should have heard many types of streaming till now i.e. kafka streaming, spark streaming etc etc. But in this blog we will see a new type of streaming i.e Lagom-streaming.

Lagom-streaming internally uses Akka streams, with the help of which we will see one way & two way streaming. But before going forward, it would be good we will get the difference between one way & two way streaming, so then lets get the difference first and then will move further.

One-way streaming: In this type of streaming, request will be normal but the response will be streamed.

Two-way streaming: In this type of streaming both request & response will be streamed.

Now as we have got the difference, I will not waste your time in theory part. Lets move ahead to see the implementation part in Lagom. We will see both type of streaming together so that we can compare those very easily and can understand the difference quite properly.

API implementation:

One-way streaming:

    ServiceCall<ProductRequest, Source<ProductResponse, ?>> oneWayStreaming();

Two-way streaming:

    ServiceCall<Source<ProductRequest, ?>, Source<ProductResponse, ?>> twoWayStreaming();

Full code for API :

public interface StreamingService extends Service {

    ServiceCall<ProductRequest, Source<ProductResponse, ?>> oneWayStreaming();

    ServiceCall<Source<ProductRequest, ?>, Source<ProductResponse, ?>> twoWayStreaming();

    @Override
    default Descriptor descriptor() {
        return named("streaming").withCalls(
                Service.pathCall("/api/streaming/oneWay", this::oneWayStreaming),
                Service.pathCall("/api/streaming/twoWay", this::twoWayStreaming)
        ).withAutoAcl(true);
    }
}


Service Implementation:
One-way streaming:

 @Override
    public ServiceCall<ProductRequest, Source<ProductResponse, ?>> oneWayStreaming() {
        return (request ->
                CompletableFuture.completedFuture(Source.from(request.getProducts())
                        .mapAsync(1, product -> prepareProductResponse(product, request.getCategory()))));
    }

Two-way streaming:

 @Override
    public ServiceCall<Source<ProductRequest, ?>, Source<ProductResponse, ?>> twoWayStreaming() {
        return (request ->
                CompletableFuture.completedFuture(request.mapAsync(1,
                        productRequest -> prepareProductResponse(productRequest.getProducts().get(0),
                                productRequest.getCategory()))));
    }

Full code:

public class StreamingServiceImpl implements StreamingService {

    @Override
    public ServiceCall<ProductRequest, Source<ProductResponse, ?>> oneWayStreaming() {
        return (request ->
                CompletableFuture.completedFuture(Source.from(request.getProducts())
                        .mapAsync(1, product -> prepareProductResponse(product, request.getCategory()))));
    }

    @Override
    public ServiceCall<Source<ProductRequest, ?>, Source<ProductResponse, ?>> twoWayStreaming() {
        return (request ->
                CompletableFuture.completedFuture(request.mapAsync(1,
                        productRequest -> prepareProductResponse(productRequest.getProducts().get(0),
                                productRequest.getCategory()))));
    }


    private CompletionStage<ProductResponse> prepareProductResponse(String product, String category) {
        return CompletableFuture.completedFuture(new ProductResponse() {{
            setCategory(category);
            setProduct(product);
        }});
    }
}

That’s it. We have implemented the streaming in a Lagom application.

But now the challenge is how to test this. We can not test this through postman or normal rest client, we need websocket client or application to test this. So either create n websocket client by yorself or use the online websocket client to test the streaming application.

I hope it will be worthy for you and you enjoyed the reading. You can get the full code here.


knoldus-advt-sticker


Advertisements

About Rishi Khandelwal

Lead Consultant having more than 6 years industry experience. He has working experience in various technologies such as Scala, Java8, Play, Akka, Lagom, Spark, Hive, Kafka, Cassandra, Akka-http, Akka-Streams, ElasticSearch, Backbone.js, html5, javascript, Less, Amazon EC2, WebRTC, SBT
This entry was posted in Akka, Best Practices, big data, Functional Programming, github, Java, knoldus, Messages, Reactive, Scala, Streaming, Web Services. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s