Mercurial > hg > Database > Alice
changeset 615:82736f6fae50 dispose
merge
author | surug |
---|---|
date | Mon, 24 Jul 2017 19:16:35 +0900 |
parents | 9324ef9728c0 (current diff) 86b39f5bf1d7 (diff) |
children | 747bcd5bbba1 |
files | Alice.iml build.gradle |
diffstat | 23 files changed, 163 insertions(+), 117 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Mon Jul 24 19:15:01 2017 +0900 +++ b/.hgignore Mon Jul 24 19:16:35 2017 +0900 @@ -7,4 +7,8 @@ ^Alice\.jar$ syntax: regexp syntax: glob -.classpath \ No newline at end of file +.classpath + +build +.gradle +.idea
--- a/.hgtags Mon Jul 24 19:15:01 2017 +0900 +++ b/.hgtags Mon Jul 24 19:16:35 2017 +0900 @@ -1,2 +1,3 @@ d7a3ecc8c4a193008f48513e24d8bd50481f0cc2 working d5d9ca4cbe87215b6a6e97e141b3cf98e095687d fish-example-worked +924e5f52a61f58687719fe36f8d5e4a76472d180 before-multi-topology-manager
--- a/src/main/java/alice/daemon/AliceDaemon.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/daemon/AliceDaemon.java Mon Jul 24 19:16:35 2017 +0900 @@ -7,39 +7,39 @@ import java.net.ServerSocket; import java.nio.channels.ServerSocketChannel; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; -import org.apache.log4j.WriterAppender; +//import org.apache.log4j.Logger; +//import org.apache.log4j.PatternLayout; +//import org.apache.log4j.WriterAppender; public class AliceDaemon { private Config conf; private AcceptThread acceptThread; - private Logger log = Logger.getLogger(AliceDaemon.class); + //private Logger log = Logger.getLogger(AliceDaemon.class); public AliceDaemon(Config conf) { this.conf = conf; - setLogger(); + //setLogger(); } - private void setLogger() { - Logger root = Logger.getRootLogger(); - if (conf.level != null) - root.setLevel(conf.level); - if (conf.logFile == null) - return; - PatternLayout layout = new PatternLayout(); - layout.setConversionPattern("%d %-5p %c - %m [%t] (%F:%L)%n"); - try { - FileWriter writer = new FileWriter(conf.logFile); - WriterAppender writerAppender = new WriterAppender(layout, writer); - root.removeAllAppenders(); - root.addAppender(writerAppender); - } catch (IOException e) { - e.printStackTrace(); - } - log.info("configured"); - } +// private void setLogger() { +// Logger root = Logger.getRootLogger(); +// if (conf.level != null) +// root.setLevel(conf.level); +// if (conf.logFile == null) +// return; +// PatternLayout layout = new PatternLayout(); +// layout.setConversionPattern("%d %-5p %c - %m [%t] (%F:%L)%n"); +// try { +// FileWriter writer = new FileWriter(conf.logFile); +// WriterAppender writerAppender = new WriterAppender(layout, writer); +// root.removeAllAppenders(); +// root.addAppender(writerAppender); +// } catch (IOException e) { +// e.printStackTrace(); +// } +// log.info("configured"); +// } public void listen() { try {
--- a/src/main/java/alice/daemon/CommandMessage.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Mon Jul 24 19:16:35 2017 +0900 @@ -12,25 +12,27 @@ public int seq;//DSの待ち合わせを行っているCSを表すunique number public String key;//DS key public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか - public boolean serialized = false;//シリアライズされているかどうか public boolean compressed = false;//圧縮されているかどうか - public int dataSize = 0; + public int dataSize = 0;//圧縮前のサイズ - public boolean setTime = false;//? - public long time;//? - public int depth;//? + //計測用 + public boolean setTime = false; + public long time; + public int depth; + public boolean setZepped = false; + public int zippedDataSize;//圧縮後のサイズ public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key - , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) { + , boolean qFlag, boolean cFlag, int datasize) { this.type = type; this.index = index; this.seq = seq; this.key = key; this.quickFlag = qFlag; - this.serialized = sFlag; this.compressed = cFlag; this.dataSize = datasize; } + }
--- a/src/main/java/alice/daemon/ConnectionInfo.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/daemon/ConnectionInfo.java Mon Jul 24 19:16:35 2017 +0900 @@ -15,7 +15,7 @@ public ConnectionInfo(String name, Socket socket){ this.nodeName = name; - this.hostname = socket.getInetAddress().getHostName(); + this.hostname = socket.getInetAddress().getHostAddress(); this.addr = socket.getInetAddress().getHostAddress(); this.port = socket.getPort(); }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Mon Jul 24 19:16:35 2017 +0900 @@ -59,16 +59,16 @@ switch (type) { case UPDATE: case PUT: - if (msg.compressed) { - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); - } else { - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); - } + int dataSize = unpacker.readInt(); + rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), msg.compressed, msg.dataSize); if (msg.setTime) { - rData.setTime = true; - rData.time = msg.time; - rData.depth = msg.depth; + rData.setTimes(msg.time, true, msg.depth); + } + if (msg.setZepped){ + rData.setZipped(msg.zippedDataSize, true); + } else { + rData.setZipped(dataSize, true); } cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); @@ -100,11 +100,7 @@ case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); - if (msg.compressed) { - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); - } else { - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); - } + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Mon Jul 24 19:16:35 2017 +0900 @@ -73,7 +73,6 @@ getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); break; case REPLY: - System.out.println("in UDP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
--- a/src/main/java/alice/datasegment/Command.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/datasegment/Command.java Mon Jul 24 19:16:35 2017 +0900 @@ -80,8 +80,6 @@ byte[] header = null; byte[] data = null; byte[] dataSize = null; - boolean serialized = false; - boolean compressed = false; switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -95,19 +93,21 @@ case PUT: case REPLY: if(compressFlag){ - // ToDo: Do not pack again - data = packer.write(rData.getZMessagePack()); - compressed = true; + data = rData.getZMessagePack(); } else { data = rData.getMessagePack(); - serialized = true; } - CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize()); - if (rData.setTime) { + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize()); + if (rData.getSetTime()) { cm.setTime = true; - cm.time = rData.time; - cm.depth = rData.depth + 1; + cm.time = rData.getTime(); + cm.depth = rData.getDepth() + 1; + } + + if (rData.getSetZipped()){ + cm.setZepped = true; + cm.zippedDataSize = rData.getZippedDataSize(); } header = packer.write(cm); @@ -118,7 +118,7 @@ buf.put(data); break; default: - header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0)); + header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0)); buf = ByteBuffer.allocate(header.length); buf.put(header); break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Mon Jul 24 19:16:35 2017 +0900 @@ -17,30 +17,12 @@ public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) { this.manager = manager; - new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start(); } public void setReverseKey(String s){ reverseKey = s; } - private class RunCommand implements Runnable { - - DataSegmentKey key; - Command cmd; - - public RunCommand(DataSegmentKey key, Command cmd) { - this.key = key; - this.cmd = cmd; - } - - @Override - public void run() { - key.runCommand(cmd); - } - - } - public void submitCommand(DataSegmentKey key, Command cmd) { manager.submitCommand(key, cmd); }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Mon Jul 24 19:16:35 2017 +0900 @@ -1,5 +1,6 @@ package alice.datasegment; +import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -85,6 +86,14 @@ } public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){ + if (cFlag && !data.rData.compressed()){ + try { + data.rData.zip(); + System.out.println("in reply zip"); + } catch (IOException e) { + e.printStackTrace(); + } + } Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); rCmd.setCompressFlag(cFlag);
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Mon Jul 24 19:16:35 2017 +0900 @@ -39,7 +39,7 @@ }; public Command getAndRemoveCmd(int index){ - System.err.println("DSM getAndRemoveCmd seq : " + index); + //System.err.println("DSM getAndRemoveCmd seq : " + index); return seqHash.remove(index); }
--- a/src/main/java/alice/datasegment/ReceiveData.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Mon Jul 24 19:16:35 2017 +0900 @@ -16,12 +16,14 @@ private Object val;//for Object DS private byte[] messagePack;//for byteArray(serialized) DS private byte[] zMessagePack;//for byteArray(compressed) DS - private int dataSize; + private int dataSize;//圧縮前(MessagePack)のデータサイズ private Class<?> clazz; - public long time;//測定用 - public boolean setTime = false; - public int depth = 1; + private long time;//測定用 + private boolean setTime = false; + private int depth = 1; + private boolean setZepped = false; + private int zippedDataSize;//圧縮後のデータサイズ private static final MessagePack packer = new MessagePack(); @@ -100,7 +102,6 @@ * @return */ public <T> T asClass(Class<T> clazz) {///javasist - System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); try { if (val != null) { @@ -136,7 +137,6 @@ public byte[] getZMessagePack(){ if (zMessagePack != null){ - System.out.println("have zMessagePack"); return zMessagePack; } else { try { @@ -151,7 +151,6 @@ } public void zip() throws IOException { - System.out.println("in zip"); LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>(); int inputIndex = 0; LinkedList<ByteBuffer> outputs = new LinkedList<ByteBuffer>(); @@ -197,40 +196,23 @@ System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining? tmp += outputs.get(i).limit(); } - System.out.print("in make zMessagePack2: "); - for (int i = 0; i < zMessagePack.length; i++) { - System.out.print(Integer.toHexString(zMessagePack[i] & 0xff)); - } - System.out.print("\n"); } - protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip + protected byte[] unzip(byte[] input, int dataSize) {///read header & unzip int length = input.length; Inflater inflater = new Inflater(); - System.out.print("unziped input: "); - for (int i = 0; i < input.length; i++) { - System.out.print(Integer.toHexString(input[i] & 0xff)); - } - System.out.print("\n"); - - byte [] output = new byte [zippedLength];///byteArray for unziped data + byte [] output = new byte [dataSize];///byteArray for unziped data inflater.setInput(input, 0, length);///set unzip data without header try { - inflater.inflate(output, 0, zippedLength);///unzip + inflater.inflate(output, 0, dataSize);///unzip } catch (DataFormatException e) { e.printStackTrace(); } inflater.reset(); - System.out.print("unziped: "); - for (int i = 0; i < output.length; i++) { - System.out.print(Integer.toHexString(output[i] & 0xff)); - } - System.out.print("\n"); - return output; } @@ -264,4 +246,35 @@ this.dataSize = datasize; } + public void setTimes(long time, boolean setTime, int depth){ + this.time = time; + this.setTime = setTime; + this.depth = depth; + } + + public long getTime(){ + return this.time; + } + + public boolean getSetTime(){ + return this.setTime; + } + + public int getDepth(){ + return this.depth; + } + + public void setZipped(int zippedDataSize, boolean setZepped){ + this.zippedDataSize = zippedDataSize; + this.setZepped = setZepped; + } + + public int getZippedDataSize(){ + return this.zippedDataSize; + } + + public boolean getSetZipped(){ + return this.setZepped; + } + }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Jul 24 19:16:35 2017 +0900 @@ -18,7 +18,7 @@ public RemoteDataSegmentManager(){} - public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { + public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) { logger = Logger.getLogger(connectionKey); connection = new Connection(); connection.name = connectionKey; @@ -92,7 +92,7 @@ public void take1(boolean quickFlag, Command cmd) { int seq = this.seq.getAndIncrement(); - System.err.println("DataSegment take seq :" + seq); + //System.err.println("DataSegment take seq :" + seq); cmd.setSeq(seq); seqHash.put(seq, cmd); cmd.setQuickFlag(quickFlag);
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Mon Jul 24 19:16:35 2017 +0900 @@ -13,14 +13,20 @@ */ @Override public void run() { + String z = ""; + if (num.isCompressed()){ + z = "zMP"; + } int num = this.num.asInteger(); - System.out.println("[CodeSegment] " + num++); - if (num == 10) System.exit(0); + System.out.println("[CodeSegment" + z + "] " + num++); + if (num == 5) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("compressedremote", "num"); - ods.put("compressedlocal", "num", num); + ods.put("compressedremote", "num", num); + ods.put("remote", "num", num); + + cs.num.setKey("compressedlocal", "num"); } } \ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Mon Jul 24 19:16:35 2017 +0900 @@ -7,8 +7,9 @@ @Override public void run() { RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("compressedremote", "num"); - ods.put("compressedlocal", "num", 0); + ods.put("compressedremote", "num", 0); + + cs.num.setKey("compressedlocal", "num"); } } \ No newline at end of file
--- a/src/main/java/alice/test/topology/aquarium/fx/Aquarium.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/test/topology/aquarium/fx/Aquarium.java Mon Jul 24 19:16:35 2017 +0900 @@ -31,7 +31,7 @@ @Override public void start(Stage primaryStage) throws IOException { - String myName = getParameters().getRaw().get(0); // name + final String myName = getParameters().getRaw().get(0); // name primaryStage.setTitle("Aquarium "+ myName); primaryStage.setResizable(false); primaryStage.setOnCloseRequest(new EventHandler<WindowEvent>(){
--- a/src/main/java/alice/topology/manager/CreateTreeTopology.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/topology/manager/CreateTreeTopology.java Mon Jul 24 19:16:35 2017 +0900 @@ -31,7 +31,7 @@ @Override public void run() { String cookie = info3.asString(); - System.out.println(cookie); + System.out.println("cookie:" + cookie); HostMessage host = info.asClass(HostMessage.class); int comingHostCount = info1.asInteger(); ParentManager manager = info6.asClass(ParentManager.class);
--- a/src/main/java/alice/topology/node/SaveCookie.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/topology/node/SaveCookie.java Mon Jul 24 19:16:35 2017 +0900 @@ -14,7 +14,7 @@ @Override public void run() { - System.out.println(info.asString()); + System.out.println("SaveCookie:" + info.asString()); }
--- a/src/main/java/alice/topology/node/StartTopologyNode.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/topology/node/StartTopologyNode.java Mon Jul 24 19:16:35 2017 +0900 @@ -23,7 +23,7 @@ DataSegment.connect("manager", "manager", conf.getManagerHostName(), conf.getManagerPort()); String localHostName = null; try { - localHostName = InetAddress.getLocalHost().getHostName(); + localHostName = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); }
--- a/src/main/java/alice/topology/node/TopologyNodeConfig.java Mon Jul 24 19:15:01 2017 +0900 +++ b/src/main/java/alice/topology/node/TopologyNodeConfig.java Mon Jul 24 19:16:35 2017 +0900 @@ -1,6 +1,8 @@ package alice.topology.node; import java.util.ArrayList; +import java.util.LinkedList; + import alice.daemon.Config; public class TopologyNodeConfig extends Config { @@ -10,7 +12,6 @@ public String cookie; private ArrayList<Class> eventList = new ArrayList<Class>(); - public TopologyNodeConfig(String[] args) { super(args); for (int i = 0; i< args.length; i++) { @@ -24,6 +25,36 @@ } } + public static LinkedList<TopologyNodeConfig> MultiTopologyNodeCongingFactory(String[] args) { + LinkedList<TopologyNodeConfig> configList = new LinkedList<TopologyNodeConfig>(); + LinkedList<String> nodeConfigArgs = new LinkedList<String>(); + + for(int i = 0; i < args.length; ++i) { + if("-host".equals(args[i])) { + if(!nodeConfigArgs.isEmpty()){ + configList.add(new TopologyNodeConfig((String[]) nodeConfigArgs.toArray())); + nodeConfigArgs = new LinkedList<String>(); + } + ++i; + nodeConfigArgs.add("-host"); + nodeConfigArgs.add(args[i]); + } else if("-port".equals(args[i])) { + ++i; + nodeConfigArgs.add("-port"); + nodeConfigArgs.add(args[i]); + } else if("-cookie".equals(args[i])) { + ++i; + nodeConfigArgs.add("-cookie"); + nodeConfigArgs.add(args[i]); + } + } + + configList.add(new TopologyNodeConfig(nodeConfigArgs.toArray(new String[] {}))); + + return configList; + + } + public String getManagerHostName() { return managerHostName; } @@ -49,8 +80,10 @@ for (Class clazz : eventList) try { clazz.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { + } catch (InstantiationException e) { e.printStackTrace(); + } catch (IllegalAccessException e){ + } }