Mercurial > hg > Papers > 2018 > parusu-master
changeset 10:a0f9def49535
Add sections
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 22 Jan 2018 07:04:13 +0900 |
parents | 729602c52c46 |
children | 10cb71d30fc0 |
files | paper/fig/sendTask.graffle paper/gearsOS.tex paper/master_paper.pdf paper/parallelism_gears.tex paper/src/atomicImpl.cbc paper/src/atomicInterface.h paper/src/createCPUWorker.cbc paper/src/createTaskManager.cbc paper/src/sendTask.cbc paper/src/stubCodeGear.cbc paper/src/taskManagerInterface.cbc paper/src/workerRun.cbc |
diffstat | 12 files changed, 207 insertions(+), 9 deletions(-) [+] |
line wrap: on
line diff
--- a/paper/gearsOS.tex Sun Jan 21 02:09:28 2018 +0900 +++ b/paper/gearsOS.tex Mon Jan 22 07:04:13 2018 +0900 @@ -99,20 +99,25 @@ \begin{itemize} \item Code Gear の名前と関数ポインタとの対応表 + Code Gear の名前とポインタの対応は Context 内の code(\coderef{context} 4行目) に格納される。 code は全ての Code Gear を列挙した enum と関数ポインタの組で表現される。 Code Gear の名前は enum で定義され、コンパイル後には整数へと変換される。 実際に Code Gear に接続する際は番号(enum)を指定することで接続を行う。 これにより、メタ計算の実行時に接続する Code Gear を動的に切り替えることが可能となる。 \item Data Gear の Allocation 用の情報 + Data Gear のメモリ空間は事前に領域を確保した後、必要に応じてその領域を割り当てることで実現する。 実際に Allocation する際は Context内の heap(\coderef{context} 8行目)を Data Gear のサイズ分インクリメントすることで実現する。 \item Code Gear が参照する DataGear へのポインタ + Allocation で生成した Data Gear へのポインタは Context 内のdata(\coderef{context} 6行目) に格納される。 Code Gear は data を参照して Data Gear へアクセスする。 \item 並列実行用の Task 情報 + Context は 並列実行の Task も兼任するため、待っている Input Data Gear のカウンタ、Input/Output Data Gear が格納されている場所を示すインデックス、GPU での実行フラグ等を持っている(\coderef{context} 13-30行目)。 \item Data Gear の型情報 + Data Gear は構造体を用いて定義する(\coderef{context} 34-49行目)。Timer や TimerImplなどの構造体が Data Gear に相当する。 メタ計算では任意のData Gear を一律に扱うため、全ての Data Gear の共用体を定義する(\coderef{context} 33-51行目)。 Data Gear を確保する際のサイズはこの型情報から決定する。 @@ -155,7 +160,7 @@ \coderef{stubCodeGear} に stub Code Gear の例を示す。 stub Code Gear は使用される全ての Code Gear 毎に記述する必要がある。 -しかし、全ての Code Gear に対して stub Code Gear を記述するのは膨大な記述量になってしまうため、基本的にはスクリプトで自動生成する。 +しかし、全ての Code Gear に対して stub Code Gear を記述するのは膨大な記述量になってしまうため、Interface を実装した Code Gear などの型が決まっており、引数が格納されている場所がわかる stub Code Gear はスクリプトで自動生成する。 \lstinputlisting[caption=stub Code Gear, label=code:stubCodeGear]{./src/stubCodeGear.cbc}
--- a/paper/parallelism_gears.tex Sun Jan 21 02:09:28 2018 +0900 +++ b/paper/parallelism_gears.tex Mon Jan 22 07:04:13 2018 +0900 @@ -18,13 +18,13 @@ \coderef{createTaskManager} に TaskManager Interface を実装した Data Gear の初期化部分を示す。 TaskManager は初期化の際に、指定した数の Worekr を生成する。 -その際CPU、GPU の数を指定することができ、指定した分の CPUWorker と GPUWorker が \coderef{createTaskManager} の createWorker 関数で生成される。 +その際CPU、GPU の数を指定することができ、指定した分の CPUWorker と GPUWorker が createWorker 関数 (\coderef{createTaskManager} 27-46行目) で生成される。 \lstinputlisting[caption=TaskManager の初期化, label=code:createTaskManager]{./src/createTaskManager.cbc} -TaskManager は \coderef{sendTask}のように spawn を呼び出した際、実行する Task の Input Data Gear が用意されているかをチェックする。 -Input Data Gear が全て用意されている場合、その Task を Worker の TaskQueue に送信する。 -送信する Worker は Task を実行する環境(CPU、GPU) によって決定する。 +TaskManager は \figref{sendTask}に示すように spawn を呼び出した際、実行する Task の Input Data Gear が用意されているかを判断する。 +Input Data Gear が全て用意されている場合、その Task を Worker の Queue に送信する。 + 送信する Worker は Task を実行する環境(CPU、GPU) によって決定する。 \begin{figure}[htbp] \begin{center} @@ -34,12 +34,52 @@ \label{fig:sendTask} \end{figure} -実際にTask を Worker に送信する処理を\coderef{sendTask}に示す。 +\section{Worker} +Worker は自身の Queue から Task を取得し、Task の Code Gear を実行し、 Output Data Gear の書き出しを行っている。 + +\coderef{createCPUWorker} に Task を CPU で実行する CPUWorker の初期化部分を示す。 +CPUWorker は初期化の際に スレッドを生成する(\coderef{createCPUWorker} 10行目)。 +生成されたスレッドはまず startWorker 関数(\coderef{createCPUWorker} 14-21行目)を呼び出し、このスレッド用の Context を生成する。 +Context をスレッド毎に生成することで、メモリ空間をスレッドごとに持てるため Gearefマクロ で interface の引数を取得する際の競合、メモリ確保の処理での他のスレッドの停止を防ぐ事ができる。 + +\lstinputlisting[caption=CPUWorker の初期化, label=code:createCPUWorker]{./src/createCPUWorker.cbc} + +Context の生成後は Queue から Task を取得する Code Gear へ継続する。 +Task は Context なので、 Worker は Context を入れ替えて Task の実行を行う。 + +CPUWorker での Task の実行を\coderef{workerRun} に示す。 +\coderef{workerRun} は Context の入れ替えを行うため、 Meta Code Gear として記述されており、getTaskCPUWorker(\coderef{workerRun} 1-9行目)の引数に Context を記述している。 +CPUWorker は中身が NULL の task を取得すると Worker の終了処理を行う(\coderef{workerRun} 2-4 行目)。 +Task が取得できた場合 Task の実行後に継続する Code Gear を格納し(\coderef{workerRun} 7行目)、 Task を Context としてCode Gear に継続する(\coderef{workerRun} 8行目)。 +Task の実行後に継続する Code Gear は Data Gear の書き出しと依存関係の解決を行う。 + +\lstinputlisting[caption=CPUWorker でのTaskの実行, label=code:workerRun]{./src/workerRun.cbc} -\lstinputlisting[caption=Task の送信処理, label=code:sendTask]{./src/sentTask.cbc} +\section{SynchornizedQueue} +SynchornizedQueue は Worker の Queue として使用される。 +Worker の Queue は TaskManager を経由して Task を送信するスレッドと Task を取得する Worker 自身のスレッドで扱われる。 +そのため SynchornizeQueue はマルチスレッドでもデータの一貫性を保証する Queue を実装する必要がある。 + +Gears OS では マルチスレッドでもデータの一貫性を保証するために CAS(Check and Set、Compare and Swap) を利用した Queue\cite{queue} を実装している。 +CAS は値の比較、更新をアトミックに行う命令である。 +CAS を使う際は更新前の値と更新後の値を渡し、渡された更新前の値を実際に保存されているメモリ番地の値と比較し、同じならデータ競合がないため、データの更新に成功する。 +異なる場合は他に書き込みがあったとみなされ、値の更新に失敗する。 + +Gears OS ではこの CAS を行うための Interface を定義している(\coderef{atomicInterface})。 +この Interface では、 Data Gear 全てを内包している Data 共用体のポインタの値を更新する CAS を定義している(\coderef{atomicInterface} 6行目)。 -\section{Worker} -\section{SynchornizedQueue} +\lstinputlisting[caption=AtomicInterface, label=code:atomicInterface]{./src/atomicInterface.h} + +AtomicInterface での CAS の実際の実装を \coderef{atomicImpl} に示す。 +実際の実装では \_\_sync\_bool\_compare\_and\_swap 関数を呼び出すことで CAS を行う(\coderef{atomicImpl} 2行目)。 +この関数は第一引数に渡されたアドレスに対して第二引数の値から第三引数の値ヘ CAS を行う。 +CAS に成功した場合、 true を返し、 失敗した場合は false を返す。 +\coderef{atomicImpl} では CAS に成功した場合と失敗した場合それぞれに対応した Code Gear へ継続する。 + +\lstinputlisting[caption=CAS の実装, label=code:atomicImpl]{./src/atomicImpl.cbc} + +SynchornizedQueue では\coderef{atomicImpl} で示した Code Gear を呼び出して実装を行う。 + \section{依存関係の解決} \section{並列処理の記述} \section{Iterator}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/atomicImpl.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,6 @@ +__code checkAndSetAtomicReference(struct AtomicReference* atomic, union Data** ptr, union Data* oldData, union Data* newData, __code next(...), __code fail(...)) { + if (__sync_bool_compare_and_swap(ptr, oldData, newData)) { + goto next(...); + } + goto fail(...); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/atomicInterface.h Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,9 @@ +typedef struct Atomic<Impl>{ + union Data* atomic; + union Data** ptr; + union Data* oldData; + union Data* newData; + __code checkAndSet(Impl* atomic, union Data** ptr, union Data* oldData, union Data* newData, __code next(...), __code fail(...)); + __code next(...); + __code fail(...); +} Atomic;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/createCPUWorker.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,21 @@ +Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { + struct Worker* worker = new Worker(); + struct CPUWorker* cpuWorker = new CPUWorker(); + worker->worker = (union Data*)cpuWorker; + worker->tasks = queue; + cpuWorker->id = id; + cpuWorker->loopCounter = 0; + worker->taskReceive = C_taskReceiveCPUWorker; + worker->shutdown = C_shutdownCPUWorker; + pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); + return worker; +} + +static void startWorker(struct Worker* worker) { + struct CPUWorker* cpuWorker = &worker->worker->CPUWorker; + cpuWorker->context = NEW(struct Context); + initContext(cpuWorker->context); + Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; + Gearef(cpuWorker->context, Worker)->tasks = worker->tasks; + goto meta(cpuWorker->context, worker->taskReceive); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/createTaskManager.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,46 @@ +TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) { + struct TaskManager* taskManager = new TaskManager(); + taskManager->spawnTasks = C_spawnTasksTaskManagerImpl; + taskManager->spawn = C_spawnTaskManagerImpl; + taskManager->shutdown = C_shutdownTaskManagerImpl; + taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl; + taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl; + taskManager->setWaitTask = C_setWaitTaskTaskManagerImpl; + struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); + // 0...numIO-1 IOProcessor + // numIO...numIO+numGPU-1 GPUProcessor + // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor + taskManagerImpl->io = 0; + taskManagerImpl->gpu = numIO; + taskManagerImpl->cpu = numIO+numGPU; + taskManagerImpl->maxCPU = numIO+numGPU+numCPU; + taskManagerImpl->numWorker = taskManagerImpl->maxCPU; + taskManagerImpl->sendGPUWorkerIndex = taskManagerImpl->gpu; + taskManagerImpl->sendCPUWorkerIndex = taskManagerImpl->cpu; + taskManagerImpl->taskCount = 0; + taskManagerImpl->loopCounter = 0; + createWorkers(context, taskManagerImpl); + taskManager->taskManager = (union Data*)taskManagerImpl; + return taskManager; +} + +void createWorkers(struct Context* context, TaskManagerImpl* taskManager) { + int i = 0; + taskManager->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU); + for (;i<taskManager->gpu;i++) { + Queue* queue = createSynchronizedQueue(context); + taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue); + } + for (;i<taskManager->cpu;i++) { + Queue* queue = createSynchronizedQueue(context); +#ifdef USE_CUDAWorker + taskManager->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0); +#else + taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue); +#endif + } + for (;i<taskManager->maxCPU;i++) { + Queue* queue = createSynchronizedQueue(context); + taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/sendTask.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,32 @@ +__code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { + if (task->idgCount == 0) { + goto taskSend(); + } + goto next(...); +} + +__code taskSend(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { + if (task->gpu) { + goto taskSend1(); + } else { + goto taskSend2(); + } +} + +__code taskSend1(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { + int workerId = taskManager->sendGPUWorkerIndex; + if(++taskManager->sendGPUWorkerIndex >= taskManager->cpu) { + taskManager->sendGPUWorkerIndex = taskManager->gpu; + } + struct Queue* queue = taskManager->workers[workerId]->tasks; + goto queue->put(task, next(...)); +} + +__code taskSend2(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { + int workerId = taskManager->sendCPUWorkerIndex; + if(++taskManager->sendCPUWorkerIndex >= taskManager->maxCPU) { + taskManager->sendCPUWorkerIndex = taskManager->cpu; + } + struct Queue* queue = taskManager->workers[workerId]->tasks; + goto queue->put(task, next(...)); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/stubCodeGear.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,10 @@ +__code clearSingleLinkedQueue(struct Context *context,struct SingleLinkedQueue* queue, enum Code next) { + queue->top = NULL; + goto meta(context, next); +} + +__code clearSingleLinkedQueue_stub(struct Context* context) { + SingleLinkedQueue* queue = (SingleLinkedQueue*)GearImpl(context, Queue, queue); + enum Code next = Gearef(context, Queue)->next; + goto clearSingleLinkedQueue(context, queue, next); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/taskManagerInterface.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,13 @@ +typedef struct TaskManager<Impl>{ + union Data* taskManager; + struct Context* task; + struct Element* taskList; + __code spawn(Impl* taskManager, struct Context* task, __code next(...)); + __code spawnTasks(Impl* taskManagerImpl, struct Element* taskList, __code next1(...)); + __code setWaitTask(Impl* taskManagerImpl, struct Context* task, __code next(...)); + __code shutdown(Impl* taskManagerImpl, __code next(...)); + __code incrementTaskCount(Impl* taskManagerImpl, __code next(...)); + __code decrementTaskCount(Impl* taskManagerImpl, __code next(...)); + __code next(...); + __code next1(...); +} TaskManager;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/paper/src/workerRun.cbc Mon Jan 22 07:04:13 2018 +0900 @@ -0,0 +1,16 @@ +__code getTaskCPUWorker(struct Context* context, struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) { + if (!task) { + goto worker->shutdown(); // end thread + } + task->worker = worker; + enum Code taskCg = task->next; + task->next = C_odgCommitCPUWorker; // commit outputDG after task exec + goto meta(task, taskCg); // switch task context +} + +__code getTaskCPUWorker_stub(struct Context* context) { + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); + Worker* worker = &Gearef(context,Worker)->worker->Worker; + struct Context* task = &Gearef(context, Queue)->data->Context; + goto getTaskCPUWorker(context, cpuWorker, task, worker); +}