changeset 492:9333486471b9

Add boundedBuffer example
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Sat, 30 Dec 2017 22:03:33 +0900
parents f985815ad032
children 82f0c49750f1
files src/parallel_execution/CMakeLists.txt src/parallel_execution/context.h src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc
diffstat 4 files changed, 145 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt	Sat Dec 30 03:16:05 2017 +0900
+++ b/src/parallel_execution/CMakeLists.txt	Sat Dec 30 22:03:33 2017 +0900
@@ -131,5 +131,12 @@
   TARGET
       rbtree
   SOURCES
-     SingleLinkedQueue.cbc test/rbTree_test.cbc RedBlackTree.cbc SingleLinkedStack.cbc
+      SingleLinkedQueue.cbc test/rbTree_test.cbc RedBlackTree.cbc SingleLinkedStack.cbc
 )
+
+GearsCommand(
+  TARGET
+      boundedBuffer
+  SOURCES
+      examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
+)
--- a/src/parallel_execution/context.h	Sat Dec 30 03:16:05 2017 +0900
+++ b/src/parallel_execution/context.h	Sat Dec 30 22:03:33 2017 +0900
@@ -326,7 +326,6 @@
     struct SemaphoreImpl {
         int value;
         pthread_mutex_t mutex;
-        pthread_cond_t cond;
     } SemaphoreImpl;
     struct Allocate {
         enum Code next;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Sat Dec 30 22:03:33 2017 +0900
@@ -0,0 +1,76 @@
+#include "../../context.h"
+#interface Queue.h
+#interface Semaphore.h
+
+Queue* createBoundedBuffer(struct Context* context, int size) {
+    struct Queue* queue = new Queue();
+    struct BoundedBuffer* boundedBuffer = new BoundedBuffer();
+    boundedBuffer->top = new Element();
+    boundedBuffer->top->next = NULL;
+    boundedBuffer->last = boundedBuffer->top;
+    boundedBuffer->fullCount = createSemaphoreImpl(context, 0);
+    boundedBuffer->emptyCount = createSemaphoreImpl(context, size);
+    boundedBuffer->lock = createSemaphoreImpl(context, 1); // binary semaphore
+    queue->queue = (union Data*)boundedBuffer;
+    queue->take = C_takeBoundedBuffer;
+    queue->put = C_putBoundedBuffer;
+    queue->isEmpty = C_isEmptyBoundedBuffer;
+    queue->clear = C_clearBoundedBuffer;
+    return queue;
+}
+
+__code putBoudnedBuffer(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) {
+    struct Semaphore sem = boundedBuffer->emptyCount;
+    goto sem->p(putBoudnedBuffer1);
+}
+
+__code putBoudnedBuffer1(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) {
+    struct Semaphore sem = boundedBuffer->lock;
+    goto sem->p(putBoudnedBuffer2);
+}
+
+__code putBoudnedBuffer2(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) {
+    struct Element* element = new Element();
+    element->data = data;
+    element->next = NULL;
+    struct Element* last = queue->last;
+    last->next = element;
+    struct Semaphore sem = boundedBuffer->lock;
+    goto sem->v(putBoudnedBuffer3);
+}
+
+__code putBoudnedBuffer3(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) {
+    struct Semaphore sem = boundedBuffer->fullCount;
+    goto sem->v(putBoudnedBuffer4);
+}
+
+__code putBoudnedBuffer4(struct BoundedBuffer* boundedBuffer, union Data* data, __code next(union Data* data, ...)) {
+    goto next(data, ...);
+}
+__code takeBoudnedBuffer(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) {
+    struct Semaphore sem = boundedBuffer->fullCount;
+    goto sem->p(takeBoudnedBuffer1);
+}
+
+__code takeBoudnedBuffer1(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) {
+    struct Semaphore sem = boundedBuffer->lock;
+    goto sem->p(takeBoudnedBuffer2);
+}
+
+__code takeBoudnedBuffer2(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) {
+    struct Element* top = queue->top;
+    struct Element* nextElement = top->next;
+    data = nextElement->data;
+    queue->top = nextElement;
+    struct Semaphore sem = boundedBuffer->lock;
+    goto sem->v(takeBoudnedBuffer3);
+}
+
+__code takeBoudnedBuffer3(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) {
+    struct Semaphore sem = boundedBuffer->emptyCount;
+    goto sem->v(takeBoudnedBuffer4);
+}
+
+__code takeBoudnedBuffer4(struct BoundedBuffer* boundedBuffer, __code next(union Data* data, ...)) {
+    goto next(data, ...);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Sat Dec 30 22:03:33 2017 +0900
@@ -0,0 +1,61 @@
+#include "../context.h"
+#interface "semaphore.h"
+
+Semaphore* createSemaphoreImpl(struct Context* context, int n) {
+    struct Semaphore* semaphore = new Semaphore();
+    struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
+    semaphore->semaphore = (union Data*)semaphoreImpl;
+    semaphoreImpl->value =  n;
+    semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context);
+    pthread_mutex_init(&semaphoreImpl->mutex, NULL);
+    semaphore->p = C_pOperationSemaphoreImpl;
+    semaphore->v = C_vOperationSemaphoreImpl;
+    return semaphore;
+}
+
+__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
+    pthread_mutex_lock(&semaphore->mutex);
+    goto meta(context, C_pOperationSemaphoreImpl1);
+}
+
+__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
+    if(semaphore->value == 0) {
+        context->next= C_pOperationSemaphoreImpl;
+        struct Queue* queue = semaphoreImpl->waitThreadQueue;
+        goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
+    }
+    semaphore->value--;
+    pthread_mutex_unlock(&semaphore->mutex);
+    goto next(...);
+}
+
+__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
+    pthread_mutex_unlock(&semaphore->mutex);
+    goto worker->taskReceive(); // goto shceduler
+}
+
+__code pOperationSemaphoreImpl2_stub(sturct Context* context) {
+    // switch worker context
+    struct Context* workerContext = context->worker->worker->CPUWorker.context;
+    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
+    goto odgCommitCPUWorker(workerContext,
+                            semaphoreImpl,
+                            context->worker,
+                            Gearef(context, Semaphore)->next);
+}
+
+__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
+    pthread_mutex_lock(&semaphore->mutex);
+    semaphore->value++;
+    struct Queue* queue = semaphoreImpl->waitThreadQueue;
+    goto queue->take(context, vOperationSemaphoreImpl1);                        
+}
+
+__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) {
+    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl2); //notify
+}
+
+__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) {
+    pthread_mutex_unlock(&semaphore->mutex);
+    goto next(...);
+}