In the previous producer/consumer project, we requested Unix system shared memory and Unix system semaphores. We used the Unix fork call to create the producer and the consumer.
This assignment will be similar; however, we are going to use POSIX threads, condition variables, and mutex locks to control the execution of two producers and two consumers to ensure the CORRECTNESS of the solution.
What is the implication of using threads? You do not have to ask for shared memory. You can define global variables and each thread has access to them. You may think this is much easier, but beware! Global variables are VERY error prone. Be sure to define globally only what the threads must have access to. You will be penalized if you define any global variables that are not necessary.
As in the previous assignment, you are going to create a buffer of 10 integers that producers are going to produce into and consumers are going to consume from. The buffer should be defined globally, as should the mutexes and condition variables necessary to control the execution of your code.
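One common way to treat a fixed array as a queue is a circular buffer with separate insert and remove positions. The sketch below shows only the index arithmetic. It is single-threaded and contains no locking at all, so it is not a solution to the assignment. The names BUF_SIZE, in_pos, out_pos, count, buf_put, and buf_get are illustrative assumptions, not part of the assignment.

```c
#define BUF_SIZE 10

static int buffer[BUF_SIZE];
static int in_pos = 0;   /* next slot to produce into */
static int out_pos = 0;  /* next slot to consume from */
static int count = 0;    /* number of filled slots */

/* Store value in the next free slot. Returns 0 on success, -1 if full. */
int buf_put(int value)
{
    if (count == BUF_SIZE)
        return -1;
    buffer[in_pos] = value;
    in_pos = (in_pos + 1) % BUF_SIZE;  /* wrap around after the last slot */
    count++;
    return 0;
}

/* Remove the oldest value into *value. Returns 0 on success, -1 if empty. */
int buf_get(int *value)
{
    if (count == 0)
        return -1;
    *value = buffer[out_pos];
    out_pos = (out_pos + 1) % BUF_SIZE;
    count--;
    return 0;
}
```

In a threaded program every access to these variables would have to happen inside mutual exclusion, and the "full" and "empty" cases would become waits rather than error returns.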
Mutex: A variable that can be in one of two states: locked or unlocked. This is different from the semaphores that we used in the previous assignment. Counting semaphores can take on integer values.
Condition Variables: Condition variables do not have an internal state value associated with them. Condition variables have two operations associated with them: wait and signal. When a process/thread waits on a condition variable, it causes the process/thread to block unconditionally. When a process/thread issues a signal to a condition variable, if there are no processes/threads waiting on the condition variable, the result is no operation. If there are processes/threads waiting on the condition variable, one process/thread is activated at the instruction following the wait on the condition variable. Note that POSIX thread support offers a broadcast signal that wakes up all processes/threads waiting on the associated condition variable. This may be convenient to use in this assignment.
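Because a signal with no waiter is simply lost, and because POSIX allows pthread_cond_wait to return spuriously, a wait is normally paired with a shared flag that is tested in a while loop under the mutex. The following is a minimal sketch of that pattern. The names ready, observed, waiter, and cond_demo are assumptions made for illustration.

```c
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cv = PTHREAD_COND_INITIALIZER;
static int ready = 0;     /* the condition being waited for */
static int observed = 0;  /* set by the waiter once it sees ready */

static void *waiter(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&lock);
    while (!ready)                           /* re-test after every wakeup */
        pthread_cond_wait(&ready_cv, &lock); /* releases lock while blocked */
    observed = 1;
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Returns 1 if the waiter saw the condition, -1 on a pthread error. */
int cond_demo(void)
{
    pthread_t t;
    if (pthread_create(&t, NULL, waiter, NULL) != 0)
        return -1;
    pthread_mutex_lock(&lock);
    ready = 1;                       /* change the state first ... */
    pthread_cond_signal(&ready_cv);  /* ... then signal */
    pthread_mutex_unlock(&lock);
    if (pthread_join(t, NULL) != 0)
        return -1;
    return observed;
}
```

If the signal is issued before the waiter reaches the loop, the waiter finds ready already set and never blocks, so the lost signal does no harm.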
To Do
Your main program should create/initialize all of the condition variables, mutexes, etc. necessary for your solution. It should then create two producer threads and two consumer threads. It should associate a thread number with each thread and pass that thread number to the thread on the call to pthread_create().
It should wait, using pthread_join, for the 4 threads to complete and terminate.
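The create and join steps above can be sketched as follows. This is only an illustration of passing a thread number through the single (void *) argument and collecting each thread with pthread_join. The names NUM_THREADS, worker, and spawn_and_join are assumptions, and the worker does nothing but report its number.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_THREADS 4

/* Each thread receives its number packed into the (void *) argument. */
static void *worker(void *arg)
{
    long id = (long)(intptr_t)arg;
    printf("thread %ld running\n", id);
    fflush(stdout);
    return (void *)(intptr_t)id;  /* hand the number back to the joiner */
}

/* Returns how many threads were joined with the expected status,
   or -1 if a pthread call fails. */
int spawn_and_join(void)
{
    pthread_t threads[NUM_THREADS];
    int joined = 0;
    long i;

    for (i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&threads[i], NULL, worker, (void *)(intptr_t)i) != 0)
            return -1;

    for (i = 0; i < NUM_THREADS; i++) {
        void *status;
        if (pthread_join(threads[i], &status) != 0)
            return -1;
        if ((long)(intptr_t)status == i)
            joined++;
    }
    return joined;
}
```

Passing the number by value inside the pointer avoids sharing a loop variable between main and the new threads.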
The two producer threads should cooperate to produce the values 0-99 into the 10-integer shared buffer. Producers must not interfere with each other. Your solution must not unnecessarily restrict parallelism. The producers must not produce into a buffer slot that has not yet been consumed. The producer must produce the "next" value (2 is the next value after 1).
The producing producer should print a message with its thread number and the value being produced.
After the value 99 has been produced, both producers must terminate.
The two consumer threads must cooperate to consume the values placed in the 10-integer buffer until all values have been consumed. To consume a value means to display it.
The consuming consumer must print its thread number along with the value being consumed.
The values must be consumed in the order in which they were produced. After the value 99 has been consumed, both consumers must terminate.
Your solution must have the two consumers present the values 0-99, in order, on the screen. It must not deadlock. All 4 threads must properly terminate. You must not unnecessarily restrict parallelism between producers/producers, consumers/consumers, and producers/consumers. This is not trivial. You have to think very carefully about how to cooperate so that parallelism is not restricted and so that you do not create deadlock. One producer/consumer will produce/consume the last item while the other producer/consumer is waiting to produce/consume the next item. You must properly handle this.
Just because you run your program over and over again and observe the correct results on the screen does not mean that you have produced a correct solution. You must go through the source code, line by line, and determine if there are places where shared variables are accessed outside of mutual exclusion. Be careful how you structure your loops and tests for termination. You must ensure that you do not have race conditions or conditions for deadlock.
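One way to think about the "waiting for the next item" problem is to make the end of work part of the condition a thread waits on, and to broadcast when that state changes so every waiter re-tests it. The sketch below is deliberately simplified and is not the assignment: there is no buffer and no ordering, just two workers sharing a count of pending items. All names (pending, done, handled, worker, shutdown_demo) are assumptions for illustration.

```c
#include <pthread.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
static int pending = 0;  /* items waiting to be handled */
static int done = 0;     /* set once no more items will arrive */
static int handled = 0;  /* total items handled by all workers */

static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&m);
        while (pending == 0 && !done)   /* wait for work OR for the end */
            pthread_cond_wait(&cv, &m);
        if (pending == 0) {             /* woke up only because done is set */
            pthread_mutex_unlock(&m);
            return NULL;
        }
        pending--;
        handled++;
        pthread_mutex_unlock(&m);
    }
}

/* Returns the number of items handled, or -1 on a pthread error
   (error handling is kept minimal for brevity). */
int shutdown_demo(int items)
{
    pthread_t t[2];
    int i;

    for (i = 0; i < 2; i++)
        if (pthread_create(&t[i], NULL, worker, NULL) != 0)
            return -1;

    pthread_mutex_lock(&m);
    pending = items;
    done = 1;
    pthread_cond_broadcast(&cv);  /* wake every waiter so each re-tests */
    pthread_mutex_unlock(&m);

    for (i = 0; i < 2; i++)
        if (pthread_join(t[i], NULL) != 0)
            return -1;
    return handled;
}
```

A single pthread_cond_signal here could leave the second worker blocked forever after the first one takes the last item, which is exactly the situation described above.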
The following function definitions were compiled by Leo Plocicki in completing this assignment as an independent study. You may also wish to use other posix pthread functions. For example, there is also a pthread_cond_signal function. Some of these you may not use as there are many ways to accomplish this task. See the man pages for a complete list.
pthread_t - pthread thread type
pthread_cond_t - pthread condition variable type
pthread_mutex_t - pthread mutex type
pthread_once_t - pthread once type (see explanation of pthread_once())
PTHREAD_MUTEX_INITIALIZER - default initialization for statically defined mutexes
PTHREAD_COND_INITIALIZER - default initialization for statically defined condition variables
PTHREAD_ONCE_INIT - default initialization for the pthread once variable for use with the pthread_once function
pthread_mutex_lock(pthread_mutex_t *mv) - This function will lock the mutex pointed to by the mv pointer. If the mutex is already locked, the caller blocks until it is unlocked.
pthread_mutex_unlock(pthread_mutex_t *mv) - This function will unlock the mutex pointed to by the mv pointer.
pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mv) - This function performs a wait on the condition variable pointed to by cv. Note that a mutex variable is also passed as an argument. A locked mutex must ALWAYS be associated with the condition variable when calling pthread_cond_wait. This is because when the calling thread performs the wait the mutex is released, allowing any thread waiting on the mutex to run. When the waiting thread is signaled the mutex will be reacquired and the thread will continue to run.
pthread_cond_broadcast(pthread_cond_t *cv) - This function signals ALL threads waiting on the condition variable pointed to by cv. Note that this does not release only the first thread on the condition variable's wait queue; it releases every thread on the queue and allows them to compete for control.
pthread_once(pthread_once_t *once_variable, void (*once_routine)(void)) - This function is used when a function must be called only once. A pthread_once_t variable must be statically initialized with the PTHREAD_ONCE_INIT definition and associated with each function called with pthread_once. This is so exclusive access to the function can be ensured. Once a pthread_once_t variable is associated with a function no other thread may call that function through pthread_once without using the same variable. This mechanism works by allowing the first thread that calls the function to enter and complete the function, while any other thread that calls the function will wait, if the first thread has not yet finished, or continue, if the function has already been executed once.
pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *argument) - This is the function used to create a thread. A thread identifier is stored in the location pointed to by the "thread" argument. Attributes for the thread can be set by passing a pthread_attr_t to argument 2. If the thread is to be created with the default attributes NULL should be passed as the argument. The new thread will begin execution in the "start_routine" function passed as argument 3, and an argument can be sent to "start_routine" as argument 4. Note that the start routine must take a single (void *) parameter and return a (void *), and the argument passed to it must be cast to a (void *). A limitation of pthread_create is that only 1 argument may be passed to the function passed as the start routine.
pthread_join(pthread_t thread, void **thread_return) - This function is analogous to the Unix wait() call for processes. The calling thread is suspended until the thread identified by the first argument exits (through pthread_exit() or by an implicit return). A thread's resources cannot be regained until it has "joined" with a thread calling pthread_join(). Therefore pthread_join must be called for each "joinable" thread. A "joinable" thread is any thread not created with the PTHREAD_CREATE_DETACHED attribute and not detached with pthread_detach(). The second argument returns one of 2 values, either the value passed to pthread_exit() or PTHREAD_CANCELED if the thread was canceled.
pthread_mutex_destroy(pthread_mutex_t *mutex) - Deallocates the memory and data structures associated with the mutex pointed to by the mutex argument. No thread can have the mutex locked when this function is called.
pthread_cond_destroy(pthread_cond_t *cond) - Deallocates the memory and data structures associated with the condition variable pointed to by the cond argument. No threads can be waiting on the condition variable when this function is called.
Example - Sample Code
Below is a simple example that uses some of the functions. It is a single producer/consumer simply producing a value into a single-integer buffer. The logic in this example is not correct logic for this project. In this example, the producer and consumer are in step: produce one, consume one. In the project, you have 10 buffers to fill. The producers and consumers should be able to run independently as long as they have what they need.
I have provided an example that compiles with GCC for code written in C and an example that compiles with G++ for code written in C++. This assignment is to be written in C. The C++ example is provided for completeness.
This code compiles with GCC. It will not compile with G++ (the uncast result of malloc is an error in C++).
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#define READ 1
#define WRITTEN 2
int data;
int flag=READ;
pthread_mutex_t lockit=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t signalit=PTHREAD_COND_INITIALIZER;
/* producer: writes the values 1..10 into data, one at a time */
void *produce(void *thread_number)
{
    int i;
    printf("I am the producer and my thread number is %ld\n",
           (long)thread_number);
    for (i=1; i<=10;i++)
    {
        pthread_mutex_lock(&lockit);
        while (flag !=READ) /* wait until the previous value was read */
        {
            pthread_cond_wait(&signalit,&lockit);
        }
        data=i;
        flag=WRITTEN;
        pthread_cond_broadcast(&signalit);
        pthread_mutex_unlock(&lockit);
    }/* end for*/
    printf("producer exiting\n");
    return(NULL);
}/* end of producer */
/* consumer: displays each value after the producer has written it */
void *consume(void *thread_number)
{
    int i;
    printf("I am the consumer and my thread number is %ld\n",
           (long)thread_number);
    for (i=1;i<=10;i++)
    {
        pthread_mutex_lock(&lockit);
        while (flag != WRITTEN) /* wait until a new value was written */
        {
            pthread_cond_wait(&signalit,&lockit);
        }
        printf("the data is: %i\n",data);
        flag=READ;
        pthread_cond_broadcast(&signalit);
        pthread_mutex_unlock(&lockit);
    }/* end for*/
    printf("consumer exiting\n");
    return(NULL);
}
int main()
{
    void *statusp;
    pthread_t *threads;
    if((threads=malloc(sizeof(pthread_t)*2))==NULL)
    {
        printf("error allocating\n");
        exit(-1);
    }
    if((pthread_create(&threads[0],NULL,produce,(void *)0))!=0)
    {
        printf("error creating producer\n");
        exit(-1);
    }
    if(pthread_create(&threads[1],NULL,consume,(void *)1)!=0)
    {
        printf("error creating consumer\n");
        exit(-1);
    }
    if((pthread_join(threads[0],&statusp))!=0)
    {
        printf("couldn't join with producer\n");
    }
    if((pthread_join(threads[1],&statusp))!=0)
    {
        printf("couldn't join with consumer\n");
    }
    if((pthread_mutex_destroy(&lockit))!=0)
    {
        printf("mutex destroy failed\n");
    }
    if((pthread_cond_destroy(&signalit))!=0)
    {
        printf("cond destroy failed\n");
    }
    free(threads);
    return 0;
}
You should flush stdout after each print statement.
The following sample code will compile with G++
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#define READ 1
#define WRITTEN 2
int data;
int flag=READ;
pthread_mutex_t lockit=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t signalit=PTHREAD_COND_INITIALIZER;
void *produce(void *thread_number)
{
    int i;
    printf("I am the producer and my thread number is %ld \n",
           (long) thread_number);
    for (i=1; i<=10;i++)
    {
        pthread_mutex_lock(&lockit);
        while (flag !=READ)
        {
            pthread_cond_wait(&signalit,&lockit);
        }
        data=i;
        flag=WRITTEN;
        pthread_cond_broadcast(&signalit);
        pthread_mutex_unlock(&lockit);
    }/* end for*/
    printf("producer exiting\n");
    return(NULL);
}//end of producer
void *consume(void *thread_number)
{
    int i;
    printf("I am the consumer and my thread number is %ld \n",
           (long)thread_number);
    for (i=1;i<=10;i++)
    {
        pthread_mutex_lock(&lockit);
        while (flag != WRITTEN)
        {
            pthread_cond_wait(&signalit,&lockit);
        }
        printf("the data is: %i\n",data);
        flag=READ;
        pthread_cond_broadcast(&signalit);
        pthread_mutex_unlock(&lockit);
    }/* end for*/
    printf("consumer exiting\n");
    return(NULL);
}
int main()
{
    void *statusp;
    pthread_t *threads;
    if((threads=(pthread_t *)malloc(sizeof(pthread_t)*2))==NULL)
    {
        printf("error allocating\n");
        exit(-1);
    }
    if((pthread_create(&threads[0],NULL,produce,(void *)0))!=0)
    {
        printf("error creating producer\n");
        exit(-1);
    }
    if(pthread_create(&threads[1],NULL,consume,(void *)1)!=0)
    {
        printf("error creating consumer\n");
        exit(-1);
    }
    if((pthread_join(threads[0],&statusp))!=0)
    {
        printf("couldn't join with producer\n");
    }
    if((pthread_join(threads[1],&statusp))!=0)
    {
        printf("couldn't join with consumer\n");
    }
    if((pthread_mutex_destroy(&lockit))!=0)
    {
        printf("mutex destroy failed\n");
    }
    if((pthread_cond_destroy(&signalit))!=0)
    {
        printf("cond destroy failed\n");
    }
    free(threads);
    return 0;
}
Hand in
Use turnin to submit your program. Select prodconth. The name of your program must be prodconth.c. You must test the return value of all system calls. Your program must be liberal with comments. You must not unnecessarily restrict parallelism. All functions must properly terminate.
This page is copyright © protected by Barbara Bracken
Last modification: 6/16/2015