Mercurial > hg > Database > Cassandra
comparison src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @ 0:d485154379c8 default tip
apache-cassandra-0.6.0-rc1-src
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 02 Apr 2010 13:36:02 +0900 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d485154379c8 |
---|---|
1 /** | |
2 * | |
3 */ | |
4 package org.apache.cassandra.service; | |
5 /* | |
6 * | |
7 * Licensed to the Apache Software Foundation (ASF) under one | |
8 * or more contributor license agreements. See the NOTICE file | |
9 * distributed with this work for additional information | |
10 * regarding copyright ownership. The ASF licenses this file | |
11 * to you under the Apache License, Version 2.0 (the | |
12 * "License"); you may not use this file except in compliance | |
13 * with the License. You may obtain a copy of the License at | |
14 * | |
15 * http://www.apache.org/licenses/LICENSE-2.0 | |
16 * | |
17 * Unless required by applicable law or agreed to in writing, | |
18 * software distributed under the License is distributed on an | |
19 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
20 * KIND, either express or implied. See the License for the | |
21 * specific language governing permissions and limitations | |
22 * under the License. | |
23 * | |
24 */ | |
25 | |
26 | |
27 import java.net.InetAddress; | |
28 import java.net.UnknownHostException; | |
29 import java.util.concurrent.atomic.AtomicInteger; | |
30 | |
31 import org.apache.cassandra.config.DatabaseDescriptor; | |
32 import org.apache.cassandra.locator.IEndPointSnitch; | |
33 import org.apache.cassandra.locator.DatacenterEndPointSnitch; | |
34 import org.apache.cassandra.net.Message; | |
35 import org.apache.cassandra.utils.FBUtilities; | |
36 | |
37 /** | |
38 * This class blocks for the replication factor which is | |
39 * provided in the input map. It will block until we receive responses from (DC, n) | |
40 * nodes. | |
41 */ | |
42 public class DatacenterWriteResponseHandler extends WriteResponseHandler | |
43 { | |
44 private final AtomicInteger blockFor; | |
45 private final DatacenterEndPointSnitch endpointsnitch; | |
46 private final InetAddress localEndpoint; | |
47 | |
48 public DatacenterWriteResponseHandler(int blockFor, String table) | |
49 { | |
50 // Response is been managed by the map so the waitlist size really doesnt matter. | |
51 super(blockFor, table); | |
52 this.blockFor = new AtomicInteger(blockFor); | |
53 endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table); | |
54 localEndpoint = FBUtilities.getLocalAddress(); | |
55 } | |
56 | |
57 @Override | |
58 public void response(Message message) | |
59 { | |
60 //Is optimal to check if same datacenter than comparing Arrays. | |
61 int b = -1; | |
62 try | |
63 { | |
64 if (endpointsnitch.isInSameDataCenter(localEndpoint, message.getFrom())) | |
65 { | |
66 b = blockFor.decrementAndGet(); | |
67 } | |
68 } | |
69 catch (UnknownHostException e) | |
70 { | |
71 throw new RuntimeException(e); | |
72 } | |
73 responses.add(message); | |
74 if (b == 0) | |
75 { | |
76 //Singnal when Quorum is recived. | |
77 condition.signal(); | |
78 } | |
79 if (logger.isDebugEnabled()) | |
80 logger.debug("Processed Message: " + message.toString()); | |
81 } | |
82 } |