Mercurial > hg > GearsTemplate
changeset 511:647716041772
merge
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 03 Jan 2018 18:22:38 +0900 |
parents | 51f0d5e5d1e5 (current diff) 64869af1f3ef (diff) |
children | 044c25475ed4 |
files | |
diffstat | 8 files changed, 192 insertions(+), 40 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt Wed Jan 03 18:22:14 2018 +0900 +++ b/src/parallel_execution/CMakeLists.txt Wed Jan 03 18:22:38 2018 +0900 @@ -138,5 +138,5 @@ TARGET boundedBuffer SOURCES - examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc + examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc SpinLock.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/Lock.h Wed Jan 03 18:22:38 2018 +0900 @@ -0,0 +1,6 @@ +typedef struct Lock<Impl>{ + union Data* lock; + __code doLock(Impl* lock, __code next(...)); + __code doUnlock(Impl* lock, __code next(...)); + __code next(...); +} Lock;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/LockImpl.cbc Wed Jan 03 18:22:38 2018 +0900 @@ -0,0 +1,86 @@ +#include "../context.h" +#interface "Queue.h" +#interface "Atomic.h" +#interface "Lock.h" +#interface "Worker.h" +#interface "TaskManager.h" + +Lock* createLockImpl(struct Context* context) { + struct Lock* lock = new Lock(); + struct LockImpl* lockImpl = new LockImpl(); + lockImpl->lock = NULL; + lockImpl->waitThreadQueue = createSynchronizedQueue(context); + lockImpl->atomic = createAtomicReference(context); + lock->lock = (union Data*)lockImpl; + lock->doLock = C_doLockLockImpl; + lock->doUnlock = C_doUnlockLockImpl; + return lock; +} + +__code doLockLockImpl(struct LockImpl* lock, __code next(...)) { + struct Atomic* atomic = lock->atomic; + goto atomic->checkAndSet(&lock->lock, NULL, 1, doLockLockImpl1, doLockLockImpl2); +} + +__code doLockLockImpl1(struct LockImpl* lock, __code next(...)) { + lock->lockContext = context; + goto next(...); +} + +__code doLockLockImpl2(struct LockImpl* lock, __code next(...)) { + struct Queue* queue = lock->waitThreadQueue; + context->next= C_doLockLockImpl; + printf("Put task\n"); + goto queue->put(context, doLockLockImpl3); +} + +__code doLockLockImpl3(struct LockImpl* lock, struct Worker* worker, __code next(...)) { + goto worker->taskReceive(); // goto shceduler +} + +__code doLockLockImpl3_stub(struct Context* context) { + // switch worker context + struct Context* workerContext = context->worker->worker->CPUWorker.context; + LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock); + goto doLockLockImpl3(workerContext, + lockImpl, + context->worker, + Gearef(context, Lock)->next); +} + +__code doUnlockLockImpl(struct LockImpl* lock, __code next(...)) { + if (lock->lockContext == context) { + struct Atomic* atomic = lock->atomic; + goto atomic->checkAndSet(&lock->lock, 1, NULL, doUnlockLockImpl1, doUnlockLockImpl); + } + goto next(...); +} + +__code doUnlockLockImpl1(struct LockImpl* lock, __code next(...)) { + struct Queue* queue = lock->waitThreadQueue; + goto queue->isEmpty(doUnlockLockImpl2, doUnlockLockImpl4); +} + +__code doUnlockLockImpl2(struct LockImpl* lock, __code next(...)) { + struct Queue* queue = lock->waitThreadQueue; + printf("%p: Take task\n", lock); + goto queue->take(doUnlockLockImpl3); +} + +__code doUnlockLockImpl3(struct LockImpl* lock, struct Context* waitTask, __code next(...)) { + struct TaskManager* taskManager = waitTask->taskManager; + goto taskManager->spawn(waitTask, next(...)); //notify +} + +__code doUnlockLockImpl3_stub(struct Context* context) { + LockImpl* lockImpl = (LockImpl*)GearImpl(context, Lock, lock); + struct Context* waitTask = &Gearef(context, Queue)->data->Context; + goto doUnlockLockImpl3(context, + lockImpl, + waitTask, + Gearef(context, Lock)->next); +} + +__code doUnlockLockImpl4(struct LockImpl* lock, __code next(...)) { + goto next(...); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/SpinLock.cbc Wed Jan 03 18:22:38 2018 +0900 @@ -0,0 +1,32 @@ +#include "../context.h" +#interface "Atomic.h" +#interface "Lock.h" + +Lock* createSpinLock(struct Context* context) { + struct Lock* lock = new Lock(); + struct SpinLock* spinLock = new SpinLock(); + spinLock->lock = NULL; + spinLock->atomic = createAtomicReference(context); + lock->lock = (union Data*)spinLock; + lock->doLock = C_doLockSpinLock; + lock->doUnlock = C_doUnlockSpinLock; + return lock; +} + +__code doLockSpinLock(struct SpinLock* lock, __code next(...)) { + struct Atomic* atomic = lock->atomic; + goto atomic->checkAndSet(&lock->lock, NULL, 1, doLockSpinLock1, doLockSpinLock); +} + +__code doLockSpinLock1(struct SpinLock* lock, __code next(...)) { + lock->lockContext = context; + goto next(...); +} + +__code doUnlockSpinLock(struct SpinLock* lock, __code next(...)) { + if (lock->lockContext == context) { + struct Atomic* atomic = lock->atomic; + goto atomic->checkAndSet(&lock->lock, 1, NULL, next(...), doUnlockSpinLock); + } + goto next(...); +}
--- a/src/parallel_execution/context.h Wed Jan 03 18:22:14 2018 +0900 +++ b/src/parallel_execution/context.h Wed Jan 03 18:22:38 2018 +0900 @@ -325,7 +325,7 @@ } Semaphore; struct SemaphoreImpl { int value; - pthread_mutex_t mutex; + struct Lock* lock; struct Queue* waitThreadQueue; } SemaphoreImpl; struct Allocate { @@ -414,6 +414,23 @@ struct Semaphore* emptyCount; struct Semaphore* lock; } BoundedBuffer; + struct Lock { + union Data* lock; + enum Code doLock; + enum Code doUnlock; + enum Code next; + } Lock; + struct LockImpl { + Int* lock; + struct Queue* waitThreadQueue; + struct Atomic* atomic; + struct Context* lockContext; + } LockImpl; + struct SpinLock { + Int* lock; + struct Atomic* atomic; + struct Context* lockContext; + } SpinLock; }; // union Data end this is necessary for context generator typedef union Data Data;
--- a/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc Wed Jan 03 18:22:14 2018 +0900 +++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc Wed Jan 03 18:22:38 2018 +0900 @@ -18,13 +18,13 @@ } __code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { - struct Semaphore* sem = buffer->emptyCount; - goto sem->p(putBoundedBuffer1); + struct Semaphore* semaphore = buffer->emptyCount; + goto semaphore->p(putBoundedBuffer1); } __code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { - struct Semaphore* sem = buffer->lock; - goto sem->p(putBoundedBuffer2); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->p(putBoundedBuffer2); } __code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { @@ -34,26 +34,26 @@ struct Element* last = buffer->last; last->next = element; buffer->last = element; - struct Semaphore* sem = buffer->lock; - goto sem->v(putBoundedBuffer3); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->v(putBoundedBuffer3); } __code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { - struct Semaphore* sem = buffer->fullCount; - goto sem->v(putBoundedBuffer4); + struct Semaphore* semaphore = buffer->fullCount; + goto semaphore->v(putBoundedBuffer4); } __code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) { goto next(...); } __code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { - struct Semaphore* sem = buffer->fullCount; - goto sem->p(takeBoundedBuffer1); + struct Semaphore* semaphore = buffer->fullCount; + goto semaphore->p(takeBoundedBuffer1); } __code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { - struct Semaphore* sem = buffer->lock; - goto sem->p(takeBoundedBuffer2); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->p(takeBoundedBuffer2); } __code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { @@ -62,13 +62,13 @@ data = nextElement->data; *O_data =data; buffer->top = nextElement; - struct Semaphore* sem = buffer->lock; - goto sem->v(takeBoundedBuffer3); + struct Semaphore* semaphore = buffer->lock; + goto semaphore->v(takeBoundedBuffer3); } __code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) { - struct Semaphore* sem = buffer->emptyCount; - goto sem->v(takeBoundedBuffer4); + struct Semaphore* semaphore = buffer->emptyCount; + goto semaphore->v(takeBoundedBuffer4); } __code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
--- a/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc Wed Jan 03 18:22:14 2018 +0900 +++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc Wed Jan 03 18:22:38 2018 +0900 @@ -2,22 +2,23 @@ #interface "Semaphore.h" #interface "Queue.h" #interface "TaskManager.h" +#interface "Lock.h" Semaphore* createSemaphoreImpl(struct Context* context, int n) { struct Semaphore* semaphore = new Semaphore(); struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl(); semaphore->semaphore = (union Data*)semaphoreImpl; semaphoreImpl->value = n; - semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context); - pthread_mutex_init(&semaphoreImpl->mutex, NULL); + semaphoreImpl->waitThreadQueue = createSingleLinkedQueue(context); + semaphoreImpl->lock = createSpinLock(context); semaphore->p = C_pOperationSemaphoreImpl; semaphore->v = C_vOperationSemaphoreImpl; return semaphore; } __code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { - pthread_mutex_lock(&semaphore->mutex); - goto pOperationSemaphoreImpl1(); + struct Lock* lock = semaphore->lock; + goto lock->doLock(pOperationSemaphoreImpl1); } __code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) { @@ -27,52 +28,60 @@ goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process) } semaphore->value--; - pthread_mutex_unlock(&semaphore->mutex); - goto next(...); + struct Lock* lock = semaphore->lock; + goto lock->doUnlock(next(...)); } -__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) { - pthread_mutex_unlock(&semaphore->mutex); +__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) { + struct Lock* lock = semaphore->lock; + goto lock->doUnlock(pOperationSemaphoreImpl3); +} + +__code pOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) { goto worker->taskReceive(); // goto shceduler } -__code pOperationSemaphoreImpl2_stub(struct Context* context) { +__code pOperationSemaphoreImpl3_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore); - goto pOperationSemaphoreImpl2(workerContext, + goto pOperationSemaphoreImpl3(workerContext, semaphoreImpl, context->worker, Gearef(context, Semaphore)->next); } __code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { - pthread_mutex_lock(&semaphore->mutex); - semaphore->value++; - struct Queue* queue = semaphore->waitThreadQueue; - goto queue->isEmpty(vOperationSemaphoreImpl1, vOperationSemaphoreImpl3); + struct Lock* lock = semaphore->lock; + goto lock->doLock(vOperationSemaphoreImpl1); } __code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) { + semaphore->value++; struct Queue* queue = semaphore->waitThreadQueue; - goto queue->take(vOperationSemaphoreImpl2); + goto queue->isEmpty(vOperationSemaphoreImpl2, vOperationSemaphoreImpl4); +} + +__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) { + struct Queue* queue = semaphore->waitThreadQueue; + goto queue->take(vOperationSemaphoreImpl3); } -__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) { +__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) { struct TaskManager* taskManager = waitTask->taskManager; - goto taskManager->spawn(waitTask, vOperationSemaphoreImpl3); //notify + goto taskManager->spawn(waitTask, vOperationSemaphoreImpl4); //notify } -__code vOperationSemaphoreImpl2_stub(struct Context* context) { +__code vOperationSemaphoreImpl3_stub(struct Context* context) { SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore); struct Context* waitTask = &Gearef(context, Queue)->data->Context; - goto vOperationSemaphoreImpl2(context, + goto vOperationSemaphoreImpl3(context, semaphoreImpl, waitTask, Gearef(context, Semaphore)->next); } -__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, __code next(...)) { - pthread_mutex_unlock(&semaphore->mutex); - goto next(...); +__code vOperationSemaphoreImpl4(struct SemaphoreImpl* semaphore, __code next(...)) { + struct Lock* lock = semaphore->lock; + goto lock->doUnlock(next(...)); }
--- a/src/parallel_execution/examples/boundedBuffer/main.cbc Wed Jan 03 18:22:14 2018 +0900 +++ b/src/parallel_execution/examples/boundedBuffer/main.cbc Wed Jan 03 18:22:38 2018 +0900 @@ -51,6 +51,8 @@ __code createTask1(struct TaskManager* taskManager) { struct Buffer* buffer = createBoundedBuffer(context, buffer_size); par goto producer(buffer, __exit); + par goto producer(buffer, __exit); + par goto consumer(buffer, __exit); par goto consumer(buffer, __exit); par goto initBuffer(buffer, __exit); goto code2();