Mercurial > hg > Applications > Grep
view parallel_processing/chapter3/ppb_cond_queue/ppb_cond_queue.c @ 122:188d866227a4 pairPro
fix
author | Masataka Kohagura <kohagura@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 30 Nov 2015 23:43:53 +0900 |
parents | 508b47c8f4d8 |
children |
line wrap: on
line source
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <stdbool.h> #include <pthread.h> #include "ppb_queue.h" #define THREAD_NUM 2 #define DATA_NUM 10 #define MAX_QUEUE_NUM 3 #define THREAD_DATA_NUM (DATA_NUM / THREAD_NUM) extern void enqueue(queue_t, int); extern void dequeue(queue_t, int); typedef struct _queue { int values[MAX_QUEUE_NUM]; volatile int remain; int rp, wp; pthread_mutex_t mutex; pthread_cond_t not_full; pthread_cond_t not_empty; } queue_t; typedef struct _thread_arg { int id; queue_t *queue; } thread_arg_t; void * producer_func(void *arg) { thread_arg_t *targ = (thread_arg_t*)arg; for (int i = 0; i < THREAD_DATA_NUM; i++) { int num = targ->id * THREAD_DATA_NUM + 1; enqueue(targ->queue, num); printf("[Producer %d] ==> %d \n", targ->id, num); sleep(rand() % 3); } enqueue(targ->queue, END_DATA); return 0; } void * consumer_func(void *arg) { thread_arg_t *targ = (thread_arg_t*)arg; int i; while (1) { dequeue(targ->queue, &i); if (i == END_DATA) break; printf("[Consumer %d] ==> %d \n", targ->id, i); sleep(rand() % 3); } return 0; } int main() { pthread_t producer[THREAD_NUM], consumer[THREAD_NUM]; thread_arg_t ptarg[THREAD_NUM], ctarg[THREAD_NUM]; queue_t queue; int i; /* initialize */ queue.rp = queue.wp = 0; queue.remain = 0; pthread_mutex_init(&queue.mutex, NULL); pthread_cond_init(&queue.not_full, NULL); pthread_cond_init(&queue.not_empty, NULL); /* spawn Producer thread */ for (i = 0; i < THREAD_NUM; ++i) { ptarg[i].id = i; ptarg[i].queue = &queue; pthread_create(&consumer[i], NULL, &producer_func, &ptarg[i]); } /* spawn Consumer thread */ for (i = 0; i < THREAD_NUM; ++i) { ctarg[i].id = i; ctarg[i].queue = &queue; pthread_create(&consumer[i], NULL, &consumer_func, &ctarg[i]); } /* wait for running all thread */ for (i = 0; i < THREAD_NUM; ++i) pthread_join(consumer[i], NULL); return 0; }