Unblocking the java.util.concurrent’s blocking queues in Java
The classes in java.util.concurrent package offer a bunch of nice and easy to use mechanisms to develop asynchronous application modules. One of these mechanism is BlockingQueue and its implementations. Unfortunately, one of the mechanisms Sun has forgotten to add to its API is the ability to gracefully shutdown the blocking queue. Lets consider the typical example of producer/consumer classes (Java source, Java source with line numbers).
In this example we would like to be able to gracefully stop the producer and consumer when needed, for example when the application is exiting. This is a typical producer/consumer model. Look carefully at the source code and at the exit conditions for these threads. What is wrong there?
The problem with this code is that the consumer can exit only when it has control.
45 while(isAlive) { 46 try { 47 QueueItem item = queue.take(); 48 item.doWork(); 49 } catch (InterruptedException e) { ...
And it does not get control until it gets a new item from the blocking queue. In case if the queue is empty and the producer exits, the consumer will never get any new items and, thus, will never get control in order to analyze the exit condition and exit.
You can potentially interrupt the consumer thread. However, it may be dangerous - what if it is processing a work item at the time you call Thread.interrupt() method? If it is working with an interruptible I/O channel it will be closed, i.e. you will interrupt the processing of a transaction. Sometimes it is acceptable (if it is a real transaction that can be aborted and retried later), but in our case the goal was to provide a graceful exit, i.e. complete the current task without interruption and then exit. Potentially we could synchronize the interruption operation so it would not send interrupt() call when the consumer is busy, but there is an easier and less synchronous solution.
So, we do not have any problems if the consumer is processing something. Once processing is done it will exit if the condition is set. Thus, we need to find a way how to "wake up" the consumer if it is doing nothing. Obviously, the only way is to send something to the empty queue. We can create a special transaction called "poison". Upon receiving this transaction the consumer will exit immediately.
20 producer.stop(); 21 consumer.stop(); 22 queue.add(new PoisonQueueItem());
46 while(isAlive) { 47 try { 48 QueueItem item = queue.take(); 49 if (item instanceof PoisonQueueItem) { 50 System.out.println("Arghhhh..."); 51 break; 52 } 53 item.doWork();
The solution is shown in the second example (Java source, Java source with line numbers).
You do not have to create a subclass for this special item type, it really depends on the architecture of your application. If you have only one queue item type you can just add a boolean field "isPoison" that would be false by default for the real work items.
You do not have to provide special priority for the poison transaction. Remember, the goal is to have A transaction in the queue, so if you have a real one that precedes the poison one - that is OK, the consumer will be unblocked anyway.
blog comments powered by Disqus