Mercurial > hg > Papers > 2012 > kazz-master
view paper/chapter3.tex @ 20:6045705ee5fb
add Remote Data Segment
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 08 Feb 2012 07:13:50 +0900 |
parents | 4a1a8c51d9bc |
children | ef806c34566e |
line wrap: on
line source
\chapter{分散フレームワーク Alice の実装} 前章では、分散ネットワークアプリケーションフレームワーク Alice の設計について示した。 本章では、それらの設計を踏まえ、 Java でどのように実装を行うのがよいかということから始め、具体的にどのような実装を行ったかについて、詳細に記すことにする。 \section{Java} Federated Linda は Java を用いて実装が行われていたが、シングルスレッドを用いた設計であったがために、並列処理が含まれていなかった。今回の実装を行う前に並列処理の観点から、 Java について改めて見直し、フレームワークの実装方法について考えることにした。 \subsection{Java の選定} Alice の実装に Java を選んだ理由は、充実したライブラリ群の存在である。 java.io、 java.nio 等の入出力ライブラリを始め、 java.net 等のネットワークライブラリも揃っている。また、並列処理ライブラリである java.util.concurrent が非常に強力である。ここでは java.util.concurrent の非常に便利な部品を挙げる。 \subsubsection{java.util.concurrent.BlockingQueue} BlockingQueue は、並列で使用できるキュー構造である。複数のスレッドがデータの追加、取り出しを行なっても、問題なく動作する。 また、キュー内にデータがない場合、取り出しを行うときにブロッキングが入る。(ソースコード \ref{src:blockingQueue})そのため、データが来るまで待つという処理を記述することができる。 \begin{lstlisting}[label=src:blockingQueue, caption=キューにデータがない場合、処理がブロックされる] // 別スレッドで blockingQueue.put(value); されるまでブロック blockingQueue.take(); \end{lstlisting} そういった機能を持つ故に BlockingQueue は異なるスレッド間で通信を行うのにしばしば用いられる。例えば、 SEDA アーキテクチャで作られた Cassandra の実装においても、ステージスレッド間でパイプライン処理をする際、データの受け渡しに使用されている。 \subsubsection{java.util.concurrent.ConcurrentMap} ConcurrentMap は並列で使用できるハッシュ構造である。複数のスレッドがデータを追加、読み込み、削除を行なっても問題なく動作する。 キーを指定してデータの存在を確認し、データが無ければ追加するといった場合を考える。(ソースコード \ref{src:concurrentMap1})他のスレッドが同時に同じキーで異なるデータを書きこみを行おうとした場合、通常は排他処理ができていないため、不整合が発生する。しかし、 ConcurrentMap の putIfAbsent メソッドを用いれば、原子的に確認とデータ操作を行うため、その問題を解消することができる。(ソースコード \ref{src:concurrentMap2}) \begin{lstlisting}[label=src:concurrentMap1, caption=キーの確認とデータ操作が排他処理されていない] if (!map.containsKey(key)) return map.put(key, value); else return map.get(key); \end{lstlisting} \begin{lstlisting}[label=src:concurrentMap2, caption=キーの確認とデータ操作が内部で原子的に実行される] return map.putIfAbsent(key, value); \end{lstlisting} \subsubsection{java.util.concurrent.Executor} Executor は受け取った Runnable タスクを実行するオブジェクトである。(ソースコード \ref{src:executor}) Executor で様々な種類のスレッドプールを作成することができる。 \begin{lstlisting}[label=src:executor, caption=スレッドプールに Runnable を投入] executor.execute(runnable); \end{lstlisting} \subsubsection{java.util.concurrent.atomic.AtomicInteger} AtomicInteger は、原子的なデータ更新が可能な int 値である。 例えば、データを取得してその後にインクリメントを行うという処理を考える。(ソースコード \ref{src:increment1}) synchronized ブロックで囲み、排他制御を行わないと、データの不整合を起こす。 そこで、 AtomicInteger の getAndIncrement()を用いることで、取得と変更の処理を原子的に行うことが可能になる。(ソースコード \ref{src:increment2}) \begin{lstlisting}[label=src:increment1, caption=synchronized を用いて排他制御を行う必要がある] synchronized (i) { num = i++; } \end{lstlisting} \begin{lstlisting}[label=src:increment2, caption=getAndIncrement() を用いると原子的に実行される。] num = i.getAndIncrement(); \end{lstlisting} この機能は、複数のスレッドで共有された ID を重複なく連番発行する処理などで用いられる。 このような並列処理において利便性が高く性能のよいライブラリが揃っているため、 Java を実装に用いるプログラミング言語に選定した。 \section{Data Segment の詳細な設計と実装} まず、 Alice の実装を行うにあたって、データベース機能はもちろんのこと、タスク間通信やネットワーク間通信の要となる Data Segment の設計が重要である。本節では、 Data Segment の詳細な設計と実装を記す。 \subsection{Data Segment Manager} 大量の Data Segment を管理するのが Data Segment Manager である。 Data Segment Manager は、文字列のキーで Data Segment を整理する。 キーごとにデータを出し入れすることになる。 各キーごとに、キュー構造を持っている。(図 \ref{fig:datasegmentKey}) それらを Data Segment API を用いて操作する。 \begin{figure}[htbp] \begin{center} \includegraphics[width=60mm]{./images/datasegment_key.pdf} \end{center} \caption{キーごとにデータがキュー構造で管理される} \label{fig:datasegmentKey} \end{figure} また、データの読み出し("peek" または "take")時に、希望のデータがなかった場合、ブロッキングを行う機能を持つ。 しかし、ブロッキングといってもそこで同期するわけではない。 非同期でデータを通信する。 そのため、 "peek" と "take" は他の API とは違い、レスポンスが発生する。(図 \ref{fig:datasegmentReceiver}) \begin{figure}[htbp] \begin{center} \includegraphics[width=80mm]{./images/datasegment_receiver.pdf} \end{center} \caption{"peek" や "take" に対して、レスポンスが発生する} \label{fig:datasegmentReceiver} \end{figure} \subsection{Data Segment API} Data Segment Manager をユーザーが操作できるインタフェース、それが Data Segment API である。 まずは以下のとおり Data Segment API を詳細に定義した。 \begin{itemize} \item {\ttfamily void put(String key, Value val)} \item {\ttfamily void update(String key, Value val)} \item {\ttfamily void peek(Receiver receiver, String key, int id)} \item {\ttfamily void take(Receiver receiver, String key, int id)} \end{itemize} \subsubsection{"put"} "put" はデータを追加するための API である。 "put" は受け取ったデータ val を Data Segment 内のキューに対してエンキューする。 この時、キーごとに重複しない連番の ID を受け取った順に振る。(図 \ref{fig:put}) \begin{figure}[htbp] \begin{center} \includegraphics[width=100mm]{./images/put.pdf} \end{center} \caption{"put" は重複しない ID を振りながらデータを追加する} \label{fig:put} \end{figure} \subsubsection{"update"} "update" はデータを置き換えるための API である。 "update" はキューの先頭にあるデータをひとつだけ削除する。 その後は "put" と同じく、 受け取ったデータ val を Data Segment 内のキューに対してエンキューする。 この時、キーごとに重複しない連番の ID を受け取った順に振る。(図 \ref{fig:update}) \begin{figure}[htbp] \begin{center} \includegraphics[width=100mm]{./images/update.pdf} \end{center} \caption{"update" は先頭データを取り除き、重複しない ID を振りながらデータを追加する} \label{fig:update} \end{figure} \subsubsection{"peek"} "peek" はデータを読み込むための API である。 "peek" は前回読み込んだデータの id を引数で指定する。省略した場合は、 0 が id として渡される。 id よりも値の大きい id のデータがキューに含まれていれば、そのデータを receiver に返す。 もし id 以下のデータしか無いならば、データの更新が前回の "peek" 発行時から更新が無いものと考え、リストに格納されて保留される。(図 \ref{fig:peek}) "take" や "update" によりデータの更新があれば、 "peek" が直ちに実行される。 \begin{figure}[htbp] \begin{center} \includegraphics[width=90mm]{./images/peek.pdf} \end{center} \caption{"peek" はデータを receiver に読み込む。希望のデータがない場合は保留する} \label{fig:peek} \end{figure} \subsubsection{"take"} "take" もデータを読み込むための API である。 基本的な id に関する部分は "peek" と同じである。 "peek" との決定的な違いは、読み込まれたデータは Data Segment 内のキューから取り除かれるということである。(図 \ref{fig:take}) \begin{figure}[htbp] \begin{center} \includegraphics[width=70mm]{./images/take.pdf} \end{center} \caption{"take" はデータを receiver に読み込む。その際、読み込んだデータは削除される} \label{fig:take} \end{figure} \subsection{コマンドを処理する流れ} これらの API から発行されたコマンドを Data Segment Manager は複数のスレッドから受け取る。 その後、 ConcurrentHashMap で文字列であるキーから Data Segment を解決する。 キーが異なればデータセグメント間に依存関係は全く無いので、別スレッドでこれらの API を処理する事ができる。(図 \ref{fig:datasegmentKeyThread}) \begin{figure}[htbp] \begin{center} \includegraphics[width=60mm]{./images/datasegment_key_thread.pdf} \end{center} \caption{キーごとに Data Segment を処理する Thread を持つ} \label{fig:datasegmentKeyThread} \end{figure} Data Segment Manager から各キーの Thread へのコマンドの受け渡しには、\\ java.util.concurrent.LinkedBlockingQueue が使われる。これをコマンドキューと呼ぶことにする。 各キーの Thread では、コマンドキューが空になるまでコマンドを繰り返し取り出す。 その取り出したコマンドに従って処理が行われる。 キューが空になったときは、次のコマンドが挿入されるまでブロックされる。 Data Segment Manager はユーザーが API を使うと、コマンドを作成し、キーから Data Segment を探し、そのコマンドキューに挿入する。(図 \ref{fig:datasegmentCommandQueue}) \begin{figure}[htbp] \begin{center} \includegraphics[width=100mm]{./images/datasegment_command_queue.pdf} \end{center} \caption{Data Segment Manager はコマンドを作成し、コマンドキューに渡す} \label{fig:datasegmentCommandQueue} \end{figure} \subsection{Data Segment のデータ表現} Data Segment のデータ表現には MessagePack を利用する。 Java 版の MessagePack の実装 MessagePack for Java を利用して実装を行う。 MessagePack に関して Java におけるデータ表現は以下の3段階ある。 これらのデータ表現は、型の種類等の制限を伴うが互いに変換可能である。 \begin{enumerate} \item {一般的な Java のクラスオブジェクト} \item {MessagePack for Java の {\ttfamily Value} オブジェクト} \item {{\ttfamily byte[]} で表現されたバイナリ} \end{enumerate} Data Segment API では、この MessagePack for Java の Value オブジェクトを用いてデータを表現する。 MessagePack は Java のように静的に型付けされたオブジェクトではなく、自己記述的なデータ形式である。 MessagePack for Java の Value オブジェクトは MessagePack のバイナリにシリアライズできる型のみで構成された Java のオブジェクトである。 そのため、 Value も自己記述式のデータ形式となっている。 例えば、 ArrayValue を用いれば、ユーザーがデータを後付けで繋げたりすることも可能になる。 この Value オブジェクトは通信に関わる時は、シリアライズ・デシリアライズを高速に行うことができ、ユーザーは便利なメソッドを用いてオブジェクト内部のデータを閲覧、編集できる。例えば、 Value の toString メソッドは、 JSON 形式で出力してくれる。(ソースコード \ref{src:msgpack2}) このように MessagePack Value は Java の静的な型付けを脱却しようとした形式である。 また、ユーザーは一般的なクラスを IDL (Interface Definition Language) のように用いてデータを表現してもよい。 そのように使う場合には、クラス宣言時に {\ttfamily @Message} というアノテーションを付けるだけでよい。(ソースコード \ref{src:msgpack1})もちろん、 MessagePack で扱うことができるデータのみをフィールドに入れなくてはならない。 \begin{lstlisting}[label=src:msgpack1, caption=一般的なクラスを IDL のように使用] import org.msgpack.annotation.Message @Message public class MessagePackTest { public String key; public int val; } \end{lstlisting} \begin{lstlisting}[label=src:msgpack2, caption=一般クラスオブジェクトから Value に逆変換して JSON 形式で出力] msgpackTest = new MessagePackTest(); msgpackTest.key = "Test"; msgpackTest.val = 1000; MessagePack msgpack = new MessagePack(); Value value = msgpack.unconvert(msgpackTest); System.out.println(value); // ["Test",1000] \end{lstlisting} MessagePack for Java は内部で {\ttfamily @Message} アノテーションが付けられたクラスを変換する時に、 Javassist を用いて動的にテンプレートを生成してコンパイルしている。そのため高速に Value オブジェクトと一般クラスオブジェクトの変換、逆変換を行うことができる。 また、 MessagePack は Packer と Unpacker を用いることにより、次から次へとストリームからシーケンシャルにバイナリをシリアライズ・デシリアライズすることもできる。 そのため、通信を行うときの入出力部分のコード記述もシンプルになる。(ソースコード \ref{src:msgpack3}) 従来のプロトコルでは、通信を行う際、パケットサイズなどを含んだ固定長のヘッダーなどを作成しなくてはいけなかったが、 MessagePack は自己記述的なデータ形式なので、先頭にデータの長さが含まれているため、プログラマーが管理する固定長ヘッダーは必要無くなる。 \begin{lstlisting}[label=src:msgpack3, caption=Unpacker を用いると通信入力部分の記述がシンプルになる] MessagePack msgpack = new MessagePack(); Unpacker unpacker = msgpack.createUnpacker(socket.getInputStream()); while (true) { CommandMessage msg = unpacker.read(CommandMessage.class); // block // メッセージを受け取った後の処理を記述するだけでよい。 } \end{lstlisting} \subsection{Remote Data Segment Manager} これまで説明してきた Data Segment Manager はローカルで動作する Data Segment Manager である。これからリモート接続版の Data Segment Manager へと拡張するにあたって区別のためにローカルで動作する Data Segment Manager を Local Data Segment Manager とする。これに対し、リモート接続したホスト上の Local Data Segment Manager の操作を行う機構を Remote Data Segment Manager とする。(図 \ref{fig:remoteDatasegment}) \begin{figure}[htbp] \begin{center} \includegraphics[width=80mm]{./images/remote_datasegment.pdf} \end{center} \caption{Remote Data Segment Manager は、他のマシン上の Local Data Segment Manager を操作できる} \label{fig:remoteDatasegment} \end{figure} ローカルでもリモートでも、 Data Segment Manager の API は共通なので、継承して実装する。(図 \ref{datasegmentInheritance})そのことにより、 Remote Data Segment Manager にアクセスするときもローカルと同じようにアクセスすることができる。 \begin{figure}[htbp] \begin{center} \includegraphics[width=70mm]{./images/datasegment_inheritance.pdf} \end{center} \caption{Data Segment Manager を継承して、 Local DSM と Remote DSM を作成する} \label{fig:datasegmentInheritance} \end{figure}