Mercurial > hg > Members > Moririn
changeset 507:a7127917c736
SemaphoreImpl use spinlock
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 03 Jan 2018 17:34:14 +0900 (2018-01-03) |
parents | 04441dd783c5 |
children | 64869af1f3ef |
files | src/parallel_execution/CMakeLists.txt src/parallel_execution/LockImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc src/parallel_execution/examples/boundedBuffer/main.cbc |
diffstat | 6 files changed, 87 insertions(+), 61 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt Tue Jan 02 06:16:40 2018 +0900 +++ b/src/parallel_execution/CMakeLists.txt Wed Jan 03 17:34:14 2018 +0900 @@ -138,5 +138,5 @@ TARGET boundedBuffer SOURCES - examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc + examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc SpinLock.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc )
--- a/src/parallel_execution/LockImpl.cbc Tue Jan 02 06:16:40 2018 +0900 +++ b/src/parallel_execution/LockImpl.cbc Wed Jan 03 17:34:14 2018 +0900 @@ -2,22 +2,24 @@ #interface "Queue.h" #interface "Atomic.h" #interface "Lock.h" +#interface "Worker.h" +#interface "TaskManager.h" Lock* createLockImpl(struct Context* context) { struct Lock* lock = new Lock(); struct LockImpl* lockImpl = new LockImpl(); lockImpl->lock = NULL; - lockImpl->waitThreadList = createSynchornizedQueue(context); + lockImpl->waitThreadQueue = createSynchronizedQueue(context); lockImpl->atomic = createAtomicReference(context); lock->lock = (union Data*)lockImpl; - lock->DoLock = C_doLockLockImpl; - lock->DoUnlock = C_doUnlockLockImpl; + lock->doLock = C_doLockLockImpl; + lock->doUnlock = C_doUnlockLockImpl; return lock; } __code doLockLockImpl(struct LockImpl* lock, __code next(...)) { - struct Atomic atomic = lock->atomic; - goto atomic->checkAndSet(&lockImpl->lock, NULL, 1, doLockLockImpl1, doLockLockImpl2); + struct Atomic* atomic = lock->atomic; + goto atomic->checkAndSet(&lock->lock, NULL, 1, doLockLockImpl1, doLockLockImpl2); } __code doLockLockImpl1(struct LockImpl* lock, __code next(...)) { @@ -26,8 +28,9 @@ } __code doLockLockImpl2(struct LockImpl* lock, __code next(...)) { - struct Queue queue = lock->waitThreadList; + struct Queue* queue = lock->waitThreadQueue; context->next= C_doLockLockImpl; + printf("Put task\n"); goto queue->put(context, doLockLockImpl3); } @@ -35,30 +38,32 @@ goto worker->taskReceive(); // goto shceduler } -__code doLockLockImpl3(struct Context* context) { +__code doLockLockImpl3_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; - LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lockImpl); + LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock); goto doLockLockImpl3(workerContext, - lockImpl, - context->worker, - Gearef(context, Semaphore)->next); + lockImpl, + context->worker, + Gearef(context, Lock)->next); } __code doUnlockLockImpl(struct LockImpl* lock, __code next(...)) { if (lock->lockContext == context) { - goto atomic->checkAndSet(&lockImpl->lock, 1, NULL, doUnlockLockImpl1, next(...)); + struct Atomic* atomic = lock->atomic; + goto atomic->checkAndSet(&lock->lock, 1, NULL, doUnlockLockImpl1, doUnlockLockImpl); } goto next(...); } __code doUnlockLockImpl1(struct LockImpl* lock, __code next(...)) { struct Queue* queue = lock->waitThreadQueue; - goto queue->isEmpty(doUnlockLockImpl2, next(...)); + goto queue->isEmpty(doUnlockLockImpl2, doUnlockLockImpl4); } __code doUnlockLockImpl2(struct LockImpl* lock, __code next(...)) { struct Queue* queue = lock->waitThreadQueue; + printf("%p: Take task\n", lock); goto queue->take(doUnlockLockImpl3); } @@ -71,7 +76,11 @@ LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock); struct Context* waitTask = &Gearef(context, Queue)->data->Context; goto doUnlockLockImpl3(context, - lockImpl, - waitTask, - Gearef(context, Lock)->next); + lockImpl, + waitTask, + Gearef(context, Lock)->next); } + +__code doUnlockLockImpl4(struct LockImpl* lock, __code next(...)) { + goto next(...); +}
--- a/src/parallel_execution/context.h Tue Jan 02 06:16:40 2018 +0900 +++ b/src/parallel_execution/context.h Wed Jan 03 17:34:14 2018 +0900 @@ -325,7 +325,7 @@ } Semaphore; struct SemaphoreImpl { int value; - pthread_mutex_t mutex; + struct Lock* lock; struct Queue* waitThreadQueue; } SemaphoreImpl; struct Allocate { @@ -417,14 +417,20 @@ struct Lock { union Data* lock; enum Code doLock; - enum Code doUnLock; + enum Code doUnlock; enum Code next; - } lock; + } Lock; struct LockImpl { Int* lock; - struct Queue waitThreadList; - struct Atomic atomic; - } lockImpl; + struct Queue* waitThreadQueue; + struct Atomic* atomic; + struct Context* lockContext; + } LockImpl; + struct SpinLock { + Int* lock; + struct Atomic* atomic; + struct Context* lockContext; + } SpinLock; }; // union Data end this is necessary for context generator typedef union Data Data;
--- a/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc Tue Jan 02 06:16:40 2018 +0900 +++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc Wed Jan 03 17:34:14 2018 +0900 @@ -18,13 +18,13 @@ } __code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { - struct Semaphore* sem = buffer->emptyCount; - goto sem->p(putBoundedBuffer1); + struct Semaphore* semaphore = buffer->emptyCount; + goto semaphore->p(putBoundedBuffer1); } __code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { - struct Semaphore* sem = buffer->lock; - goto sem->p(putBoundedBuffer2); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->p(putBoundedBuffer2); } __code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { @@ -34,26 +34,26 @@ struct Element* last = buffer->last; last->next = element; buffer->last = element; - struct Semaphore* sem = buffer->lock; - goto sem->v(putBoundedBuffer3); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->v(putBoundedBuffer3); } __code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { - struct Semaphore* sem = buffer->fullCount; - goto sem->v(putBoundedBuffer4); + struct Semaphore* semaphore = buffer->fullCount; + goto semaphore->v(putBoundedBuffer4); } __code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { goto next(...); } __code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { - struct Semaphore* sem = buffer->fullCount; - goto sem->p(takeBoundedBuffer1); + struct Semaphore* semaphore = buffer->fullCount; + goto semaphore->p(takeBoundedBuffer1); } __code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { - struct Semaphore* sem = buffer->lock; - goto sem->p(takeBoundedBuffer2); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->p(takeBoundedBuffer2); } __code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { @@ -62,13 +62,13 @@ data = nextElement->data; *O_data =data; buffer->top = nextElement; - struct Semaphore* sem = buffer->lock; - goto sem->v(takeBoundedBuffer3); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->v(takeBoundedBuffer3); } __code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { - struct Semaphore* sem = buffer->emptyCount; - goto sem->v(takeBoundedBuffer4); + struct Semaphore* semaphore = buffer->emptyCount; + goto semaphore->v(takeBoundedBuffer4); } __code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
--- a/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc Tue Jan 02 06:16:40 2018 +0900 +++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc Wed Jan 03 17:34:14 2018 +0900 @@ -2,22 +2,23 @@ #interface "Semaphore.h" #interface "Queue.h" #interface "TaskManager.h" +#interface "Lock.h" Semaphore* createSemaphoreImpl(struct Context* context, int n) { struct Semaphore* semaphore = new Semaphore(); struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl(); semaphore->semaphore = (union Data*)semaphoreImpl; semaphoreImpl->value = n; - semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context); - pthread_mutex_init(&semaphoreImpl->mutex, NULL); + semaphoreImpl->waitThreadQueue = createSingleLinkedQueue(context); + semaphoreImpl->lock = createSpinLock(context); semaphore->p = C_pOperationSemaphoreImpl; semaphore->v = C_vOperationSemaphoreImpl; return semaphore; } __code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { - pthread_mutex_lock(&semaphore->mutex); - goto pOperationSemaphoreImpl1(); + struct Lock* lock = semaphore->lock; + goto lock->doLock(pOperationSemaphoreImpl1); } __code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) { @@ -27,52 +28,60 @@ goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process) } semaphore->value--; - pthread_mutex_unlock(&semaphore->mutex); - goto next(...); + struct Lock* lock = semaphore->lock; + goto lock->doUnlock(next(...)); } -__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) { - pthread_mutex_unlock(&semaphore->mutex); +__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) { + struct Lock* lock = semaphore->lock; + goto lock->doUnlock(pOperationSemaphoreImpl3); +} + +__code pOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) { goto worker->taskReceive(); // goto shceduler } -__code pOperationSemaphoreImpl2_stub(struct Context* context) { +__code pOperationSemaphoreImpl3_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore); - goto pOperationSemaphoreImpl2(workerContext, + goto pOperationSemaphoreImpl3(workerContext, semaphoreImpl, context->worker, Gearef(context, Semaphore)->next); } __code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { - pthread_mutex_lock(&semaphore->mutex); - semaphore->value++; - struct Queue* queue = semaphore->waitThreadQueue; - goto queue->isEmpty(vOperationSemaphoreImpl1, vOperationSemaphoreImpl3); + struct Lock* lock = semaphore->lock; + goto lock->doLock(vOperationSemaphoreImpl1); } __code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) { + semaphore->value++; struct Queue* queue = semaphore->waitThreadQueue; - goto queue->take(vOperationSemaphoreImpl2); + goto queue->isEmpty(vOperationSemaphoreImpl2, vOperationSemaphoreImpl4); +} + +__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) { + struct Queue* queue = semaphore->waitThreadQueue; + goto queue->take(vOperationSemaphoreImpl3); } -__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) { +__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) { struct TaskManager* taskManager = waitTask->taskManager; - goto taskManager->spawn(waitTask, vOperationSemaphoreImpl3); //notify + goto taskManager->spawn(waitTask, vOperationSemaphoreImpl4); //notify } -__code vOperationSemaphoreImpl2_stub(struct Context* context) { +__code vOperationSemaphoreImpl3_stub(struct Context* context) { SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore); struct Context* waitTask = &Gearef(context, Queue)->data->Context; - goto vOperationSemaphoreImpl2(context, + goto vOperationSemaphoreImpl3(context, semaphoreImpl, waitTask, Gearef(context, Semaphore)->next); } -__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, __code next(...)) { - pthread_mutex_unlock(&semaphore->mutex); - goto next(...); +__code vOperationSemaphoreImpl4(struct SemaphoreImpl* semaphore, __code next(...)) { + struct Lock* lock = semaphore->lock; + goto lock->doUnlock(next(...)); }
--- a/src/parallel_execution/examples/boundedBuffer/main.cbc Tue Jan 02 06:16:40 2018 +0900 +++ b/src/parallel_execution/examples/boundedBuffer/main.cbc Wed Jan 03 17:34:14 2018 +0900 @@ -51,6 +51,8 @@ __code createTask1(struct TaskManager* taskManager) { struct Buffer* buffer = createBoundedBuffer(context, buffer_size); par goto producer(buffer, __exit); + par goto producer(buffer, __exit); + par goto consumer(buffer, __exit); par goto consumer(buffer, __exit); par goto initBuffer(buffer, __exit); goto code2();