Implement a producer-consumer using BlockingQueue. — Cracked Java

A BlockingQueue collapses the classic producer-consumer problem into a handful of lines. Producers put, consumers take, and the queue handles all the synchronization. Shutdown is the tricky part — usually solved with a "poison pill" sentinel or interruption.

Minimal pattern

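import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Pipeline {

    private static final int CONSUMER_COUNT = 4;
    private static final Task POISON = new Task(-1, null);  // sentinel, compared by identity
    private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(256);

    public void run() throws InterruptedException {
        ExecutorService consumers = Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            consumers.submit(this::consume);
        }

        // single producer in the main thread
        for (int i = 0; i < 10_000; i++) {
            queue.put(new Task(i, "payload-" + i));   // blocks while the queue is full
        }

        // signal shutdown: one pill per consumer
        for (int i = 0; i < CONSUMER_COUNT; i++) queue.put(POISON);

        consumers.shutdown();                          // no new tasks; running consumers finish
        consumers.awaitTermination(1, TimeUnit.MINUTES);
    }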

    private void consume() {
        try {
            while (true) {
                Task t = queue.take();
                if (t == POISON) return;          // shutdown
                process(t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the flag
        }
    }

    private void process(Task t) {
        // do the work
        System.out.println(Thread.currentThread().getName() + " -> " + t);
    }

    record Task(int id, String payload) {}
}

Why each piece matters

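  • Bounded queue. Backpressure: once the queue is full, put blocks the producer until a consumer frees a slot.
  • put / take. Both block without spinning, so there are no spin loops and no manual wait/notify. Both throw InterruptedException if the thread is interrupted while waiting, so a blocked thread can always be cancelled.
  • Poison pill. One sentinel per consumer guarantees each one wakes up and exits cleanly. The queue is FIFO and the single producer enqueues the pills last, so every pill follows all real work.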
  • InterruptedException restored. Lets shutdownNow() and outer cancellation work as intended.
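
The backpressure point can be seen in isolation with a small sketch. The class name BackpressureDemo and the helper extraOfferAccepted are hypothetical names chosen for illustration; the sketch uses a timed offer in place of put so that it returns instead of blocking forever when nothing is consuming.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the Pipeline above.
class BackpressureDemo {

    // Fills a queue of the given capacity, then attempts one more insert
    // with a short timeout. Returns true if that extra insert was accepted.
    static boolean extraOfferAccepted(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put(i);   // does not block: there is still room
            }
            // put() would block here indefinitely because nothing is consuming;
            // the timed offer gives up after 100 ms and reports failure instead.
            return queue.offer(-1, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the flag
            throw new IllegalStateException("interrupted while filling the queue", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("extra offer accepted: " + extraOfferAccepted(2));
    }
}
```

A producer that must never stall can use the timed offer the same way and decide for itself what to do with a rejected item (retry, drop, or log).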

Alternative shutdown: interrupt

private void consume() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Task t = queue.poll(1, TimeUnit.SECONDS);
            if (t == null) continue;
            process(t);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}

Driver:

consumers.shutdownNow();   // interrupts workers
consumers.awaitTermination(10, TimeUnit.SECONDS);

Trade-off: interrupt is "drop pending work"; poison pill is "drain then exit". Pick by use case.
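
To make the "drop pending work" side of the trade-off concrete, here is a minimal sketch under stated assumptions: one consumer, a preloaded queue, and a long Thread.sleep standing in for slow processing. The names InterruptDropsWork and leftoverAfterInterrupt are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating interrupt-based shutdown.
class InterruptDropsWork {

    // Preloads `items` tasks, lets a single consumer start on the first one,
    // then interrupts it. Returns how many tasks were left in the queue.
    static int leftoverAfterInterrupt(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(items);
        for (int i = 0; i < items; i++) queue.add(i);

        CountDownLatch started = new CountDownLatch(1);
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        consumer.execute(() -> {
            try {
                while (true) {
                    queue.take();
                    started.countDown();     // signal: first task is in progress
                    Thread.sleep(60_000);    // simulated slow work, interrupted below
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // restore the flag and exit
            }
        });

        try {
            started.await();
            consumer.shutdownNow();          // interrupts the worker mid-task
            consumer.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while shutting down", e);
        }
        return queue.size();                 // everything except the first task
    }

    public static void main(String[] args) {
        System.out.println("tasks left unprocessed: " + leftoverAfterInterrupt(10));
    }
}
```

With the poison-pill variant the same queue would be drained before the consumer exits, which is the whole difference between the two strategies.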

Multiple producers

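Works identically: each producer calls queue.put(...). The trick is knowing when to send the pills. They must go in only after every producer has finished, otherwise a pill can overtake real work and a consumer exits early. A Phaser or an AtomicInteger counter coordinates the producers; the last producer to finish enqueues the pills.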

AtomicInteger remainingProducers = new AtomicInteger(producerCount);
// each producer, after its last task:
if (remainingProducers.decrementAndGet() == 0) {
    for (int i = 0; i < consumerCount; i++) queue.put(POISON);
}
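
The fragment above can be expanded into a complete run. This is a sketch, not the only way to wire it: the class name MultiProducerDemo, the run method, the simplified one-field Task record, and the queue capacity of 64 are all assumptions made for illustration. The pool is sized to hold every producer and consumer at once so that none of them waits for a thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; Task is a simplified stand-in for the record above.
class MultiProducerDemo {

    record Task(int id) {}

    private static final Task POISON = new Task(-1);   // sentinel, compared by identity

    // Runs the given number of producers and consumers and returns the
    // number of real (non-pill) tasks that were processed.
    static int run(int producerCount, int consumerCount, int tasksPerProducer) {
        BlockingQueue<Task> queue = new ArrayBlockingQueue<>(64);
        AtomicInteger remainingProducers = new AtomicInteger(producerCount);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(producerCount + consumerCount);

        for (int c = 0; c < consumerCount; c++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Task t = queue.take();
                        if (t == POISON) return;       // shutdown
                        processed.incrementAndGet();   // stand-in for real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int p = 0; p < producerCount; p++) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < tasksPerProducer; i++) queue.put(new Task(i));
                    // Only the last producer to finish sends the pills, so
                    // every pill is enqueued after all real work.
                    if (remainingProducers.decrementAndGet() == 0) {
                        for (int i = 0; i < consumerCount; i++) queue.put(POISON);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("pipeline did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run(3, 2, 1_000));
    }
}
```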

What NOT to do

// BAD: spin loop
while (true) {
    Task t = queue.poll();
    if (t != null) process(t);
    // burns CPU when queue is empty
}

// BAD: manual wait/notify
synchronized (queue) {
    while (queue.isEmpty()) queue.wait();    // reinventing BlockingQueue
    process(queue.poll());
}

// BAD: swallow InterruptedException
try { queue.put(t); } catch (InterruptedException ignored) {}
// shutdown signals lost — service won't terminate
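
The last anti-pattern is easy to verify. The sketch below interrupts the current thread before calling put and then checks whether the flag is still visible afterwards. InterruptFlagDemo and flagSurvives are hypothetical names, and the sketch relies on OpenJDK's ArrayBlockingQueue behavior, where put checks the interrupt flag on entry and clears it when it throws.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class showing why the interrupt flag must be restored.
class InterruptFlagDemo {

    // Interrupts the current thread, calls put(), and reports whether the
    // interrupt flag is still set afterwards.
    static boolean flagSurvives(boolean restore) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        Thread.currentThread().interrupt();       // simulate a shutdown signal
        try {
            // Assumption: OpenJDK's ArrayBlockingQueue.put() sees the pending
            // interrupt on entry, throws, and clears the flag.
            queue.put(1);
        } catch (InterruptedException e) {
            if (restore) {
                Thread.currentThread().interrupt();   // put the flag back
            }
            // with restore == false the signal is silently swallowed
        }
        // Thread.interrupted() reads AND clears the flag, which leaves the
        // calling thread clean for whatever runs next.
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        System.out.println("swallowed: flag still set? " + flagSurvives(false));
        System.out.println("restored:  flag still set? " + flagSurvives(true));
    }
}
```

When the exception is swallowed, any outer loop that checks isInterrupted() never sees the shutdown request, which is why the worker keeps running.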

Executor-based alternative

If you don't need fine-grained shutdown semantics, just use an executor:

ExecutorService pool = new ThreadPoolExecutor(
    4, 4, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(256),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 10_000; i++) {
    int id = i;
    pool.submit(() -> process(new Task(id, "payload-" + id)));
}

pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);

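The consuming side is the same mechanism: ThreadPoolExecutor's worker threads pull tasks from the work queue with take() (or a timed poll() when idle threads are allowed to time out), much like the manual consumer above. The submitting side differs. execute() adds tasks with a non-blocking offer(), so a full queue does not block the producer the way put does. Instead the rejection handler runs, and CallerRunsPolicy makes the submitting thread execute the task itself, which throttles it in a similar way.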
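
How CallerRunsPolicy pushes work back onto the submitter can be shown with a deliberately tiny pool. This is a sketch under assumptions: the pool is shrunk to one thread and a one-slot queue so that saturation is deterministic, and CallerRunsDemo and threadThatRanOverflowTask are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class illustrating CallerRunsPolicy as backpressure.
class CallerRunsDemo {

    // Saturates a 1-thread pool with a 1-slot queue, then submits one more
    // task and returns the name of the thread that ended up running it.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(() -> {                 // occupies the only worker thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> {});              // fills the single queue slot
        // Queue full and no spare threads: the task is rejected, and
        // CallerRunsPolicy runs it synchronously on the submitting thread.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();                 // let the blocked worker finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```

While the submitting thread is busy running the overflow task it cannot submit anything else, which is what slows the producer down.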
