# HG changeset patch
# User shoshi
# Date 1300371848 -32400
# Node ID fc19e38b669b27e4fd30a05836cdb0bca57b7806
# Parent 12604eb6b615247f297fac2e3537bd9a3566d1c2
added concurrent access client for cassandr
diff -r 12604eb6b615 -r fc19e38b669b CHANGELOG
--- a/CHANGELOG Mon Mar 14 23:24:38 2011 +0900
+++ b/CHANGELOG Thu Mar 17 23:24:08 2011 +0900
@@ -1,5 +1,7 @@
ChangeLog.
+2011-03-17
+ added concurrent access client for cassandra
2011-03-14
added javadoc treecms.api
2011-02-28
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/api/Node.java
--- a/src/treecms/api/Node.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/api/Node.java Thu Mar 17 23:24:08 2011 +0900
@@ -12,12 +12,12 @@
{
/**
* Nodeに対応するNodeIDを取得します.
- * @return
+ * @return Nodeに対応するNodeID
*/
public NodeID getID();
/**
- * Nodeが保持するデータを取得します
+ * Nodeが保持するデータを取得します.クライアントはこのメソッドを用いて取得されるNodeDataを用いてNodeの内容を変更できません。
* @return Nodeが保持するNodeData
*/
public NodeData getData();
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/api/NodeData.java
--- a/src/treecms/api/NodeData.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/api/NodeData.java Thu Mar 17 23:24:08 2011 +0900
@@ -142,6 +142,15 @@
}
/**
+ * NodeDataが保持しているすべてのキーと値の組のマップを返します.
+ * @return すべてのキーと値のマップ
+ */
+ public Map getAll()
+ {
+ return Collections.unmodifiableMap(m_attrs);
+ }
+
+ /**
* 子供Nodeのリストをクリアします.(すべて削除します.)
*/
public void clear()
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/api/TreeEditor.java
--- a/src/treecms/api/TreeEditor.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/api/TreeEditor.java Thu Mar 17 23:24:08 2011 +0900
@@ -1,12 +1,14 @@
package treecms.api;
+import treecms.tree.util.PathNotFoundException;
+
/**
* 木構造を非破壊的に更新する機能を提供します.TreeEditorはTreeを非破壊的に更新していき,commitすることでTreeに更新を適用します.
* TreeEditor.getRootはcommitされていない状態のRootNodeを取得します.
* この機能は分散リポジトリを参考に考案されました.
* @author shoshi
*/
-public interface TreeEditor extends Tree
+public interface TreeEditor
{
/**
* 非破壊的に更新した木構造を適用します.
@@ -34,10 +36,17 @@
public void merge();
/**
- * 木構造を非破壊的に更新します.
+ * この木構造のルートNodeを返します。
+ * @return この木構造のルートNode
+ */
+ public Node getRoot();
+
+ /**
+ * 木構造を非破壊的に更新します.変更の対象となるNodeが木構造内に見つからない場合,PathNotFoundExceptionがスローされます.
* @param _target 更新する対象のNode
* @param _newData 新しいNodeに割り当てられるNodeData
* @return 更新された新しいNode
+ * @throws PathNotFoundException パスが見つからない場合
*/
- public Node updateTree(Node _target,NodeData _newData);
+ public Node updateTree(Node _target,NodeData _newData) throws PathNotFoundException;
}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/memory/OnMemoryForest.java
--- a/src/treecms/memory/OnMemoryForest.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/memory/OnMemoryForest.java Thu Mar 17 23:24:08 2011 +0900
@@ -10,79 +10,157 @@
import treecms.api.NodeID;
import treecms.tree.id.AbstractRandomNodeID;
+/**
+ * Forestのオンメモリ上の実装.
+ * このクラスはスレッドセーフだと思います.たぶん
+ * @author shoshi
+ */
public class OnMemoryForest implements Forest
{
+ //Nodeのマップ
Map m_table;
+ //最新版Nodeのマップ
Map m_tipTable;
+ /**
+ * コンストラクタ
+ */
public OnMemoryForest()
{
m_table = new ConcurrentHashMap();
m_tipTable = new ConcurrentHashMap();
}
- public OnMemoryNode createNode(NodeID _id,NodeData _newData)
+ /**
+ * 新しくNodeを作成します
+ * @param _id Nodeに適用されるNodeID
+ * @param _newData Nodeが保持するNodeData
+ * @return 作成されたOnMemoryNode
+ */
+ OnMemoryNode createNode(NodeID _id,NodeData _newData)
{
NodeID newID = (_id != null) ? _id : createID();
NodeData newData = (_newData != null) ? _newData : new NodeData();
+
+ //newIDとnewDataを元にOnMemoryNodeを生成する.
OnMemoryNode newNode = new OnMemoryNode(this,newID,newData);
+ //登録
m_table.put(newID,newNode);
m_tipTable.put(newID.getUUID(),newNode);
return newNode;
}
- NodeID createID()
+ /**
+ * 新しいNodeIDを作成します
+ * @return 新しいNodeID
+ */
+ private NodeID createID()
{
return new RandomNodeID(null);
}
+ /**
+ * NodeIDに対応するNodeを取得します
+ * @return NodeIDに対応するNode
+ */
@Override
public Node get(NodeID _id)
{
return m_table.get(_id);
}
+ /**
+ * 新しくNodeを作成します.
+ * @return 新しいNode
+ */
@Override
public Node create()
{
return createNode(null,null);
}
-
- class RandomNodeID extends AbstractRandomNodeID
+
+ /**
+ * NodeDataを保持した新しいNodeを作成します
+ * @return NodeDataを保持した新しいNode
+ */
+ @Override
+ public Node create(NodeData _data)
+ {
+ return createNode(null,_data);
+ }
+
+ /**
+ * 最新のNodeを取得します.
+ * @return UUIDに対応する最新のNode
+ */
+ @Override
+ public Node getTip(String _uuid)
{
- String m_uuid;
- long m_version;
+ return m_tipTable.get(_uuid);
+ }
+
+ /**
+ * ランダムにバージョン番号を生成するNodeIDです.ファクトリを内包します.
+ * @author shoshi
+ */
+ private static class RandomNodeID extends AbstractRandomNodeID
+ {
+ /**
+ * UUID
+ */
+ private String m_uuid;
+ /**
+ * バージョン番号(ランダム値)
+ */
+ private long m_version;
+
+ /**
+ * コンストラクタ
+ * @param _uuid 継承するUUID
+ */
public RandomNodeID(String _uuid)
{
- if(_uuid != null){
- m_uuid = _uuid;
- }else{
- m_uuid = UUID.randomUUID().toString();
- }
+ m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString();
m_version = (new Random()).nextLong();
}
+ /**
+ * 新しいRandomNodeIDを作成します。
+ * @return 新しいRandomNodeID
+ */
@Override
public NodeID create()
{
return new RandomNodeID(null);
}
+ /**
+ * UUIDを継承したRandomNodeIDを作成します.
+ * @return 新しいRandomNodeID
+ */
@Override
public NodeID update()
{
return new RandomNodeID(m_uuid);
}
+ /**
+ * UUIDを取得します.
+ * @return UUID
+ */
@Override
public String getUUID()
{
return m_uuid;
}
+ /**
+ * バージョンを取得します.
+ * @return バージョン
+ */
@Override
public String getVersion()
{
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/memory/OnMemoryNode.java
--- a/src/treecms/memory/OnMemoryNode.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/memory/OnMemoryNode.java Thu Mar 17 23:24:08 2011 +0900
@@ -1,19 +1,29 @@
package treecms.memory;
-import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import treecms.api.Forest;
import treecms.api.Node;
import treecms.api.NodeData;
import treecms.api.NodeID;
-public class OnMemoryNode implements Node
+/**
+ * オンメモリ上でのNodeの実装です。
+ * @author shoshi
+ */
+class OnMemoryNode implements Node
{
- OnMemoryForest m_forest;
+ private OnMemoryForest m_forest;
- NodeID m_id;
- NodeData m_data;
+ private NodeID m_id;
+ private NodeData m_data;
+ /**
+ * コンストラクタです.
+ * @param _forest このNodeが属するForestです.
+ * @param _id このNodeのNodeIDです.
+ * @param _newData このNodeに割り当てるNodeDataです.防御的にコピーします.
+ */
public OnMemoryNode(OnMemoryForest _forest,NodeID _id,NodeData _newData)
{
m_id = _id;
@@ -21,21 +31,105 @@
m_data = (_newData != null) ? _newData.deepCopy() : new NodeData();
}
+ /**
+ * Nodeが属するForestを取得します.
+ * @return Nodeが属するForest
+ */
@Override
public Forest getForest()
{
return m_forest;
}
+ /**
+ * Nodeに対応するNodeIDを取得します.
+ * @return Nodeに対応するNodeID
+ */
@Override
public NodeID getID()
{
return m_id;
}
+ /**
+ * Nodeが保持するデータを取得します.クライアントはこのメソッドを用いて取得されるNodeDataを用いてNodeの内容を変更できません。
+ * @return Nodeが保持するNodeData
+ */
@Override
public NodeData getData()
{
- return m_data;
+ return m_data.deepCopy();
+ }
+
+ /**
+ * 指定されたNodeを子供Nodeとして追加します.
+ * @param _child
+ */
+ @Override
+ public void add(Node _child)
+ {
+ m_data.add(_child);
+ }
+
+ /**
+ * 指定されたリストに含まれるNodeを,すべて子供Nodeとして追加します.
+ * @param _children 追加される子供Nodeを保持するリスト
+ */
+ @Override
+ public void addAll(List _children)
+ {
+ m_data.addAll(_children);
+ }
+
+ /**
+ * 子供Nodeのリストを取得します..
+ * @return 子供Nodeのリスト
+ */
+ @Override
+ public List children()
+ {
+ return m_data.children();
+ }
+
+ /**
+ * このNodeが保持する値の中で指定されたキーと対応する値を取得します.
+ * @param _key データに対応するキー
+ * @return キーと対応する値,見つからない場合はnull
+ */
+ @Override
+ public byte[] get(byte[] _key)
+ {
+ return m_data.get(_key);
+ }
+
+ /**
+ * このNodeが保持するデータをマップとしてすべて取得します.
+ * @return Nodeが保持するすべてのデータのマップ
+ */
+ @Override
+ public Map getAll()
+ {
+ return m_data.getAll();
+ }
+
+ /**
+ * キーとそれに対応する値を保存します.キーが重複した場合は上書きされます.
+ * @param _key キー
+ * @param _value 値
+ */
+ @Override
+ public void put(byte[] _key, byte[] _value)
+ {
+ m_data.put(_key,_value);
+ }
+
+ /**
+ * キーとそれに対応する値を複数保持するマップを引数としてとり,マップが保持する値をすべて追加します.
+ * @param _map 追加される値のマップ
+ */
+ @Override
+ public void putAll(Map _map)
+ {
+ m_data.putAll(_map);
}
}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/memory/OnMemoryTree.java
--- a/src/treecms/memory/OnMemoryTree.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/memory/OnMemoryTree.java Thu Mar 17 23:24:08 2011 +0900
@@ -1,54 +1,159 @@
package treecms.memory;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import treecms.api.Forest;
import treecms.api.Node;
import treecms.api.NodeData;
import treecms.api.NodeID;
import treecms.api.Tree;
-import treecms.tree.util.PreorderTreewalker;
-public class OnMemoryTree implements Tree
+/**
+ * オンメモリのTree実装です。
+ * 木構造のルートとなるNodeをメンバーとして持ち、操作はすべて転送します。
+ * @author shoshi
+ */
+final class OnMemoryTree implements Tree
{
- OnMemoryNode m_root;
- OnMemoryForest m_forest;
- ConcurrentHashMap m_table;
+ private OnMemoryNode m_root;
+ private AtomicReference m_ref;
- public OnMemoryTree(OnMemoryNode _newRoot,OnMemoryForest _forest)
+ /**
+ * コンストラクタです。
+ * @param _newRoot 木構造のルートノードです。
+ */
+ public OnMemoryTree(OnMemoryNode _newRoot)
{
m_root = _newRoot;
- m_forest = _forest;
-
- m_table = new ConcurrentHashMap();
- for(Node elem : new PreorderTreewalker(m_root)){
- m_table.put(elem.getID().getUUID(),(OnMemoryNode)elem);
- }
+ m_ref = new AtomicReference(_newRoot);
}
+ /**
+ * Nodeが属するForestを取得します.
+ * @return Nodeが属するForest
+ */
@Override
public Forest getForest()
{
- return m_forest;
+ return m_ref.get().getForest();
}
+ /**
+ * Nodeに対応するNodeIDを取得します.
+ * @return Nodeに対応するNodeID
+ */
@Override
public NodeID getID()
{
- return m_root.getID();
+ return m_ref.get().getID();
}
+ /**
+ * Nodeが保持するデータを取得します.クライアントはこのメソッドを用いて取得されるNodeDataを用いてNodeの内容を変更できません。
+ * @return Nodeが保持するNodeData
+ */
@Override
public NodeData getData()
{
- return m_root.getData();
+ return m_ref.get().getData();
+ }
+
+ /**
+ * この木構造のルートNodeを取得します.
+ * @return ルートNode
+ */
+ @Override
+ public Node getRoot()
+ {
+ return m_ref.get();
+ }
+
+ /**
+ * 指定されたNodeを子供Nodeとして追加します.
+ * @param _child
+ */
+ @Override
+ public void add(Node _child)
+ {
+ m_ref.get().add(_child);
+ }
+
+ /**
+ * 指定されたリストに含まれるNodeを,すべて子供Nodeとして追加します.
+ * @param _children 追加される子供Nodeを保持するリスト
+ */
+ @Override
+ public void addAll(List _children)
+ {
+ m_ref.get().addAll(_children);
}
+ /**
+ * 子供Nodeのリストを取得します..
+ * @return 子供Nodeのリスト
+ */
@Override
- public Node getNodeByUUID(String _uuid)
+ public List children()
{
- return m_table.get(_uuid);
+ return m_ref.get().children();
+ }
+
+ /**
+ * このNodeが保持する値の中で指定されたキーと対応する値を取得します.
+ * @param _key データに対応するキー
+ * @return キーと対応する値,見つからない場合はnull
+ */
+ @Override
+ public byte[] get(byte[] _key)
+ {
+ return m_ref.get().get(_key);
+ }
+
+ /**
+ * このNodeが保持するデータをマップとしてすべて取得します.
+ * @return Nodeが保持するすべてのデータのマップ
+ */
+ @Override
+ public Map getAll()
+ {
+ return m_ref.get().getAll();
}
+ /**
+ * キーとそれに対応する値を保存します.キーが重複した場合は上書きされます.
+ * @param _key キー
+ * @param _value 値
+ */
+ @Override
+ public void put(byte[] _key,byte[] _value)
+ {
+ m_ref.get().put(_key,_value);
+ }
+
+ /**
+ * キーとそれに対応する値を複数保持するマップを引数としてとり,マップが保持する値をすべて追加します.
+ * @param _map 追加される値のマップ
+ */
+ @Override
+ public void putAll(Map _map)
+ {
+ m_ref.get().putAll(_map);
+ }
+
+ /**
+ * ルートNodeを比較して置き換えます.
+ * @param _except 比較する対象
+ * @param _newRoot 一致した場合置き換える対象
+ */
+ public boolean compareAndSwapRootNode(OnMemoryNode _except,OnMemoryNode _newRoot,boolean _force)
+ {
+ if(_force){
+ return m_ref.compareAndSet(_except,_newRoot);
+ }
+
+ m_ref.set(_newRoot);
+ return true;
+ }
}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/memory/OnMemoryTreeEditor.java
--- a/src/treecms/memory/OnMemoryTreeEditor.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/memory/OnMemoryTreeEditor.java Thu Mar 17 23:24:08 2011 +0900
@@ -4,100 +4,127 @@
import treecms.api.Node;
import treecms.api.NodeData;
+import treecms.api.NodeID;
import treecms.api.TreeEditor;
import treecms.merger.Merger;
import treecms.merger.ReplaceMerger;
+import treecms.tree.util.NodePathFinder;
+import treecms.tree.util.PathNotFoundException;
-public class OnMemoryTreeEditor extends OnMemoryTree implements TreeEditor
+/**
+ * 木構造を非破壊的に編集するオンメモリの実装です。
+ * @author shoshi
+ */
+final class OnMemoryTreeEditor implements TreeEditor
{
- OnMemoryTree m_tree;
- OnMemoryNode m_oldRoot;
+ private OnMemoryTree m_tree;
+ private OnMemoryNode m_backup;
+ private OnMemoryNode m_root;
- public OnMemoryTreeEditor(OnMemoryForest _forest,OnMemoryTree _tree)
+ /**
+ * コンストラクタです。
+ * @param _tree 監視の対象とするOnMemoryTree
+ */
+ public OnMemoryTreeEditor(OnMemoryTree _tree)
{
- super(_tree.m_root,_forest);
- m_oldRoot = m_root;
+ m_root = (OnMemoryNode)_tree.getRoot();
+ m_backup = m_root;
}
+ /**
+ * 変更を元の木構造にコミットします。この操作はコミット先が先にアップデートされていた場合は失敗します
+ * @param _force trueの場合は強制的に置き換えます。
+ * @return コミットが成功した場合true
+ */
@Override
public boolean commit(boolean _force)
{
- if(!check() || _force){
- m_tree.m_root = m_root;
- }
- return false;
+ return m_tree.compareAndSwapRootNode(m_backup,m_root,_force);
}
+ /**
+ * 監視している木構造のルートと自身のルートに上書きします。
+ * @return true
+ */
@Override
public boolean pull()
{
- m_root = m_tree.m_root;
+ m_root = (OnMemoryNode)m_tree.getRoot();
+ m_backup = m_root;
return true;
}
+ /**
+ * 監視している木構造が更新されていないか確認します。
+ * @return 更新されている場合true、されていない場合はfalse
+ */
@Override
public boolean check()
{
- if(m_tree.m_root.getID().equals(m_oldRoot.getID())){
+ if(m_tree.getRoot().getID().equals(m_root.getID())){
return false;
}
return true;
}
+ /**
+ * 監視している木構造の内容を自分の木構造にマージします
+ * 最新版の木構造とマージする場合は、先にpull()を実行してください。
+ */
@Override
public void merge()
{
- //call merger
Merger merger = new ReplaceMerger();
- m_root = (OnMemoryNode)merger.merge(m_tree.m_root,m_root);
+ m_root = (OnMemoryNode)merger.merge(m_backup,m_root);
}
+ /**
+ * 木構造を非破壊的に更新します.
+ * @param _target 更新する対象
+ * @param _newData 更新に適用されるNodeData
+ * @return 更新されたNode
+ * @throws ルートNodeから_targetまでのパスが見つからない場合、PathNotFoundException
+ */
@Override
- public synchronized Node updateTree(Node _target,NodeData _newData)
+ public synchronized Node updateTree(Node _target,NodeData _newData) throws PathNotFoundException
{
- LinkedList path = findAndClone(m_root,(OnMemoryNode)_target,_newData);
+ NodePathFinder finder = new NodePathFinder(m_root,_target);
+ OnMemoryForest forest = (OnMemoryForest)m_tree.getForest();
- if(path == null)
- {
- //not found.
- return null;
+ for(Node path : finder){
+ NodeData data = path.getData();
+ NodeID newID = path.getID().update();
+ Node clone = forest.createNode(newID,data);
}
- m_root = path.peekFirst();
- return path.peekLast();
+ return null;
}
- OnMemoryNode cloneNode(OnMemoryNode _target,NodeData _newData)
+ private LinkedList findAndClone(OnMemoryNode _parent,OnMemoryNode _target,NodeData _newData)
{
- OnMemoryNode clone = m_forest.createNode(_target.getID().update(),_newData);
- m_table.put(clone.getID().getUUID(),clone);
- return clone;
- }
-
- LinkedList findAndClone(OnMemoryNode _parent,OnMemoryNode _target,NodeData _newData)
- {
- if(_parent.getID().isFamily(_target.getID())){
- //find.
+ OnMemoryForest forest = (OnMemoryForest)_target.getForest();
+ if(_parent.getID().equals(_target.getID())){
LinkedList path = new LinkedList();
- OnMemoryNode clone = cloneNode((OnMemoryNode)_parent,_newData);
+ OnMemoryNode clone = forest.createNode(_target.getID().update(),_newData);
path.addFirst(clone);
return path;
}
- for(Node child : _parent.getData().list()){
+ for(Node child : _parent.children()){
LinkedList path = findAndClone((OnMemoryNode)child,_target,_newData);
if(path != null){
- OnMemoryNode clone = cloneNode((OnMemoryNode)_parent,null);
- clone.getData().list().remove(child);
- clone.getData().list().add(path.peekFirst());
- path.addFirst(clone);
+ path.addFirst(_parent);
return path;
}
}
- return null; //not found.
+ return null;
}
-
+
+ /**
+ * ルートNodeを取得します。
+ * @return この木構造のルートNode
+ * */
@Override
public Node getRoot()
{
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/tree/cassandra/v1/CassandraForest.java
--- a/src/treecms/tree/cassandra/v1/CassandraForest.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/tree/cassandra/v1/CassandraForest.java Thu Mar 17 23:24:08 2011 +0900
@@ -10,6 +10,7 @@
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -37,9 +38,9 @@
import treecms.tree.id.AbstractRandomNodeID;
/**
- * implementation of TreeCMS with Cassandra backend.
+ * Cassandra上で非破壊的木構造を実現するためのForestの実装です。
*
- * TreeCMSKS.NodeTable (table of all nodes)
+ * TreeCMSKS.NODETABLE (table of all nodes)
*
* +---------------------------------------------+
* + Key | Col1 | Col2 | Col3 | ... |
@@ -47,7 +48,7 @@
* + NodeID | Children | _attr1 | _attr2 | ... |
* +---------------------------------------------+
*
- * TreeCMSKS.TipTable (table of tip)
+ * TreeCMSKS.TIPTABLE (table of tip)
*
* +--------------------+
* + Key | Col1 |
@@ -63,197 +64,47 @@
ExecutorService m_service;
//column families.
- static final String NODETABLE = "NodeTable";
- static final String TIPTABLE = "TipTable";
+ static final String NODETABLE = "NODETABLE";
+ static final String TIPTABLE = "TIPTABLE";
- //reserved word.
- static final String NODE = "Node";
- static final String CHILDREN = "Children";
+ //reserved column.
+ static final byte[] TIPID = "TIPID".getBytes();
+ static final byte[] CHILDREN = "CHILDREN".getBytes();
static final char PREFIX = '_';
- //id table reserved
- static final String TIPID = "TipID";
+ //cache
+ private ConcurrentHashMap m_cache;
+ private ConcurrentHashMap m_tipCache;
public CassandraForest(String _host,int _port,String _ks,int _threads)
{
- m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks));
+ m_service = Executors.newFixedThreadPool(_threads,new ClientThreadFactory(_host,_port));
+ m_cache = new ConcurrentHashMap();
+ m_tipCache = new ConcurrentHashMap();
}
@Override
public Node get(NodeID _id)
{
-
return new CassandraNode(this,_id);
}
@Override
public Node create()
{
- return createNode(null,null); //create new node
+ return createNode(null,null);
}
public NodeData getNodeData(NodeID _id)
{
final NodeID id = _id;
-
- //create future task.
- Callable> task = new Callable>(){
- @Override
- public List call() throws Exception
- {
- RequestSender sender = (RequestSender)Thread.currentThread();
- List res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE);
- return res;
- }
- };
- Future> future = m_service.submit(task);
-
- NodeData data = new NodeData();
- try{
- List slice = future.get();
-
- //iterate column
- for(ColumnOrSuperColumn column : slice){
- String name = new String(column.column.name);
-
- //if column name matches CHILDREN , deserialize value to child list.
- if(name.equals(CHILDREN)){
- List tmp = deserialize(new String(column.column.value));
- data.add(tmp);
- }else{
- String key = name.substring(1); //size of prefix
- data.set(key.getBytes(),column.column.value);
- }
- }
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return data;
- }
-
- public List multiCreateNode(List _list)
- {
- final Map>> mutationMap = new HashMap>>();
-
- Map> nodeTable = new HashMap>();
- Map> tipTable = new HashMap>();
- for(CassandraNode node : _list){
- LinkedList list = new LinkedList();
- Mutation mut = new Mutation();
- ColumnOrSuperColumn column = new ColumnOrSuperColumn();
- mut.column_or_supercolumn = column;
- column.column.name = CHILDREN.getBytes();
- column.column.value = serialize(node.getData().list()).getBytes();
- list.add(mut);
-
- for(byte[] key : node.getData().keys()){
- mut = new Mutation();
- column = new ColumnOrSuperColumn();
- mut.column_or_supercolumn = column;
- column.column.name = key;
- column.column.value = node.getData().get(key);
-
- list.add(mut);
- }
-
- nodeTable.put(node.getID().toString(),list);
-
- mut = new Mutation();
- column = new ColumnOrSuperColumn();
- column.column.name = TIPID.getBytes();
- column.column.value = node.getID().getVersion().getBytes();
- list = new LinkedList();
- list.add(mut);
- tipTable.put(node.getID().getUUID(),list);
- }
- mutationMap.put(NODETABLE,nodeTable);
- mutationMap.put(TIPTABLE,tipTable);
-
- Runnable task = new Runnable(){
- @Override
- public void run()
- {
- RequestSender sender = (RequestSender)Thread.currentThread();
- sender.batch_mutate(mutationMap,ConsistencyLevel.ONE);
- }
- };
-
- m_service.execute(task);
-
- return _list;
- }
-
- /**
- * list serializer.
- * ex. list{"hoge","fuga"} -> "hoge,fuga"
- * @param _list
- * @return selialized string
- */
- public String serialize(List _list)
- {
- String prefix = "";
- StringBuffer buf = new StringBuffer();
- for(Node child : _list){
- buf.append(prefix+child.getID().toString());
- prefix = ",";
- }
-
- return buf.toString();
- }
-
- /**
- * string deserializer.
- * ex. "hoge,fuga" -> list{"hoge","fuga"}
- * @param _selialized
- * @return list
- */
- public LinkedList deserialize(String _serialized) throws IllegalArgumentException
- {
- StringTokenizer tokens = new StringTokenizer(_serialized,",");
- LinkedList res = new LinkedList();
-
- while(tokens.hasMoreElements()){
- String tmp = tokens.nextToken();
- StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@");
-
- try{
- NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken());
- res.add(get(id));
- }catch(Exception _e){
- throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e);
- }
- }
-
- return res;
+ return null;
}
public NodeID getTipID(String _uuid)
{
final String uuid = _uuid;
- Callable task = new Callable(){
- @Override
- public byte[] call() throws Exception
- {
- RequestSender sender = (RequestSender)Thread.currentThread();
- byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE);
- return value;
- }
- };
-
- Future future = m_service.submit(task);
-
- try {
- byte[] value = future.get();
- String id = new String(value);
- StringTokenizer token = new StringTokenizer(id,"@");
- NodeID nodeID = createID(token.nextToken(),token.nextToken());
- return nodeID;
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return null; //not found.
+ return null;
}
public Node createNode(NodeID _id,NodeData _data)
@@ -265,65 +116,7 @@
@Override
public Boolean call() throws Exception
{
- RequestSender sender = (RequestSender)Thread.currentThread();
-
- //mutation map
- HashMap>> map = new HashMap>>();
-
- /*
- * create mutation map for NODETABLE
- */
- if(data != null){
- LinkedList list = new LinkedList();
- HashMap> info = new HashMap>();
- Iterator itr = data.list().iterator();
-
- /*
- * create CSV from child list.
- */
- StringBuffer buffer = new StringBuffer();
- for(String prefix = "";itr.hasNext();prefix = ","){
- buffer.append(String.format("%s%s",prefix,itr.next().getID().toString()));
- }
- Mutation mutChildren = new Mutation();
- ColumnOrSuperColumn children = new ColumnOrSuperColumn();
- children.column.name = CHILDREN.getBytes();
- children.column.value = buffer.toString().getBytes();
- mutChildren.column_or_supercolumn = children;
- list.add(mutChildren);
-
- /*
- *
- */
- for(byte[] key : data.keys()){
- Mutation mut = new Mutation();
- ColumnOrSuperColumn column = new ColumnOrSuperColumn();
- column.column.name = key;
- column.column.value = data.get(key);
- mut.column_or_supercolumn = column;
- list.add(mut);
- }
- info.put(id.toString(),list);
-
- map.put(NODETABLE,info);
- }
-
- /*
- * create mutation map for NODEIDTABLE
- */
- HashMap> idtable_mutations = new HashMap>();
- LinkedList list = new LinkedList();
-
- Mutation mutTipID = new Mutation();
- ColumnOrSuperColumn tipID = new ColumnOrSuperColumn();
- tipID.column.name = TIPID.getBytes();
- tipID.column.value = id.getVersion().getBytes();
- mutTipID.column_or_supercolumn = tipID;
-
- list.add(mutTipID);
- idtable_mutations.put(TIPTABLE,list);
-
- return sender.batch_mutate(map,ConsistencyLevel.ONE);
+ return true;
}
};
@@ -372,141 +165,16 @@
return m_version;
}
}
-
- private static class RequestSender extends Thread
+
+ @Override
+ public Node create(NodeData data)
{
- private int m_port;
- private String m_host,m_ks;
- private Cassandra.Client m_client;
-
- public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException
- {
- super(_runnable);
- m_port = _port;
- m_host = _host;
- m_ks = _ks;
-
- connect();
- }
-
- public void connect() throws TTransportException
- {
- TTransport tr = new TSocket(m_host,m_port);
- TProtocol proto = new TBinaryProtocol(tr);
- m_client = new Cassandra.Client(proto);
-
- tr.open();
- }
-
- public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks)
- {
- RequestSender sender = null;
- try {
- sender = new RequestSender(_runnable,_host,_port,_ks);
- } catch (TTransportException _e) {
- _e.printStackTrace();
- }
-
- return sender;
- }
-
- public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv)
- {
- byte[] ret = null;
-
- ColumnPath path = new ColumnPath();
- path.column_family = _cf;
- path.column = _name;
- try {
- ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv);
- ret = cors.column.value;
- }catch(NotFoundException _e){
- System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name)));
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return ret;
- }
-
- public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv)
- {
- ColumnPath path = new ColumnPath();
- path.column_family = _cf;
- path.column = _name;
-
- try{
- m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv);
- return true;
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return false;
- }
-
- public List get_slice(String _cf,String _key,ConsistencyLevel _lv)
- {
- List ret = null;
- SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1);
- SlicePredicate sp = new SlicePredicate();
- sp.slice_range = sr;
-
- try {
- ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv);
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return ret;
- }
-
- public boolean batch_insert(String _cf,Map> _map,ConsistencyLevel _lv)
- {
- try{
- m_client.batch_insert(m_ks,_cf,_map,_lv);
- return true;
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return false;
- }
-
- public boolean batch_mutate(Map>> _mutateMap,ConsistencyLevel _lv)
- {
- try {
- m_client.batch_mutate(m_ks,_mutateMap,_lv);
- return true;
- }catch(Exception _e){
- _e.printStackTrace();
- }
-
- return false;
- }
-
- public String toString()
- {
- return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]";
- }
+ return null;
}
-
- private class RequestSenderFactory implements ThreadFactory
+
+ @Override
+ public Node getTip(String uuid)
{
- private int m_port;
- private String m_host,m_ks;
-
- public RequestSenderFactory(String _host,int _port,String _ks)
- {
- m_host = _host;
- m_port = _port;
- m_ks = _ks;
- }
-
- @Override
- public Thread newThread(Runnable _runnable)
- {
- return RequestSender.newInstance(_runnable,m_host,m_port,m_ks);
- }
+ return null;
}
}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/tree/cassandra/v1/ClientThread.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/treecms/tree/cassandra/v1/ClientThread.java Thu Mar 17 23:24:08 2011 +0900
@@ -0,0 +1,48 @@
+package treecms.tree.cassandra.v1;
+
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * CassandraのClientを保持したスレッドオブジェクトです。
+ * @author shoshi
+ */
+final class ClientThread extends Thread
+{
+ private ClientWrapper m_wrapper;
+
+ /**
+ * コンストラクタです。
+ * @param _host Cassandraのホスト名
+ * @param _port Cassandraのポート番号
+ * @param _runnable このスレッドで動作するRunnable
+ * @throws TTransportException
+ */
+ private ClientThread(String _host,int _port,Runnable _runnable) throws TTransportException
+ {
+ super(_runnable);
+ m_wrapper = new ClientWrapper(_host,_port,2);
+ }
+
+ /**
+ * ファクトリメソッドです。
+ * @param _host Cassandraのホスト名
+ * @param _port Cassandraのポート番号
+ * @param _runnable このスレッドで動作するRunnable
+ * @return 新しいインスタンス
+ * @throws TTransportException Cassandraへの接続が失敗したとき
+ */
+ public static ClientThread newInstance(String _host,int _port,Runnable _runnable) throws TTransportException
+ {
+ ClientThread thread = new ClientThread(_host,_port,_runnable);
+ return thread;
+ }
+
+ /**
+ * ClientWrapperを取得します
+ * @return CassandraへのClientWrapper
+ */
+ public ClientWrapper getClientWrapper()
+ {
+ return m_wrapper;
+ }
+}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/tree/cassandra/v1/ClientThreadFactory.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/treecms/tree/cassandra/v1/ClientThreadFactory.java Thu Mar 17 23:24:08 2011 +0900
@@ -0,0 +1,42 @@
+package treecms.tree.cassandra.v1;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * ローカル変数としてCassandra.Clientを保持するスレッドオブジェクトを生成するスレッドファクトリーです。
+ * @author shoshi
+ */
+final class ClientThreadFactory implements ThreadFactory
+{
+ private String m_host;
+ private int m_port;
+
+ /**
+ * コンストラクタです。
+ * @param _host Cassandraのアドレス・ホスト名
+ * @param _port Thriftポート番号
+ */
+ public ClientThreadFactory(String _host,int _port)
+ {
+ m_host = _host;
+ m_port = _port;
+ }
+
+ /**
+ * Cassandra.Clientを保持するスレッドオブジェクトを新しく作成します。
+ */
+ @Override
+ public Thread newThread(Runnable _runnable)
+ {
+ ClientThread client = null;
+ try{
+ client = ClientThread.newInstance(m_host,m_port,_runnable);
+ }catch(TTransportException _e) {
+ _e.printStackTrace();
+ throw new RuntimeException(_e);
+ }
+ return client;
+ }
+}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/tree/cassandra/v1/ClientWrapper.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/treecms/tree/cassandra/v1/ClientWrapper.java Thu Mar 17 23:24:08 2011 +0900
@@ -0,0 +1,203 @@
+package treecms.tree.cassandra.v1;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。
+ * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。
+ *
+ * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。
+ * @author shoshi
+ */
+final class ClientWrapper
+{
+ private String m_host;
+ private int m_port;
+ private int m_retryCount;
+
+ private TTransport m_tr;
+ private Cassandra.Client m_client;
+
+ /**
+ * コンストラクタです。初期化して接続します。
+ * @param _host Cassandraのホスト名
+ * @param _port Cassandraのポート番号
+ * @param _retryCount リクエストが失敗した場合リトライする回数
+ * @throws TTransportException
+ */
+ public ClientWrapper(String _host,int _port,int _retryCount) throws TTransportException
+ {
+ m_host = _host;
+ m_port = _port;
+ m_retryCount = _retryCount;
+
+ connect();
+ }
+
+ /**
+ * Cassandraに接続します。
+ * @throws TTransportException
+ */
+ private void connect() throws TTransportException
+ {
+ Cassandra.Client client;
+ TTransport tr = new TSocket(m_host,m_port);
+ client = new Cassandra.Client(new TBinaryProtocol(tr));
+
+ tr.open();
+
+ m_tr = tr;
+ m_client = client;
+ }
+
+ /**
+ * Cassandraへの接続を切ります。
+ */
+ private void disconnect()
+ {
+ m_tr.close();
+ m_client = null;
+ }
+
+ /**
+ * Cassandraへ再接続を行います。
+ * @return 再接続が成功した場合true
+ */
+ private boolean reconnect()
+ {
+ disconnect();
+ try {
+ connect();
+ }catch(TTransportException _e){
+ _e.printStackTrace();
+ }
+ return true;
+ }
+
+ /**
+ * ここで共通する例外処理(再接続など)を行う
+ * @param _e 例外
+ */
+ private void exceptionHandler(Exception _e)
+ {
+ _e.printStackTrace();
+ }
+
+ /**
+ * Cassandra.Client.getのラッパーメソッド
+ */
+ public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
+ {
+ for(int i = 0;i < m_retryCount;i ++){
+ if(m_client == null){
+ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
+ }
+
+ try {
+ ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level);
+ return cors;
+ }catch(Exception _e){
+ exceptionHandler(_e);
+ }
+ }
+ return m_client.get(_ks,_key,_path,_level);
+ }
+
+ /**
+ * Cassandra.Client.get_sliceのラッパーメソッド
+ */
+ public List get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ {
+ for(int i = 0;i < m_retryCount;i ++){
+ if(m_client == null){
+ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
+ }
+
+ try {
+ List list = m_client.get_slice(_ks,_key,_parent,_predicate,_level);
+ return list;
+ }catch(Exception _e){
+ exceptionHandler(_e);
+ }
+ }
+ return m_client.get_slice(_ks,_key,_parent,_predicate,_level);
+ }
+
+ /**
+ * Cassandra.Client.get_range_slicesのラッパーメソッド
+ */
+ public List get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ {
+ for(int i = 0;i < m_retryCount;i ++){
+ if(m_client == null){
+ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
+ }
+
+ try {
+ return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
+ }catch(Exception _e){
+ exceptionHandler(_e);
+ }
+ }
+ return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
+ }
+
+ /**
+ * Cassandra.Client.insertのラッパーメソッド
+ */
+ public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ {
+ for(int i = 1;i < m_retryCount;i ++){
+ if(m_client == null){
+ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
+ }
+
+ try{
+ m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
+ }catch(Exception _e){
+ exceptionHandler(_e);
+ }
+ }
+ m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
+ return;
+ }
+
+ /**
+ * Cassandra.Client.batch_mutateのラッパーメソッド
+ */
+ public void batch_mutate(String _ks,Map>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ {
+ for(int i = 1;i < m_retryCount;i ++){
+ if(m_client == null){
+ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
+ }
+
+ try{
+ m_client.batch_mutate(_ks,_mutation_map,_level);
+ }catch(Exception _e){
+ exceptionHandler(_e);
+ }
+ }
+ m_client.batch_mutate(_ks,_mutation_map,_level);
+ return;
+ }
+}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/tree/util/NodePathFinder.java
--- a/src/treecms/tree/util/NodePathFinder.java Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/tree/util/NodePathFinder.java Thu Mar 17 23:24:08 2011 +0900
@@ -21,11 +21,12 @@
* @param _root 木構造のトップです.
* @param _target パス検索の対象となるNodeです.
*/
- public NodePathFinder(Node _root,Node _target)
+ public NodePathFinder(Node _root,Node _target) throws PathNotFoundException
{
m_root = _root;
m_target = _target;
- m_path = Collections.unmodifiableList(findPath(m_root,m_target));
+ List path = findPath(m_root,m_target);
+ m_path = Collections.unmodifiableList(path);
}
/**
@@ -57,14 +58,14 @@
{
if(_from.getID().isFamily(_target.getID())){
LinkedList path = new LinkedList();
- path.add(_from);
+ path.addFirst(_from);
return path;
}
for(Node child : _from.children()){
LinkedList path = findPath(child,_target);
if(path != null){
- path.add(_from);
+ path.addFirst(_from);
}
}
diff -r 12604eb6b615 -r fc19e38b669b src/treecms/tree/util/PathNotFoundException.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/treecms/tree/util/PathNotFoundException.java Thu Mar 17 23:24:08 2011 +0900
@@ -0,0 +1,22 @@
+package treecms.tree.util;
+
+import treecms.api.Node;
+
+/**
+ * あるNodeからNodeまでのパスが検索したが見つからない場合に発生します.
+ * @author shoshi
+ */
+public class PathNotFoundException extends Exception
+{
+ private static final long serialVersionUID = 6372927818478170944L;
+
+ /**
+ * コンストラクタです.
+ * @param _from 木構造のルートNode
+ * @param _to 検索する対象のNode
+ */
+ public PathNotFoundException(Node _from,Node _to)
+ {
+ super("Path Not Found from "+_from.getID()+" to "+_to.getID());
+ }
+}