What is an executor service?
ExecutorService is a Java framework (in java.util.concurrent) that lets us run tasks asynchronously on its own thread pool. A Future represents the result of a task that the executor service runs on a separate thread.
We can create an executor backed by different kinds of thread pools. Some of the common thread pools are as follows.
- New Cached Thread Pool
- New Work Stealing Pool
- New Fixed Thread Pool
New Cached Thread Pool: Creates a new thread when no idle thread is available; otherwise it reuses an existing thread.
New Work Stealing Pool: Maintains enough threads to support the requested level of parallelism (by default, the number of available processors). It creates and shuts down threads as needed to maintain that parallelism.
New Fixed Thread Pool: Uses a fixed number of threads. Tasks submitted while all threads are busy wait in a queue.
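As a minimal sketch of the three pools described above, the snippet below creates each one through the JDK's Executors factory methods and runs the same small batch of tasks on it. The class name PoolTypesDemo and the helper runOn are made up for illustration; only the Executors and ExecutorService calls come from the JDK.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
class PoolTypesDemo {

    // Runs three small tasks on the given pool and returns the sum of their results.
    static int runOn(ExecutorService pool) {
        List<Callable<Integer>> tasks = List.of(() -> 1, () -> 2, () -> 3);
        try {
            int sum = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                sum += result.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown(); // release the pool's threads in every case
        }
    }

    public static void main(String[] args) {
        System.out.println(runOn(Executors.newCachedThreadPool()));  // prints 6
        System.out.println(runOn(Executors.newWorkStealingPool()));  // prints 6
        System.out.println(runOn(Executors.newFixedThreadPool(2)));  // prints 6
    }
}
```

The calling code is identical for all three pools. Only the factory method changes, which is what makes it easy to swap the pool type later.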
Which Thread Pool to pick?
Which thread pool to pick depends entirely on your use case. CachedThreadPool and WorkStealingPool both create new threads when required, so they suit workloads that need fast, non-blocking task submission. However, we should be careful with them. A cached pool has no upper bound on its number of threads, so a burst of tasks can create many threads, and these stay around as idle threads until they time out (after 60 seconds) or the pool is shut down.
In a FixedThreadPool, if the pool size is smaller than the number of tasks that could be processed in parallel, the extra tasks wait in the queue until a thread becomes available. This lowers the performance of the program. For CPU-intensive work, the ideal pool size is roughly the number of CPU cores. For IO-intensive work, where threads spend much of their time waiting, we can have more threads than cores.
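The sizing rule above can be sketched as follows. This is only an illustration: the class PoolSizing and the threadsPerCore multiplier are assumptions made for this example, and the right multiplier for IO-heavy work has to be found by measuring your own workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of the original example.
class PoolSizing {

    // CPU-intensive work: about one thread per available core.
    static int cpuBoundPoolSize() {
        return Runtime.getRuntime().availableProcessors();
    }

    // IO-intensive work: more threads than cores, because threads mostly wait.
    // threadsPerCore is an assumed tuning knob, not a JDK setting.
    static int ioBoundPoolSize(int threadsPerCore) {
        return cpuBoundPoolSize() * threadsPerCore;
    }

    public static void main(String[] args) {
        ExecutorService cpuPool = Executors.newFixedThreadPool(cpuBoundPoolSize());
        ExecutorService ioPool = Executors.newFixedThreadPool(ioBoundPoolSize(4));
        System.out.println("CPU pool size: " + cpuBoundPoolSize());
        System.out.println("IO pool size: " + ioBoundPoolSize(4));
        cpuPool.shutdown();
        ioPool.shutdown();
    }
}
```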
How can we use the executor service?
Below is a helper class that we have created to demonstrate the executor service. It has read and write methods for files.
// Helper class for read and write operations on files.
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class FileStore {
    private final File storeFolder;
    // The lock ensures that only one thread can update a file at a
    // time, while several threads may still read concurrently.
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Constructs a FileStore.
     * @param storeFolder This folder will be used to store the object data.
     */
    FileStore(Path storeFolder) {
        this.storeFolder = storeFolder.toFile();
    }

    /**
     * Reads an object from the file store.
     *
     * @param id the ID of the object
     * @return the object if it exists or empty if it does not.
     */
    public Optional<Object> read(UUID id) {
        lock.readLock().lock();
        File file = new File(storeFolder, id.toString());
        Optional<Object> result;
        try (
            FileInputStream fileIn = new FileInputStream(file);
            ObjectInputStream in = new ObjectInputStream(fileIn)
        ) {
            result = Optional.ofNullable(in.readObject());
        } catch (IOException | ClassNotFoundException ex) {
            // A missing or unreadable file is reported as "not found".
            result = Optional.empty();
        } finally {
            lock.readLock().unlock();
        }
        return result;
    }

    /**
     * Writes an object to a file in the file store.
     *
     * @param id the ID of the object being written.
     * @param content the object to be written; it must be Serializable.
     * @return true if the write was successful, false if it failed.
     */
    public boolean write(UUID id, Object content) {
        // Take the write lock before we write the file.
        lock.writeLock().lock();
        File file = new File(storeFolder, id.toString());
        boolean result;
        try (
            FileOutputStream fileOut = new FileOutputStream(file);
            ObjectOutputStream out = new ObjectOutputStream(fileOut)
        ) {
            out.writeObject(content);
            result = true;
        } catch (IOException ex) {
            result = false;
        } finally {
            lock.writeLock().unlock();
        }
        return result;
    }
}
In the example below we use a fixed thread pool of size 2.
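// Example of the executor service
import java.io.Closeable;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// User is assumed to be a Serializable class with a UUID getId() method.
class UserRepository implements Closeable {
    private final FileStore fileStore;
    private final ConcurrentHashMap<UUID, User> cache = new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Fixed thread pool with 2 threads; every save task runs on it.
    private final ExecutorService executors = Executors.newFixedThreadPool(2);

    UserRepository(FileStore fileStore) {
        this.fileStore = fileStore;
    }

    public CompletableFuture<Void> saveUser(User user) {
        // The second argument makes the task run on our own pool.
        return CompletableFuture.runAsync(() -> {
            lock.writeLock().lock();
            try {
                fileStore.write(user.getId(), user);
                cache.put(user.getId(), user);
            } finally {
                // Release the lock even if the write throws.
                lock.writeLock().unlock();
            }
        }, executors);
    }

    @Override
    public void close() {
        // Stop accepting new tasks; already submitted tasks still finish.
        executors.shutdown();
    }
}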
The code block above shows how we can use an executor with CompletableFuture. We create the ExecutorService in the executors field as a fixed thread pool with 2 threads. As a next step, in saveUser(), we simply pass it as the second argument to the runAsync() method of the CompletableFuture class.
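To see the effect of passing an executor without needing the FileStore, here is a small self-contained sketch. The class RunAsyncDemo and the method workerThreadName are made up for illustration. It uses supplyAsync(), the value-returning sibling of runAsync(), so that the task can report which thread ran it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original example.
class RunAsyncDemo {

    // Runs a task on a dedicated pool and returns the name of the thread that ran it.
    static String workerThreadName() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            // join() waits for the result without checked exceptions.
            return future.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // With the default thread factory the name looks like "pool-1-thread-1".
        System.out.println(workerThreadName());
    }
}
```

Without the second argument, the task would normally run on the shared ForkJoinPool.commonPool() instead of our own pool.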
You can see that the class UserRepository implements the Closeable interface. We implement this interface so that we can shut down the executor service. It does not stop implicitly, and until it is shut down its threads keep the JVM running.
The shutdown() method does not destroy the ExecutorService immediately. It stops accepting new tasks and shuts down once all previously submitted tasks have finished.
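A common way to wait for that orderly shutdown is to follow shutdown() with awaitTermination(). The sketch below assumes a 5 second timeout and uses made-up names (GracefulShutdown, shutdownAndWait, runAndShutdown); the ExecutorService calls themselves are standard JDK methods.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
class GracefulShutdown {

    // Returns true if the pool terminated within the timeout.
    static boolean shutdownAndWait(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // timed out: interrupt whatever is still running
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Submits five tasks, shuts the pool down, and reports how many tasks completed.
    static int runAndShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> completed.incrementAndGet());
        }
        shutdownAndWait(pool, 5);
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown()); // prints 5
    }
}
```

All five tasks complete even though shutdown() is called right after they are submitted, because tasks that were already queued are still executed.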
That's all for this short introduction. If you have any questions or want to know more, please leave a comment. I am happy to answer them. 🙂
