Mercurial > hg > GearsTemplate
changeset 492:9333486471b9
Add boundedBuffer example
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sat, 30 Dec 2017 22:03:33 +0900 |
parents | f985815ad032 |
children | 82f0c49750f1 |
files | src/parallel_execution/CMakeLists.txt src/parallel_execution/context.h src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc |
diffstat | 4 files changed, 145 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt Sat Dec 30 03:16:05 2017 +0900 +++ b/src/parallel_execution/CMakeLists.txt Sat Dec 30 22:03:33 2017 +0900 @@ -131,5 +131,12 @@ TARGET rbtree SOURCES - SingleLinkedQueue.cbc test/rbTree_test.cbc RedBlackTree.cbc SingleLinkedStack.cbc + SingleLinkedQueue.cbc test/rbTree_test.cbc RedBlackTree.cbc SingleLinkedStack.cbc ) + +GearsCommand( + TARGET + boundedBuffer + SOURCES + examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc +)
--- a/src/parallel_execution/context.h Sat Dec 30 03:16:05 2017 +0900 +++ b/src/parallel_execution/context.h Sat Dec 30 22:03:33 2017 +0900 @@ -326,7 +326,6 @@ struct SemaphoreImpl { int value; pthread_mutex_t mutex; - pthread_cond_t cond; } SemaphoreImpl; struct Allocate { enum Code next;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc Sat Dec 30 22:03:33 2017 +0900 @@ -0,0 +1,76 @@ +#include "../../context.h" +#interface Queue.h +#interface Semaphore.h + +Queue* createBoundedBuffer(struct Context* context, int size) { + struct Queue* queue = new Queue(); + struct BoundedBuffer* boundedBuffer = new BoundedBuffer(); + boundedBuffer->top = new Element(); + boundedBuffer->top->next = NULL; + boundedBuffer->last = boundedBuffer->top; + boundedBuffer->fullCount = createSemaphoreImpl(context, 0); + boundedBuffer->emptyCount = createSemaphoreImpl(context, size); + boundedBuffer->lock = createSemaphoreImpl(context, 1); // binary semaphore + queue->queue = (union Data*)boundedBuffer; + queue->take = C_takeBoundedBuffer; + queue->put = C_putBoundedBuffer; + queue->isEmpty = C_isEmptyBoundedBuffer; + queue->clear = C_clearBoundedBuffer; + return queue; +} + +__code putBoudnedBuffer(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) { + struct Semaphore sem = boundedBuffer->emptyCount; + goto sem->p(putBoudnedBuffer1); +} + +__code putBoudnedBuffer1(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) { + struct Semaphore sem = boundedBuffer->lock; + goto sem->p(putBoudnedBuffer2); +} + +__code putBoudnedBuffer2(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) { + struct Element* element = new Element(); + element->data = data; + element->next = NULL; + struct Element* last = queue->last; + last->next = element; + struct Semaphore sem = boundedBuffer->lock; + goto sem->v(putBoudnedBuffer3); +} + +__code putBoudnedBuffer3(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) { + struct Semaphore sem = boundedBuffer->fullCount; + goto sem->v(putBoudnedBuffer4); +} + +__code putBoudnedBuffer4(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) { + goto next(data, ...); +} +__code takeBoudnedBuffer(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) { + struct Semaphore sem = boundedBuffer->fullCount; + goto sem->p(takeBoudnedBuffer1); +} + +__code takeBoudnedBuffer1(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) { + struct Semaphore sem = boundedBuffer->lock; + goto sem->p(takeBoudnedBuffer2); +} + +__code takeBoudnedBuffer2(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) { + struct Element* top = queue->top; + struct Element* nextElement = top->next; + data = nextElement->data; + queue->top = nextElement; + struct Semaphore sem = boundedBuffer->lock; + goto sem->v(takeBoudnedBuffer3); +} + +__code takeBoudnedBuffer3(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) { + struct Semaphore sem = boundedBuffer->emptyCount; + goto sem->v(takeBoudnedBuffer4); +} + +__code takeBoudnedBuffer4(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) { + goto next(data, ...); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc Sat Dec 30 22:03:33 2017 +0900 @@ -0,0 +1,61 @@ +#include "../context.h" +#interface "semaphore.h" + +Semaphore* createSemaphoreImpl(struct Context* context, int n) { + struct Semaphore* semaphore = new Semaphore(); + struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl(); + semaphore->semaphore = (union Data*)semaphoreImpl; + semaphoreImpl->value = n; + semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context); + pthread_mutex_init(&semaphoreImpl->mutex, NULL); + semaphore->p = C_pOperationSemaphoreImpl; + semaphore->v = C_vOperationSemaphoreImpl; + return semaphore; +} + +__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { + pthread_mutex_lock(&semaphore->mutex); + goto meta(context, C_pOperationSemaphoreImpl1); +} + +__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) { + if(semaphore->value == 0) { + context->next= C_pOperationSemaphoreImpl; + struct Queue* queue = semaphoreImpl->waitThreadQueue; + goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process) + } + semaphore->value--; + pthread_mutex_unlock(&semaphore->mutex); + goto next(...); +} + +__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) { + pthread_mutex_unlock(&semaphore->mutex); + goto worker->taskReceive(); // goto shceduler +} + +__code pOperationSemaphoreImpl2_stub(sturct Context* context) { + // switch worker context + struct Context* workerContext = context->worker->worker->CPUWorker.context; + SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore); + goto odgCommitCPUWorker(workerContext, + semaphoreImpl, + context->worker, + Gearef(context, Semaphore)->next); +} + +__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { + pthread_mutex_lock(&semaphore->mutex); + semaphore->value++; + struct Queue* queue = semaphoreImpl->waitThreadQueue; + goto queue->take(context, vOperationSemaphoreImpl1); +} + +__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) { + goto taskManager->spawn(waitTask, vOperationSemaphoreImpl2); //notify +} + +__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) { + pthread_mutex_unlock(&semaphore->mutex); + goto next(...); +}