comparison src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @ 0:d485154379c8 default tip

apache-cassandra-0.6.0-rc1-src
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Fri, 02 Apr 2010 13:36:02 +0900
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d485154379c8
1 /**
2 *
3 */
4 package org.apache.cassandra.service;
5 /*
6 *
7 * Licensed to the Apache Software Foundation (ASF) under one
8 * or more contributor license agreements. See the NOTICE file
9 * distributed with this work for additional information
10 * regarding copyright ownership. The ASF licenses this file
11 * to you under the Apache License, Version 2.0 (the
12 * "License"); you may not use this file except in compliance
13 * with the License. You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing,
18 * software distributed under the License is distributed on an
19 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20 * KIND, either express or implied. See the License for the
21 * specific language governing permissions and limitations
22 * under the License.
23 *
24 */
25
26
27 import java.net.InetAddress;
28 import java.net.UnknownHostException;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.cassandra.config.DatabaseDescriptor;
32 import org.apache.cassandra.locator.IEndPointSnitch;
33 import org.apache.cassandra.locator.DatacenterEndPointSnitch;
34 import org.apache.cassandra.net.Message;
35 import org.apache.cassandra.utils.FBUtilities;
36
37 /**
38 * This class will basically will block for the replication factor which is
39 * provided in the input map. it will block till we recive response from (DC, n)
40 * nodes.
41 */
42 public class DatacenterWriteResponseHandler extends WriteResponseHandler
43 {
44 private final AtomicInteger blockFor;
45 private final DatacenterEndPointSnitch endpointsnitch;
46 private final InetAddress localEndpoint;
47
48 public DatacenterWriteResponseHandler(int blockFor, String table)
49 {
50 // Response is been managed by the map so the waitlist size really doesnt matter.
51 super(blockFor, table);
52 this.blockFor = new AtomicInteger(blockFor);
53 endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table);
54 localEndpoint = FBUtilities.getLocalAddress();
55 }
56
57 @Override
58 public void response(Message message)
59 {
60 //Is optimal to check if same datacenter than comparing Arrays.
61 int b = -1;
62 try
63 {
64 if (endpointsnitch.isInSameDataCenter(localEndpoint, message.getFrom()))
65 {
66 b = blockFor.decrementAndGet();
67 }
68 }
69 catch (UnknownHostException e)
70 {
71 throw new RuntimeException(e);
72 }
73 responses.add(message);
74 if (b == 0)
75 {
76 //Singnal when Quorum is recived.
77 condition.signal();
78 }
79 if (logger.isDebugEnabled())
80 logger.debug("Processed Message: " + message.toString());
81 }
82 }