Mercurial > hg > Members > shoshi > TreeCMSv2
changeset 7:fc19e38b669b
added concurrent access client for cassandr
author | shoshi |
---|---|
date | Thu, 17 Mar 2011 23:24:08 +0900 |
parents | 12604eb6b615 |
children | f96193babac0 |
files | CHANGELOG src/treecms/api/Node.java src/treecms/api/NodeData.java src/treecms/api/TreeEditor.java src/treecms/memory/OnMemoryForest.java src/treecms/memory/OnMemoryNode.java src/treecms/memory/OnMemoryTree.java src/treecms/memory/OnMemoryTreeEditor.java src/treecms/tree/cassandra/v1/CassandraForest.java src/treecms/tree/cassandra/v1/ClientThread.java src/treecms/tree/cassandra/v1/ClientThreadFactory.java src/treecms/tree/cassandra/v1/ClientWrapper.java src/treecms/tree/util/NodePathFinder.java src/treecms/tree/util/PathNotFoundException.java |
diffstat | 14 files changed, 752 insertions(+), 444 deletions(-) [+] |
line wrap: on
line diff
--- a/CHANGELOG Mon Mar 14 23:24:38 2011 +0900 +++ b/CHANGELOG Thu Mar 17 23:24:08 2011 +0900 @@ -1,5 +1,7 @@ ChangeLog. +2011-03-17 + added concurrent access client for cassandra 2011-03-14 added javadoc treecms.api 2011-02-28
--- a/src/treecms/api/Node.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/api/Node.java Thu Mar 17 23:24:08 2011 +0900 @@ -12,12 +12,12 @@ { /** * Nodeに対応するNodeIDを取得します. - * @return + * @return Nodeに対応するNodeID */ public NodeID getID(); /** - * Nodeが保持するデータを取得します + * Nodeが保持するデータを取得します.クライアントはこのメソッドを用いて取得されるNodeDataを用いてNodeの内容を<b>変更できません</b>。 * @return Nodeが保持するNodeData */ public NodeData getData();
--- a/src/treecms/api/NodeData.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/api/NodeData.java Thu Mar 17 23:24:08 2011 +0900 @@ -142,6 +142,15 @@ } /** + * NodeDataが保持しているすべてのキーと値の組のマップを返します. + * @return すべてのキーと値のマップ + */ + public Map<byte[],byte[]> getAll() + { + return Collections.unmodifiableMap(m_attrs); + } + + /** * 子供Nodeのリストをクリアします.(すべて削除します.) */ public void clear()
--- a/src/treecms/api/TreeEditor.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/api/TreeEditor.java Thu Mar 17 23:24:08 2011 +0900 @@ -1,12 +1,14 @@ package treecms.api; +import treecms.tree.util.PathNotFoundException; + /** * 木構造を非破壊的に更新する機能を提供します.TreeEditorはTreeを非破壊的に更新していき,commitすることでTreeに更新を適用します. * TreeEditor.getRootはcommitされていない状態のRootNodeを取得します. * この機能は分散リポジトリを参考に考案されました. * @author shoshi */ -public interface TreeEditor extends Tree +public interface TreeEditor { /** * 非破壊的に更新した木構造を適用します. @@ -34,10 +36,17 @@ public void merge(); /** - * 木構造を非破壊的に更新します. + * この木構造のルートNodeを返します。 + * @return この木構造のルートNode + */ + public Node getRoot(); + + /** + * 木構造を非破壊的に更新します.変更の対象となるNodeが木構造内に見つからない場合,PathNotFoundExceptionがスローされます. * @param _target 更新する対象のNode * @param _newData 新しいNodeに割り当てられるNodeData * @return 更新された新しいNode + * @throws PathNotFoundException パスが見つからない場合 */ - public Node updateTree(Node _target,NodeData _newData); + public Node updateTree(Node _target,NodeData _newData) throws PathNotFoundException; }
--- a/src/treecms/memory/OnMemoryForest.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/memory/OnMemoryForest.java Thu Mar 17 23:24:08 2011 +0900 @@ -10,79 +10,157 @@ import treecms.api.NodeID; import treecms.tree.id.AbstractRandomNodeID; +/** + * Forestのオンメモリ上の実装. + * このクラスはスレッドセーフだと思います.たぶん + * @author shoshi + */ public class OnMemoryForest implements Forest { + //Nodeのマップ Map<NodeID,OnMemoryNode> m_table; + //最新版Nodeのマップ Map<String,OnMemoryNode> m_tipTable; + /** + * コンストラクタ + */ public OnMemoryForest() { m_table = new ConcurrentHashMap<NodeID,OnMemoryNode>(); m_tipTable = new ConcurrentHashMap<String,OnMemoryNode>(); } - public OnMemoryNode createNode(NodeID _id,NodeData _newData) + /** + * 新しくNodeを作成します + * @param _id Nodeに適用されるNodeID + * @param _newData Nodeが保持するNodeData + * @return 作成されたOnMemoryNode + */ + OnMemoryNode createNode(NodeID _id,NodeData _newData) { NodeID newID = (_id != null) ? _id : createID(); NodeData newData = (_newData != null) ? _newData : new NodeData(); + + //newIDとnewDataを元にOnMemoryNodeを生成する. OnMemoryNode newNode = new OnMemoryNode(this,newID,newData); + //登録 m_table.put(newID,newNode); m_tipTable.put(newID.getUUID(),newNode); return newNode; } - NodeID createID() + /** + * 新しいNodeIDを作成します + * @return 新しいNodeID + */ + private NodeID createID() { return new RandomNodeID(null); } + /** + * NodeIDに対応するNodeを取得します + * @return NodeIDに対応するNode + */ @Override public Node get(NodeID _id) { return m_table.get(_id); } + /** + * 新しくNodeを作成します. + * @return 新しいNode + */ @Override public Node create() { return createNode(null,null); } - - class RandomNodeID extends AbstractRandomNodeID + + /** + * NodeDataを保持した新しいNodeを作成します + * @return NodeDataを保持した新しいNode + */ + @Override + public Node create(NodeData _data) + { + return createNode(null,_data); + } + + /** + * 最新のNodeを取得します. + * @return UUIDに対応する最新のNode + */ + @Override + public Node getTip(String _uuid) { - String m_uuid; - long m_version; + return m_tipTable.get(_uuid); + } + + /** + * ランダムにバージョン番号を生成するNodeIDです.ファクトリを内包します. + * @author shoshi + */ + private static class RandomNodeID extends AbstractRandomNodeID + { + /** + * UUID + */ + private String m_uuid; + /** + * バージョン番号(ランダム値) + */ + private long m_version; + + /** + * コンストラクタ + * @param _uuid 継承するUUID + */ public RandomNodeID(String _uuid) { - if(_uuid != null){ - m_uuid = _uuid; - }else{ - m_uuid = UUID.randomUUID().toString(); - } + m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString(); m_version = (new Random()).nextLong(); } + /** + * 新しいRandomNodeIDを作成します。 + * @return 新しいRandomNodeID + */ @Override public NodeID create() { return new RandomNodeID(null); } + /** + * UUIDを継承したRandomNodeIDを作成します. + * @return 新しいRandomNodeID + */ @Override public NodeID update() { return new RandomNodeID(m_uuid); } + /** + * UUIDを取得します. + * @return UUID + */ @Override public String getUUID() { return m_uuid; } + /** + * バージョンを取得します. + * @return バージョン + */ @Override public String getVersion() {
--- a/src/treecms/memory/OnMemoryNode.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/memory/OnMemoryNode.java Thu Mar 17 23:24:08 2011 +0900 @@ -1,19 +1,29 @@ package treecms.memory; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import treecms.api.Forest; import treecms.api.Node; import treecms.api.NodeData; import treecms.api.NodeID; -public class OnMemoryNode implements Node +/** + * オンメモリ上でのNodeの実装です。 + * @author shoshi + */ +class OnMemoryNode implements Node { - OnMemoryForest m_forest; + private OnMemoryForest m_forest; - NodeID m_id; - NodeData m_data; + private NodeID m_id; + private NodeData m_data; + /** + * コンストラクタです. + * @param _forest このNodeが属するForestです. + * @param _id このNodeのNodeIDです. + * @param _newData このNodeに割り当てるNodeDataです.防御的にコピーします. + */ public OnMemoryNode(OnMemoryForest _forest,NodeID _id,NodeData _newData) { m_id = _id; @@ -21,21 +31,105 @@ m_data = (_newData != null) ? _newData.deepCopy() : new NodeData(); } + /** + * Nodeが属するForestを取得します. + * @return Nodeが属するForest + */ @Override public Forest getForest() { return m_forest; } + /** + * Nodeに対応するNodeIDを取得します. + * @return Nodeに対応するNodeID + */ @Override public NodeID getID() { return m_id; } + /** + * Nodeが保持するデータを取得します.クライアントはこのメソッドを用いて取得されるNodeDataを用いてNodeの内容を<b>変更できません</b>。 + * @return Nodeが保持するNodeData + */ @Override public NodeData getData() { - return m_data; + return m_data.deepCopy(); + } + + /** + * 指定されたNodeを子供Nodeとして追加します. + * @param _child + */ + @Override + public void add(Node _child) + { + m_data.add(_child); + } + + /** + * 指定されたリストに含まれるNodeを,すべて子供Nodeとして追加します. + * @param _children 追加される子供Nodeを保持するリスト + */ + @Override + public void addAll(List<Node> _children) + { + m_data.addAll(_children); + } + + /** + * 子供Nodeのリストを取得します.. + * @return 子供Nodeのリスト + */ + @Override + public List<Node> children() + { + return m_data.children(); + } + + /** + * このNodeが保持する値の中で指定されたキーと対応する値を取得します. + * @param _key データに対応するキー + * @return キーと対応する値,見つからない場合はnull + */ + @Override + public byte[] get(byte[] _key) + { + return m_data.get(_key); + } + + /** + * このNodeが保持するデータをマップとしてすべて取得します. + * @return Nodeが保持するすべてのデータのマップ + */ + @Override + public Map<byte[],byte[]> getAll() + { + return m_data.getAll(); + } + + /** + * キーとそれに対応する値を保存します.キーが重複した場合は上書きされます. + * @param _key キー + * @param _value 値 + */ + @Override + public void put(byte[] _key, byte[] _value) + { + m_data.put(_key,_value); + } + + /** + * キーとそれに対応する値を複数保持するマップを引数としてとり,マップが保持する値をすべて追加します. + * @param _map 追加される値のマップ + */ + @Override + public void putAll(Map<byte[], byte[]> _map) + { + m_data.putAll(_map); } }
--- a/src/treecms/memory/OnMemoryTree.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/memory/OnMemoryTree.java Thu Mar 17 23:24:08 2011 +0900 @@ -1,54 +1,159 @@ package treecms.memory; -import java.util.LinkedList; -import java.util.concurrent.ConcurrentHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import treecms.api.Forest; import treecms.api.Node; import treecms.api.NodeData; import treecms.api.NodeID; import treecms.api.Tree; -import treecms.tree.util.PreorderTreewalker; -public class OnMemoryTree implements Tree +/** + * オンメモリのTree実装です。 + * 木構造のルートとなるNodeをメンバーとして持ち、操作はすべて転送します。 + * @author shoshi + */ +final class OnMemoryTree implements Tree { - OnMemoryNode m_root; - OnMemoryForest m_forest; - ConcurrentHashMap<String,OnMemoryNode> m_table; + private OnMemoryNode m_root; + private AtomicReference<OnMemoryNode> m_ref; - public OnMemoryTree(OnMemoryNode _newRoot,OnMemoryForest _forest) + /** + * コンストラクタです。 + * @param _newRoot 木構造のルートノードです。 + */ + public OnMemoryTree(OnMemoryNode _newRoot) { m_root = _newRoot; - m_forest = _forest; - - m_table = new ConcurrentHashMap<String,OnMemoryNode>(); - for(Node elem : new PreorderTreewalker(m_root)){ - m_table.put(elem.getID().getUUID(),(OnMemoryNode)elem); - } + m_ref = new AtomicReference<OnMemoryNode>(_newRoot); } + /** + * Nodeが属するForestを取得します. + * @return Nodeが属するForest + */ @Override public Forest getForest() { - return m_forest; + return m_ref.get().getForest(); } + /** + * Nodeに対応するNodeIDを取得します. + * @return Nodeに対応するNodeID + */ @Override public NodeID getID() { - return m_root.getID(); + return m_ref.get().getID(); } + /** + * Nodeが保持するデータを取得します.クライアントはこのメソッドを用いて取得されるNodeDataを用いてNodeの内容を<b>変更できません</b>。 + * @return Nodeが保持するNodeData + */ @Override public NodeData getData() { - return m_root.getData(); + return m_ref.get().getData(); + } + + /** + * この木構造のルートNodeを取得します. + * @return ルートNode + */ + @Override + public Node getRoot() + { + return m_ref.get(); + } + + /** + * 指定されたNodeを子供Nodeとして追加します. + * @param _child + */ + @Override + public void add(Node _child) + { + m_ref.get().add(_child); + } + + /** + * 指定されたリストに含まれるNodeを,すべて子供Nodeとして追加します. + * @param _children 追加される子供Nodeを保持するリスト + */ + @Override + public void addAll(List<Node> _children) + { + m_ref.get().addAll(_children); } + /** + * 子供Nodeのリストを取得します.. + * @return 子供Nodeのリスト + */ @Override - public Node getNodeByUUID(String _uuid) + public List<Node> children() { - return m_table.get(_uuid); + return m_ref.get().children(); + } + + /** + * このNodeが保持する値の中で指定されたキーと対応する値を取得します. + * @param _key データに対応するキー + * @return キーと対応する値,見つからない場合はnull + */ + @Override + public byte[] get(byte[] _key) + { + return m_ref.get().get(_key); + } + + /** + * このNodeが保持するデータをマップとしてすべて取得します. + * @return Nodeが保持するすべてのデータのマップ + */ + @Override + public Map<byte[],byte[]> getAll() + { + return m_ref.get().getAll(); } + /** + * キーとそれに対応する値を保存します.キーが重複した場合は上書きされます. + * @param _key キー + * @param _value 値 + */ + @Override + public void put(byte[] _key,byte[] _value) + { + m_ref.get().put(_key,_value); + } + + /** + * キーとそれに対応する値を複数保持するマップを引数としてとり,マップが保持する値をすべて追加します. + * @param _map 追加される値のマップ + */ + @Override + public void putAll(Map<byte[], byte[]> _map) + { + m_ref.get().putAll(_map); + } + + /** + * ルートNodeを比較して置き換えます. + * @param _except 比較する対象 + * @param _newRoot 一致した場合置き換える対象 + */ + public boolean compareAndSwapRootNode(OnMemoryNode _except,OnMemoryNode _newRoot,boolean _force) + { + if(_force){ + return m_ref.compareAndSet(_except,_newRoot); + } + + m_ref.set(_newRoot); + return true; + } }
--- a/src/treecms/memory/OnMemoryTreeEditor.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/memory/OnMemoryTreeEditor.java Thu Mar 17 23:24:08 2011 +0900 @@ -4,100 +4,127 @@ import treecms.api.Node; import treecms.api.NodeData; +import treecms.api.NodeID; import treecms.api.TreeEditor; import treecms.merger.Merger; import treecms.merger.ReplaceMerger; +import treecms.tree.util.NodePathFinder; +import treecms.tree.util.PathNotFoundException; -public class OnMemoryTreeEditor extends OnMemoryTree implements TreeEditor +/** + * 木構造を非破壊的に編集するオンメモリの実装です。 + * @author shoshi + */ +final class OnMemoryTreeEditor implements TreeEditor { - OnMemoryTree m_tree; - OnMemoryNode m_oldRoot; + private OnMemoryTree m_tree; + private OnMemoryNode m_backup; + private OnMemoryNode m_root; - public OnMemoryTreeEditor(OnMemoryForest _forest,OnMemoryTree _tree) + /** + * コンストラクタです。 + * @param _tree 監視の対象とするOnMemoryTree + */ + public OnMemoryTreeEditor(OnMemoryTree _tree) { - super(_tree.m_root,_forest); - m_oldRoot = m_root; + m_root = (OnMemoryNode)_tree.getRoot(); + m_backup = m_root; } + /** + * 変更を元の木構造にコミットします。この操作はコミット先が先にアップデートされていた場合は失敗します + * @param _force trueの場合は強制的に置き換えます。 + * @return コミットが成功した場合true + */ @Override public boolean commit(boolean _force) { - if(!check() || _force){ - m_tree.m_root = m_root; - } - return false; + return m_tree.compareAndSwapRootNode(m_backup,m_root,_force); } + /** + * 監視している木構造のルートと自身のルートに上書きします。 + * @return true + */ @Override public boolean pull() { - m_root = m_tree.m_root; + m_root = (OnMemoryNode)m_tree.getRoot(); + m_backup = m_root; return true; } + /** + * 監視している木構造が更新されていないか確認します。 + * @return 更新されている場合true、されていない場合はfalse + */ @Override public boolean check() { - if(m_tree.m_root.getID().equals(m_oldRoot.getID())){ + if(m_tree.getRoot().getID().equals(m_root.getID())){ return false; } return true; } + /** + * 監視している木構造の内容を自分の木構造にマージします + * 最新版の木構造とマージする場合は、先にpull()を実行してください。 + */ @Override public void merge() { - //call merger Merger merger = new ReplaceMerger(); - m_root = (OnMemoryNode)merger.merge(m_tree.m_root,m_root); + m_root = (OnMemoryNode)merger.merge(m_backup,m_root); } + /** + * 木構造を非破壊的に更新します. + * @param _target 更新する対象 + * @param _newData 更新に適用されるNodeData + * @return 更新されたNode + * @throws ルートNodeから_targetまでのパスが見つからない場合、PathNotFoundException + */ @Override - public synchronized Node updateTree(Node _target,NodeData _newData) + public synchronized Node updateTree(Node _target,NodeData _newData) throws PathNotFoundException { - LinkedList<OnMemoryNode> path = findAndClone(m_root,(OnMemoryNode)_target,_newData); + NodePathFinder finder = new NodePathFinder(m_root,_target); + OnMemoryForest forest = (OnMemoryForest)m_tree.getForest(); - if(path == null) - { - //not found. - return null; + for(Node path : finder){ + NodeData data = path.getData(); + NodeID newID = path.getID().update(); + Node clone = forest.createNode(newID,data); } - m_root = path.peekFirst(); - return path.peekLast(); + return null; } - OnMemoryNode cloneNode(OnMemoryNode _target,NodeData _newData) + private LinkedList<OnMemoryNode> findAndClone(OnMemoryNode _parent,OnMemoryNode _target,NodeData _newData) { - OnMemoryNode clone = m_forest.createNode(_target.getID().update(),_newData); - m_table.put(clone.getID().getUUID(),clone); - return clone; - } - - LinkedList<OnMemoryNode> findAndClone(OnMemoryNode _parent,OnMemoryNode _target,NodeData _newData) - { - if(_parent.getID().isFamily(_target.getID())){ - //find. + OnMemoryForest forest = (OnMemoryForest)_target.getForest(); + if(_parent.getID().equals(_target.getID())){ LinkedList<OnMemoryNode> path = new LinkedList<OnMemoryNode>(); - OnMemoryNode clone = cloneNode((OnMemoryNode)_parent,_newData); + OnMemoryNode clone = forest.createNode(_target.getID().update(),_newData); path.addFirst(clone); return path; } - for(Node child : _parent.getData().list()){ + for(Node child : _parent.children()){ LinkedList<OnMemoryNode> path = findAndClone((OnMemoryNode)child,_target,_newData); if(path != null){ - OnMemoryNode clone = cloneNode((OnMemoryNode)_parent,null); - clone.getData().list().remove(child); - clone.getData().list().add(path.peekFirst()); - path.addFirst(clone); + path.addFirst(_parent); return path; } } - return null; //not found. + return null; } - + + /** + * ルートNodeを取得します。 + * @return この木構造のルートNode + * */ @Override public Node getRoot() {
--- a/src/treecms/tree/cassandra/v1/CassandraForest.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/tree/cassandra/v1/CassandraForest.java Thu Mar 17 23:24:08 2011 +0900 @@ -10,6 +10,7 @@ import java.util.StringTokenizer; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -37,9 +38,9 @@ import treecms.tree.id.AbstractRandomNodeID; /** - * implementation of TreeCMS with Cassandra backend. + * Cassandra上で非破壊的木構造を実現するためのForestの実装です。 * - * TreeCMSKS.NodeTable (table of all nodes) + * TreeCMSKS.NODETABLE (table of all nodes) * * +---------------------------------------------+ * + Key | Col1 | Col2 | Col3 | ... | @@ -47,7 +48,7 @@ * + NodeID | Children | _attr1 | _attr2 | ... | * +---------------------------------------------+ * - * TreeCMSKS.TipTable (table of tip) + * TreeCMSKS.TIPTABLE (table of tip) * * +--------------------+ * + Key | Col1 | @@ -63,197 +64,47 @@ ExecutorService m_service; //column families. - static final String NODETABLE = "NodeTable"; - static final String TIPTABLE = "TipTable"; + static final String NODETABLE = "NODETABLE"; + static final String TIPTABLE = "TIPTABLE"; - //reserved word. - static final String NODE = "Node"; - static final String CHILDREN = "Children"; + //reserved column. + static final byte[] TIPID = "TIPID".getBytes(); + static final byte[] CHILDREN = "CHILDREN".getBytes(); static final char PREFIX = '_'; - //id table reserved - static final String TIPID = "TipID"; + //cache + private ConcurrentHashMap<NodeID,CassandraNode> m_cache; + private ConcurrentHashMap<String,CassandraNode> m_tipCache; public CassandraForest(String _host,int _port,String _ks,int _threads) { - m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks)); + m_service = Executors.newFixedThreadPool(_threads,new ClientThreadFactory(_host,_port)); + m_cache = new ConcurrentHashMap<NodeID,CassandraNode>(); + m_tipCache = new ConcurrentHashMap<String,CassandraNode>(); } @Override public Node get(NodeID _id) { - return new CassandraNode(this,_id); } @Override public Node create() { - return createNode(null,null); //create new node + return createNode(null,null); } public NodeData getNodeData(NodeID _id) { final NodeID id = _id; - - //create future task. - Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){ - @Override - public List<ColumnOrSuperColumn> call() throws Exception - { - RequestSender sender = (RequestSender)Thread.currentThread(); - List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE); - return res; - } - }; - Future<List<ColumnOrSuperColumn>> future = m_service.submit(task); - - NodeData data = new NodeData(); - try{ - List<ColumnOrSuperColumn> slice = future.get(); - - //iterate column - for(ColumnOrSuperColumn column : slice){ - String name = new String(column.column.name); - - //if column name matches CHILDREN , deserialize value to child list. - if(name.equals(CHILDREN)){ - List<Node> tmp = deserialize(new String(column.column.value)); - data.add(tmp); - }else{ - String key = name.substring(1); //size of prefix - data.set(key.getBytes(),column.column.value); - } - } - }catch(Exception _e){ - _e.printStackTrace(); - } - - return data; - } - - public List<CassandraNode> multiCreateNode(List<CassandraNode> _list) - { - final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>(); - - Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>(); - Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>(); - for(CassandraNode node : _list){ - LinkedList<Mutation> list = new LinkedList<Mutation>(); - Mutation mut = new Mutation(); - ColumnOrSuperColumn column = new ColumnOrSuperColumn(); - mut.column_or_supercolumn = column; - column.column.name = CHILDREN.getBytes(); - column.column.value = serialize(node.getData().list()).getBytes(); - list.add(mut); - - for(byte[] key : node.getData().keys()){ - mut = new Mutation(); - column = new ColumnOrSuperColumn(); - mut.column_or_supercolumn = column; - column.column.name = key; - column.column.value = node.getData().get(key); - - list.add(mut); - } - - nodeTable.put(node.getID().toString(),list); - - mut = new Mutation(); - column = new ColumnOrSuperColumn(); - column.column.name = TIPID.getBytes(); - column.column.value = node.getID().getVersion().getBytes(); - list = new LinkedList<Mutation>(); - list.add(mut); - tipTable.put(node.getID().getUUID(),list); - } - mutationMap.put(NODETABLE,nodeTable); - mutationMap.put(TIPTABLE,tipTable); - - Runnable task = new Runnable(){ - @Override - public void run() - { - RequestSender sender = (RequestSender)Thread.currentThread(); - sender.batch_mutate(mutationMap,ConsistencyLevel.ONE); - } - }; - - m_service.execute(task); - - return _list; - } - - /** - * list serializer. - * ex. list{"hoge","fuga"} -> "hoge,fuga" - * @param _list - * @return selialized string - */ - public String serialize(List<Node> _list) - { - String prefix = ""; - StringBuffer buf = new StringBuffer(); - for(Node child : _list){ - buf.append(prefix+child.getID().toString()); - prefix = ","; - } - - return buf.toString(); - } - - /** - * string deserializer. - * ex. "hoge,fuga" -> list{"hoge","fuga"} - * @param _selialized - * @return list - */ - public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException - { - StringTokenizer tokens = new StringTokenizer(_serialized,","); - LinkedList<Node> res = new LinkedList<Node>(); - - while(tokens.hasMoreElements()){ - String tmp = tokens.nextToken(); - StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@"); - - try{ - NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken()); - res.add(get(id)); - }catch(Exception _e){ - throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e); - } - } - - return res; + return null; } public NodeID getTipID(String _uuid) { final String uuid = _uuid; - Callable<byte[]> task = new Callable<byte[]>(){ - @Override - public byte[] call() throws Exception - { - RequestSender sender = (RequestSender)Thread.currentThread(); - byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE); - return value; - } - }; - - Future<byte[]> future = m_service.submit(task); - - try { - byte[] value = future.get(); - String id = new String(value); - StringTokenizer token = new StringTokenizer(id,"@"); - NodeID nodeID = createID(token.nextToken(),token.nextToken()); - return nodeID; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return null; //not found. + return null; } public Node createNode(NodeID _id,NodeData _data) @@ -265,65 +116,7 @@ @Override public Boolean call() throws Exception { - RequestSender sender = (RequestSender)Thread.currentThread(); - - //mutation map - HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>(); - - /* - * create mutation map for NODETABLE - */ - if(data != null){ - LinkedList<Mutation> list = new LinkedList<Mutation>(); - HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>(); - Iterator<Node> itr = data.list().iterator(); - - /* - * create CSV from child list. - */ - StringBuffer buffer = new StringBuffer(); - for(String prefix = "";itr.hasNext();prefix = ","){ - buffer.append(String.format("%s%s",prefix,itr.next().getID().toString())); - } - Mutation mutChildren = new Mutation(); - ColumnOrSuperColumn children = new ColumnOrSuperColumn(); - children.column.name = CHILDREN.getBytes(); - children.column.value = buffer.toString().getBytes(); - mutChildren.column_or_supercolumn = children; - list.add(mutChildren); - - /* - * - */ - for(byte[] key : data.keys()){ - Mutation mut = new Mutation(); - ColumnOrSuperColumn column = new ColumnOrSuperColumn(); - column.column.name = key; - column.column.value = data.get(key); - mut.column_or_supercolumn = column; - list.add(mut); - } - info.put(id.toString(),list); - - map.put(NODETABLE,info); - } - - /* - * create mutation map for NODEIDTABLE - */ - HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>(); - LinkedList<Mutation> list = new LinkedList<Mutation>(); - - Mutation mutTipID = new Mutation(); - ColumnOrSuperColumn tipID = new ColumnOrSuperColumn(); - tipID.column.name = TIPID.getBytes(); - tipID.column.value = id.getVersion().getBytes(); - mutTipID.column_or_supercolumn = tipID; - - list.add(mutTipID); - idtable_mutations.put(TIPTABLE,list); - - return sender.batch_mutate(map,ConsistencyLevel.ONE); + return true; } }; @@ -372,141 +165,16 @@ return m_version; } } - - private static class RequestSender extends Thread + + @Override + public Node create(NodeData data) { - private int m_port; - private String m_host,m_ks; - private Cassandra.Client m_client; - - public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException - { - super(_runnable); - m_port = _port; - m_host = _host; - m_ks = _ks; - - connect(); - } - - public void connect() throws TTransportException - { - TTransport tr = new TSocket(m_host,m_port); - TProtocol proto = new TBinaryProtocol(tr); - m_client = new Cassandra.Client(proto); - - tr.open(); - } - - public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks) - { - RequestSender sender = null; - try { - sender = new RequestSender(_runnable,_host,_port,_ks); - } catch (TTransportException _e) { - _e.printStackTrace(); - } - - return sender; - } - - public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv) - { - byte[] ret = null; - - ColumnPath path = new ColumnPath(); - path.column_family = _cf; - path.column = _name; - try { - ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv); - ret = cors.column.value; - }catch(NotFoundException _e){ - System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name))); - }catch(Exception _e){ - _e.printStackTrace(); - } - - return ret; - } - - public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv) - { - ColumnPath path = new ColumnPath(); - path.column_family = _cf; - path.column = _name; - - try{ - m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv); - return true; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return false; - } - - public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv) - { - List<ColumnOrSuperColumn> ret = null; - SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1); - SlicePredicate sp = new SlicePredicate(); - sp.slice_range = sr; - - try { - ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv); - }catch(Exception _e){ - _e.printStackTrace(); - } - - return ret; - } - - public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv) - { - try{ - m_client.batch_insert(m_ks,_cf,_map,_lv); - return true; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return false; - } - - public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv) - { - try { - m_client.batch_mutate(m_ks,_mutateMap,_lv); - return true; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return false; - } - - public String toString() - { - return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]"; - } + return null; } - - private class RequestSenderFactory implements ThreadFactory + + @Override + public Node getTip(String uuid) { - private int m_port; - private String m_host,m_ks; - - public RequestSenderFactory(String _host,int _port,String _ks) - { - m_host = _host; - m_port = _port; - m_ks = _ks; - } - - @Override - public Thread newThread(Runnable _runnable) - { - return RequestSender.newInstance(_runnable,m_host,m_port,m_ks); - } + return null; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/ClientThread.java Thu Mar 17 23:24:08 2011 +0900 @@ -0,0 +1,48 @@ +package treecms.tree.cassandra.v1; + +import org.apache.thrift.transport.TTransportException; + +/** + * CassandraのClientを保持したスレッドオブジェクトです。 + * @author shoshi + */ +final class ClientThread extends Thread +{ + private ClientWrapper m_wrapper; + + /** + * コンストラクタです。 + * @param _host Cassandraのホスト名 + * @param _port Cassandraのポート番号 + * @param _runnable このスレッドで動作するRunnable + * @throws TTransportException + */ + private ClientThread(String _host,int _port,Runnable _runnable) throws TTransportException + { + super(_runnable); + m_wrapper = new ClientWrapper(_host,_port,2); + } + + /** + * ファクトリメソッドです。 + * @param _host Cassandraのホスト名 + * @param _port Cassandraのポート番号 + * @param _runnable このスレッドで動作するRunnable + * @return 新しいインスタンス + * @throws TTransportException Cassandraへの接続が失敗したとき + */ + public static ClientThread newInstance(String _host,int _port,Runnable _runnable) throws TTransportException + { + ClientThread thread = new ClientThread(_host,_port,_runnable); + return thread; + } + + /** + * ClientWrapperを取得します + * @return CassandraへのClientWrapper + */ + public ClientWrapper getClientWrapper() + { + return m_wrapper; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/ClientThreadFactory.java Thu Mar 17 23:24:08 2011 +0900 @@ -0,0 +1,42 @@ +package treecms.tree.cassandra.v1; + +import java.util.concurrent.ThreadFactory; + +import org.apache.thrift.transport.TTransportException; + +/** + * ローカル変数としてCassandra.Clientを保持するスレッドオブジェクトを生成するスレッドファクトリーです。 + * @author shoshi + */ +final class ClientThreadFactory implements ThreadFactory +{ + private String m_host; + private int m_port; + + /** + * コンストラクタです。 + * @param _host Cassandraのアドレス・ホスト名 + * @param _port Thriftポート番号 + */ + public ClientThreadFactory(String _host,int _port) + { + m_host = _host; + m_port = _port; + } + + /** + * Cassandra.Clientを保持するスレッドオブジェクトを新しく作成します。 + */ + @Override + public Thread newThread(Runnable _runnable) + { + ClientThread client = null; + try{ + client = ClientThread.newInstance(m_host,m_port,_runnable); + }catch(TTransportException _e) { + _e.printStackTrace(); + throw new RuntimeException(_e); + } + return client; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/ClientWrapper.java Thu Mar 17 23:24:08 2011 +0900 @@ -0,0 +1,203 @@ +package treecms.tree.cassandra.v1; + +import java.util.List; +import java.util.Map; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +/** + * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。 + * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。 + * + * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。 + * @author shoshi + */ +final class ClientWrapper +{ + private String m_host; + private int m_port; + private int m_retryCount; + + private TTransport m_tr; + private Cassandra.Client m_client; + + /** + * コンストラクタです。初期化して接続します。 + * @param _host Cassandraのホスト名 + * @param _port Cassandraのポート番号 + * @param _retryCount リクエストが失敗した場合リトライする回数 + * @throws TTransportException + */ + public ClientWrapper(String _host,int _port,int _retryCount) throws TTransportException + { + m_host = _host; + m_port = _port; + m_retryCount = _retryCount; + + connect(); + } + + /** + * Cassandraに接続します。 + * @throws TTransportException + */ + private void connect() throws TTransportException + { + Cassandra.Client client; + TTransport tr = new TSocket(m_host,m_port); + client = new Cassandra.Client(new TBinaryProtocol(tr)); + + tr.open(); + + m_tr = tr; + m_client = client; + } + + /** + * Cassandraへの接続を切ります。 + */ + private void disconnect() + { + m_tr.close(); + m_client = null; + } + + /** + * Cassandraへ再接続を行います。 + * @return 再接続が成功した場合true + */ + private boolean reconnect() + { + disconnect(); + try { + connect(); + }catch(TTransportException _e){ + _e.printStackTrace(); + } + return true; + } + + /** + * ここで共通する例外処理(再接続など)を行う + * @param _e 例外 + */ + private void exceptionHandler(Exception _e) + { + _e.printStackTrace(); + } + + /** + * Cassandra.Client.getのラッパーメソッド + */ + public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException + { + for(int i = 0;i < m_retryCount;i ++){ + if(m_client == null){ + throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); + } + + try { + ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level); + return cors; + }catch(Exception _e){ + exceptionHandler(_e); + } + } + return m_client.get(_ks,_key,_path,_level); + } + + /** + * Cassandra.Client.get_sliceのラッパーメソッド + */ + public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException + { + for(int i = 0;i < m_retryCount;i ++){ + if(m_client == null){ + throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); + } + + try { + List<ColumnOrSuperColumn> list = m_client.get_slice(_ks,_key,_parent,_predicate,_level); + return list; + }catch(Exception _e){ + exceptionHandler(_e); + } + } + return m_client.get_slice(_ks,_key,_parent,_predicate,_level); + } + + /** + * Cassandra.Client.get_range_slicesのラッパーメソッド + */ + public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException + { + for(int i = 0;i < m_retryCount;i ++){ + if(m_client == null){ + throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); + } + + try { + return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level); + }catch(Exception _e){ + exceptionHandler(_e); + } + } + return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level); + } + + /** + * Cassandra.Client.insertのラッパーメソッド + */ + public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException + { + for(int i = 1;i < m_retryCount;i ++){ + if(m_client == null){ + throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); + } + + try{ + m_client.insert(_ks,_key,_path,_value,_timestamp,_level); + }catch(Exception _e){ + exceptionHandler(_e); + } + } + m_client.insert(_ks,_key,_path,_value,_timestamp,_level); + return; + } + + /** + * Cassandra.Client.batch_mutateのラッパーメソッド + */ + public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException + { + for(int i = 1;i < m_retryCount;i ++){ + if(m_client == null){ + throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); + } + + try{ + m_client.batch_mutate(_ks,_mutation_map,_level); + }catch(Exception _e){ + exceptionHandler(_e); + } + } + m_client.batch_mutate(_ks,_mutation_map,_level); + return; + } +}
--- a/src/treecms/tree/util/NodePathFinder.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/tree/util/NodePathFinder.java Thu Mar 17 23:24:08 2011 +0900 @@ -21,11 +21,12 @@ * @param _root 木構造のトップです. * @param _target パス検索の対象となるNodeです. */ - public NodePathFinder(Node _root,Node _target) + public NodePathFinder(Node _root,Node _target) throws PathNotFoundException { m_root = _root; m_target = _target; - m_path = Collections.unmodifiableList(findPath(m_root,m_target)); + List<Node> path = findPath(m_root,m_target); + m_path = Collections.unmodifiableList(path); } /** @@ -57,14 +58,14 @@ { if(_from.getID().isFamily(_target.getID())){ LinkedList<Node> path = new LinkedList<Node>(); - path.add(_from); + path.addFirst(_from); return path; } for(Node child : _from.children()){ LinkedList<Node> path = findPath(child,_target); if(path != null){ - path.add(_from); + path.addFirst(_from); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/util/PathNotFoundException.java Thu Mar 17 23:24:08 2011 +0900 @@ -0,0 +1,22 @@ +package treecms.tree.util; + +import treecms.api.Node; + +/** + * あるNodeからNodeまでのパスが検索したが見つからない場合に発生します. + * @author shoshi + */ +public class PathNotFoundException extends Exception +{ + private static final long serialVersionUID = 6372927818478170944L; + + /** + * コンストラクタです. + * @param _from 木構造のルートNode + * @param _to 検索する対象のNode + */ + public PathNotFoundException(Node _from,Node _to) + { + super("Path Not Found from "+_from.getID()+" to "+_to.getID()); + } +}