changeset 510:647716041772

merge
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Wed, 03 Jan 2018 18:22:38 +0900
parents 51f0d5e5d1e5 (current diff) 64869af1f3ef (diff)
children 044c25475ed4
files
diffstat 8 files changed, 192 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt	Wed Jan 03 18:22:14 2018 +0900
+++ b/src/parallel_execution/CMakeLists.txt	Wed Jan 03 18:22:38 2018 +0900
@@ -138,5 +138,5 @@
   TARGET
       boundedBuffer
   SOURCES
-      examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
+  examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc SpinLock.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
 )
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/Lock.h	Wed Jan 03 18:22:38 2018 +0900
@@ -0,0 +1,6 @@
+typedef struct Lock<Impl>{
+        union Data* lock;
+        __code doLock(Impl* lock, __code next(...)); 
+        __code doUnlock(Impl* lock, __code next(...)); 
+        __code next(...);
+} Lock;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/LockImpl.cbc	Wed Jan 03 18:22:38 2018 +0900
@@ -0,0 +1,86 @@
+#include "../context.h"
+#interface "Queue.h"
+#interface "Atomic.h"
+#interface "Lock.h"
+#interface "Worker.h"
+#interface "TaskManager.h"
+
+Lock* createLockImpl(struct Context* context) {
+    struct Lock* lock = new Lock();
+    struct LockImpl* lockImpl = new LockImpl();
+    lockImpl->lock = NULL;
+    lockImpl->waitThreadQueue = createSynchronizedQueue(context);
+    lockImpl->atomic = createAtomicReference(context);
+    lock->lock = (union Data*)lockImpl;
+    lock->doLock = C_doLockLockImpl;
+    lock->doUnlock = C_doUnlockLockImpl;
+    return lock;
+}
+
+__code doLockLockImpl(struct LockImpl* lock, __code next(...)) {
+    struct Atomic* atomic = lock->atomic;
+    goto atomic->checkAndSet(&lock->lock, NULL, 1, doLockLockImpl1, doLockLockImpl2);
+}
+
+__code doLockLockImpl1(struct LockImpl* lock, __code next(...)) {
+    lock->lockContext = context;
+    goto next(...);
+}
+
+__code doLockLockImpl2(struct LockImpl* lock, __code next(...)) {
+    struct Queue* queue = lock->waitThreadQueue;
+    context->next= C_doLockLockImpl;
+    printf("Put task\n");
+    goto queue->put(context, doLockLockImpl3);
+}
+
+__code doLockLockImpl3(struct LockImpl* lock, struct Worker* worker, __code next(...)) {
+    goto worker->taskReceive(); // goto shceduler
+}
+
+__code doLockLockImpl3_stub(struct Context* context) {
+    // switch worker context
+    struct Context* workerContext = context->worker->worker->CPUWorker.context;
+    LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock);
+    goto doLockLockImpl3(workerContext,
+            lockImpl,
+            context->worker,
+            Gearef(context, Lock)->next);
+}
+
+__code doUnlockLockImpl(struct LockImpl* lock, __code next(...)) {
+    if (lock->lockContext == context) {
+        struct Atomic* atomic = lock->atomic;
+        goto atomic->checkAndSet(&lock->lock, 1, NULL, doUnlockLockImpl1, doUnlockLockImpl);
+    }
+    goto next(...);
+}
+
+__code doUnlockLockImpl1(struct LockImpl* lock, __code next(...)) {
+    struct Queue* queue = lock->waitThreadQueue;
+    goto queue->isEmpty(doUnlockLockImpl2, doUnlockLockImpl4);
+}
+
+__code doUnlockLockImpl2(struct LockImpl* lock, __code next(...)) {
+    struct Queue* queue = lock->waitThreadQueue;
+    printf("%p: Take task\n", lock);
+    goto queue->take(doUnlockLockImpl3);
+}
+
+__code doUnlockLockImpl3(struct LockImpl* lock, struct Context* waitTask, __code next(...)) {
+    struct TaskManager* taskManager = waitTask->taskManager;
+    goto taskManager->spawn(waitTask, next(...)); //notify
+}
+
+__code doUnlockLockImpl3_stub(struct Context* context) {
+    LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock);
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto doUnlockLockImpl3(context,
+            lockImpl,
+            waitTask,
+            Gearef(context, Lock)->next);
+}
+
+__code doUnlockLockImpl4(struct LockImpl* lock, __code next(...)) {
+    goto next(...);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/SpinLock.cbc	Wed Jan 03 18:22:38 2018 +0900
@@ -0,0 +1,32 @@
+#include "../context.h"
+#interface "Atomic.h"
+#interface "Lock.h"
+
+Lock* createSpinLock(struct Context* context) {
+    struct Lock* lock = new Lock();
+    struct SpinLock* spinLock = new SpinLock();
+    spinLock->lock = NULL;
+    spinLock->atomic = createAtomicReference(context);
+    lock->lock = (union Data*)spinLock;
+    lock->doLock = C_doLockSpinLock;
+    lock->doUnlock = C_doUnlockSpinLock;
+    return lock;
+}
+
+__code doLockSpinLock(struct SpinLock* lock, __code next(...)) {
+    struct Atomic* atomic = lock->atomic;
+    goto atomic->checkAndSet(&lock->lock, NULL, 1, doLockSpinLock1, doLockSpinLock);
+}
+
+__code doLockSpinLock1(struct SpinLock* lock, __code next(...)) {
+    lock->lockContext = context;
+    goto next(...);
+}
+
+__code doUnlockSpinLock(struct SpinLock* lock, __code next(...)) {
+    if (lock->lockContext == context) {
+        struct Atomic* atomic = lock->atomic;
+        goto atomic->checkAndSet(&lock->lock, 1, NULL, next(...), doUnlockSpinLock);
+    }
+    goto next(...);
+}
--- a/src/parallel_execution/context.h	Wed Jan 03 18:22:14 2018 +0900
+++ b/src/parallel_execution/context.h	Wed Jan 03 18:22:38 2018 +0900
@@ -325,7 +325,7 @@
     } Semaphore;
     struct SemaphoreImpl {
         int value;
-        pthread_mutex_t mutex;
+        struct Lock* lock;
         struct Queue* waitThreadQueue;
     } SemaphoreImpl;
     struct Allocate {
@@ -414,6 +414,23 @@
         struct Semaphore* emptyCount;
         struct Semaphore* lock;
     } BoundedBuffer;
+    struct Lock {
+        union Data* lock;
+        enum Code doLock;
+        enum Code doUnlock;
+        enum Code next;
+    } Lock;
+    struct LockImpl {
+        Int* lock;
+        struct Queue* waitThreadQueue;
+        struct Atomic* atomic;
+        struct Context* lockContext;
+    } LockImpl;
+    struct SpinLock {
+        Int* lock;
+        struct Atomic* atomic;
+        struct Context* lockContext;
+    } SpinLock;
 }; // union Data end       this is necessary for context generator
 typedef union Data Data;
 
--- a/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Wed Jan 03 18:22:14 2018 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Wed Jan 03 18:22:38 2018 +0900
@@ -18,13 +18,13 @@
 }
 
 __code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
-    struct Semaphore* sem = buffer->emptyCount;
-    goto sem->p(putBoundedBuffer1);
+    struct Semaphore* semaphore = buffer->emptyCount;
+    goto semaphore->p(putBoundedBuffer1);
 }
 
 __code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
-    struct Semaphore* sem = buffer->lock;
-    goto sem->p(putBoundedBuffer2);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->p(putBoundedBuffer2);
 }
 
 __code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
@@ -34,26 +34,26 @@
     struct Element* last = buffer->last;
     last->next = element;
     buffer->last = element;
-    struct Semaphore* sem = buffer->lock;
-    goto sem->v(putBoundedBuffer3);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->v(putBoundedBuffer3);
 }
 
 __code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
-    struct Semaphore* sem = buffer->fullCount;
-    goto sem->v(putBoundedBuffer4);
+    struct Semaphore* semaphore = buffer->fullCount;
+    goto semaphore->v(putBoundedBuffer4);
 }
 
 __code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
     goto next(...);
 }
 __code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = buffer->fullCount;
-    goto sem->p(takeBoundedBuffer1);
+    struct Semaphore* semaphore = buffer->fullCount;
+    goto semaphore->p(takeBoundedBuffer1);
 }
 
 __code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = buffer->lock;
-    goto sem->p(takeBoundedBuffer2);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->p(takeBoundedBuffer2);
 }
 
 __code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
@@ -62,13 +62,13 @@
     data = nextElement->data;
     *O_data =data;
     buffer->top = nextElement;
-    struct  Semaphore* sem = buffer->lock;
-    goto sem->v(takeBoundedBuffer3);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->v(takeBoundedBuffer3);
 }
 
 __code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = buffer->emptyCount;
-    goto sem->v(takeBoundedBuffer4);
+    struct Semaphore* semaphore = buffer->emptyCount;
+    goto semaphore->v(takeBoundedBuffer4);
 }
 
 __code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
--- a/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Wed Jan 03 18:22:14 2018 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Wed Jan 03 18:22:38 2018 +0900
@@ -2,22 +2,23 @@
 #interface "Semaphore.h"
 #interface "Queue.h"
 #interface "TaskManager.h"
+#interface "Lock.h"
 
 Semaphore* createSemaphoreImpl(struct Context* context, int n) {
     struct Semaphore* semaphore = new Semaphore();
     struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
     semaphore->semaphore = (union Data*)semaphoreImpl;
     semaphoreImpl->value =  n;
-    semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context);
-    pthread_mutex_init(&semaphoreImpl->mutex, NULL);
+    semaphoreImpl->waitThreadQueue = createSingleLinkedQueue(context);
+    semaphoreImpl->lock = createSpinLock(context);
     semaphore->p = C_pOperationSemaphoreImpl;
     semaphore->v = C_vOperationSemaphoreImpl;
     return semaphore;
 }
 
 __code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
-    pthread_mutex_lock(&semaphore->mutex);
-    goto pOperationSemaphoreImpl1();
+    struct Lock* lock = semaphore->lock;
+    goto lock->doLock(pOperationSemaphoreImpl1);
 }
 
 __code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
@@ -27,52 +28,60 @@
         goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
     }
     semaphore->value--;
-    pthread_mutex_unlock(&semaphore->mutex);
-    goto next(...);
+    struct Lock* lock = semaphore->lock;
+    goto lock->doUnlock(next(...));
 }
 
-__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
-    pthread_mutex_unlock(&semaphore->mutex);
+__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Lock* lock = semaphore->lock;
+    goto lock->doUnlock(pOperationSemaphoreImpl3);
+}
+
+__code pOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
     goto worker->taskReceive(); // goto shceduler
 }
 
-__code pOperationSemaphoreImpl2_stub(struct Context* context) {
+__code pOperationSemaphoreImpl3_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
     SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
-    goto pOperationSemaphoreImpl2(workerContext,
+    goto pOperationSemaphoreImpl3(workerContext,
                                   semaphoreImpl,
                                   context->worker,
                                   Gearef(context, Semaphore)->next);
 }
 
 __code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
-    pthread_mutex_lock(&semaphore->mutex);
-    semaphore->value++;
-    struct Queue* queue = semaphore->waitThreadQueue;
-    goto queue->isEmpty(vOperationSemaphoreImpl1, vOperationSemaphoreImpl3);
+    struct Lock* lock = semaphore->lock;
+    goto lock->doLock(vOperationSemaphoreImpl1);
 }
 
 __code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
+    semaphore->value++;
     struct Queue* queue = semaphore->waitThreadQueue;
-    goto queue->take(vOperationSemaphoreImpl2);
+    goto queue->isEmpty(vOperationSemaphoreImpl2, vOperationSemaphoreImpl4);
+}
+
+__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Queue* queue = semaphore->waitThreadQueue;
+    goto queue->take(vOperationSemaphoreImpl3);
 }
 
-__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
+__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
     struct TaskManager* taskManager = waitTask->taskManager;
-    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl3); //notify
+    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl4); //notify
 }
 
-__code vOperationSemaphoreImpl2_stub(struct Context* context) {
+__code vOperationSemaphoreImpl3_stub(struct Context* context) {
     SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
     struct Context* waitTask = &Gearef(context, Queue)->data->Context;
-    goto vOperationSemaphoreImpl2(context,
+    goto vOperationSemaphoreImpl3(context,
                                   semaphoreImpl,
                                   waitTask,
                                   Gearef(context, Semaphore)->next);
 }
 
-__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, __code next(...)) {
-    pthread_mutex_unlock(&semaphore->mutex);
-    goto next(...);
+__code vOperationSemaphoreImpl4(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Lock* lock = semaphore->lock;
+    goto lock->doUnlock(next(...));
 }
--- a/src/parallel_execution/examples/boundedBuffer/main.cbc	Wed Jan 03 18:22:14 2018 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/main.cbc	Wed Jan 03 18:22:38 2018 +0900
@@ -51,6 +51,8 @@
 __code createTask1(struct TaskManager* taskManager) {
     struct Buffer* buffer = createBoundedBuffer(context, buffer_size);
     par goto producer(buffer, __exit);
+    par goto producer(buffer, __exit);
+    par goto consumer(buffer, __exit);
     par goto consumer(buffer, __exit);
     par goto initBuffer(buffer, __exit);
     goto code2();