Executors in Completable Future

Reading Time: 2 minutes

What is executor service?

Executor service is a framework which helps us to run the task asynchronously with their own thread pool. Future uses executor service to execute their task separately.

We can define the executor with the different thread pools. Some of the common thread pools are as follows.

  • New Cached Thread Pool
  • New Work Stealing
  • New Fixed Thread Pool

New Cached Thread Pool: Creates a new thread if no thread a available else will use the existing thread.

New Work Stealing Pool: It will maintain the specified number of threads. It will create the and shutdown threads to maintain parallelism.

New Fixed Thread Pool: It will use the fixed number of threads.

Which Thread Pool to pick?

Which thread to pick completely depends upon your use case. CachedThreadPool and WorkStealingPool will both create new threads if required. These can be used when you need faster and non-blocking calls. However, we should be careful while using these thread pools. If these threads are not killed then it will lead to an increase of idle threads.


In FixedThreadPool if you have given pool size less than your actual number of tasks that can be processed in parallel. In that case your task will wait in queue for thread to become available. This will lead to low performance of the program. Ideal Pool size should equal to the core size if you have CPU intensive work. In case we have more IO intensive operation then we can have more threads.

How can we use the executor service ?

Below is an example of a helper class that we have created to show the use case of the Executor service. It is a class which has write and read methods for files.

//Helper Class for read and write operations for a file.
class FileStore {
private File storeFolder;
// Locks is used so that only one thread can update the file at a
// time.
private ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Construct an FileStore Store
* @param storeFolder This folder will be used to store the object data.
*/
FileBasedObjectStore(Path storeFolder) {
this.storeFolder = storeFolder.toFile();
}
/**
* Reads the file from the file store.
*
* @param id – the ID of the object
* @return the object if it exists or empty if it does not.
*/
@Override
public Optional<Object> read(UUID id) {
lock.readLock().lock();
File file = new File(storeFolder, id.toString());
Optional<Object> result;
try (
FileInputStream fileIn = new FileInputStream(file);
ObjectInputStream in = new ObjectInputStream(fileIn)
) {
result = Optional.ofNullable(in.readObject());
} catch ( IOException | ClassNotFoundException ex) {
result = Optional.empty();
} finally {
lock.readLock().unlock();
}
return result;
}
/**
* Writes an object to a file in the file store.
*
* @param id – the id of the object being written.
* @param obj – the object to be written.
* @return true if the write was successful, false if it failed.
*/
@Override
public boolean write(UUID id, Object content) {
// lock before we can write i file
lock.writeLock().lock();
File file = new File(storeFolder, id.toString());
boolean result;
try (
FileOutputStream fileOut = new FileOutputStream(file);
ObjectOutputStream out = new ObjectOutputStream(fileOut)
) {
out.writeObject(content);
result = true;
} catch (IOException ex) {
result = false;
} finally {
lock.writeLock().unlock();
}
return result;
}
}
view raw executors.java hosted with ❤ by GitHub

In the below example we will be using fixedThreadPool with size 2.

// Example of the executor service
class UserRepository implements Closeable {
private FileStore fileStore;
private ConcurrentHashMap<UUID, User> cache = new ConcurrentHashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
private ExecutorService executors = Executors.newFixedThreadPool(2);
UserRepository(FileStore fnileStore) {
this.fileStore = fileStore;
}
@Override
public CompletableFuture<Void> saveUser(User user) {
return CompletableFuture.runAsync(() > {
lock.writeLock().lock();
fileStore.write(user.getId(), user);
cache.put(user.getId(), user);
lock.writeLock().unlock();
}, executors);
}
@Override
public void close() {
executors.isShutdown()
});
view raw example.java hosted with ❤ by GitHub

The above code block shows how we can use the Executor in CompletableFuture. We create the Executor Service object at line 7 with thread pool as fixed thread pool with 2 as value. As a next step in line 20, we just simply provide it in the runAsync() method as a parameter of CompletableFuture class.

You can see that the class UserRepository implements the closeable interface. We implement this interface so we can shutdown the executor service as it doesn’t stop implicitly and as a result JVM will keep on running.

The shutdown() method doesn’t cause immediate destruction of the ExecutorService but it will not accepting any new tasks and shut down after all running threads finish their current work.

That’s all for the small introduction for the process if you have any more queries or want to know more about it you can add the comment. I am happy to answer them. 🙂

References


Knoldus-blog-footer-image

Written by 

Priyanka Thakur is a Software Consultant having 6 months of experience currently working at Knoldus Software LLP. She has done MCA at BVICAM, GGSIPU. She has a graduation degree in BCA from JIMS, GGSIPU. She is familiar with programming language' s such as C, Java, and Scala. She also has interest in Angular and Docker and currently working on Logam. She is dedicated, focused and hardworking person. Her interests are in to face new programming challenges because she believes these challenges produce opportunity. She is keen to learn new technologies. In her leisure time, she prefers reading about mythology, watching movies.