A BlockingQueue collapses the classic producer-consumer problem into a handful of lines. Producers put, consumers take, and the queue handles all the synchronization. Shutdown is the tricky part — usually solved with a "poison pill" sentinel or interruption.
Minimal pattern
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Pipeline {
    private static final Task POISON = new Task(-1, null); // sentinel, compared by identity
    private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(256);

    public void run() throws InterruptedException {
        ExecutorService consumers = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            consumers.submit(this::consume);
        }
        // single producer in the main thread; put blocks while the queue is full
        for (int i = 0; i < 10_000; i++) {
            queue.put(new Task(i, "payload-" + i));
        }
        // signal shutdown: one pill per consumer
        for (int i = 0; i < 4; i++) queue.put(POISON);
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);
    }

    private void consume() {
        try {
            while (true) {
                Task t = queue.take(); // blocks while the queue is empty
                if (t == POISON) return; // shutdown
                process(t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag
        }
    }

    private void process(Task t) {
        // do the work
        System.out.println(Thread.currentThread().getName() + " -> " + t);
    }

    record Task(int id, String payload) {}
}
Why each piece matters
- Bounded queue. Backpressure: if consumers are slow, put blocks the producer.
- put/take. No spin loops, no manual wait/notify. Interrupt-safe.
- Poison pill. One sentinel per consumer guarantees each one wakes up and exits cleanly. Order is preserved (FIFO) so it follows all real work.
- InterruptedException restored. Lets shutdownNow() and outer cancellation work as intended.
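To make the backpressure point concrete, here is a minimal sketch. The class and method names (BackpressureDemo, acceptedOffers, timedOfferOnFullQueue) are made up for illustration. It uses the non-blocking and timed forms of offer so the effect of a full queue is observable; put would simply block at the same point.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows how a full bounded queue pushes back on a producer.
class BackpressureDemo {

    // Returns how many of `attempts` non-blocking offers a queue of the given capacity accepts.
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) accepted++; // offer returns false instead of blocking when full
        }
        return accepted;
    }

    // Timed offer on a full queue: waits up to the timeout, then gives up and returns false.
    static boolean timedOfferOnFullQueue() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(0); // the queue is now full
        try {
            return queue.offer(1, 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptedOffers(2, 5));    // 2
        System.out.println(timedOfferOnFullQueue()); // false
    }
}
```

With nobody consuming, only as many items as the capacity are accepted. A producer that uses put is paused at that point rather than told "no".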
Alternative shutdown: interrupt
private void consume() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Task t = queue.poll(1, TimeUnit.SECONDS);
            if (t == null) continue;
            process(t);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}
Driver:
consumers.shutdownNow(); // interrupts workers
consumers.awaitTermination(10, TimeUnit.SECONDS);
Trade-off: interrupt is "drop pending work"; poison pill is "drain then exit". Pick by use case.
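The "drop pending work" side of that trade-off can be seen in a small, self-contained sketch. Everything here is illustrative: InterruptShutdownDemo is a made-up name, and a latch that is never released stands in for long-running, interruptible work.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: interrupt-based shutdown leaves unprocessed items in the queue.
class InterruptShutdownDemo {

    // Queues `total` items (total >= 1), lets one consumer start on the first,
    // then interrupts it. Returns the number of items still queued afterwards.
    static int itemsLeftAfterShutdownNow(int total) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(total);
        for (int i = 0; i < total; i++) queue.add(i);

        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch neverReleased = new CountDownLatch(1); // simulated long work

        ExecutorService consumer = Executors.newSingleThreadExecutor();
        consumer.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Integer t = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (t == null) continue;
                    started.countDown();
                    neverReleased.await(); // "processing": blocks until interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        try {
            started.await();        // the consumer has taken exactly one item
            consumer.shutdownNow(); // interrupts the worker mid-task
            if (!consumer.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(itemsLeftAfterShutdownNow(100)); // 99
    }
}
```

The consumer exits promptly, but everything it had not yet taken is still in the queue. With poison pills, those items would have been processed before the pill was reached.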
Multiple producers
Works identically: each producer calls queue.put(...). The trick is knowing when to send the pills, because they must be enqueued only after every producer has finished. A Phaser or an AtomicInteger counter can coordinate the producers; the last producer to finish enqueues the pills.
AtomicInteger remainingProducers = new AtomicInteger(producerCount);
// each producer, after its last task:
if (remainingProducers.decrementAndGet() == 0) {
    for (int i = 0; i < consumerCount; i++) queue.put(POISON);
}
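Put together, the counter approach might look like the sketch below. The class name MultiProducerDemo, the Task record with a single field, the queue capacity of 64, and the processed counter are assumptions made for this example only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: several producers, several consumers, pills sent by the last producer.
class MultiProducerDemo {
    record Task(int id) {}
    private static final Task POISON = new Task(-1); // sentinel, compared by identity

    // Runs the pipeline to completion and returns how many real tasks were processed.
    static int run(int producerCount, int consumerCount, int tasksPerProducer) {
        BlockingQueue<Task> queue = new ArrayBlockingQueue<>(64);
        AtomicInteger remainingProducers = new AtomicInteger(producerCount);
        AtomicInteger processed = new AtomicInteger();
        // One thread per role so producers and consumers all run concurrently.
        ExecutorService pool = Executors.newFixedThreadPool(producerCount + consumerCount);

        for (int c = 0; c < consumerCount; c++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Task t = queue.take();
                        if (t == POISON) return; // one pill per consumer
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int p = 0; p < producerCount; p++) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < tasksPerProducer; i++) queue.put(new Task(i));
                    // Only the producer that finishes last sends the pills,
                    // so they are queued behind every real task.
                    if (remainingProducers.decrementAndGet() == 0) {
                        for (int i = 0; i < consumerCount; i++) queue.put(POISON);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pipeline did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4, 1_000)); // 3000
    }
}
```

Because each producer decrements the counter only after its own last put, the producer that reaches zero knows all real work is already in the queue.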
What NOT to do
// BAD: spin loop
while (true) {
    Task t = queue.poll();
    if (t != null) process(t);
    // burns CPU when queue is empty
}

// BAD: manual wait/notify
synchronized (queue) {
    while (queue.isEmpty()) queue.wait(); // reinventing BlockingQueue
    process(queue.poll());
}

// BAD: swallow InterruptedException
try { queue.put(t); } catch (InterruptedException ignored) {}
// shutdown signals lost; service won't terminate
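For contrast with the last anti-pattern, here is one way to handle the interrupt without losing it. This is a sketch, not the only valid approach: the helper name putOrRestore and the class InterruptAwarePut are hypothetical, and rethrowing InterruptedException is equally reasonable when the caller's signature allows it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper: enqueue without swallowing the interrupt signal.
class InterruptAwarePut {

    // Returns true if the item was enqueued, false if the thread was interrupted.
    // On interruption the flag is set again so callers further up can still see it.
    static <T> boolean putOrRestore(BlockingQueue<T> queue, T item) {
        try {
            queue.put(item);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the signal visible
            return false;
        }
    }

    // Small self-check: an interrupted put fails, and the flag is still set afterwards.
    static boolean flagSurvivesInterruptedPut() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread.currentThread().interrupt();               // simulate a pending shutdown signal
        boolean enqueued = putOrRestore(queue, "task");   // put throws because the flag is set
        boolean flagStillSet = Thread.interrupted();      // reads and clears the flag
        return !enqueued && flagStillSet;
    }

    public static void main(String[] args) {
        System.out.println(flagSurvivesInterruptedPut()); // true
    }
}
```

A worker loop that calls putOrRestore can check the return value, or isInterrupted(), and stop cleanly instead of carrying on as if nothing happened.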
Stream-style alternative
If you don't need fine-grained shutdown semantics, just use an executor:
ExecutorService pool = new ThreadPoolExecutor(
    4, 4, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(256),               // bounded work queue
    new ThreadPoolExecutor.CallerRunsPolicy()    // when full, the submitter runs the task
);
for (int i = 0; i < 10_000; i++) {
    int id = i; // effectively final copy for the lambda
    pool.submit(() -> process(new Task(id, "payload-" + id)));
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
Same underlying mechanism: ThreadPoolExecutor's worker threads pull tasks from the work queue in a loop (a blocking take(), or a timed poll() when idle threads are allowed to time out), much like the manual consumer above.
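In the executor version, backpressure comes from pairing the bounded work queue with CallerRunsPolicy: once the workers are busy and the queue is full, the submitting thread runs the task itself, which slows submission down. The sketch below shrinks the pool to one thread and one queue slot so the effect is easy to observe; CallerRunsDemo and its helper are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: a saturated pool hands the overflow task back to the submitter.
class CallerRunsDemo {

    // Returns true if the task submitted to a saturated pool ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> awaitQuietly(release));             // occupies the only worker
            pool.execute(() -> { });                               // fills the only queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread())); // rejected, so it runs here
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown(); // let the worker finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsOnCaller()); // true
    }
}
```

One consequence worth keeping in mind: while the caller is running a task it is not submitting new ones, which is the intended throttling, but it also means work can execute on a thread outside the pool.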