With the introduction of Java 9, Java community has started to show its support directly towards Reactive streams, which was earlier used by leveraging third-party libraries. Please visit my earlier blog on Reactive streams to understand the basic ideologies which are working behind it like the push-pull model or backpressure. As a part of this blog, we will explore how we can leverage the same in our code while working with Java 9.
Java 9 includes basic interfaces for each of the fundamental Reactive Stream concepts in the Flow Concurrency library. This allows all Java applications to depend on this one library for Reactive Stream interfaces, rather than deciding on a specific implementation.
Java 9 Flow API:
It consists of interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items that are consumed by one or more Subscribers, each managed by a Subscription.
These interfaces correspond to the reactive-streams specification. Communication relies on a simple form of flow control (method Flow.Subscription.request(long)) that can be used to avoid resource management problems that may otherwise occur in “push” based systems.
Modifier and Type | Class | Description |
---|---|---|
static interface |
Flow.Processor<T,R> |
A component that acts as both a Subscriber and Publisher.
|
static interface |
Flow.Publisher<T> |
A producer of items (and related control messages) received by Subscribers.
|
static interface |
Flow.Subscriber<T> |
A receiver of messages.
|
static interface |
Flow.Subscription |
Message control linking a
Flow.Publisher and Flow.Subscriber . |
To understand the entire cycle of publishing-subscribing, let’s create a Student class that we will use to create the stream message to be sent from publisher to subscriber.
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
@Builder
@Getter
@AllArgsConstructor
@ToString
public class Student {
int id;
String name;
}
We also have a utility class to create a list of students for our example.
import java.util.ArrayList;
import java.util.List;
public class StudentHelper {
public static List<Student> getStudents() {
Student student1 = Student.builder().id(1).name("Jaya").build();
Student student2 = Student.builder().id(2).name("Rahul").build();
Student student3 = Student.builder().id(3).name("Megha").build();
Student student4 = Student.builder().id(4).name("Tapas").build();
Student student5 = Student.builder().id(5).name("Raghav").build();
List<Student> studentList = new ArrayList<>();
studentList.add(student1);
studentList.add(student2);
studentList.add(student3);
studentList.add(student4);
studentList.add(student5);
return studentList;
}
}
Subscriber:
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class CustomStudentSubscriber implements Subscriber<Student> {
private Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe for CustomStudentSubscriber called");
this.subscription = subscription;
this.subscription.request(1); //requesting data from publisher
System.out.println("onSubscribe for CustomStudentSubscriber requested 1 student");
}
@Override
public void onNext(Student student) {
System.out.println("Processing Student " + student);
counter++;
this.subscription.request(1);
}
@Override
public void onError(Throwable e) {
System.out.println("Some error happened");
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All Processing Done");
}
public int getCounter() {
return counter;
}
}
- We are using the subscription variable to keep references so that requests can be made in the onNext method.
- The counter variable is being used to keep count of the number of items processed. Its value is increased in onNext method. This will be used in our main method to wait for execution to finish before ending the main thread.
- The subscription request is invoked in onSubscribe method to start the processing. After processing the item, it is called again in the onNext method to process the next item
- onError and onComplete should be used to perform corrective measures when an error occurs or cleanup of resources when processing completes successfully.
Test Program for Reactive Stream :
Let’s look at the test program for our reactive stream implementation. We will use SubmissionPublisher as Publisher here.
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
public class StudentSubscriptionTestApp {
public static void main(String[] args) throws InterruptedException {
// Create Publisher
SubmissionPublisher<Student> publisher = new SubmissionPublisher<>();
// Register Subscriber
CustomStudentSubscriber subs = new CustomStudentSubscriber();
publisher.subscribe(subs);
List<Student> students = StudentHelper.getStudents();
// Publish items
System.out.println("Publishing Items to Subscriber");
students.forEach(publisher::submit);
// logic to wait till processing of all messages are over
while (students.size() != subs.getCounter()) {
Thread.sleep(10);
}
// close the Publisher
publisher.close();
System.out.println("Exiting the app");
}
}
The most important piece of the above code is the subscribe and submit methods invocation of the publisher. We should always close the publisher to avoid any memory leaks.
Output:
Publishing Items to Subscriber
onSubscribe for CustomStudentSubscriber called
onSubscribe for CustomStudentSubscriber requested 1 student
Processing Student Student(id=1, name=Jaya)
Processing Student Student(id=2, name=Rahul)
Processing Student Student(id=3, name=Megha)
Processing Student Student(id=4, name=Tapas)
Processing Student Student(id=5, name=Raghav)
Exiting the app
All Processing Done
Message Transformation:
We use a Processor to transform the message between a publisher and subscriber. Let’s say we have another subscriber who is expecting a different type of message to process. Let’s say this new message type is EngineeringStudent.
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class EngineeringStudent extends Student {
private int eid;
public EngineeringStudent(int id, int eid, String name) {
super(id, name);
this.eid = eid;
}
@Override
public String toString() {
return "[id=" + super.getId() + ",name=" + super.getName() + ",eid=" + eid + "]";
}
}
We have a new subscriber to consume EngineeringStudent stream data.
import java.util.concurrent.Flow;
public class CustomEngineeringStudentSubscriber implements Flow.Subscriber<EngineeringStudent> {
private Flow.Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed for Engineering Student");
this.subscription = subscription;
this.subscription.request(1); //requesting data from publisher
System.out.println("onSubscribe requested 1 item for Engineering Student");
}
@Override
public void onNext(EngineeringStudent engineeringStudent) {
System.out.println("Processing Engineering Student " + engineeringStudent);
counter++;
this.subscription.request(1);
}
@Override
public void onError(Throwable e) {
System.out.println("Some error happened in CustomEngineeringStudentSubscriber");
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All Processing Done for CustomEngineeringStudentSubscriber");
}
public int getCounter() {
return counter;
}
}
Processor:
The important part is the implementation of the Processor interface. Since we want to utilize the SubmissionPublisher, we would extend it and use it wherever applicable.
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class CustomProcessor extends SubmissionPublisher<EngineeringStudent> implements Flow.Processor<Student, EngineeringStudent> {
private Flow.Subscription subscription;
private Function<Student, EngineeringStudent> function;
public CustomProcessor(Function<Student, EngineeringStudent> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Student std) {
submit((EngineeringStudent) function.apply(std));
subscription.request(1);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
- Function will be used to convert Student object to EngineeringStudent object.
- We will convert the incoming Student message to the EngineeringStudent message in onNext method and then use SubmissionPublisher submit method to send it to the subscriber.
- Since Processor works as both subscriber and publisher, we can create a chain of processors between end publishers and subscribers.
MessageTransformation Test:
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
public class ProcessorTestApp {
public static void main(String[] args) throws InterruptedException {
// Create End Publisher
SubmissionPublisher<Student> publisher = new SubmissionPublisher<>();
// Create Processor
CustomProcessor transformProcessor = new CustomProcessor(student ->
new EngineeringStudent(student.getId(), student.getId() + 100, student.getName()));
//Create End Subscriber
CustomEngineeringStudentSubscriber subs = new CustomEngineeringStudentSubscriber();
//Create chain of publisher, processor and subscriber
publisher.subscribe(transformProcessor); // publisher to processor
transformProcessor.subscribe(subs); // processor to subscriber
List<Student> emps = StudentHelper.getStudents();
// Publish items
System.out.println("Publishing Items to Subscriber");
emps.forEach(publisher::submit);
// Logic to wait for messages processing to finish
while (emps.size() != subs.getCounter()) {
Thread.sleep(10);
}
// Closing publishers
publisher.close();
transformProcessor.close();
System.out.println("Exiting the app");
}
}
Output:
Publishing Items to Subscriber
Subscribed for Engineering Student
onSubscribe requested 1 item for Engineering Student
Processing Engineering Student [id=1,name=Jaya,eid=101]
Processing Engineering Student [id=2,name=Rahul,eid=102]
Processing Engineering Student [id=3,name=Megha,eid=103]
Processing Engineering Student [id=4,name=Tapas,eid=104]
Processing Engineering Student [id=5,name=Raghav,eid=105]
Exiting the app
All Processing Done for CustomEngineeringStudentSubscriber
Done
Cancel Subscription:
We can use Subscription cancel method to stop receiving the message in subscriber. Note that if we cancel the subscription, then subscriber will not receive onComplete or onError signal.
Here is a sample code where subscriber is consuming only 2 messages and then canceling the subscription.
@Override public void onNext(Student student) { System.out.println("Processing Student " + student); counter++; if(counter==2) { this.subscription.cancel(); return; } this.subscription.request(1); }
Conclusion:
We learned how to create a processing Flow consisting of a Publisher and a Subscriber and a Processor. Java 9 Flow API is a good move towards reactive programming and to create an asynchronous non-blocking application. However, creating a true reactive application is possible only when all the systems API support it.
The above examples can be found at this Github repo.
I hope, you have liked my blog. If you have any doubts or suggestions to make please drop a comment. Thanks!
References:
Java 9 Flow official doc