Producer–consumer problem
In computing, the producer–consumer problem[1][2] (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem, proposed by Edsger W. Dijkstra.[3] The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.
The solution for the producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, who starts to fill the buffer again. In the same way, the consumer can go to sleep if it finds the buffer empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer. The solution can be reached by means of inter-process communication, typically using semaphores. An inadequate solution could result in a deadlock where both processes are waiting to be awakened. The problem can also be generalized to have multiple producers and consumers.
Inadequate implementation
To solve the problem, some programmers might come up with the solution shown below. It uses two library routines, sleep and wakeup. When sleep is called, the caller is blocked until another process wakes it up by using the wakeup routine. The global variable itemCount holds the number of items in the buffer.
int itemCount = 0;

procedure producer()
{
    while (true)
    {
        item = produceItem();

        if (itemCount == BUFFER_SIZE)
        {
            sleep();
        }

        putItemIntoBuffer(item);
        itemCount = itemCount + 1;

        if (itemCount == 1)
        {
            wakeup(consumer);
        }
    }
}

procedure consumer()
{
    while (true)
    {
        if (itemCount == 0)
        {
            sleep();
        }

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1)
        {
            wakeup(producer);
        }

        consumeItem(item);
    }
}
The problem with this solution is that it contains a race condition that can lead to a deadlock. Consider the following scenario:
- The consumer has just read the variable itemCount, noticed it's zero and is just about to move inside the if block.
- Just before calling sleep, the consumer is interrupted and the producer is resumed.
- The producer creates an item, puts it into the buffer, and increases itemCount.
- Because the buffer was empty prior to the last addition, the producer tries to wake up the consumer.
- Unfortunately, the consumer wasn't yet sleeping, and the wakeup call is lost. When the consumer resumes, it goes to sleep and will never be awakened again. This is because the consumer is only awakened by the producer when itemCount is equal to 1.
- The producer will loop until the buffer is full, after which it will also go to sleep.
Since both processes will sleep forever, we have run into a deadlock. This solution therefore is unsatisfactory.
An alternative analysis is that if the programming language does not define the semantics of concurrent accesses to shared variables (in this case itemCount) made without synchronization, then the solution is unsatisfactory for that reason alone, without needing to explicitly demonstrate a race condition.
Using semaphores
Semaphores solve the problem of lost wakeup calls. In the solution below we use two semaphores, fillCount and emptyCount, to solve the problem. fillCount is the number of items already in the buffer and available to be read, while emptyCount is the number of available spaces in the buffer where items could be written. fillCount is incremented and emptyCount decremented when a new item is put into the buffer. If the producer tries to decrement emptyCount when its value is zero, the producer is put to sleep. The next time an item is consumed, emptyCount is incremented and the producer wakes up. The consumer works analogously.
semaphore fillCount = 0; // items produced
semaphore emptyCount = BUFFER_SIZE; // remaining space

procedure producer()
{
    while (true)
    {
        item = produceItem();
        down(emptyCount);
        putItemIntoBuffer(item);
        up(fillCount);
    }
}

procedure consumer()
{
    while (true)
    {
        down(fillCount);
        item = removeItemFromBuffer();
        up(emptyCount);
        consumeItem(item);
    }
}
The solution above works fine when there is only one producer and consumer. With multiple producers sharing the same memory space for the item buffer, or multiple consumers sharing the same memory space, this solution contains a serious race condition that could result in two or more processes reading or writing into the same slot at the same time. To understand how this is possible, imagine how the procedure putItemIntoBuffer() can be implemented. It could contain two actions, one determining the next available slot and the other writing into it. If the procedure can be executed concurrently by multiple producers, then the following scenario is possible:
- Two producers decrement emptyCount
- One of the producers determines the next empty slot in the buffer
- The second producer determines the next empty slot and gets the same result as the first producer
- Both producers write into the same slot
To overcome this problem, we need a way to make sure that only one producer is executing putItemIntoBuffer() at a time. In other words, we need a way to execute a critical section with mutual exclusion. The solution for multiple producers and consumers is shown below.
mutex buffer_mutex; // similar to "semaphore buffer_mutex = 1", but different (see notes below)
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer()
{
    while (true)
    {
        item = produceItem();
        down(emptyCount);
        down(buffer_mutex);
        putItemIntoBuffer(item);
        up(buffer_mutex);
        up(fillCount);
    }
}

procedure consumer()
{
    while (true)
    {
        down(fillCount);
        down(buffer_mutex);
        item = removeItemFromBuffer();
        up(buffer_mutex);
        up(emptyCount);
        consumeItem(item);
    }
}
Notice that the order in which the different semaphores are incremented or decremented is essential: changing the order might result in a deadlock. It is also important to note that although a mutex seems to work like a semaphore with a value of 1 (a binary semaphore), there is a difference: a mutex has a notion of ownership. Ownership means that a mutex can only be "incremented" back (set to 1) by the same process that "decremented" it (set to 0), and all other tasks wait until the mutex is available for decrement (effectively meaning that the resource is available), which ensures mutual exclusion and avoids deadlock. Using a mutex improperly can therefore stall many processes, for example when exclusive access is not actually required but a mutex is used instead of a semaphore.
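As a concrete illustration, the pseudocode above could be realized with POSIX semaphores and a pthread mutex. The following is a minimal sketch, not part of the original presentation: the circular buffer, the item type and the produceItem()/consumeItem() stubs are illustrative assumptions.

#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 16

typedef int item_t;                          /* placeholder item type */

static item_t buffer[BUFFER_SIZE];           /* shared circular buffer */
static int head = 0, tail = 0;               /* protected by buffer_mutex */

static sem_t fillCount;                      /* items produced   */
static sem_t emptyCount;                     /* remaining space  */
static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;

static item_t produceItem(void) { return 42; }        /* placeholder */
static void consumeItem(item_t item) { (void)item; }  /* placeholder */

void *producer(void *arg)
{
    (void)arg;
    for (;;) {
        item_t item = produceItem();
        sem_wait(&emptyCount);               /* down(emptyCount) */
        pthread_mutex_lock(&buffer_mutex);
        buffer[head] = item;                 /* putItemIntoBuffer(item) */
        head = (head + 1) % BUFFER_SIZE;
        pthread_mutex_unlock(&buffer_mutex);
        sem_post(&fillCount);                /* up(fillCount) */
    }
    return NULL;
}

void *consumer(void *arg)
{
    (void)arg;
    for (;;) {
        sem_wait(&fillCount);                /* down(fillCount) */
        pthread_mutex_lock(&buffer_mutex);
        item_t item = buffer[tail];          /* removeItemFromBuffer() */
        tail = (tail + 1) % BUFFER_SIZE;
        pthread_mutex_unlock(&buffer_mutex);
        sem_post(&emptyCount);               /* up(emptyCount) */
        consumeItem(item);
    }
    return NULL;
}

/* Before starting the threads, a main() would initialize the semaphores:
       sem_init(&fillCount, 0, 0);
       sem_init(&emptyCount, 0, BUFFER_SIZE);
   and then pthread_create() any number of producer and consumer threads. */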
Using monitors
The following pseudo code shows a solution to the producer–consumer problem using monitors. Since mutual exclusion is implicit with monitors, no extra effort is necessary to protect the critical section. In other words, the solution shown below works with any number of producers and consumers without any modifications. It is also noteworthy that it is less likely for a programmer to write code that suffers from race conditions when using monitors than when using semaphores.
monitor ProducerConsumer
{
    int itemCount = 0;
    condition full;
    condition empty;

    procedure add(item)
    {
        if (itemCount == BUFFER_SIZE)
        {
            wait(full);
        }

        putItemIntoBuffer(item);
        itemCount = itemCount + 1;

        if (itemCount == 1)
        {
            notify(empty);
        }
    }

    procedure remove()
    {
        if (itemCount == 0)
        {
            wait(empty);
        }

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1)
        {
            notify(full);
        }

        return item;
    }
}

procedure producer()
{
    while (true)
    {
        item = produceItem();
        ProducerConsumer.add(item);
    }
}

procedure consumer()
{
    while (true)
    {
        item = ProducerConsumer.remove();
        consumeItem(item);
    }
}
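C has no built-in monitors, but the monitor above can be approximated with a mutex and two condition variables. The sketch below is illustrative only: because POSIX condition variables have Mesa-style semantics (a woken thread must re-check its condition), the waits use while loops instead of the if statements of the monitor pseudocode, and the notifications are issued unconditionally, which is harmless given that re-checking. The buffer layout and item type are assumptions.

#include <pthread.h>

#define BUFFER_SIZE 16

typedef int item_t;                          /* placeholder item type */

static item_t buffer[BUFFER_SIZE];
static int head = 0, tail = 0, itemCount = 0;

static pthread_mutex_t monitor_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_full     = PTHREAD_COND_INITIALIZER;  /* plays the role of "full"  */
static pthread_cond_t  not_empty    = PTHREAD_COND_INITIALIZER;  /* plays the role of "empty" */

void add(item_t item)
{
    pthread_mutex_lock(&monitor_lock);       /* monitor entry */
    while (itemCount == BUFFER_SIZE)
        pthread_cond_wait(&not_full, &monitor_lock);
    buffer[head] = item;                      /* putItemIntoBuffer(item) */
    head = (head + 1) % BUFFER_SIZE;
    itemCount = itemCount + 1;
    pthread_cond_signal(&not_empty);          /* notify(empty) */
    pthread_mutex_unlock(&monitor_lock);      /* monitor exit */
}

item_t remove_item(void)
{
    pthread_mutex_lock(&monitor_lock);
    while (itemCount == 0)
        pthread_cond_wait(&not_empty, &monitor_lock);
    item_t item = buffer[tail];               /* removeItemFromBuffer() */
    tail = (tail + 1) % BUFFER_SIZE;
    itemCount = itemCount - 1;
    pthread_cond_signal(&not_full);           /* notify(full) */
    pthread_mutex_unlock(&monitor_lock);
    return item;
}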
Without semaphores or monitors
The producer–consumer problem, particularly in the case of a single producer and single consumer, strongly relates to implementing a FIFO or a channel. The producer–consumer pattern can provide highly efficient data communication without relying on semaphores, mutexes, or monitors for data transfer. The use of those primitives can be expensive in terms of performance compared with a basic atomic read/write operation. Channels and FIFOs are popular precisely because they avoid the need for end-to-end atomic synchronization. A basic example coded in C is shown below. Note that:
- Atomic read-modify-write access to shared variables is avoided, as each of the two Count variables is updated only by a single thread. Also, these variables support an unbounded number of increment operations; the relation remains correct when their values wrap around on an integer overflow.
- This example does not put threads to sleep, which may be acceptable depending on the system context. The schedulerYield() is inserted as an attempt to improve performance, and may be omitted. Thread libraries typically require semaphores or condition variables to control the sleep/wakeup of threads. In a multi-processor environment, thread sleep/wakeup would occur much less frequently than passing of data tokens, so avoiding atomic operations on data passing is beneficial.
- This example does not work for multiple producers and/or consumers because there is a race condition when checking the state. For example, if only one token is in the storage buffer and two consumers find the buffer non-empty, then both will consume the same token, and possibly increase the counter for the consumed tokens above the counter for the produced tokens.
- This example, as written, requires that UINT_MAX + 1 is evenly divisible by BUFFER_SIZE; if it is not evenly divisible, [Count % BUFFER_SIZE] produces the wrong buffer index after Count wraps past UINT_MAX back to zero. An alternative solution that avoids this limitation employs two additional Idx variables to track the current buffer index for the head (producer) and tail (consumer). These Idx variables would be used in place of [Count % BUFFER_SIZE], and each of them would have to be incremented at the same time as the respective Count variable, as follows: Idx = (Idx + 1) % BUFFER_SIZE.
- The two Count variables need to be sufficiently small to support atomic read and write actions. Otherwise, there is a race condition where the other thread reads a partially updated and thus wrong value.
volatile unsigned int produceCount = 0, consumeCount = 0;
TokenType sharedBuffer[BUFFER_SIZE];

void producer(void) {
    while (1) {
        while (produceCount - consumeCount == BUFFER_SIZE) {
            schedulerYield(); /* sharedBuffer is full */
        }

        /* Write to sharedBuffer _before_ incrementing produceCount */
        sharedBuffer[produceCount % BUFFER_SIZE] = produceToken();

        /* Memory barrier required here to ensure update of the sharedBuffer is
           visible to other threads before the update of produceCount */
        ++produceCount;
    }
}

void consumer(void) {
    while (1) {
        while (produceCount - consumeCount == 0) {
            schedulerYield(); /* sharedBuffer is empty */
        }

        consumeToken(&sharedBuffer[consumeCount % BUFFER_SIZE]);
        ++consumeCount;
    }
}
The above solution employs counters that, when used frequently, will eventually reach their maximal value UINT_MAX and wrap around. The idea outlined in the fourth bullet, originally suggested by Leslie Lamport,[4] explains how the counters can be replaced with finite-range counters whose maximal value is N, the buffer's capacity. A sketch of these two refinements is shown below.
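The following sketch, written in the style of the C example above, illustrates one reading of those refinements: separate index variables address the buffer, and the counters are kept in the range 0..BUFFER_SIZE (i.e., they are incremented modulo BUFFER_SIZE + 1). It reuses TokenType, produceToken(), consumeToken() and schedulerYield() from the example above; the BUFFER_SIZE value is illustrative, the same memory-barrier caveats apply, and this is a sketch, not the published algorithm.

#define BUFFER_SIZE 16
#define COUNT_RANGE (BUFFER_SIZE + 1)        /* counters take values 0..BUFFER_SIZE */

volatile unsigned int produceCount = 0, consumeCount = 0;  /* bounded counters */
TokenType sharedBuffer[BUFFER_SIZE];

/* number of tokens currently in the buffer, computed from the bounded counters */
static unsigned int itemsInBuffer(void) {
    return (produceCount + COUNT_RANGE - consumeCount) % COUNT_RANGE;
}

void producer(void) {
    unsigned int produceIdx = 0;             /* buffer index, used by the producer only */
    while (1) {
        while (itemsInBuffer() == BUFFER_SIZE) {
            schedulerYield();                /* sharedBuffer is full */
        }

        sharedBuffer[produceIdx] = produceToken();
        produceIdx = (produceIdx + 1) % BUFFER_SIZE;

        /* Memory barrier still required here, as in the version above */
        produceCount = (produceCount + 1) % COUNT_RANGE;
    }
}

void consumer(void) {
    unsigned int consumeIdx = 0;             /* buffer index, used by the consumer only */
    while (1) {
        while (itemsInBuffer() == 0) {
            schedulerYield();                /* sharedBuffer is empty */
        }

        consumeToken(&sharedBuffer[consumeIdx]);
        consumeIdx = (consumeIdx + 1) % BUFFER_SIZE;
        consumeCount = (consumeCount + 1) % COUNT_RANGE;
    }
}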
Four decades after the presentation of the producer–consumer problem, Aguilera, Gafni and Lamport showed that the problem can be solved so that the processes access only fixed-range counters (i.e. a range that is independent of the size of the buffer) while determining whether the buffer is empty or full.[5] The motivation for this efficiency measure is to accelerate interactions between a processor and devices that interact through FIFO channels. They proposed a solution in which bounded counters are read to determine whether it is safe to access the buffer. However, their solution still employs unbounded counters that grow indefinitely; those counters are simply not accessed during the described check phase.
Later, Abraham and Amram[6] proposed a simpler solution, presented below in pseudocode, that possesses the discussed fixed-range property. The solution employs counters of maximal value N. However, to determine whether the buffer is empty or full, the processes access only finite-range single-writer registers. Each of the processes owns a 12-valued single-writer register. The producer process writes to Flag_p, and the consumer process writes to Flag_c; both are 3-field arrays. Flag_p[2] and Flag_c[2] may store 'full', 'empty', or 'safe', which correspondingly indicate whether the buffer is full, empty, or neither full nor empty.
The idea behind the algorithm is as follows. The processes count the number of items delivered and removed modulo N+1 through the registers CountDelivered and CountRemoved. When a process delivers or removes an item, it compares those counters, thus determining the buffer's status, and stores this information into Flag_p[2] or Flag_c[2]. In a check phase, the executing process reads Flag_p and Flag_c, and tries to estimate which value among Flag_p[2] and Flag_c[2] reflects the current status of the buffer. Two synchronization techniques assist in achieving this goal:
- After delivering an item, the producer writes to Flag_p[0] the value it read from Flag_c[0], and after removing an item, the consumer writes to Flag_c[0] the value 1-Flag_p[0]. Hence, the condition Flag_p[0] == Flag_c[0] suggests that the producer recently checked the buffer's status, while Flag_p[0] != Flag_c[0] suggests the opposite.
- A delivery (removal) operation ends by writing to Flag_p[1] (Flag_c[1]) the value stored in Flag_p[0] (Flag_c[0]). Hence, the condition Flag_p[0] == Flag_p[1] suggests that the producer finished its last delivery operation. Likewise, the condition Flag_c[0] == Flag_c[1] suggests that the consumer's last removal has already terminated.
Therefore, at the check phase, if the producer finds that Flag_c[0] != Flag_p[0] & Flag_c[0] == Flag_c[1], it acts according to the value of Flag_c[2], and otherwise according to the value stored in Flag_p[2]. Analogously, if the consumer finds that Flag_p[0] == Flag_c[0] & Flag_p[0] == Flag_p[1], it acts according to the value of Flag_p[2], and otherwise according to the value stored in Flag_c[2].
In the code below, capitalized variables indicate shared registers, written by one of the processes and read by both processes. Non-capitalized variables are local variables into which the processes copy the values read from the shared registers.
CountDelivered = 0; CountRemoved = 0;
Flag_p[0] = 0; Flag_p[1] = 0; Flag_p[2] = 'empty';
Flag_c[0] = 0; Flag_c[1] = 0; Flag_c[2] = 'empty';

procedure producer()
{
    while (true) {
        item = produceItem();

        /* check phase: busy wait until the buffer is not full */
        repeat {
            flag_c = Flag_c;
            if (flag_c[0] != Flag_p[0] & flag_c[0] == flag_c[1]) ans = flag_c[2];
            else ans = Flag_p[2];
        } until (ans != 'full');

        /* item delivery phase */
        putItemIntoBuffer(item);
        CountDelivered = (CountDelivered + 1) % (N + 1);
        flag_c = Flag_c;
        Flag_p[0] = flag_c[0];
        removed = CountRemoved;
        if (CountDelivered - removed == N) { Flag_p[1] = flag_c[0]; Flag_p[2] = 'full'; }
        if (CountDelivered - removed == 0) { Flag_p[1] = flag_c[0]; Flag_p[2] = 'empty'; }
        if (0 < CountDelivered - removed < N) { Flag_p[1] = flag_c[0]; Flag_p[2] = 'safe'; }
    }
}

procedure consumer()
{
    while (true) {
        /* check phase: busy wait until the buffer is not empty */
        repeat {
            flag_p = Flag_p;
            if (flag_p[0] == Flag_c[0] & flag_p[1] == flag_p[0]) ans = flag_p[2];
            else ans = Flag_c[2];
        } until (ans != 'empty');

        /* item removal phase */
        item = removeItemFromBuffer();
        CountRemoved = (CountRemoved + 1) % (N + 1);
        flag_p = Flag_p;
        Flag_c[0] = 1 - flag_p[0];
        delivered = CountDelivered;
        if (delivered - CountRemoved == N) { Flag_c[1] = 1 - flag_p[0]; Flag_c[2] = 'full'; }
        if (delivered - CountRemoved == 0) { Flag_c[1] = 1 - flag_p[0]; Flag_c[2] = 'empty'; }
        if (0 < delivered - CountRemoved < N) { Flag_c[1] = 1 - flag_p[0]; Flag_c[2] = 'safe'; }
    }
}
The correctness of the code relies on the assumption that the processes can read an entire array, or write to several fields of an array, in a single atomic action. Since this assumption is not realistic, in practice one should replace Flag_p and Flag_c with small integers that encode the values of those arrays (4 bits suffice for the 12 possible values). Flag_p and Flag_c are presented here as arrays only for the readability of the code.
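One possible packing of such a 12-valued flag into a single small integer is sketched below in C. The field layout (one bit each for the first two fields and a 3-valued status code) is an illustrative assumption, not part of the published algorithm.

enum status { EMPTY = 0, FULL = 1, SAFE = 2 };   /* the three buffer states */

/* pack (flag[0], flag[1], status) into one integer in the range 0..11 */
static inline unsigned pack_flag(unsigned bit0, unsigned bit1, enum status s)
{
    return (bit0 & 1u) | ((bit1 & 1u) << 1) | ((unsigned)s << 2);
}

/* accessors that recover the three logical fields */
static inline unsigned    flag_bit0(unsigned f)   { return f & 1u; }
static inline unsigned    flag_bit1(unsigned f)   { return (f >> 1) & 1u; }
static inline enum status flag_status(unsigned f) { return (enum status)(f >> 2); }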
References
- Arpaci-Dusseau, Remzi H.; Arpaci-Dusseau, Andrea C. (2014), Operating Systems: Three Easy Pieces [Chapter: Condition Variables] (PDF), Arpaci-Dusseau Books
- Arpaci-Dusseau, Remzi H.; Arpaci-Dusseau, Andrea C. (2014), Operating Systems: Three Easy Pieces [Chapter: Semaphores] (PDF), Arpaci-Dusseau Books
- Dijkstra, E. W. "Information streams sharing a finite buffer." Information Processing Letters 1.5 (1972): 179-180.
- Lamport, Leslie. "Proving the correctness of multiprocess programs." IEEE Transactions on Software Engineering 2 (1977): 125-143.
- Aguilera, Marcos K., Eli Gafni, and Leslie Lamport. "The mailbox problem." Distributed Computing 23.2 (2010): 113-134.
- Abraham, Uri, and Gal Amram. "Two-process synchronization." Theoretical Computer Science 688 (2017): 2-23.
Further reading
- Mark Grand, Patterns in Java, Volume 1: A Catalog of Reusable Design Patterns Illustrated with UML
- C/C++ Users Journal (Dr.Dobb's) January 2004, "A C++ Producer-Consumer Concurrency Template Library", by Ted Yuan, is a ready-to-use C++ template library. The small template library source code and examples can be found here