changeset 507:a7127917c736

SemaphoreImpl use spinlock
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Wed, 03 Jan 2018 17:34:14 +0900
parents 04441dd783c5
children 64869af1f3ef
files src/parallel_execution/CMakeLists.txt src/parallel_execution/LockImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc src/parallel_execution/examples/boundedBuffer/main.cbc
diffstat 6 files changed, 87 insertions(+), 61 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt	Tue Jan 02 06:16:40 2018 +0900
+++ b/src/parallel_execution/CMakeLists.txt	Wed Jan 03 17:34:14 2018 +0900
@@ -138,5 +138,5 @@
   TARGET
       boundedBuffer
   SOURCES
-      examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
+  examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc SpinLock.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
 )
--- a/src/parallel_execution/LockImpl.cbc	Tue Jan 02 06:16:40 2018 +0900
+++ b/src/parallel_execution/LockImpl.cbc	Wed Jan 03 17:34:14 2018 +0900
@@ -2,22 +2,24 @@
 #interface "Queue.h"
 #interface "Atomic.h"
 #interface "Lock.h"
+#interface "Worker.h"
+#interface "TaskManager.h"
 
 Lock* createLockImpl(struct Context* context) {
     struct Lock* lock = new Lock();
     struct LockImpl* lockImpl = new LockImpl();
     lockImpl->lock = NULL;
-    lockImpl->waitThreadList = createSynchornizedQueue(context);
+    lockImpl->waitThreadQueue = createSynchronizedQueue(context);
     lockImpl->atomic = createAtomicReference(context);
     lock->lock = (union Data*)lockImpl;
-    lock->DoLock = C_doLockLockImpl;
-    lock->DoUnlock = C_doUnlockLockImpl;
+    lock->doLock = C_doLockLockImpl;
+    lock->doUnlock = C_doUnlockLockImpl;
     return lock;
 }
 
 __code doLockLockImpl(struct LockImpl* lock, __code next(...)) {
-    struct Atomic atomic = lock->atomic;
-    goto atomic->checkAndSet(&lockImpl->lock, NULL, 1, doLockLockImpl1, doLockLockImpl2);
+    struct Atomic* atomic = lock->atomic;
+    goto atomic->checkAndSet(&lock->lock, NULL, 1, doLockLockImpl1, doLockLockImpl2);
 }
 
 __code doLockLockImpl1(struct LockImpl* lock, __code next(...)) {
@@ -26,8 +28,9 @@
 }
 
 __code doLockLockImpl2(struct LockImpl* lock, __code next(...)) {
-    struct Queue queue = lock->waitThreadList;
+    struct Queue* queue = lock->waitThreadQueue;
     context->next= C_doLockLockImpl;
+    printf("Put task\n");
     goto queue->put(context, doLockLockImpl3);
 }
 
@@ -35,30 +38,32 @@
     goto worker->taskReceive(); // goto shceduler
 }
 
-__code doLockLockImpl3(struct Context* context) {
+__code doLockLockImpl3_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lockImpl);
+    LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock);
     goto doLockLockImpl3(workerContext,
-                         lockImpl,
-                         context->worker,
-                         Gearef(context, Semaphore)->next);
+            lockImpl,
+            context->worker,
+            Gearef(context, Lock)->next);
 }
 
 __code doUnlockLockImpl(struct LockImpl* lock, __code next(...)) {
     if (lock->lockContext == context) {
-        goto atomic->checkAndSet(&lockImpl->lock, 1, NULL, doUnlockLockImpl1, next(...));
+        struct Atomic* atomic = lock->atomic;
+        goto atomic->checkAndSet(&lock->lock, 1, NULL, doUnlockLockImpl1, doUnlockLockImpl);
     }
     goto next(...);
 }
 
 __code doUnlockLockImpl1(struct LockImpl* lock, __code next(...)) {
     struct Queue* queue = lock->waitThreadQueue;
-    goto queue->isEmpty(doUnlockLockImpl2, next(...));
+    goto queue->isEmpty(doUnlockLockImpl2, doUnlockLockImpl4);
 }
 
 __code doUnlockLockImpl2(struct LockImpl* lock, __code next(...)) {
     struct Queue* queue = lock->waitThreadQueue;
+    printf("%p: Take task\n", lock);
     goto queue->take(doUnlockLockImpl3);
 }
 
@@ -71,7 +76,11 @@
     LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock);
     struct Context* waitTask = &Gearef(context, Queue)->data->Context;
     goto doUnlockLockImpl3(context,
-                           lockImpl,
-                           waitTask,
-                           Gearef(context, Lock)->next);
+            lockImpl,
+            waitTask,
+            Gearef(context, Lock)->next);
 }
+
+__code doUnlockLockImpl4(struct LockImpl* lock, __code next(...)) {
+    goto next(...);
+}
--- a/src/parallel_execution/context.h	Tue Jan 02 06:16:40 2018 +0900
+++ b/src/parallel_execution/context.h	Wed Jan 03 17:34:14 2018 +0900
@@ -325,7 +325,7 @@
     } Semaphore;
     struct SemaphoreImpl {
         int value;
-        pthread_mutex_t mutex;
+        struct Lock* lock;
         struct Queue* waitThreadQueue;
     } SemaphoreImpl;
     struct Allocate {
@@ -417,14 +417,20 @@
     struct Lock {
         union Data* lock;
         enum Code doLock;
-        enum Code doUnLock;
+        enum Code doUnlock;
         enum Code next;
-    } lock;
+    } Lock;
     struct LockImpl {
         Int* lock;
-        struct Queue waitThreadList;
-        struct Atomic atomic;
-    } lockImpl;
+        struct Queue* waitThreadQueue;
+        struct Atomic* atomic;
+        struct Context* lockContext;
+    } LockImpl;
+    struct SpinLock {
+        Int* lock;
+        struct Atomic* atomic;
+        struct Context* lockContext;
+    } SpinLock;
 }; // union Data end       this is necessary for context generator
 typedef union Data Data;
 
--- a/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Tue Jan 02 06:16:40 2018 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Wed Jan 03 17:34:14 2018 +0900
@@ -18,13 +18,13 @@
 }
 
 __code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
-    struct Semaphore* sem = buffer->emptyCount;
-    goto sem->p(putBoundedBuffer1);
+    struct Semaphore* semaphore = buffer->emptyCount;
+    goto semaphore->p(putBoundedBuffer1);
 }
 
 __code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
-    struct Semaphore* sem = buffer->lock;
-    goto sem->p(putBoundedBuffer2);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->p(putBoundedBuffer2);
 }
 
 __code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
@@ -34,26 +34,26 @@
     struct Element* last = buffer->last;
     last->next = element;
     buffer->last = element;
-    struct Semaphore* sem = buffer->lock;
-    goto sem->v(putBoundedBuffer3);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->v(putBoundedBuffer3);
 }
 
 __code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
-    struct Semaphore* sem = buffer->fullCount;
-    goto sem->v(putBoundedBuffer4);
+    struct Semaphore* semaphore = buffer->fullCount;
+    goto semaphore->v(putBoundedBuffer4);
 }
 
 __code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
     goto next(...);
 }
 __code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = buffer->fullCount;
-    goto sem->p(takeBoundedBuffer1);
+    struct Semaphore* semaphore = buffer->fullCount;
+    goto semaphore->p(takeBoundedBuffer1);
 }
 
 __code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = buffer->lock;
-    goto sem->p(takeBoundedBuffer2);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->p(takeBoundedBuffer2);
 }
 
 __code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
@@ -62,13 +62,13 @@
     data = nextElement->data;
     *O_data =data;
     buffer->top = nextElement;
-    struct  Semaphore* sem = buffer->lock;
-    goto sem->v(takeBoundedBuffer3);
+    struct Semaphore* semaphore = buffer->lock;
+    goto semaphore->v(takeBoundedBuffer3);
 }
 
 __code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = buffer->emptyCount;
-    goto sem->v(takeBoundedBuffer4);
+    struct Semaphore* semaphore = buffer->emptyCount;
+    goto semaphore->v(takeBoundedBuffer4);
 }
 
 __code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
--- a/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Tue Jan 02 06:16:40 2018 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Wed Jan 03 17:34:14 2018 +0900
@@ -2,22 +2,23 @@
 #interface "Semaphore.h"
 #interface "Queue.h"
 #interface "TaskManager.h"
+#interface "Lock.h"
 
 Semaphore* createSemaphoreImpl(struct Context* context, int n) {
     struct Semaphore* semaphore = new Semaphore();
     struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
     semaphore->semaphore = (union Data*)semaphoreImpl;
     semaphoreImpl->value =  n;
-    semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context);
-    pthread_mutex_init(&semaphoreImpl->mutex, NULL);
+    semaphoreImpl->waitThreadQueue = createSingleLinkedQueue(context);
+    semaphoreImpl->lock = createSpinLock(context);
     semaphore->p = C_pOperationSemaphoreImpl;
     semaphore->v = C_vOperationSemaphoreImpl;
     return semaphore;
 }
 
 __code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
-    pthread_mutex_lock(&semaphore->mutex);
-    goto pOperationSemaphoreImpl1();
+    struct Lock* lock = semaphore->lock;
+    goto lock->doLock(pOperationSemaphoreImpl1);
 }
 
 __code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
@@ -27,52 +28,60 @@
         goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
     }
     semaphore->value--;
-    pthread_mutex_unlock(&semaphore->mutex);
-    goto next(...);
+    struct Lock* lock = semaphore->lock;
+    goto lock->doUnlock(next(...));
 }
 
-__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
-    pthread_mutex_unlock(&semaphore->mutex);
+__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Lock* lock = semaphore->lock;
+    goto lock->doUnlock(pOperationSemaphoreImpl3);
+}
+
+__code pOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
     goto worker->taskReceive(); // goto shceduler
 }
 
-__code pOperationSemaphoreImpl2_stub(struct Context* context) {
+__code pOperationSemaphoreImpl3_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
     SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
-    goto pOperationSemaphoreImpl2(workerContext,
+    goto pOperationSemaphoreImpl3(workerContext,
                                   semaphoreImpl,
                                   context->worker,
                                   Gearef(context, Semaphore)->next);
 }
 
 __code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
-    pthread_mutex_lock(&semaphore->mutex);
-    semaphore->value++;
-    struct Queue* queue = semaphore->waitThreadQueue;
-    goto queue->isEmpty(vOperationSemaphoreImpl1, vOperationSemaphoreImpl3);
+    struct Lock* lock = semaphore->lock;
+    goto lock->doLock(vOperationSemaphoreImpl1);
 }
 
 __code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
+    semaphore->value++;
     struct Queue* queue = semaphore->waitThreadQueue;
-    goto queue->take(vOperationSemaphoreImpl2);
+    goto queue->isEmpty(vOperationSemaphoreImpl2, vOperationSemaphoreImpl4);
+}
+
+__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Queue* queue = semaphore->waitThreadQueue;
+    goto queue->take(vOperationSemaphoreImpl3);
 }
 
-__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
+__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
     struct TaskManager* taskManager = waitTask->taskManager;
-    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl3); //notify
+    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl4); //notify
 }
 
-__code vOperationSemaphoreImpl2_stub(struct Context* context) {
+__code vOperationSemaphoreImpl3_stub(struct Context* context) {
     SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
     struct Context* waitTask = &Gearef(context, Queue)->data->Context;
-    goto vOperationSemaphoreImpl2(context,
+    goto vOperationSemaphoreImpl3(context,
                                   semaphoreImpl,
                                   waitTask,
                                   Gearef(context, Semaphore)->next);
 }
 
-__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, __code next(...)) {
-    pthread_mutex_unlock(&semaphore->mutex);
-    goto next(...);
+__code vOperationSemaphoreImpl4(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Lock* lock = semaphore->lock;
+    goto lock->doUnlock(next(...));
 }
--- a/src/parallel_execution/examples/boundedBuffer/main.cbc	Tue Jan 02 06:16:40 2018 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/main.cbc	Wed Jan 03 17:34:14 2018 +0900
@@ -51,6 +51,8 @@
 __code createTask1(struct TaskManager* taskManager) {
     struct Buffer* buffer = createBoundedBuffer(context, buffer_size);
     par goto producer(buffer, __exit);
+    par goto producer(buffer, __exit);
+    par goto consumer(buffer, __exit);
     par goto consumer(buffer, __exit);
     par goto initBuffer(buffer, __exit);
     goto code2();