Poison Pill

Intent

Poison Pill is a known, predefined data item that lets a producer trigger a graceful shutdown of a separate, possibly distributed, consumer process.

Explanation

Real world example

Consider a message queue with one producer and one consumer. The producer keeps pushing new messages onto the queue and the consumer keeps reading them. When it’s time to shut down gracefully, the producer sends the poison pill message.

In plain words

Poison Pill is a known message structure that ends the message exchange.
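
The idea can be sketched in a few lines before looking at the full example. The snippet below is only an illustration, assuming plain String messages and a hypothetical class name PoisonPillSketch; the sentinel is a dedicated String instance compared by identity, so a regular message with the same text is not mistaken for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillSketch {

  // Hypothetical sentinel: a distinct String instance, matched by identity (==).
  static final String POISON_PILL = new String("POISON_PILL");

  // Sends the given bodies through a queue and returns what the consumer processed.
  static List<String> run(String... bodies) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
    List<String> processed = new ArrayList<>();

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          String msg = queue.take();
          if (msg == POISON_PILL) {
            break; // the pill ends the exchange
          }
          processed.add(msg);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    try {
      for (String body : bodies) {
        queue.put(body);
      }
      queue.put(POISON_PILL); // sent last, so every earlier message is handled first
      consumer.join();        // after join, the consumer's writes are visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while producing", e);
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(run("hand shake", "bye!"));
  }
}
```

Because the queue is first-in, first-out, the consumer sees the pill only after every message that was enqueued before it.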

Programmatic Example

Let’s define the message structure first. There is a Message interface and a SimpleMessage implementation.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public interface Message {

  ...

  enum Headers {
    DATE, SENDER
  }

  void addHeader(Headers header, String value);

  String getHeader(Headers header);

  Map<Headers, String> getHeaders();

  void setBody(String body);

  String getBody();
}

public class SimpleMessage implements Message {

  private final Map<Headers, String> headers = new HashMap<>();
  private String body;

  @Override
  public void addHeader(Headers header, String value) {
    headers.put(header, value);
  }

  @Override
  public String getHeader(Headers header) {
    return headers.get(header);
  }

  // Callers get a read-only view; headers change only through addHeader.
  @Override
  public Map<Headers, String> getHeaders() {
    return Collections.unmodifiableMap(headers);
  }

  @Override
  public void setBody(String body) {
    this.body = body;
  }

  @Override
  public String getBody() {
    return body;
  }
}
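
The Producer and Consumer later in this example refer to Message.POISON_PILL, whose definition is not shown in the listing above. The sketch below shows one possible shape for such a sentinel and is not necessarily how the original code defines it. It assumes a single shared instance whose methods all refuse to work, so that treating the pill as a regular message fails fast. The names SentinelSketch, Envelope and isPoison are hypothetical, and Envelope is a reduced stand-in for Message.

```java
import java.util.Map;

public class SentinelSketch {

  // Hypothetical, reduced stand-in for the Message interface.
  interface Envelope {
    Map<String, String> getHeaders();

    void setBody(String body);

    String getBody();

    // Assumed design: one shared instance that carries no data at all.
    Envelope POISON_PILL = new Envelope() {
      @Override
      public Map<String, String> getHeaders() {
        throw poison();
      }

      @Override
      public void setBody(String body) {
        throw poison();
      }

      @Override
      public String getBody() {
        throw poison();
      }

      private RuntimeException poison() {
        return new UnsupportedOperationException("Poison pill carries no data");
      }
    };
  }

  // Identity comparison is enough because only one pill instance exists.
  static boolean isPoison(Envelope msg) {
    return msg == Envelope.POISON_PILL;
  }

  public static void main(String[] args) {
    System.out.println(isPoison(Envelope.POISON_PILL));
  }
}
```

Since the anonymous class does not override equals, a check written as POISON_PILL.equals(msg) behaves the same as the identity comparison used here.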

Messages are passed through message queues. Here we define the types related to the message queue: MqPublishPoint, MqSubscribePoint and MessageQueue. SimpleMessageQueue implements all of these interfaces.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public interface MqPublishPoint {

  void put(Message msg) throws InterruptedException;
}

public interface MqSubscribePoint {

  Message take() throws InterruptedException;
}

public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {
}

public class SimpleMessageQueue implements MessageQueue {

  private final BlockingQueue<Message> queue;

  public SimpleMessageQueue(int bound) {
    queue = new ArrayBlockingQueue<>(bound);
  }

  // Blocks while the queue is full.
  @Override
  public void put(Message msg) throws InterruptedException {
    queue.put(msg);
  }

  // Blocks while the queue is empty.
  @Override
  public Message take() throws InterruptedException {
    return queue.take();
  }
}

Next we need a message Producer and a Consumer. Internally they use the message queue from above. Note that when the Producer stops, it sends the poison pill to inform the Consumer that messaging has finished.

public class Producer {

  ...

  public void send(String body) {
    if (isStopped) {
      throw new IllegalStateException(String.format(
          "Producer %s was stopped and failed to deliver requested message [%s].", name, body));
    }
    var msg = new SimpleMessage();
    msg.addHeader(Headers.DATE, new Date().toString());
    msg.addHeader(Headers.SENDER, name);
    msg.setBody(body);

    try {
      queue.put(msg);
    } catch (InterruptedException e) {
      // restore the interrupt flag and allow the thread to exit
      LOGGER.error("Exception caught.", e);
      Thread.currentThread().interrupt();
    }
  }

  public void stop() {
    isStopped = true;
    try {
      // the pill is the last message this producer sends
      queue.put(Message.POISON_PILL);
    } catch (InterruptedException e) {
      // restore the interrupt flag and allow the thread to exit
      LOGGER.error("Exception caught.", e);
      Thread.currentThread().interrupt();
    }
  }
}

public class Consumer {

  ...

  public void consume() {
    while (true) {
      try {
        var msg = queue.take();
        if (Message.POISON_PILL.equals(msg)) {
          LOGGER.info("Consumer {} receive request to terminate.", name);
          break;
        }
        var sender = msg.getHeader(Headers.SENDER);
        var body = msg.getBody();
        LOGGER.info("Message [{}] from [{}] received by [{}]", body, sender, name);
      } catch (InterruptedException e) {
        // restore the interrupt flag and allow the thread to exit
        LOGGER.error("Exception caught.", e);
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

Finally, we are ready to present the whole example in action.

var queue = new SimpleMessageQueue(10000);

final var producer = new Producer("PRODUCER_1", queue);
final var consumer = new Consumer("CONSUMER_1", queue);

new Thread(consumer::consume).start();

new Thread(() -> {
  producer.send("hand shake");
  producer.send("some very important information");
  producer.send("bye!");
  producer.stop();
}).start();

Program output:

Message [hand shake] from [PRODUCER_1] received by [CONSUMER_1]
Message [some very important information] from [PRODUCER_1] received by [CONSUMER_1]
Message [bye!] from [PRODUCER_1] received by [CONSUMER_1]
Consumer CONSUMER_1 receive request to terminate.

Applicability

Use the Poison Pill idiom when:

  • One thread or process needs to signal another to terminate.
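
The example above has exactly one consumer. When several consumer threads read from the same queue, a common variation (not part of the example above) is to enqueue one pill per consumer, because each consumer removes the pill it reads. The sketch below illustrates this under that assumption; the class MultiConsumerSketch and its run method are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiConsumerSketch {

  // Hypothetical sentinel, matched by identity.
  static final Object POISON_PILL = new Object();

  // Starts the given number of consumers, feeds them items, and returns
  // how many items were processed in total before all consumers stopped.
  static int run(int consumers, int items) {
    BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    AtomicInteger processed = new AtomicInteger();

    Thread[] workers = new Thread[consumers];
    for (int i = 0; i < consumers; i++) {
      workers[i] = new Thread(() -> {
        try {
          // Each worker stops as soon as it takes a pill.
          while (queue.take() != POISON_PILL) {
            processed.incrementAndGet();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workers[i].start();
    }

    try {
      for (int i = 0; i < items; i++) {
        queue.put("item-" + i);
      }
      // One pill per consumer, enqueued after all regular items.
      for (int i = 0; i < consumers; i++) {
        queue.put(POISON_PILL);
      }
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while producing", e);
    }
    return processed.get();
  }

  public static void main(String[] args) {
    System.out.println(run(3, 10));
  }
}
```

With fewer pills than consumers, the remaining consumers would block in take() indefinitely, which is why the pill count matches the consumer count here.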

Real world examples