Mercurial > hg > Members > shoshi > TreeCMSv2
changeset 4:f5ed85be5640
finished treecms.cassandra.v1 implementation (not tested yet)
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.classpath Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="src" path="src"/> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/> + <classpathentry kind="lib" path="lib/antlr-3.1.3.jar"/> + <classpathentry kind="lib" path="lib/apache-cassandra-0.6.8.jar"/> + <classpathentry kind="lib" path="lib/clhm-production.jar"/> + <classpathentry kind="lib" path="lib/commons-cli-1.1.jar"/> + <classpathentry kind="lib" path="lib/commons-codec-1.2.jar"/> + <classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/> + <classpathentry kind="lib" path="lib/commons-lang-2.4.jar"/> + <classpathentry kind="lib" path="lib/google-collections-1.0.jar"/> + <classpathentry kind="lib" path="lib/hadoop-core-0.20.1.jar"/> + <classpathentry kind="lib" path="lib/high-scale-lib.jar"/> + <classpathentry kind="lib" path="lib/ivy-2.1.0.jar"/> + <classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/> + <classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/> + <classpathentry kind="lib" path="lib/jline-0.9.94.jar"/> + <classpathentry kind="lib" path="lib/json-simple-1.1.jar"/> + <classpathentry kind="lib" path="lib/libthrift-r917130.jar"/> + <classpathentry kind="lib" path="lib/log4j-1.2.14.jar"/> + <classpathentry kind="lib" path="lib/slf4j-api-1.5.8.jar"/> + <classpathentry kind="lib" path="lib/slf4j-log4j12-1.5.8.jar"/> + <classpathentry kind="output" path="bin"/> +</classpath>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.project Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>TreeCMS</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.settings/org.eclipse.jdt.core.prefs Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,12 @@ +#Wed Feb 02 15:37:28 JST 2011 +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6 +org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve +org.eclipse.jdt.core.compiler.compliance=1.6 +org.eclipse.jdt.core.compiler.debug.lineNumber=generate +org.eclipse.jdt.core.compiler.debug.localVariable=generate +org.eclipse.jdt.core.compiler.debug.sourceFile=generate +org.eclipse.jdt.core.compiler.problem.assertIdentifier=error +org.eclipse.jdt.core.compiler.problem.enumIdentifier=error +org.eclipse.jdt.core.compiler.source=1.6
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CHANGELOG Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,15 @@ +ChangeLog. + +2011-02-25 + finished treecms.cassandra.v1 implementation ( not tested yet. ) +2011-02-18 + finished treecms.memory basic implementation ( not tested yet. ) +2011-02-16 + added OnMemoryForest +2011-02-16 + add Forest + + +Requirements. + + Cassandra 0.6.x
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/documents/cassandra.thrift Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,464 @@ +#!/usr/local/bin/thrift --java --php --py +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# *** PLEASE REMEMBER TO EDIT THE VERSION CONSTANT WHEN MAKING CHANGES *** +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +# +# Interface definition for Cassandra Service +# + +namespace java org.apache.cassandra.thrift +namespace cpp org.apache.cassandra +namespace csharp Apache.Cassandra +namespace py cassandra +namespace php cassandra +namespace perl Cassandra + +# Thrift.rb has a bug where top-level modules that include modules +# with the same name are not properly referenced, so we can't do +# Cassandra::Cassandra::Client. +namespace rb CassandraThrift + +# The API version (NOT the product version), composed as a dot delimited +# string with major, minor, and patch level components. +# +# - Major: Incremented for backward incompatible changes. An example would +# be changes to the number or disposition of method arguments. +# - Minor: Incremented for backward compatible changes. An example would +# be the addition of a new (optional) method. +# - Patch: Incremented for bug fixes. The patch level should be increased +# for every edit that doesn't result in a change to major/minor. +# +# See the Semantic Versioning Specification (SemVer) http://semver.org. +const string VERSION = "2.2.0" + +# +# data structures +# + +/** Basic unit of data within a ColumnFamily. + * @param name. A column name can act both as structure (a label) or as data (like value). Regardless, the name of the column + * is used as a key to its value. + * @param value. Some data + * @param timestamp. Used to record when data was sent to be written. + */ +struct Column { + 1: required binary name, + 2: required binary value, + 3: required i64 timestamp, +} + +/** A named list of columns. + * @param name. see Column.name. + * @param columns. A collection of standard Columns. The columns within a super column are defined in an adhoc manner. + * Columns within a super column do not have to have matching structures (similarly named child columns). + */ +struct SuperColumn { + 1: required binary name, + 2: required list<Column> columns, +} + +/** + Methods for fetching rows/records from Cassandra will return either a single instance of ColumnOrSuperColumn or a list + of ColumnOrSuperColumns (get_slice()). If you're looking up a SuperColumn (or list of SuperColumns) then the resulting + instances of ColumnOrSuperColumn will have the requested SuperColumn in the attribute super_column. For queries resulting + in Columns, those values will be in the attribute column. This change was made between 0.3 and 0.4 to standardize on + single query methods that may return either a SuperColumn or Column. + + @param column. The Column returned by get() or get_slice(). + @param super_column. The SuperColumn returned by get() or get_slice(). + */ +struct ColumnOrSuperColumn { + 1: optional Column column, + 2: optional SuperColumn super_column, +} + + +# +# Exceptions +# (note that internal server errors will raise a TApplicationException, courtesy of Thrift) +# + +/** A specific column was requested that does not exist. */ +exception NotFoundException { +} + +/** Invalid request could mean keyspace or column family does not exist, required parameters are missing, or a parameter is malformed. + why contains an associated error message. +*/ +exception InvalidRequestException { + 1: required string why +} + +/** Not all the replicas required could be created and/or read. */ +exception UnavailableException { +} + +/** RPC timeout was exceeded. either a node failed mid-operation, or load was too high, or the requested op was too large. */ +exception TimedOutException { +} + +/** invalid authentication request (user does not exist or credentials invalid) */ +exception AuthenticationException { + 1: required string why +} + +/** invalid authorization request (user does not have access to keyspace) */ +exception AuthorizationException { + 1: required string why +} + + +# +# service api +# +/** The ConsistencyLevel is an enum that controls both read and write behavior based on <ReplicationFactor> in your + * storage-conf.xml. The different consistency levels have different meanings, depending on if you're doing a write or read + * operation. Note that if W + R > ReplicationFactor, where W is the number of nodes to block for on write, and R + * the number to block for on reads, you will have strongly consistent behavior; that is, readers will always see the most + * recent write. Of these, the most interesting is to do QUORUM reads and writes, which gives you consistency while still + * allowing availability in the face of node failures up to half of <ReplicationFactor>. Of course if latency is more + * important than consistency then you can use lower values for either or both. + * + * Write: + * ZERO Ensure nothing. A write happens asynchronously in background + * ANY Ensure that the write has been written once somewhere, including possibly being hinted in a non-target node. + * ONE Ensure that the write has been written to at least 1 node's commit log and memory table before responding to the client. + * QUORUM Ensure that the write has been written to <ReplicationFactor> / 2 + 1 nodes before responding to the client. + * ALL Ensure that the write is written to <code><ReplicationFactor></code> nodes before responding to the client. + * + * Read: + * ZERO Not supported, because it doesn't make sense. + * ANY Not supported. You probably want ONE instead. + * ONE Will return the record returned by the first node to respond. A consistency check is always done in a + * background thread to fix any consistency issues when ConsistencyLevel.ONE is used. This means subsequent + * calls will have correct data even if the initial read gets an older value. (This is called 'read repair'.) + * QUORUM Will query all storage nodes and return the record with the most recent timestamp once it has at least a + * majority of replicas reported. Again, the remaining replicas will be checked in the background. + * ALL Queries all storage nodes and returns the record with the most recent timestamp. +*/ +enum ConsistencyLevel { + ZERO = 0, + ONE = 1, + QUORUM = 2, + DCQUORUM = 3, + DCQUORUMSYNC = 4, + ALL = 5, + ANY = 6, +} + +/** + ColumnParent is used when selecting groups of columns from the same ColumnFamily. In directory structure terms, imagine + ColumnParent as ColumnPath + '/../'. + + See also <a href="cassandra.html#Struct_ColumnPath">ColumnPath</a> + */ +struct ColumnParent { + 3: required string column_family, + 4: optional binary super_column, +} + +/** The ColumnPath is the path to a single column in Cassandra. It might make sense to think of ColumnPath and + * ColumnParent in terms of a directory structure. + * + * ColumnPath is used to looking up a single column. + * + * @param column_family. The name of the CF of the column being looked up. + * @param super_column. The super column name. + * @param column. The column name. + */ +struct ColumnPath { + 3: required string column_family, + 4: optional binary super_column, + 5: optional binary column, +} + +/** + A slice range is a structure that stores basic range, ordering and limit information for a query that will return + multiple columns. It could be thought of as Cassandra's version of LIMIT and ORDER BY + + @param start. The column name to start the slice with. This attribute is not required, though there is no default value, + and can be safely set to '', i.e., an empty byte array, to start with the first column name. Otherwise, it + must a valid value under the rules of the Comparator defined for the given ColumnFamily. + @param finish. The column name to stop the slice at. This attribute is not required, though there is no default value, + and can be safely set to an empty byte array to not stop until 'count' results are seen. Otherwise, it + must also be a valid value to the ColumnFamily Comparator. + @param reversed. Whether the results should be ordered in reversed order. Similar to ORDER BY blah DESC in SQL. + @param count. How many keys to return. Similar to LIMIT 100 in SQL. May be arbitrarily large, but Thrift will + materialize the whole result into memory before returning it to the client, so be aware that you may + be better served by iterating through slices by passing the last value of one call in as the 'start' + of the next instead of increasing 'count' arbitrarily large. + */ +struct SliceRange { + 1: required binary start, + 2: required binary finish, + 3: required bool reversed=0, + 4: required i32 count=100, +} + +/** + A SlicePredicate is similar to a mathematic predicate (see http://en.wikipedia.org/wiki/Predicate_(mathematical_logic)), + which is described as "a property that the elements of a set have in common." + + SlicePredicate's in Cassandra are described with either a list of column_names or a SliceRange. If column_names is + specified, slice_range is ignored. + + @param column_name. A list of column names to retrieve. This can be used similar to Memcached's "multi-get" feature + to fetch N known column names. For instance, if you know you wish to fetch columns 'Joe', 'Jack', + and 'Jim' you can pass those column names as a list to fetch all three at once. + @param slice_range. A SliceRange describing how to range, order, and/or limit the slice. + */ +struct SlicePredicate { + 1: optional list<binary> column_names, + 2: optional SliceRange slice_range, +} + +/** +The semantics of start keys and tokens are slightly different. +Keys are start-inclusive; tokens are start-exclusive. Token +ranges may also wrap -- that is, the end token may be less +than the start one. Thus, a range from keyX to keyX is a +one-element range, but a range from tokenY to tokenY is the +full ring. +*/ +struct KeyRange { + 1: optional string start_key, + 2: optional string end_key, + 3: optional string start_token, + 4: optional string end_token, + 5: required i32 count=100 +} + +/** + A KeySlice is key followed by the data it maps to. A collection of KeySlice is returned by the get_range_slice operation. + + @param key. a row key + @param columns. List of data represented by the key. Typically, the list is pared down to only the columns specified by + a SlicePredicate. + */ +struct KeySlice { + 1: required string key, + 2: required list<ColumnOrSuperColumn> columns, +} + +struct Deletion { + 1: required i64 timestamp, + 2: optional binary super_column, + 3: optional SlicePredicate predicate, +} + +/** + A Mutation is either an insert, represented by filling column_or_supercolumn, or a deletion, represented by filling the deletion attribute. + @param column_or_supercolumn. An insert to a column or supercolumn + @param deletion. A deletion of a column or supercolumn +*/ +struct Mutation { + 1: optional ColumnOrSuperColumn column_or_supercolumn, + 2: optional Deletion deletion, +} + +struct TokenRange { + 1: required string start_token, + 2: required string end_token, + 3: required list<string> endpoints, +} + +/** + Authentication requests can contain any data, dependent on the AuthenticationBackend used +*/ +struct AuthenticationRequest { + 1: required map<string, string> credentials, +} + + +service Cassandra { + # auth methods + void login(1: required string keyspace, 2:required AuthenticationRequest auth_request) throws (1:AuthenticationException authnx, 2:AuthorizationException authzx), + + # retrieval methods + + /** + Get the Column or SuperColumn at the given column_path. If no value is present, NotFoundException is thrown. (This is + the only method that can throw an exception under non-failure conditions.) + */ + ColumnOrSuperColumn get(1:required string keyspace, + 2:required string key, + 3:required ColumnPath column_path, + 4:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te), + + /** + Get the group of columns contained by column_parent (either a ColumnFamily name or a ColumnFamily/SuperColumn name + pair) specified by the given SlicePredicate. If no matching values are found, an empty list is returned. + */ + list<ColumnOrSuperColumn> get_slice(1:required string keyspace, + 2:required string key, + 3:required ColumnParent column_parent, + 4:required SlicePredicate predicate, + 5:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + Perform a get for column_path in parallel on the given list<string> keys. The return value maps keys to the + ColumnOrSuperColumn found. If no value corresponding to a key is present, the key will still be in the map, but both + the column and super_column references of the ColumnOrSuperColumn object it maps to will be null. + @deprecated; use multiget_slice + */ + map<string,ColumnOrSuperColumn> multiget(1:required string keyspace, + 2:required list<string> keys, + 3:required ColumnPath column_path, + 4:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + Performs a get_slice for column_parent and predicate for the given keys in parallel. + */ + map<string,list<ColumnOrSuperColumn>> multiget_slice(1:required string keyspace, + 2:required list<string> keys, + 3:required ColumnParent column_parent, + 4:required SlicePredicate predicate, + 5:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + returns the number of columns for a particular <code>key</code> and <code>ColumnFamily</code> or <code>SuperColumn</code>. + */ + i32 get_count(1:required string keyspace, + 2:required string key, + 3:required ColumnParent column_parent, + 4:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + returns a subset of columns for a range of keys. + @Deprecated. Use get_range_slices instead + */ + list<KeySlice> get_range_slice(1:required string keyspace, + 2:required ColumnParent column_parent, + 3:required SlicePredicate predicate, + 4:required string start_key="", + 5:required string finish_key="", + 6:required i32 row_count=100, + 7:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + returns a subset of columns for a range of keys. + */ + list<KeySlice> get_range_slices(1:required string keyspace, + 2:required ColumnParent column_parent, + 3:required SlicePredicate predicate, + 4:required KeyRange range, + 5:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + # modification methods + + /** + Insert a Column consisting of (column_path.column, value, timestamp) at the given column_path.column_family and optional + column_path.super_column. Note that column_path.column is here required, since a SuperColumn cannot directly contain binary + values -- it can only contain sub-Columns. + */ + void insert(1:required string keyspace, + 2:required string key, + 3:required ColumnPath column_path, + 4:required binary value, + 5:required i64 timestamp, + 6:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + Insert Columns or SuperColumns across different Column Families for the same row key. batch_mutation is a + map<string, list<ColumnOrSuperColumn>> -- a map which pairs column family names with the relevant ColumnOrSuperColumn + objects to insert. + @deprecated; use batch_mutate instead + */ + void batch_insert(1:required string keyspace, + 2:required string key, + 3:required map<string, list<ColumnOrSuperColumn>> cfmap, + 4:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + Remove data from the row specified by key at the granularity specified by column_path, and the given timestamp. Note + that all the values in column_path besides column_path.column_family are truly optional: you can remove the entire + row by just specifying the ColumnFamily, or you can remove a SuperColumn or a single Column by specifying those levels too. + */ + void remove(1:required string keyspace, + 2:required string key, + 3:required ColumnPath column_path, + 4:required i64 timestamp, + 5:ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + /** + Mutate many columns or super columns for many row keys. See also: Mutation. + + mutation_map maps key to column family to a list of Mutation objects to take place at that scope. + **/ + void batch_mutate(1:required string keyspace, + 2:required map<string, map<string, list<Mutation>>> mutation_map, + 3:required ConsistencyLevel consistency_level=ONE) + throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), + + // Meta-APIs -- APIs to get information about the node or cluster, + // rather than user data. The nodeprobe program provides usage examples. + + /** get property whose value is of type string. @Deprecated */ + string get_string_property(1:required string property), + + /** get property whose value is list of strings. @Deprecated */ + list<string> get_string_list_property(1:required string property), + + /** list the defined keyspaces in this cluster */ + set<string> describe_keyspaces(), + + /** get the cluster name */ + string describe_cluster_name(), + + /** get the thrift api version */ + string describe_version(), + + /** get the token ring: a map of ranges to host addresses, + represented as a set of TokenRange instead of a map from range + to list of endpoints, because you can't use Thrift structs as + map keys: + https://issues.apache.org/jira/browse/THRIFT-162 + + for the same reason, we can't return a set here, even though + order is neither important nor predictable. */ + list<TokenRange> describe_ring(1:required string keyspace) + throws (1:InvalidRequestException ire), + + /** returns the partitioner used by this cluster */ + string describe_partitioner(), + + /** describe specified keyspace */ + map<string, map<string, string>> describe_keyspace(1:required string keyspace) + throws (1:NotFoundException nfe), + + /** experimental API for hadoop/parallel query support. + may change violently and without warning. + + returns list of token strings such that first subrange is (list[0], list[1]], + next is (list[1], list[2]], etc. */ + list<string> describe_splits(1:required string start_token, + 2:required string end_token, + 3:required i32 keys_per_split), +}
--- a/src/treecms/api/Node.java Fri Feb 18 02:14:10 2011 +0900 +++ b/src/treecms/api/Node.java Thu Feb 24 21:30:18 2011 +0900 @@ -4,6 +4,5 @@ { public NodeID getID(); public NodeData getData(); - public NodeData newData(); public Forest getForest(); } \ No newline at end of file
--- a/src/treecms/api/NodeData.java Fri Feb 18 02:14:10 2011 +0900 +++ b/src/treecms/api/NodeData.java Thu Feb 24 21:30:18 2011 +0900 @@ -1,14 +1,63 @@ package treecms.api; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Set; -public interface NodeData +public class NodeData { - public String get(); - public void set(String _str); + LinkedList<Node> m_children; + HashMap<byte[],byte[]> m_attrs; + + public NodeData() + { + m_children = new LinkedList<Node>(); + m_attrs = new HashMap<byte[],byte[]>(); + } + + public NodeData deepCopy() + { + NodeData copy = new NodeData(); + copy.m_children.addAll(m_children); + copy.m_attrs.putAll(m_attrs); + + return copy; + } + + public Set<byte[]> keys() + { + return m_attrs.keySet(); + } - public List<Node> list(); - public void add(List<Node> _child); - public void del(List<Node> _child); - public void clear(); + public byte[] get(byte[] _name) + { + return m_attrs.get(_name); + } + + public void set(byte[] _name,byte[] _value) + { + m_attrs.put(_name,_value); + } + + public List<Node> list() + { + return Collections.unmodifiableList(m_children); + } + + public void add(List<Node> _child) + { + m_children.addAll(_child); + } + + public void del(List<Node> _child) + { + m_children.removeAll(_child); + } + + public void clear() + { + m_children.clear(); + } }
--- a/src/treecms/memory/OnMemoryForest.java Fri Feb 18 02:14:10 2011 +0900 +++ b/src/treecms/memory/OnMemoryForest.java Thu Feb 24 21:30:18 2011 +0900 @@ -6,6 +6,7 @@ import java.util.concurrent.ConcurrentHashMap; import treecms.api.Forest; import treecms.api.Node; +import treecms.api.NodeData; import treecms.api.NodeID; import treecms.tree.id.RandomNodeID; @@ -18,16 +19,12 @@ m_table = new ConcurrentHashMap<NodeID,OnMemoryNode>(); } - public OnMemoryNode createNode(NodeID _id) + public OnMemoryNode createNode(NodeID _id,NodeData _newData) { - OnMemoryNode newNode; - if(_id == null){ - newNode = new OnMemoryNode(this,createID()); - }else{ - newNode = new OnMemoryNode(this,_id); - } - m_table.put(newNode.getID(),newNode); - + NodeID newID = (_id != null) ? _id : createID(); + NodeData newData = (_newData != null) ? _newData : new NodeData(); + OnMemoryNode newNode = new OnMemoryNode(this,newID,newData); + m_table.put(newID,newNode); return newNode; } @@ -45,7 +42,7 @@ @Override public Node create() { - return createNode(null); + return createNode(null,null); } class RandomNodeIDImpl extends RandomNodeID
--- a/src/treecms/memory/OnMemoryNode.java Fri Feb 18 02:14:10 2011 +0900 +++ b/src/treecms/memory/OnMemoryNode.java Thu Feb 24 21:30:18 2011 +0900 @@ -9,15 +9,16 @@ public class OnMemoryNode implements Node { + OnMemoryForest m_forest; + NodeID m_id; - OnMemoryForest m_forest; NodeData m_data; - public OnMemoryNode(OnMemoryForest _forest,NodeID _id) + public OnMemoryNode(OnMemoryForest _forest,NodeID _id,NodeData _newData) { m_id = _id; m_forest = _forest; - m_data = new NodeDataImpl(); + m_data = (_newData != null) ? _newData.deepCopy() : new NodeData(); } @Override @@ -37,58 +38,4 @@ { return m_data; } - - @Override - public NodeData newData() - { - return new NodeDataImpl(); - } - - class NodeDataImpl implements NodeData - { - List<Node> m_children; - String m_value; - - public NodeDataImpl() - { - m_children = new LinkedList<Node>(); - } - - @Override - public List<Node> list() - { - return m_children; - } - - @Override - public String get() - { - return m_value; - } - - @Override - public void set(String _str) - { - m_value = _str; - } - - @Override - public void add(List<Node> _child) - { - m_children.addAll(_child); - } - - @Override - public void del(List<Node> _child) - { - m_children.removeAll(_child); - } - - @Override - public void clear() - { - m_children.clear(); - } - - } }
--- a/src/treecms/memory/OnMemoryTree.java Fri Feb 18 02:14:10 2011 +0900 +++ b/src/treecms/memory/OnMemoryTree.java Thu Feb 24 21:30:18 2011 +0900 @@ -46,12 +46,6 @@ } @Override - public NodeData newData() - { - return m_root.newData(); - } - - @Override public Node getNodeByUUID(String _uuid) { return m_table.get(_uuid); @@ -74,16 +68,7 @@ OnMemoryNode cloneNode(OnMemoryNode _target,NodeData _newData) { - OnMemoryNode clone = m_forest.createNode(_target.getID().update()); - - if(_newData != null){ - clone.m_data.add(_newData.list()); - clone.m_data.set(_newData.get()); - }else{ - clone.m_data.add(_target.m_data.list()); - clone.m_data.set(_target.m_data.get()); - } - + OnMemoryNode clone = m_forest.createNode(_target.getID().update(),_newData); m_table.put(clone.getID().getUUID(),clone); return clone; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/CassandraForest.java Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,511 @@ +package treecms.tree.cassandra.v1; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import treecms.api.Forest; +import treecms.api.Node; +import treecms.api.NodeData; +import treecms.api.NodeID; +import treecms.tree.id.RandomNodeID; + +/** + * implementation of TreeCMS with Cassandra backend. + * + * TreeCMSKS.NodeTable (table of all nodes) + * + * +---------------------------------------------+ + * + Key | Col1 | Col2 | Col3 | ... | + * +---------------------------------------------+ + * + NodeID | Children | _attr1 | _attr2 | ... | + * +---------------------------------------------+ + * + * TreeCMSKS.TipTable (table of tip) + * + * +--------------------+ + * + Key | Col1 | + * +--------------------+ + * + NodeID | version | + * +--------------------+ + * + * @author shoshi + */ + +public class CassandraForest implements Forest +{ + ExecutorService m_service; + + //column families. + static final String NODETABLE = "NodeTable"; + static final String TIPTABLE = "TipTable"; + + //reserved word. + static final String NODE = "Node"; + static final String CHILDREN = "Children"; + static final char PREFIX = '_'; + + //id table reserved + static final String TIPID = "TipID"; + + public CassandraForest(String _host,int _port,String _ks,int _threads) + { + m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks)); + } + + @Override + public Node get(NodeID _id) + { + + return new CassandraNode(this,_id); + } + + @Override + public Node create() + { + return createNode(null,null); //create new node + } + + public NodeData getNodeData(NodeID _id) + { + final NodeID id = _id; + + //create future task. + Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){ + @Override + public List<ColumnOrSuperColumn> call() throws Exception + { + RequestSender sender = (RequestSender)Thread.currentThread(); + List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE); + return res; + } + }; + Future<List<ColumnOrSuperColumn>> future = m_service.submit(task); + + NodeData data = new NodeData(); + try{ + List<ColumnOrSuperColumn> slice = future.get(); + + //iterate column + for(ColumnOrSuperColumn column : slice){ + String name = new String(column.column.name); + + //if column name matches CHILDREN , deserialize value to child list. + if(name.equals(CHILDREN)){ + List<Node> tmp = deserialize(new String(column.column.value)); + data.add(tmp); + }else{ + String key = name.substring(1); //size of prefix + data.set(key.getBytes(),column.column.value); + } + } + }catch(Exception _e){ + _e.printStackTrace(); + } + + return data; + } + + public List<CassandraNode> multiCreateNode(List<CassandraNode> _list) + { + final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>(); + + Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>(); + Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>(); + for(CassandraNode node : _list){ + LinkedList<Mutation> list = new LinkedList<Mutation>(); + Mutation mut = new Mutation(); + ColumnOrSuperColumn column = new ColumnOrSuperColumn(); + mut.column_or_supercolumn = column; + column.column.name = CHILDREN.getBytes(); + column.column.value = serialize(node.getData().list()).getBytes(); + list.add(mut); + + for(byte[] key : node.getData().keys()){ + mut = new Mutation(); + column = new ColumnOrSuperColumn(); + mut.column_or_supercolumn = column; + column.column.name = key; + column.column.value = node.getData().get(key); + + list.add(mut); + } + + nodeTable.put(node.getID().toString(),list); + + mut = new Mutation(); + column = new ColumnOrSuperColumn(); + column.column.name = TIPID.getBytes(); + column.column.value = node.getID().getVersion().getBytes(); + list = new LinkedList<Mutation>(); + list.add(mut); + tipTable.put(node.getID().getUUID(),list); + } + mutationMap.put(NODETABLE,nodeTable); + mutationMap.put(TIPTABLE,tipTable); + + Runnable task = new Runnable(){ + @Override + public void run() + { + RequestSender sender = (RequestSender)Thread.currentThread(); + sender.batch_mutate(mutationMap,ConsistencyLevel.ONE); + } + }; + + m_service.execute(task); + + return _list; + } + + /** + * list serializer. + * ex. list{"hoge","fuga"} -> "hoge,fuga" + * @param _list + * @return selialized string + */ + public String serialize(List<Node> _list) + { + String prefix = ""; + StringBuffer buf = new StringBuffer(); + for(Node child : _list){ + buf.append(prefix+child.getID().toString()); + prefix = ","; + } + + return buf.toString(); + } + + /** + * string deserializer. + * ex. "hoge,fuga" -> list{"hoge","fuga"} + * @param _selialized + * @return list + */ + public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException + { + StringTokenizer tokens = new StringTokenizer(_serialized,","); + LinkedList<Node> res = new LinkedList<Node>(); + + while(tokens.hasMoreElements()){ + String tmp = tokens.nextToken(); + StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@"); + + try{ + NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken()); + res.add(get(id)); + }catch(Exception _e){ + throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e); + } + } + + return res; + } + + public NodeID getTipID(String _uuid) + { + final String uuid = _uuid; + Callable<byte[]> task = new Callable<byte[]>(){ + @Override + public byte[] call() throws Exception + { + RequestSender sender = (RequestSender)Thread.currentThread(); + byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE); + return value; + } + }; + + Future<byte[]> future = m_service.submit(task); + + try { + byte[] value = future.get(); + String id = new String(value); + StringTokenizer token = new StringTokenizer(id,"@"); + NodeID nodeID = createID(token.nextToken(),token.nextToken()); + return nodeID; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return null; //not found. + } + + public Node createNode(NodeID _id,NodeData _data) + { + final NodeData data = _data; + final NodeID id = (_id != null) ? _id : createID(null,null); + + Callable<Boolean> task = new Callable<Boolean>(){ + @Override + public Boolean call() throws Exception + { + RequestSender sender = (RequestSender)Thread.currentThread(); + + //mutation map + HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>(); + + /* + * create mutation map for NODETABLE + */ + if(data != null){ + LinkedList<Mutation> list = new LinkedList<Mutation>(); + HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>(); + Iterator<Node> itr = data.list().iterator(); + + /* + * create CSV from child list. + */ + StringBuffer buffer = new StringBuffer(); + for(String prefix = "";itr.hasNext();prefix = ","){ + buffer.append(String.format("%s%s",prefix,itr.next().getID().toString())); + } + Mutation mutChildren = new Mutation(); + ColumnOrSuperColumn children = new ColumnOrSuperColumn(); + children.column.name = CHILDREN.getBytes(); + children.column.value = buffer.toString().getBytes(); + mutChildren.column_or_supercolumn = children; + list.add(mutChildren); + + /* + * + */ + for(byte[] key : data.keys()){ + Mutation mut = new Mutation(); + ColumnOrSuperColumn column = new ColumnOrSuperColumn(); + column.column.name = key; + column.column.value = data.get(key); + mut.column_or_supercolumn = column; + list.add(mut); + } + info.put(id.toString(),list); + + map.put(NODETABLE,info); + } + + /* + * create mutation map for NODEIDTABLE + */ + HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>(); + LinkedList<Mutation> list = new LinkedList<Mutation>(); + + Mutation mutTipID = new Mutation(); + ColumnOrSuperColumn tipID = new ColumnOrSuperColumn(); + tipID.column.name = TIPID.getBytes(); + tipID.column.value = id.getVersion().getBytes(); + mutTipID.column_or_supercolumn = tipID; + + list.add(mutTipID); + idtable_mutations.put(TIPTABLE,list); + + return sender.batch_mutate(map,ConsistencyLevel.ONE); + } + }; + + m_service.submit(task); + + return new CassandraNode(this,id); + } + + public NodeID createID(String _uuid,String _version) + { + return new RandomNodeIDImpl(_uuid,_version); + } + + class RandomNodeIDImpl extends RandomNodeID + { + String m_uuid; + String m_version; + + public RandomNodeIDImpl(String _uuid,String _version) + { + m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString(); + m_version = (_version != null) ? _version : Long.toHexString((new Random()).nextLong()); + } + + @Override + public NodeID create() + { + return new RandomNodeIDImpl(null,null); + } + + @Override + public NodeID update() + { + return new RandomNodeIDImpl(m_uuid,null); + } + + @Override + public String getUUID() + { + return m_uuid; + } + + @Override + public String getVersion() + { + return m_version; + } + } + + private static class RequestSender extends Thread + { + private int m_port; + private String m_host,m_ks; + private Cassandra.Client m_client; + + public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException + { + super(_runnable); + m_port = _port; + m_host = _host; + m_ks = _ks; + + connect(); + } + + public void connect() throws TTransportException + { + TTransport tr = new TSocket(m_host,m_port); + TProtocol proto = new TBinaryProtocol(tr); + m_client = new Cassandra.Client(proto); + + tr.open(); + } + + public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks) + { + RequestSender sender = null; + try { + sender = new RequestSender(_runnable,_host,_port,_ks); + } catch (TTransportException _e) { + _e.printStackTrace(); + } + + return sender; + } + + public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv) + { + byte[] ret = null; + + ColumnPath path = new ColumnPath(); + path.column_family = _cf; + path.column = _name; + try { + ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv); + ret = cors.column.value; + }catch(NotFoundException _e){ + System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name))); + }catch(Exception _e){ + _e.printStackTrace(); + } + + return ret; + } + + public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv) + { + ColumnPath path = new ColumnPath(); + path.column_family = _cf; + path.column = _name; + + try{ + m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv); + return true; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return false; + } + + public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv) + { + List<ColumnOrSuperColumn> ret = null; + SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1); + SlicePredicate sp = new SlicePredicate(); + sp.slice_range = sr; + + try { + ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv); + }catch(Exception _e){ + _e.printStackTrace(); + } + + return ret; + } + + public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv) + { + try{ + m_client.batch_insert(m_ks,_cf,_map,_lv); + return true; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return false; + } + + public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv) + { + try { + m_client.batch_mutate(m_ks,_mutateMap,_lv); + return true; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return false; + } + + public String toString() + { + return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]"; + } + } + + private class RequestSenderFactory implements ThreadFactory + { + private int m_port; + private String m_host,m_ks; + + public RequestSenderFactory(String _host,int _port,String _ks) + { + m_host = _host; + m_port = _port; + m_ks = _ks; + } + + @Override + public Thread newThread(Runnable _runnable) + { + return RequestSender.newInstance(_runnable,m_host,m_port,m_ks); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/CassandraNode.java Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,39 @@ +package treecms.tree.cassandra.v1; + +import treecms.api.Forest; +import treecms.api.Node; +import treecms.api.NodeData; +import treecms.api.NodeID; + +public class CassandraNode implements Node +{ + NodeID m_id; + NodeData m_data; + CassandraForest m_forest; + + public CassandraNode(CassandraForest _forest,NodeID _id) + { + m_id = _id; + m_forest = _forest; + m_data = null; + } + + @Override + public NodeID getID() + { + return m_id; + } + + @Override + public NodeData getData() + { + return (m_data != null) ? m_data : (m_data = m_forest.getNodeData(m_id)); + } + + @Override + public Forest getForest() + { + return m_forest; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/CassandraTree.java Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,98 @@ +package treecms.tree.cassandra.v1; + +import java.util.LinkedList; +import java.util.concurrent.ConcurrentHashMap; +import treecms.api.Forest; +import treecms.api.Node; +import treecms.api.NodeData; +import treecms.api.NodeID; +import treecms.api.Tree; + +public class CassandraTree implements Tree +{ + CassandraNode m_root; + CassandraForest m_forest; + ConcurrentHashMap<String,CassandraNode> m_table; + + CassandraTree(CassandraNode _node,CassandraForest _forest) + { + m_root = _node; + m_forest = _forest; + m_table = new ConcurrentHashMap<String,CassandraNode>(); + } + + @Override + public NodeID getID() + { + return m_root.getID(); + } + + @Override + public NodeData getData() + { + return m_root.getData(); + } + + @Override + public Forest getForest() + { + return m_forest; + } + + @Override + public Node getRoot() + { + return m_root; + } + + @Override + public Node getNodeByUUID(String _uuid) + { + return m_table.get(_uuid); + } + + @Override + public synchronized Node updateTree(Node _target,NodeData _newData) + { + LinkedList<CassandraNode> path = findPath(m_root,(CassandraNode)_target,_newData); + + if(path == null) + { + //not found. + return null; + } + + + //clone + + m_root = path.peekFirst(); + return path.peekLast(); + } + + CassandraNode cloneNode(CassandraNode _target,NodeData _newData) + { + CassandraNode clone = (CassandraNode)m_forest.createNode(_target.getID().update(),_newData); + m_table.put(clone.getID().getUUID(),clone); + return clone; + } + + LinkedList<CassandraNode> findPath(CassandraNode _parent,CassandraNode _target,NodeData _newData) + { + if(_parent.getID().isFamily(_target.getID())){ + //find. + LinkedList<CassandraNode> path = new LinkedList<CassandraNode>(); + path.addFirst(_target); + return path; + } + + for(Node child : _parent.getData().list()){ + LinkedList<CassandraNode> path = findPath((CassandraNode)child,_target,_newData); + if(path != null){ + path.addFirst(_parent); + return path; + } + } + + return null; //not found. + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/CassandraTreeEditor.java Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,38 @@ +package treecms.tree.cassandra.v1; + +import treecms.api.TreeEditor; +import treecms.merger.Merger; +import treecms.merger.ReplaceMerger; + +public class CassandraTreeEditor extends CassandraTree implements TreeEditor +{ + public CassandraTreeEditor(CassandraTree _tree,CassandraForest _forest) + { + super(_tree.m_root,_forest); + } + + @Override + public boolean commit(boolean _force) + { + return false; + } + + @Override + public boolean pull() + { + return false; + } + + @Override + public boolean check() + { + return false; + } + + @Override + public void merge() + { + Merger merger = new ReplaceMerger(); + + } +}