Mercurial > hg > Papers > 2018 > parusu-master
diff paper/src/takeSynchronizedQueue.cbc @ 14:c615afa6c8b9
Update
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 24 Jan 2018 03:27:17 +0900 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/takeSynchronizedQueue.cbc Wed Jan 24 03:27:17 2018 +0900 @@ -0,0 +1,97 @@ + +#include "../context.h" +#interface "Queue.h" +#interface "Atomic.h" + +#include <stdio.h> + +/* + * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf). + */ + +Queue* createSynchronizedQueue(struct Context* context) { + struct Queue* queue = new Queue(); + struct SynchronizedQueue* synchronizedQueue = new SynchronizedQueue(); + synchronizedQueue->top = new Element(); // allocate a free node + synchronizedQueue->top->next = NULL; + synchronizedQueue->last = synchronizedQueue->top; + synchronizedQueue->atomic = createAtomicReference(context); + queue->queue = (union Data*)synchronizedQueue; + queue->take = C_takeSynchronizedQueue; + queue->put = C_putSynchronizedQueue; + queue->isEmpty = C_isEmptySynchronizedQueue; + queue->clear = C_clearSynchronizedQueue; + return queue; +} + +__code clearSynchronizedQueue(struct SynchronizedQueue* queue, __code next(...)) { + struct Element* top = queue->top; + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->top, top, NULL, next(...), clearSynchronizedQueue); +} + +__code putSynchronizedQueue(struct SynchronizedQueue* queue, union Data* data, __code next(...)) { + Element* element = new Element(); + element->data = data; + element->next = NULL; + Element* last = queue->last; + Element* nextElement = last->next; + if (last != queue->last) { + goto putSynchronizedQueue(); + } + if (nextElement == NULL) { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putSynchronizedQueue); + } else { // wrong last element + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->last, last, nextElement, putSynchronizedQueue, putSynchronizedQueue); + } +} + +__code takeSynchronizedQueue(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) { + struct Element* top = queue->top; + struct Element* last = queue->last; + struct Element* nextElement = top->next; + if (top != queue->top) { + goto takeSynchronizedQueue(); + } + if (top == last) { + if (nextElement != NULL) { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->last, last, nextElement, takeSynchronizedQueue, takeSynchronizedQueue); + } + } else { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->top, top, nextElement, takeSynchronizedQueue1, takeSynchronizedQueue); + } + goto takeSynchronizedQueue(); +} + +__code takeSynchronizedQueue1(struct SynchronizedQueue* queue, __code next(union Data* data, ...), struct Element* nextElement) { + data = nextElement->data; + goto next(data, ...); +} + +__code takeSynchronizedQueue1_stub(struct Context* context) { + SynchronizedQueue* queue = (SynchronizedQueue*)GearImpl(context, Queue, queue); + enum Code next = Gearef(context, Queue)->next; + Data** O_data = &Gearef(context, Queue)->data; + goto takeSynchronizedQueue1(context, + queue, + next, + O_data, + (struct Element*)Gearef(context, Atomic)->newData); +} + +__code isEmptySynchronizedQueue(struct SynchronizedQueue* queue, __code next(...), __code whenEmpty(...)) { + struct Element* top = queue->top; + struct Element* last = queue->last; + struct Element* nextElement = top->next; + if (top != queue->top) { + goto isEmptySynchronizedQueue(); + } + if (top == last && nextElement == NULL) { + goto whenEmpty(...); + } + goto next(...); +}