Commit 41453776 authored by 谢建斌's avatar 谢建斌

raft迁移到Docker

parent 6504913a
...@@ -21,6 +21,11 @@ ...@@ -21,6 +21,11 @@
</modules> </modules>
<dependencies> <dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
...@@ -62,7 +67,6 @@ ...@@ -62,7 +67,6 @@
<version>${spring.version}</version> <version>${spring.version}</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
...@@ -71,15 +75,18 @@ ...@@ -71,15 +75,18 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
<executions> </plugin>
<execution> <plugin>
<goals> <groupId>org.apache.maven.plugins</groupId>
<goal>repackage</goal> <artifactId>maven-compiler-plugin</artifactId>
</goals> <!-- <version>3.7.0</version>
</execution> <configuration>
</executions> <source>1.8</source>
<target>1.8</target>
</configuration>-->
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
\ No newline at end of file
package com.jbxie.spaceiotflooding.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.ninwoo.bishe.starter.service.NetworkService;
import java.util.List;
@Service
public class IPService {
@Autowired
private NetworkService networkService;
public String getIpByAppName(String appName) {
List<String> ipList = networkService.getIpListByAppName(11111l, appName);
if (!ipList.isEmpty()) {
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0];
}
return null;
}
}
package com.jbxie.spaceiotflooding.service; //package com.jbxie.spaceiotflooding.service;
//
//
import com.jbxie.spaceiotflooding.entity.DataPackage; //import com.jbxie.spaceiotflooding.entity.DataPackage;
import com.jbxie.spaceiotflooding.entity.Node; //import com.jbxie.spaceiotflooding.entity.Node;
import org.slf4j.Logger; //import org.slf4j.Logger;
import org.slf4j.LoggerFactory; //import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import top.ninwoo.common.entity.DockerContainer; //import top.ninwoo.common.entity.DockerContainer;
//
import java.io.*; //import java.io.*;
import java.net.ServerSocket; //import java.net.ServerSocket;
import java.net.Socket; //import java.net.Socket;
import java.util.ArrayList; //import java.util.ArrayList;
import java.util.List; //import java.util.List;
//
public class NodeService { //public class NodeService {
private static final Logger logger = LoggerFactory.getLogger(NodeService.class); // private static final Logger logger = LoggerFactory.getLogger(NodeService.class);
//
@Autowired // @Autowired
DockerContainer dockerContainer; // DockerContainer dockerContainer;
//
@Autowired // @Autowired
IPService ipService; // IPService ipService;
//
/** // /**
* 获取当前节点名 // * 获取当前节点名
* (需要重写) // * (需要重写)
*/ // */
public String getName() { // public String getName() {
return dockerContainer.getName(); // return dockerContainer.getName();
} // }
//
/** // /**
* 当前节点接收到数据包进行处理 // * 当前节点接收到数据包进行处理
* 先写一个单向的 // * 先写一个单向的
* @param dataPackage // * @param dataPackage
*/ // */
private void accept(DataPackage dataPackage) { // private void accept(DataPackage dataPackage) {
String currentName = getName(); // String currentName = getName();
int currentHop = getCurrentHop(currentName); // int currentHop = getCurrentHop(currentName);
Node node = new Node(currentName); // Node node = new Node(currentName);
ServerSocket serverSocket = null; // ServerSocket serverSocket = null;
InputStream inputStream = null; // InputStream inputStream = null;
ObjectInputStream objectInputStream = null; // ObjectInputStream objectInputStream = null;
try { // try {
serverSocket = new ServerSocket(2021); // serverSocket = new ServerSocket(2021);
while (true) { // while (true) {
Socket client = serverSocket.accept(); // Socket client = serverSocket.accept();
inputStream = client.getInputStream(); // inputStream = client.getInputStream();
objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream)); // objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
Object object = objectInputStream.readObject(); // Object object = objectInputStream.readObject();
DataPackage dataPackage1 = (DataPackage) object; // DataPackage dataPackage1 = (DataPackage) object;
//
List<String> businessList = node.getBusinessList(); // List<String> businessList = node.getBusinessList();
String businessName = dataPackage.getBusinessName(); // String businessName = dataPackage.getBusinessName();
ArrayList<String> route = dataPackage.getRoute(); // ArrayList<String> route = dataPackage.getRoute();
route.add("sate" + currentHop); // route.add("sate" + currentHop);
// 如果计数器仍然等于零或当前节点已经是最终节点,则打印路由信息 // // 如果计数器仍然等于零或当前节点已经是最终节点,则打印路由信息
// 如果节点业务列表已包含当前传输的业务则终止 // // 如果节点业务列表已包含当前传输的业务则终止
// 否则继续传输 // // 否则继续传输
if (dataPackage.getCounter() == 0) { // if (dataPackage.getCounter() == 0) {
logger.info("传输失败,已超出生命周期:" + dataPackage); // logger.info("传输失败,已超出生命周期:" + dataPackage);
continue; // continue;
} else if (dataPackage.getPubIp() == ipService.getIpByAppName(currentName)) { // } else if (dataPackage.getPubIp() == ipService.getIpByAppName(currentName)) {
logger.info("传输成功: " + dataPackage); // logger.info("传输成功: " + dataPackage);
continue; // continue;
} else if (businessList.contains(businessName)){ // } else if (businessList.contains(businessName)){
logger.info("该节点已处理,该链路传输终止"); // logger.info("该节点已处理,该链路传输终止");
continue; // continue;
} else { // } else {
dataPackage.decrement(); // dataPackage.decrement();
businessList.add(businessName); // businessList.add(businessName);
node.setBusinessList(businessList); // node.setBusinessList(businessList);
List<Integer> relativeNodesList = getLink(currentName, dataPackage); // List<Integer> relativeNodesList = getLink(currentName, dataPackage);
for (int nextNode : relativeNodesList) { // for (int nextNode : relativeNodesList) {
transferPackage(dataPackage, nextNode); // transferPackage(dataPackage, nextNode);
} // }
continue; // continue;
} // }
} // }
//
} catch (IOException | ClassNotFoundException e) { // } catch (IOException | ClassNotFoundException e) {
e.printStackTrace(); // e.printStackTrace();
} finally { // } finally {
try { // try {
if (objectInputStream != null) { // if (objectInputStream != null) {
objectInputStream.close(); // objectInputStream.close();
} // }
if (inputStream != null) { // if (inputStream != null) {
inputStream.close(); // inputStream.close();
} // }
if (serverSocket != null) { // if (serverSocket != null) {
serverSocket.close(); // serverSocket.close();
} // }
} catch (IOException e) { // } catch (IOException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
//
//
} // }
//
/** // /**
* 根据节点名和拓扑获取当前节点连接关系 // * 根据节点名和拓扑获取当前节点连接关系
* @param name // * @param name
* @param dataPackage // * @param dataPackage
* @return // * @return
*/ // */
private List<Integer> getLink(String name, DataPackage dataPackage) { // private List<Integer> getLink(String name, DataPackage dataPackage) {
List<Integer> relativeNodesList = new ArrayList<>(); // List<Integer> relativeNodesList = new ArrayList<>();
int[][] nodeLink = dataPackage.getNodeLink(); // int[][] nodeLink = dataPackage.getNodeLink();
int currentHop = getCurrentHop(name); // int currentHop = getCurrentHop(name);
for (int i = 0; i < nodeLink[currentHop - 1].length; i++) { // for (int i = 0; i < nodeLink[currentHop - 1].length; i++) {
if (i == 1) { // if (i == 1) {
relativeNodesList.add(i + 1); // relativeNodesList.add(i + 1);
} // }
} // }
return relativeNodesList; // return relativeNodesList;
} // }
//
/** // /**
* 获取当前节点序列号 // * 获取当前节点序列号
* @param name // * @param name
* @return // * @return
*/ // */
private int getCurrentHop (String name) { // private int getCurrentHop (String name) {
if(name == null || name.length() == 0) { // if(name == null || name.length() == 0) {
throw new RuntimeException("节点名序号错误"); // throw new RuntimeException("节点名序号错误");
} // }
return Integer.valueOf(name.substring(4, name.length())); // return Integer.valueOf(name.substring(4, name.length()));
} // }
//
/** // /**
* 发送数据包 // * 发送数据包
* @param dataPackage // * @param dataPackage
* @param nextHop // * @param nextHop
*/ // */
private void transferPackage(DataPackage dataPackage, int nextHop) { // private void transferPackage(DataPackage dataPackage, int nextHop) {
//获取目标节点ip // //获取目标节点ip
String node_name = "node" + nextHop; // String node_name = "node" + nextHop;
String IP = ipService.getIpByAppName(node_name); // String IP = ipService.getIpByAppName(node_name);
//发送至下一节点 // //发送至下一节点
send(dataPackage, IP, 2021); // send(dataPackage, IP, 2021);
} // }
//
private void send(DataPackage dataPackage, String ip, int port) { // private void send(DataPackage dataPackage, String ip, int port) {
Socket socket = null; // Socket socket = null;
try { // try {
socket = new Socket(ip, port); // socket = new Socket(ip, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); // ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(dataPackage); // objectOutputStream.writeObject(dataPackage);
} catch (IOException e) { // } catch (IOException e) {
e.printStackTrace(); // e.printStackTrace();
} finally { // } finally {
try { // try {
if (socket != null) { // if (socket != null) {
socket.close(); // socket.close();
} // }
} catch (IOException e) { // } catch (IOException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
} // }
//
} //}
...@@ -51,6 +51,10 @@ ...@@ -51,6 +51,10 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version> <version>1.2.0</version>
</dependency> </dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-utils</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -63,7 +63,7 @@ public class NodeService { ...@@ -63,7 +63,7 @@ public class NodeService {
if (dataPackage.getCounter() == 0) { if (dataPackage.getCounter() == 0) {
logger.info("传输失败,已超出生命周期:" + dataPackage); logger.info("传输失败,已超出生命周期:" + dataPackage);
continue; continue;
} else if (dataPackage.getPubIp() == ipService.getIpByAppName(currentName)) { } else if (dataPackage.getPubIp() == ipService.getIpByAppName(1111l, currentName)) {
logger.info("传输成功: " + dataPackage); logger.info("传输成功: " + dataPackage);
continue; continue;
} else if (businessList.contains(businessName)){ } else if (businessList.contains(businessName)){
...@@ -140,7 +140,7 @@ public class NodeService { ...@@ -140,7 +140,7 @@ public class NodeService {
private void transferPackage(DataPackage dataPackage, int nextHop) { private void transferPackage(DataPackage dataPackage, int nextHop) {
//获取目标节点ip //获取目标节点ip
String node_name = "node" + nextHop; String node_name = "node" + nextHop;
String IP = ipService.getIpByAppName(node_name); String IP = ipService.getIpByAppName(1111l, node_name);
//发送至下一节点 //发送至下一节点
send(dataPackage, IP, 2021); send(dataPackage, IP, 2021);
} }
......
package com.jbxie.spaceiotmqtt.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.ninwoo.bishe.starter.service.NetworkService;
import top.ninwoo.common.entity.DockerContainer;
import top.ninwoo.utils.util.DockerUtils;
import top.ninwoo.utils.util.LinuxCtlUtils;
import java.util.List;
@Service
class IPService {
@Autowired
private NetworkService networkService;
@Autowired(required=false)
private LinuxCtlUtils linuxCtlUtils;
@Autowired(required=false)
private DockerUtils dockerUtils;
public String getDockerId() {
String cmd = "head -1 /proc/self/cgroup|cut -d/ -f3|cut -c1-12";
String res = linuxCtlUtils.runCmd(cmd);
if(res == null) {
throw new RuntimeException("get Docker Id Error");
}
return res;
}
public String getSelfDockerIpById(Long clusterId) {
String id = getDockerId();
DockerContainer dockerContainer = dockerUtils.getDockerById(id);
return getIpByAppName(clusterId, dockerContainer.getName());
}
public String getIpByAppName(Long clusterId,String appName) {
List<String> ipList = networkService.getIpListByAppName(clusterId, appName);
if(!ipList.isEmpty()){
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0] + ":8775";
}
return null;
}
public String[] getIpList(Long clusterId, String[] wxName_list) {
int n = wxName_list.length;
String[] ipList = new String[n];
for (int k = 0; k < wxName_list.length; k++){
ipList[k] = getIpByAppName(clusterId,wxName_list[k]);//通过卫星名字获取卫星ip
}
return ipList;
}
}
...@@ -14,13 +14,12 @@ ...@@ -14,13 +14,12 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>top.ninwoo</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>cnf-client-starter</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
...@@ -64,8 +63,8 @@ ...@@ -64,8 +63,8 @@
<version>1.6.3</version> <version>1.6.3</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j2-api</artifactId>
<groupId>org.slf4j</groupId> <groupId>org.slf4j2</groupId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
...@@ -77,6 +76,9 @@ ...@@ -77,6 +76,9 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
<configuration>
<mainClass>top.jbxie.raft.RaftNodeBootStrap</mainClass>
</configuration>
<executions> <executions>
<execution> <execution>
<goals> <goals>
......
package top.jbxie.raft; package top.jbxie.raft;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.util.Node;
import top.jbxie.raft.util.impl.DefaultNode;
import java.util.Arrays;
/** /**
* -DserverPort=8775 * -DserverPort=8775
* -DserverPort=8776
* -DserverPort=8777
* -DserverPort=8778
* -DserverPort=8779
* *
* 启动类,对自身节点的ip port的包装 * 启动类,对自身节点的ip port的包装
*/ */
@SpringBootApplication @SpringBootApplication
public class RaftNodeBootStrap { public class RaftNodeBootStrap {
// public static void main(String[] args) { public static void main(String[] args) {
// SpringApplication.run(RaftNodeBootStrap.class, args); SpringApplication.run(RaftNodeBootStrap.class, args);
// }
public static void main(String[] args) throws Throwable {
main0();
} }
public static void main0() throws Throwable {
String[] peerAddr = {"localhost:8775","localhost:8776","localhost:8777", "localhost:8778", "localhost:8779"};//集群列表
NodeConfig config = new NodeConfig();
//自身节点地址
config.setSelfAddr(Integer.valueOf(System.getProperty("serverPort")).intValue());
//config.setSelfAddr(8779);
//所有节点地址
config.setPeerAddrs(Arrays.asList(peerAddr));
Node node = DefaultNode.getInstance();
node.setConfig(config);
node.init();
// 为Java应用程序添加退出事件处理
// 通过Runtime.getRuntime().addShutdownHook(Thread hook)方法,事件监听,捕获系统退出消息到来,向Java虚拟机
// 注册一个shutdown钩子事件,这样程序一旦运行到此处,就运行线程hook。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
node.destroy();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}));
}
} }
package top.jbxie.raft.client; package top.jbxie.raft.client;
import java.net.Inet4Address;
import java.net.InetAddress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.jbxie.raft.nodeCommon.NodeConfig;
@RestController
public class test { public class test {
// @Autowired public static void main(String[] args) throws Exception {
// private NodeConfig nodeConfig; Inet4Address.getLocalHost();
// InetAddress addr = Inet4Address.getLocalHost();
@Autowired System.out.println("Local HostAddress: "+addr.getHostAddress());
public static Environment environment; String hostname = addr.getHostName();
System.out.println("Local host name: "+hostname);
// @RequestMapping("/bweather") }
// private String getBWeather() {
// return String.format("weather.dt:%s ===== weather.humidity:%s ==== port:%s", nodeConfig.getDt(), nodeConfig.getHumidity(), environment.getProperty("server.port"));
// }
// public static void main(String[] args) {
// System.out.println(System.getProperty("server.port"));;
// }
} }
...@@ -25,7 +25,8 @@ public class ClusterMembershipChangesImpl implements ClusterMembershipChanges { ...@@ -25,7 +25,8 @@ public class ClusterMembershipChangesImpl implements ClusterMembershipChanges {
} }
/** 必须是同步的,一次只能添加一个节点 /** 必须是同步的,一次只能添加一个节点
* @param newPeer*/ * @param newPeer
*/
@Override @Override
public synchronized Result addPeer(Peer newPeer) { public synchronized Result addPeer(Peer newPeer) {
// 已经存在 // 已经存在
......
...@@ -13,6 +13,6 @@ public class BaseParam implements Serializable { ...@@ -13,6 +13,6 @@ public class BaseParam implements Serializable {
// 候选人的任期号 // 候选人的任期号
public long term; public long term;
// 被请求者 ID(ip:selfAddr) // 被请求者 ID(目标节点ip)
public String serverId; public String serverId;
} }
...@@ -13,7 +13,7 @@ import lombok.Setter; ...@@ -13,7 +13,7 @@ import lombok.Setter;
@Setter @Setter
public class RvoteParam extends BaseParam { public class RvoteParam extends BaseParam {
// 请求选票的候选人的 Id(ip:selfAddr) // 请求选票的候选人的 Id(ip)
String candidateId; String candidateId;
// 候选人的最后日志条目的索引值 // 候选人的最后日志条目的索引值
......
...@@ -14,8 +14,8 @@ import java.util.List; ...@@ -14,8 +14,8 @@ import java.util.List;
@Data @Data
public class NodeConfig { public class NodeConfig {
// 自身节点地址(目前为端口,后面修改为ip地址) // 自身节点地址
public int selfAddr; public String selfAddr;
//所有节点地址 //所有节点地址
public List<String> peerAddrs; public List<String> peerAddrs;
......
...@@ -11,7 +11,7 @@ import java.util.Objects; ...@@ -11,7 +11,7 @@ import java.util.Objects;
@Getter @Getter
@Setter @Setter
public class Peer { public class Peer {
/** ip:selfAddr */ /** selfAddr*/
private final String addr; private final String addr;
public Peer(String addr) { public Peer(String addr) {
......
...@@ -15,7 +15,7 @@ public class PeerSet implements Serializable { ...@@ -15,7 +15,7 @@ public class PeerSet implements Serializable {
//leader节点 //leader节点
private volatile Peer leader; private volatile Peer leader;
// final //自身
private volatile Peer self; private volatile Peer self;
private PeerSet() { private PeerSet() {
......
package top.jbxie.raft.register;
import lombok.SneakyThrows;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.service.impl.IPServiceImpl;
import top.jbxie.raft.util.Node;
import top.jbxie.raft.util.impl.DefaultNode;
import java.util.Arrays;
@Component
public class MyApplicationRunner implements ApplicationRunner {
@SneakyThrows
@Override
public void run(ApplicationArguments args) throws Exception {
// String[] peerAddr = {"localhost:8775","localhost:8776","localhost:8777", "localhost:8778", "localhost:8779"};//集群列表
//
// NodeConfig config = new NodeConfig();
//
// //自身节点地址
// config.setSelfAddr(Integer.valueOf(System.getProperty("serverPort")).intValue());
// //config.setSelfAddr(8779);
//
// //所有节点地址
// config.setPeerAddrs(Arrays.asList(peerAddr));
//
// Node node = DefaultNode.getInstance();
//
// node.setConfig(config);
//
// node.init();
//
// // 为Java应用程序添加退出事件处理
// // 通过Runtime.getRuntime().addShutdownHook(Thread hook)方法,事件监听,捕获系统退出消息到来,向Java虚拟机
// // 注册一个shutdown钩子事件,这样程序一旦运行到此处,就运行线程hook。
// Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// try {
// node.destroy();
// } catch (Throwable throwable) {
// throwable.printStackTrace();
// }
// }));
IPServiceImpl ipServiceImpl = new IPServiceImpl();
NodeConfig config = new NodeConfig();
Thread.currentThread().sleep(20000);
// 自身节点地址
config.setSelfAddr(ipServiceImpl.getSelfIp());
// 所有节点地址
// Long clusterId = 11113l;
// config.setPeerAddrs(Arrays.asList(ipServiceImpl.getIpList(clusterId, "sate")));
String[] peerAddr = {"10.10.1.2:8775","10.10.1.3:8775","10.10.1.4:8775", "10.10.1.5:8775", "10.10.1.6:8775", "10.10.1.7:8775", "10.10.1.8:8775"};//集群列表
config.setPeerAddrs(Arrays.asList(peerAddr));
Node node = DefaultNode.getInstance();
node.setConfig(config);
node.init();
// 为Java应用程序添加退出事件处理
// 通过Runtime.getRuntime().addShutdownHook(Thread hook)方法,事件监听,捕获系统退出消息到来,向Java虚拟机
// 注册一个shutdown钩子事件,这样程序一旦运行到此处,就运行线程hook。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
node.destroy();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}));
}
}
...@@ -20,28 +20,28 @@ public class DefaultRpcServer implements RpcServer { ...@@ -20,28 +20,28 @@ public class DefaultRpcServer implements RpcServer {
private com.alipay.remoting.rpc.RpcServer rpcServer; private com.alipay.remoting.rpc.RpcServer rpcServer;
public DefaultRpcServer(String ip, int port, DefaultNode node) { // public DefaultRpcServer(String ip, int port, DefaultNode node) {
if (flag) { // if (flag) {
return; // return;
} // }
synchronized (this) { // synchronized (this) {
if (flag) { // if (flag) {
return; // return;
} // }
rpcServer = new com.alipay.remoting.rpc.RpcServer(ip, port); // rpcServer = new com.alipay.remoting.rpc.RpcServer(ip, port, false);
//
rpcServer.registerUserProcessor(new RaftUserProcessor<Request>() { // rpcServer.registerUserProcessor(new RaftUserProcessor<Request>() {
//
@Override // @Override
public Object handleRequest(BizContext bizCtx, Request request) throws Exception { // public Object handleRequest(BizContext bizCtx, Request request) throws Exception {
return handlerRequest(request); // return handlerRequest(request);
} // }
}); // });
//
this.node = node; // this.node = node;
flag = true; // flag = true;
} // }
} // }
public DefaultRpcServer(int port, DefaultNode node) { public DefaultRpcServer(int port, DefaultNode node) {
if (flag) { if (flag) {
......
package top.jbxie.raft.service;
import java.net.UnknownHostException;
public interface IPService {
String getSelfIp() throws UnknownHostException;
String[] getIpList(Long clusterId, String appName);
}
package top.jbxie.raft.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.jbxie.raft.service.IPService;
import top.ninwoo.bishe.starter.service.NetworkService;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
@Service
public class IPServiceImpl implements IPService {
@Autowired
private NetworkService networkService;
@Override
public String getSelfIp() throws UnknownHostException {
InetAddress addr = Inet4Address.getLocalHost();
return addr.getHostAddress() + ":8775";
}
@Override
public String[] getIpList(Long clusterId, String appName) {
List<String> ipListS = networkService.getIpListByAppName(clusterId, appName);
int n = ipListS.size();
String[] ipList = new String[n];
for (int i = 0; i < n; i++) {
if(!ipListS.isEmpty()){
String ip_tmp = ipListS.get(i);
String[] split_list = ip_tmp.split("/");
ipList[i] = split_list[0]+":8775";
}
}
return ipList;
}
}
...@@ -4,6 +4,8 @@ import io.netty.util.internal.StringUtil; ...@@ -4,6 +4,8 @@ import io.netty.util.internal.StringUtil;
import lombok.Data; import lombok.Data;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import top.jbxie.raft.clusterService.Server;
import top.jbxie.raft.entity.*; import top.jbxie.raft.entity.*;
import top.jbxie.raft.nodeCommon.NodeStatus; import top.jbxie.raft.nodeCommon.NodeStatus;
import top.jbxie.raft.nodeCommon.Peer; import top.jbxie.raft.nodeCommon.Peer;
......
...@@ -40,7 +40,8 @@ public class DefaultLogModule implements LogModule { ...@@ -40,7 +40,8 @@ public class DefaultLogModule implements LogModule {
static { static {
if (dbDir == null) { if (dbDir == null) {
dbDir = "./rocksDB-raft/" + System.getProperty("serverPort"); //dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
dbDir = "./rocksDB-raft/";
} }
if (logsDir == null) { if (logsDir == null) {
logsDir = dbDir + "/logModule"; logsDir = dbDir + "/logModule";
......
...@@ -137,13 +137,14 @@ public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChan ...@@ -137,13 +137,14 @@ public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChan
for (String s : config.getPeerAddrs()) { for (String s : config.getPeerAddrs()) {
Peer peer = new Peer(s); Peer peer = new Peer(s);
peerSet.addPeer(peer); peerSet.addPeer(peer);
if (s.equals("localhost:" + config.getSelfAddr())) { //if (s.equals("localhost:" + config.getSelfAddr())) {
if (s.equals(config.getSelfAddr())) {
peerSet.setSelf(peer); peerSet.setSelf(peer);
} }
} }
// 开启了一个RPCServer,接收投票RPCVoteRequest, 和附加日志RPCLogAppendRequest(心跳也是日志附加Request,只是日志内容为null) // 开启了一个RPCServer,接收投票RPCVoteRequest, 和附加日志RPCLogAppendRequest(心跳也是日志附加Request,只是日志内容为null)
RPC_SERVER = new DefaultRpcServer(config.selfAddr, this); RPC_SERVER = new DefaultRpcServer(8775, this);
} }
/** /**
......
...@@ -34,8 +34,8 @@ public class DefaultStateMachine implements StateMachine { ...@@ -34,8 +34,8 @@ public class DefaultStateMachine implements StateMachine {
static { static {
if (dbDir == null) { if (dbDir == null) {
dbDir = "./rocksDB-raft/" + System.getProperty("serverPort"); // dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
//dbDir = "./rocksDB-raft/" + 8779; dbDir = "./rocksDB-raft/";
} }
if (stateMachineDir == null) { if (stateMachineDir == null) {
stateMachineDir = dbDir + "/stateMachine"; stateMachineDir = dbDir + "/stateMachine";
......
-DserverPort=8775 #项目端口
\ No newline at end of file server.port=8081
#接入云端的端口设置
bishe.app.zookeeper-url=192.168.31.37:2181
bishe.app.app-name=joliu
bishe.app.cloud-url=192.168.31.37:9091
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender"> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<layout class="org.apache.log4j.PatternLayout"> <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出-->
<param name="ConversionPattern" value="%d %t %5p [%c:%M:%L] - %m%n"/>
</layout>
</appender>
<root> <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
<level value="INFO"/> <configuration status="WARN" monitorInterval="30">
<appender-ref ref="CONSOLE"/>
</root>
</log4j:configuration> <Properties>
\ No newline at end of file <Property name="log.path">log</Property>
</Properties>
<!--先定义所有的appender-->
<appenders>
<!--这个输出控制台的配置-->
<console name="Console" target="SYSTEM_OUT">
<!--输出日志的格式-->
<PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</console>
<File name="log" fileName="${log.path}/test.log" append="false">
<PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</File>
<RollingFile name="RollingFileInfo" fileName="${log.path}/info.log"
filePattern="${log.path}/logs/${date:yyyy-MM}/info-%d{yyyy-MM-dd}.log.zip">
<!--只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)-->
<ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
<Policies>
<TimeBasedTriggeringPolicy modulate="true" interval="1"/>
</Policies>
</RollingFile>
</appenders>
<!--然后定义logger,只有定义了logger并引入appender,appender才会生效-->
<loggers>
<!--过滤掉spring和mybatis的一些无用的DEBUG信息-->
<logger name="org.springframework" level="INFO"/>
<logger name="org.mybatis" level="INFO"/>
<logger name="com.baiding" level="INFO"/>
<root level="info">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFileInfo"/>
</root>
</loggers>
</configuration>
\ No newline at end of file
...@@ -2,10 +2,12 @@ server: ...@@ -2,10 +2,12 @@ server:
port: 9091 port: 9091
zookeeper: zookeeper:
url: zk.cnf.org:2181 #url: zk.cnf.org:2181
url: 192.168.31.37:2181
bs: bs:
cloudcenter: cloudcenter:
name: my-bs-cloud-center name: my-bs-cloud-center
ipservice: ipservice:
url: ipservice.cnf.org:23333 #url: ipservice.cnf.org:23333
\ No newline at end of file url: 192.168.31.37:23333
\ No newline at end of file
...@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Service @Service
public class ClusterServiceImpl implements ClusterService { public class ClusterServiceImpl implements ClusterService {
private final Logger LOG = LoggerFactory.getLogger(ClusterServiceImpl.class); private final Logger LOG = LoggerFactory.getLogger(ClusterServiceImpl.class);
// ConcurrentHashMap<ClusterConfig.id, Map<DockerContainer.name, Set<DockerContainer.id>>> 一个基础容器名,对应多个容器id
private ConcurrentHashMap<Long, Map<String, Set<String>>> clustersInfo = new ConcurrentHashMap<>(); private ConcurrentHashMap<Long, Map<String, Set<String>>> clustersInfo = new ConcurrentHashMap<>();
@Autowired @Autowired
DockerService dockerService; DockerService dockerService;
......
...@@ -26,12 +26,15 @@ bs: ...@@ -26,12 +26,15 @@ bs:
name: random name: random
ip-prefix: 192.168.31 ip-prefix: 192.168.31
ipservice: ipservice:
ip: ipservice.cnf.org:23333 #ip: ipservice.cnf.org:23333
ip: 192.168.31.37:23333
sdn-controller: sdn-controller:
host: sdn.cnf.org host: sdn.cnf.org
port: 6653 port: 6653
zookeeper: zookeeper:
url: zk.cnf.org:2181 #url: zk.cnf.org:2181
url: 192.168.31.37:2181
cnf: cnf:
passwd: Vudo3423 passwd: Vudo3423
bishe.app.app-name=joliu bishe.app.app-name=joliu
bishe.app.cloud-url=192.168.31.156:9090 bishe.app.cloud-url=192.168.31.37:9091
spring.influx.url=http://192.168.0.108:8086 spring.influx.url=http://192.168.0.108:8086
spring.influx.user=admin spring.influx.user=admin
......
package top.ninwoo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import top.ninwoo.bishe.starter.entity.ContainerMonitor;
import top.ninwoo.bishe.starter.service.ClusterService;
import top.ninwoo.bishe.starter.service.NetworkService;
import top.ninwoo.client.BisheTestMain;
import top.ninwoo.common.entity.*;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = BisheTestMain.class)
public class JbxieRaftTests {
@Autowired
private ClusterService clusterService;
@Resource
private NetworkService networkService;
@Test
public void testSendSeparateConfig() {
ArrayList<SeparatedClusterConfig> clusterConfigs = new ArrayList<>();
SeparatedClusterConfig separatedClusterConfig = new SeparatedClusterConfig();
// TODO: 这个ID应该是从借口获取的
separatedClusterConfig.setEdgeNodeId("192.168.31.171:18088"); //边缘节点IP+端口号
ClusterConfig clusterConfigCom = new ClusterConfig();
clusterConfigCom.setId(11113l);
clusterConfigCom.setOwner("joliu");
List<ContainerDescription> cds = new ArrayList<>();
ContainerDescription containerDescriptionClient = new ContainerDescription();
containerDescriptionClient.setMode("normal");
containerDescriptionClient.setReplicas(7);
DockerContainer container = new DockerContainer();
container.setName("sate");//创建一个client容器
container.setCommand("bash");
container.setImage("raft_test");
containerDescriptionClient.setDockerContainer(container);
cds.add(containerDescriptionClient);//将client容器添加到list集合中
clusterConfigCom.setDockers(cds);
NetworkTopology topo = new NetworkTopology();
topo.setAppNames(new String[]{"sate", "br:ovs1"});
// topo.setAppNames(new String[]{"sate_0", "sate_1", "sate_2", "sate_3", "sate_4", "sate_5", "sate_6", "br:ovs1"});
// 这个参数好像没啥用
topo.setTopologyId(11);
// topo.setTopology(new int[][]{{0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0},
// {1, 1, 1, 1, 1, 1, 1, 0}});
topo.setTopology(new int[][]{{0, 0}, {1, 0}});
clusterConfigCom.setTopology(topo);
separatedClusterConfig.setClusterConfig(clusterConfigCom);
clusterConfigs.add(separatedClusterConfig);
clusterService.sendClusterConfigToEdgeNode(clusterConfigs);
}
//测试容器ip
@Test
public void getIpListByAppNameTest() {
List<String> ipList = networkService.getIpListByAppName(11113L, "sate");
System.out.println(ipList); // [10.10.1.8/24, 10.10.1.7/24, 10.10.1.4/24, 10.10.1.3/24, 10.10.1.5/24, 10.10.1.6/24, 10.10.1.2/24]
}
@Test
public void getIpList() {
List<String> ipListS = networkService.getIpListByAppName(11113L, "sate");
String[] ipList = new String[ipListS.size()];
for (int i = 0; i < ipListS.size(); i++) {
if(!ipListS.isEmpty()){
String ip_tmp = ipListS.get(i);
String[] split_list = ip_tmp.split("/");
ipList[i] = split_list[0]+":8775";
}
}
for (int j = 0; j < ipList.length; j++) {
System.out.println(ipList[j]);
}
/**
* 10.10.1.8:8775
* 10.10.1.7:8775
* 10.10.1.4:8775
* 10.10.1.3:8775
* 10.10.1.5:8775
* 10.10.1.6:8775
* 10.10.1.2:8775
*/
}
@Test
public void removeClusterFromEdgeNodeTest() {
clusterService.removeClusterFromEdgeNode(11113L);
}
@Test
public void getLogicalNetworkTopologyTest() {
NetworkTopology logicalNetworkTopology = networkService.getLogicalNetworkTopology(11113L);
System.out.println(logicalNetworkTopology);
}
@Test
public void enableNetworkMonitorTest() throws InterruptedException {
String res = networkService.enableNetworkMonitor(11112L, "server");
System.out.println(res);
}
@Test
public void cancelNetworkMonitorTest() {
String run = networkService.cancelNetworkMonitor(11112L, "server");
System.out.println(run);
}
@Test
public void getNetworkMonitorTest() throws InterruptedException {
List<ContainerMonitor> run = networkService.getContainerMonitors(11112L, "Run");
int i = 0;
while(i < 10) {
run.forEach(c -> {
System.out.println(c.getContainerInfo());
});
Thread.sleep(500);
i++;
}
}
}
...@@ -97,9 +97,10 @@ public class DockerUtilsImpl implements DockerUtils { ...@@ -97,9 +97,10 @@ public class DockerUtilsImpl implements DockerUtils {
//String cmd = "docker run -itd --name " + container.getName() + " --privileged " + container.getImage() + " " + container.getCommand(); //String cmd = "docker run -itd --name " + container.getName() + " --privileged " + container.getImage() + " " + container.getCommand();
String cmd; String cmd;
if(container.getPorts() == null){ if(container.getPorts() == null){
cmd = "docker run -itd --name " + container.getName() + " --privileged " + container.getImage() + " " + container.getCommand(); // cmd = "docker run -itd --name " + container.getName() + " --privileged " + container.getImage() + " " + container.getCommand();
}else{ cmd = "docker run -itd -P " + " --name " + container.getName() + " --net=none" + " --privileged " + container.getImage() + " " + container.getCommand();
cmd = "docker run -itd -p " + container.getPorts()+ ":" +container.getPorts()+ " " + "--name " + container.getName() + " --privileged " + container.getImage() + " " + container.getCommand(); }else {
cmd = "docker run -itd -p " + container.getPorts()+ ":" +container.getPorts()+ " " + "--name " + container.getName() + " --net=none" + " --privileged " + container.getImage() + " " + container.getCommand();
} }
String result = linuxCtlUtils.runCmd(cmd); String result = linuxCtlUtils.runCmd(cmd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment