Mercurial > hg > Database > Cassandra
diff src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @ 0:d485154379c8 default tip
apache-cassandra-0.6.0-rc1-src
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 02 Apr 2010 13:36:02 +0900 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Fri Apr 02 13:36:02 2010 +0900 @@ -0,0 +1,82 @@ +/** + * + */ +package org.apache.cassandra.service; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.IEndPointSnitch; +import org.apache.cassandra.locator.DatacenterEndPointSnitch; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.utils.FBUtilities; + +/** + * This class will basically will block for the replication factor which is + * provided in the input map. it will block till we recive response from (DC, n) + * nodes. + */ +public class DatacenterWriteResponseHandler extends WriteResponseHandler +{ + private final AtomicInteger blockFor; + private final DatacenterEndPointSnitch endpointsnitch; + private final InetAddress localEndpoint; + + public DatacenterWriteResponseHandler(int blockFor, String table) + { + // Response is been managed by the map so the waitlist size really doesnt matter. + super(blockFor, table); + this.blockFor = new AtomicInteger(blockFor); + endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table); + localEndpoint = FBUtilities.getLocalAddress(); + } + + @Override + public void response(Message message) + { + //Is optimal to check if same datacenter than comparing Arrays. + int b = -1; + try + { + if (endpointsnitch.isInSameDataCenter(localEndpoint, message.getFrom())) + { + b = blockFor.decrementAndGet(); + } + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + responses.add(message); + if (b == 0) + { + //Singnal when Quorum is recived. + condition.signal(); + } + if (logger.isDebugEnabled()) + logger.debug("Processed Message: " + message.toString()); + } +}