Commit 29112de4 authored by 谢建斌's avatar 谢建斌

代码重构

parent a0ca07ec
package com.jbxie.spaceiotmqtt.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.ninwoo.bishe.starter.service.NetworkService;
import top.ninwoo.common.entity.DockerContainer;
import top.ninwoo.utils.util.DockerUtils;
import top.ninwoo.utils.util.LinuxCtlUtils;
import java.util.List;
@Service
class IPService {
@Autowired
private NetworkService networkService;
@Autowired(required=false)
private LinuxCtlUtils linuxCtlUtils;
@Autowired(required=false)
private DockerUtils dockerUtils;
public String getDockerId() {
String cmd = "head -1 /proc/self/cgroup|cut -d/ -f3|cut -c1-12";
String res = linuxCtlUtils.runCmd(cmd);
if(res == null) {
throw new RuntimeException("get Docker Id Error");
}
return res;
}
public String getSelfDockerIpById(Long clusterId) {
String id = getDockerId();
DockerContainer dockerContainer = dockerUtils.getDockerById(id);
return getIpByAppName(clusterId, dockerContainer.getName());
}
public String getIpByAppName(Long clusterId,String appName) {
List<String> ipList = networkService.getIpListByAppName(clusterId, appName);
if(!ipList.isEmpty()){
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0] + ":8775";
}
return null;
}
public String[] getIpList(Long clusterId, String[] wxName_list) {
int n = wxName_list.length;
String[] ipList = new String[n];
for (int k = 0; k < wxName_list.length; k++){
ipList[k] = getIpByAppName(clusterId,wxName_list[k]);//通过卫星名字获取卫星ip
}
return ipList;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-spaceiot-longConnection</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>csl-common-api</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package top.ninwoo.common.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class DataPackage implements Serializable {
private String key;
private String value;
private String subNode; // 订阅节点(消息订阅点固定北京,groung3)
private String pubNode; // 发布节点
private String[][] route; // 路由列表
private int totalHops; // 总跳数
private int remainingHops; // 剩余跳数
private String nextHop; // 下一跳
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-spaceiot-longConnection</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>csl-node-ground</artifactId>
<dependencies>
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>csl-common-api</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<configuration>
<mainClass>top.ninwoo.ground.GroundMain</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package top.ninwoo.ground;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class GroundMain {
public static void main(String[] args) {
SpringApplication.run(GroundMain.class, args);
}
}
package top.ninwoo.ground.register;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import top.ninwoo.common.entity.DataPackage;
import top.ninwoo.ground.service.GroundService;
import top.ninwoo.ground.service.IPService;
import javax.xml.crypto.Data;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
@Component
public class MyApplicationRunner implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(MyApplicationRunner.class);
private static final String fileName = "route_path.txt"; //生成的路由保存文件
private static final String timeSlot = "35"; //时隙,可修改
private static final String sourceNode = "ground3";
private static final String destinationNode = "groung14";
@Autowired
GroundService groundService;
@Autowired
IPService ipService;
@Autowired
DataPackage dataPackage;
/*================================订阅节点的run函数=============================================*/
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("BeiJing Node Initiate a request");
dataPackage.setKey("temperature");
dataPackage.setSubNode(sourceNode);
dataPackage.setPubNode(destinationNode);
groundService.execPy(timeSlot, sourceNode, destinationNode);
String nextHop = groundService.readFileContent(fileName);
String nextIp = ipService.getIpByAppName(nextHop);
send(dataPackage, nextIp, 2021);
// 接收回传信息
ServerSocket serverSocket = new ServerSocket(8900);
//获取数据流
Socket accept = serverSocket.accept();
InputStream inputStream = accept.getInputStream();
//读取数据流
ObjectInputStream objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
Object object = null;
try {
object = objectInputStream.readObject();
DataPackage receiveData = (DataPackage)object;
//输出数据流
logger.info("BeiJing Node Receive Data");
logger.info(receiveData.getKey() + ":" + receiveData.getValue());
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
serverSocket.close();
accept.close();
System.out.println();
}
/*================================其他节点的run函数=============================================*/
// @Override
// public void run(ApplicationArguments args) throws Exception {
// ServerSocket serverSocket = null;
// InputStream inputStream = null;
// ObjectInputStream objectInputStream = null;
// try {
// serverSocket = new ServerSocket(2021);
// while (true) {
// Socket client = serverSocket.accept();
// inputStream = client.getInputStream();
// objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
// Object object = objectInputStream.readObject();
// DataPackage dataPackage = (DataPackage) object;
//
// // 获取当前节点剩余跳数,若为0,则当前节点为目的节点;若不为0,则当前节点为进行转发的地面节点
// int remainingHops = dataPackage.getRemainingHops() - 1;
// if (remainingHops == 0) {
// logger.info("到达消息拥有节点,获取消息然后发送至消息订阅节点");
// dataPackage.setValue(String.valueOf(groundService.getRandom()));
// groundService.execPy(timeSlot, destinationNode, sourceNode);
// String nextHop = groundService.readFileContent(fileName);
// String nextIp = ipService.getIpByAppName(nextHop);
//
// send(dataPackage, nextIp, 2021);
// continue;
// } else {
// logger.info("中间地面节点,转发至下一节点");
// dataPackage.setRemainingHops(remainingHops);
// String[][] route = dataPackage.getRoute();
// String nextHop = route[0][dataPackage.getTotalHops() - remainingHops + 1];
// String nextIp = ipService.getIpByAppName(nextHop);
// send(dataPackage, nextIp, 2021);
// continue;
// }
// }
// } catch (IOException | ClassNotFoundException e) {
// e.printStackTrace();
// } finally {
// try {
// if (objectInputStream != null) {
// objectInputStream.close();
// }
// if (inputStream != null) {
// inputStream.close();
// }
// if (serverSocket != null) {
// serverSocket.close();
// }
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
// }
/**
* 发送数据包到下一节点
* @param dataPackage
* @param ip
* @param port
*/
private static void send(DataPackage dataPackage, String ip, int port) {
Socket socket = null;
try {
socket = new Socket(ip, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(dataPackage);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package top.ninwoo.ground.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import top.ninwoo.common.entity.DataPackage;
import java.io.*;
import java.util.ArrayList;
import java.util.Random;
public class GroundService {
private static final Logger logger = LoggerFactory.getLogger(GroundService.class);
private static final String pyFilePath = "E:\\xjb\\xjb.py"; //如果未指定.py文件的完全路径,则默认从工程当前目录下搜索
private static final String pyAllPath = "E:\\xjb"; //注意:当命令行参数分开写的时候,exec后面不用添加一个空格。当命令行参数一起写的时候,exe后面一定要添加一个空格
private static final String pyRoutePath = "E:\\xjb";
private static Process proc = null;//java进程类
@Autowired
DataPackage dataPackage;
@Autowired
GroundService groundService;
/**
* 执行*.py文件
*/
public void execPy(String timeSlot, String sourceNode, String destinationNode) {
String[] arguments = new String[] {"py", pyFilePath, pyAllPath, pyRoutePath, timeSlot, sourceNode, destinationNode};
try {
proc = Runtime.getRuntime().exec(arguments);
BufferedReader in = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line = null;
while ((line = in.readLine()) != null) {
System.out.println(line);
}
in.close();
int re = proc.waitFor();//返回0:成功。其余返回值均表示失败,如:返回错误代码1:操作不允许,表示调用python脚本失败
System.out.println(re);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 读取route_path.txt生成路由表,返回下一跳节点
* @param fileName
*/
public String readFileContent(String fileName) throws IOException {
/**
* 文件读入
*/
String filePath = pyRoutePath + "\\" + fileName;
// File file = new file(filePath);
FileReader in = null;
try {
in = new FileReader(filePath);
} catch (FileNotFoundException e) {
logger.error("File doesn't exsit!");
return null ;
}
/**
* 使用BufferedReader读入并保存
*/
BufferedReader br = new BufferedReader(in);
ArrayList<String> list = new ArrayList<String>();
String s = null;
while(true) {
try {
s = br.readLine();
} catch(IOException e) {
e.printStackTrace();
}
list.add(s);
if(s == null)
break;
}
/**
* 创建矩阵,将数据写入二维数组
*/
int colLength; //列数
int rowLength; //行数
rowLength = list.size() - 1;
colLength = list.get(0).split(" ").length;
String[][] square = new String[rowLength][colLength]; //幻方
for (int i = 0; i < rowLength; i++) {
String[] linex = list.get(i).split(" ");
for (int j = 0; j < colLength; j++) {
square[i][j] = linex[j];
}
}
br.close();
dataPackage.setRoute(square);
dataPackage.setRemainingHops(Integer.valueOf(square[3][0]));
dataPackage.setTotalHops(Integer.valueOf(square[3][0]));
return square[0][1];
}
/**
* 得到0.5到1的随机数
* @return
*/
public int getRandom(){
// 0<= random and random < n
Random ran = new Random();
int random = ran.nextInt(40);
return random;
}
public static void Print(int colLength, int rowLength, String[][] square) {
for (int i = 0; i < rowLength; i++) {
if (i > 0)
System.out.print("\n");
for (int j = 0; j < colLength; j++) {
System.out.print(square[i][j] + "\t");
}
}
System.out.println(Integer.valueOf(square[3][0]));
}
public static void main(String[] args) throws IOException {
// execPy();
// readFileContent("route_path.txt");
}
}
package com.jbxie.spaceiotmqtt.service; package top.ninwoo.ground.service;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -8,17 +8,17 @@ import java.util.List; ...@@ -8,17 +8,17 @@ import java.util.List;
@Service @Service
public class IPService { public class IPService {
@Autowired @Autowired
private NetworkService networkService; private NetworkService networkService;
public String getIpByAppName(String appName) { public String getIpByAppName(String appName) {
List<String> ipList = networkService.getIpListByAppName(11111l, appName); List<String> ipList = networkService.getIpListByAppName(11113l, appName);
if (!ipList.isEmpty()) { if(!ipList.isEmpty()){
String ip_tmp = ipList.get(0); String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/"); String[] split_list = ip_tmp.split("/");
return split_list[0]; return split_list[0];
} }
return null; return null;
} }
} }
from __future__ import print_function
from sklearn import datasets
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
X, y = datasets.make_regression(n_samples=100, n_features=1, n_targets=1, noise=10)
plt.scatter(X, y)
plt.show()
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-spaceiot-longConnection</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>csl-node-satellite</artifactId>
<dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>csl-common-api</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<configuration>
<mainClass>top.ninwoo.satellite.SatelliteMain</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package top.ninwoo.satellite;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SatelliteMain {
public static void main(String[] args) {
SpringApplication.run(SatelliteMain.class, args);
}
}
package top.ninwoo.satellite.register;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import top.ninwoo.common.entity.DataPackage;
import top.ninwoo.satellite.service.IPService;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
@Component
public class MyApplicationRunner implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(MyApplicationRunner.class);
@Autowired
IPService ipService;
@Autowired
DataPackage dataPackage;
/**
* Callback used to run the bean.
*
* @param args incoming application arguments
* @throws Exception on error
*/
@Override
public void run(ApplicationArguments args) throws Exception {
ServerSocket serverSocket = null;
InputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
serverSocket = new ServerSocket(2021);
while (true) {
Socket client = serverSocket.accept();
inputStream = client.getInputStream();
objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
Object object = objectInputStream.readObject();
DataPackage dataPackage = (DataPackage) object;
//卫星节点转发数据
logger.info("中间卫星节点,转发至下一节点");
int remainingHops = dataPackage.getRemainingHops() - 1;
dataPackage.setRemainingHops(remainingHops);
String[][] route = dataPackage.getRoute();
String nextHop = route[0][dataPackage.getTotalHops() - remainingHops + 1];
String nextIp = ipService.getIpByAppName(nextHop);
send(dataPackage, nextIp, 2021);
continue;
}
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
try {
if (objectInputStream != null) {
objectInputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 发送数据包到下一节点
* @param dataPackage
* @param ip
* @param port
*/
private static void send(DataPackage dataPackage, String ip, int port) {
Socket socket = null;
try {
socket = new Socket(ip, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(dataPackage);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package top.ninwoo.satellite.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.ninwoo.bishe.starter.service.NetworkService;
import java.util.List;
@Service
public class IPService {
@Autowired
private NetworkService networkService;
public String getIpByAppName(String appName) {
List<String> ipList = networkService.getIpListByAppName(11113l, appName);
if(!ipList.isEmpty()){
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0];
}
return null;
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cnf-spaceiot-longConnection</artifactId>
<packaging>pom</packaging>
<modules>
<module>csl-node-satellite</module>
<module>csl-node-ground</module>
<module>csl-common-api</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<!-- <version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>-->
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
...@@ -3,13 +3,14 @@ ...@@ -3,13 +3,14 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>cnf-space-iot</artifactId> <artifactId>cnf</artifactId>
<groupId>top.ninwoo</groupId> <groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>space-iot-mqtt</artifactId> <artifactId>cnf-spaceiot-mqtt</artifactId>
<dependencies> <dependencies>
<dependency> <dependency>
...@@ -73,5 +74,4 @@ ...@@ -73,5 +74,4 @@
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
\ No newline at end of file
package com.jbxie.spaceiotmqtt; package top.ninwoo.mqtt;
import com.jbxie.spaceiotmqtt.mqttUtils.MyMqttClient;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import top.ninwoo.mqtt.mqttUtils.MyMqttClient;
@SpringBootApplication @SpringBootApplication
public class IotMqttMain { public class IotMqttMain {
public static void main(String[] args) { public static void main(String[] args) throws InterruptedException {
String broker = "tcp://192.168.31.198:1883"; String broker = "tcp://192.168.43.129:1883";
// MyMqttClient mqttClient_sub = new MyMqttClient(); // MyMqttClient mqttClient_sub = new MyMqttClient();
// MyMqttClient mqttClient_pub = new MyMqttClient(); // MyMqttClient mqttClient_pub = new MyMqttClient();
// //
...@@ -21,6 +21,10 @@ public class IotMqttMain { ...@@ -21,6 +21,10 @@ public class IotMqttMain {
mqttClient.createClient(broker); mqttClient.createClient(broker);
mqttClient.subTopic("world/test1", 1); mqttClient.subTopic("world/test1", 1);
mqttClient.pubMessage(1, "world/test1", "hello:test_1"); mqttClient.pubMessage(1, "world/test1", "hello:test_1");
Thread.currentThread().sleep(100);
System.out.println(mqttClient.payload);
//System.out.println(mqttClient.subTopic("world/test1", 1));
SpringApplication.run(IotMqttMain.class, args); SpringApplication.run(IotMqttMain.class, args);
} }
} }
\ No newline at end of file
package com.jbxie.spaceiotmqtt.controller; package top.ninwoo.mqtt.controller;
import com.jbxie.spaceiotmqtt.mqttUtils.MyMqttClient;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import top.ninwoo.mqtt.mqttUtils.MyMqttClient;
import java.util.Random; import java.util.Random;
...@@ -67,6 +67,4 @@ public class MqttController { ...@@ -67,6 +67,4 @@ public class MqttController {
myMqttClient.pubMessage(topics[new Random().nextInt(5)], "测试消息发送"); myMqttClient.pubMessage(topics[new Random().nextInt(5)], "测试消息发送");
return "发送成功吗!!!!!!!!"; return "发送成功吗!!!!!!!!";
} }
} }
package com.jbxie.spaceiotmqtt.mqttUtils; package top.ninwoo.mqtt.mqttUtils;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -8,6 +8,7 @@ import org.eclipse.paho.client.mqttv3.MqttException; ...@@ -8,6 +8,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
...@@ -31,12 +32,14 @@ import java.util.Date; ...@@ -31,12 +32,14 @@ import java.util.Date;
* 由 MqttClient.connect 激活此回调。 * 由 MqttClient.connect 激活此回调。
* *
*/ */
@Slf4j @Slf4j
@Component @Component
public class MqttClientCallback implements MqttCallback { public class MqttClientCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttClientCallback.class); private static final Logger logger = LoggerFactory.getLogger(MqttClientCallback.class);
@Autowired
MyMqttClient myMqttClient;
@Override @Override
public void connectionLost(Throwable arg0) { public void connectionLost(Throwable arg0) {
//logger.info("连接断开,可以做重连"); //logger.info("连接断开,可以做重连");
...@@ -57,6 +60,7 @@ public class MqttClientCallback implements MqttCallback { ...@@ -57,6 +60,7 @@ public class MqttClientCallback implements MqttCallback {
// logger.info("接收消息Qos : " + mqttMessage.getQos()); // logger.info("接收消息Qos : " + mqttMessage.getQos());
// logger.info("接收消息内容 : " + new String(mqttMessage.getPayload())); // logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
String messages = new String(mqttMessage.getPayload()); String messages = new String(mqttMessage.getPayload());
myMqttClient.payload = messages;
if(!messages.equals("close")){ if(!messages.equals("close")){
System.out.println("{"); System.out.println("{");
System.out.println("接收消息主题 : " + topic); System.out.println("接收消息主题 : " + topic);
...@@ -65,14 +69,8 @@ public class MqttClientCallback implements MqttCallback { ...@@ -65,14 +69,8 @@ public class MqttClientCallback implements MqttCallback {
System.out.println("}"); System.out.println("}");
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("接收时间========" + df.format(new Date()) + " " + System.currentTimeMillis()); logger.info("接收时间========" + df.format(new Date()) + " " + System.currentTimeMillis());
// try {
// perform(topic,json);
// }catch (Exception e){
// }
} }
// if (MyMqttClient.executorService != null) {
// MyMqttClient.executorService.execute(new HandlerThread(topic, content));
// }
} }
public void perform(String topicP, JSONObject json) throws MqttException, UnsupportedEncodingException { public void perform(String topicP, JSONObject json) throws MqttException, UnsupportedEncodingException {
......
package com.jbxie.spaceiotmqtt.mqttUtils; package top.ninwoo.mqtt.mqttUtils;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
...@@ -33,6 +32,7 @@ public class MyMqttClient { ...@@ -33,6 +32,7 @@ public class MyMqttClient {
private static LinkedBlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(); private static LinkedBlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>();
public static ExecutorService executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors() + 1); public static ExecutorService executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors() + 1);
public static String payload;
/** /**
* mqtt broker 连接配置,填自己的mqtt地址,及账号密码 * mqtt broker 连接配置,填自己的mqtt地址,及账号密码
*/ */
...@@ -58,9 +58,10 @@ public class MyMqttClient { ...@@ -58,9 +58,10 @@ public class MyMqttClient {
System.out.println("create mqtt clientConnection"); System.out.println("create mqtt clientConnection");
try { try {
mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true); //true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态 mqttConnectOptions.setCleanSession(false); //true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
mqttConnectOptions.setMaxInflight(100000); mqttConnectOptions.setMaxInflight(100000);
mqttConnectOptions.setConnectionTimeout(30); //设置连接超时,单位为秒 mqttConnectOptions.setConnectionTimeout(30); //设置连接超时,单位为秒
//mqttConnectOptions.setKeepAliveInterval(20); //保持连接数,设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 //mqttConnectOptions.setKeepAliveInterval(20); //保持连接数,设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
if (username != null && !"".equals(username)) { if (username != null && !"".equals(username)) {
mqttConnectOptions.setUserName(username); mqttConnectOptions.setUserName(username);
...@@ -137,7 +138,7 @@ public class MyMqttClient { ...@@ -137,7 +138,7 @@ public class MyMqttClient {
/** /**
* 订阅主题 * 订阅主题
*/ */
public static void subTopic(String topic, int qos) { public static String subTopic(String topic, int qos) {
if (mqttClient != null && mqttClient.isConnected()) { if (mqttClient != null && mqttClient.isConnected()) {
try { try {
mqttConnectOptions.setWill(topic, "close".getBytes(), 2, true); mqttConnectOptions.setWill(topic, "close".getBytes(), 2, true);
...@@ -145,7 +146,10 @@ public class MyMqttClient { ...@@ -145,7 +146,10 @@ public class MyMqttClient {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("订阅时间========" + df.format(new Date()) + " " + System.currentTimeMillis()); logger.info("订阅时间========" + df.format(new Date()) + " " + System.currentTimeMillis());
mqttClient.subscribe(topic, qos); mqttClient.subscribe(topic, qos);
mqttClient.setCallback(mqttClientCallback); //mqttClient.setCallback(mqttClientCallback);
// MqttMessage mqttMessage = new MqttMessage();
// String messages = new String(mqttMessage.getPayload());
// return messages;
//logger.info("订阅成功"); //logger.info("订阅成功");
} catch (MqttException me) { } catch (MqttException me) {
me.printStackTrace(); me.printStackTrace();
...@@ -153,6 +157,7 @@ public class MyMqttClient { ...@@ -153,6 +157,7 @@ public class MyMqttClient {
} else { } else {
System.out.println("mqttClient is Error"); System.out.println("mqttClient is Error");
} }
return null;
} }
/** /**
* 订阅主题 * 订阅主题
...@@ -228,7 +233,7 @@ public class MyMqttClient { ...@@ -228,7 +233,7 @@ public class MyMqttClient {
} }
MqttDeliveryToken token; MqttDeliveryToken token;
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("发送时间========" + df.format(new Date())); logger.info("发送时间========" + df.format(new Date()) + " " + System.currentTimeMillis());
logger.info("clientId: " + mqttClient.getClientId() + "发送主题:" + topic + "成功,内容:" + message); logger.info("clientId: " + mqttClient.getClientId() + "发送主题:" + topic + "成功,内容:" + message);
try { try {
token = topic.publish(mqttMessage); token = topic.publish(mqttMessage);
...@@ -288,3 +293,4 @@ public class MyMqttClient { ...@@ -288,3 +293,4 @@ public class MyMqttClient {
return (String[]) queue.poll(); return (String[]) queue.poll();
} }
} }
package com.jbxie.spaceiotmqtt.mqttUtils; package top.ninwoo.mqtt.mqttUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import static com.jbxie.spaceiotmqtt.mqttUtils.MyMqttClient.*; import static top.ninwoo.mqtt.mqttUtils.MyMqttClient.poll;
@Component @Component
public class PublishThread implements Runnable { public class PublishThread implements Runnable {
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-spaceiot-shortConnection</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>css-common-api</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package top.jbxie.raft.client; package top.ninwoo.common.entity;
import lombok.Data; import lombok.Data;
...@@ -10,7 +10,7 @@ import java.io.Serializable; ...@@ -10,7 +10,7 @@ import java.io.Serializable;
@Data @Data
public class ClientKVAck implements Serializable { public class ClientKVAck implements Serializable {
Object result; // logEntry.getCommand(); public Object result; // logEntry.getCommand();
public ClientKVAck(Object result) { public ClientKVAck(Object result) {
this.result = result; this.result = result;
...@@ -49,4 +49,4 @@ public class ClientKVAck implements Serializable { ...@@ -49,4 +49,4 @@ public class ClientKVAck implements Serializable {
} }
} }
\ No newline at end of file
package top.jbxie.raft.client; package top.ninwoo.common.entity;
import lombok.Data; import lombok.Data;
...@@ -15,9 +15,9 @@ public class ClientKVReq implements Serializable { ...@@ -15,9 +15,9 @@ public class ClientKVReq implements Serializable {
int type; int type;
String key; public String key;
String value; public String value;
private ClientKVReq(Builder builder) { private ClientKVReq(Builder builder) {
setType(builder.type); setType(builder.type);
...@@ -77,3 +77,4 @@ public class ClientKVReq implements Serializable { ...@@ -77,3 +77,4 @@ public class ClientKVReq implements Serializable {
} }
} }
} }
...@@ -3,32 +3,25 @@ ...@@ -3,32 +3,25 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>cnf-space-iot</artifactId> <artifactId>cnf-spaceiot-shortConnection</artifactId>
<groupId>top.ninwoo</groupId> <groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>space-iot-flooding</artifactId> <artifactId>css-node-ground</artifactId>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency> <dependency>
<groupId>top.ninwoo</groupId> <groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId> <artifactId>css-common-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>top.ninwoo</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>css-node-satellite</artifactId>
<scope>test</scope> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>
...@@ -38,6 +31,9 @@ ...@@ -38,6 +31,9 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
<configuration>
<mainClass>top.ninwoo.ground.GroundMain</mainClass>
</configuration>
<executions> <executions>
<execution> <execution>
<goals> <goals>
......
package top.ninwoo.ground;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class GroundMain {
public static void main(String[] args) {
SpringApplication.run(GroundMain.class, args);
}
}
//package top.ninwoo.ground.register;
//
//import com.alipay.remoting.exception.RemotingException;
//import com.google.common.collect.Lists;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import top.ninwoo.common.entity.ClientKVReq;
//import top.ninwoo.satellite.current.SleepHelper;
//import top.ninwoo.satellite.entity.LogEntry;
//import top.ninwoo.satellite.rpc.Request;
//import top.ninwoo.satellite.rpc.Response;
//import top.ninwoo.satellite.rpc.RpcClient;
//import top.ninwoo.satellite.rpc.impl.DefaultRpcClient;
//
//import java.util.List;
//import java.util.concurrent.atomic.AtomicLong;
//
//public class RaftClient {
// private static final Logger LOGGER = LoggerFactory.getLogger(RaftClient.class);
//
// private final static RpcClient client = new DefaultRpcClient();
//
// static String addr = "localhost:8775";
// static List<String> list = Lists.newArrayList("localhost:8775","localhost:8776","localhost:8777", "localhost:8778", "localhost:8779");
//
// public static void main(String[] args) throws RemotingException, InterruptedException {
//
// AtomicLong count = new AtomicLong(5);
//
// for (int i = 3; ; i++) {
// try {
// int index = (int) (count.incrementAndGet() % list.size());
// addr = list.get(index);
//
// ClientKVReq obj = ClientKVReq.newBuilder().key("hello:" + i).value("world:" + i).type(ClientKVReq.PUT).build();
//
// Request<ClientKVReq> r = new Request<>();
// r.setObj(obj);
// r.setUrl(addr);
// r.setCmd(Request.CLIENT_REQ);
// Response<String> response;
// try {
// response = client.send(r);
// } catch (Exception e) {
// r.setUrl(list.get((int) ((count.incrementAndGet()) % list.size())));
// response = client.send(r);
// }
//
// LOGGER.info("request content : {}, url : {}, put response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response.getResult());
//
// SleepHelper.sleep(1000);
//
// obj = ClientKVReq.newBuilder().key("hello:" + i).type(ClientKVReq.GET).build();
//
// addr = list.get(index);
// r.setUrl(addr);
// r.setObj(obj);
//
// Response<LogEntry> response2;
// try {
// response2 = client.send(r);
// } catch (Exception e) {
// r.setUrl(list.get((int) ((count.incrementAndGet()) % list.size())));
// response2 = client.send(r);
// }
//
// LOGGER.info("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
// } catch (Exception e) {
// e.printStackTrace();
// i = i - 1;
// }
//
// SleepHelper.sleep(5000);
// }
//
// }
//}
//package top.ninwoo.ground.register;
//
//import com.google.common.collect.Lists;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import top.ninwoo.common.entity.ClientKVReq;
//import top.ninwoo.satellite.entity.LogEntry;
//import top.ninwoo.satellite.rpc.Request;
//import top.ninwoo.satellite.rpc.Response;
//import top.ninwoo.satellite.rpc.RpcClient;
//import top.ninwoo.satellite.rpc.impl.DefaultRpcClient;
//
//import java.util.List;
//
//public class RaftClient2 {
// private static final Logger LOGGER = LoggerFactory.getLogger(RaftClient2.class);
//
//
// private final static RpcClient client = new DefaultRpcClient();
//
// static String addr = "localhost:8778";
// static List<String> list3 = Lists.newArrayList("localhost:8777", "localhost:8778", "localhost:8779");
// static List<String> list2 = Lists.newArrayList( "localhost:8777", "localhost:8779");
// static List<String> list1 = Lists.newArrayList( "localhost:8779");
//
// public static void main(String[] args) throws InterruptedException {
// for (int i = 3; ; i++) {
//
// try {
// Request<ClientKVReq> r = new Request<>();
//
// int size = list2.size();
//
// ClientKVReq obj = ClientKVReq.newBuilder().key("hello:" + i).type(ClientKVReq.GET).build();
// int index = (i) % size;
// addr = list2.get(index);
// r.setUrl(addr);
// r.setObj(obj);
// r.setCmd(Request.CLIENT_REQ);
//
// Response<LogEntry> response2 = client.send(r);
//
// LOGGER.info("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// Thread.sleep(1000);
//
// }
//
// }
// }
//}
\ No newline at end of file
//package top.ninwoo.ground.register;
//
//import com.alipay.remoting.exception.RemotingException;
//import com.google.common.collect.Lists;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import top.ninwoo.common.entity.ClientKVReq;
//import top.ninwoo.satellite.current.SleepHelper;
//import top.ninwoo.satellite.entity.LogEntry;
//import top.ninwoo.satellite.rpc.Request;
//import top.ninwoo.satellite.rpc.Response;
//import top.ninwoo.satellite.rpc.RpcClient;
//import top.ninwoo.satellite.rpc.impl.DefaultRpcClient;
//
//import java.util.List;
//import java.util.concurrent.atomic.AtomicLong;
//
//public class RaftClient3 {
// private static final Logger LOGGER = LoggerFactory.getLogger(RaftClient3.class);
//
//
// private final static RpcClient client = new DefaultRpcClient();
//
// static String addr = "localhost:8777";
// static List<String> list = Lists.newArrayList("localhost:8777", "localhost:8778", "localhost:8779");
//
// public static void main(String[] args) throws RemotingException, InterruptedException {
//
// AtomicLong count = new AtomicLong(3);
//
// int keyNum = 4;
// try {
// int index = (int) (count.incrementAndGet() % list.size());
// index = 1;
// addr = list.get(index);
//
// ClientKVReq obj = ClientKVReq.newBuilder().key("hello:" + keyNum).value("world:" + keyNum).type(ClientKVReq.PUT).build();
//
// Request<ClientKVReq> r = new Request<>();
// r.setObj(obj);
// r.setUrl(addr);
// r.setCmd(Request.CLIENT_REQ);
// Response<String> response = null;
// try {
// response = client.send(r);
// } catch (Exception e) {
// }
//
// LOGGER.info("request content : {}, url : {}, put response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response.getResult());
//
// SleepHelper.sleep(1000);
//
// obj = ClientKVReq.newBuilder().key("hello:" + keyNum).type(ClientKVReq.GET).build();
//
// addr = list.get(index);
// addr = list.get(index);
// r.setUrl(addr);
// r.setObj(obj);
//
// Response<LogEntry> response2;
// try {
// response2 = client.send(r);
// } catch (Exception e) {
// r.setUrl(list.get((int) ((count.incrementAndGet()) % list.size())));
// response2 = client.send(r);
// }
//
// if (response.getResult() == null) {
// LOGGER.error("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
// System.exit(1);
// return;
// }
// LOGGER.info("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// System.exit(1);
//
// }
//}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-spaceiot-shortConnection</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>css-node-satellite</artifactId>
<dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>css-common-api</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!--mqtt-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<configuration>
<mainClass>top.ninwoo.satellite.RaftNodeBootStrap</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package top.jbxie.raft; package top.ninwoo.satellite;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
......
package top.jbxie.raft.clusterService; package top.ninwoo.satellite.clusterService;
import top.jbxie.raft.nodeCommon.Peer;
import top.ninwoo.satellite.nodeCommon.Peer;
/** /**
* 集群配置变更接口. * 集群配置变更接口.
......
package top.jbxie.raft.clusterService; package top.ninwoo.satellite.clusterService;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
package top.jbxie.raft.clusterService; package top.ninwoo.satellite.clusterService;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
package top.jbxie.raft.clusterService.impl; package top.ninwoo.satellite.clusterService.impl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import top.jbxie.raft.clusterService.ClusterMembershipChanges; import top.ninwoo.satellite.clusterService.ClusterMembershipChanges;
import top.jbxie.raft.clusterService.Result; import top.ninwoo.satellite.clusterService.Result;
import top.jbxie.raft.entity.LogEntry; import top.ninwoo.satellite.entity.LogEntry;
import top.jbxie.raft.nodeCommon.NodeStatus; import top.ninwoo.satellite.nodeCommon.NodeStatus;
import top.jbxie.raft.nodeCommon.Peer; import top.ninwoo.satellite.nodeCommon.Peer;
import top.jbxie.raft.rpc.Request; import top.ninwoo.satellite.rpc.Request;
import top.jbxie.raft.rpc.Response; import top.ninwoo.satellite.rpc.Response;
import top.jbxie.raft.util.impl.DefaultNode; import top.ninwoo.satellite.util.impl.DefaultNode;
/** /**
* 集群配置变更接口默认实现. * 集群配置变更接口默认实现.
......
package top.jbxie.raft.current; package top.ninwoo.satellite.current;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
package top.jbxie.raft.current; package top.ninwoo.satellite.current;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
package top.jbxie.raft.current; package top.ninwoo.satellite.current;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
package top.jbxie.raft.entity; package top.ninwoo.satellite.entity;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import top.jbxie.raft.util.Consensus; import top.ninwoo.satellite.util.Consensus;
import java.util.Arrays; import java.util.Arrays;
/** /**
......
package top.jbxie.raft.entity; package top.ninwoo.satellite.entity;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
package top.jbxie.raft.entity; package top.ninwoo.satellite.entity;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import top.ninwoo.satellite.util.LogModule;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
import top.jbxie.raft.util.LogModule;
/** /**
* 日志条目 * 日志条目
......
package top.jbxie.raft.entity; package top.ninwoo.satellite.entity;
import top.jbxie.raft.nodeCommon.Peer; import top.ninwoo.satellite.nodeCommon.Peer;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
......
package top.jbxie.raft.entity; package top.ninwoo.satellite.entity;
import top.jbxie.raft.util.Consensus;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import top.ninwoo.satellite.util.Consensus;
/** /**
* 请求投票RPC参数(候选人). * 请求投票RPC参数(候选人).
......
package top.jbxie.raft.exception; package top.ninwoo.satellite.exception;
public class RaftNotSupportException extends RuntimeException { public class RaftNotSupportException extends RuntimeException {
public RaftNotSupportException() { public RaftNotSupportException() {
......
package top.jbxie.raft.exception; package top.ninwoo.satellite.exception;
public class RaftRemotingException extends RuntimeException { public class RaftRemotingException extends RuntimeException {
public RaftRemotingException() { public RaftRemotingException() {
......
package top.ninwoo.satellite.nodeCommon;
import lombok.Data;
import java.util.List;
/**
* 节点配置
*/
@Data
public class NodeConfig {
// 自身节点地址
public String selfAddr;
//所有节点地址
public List<String> peerAddrs;
}
\ No newline at end of file
package top.jbxie.raft.nodeCommon; package top.ninwoo.satellite.nodeCommon;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
package top.jbxie.raft.nodeCommon; package top.ninwoo.satellite.nodeCommon;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
......
package top.jbxie.raft.register; package top.ninwoo.satellite.register;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import top.jbxie.raft.nodeCommon.NodeConfig; import top.ninwoo.satellite.nodeCommon.NodeConfig;
import top.jbxie.raft.service.impl.IPServiceImpl; import top.ninwoo.satellite.service.impl.IPServiceImpl;
import top.jbxie.raft.util.Node; import top.ninwoo.satellite.util.Node;
import top.jbxie.raft.util.impl.DefaultNode; import top.ninwoo.satellite.util.impl.DefaultNode;
import java.util.Arrays; import java.util.Arrays;
...@@ -51,7 +51,7 @@ public class MyApplicationRunner implements ApplicationRunner { ...@@ -51,7 +51,7 @@ public class MyApplicationRunner implements ApplicationRunner {
Thread.currentThread().sleep(20000); Thread.currentThread().sleep(20000);
// 自身节点地址 // 自身节点地址
config.setSelfAddr(ipServiceImpl.getSelfIp()); config.setSelfAddr(ipServiceImpl.getSelfIp() + ":8775");
// 所有节点地址 // 所有节点地址
// Long clusterId = 11113l; // Long clusterId = 11113l;
......
package top.jbxie.raft.rpc; package top.ninwoo.satellite.rpc;
import com.alipay.remoting.AsyncContext; import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext; import com.alipay.remoting.BizContext;
import com.alipay.remoting.rpc.protocol.AbstractUserProcessor; import com.alipay.remoting.rpc.protocol.AbstractUserProcessor;
import top.jbxie.raft.exception.RaftNotSupportException; import top.ninwoo.satellite.exception.RaftNotSupportException;
public abstract class RaftUserProcessor<T> extends AbstractUserProcessor<T> { public abstract class RaftUserProcessor<T> extends AbstractUserProcessor<T> {
......
package top.jbxie.raft.rpc; package top.ninwoo.satellite.rpc;
import lombok.Data; import lombok.Data;
import top.jbxie.raft.client.ClientKVReq; import top.ninwoo.satellite.entity.AentryParam;
import top.jbxie.raft.entity.AentryParam; import top.ninwoo.satellite.entity.RvoteParam;
import top.jbxie.raft.entity.RvoteParam; import top.ninwoo.common.entity.ClientKVReq;
import java.io.Serializable; import java.io.Serializable;
......
package top.jbxie.raft.rpc; package top.ninwoo.satellite.rpc;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
package top.jbxie.raft.rpc; package top.ninwoo.satellite.rpc;
public interface RpcServer { public interface RpcServer {
void start(); void start();
......
package top.jbxie.raft.rpc.impl; package top.ninwoo.satellite.rpc.impl;
import com.alipay.remoting.exception.RemotingException; import com.alipay.remoting.exception.RemotingException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import top.jbxie.raft.exception.RaftRemotingException; import top.ninwoo.satellite.exception.RaftRemotingException;
import top.jbxie.raft.rpc.Request; import top.ninwoo.satellite.rpc.Request;
import top.jbxie.raft.rpc.Response; import top.ninwoo.satellite.rpc.Response;
import top.jbxie.raft.rpc.RpcClient; import top.ninwoo.satellite.rpc.RpcClient;
public class DefaultRpcClient implements RpcClient { public class DefaultRpcClient implements RpcClient {
public static Logger logger = LoggerFactory.getLogger(DefaultRpcClient.class.getName()); public static Logger logger = LoggerFactory.getLogger(DefaultRpcClient.class.getName());
......
package top.jbxie.raft.rpc.impl; package top.ninwoo.satellite.rpc.impl;
import com.alipay.remoting.BizContext; import com.alipay.remoting.BizContext;
import top.jbxie.raft.client.ClientKVReq; import top.ninwoo.common.entity.ClientKVReq;
import top.jbxie.raft.clusterService.ClusterMembershipChanges; import top.ninwoo.satellite.clusterService.ClusterMembershipChanges;
import top.jbxie.raft.entity.AentryParam; import top.ninwoo.satellite.entity.AentryParam;
import top.jbxie.raft.entity.RvoteParam; import top.ninwoo.satellite.entity.RvoteParam;
import top.jbxie.raft.nodeCommon.Peer; import top.ninwoo.satellite.nodeCommon.Peer;
import top.jbxie.raft.rpc.RaftUserProcessor; import top.ninwoo.satellite.rpc.RaftUserProcessor;
import top.jbxie.raft.rpc.Request; import top.ninwoo.satellite.rpc.Request;
import top.jbxie.raft.rpc.Response; import top.ninwoo.satellite.rpc.Response;
import top.jbxie.raft.rpc.RpcServer; import top.ninwoo.satellite.rpc.RpcServer;
import top.jbxie.raft.util.impl.DefaultNode; import top.ninwoo.satellite.util.impl.DefaultNode;
public class DefaultRpcServer implements RpcServer { public class DefaultRpcServer implements RpcServer {
......
package top.jbxie.raft.service; package top.ninwoo.satellite.service;
import java.net.UnknownHostException; import java.net.UnknownHostException;
......
package top.jbxie.raft.service.impl; package top.ninwoo.satellite.service.impl;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import top.jbxie.raft.service.IPService;
import top.ninwoo.bishe.starter.service.NetworkService; import top.ninwoo.bishe.starter.service.NetworkService;
import top.ninwoo.satellite.service.IPService;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
...@@ -18,7 +19,7 @@ public class IPServiceImpl implements IPService { ...@@ -18,7 +19,7 @@ public class IPServiceImpl implements IPService {
@Override @Override
public String getSelfIp() throws UnknownHostException { public String getSelfIp() throws UnknownHostException {
InetAddress addr = Inet4Address.getLocalHost(); InetAddress addr = Inet4Address.getLocalHost();
return addr.getHostAddress() + ":8775"; return addr.getHostAddress();
} }
@Override @Override
...@@ -36,4 +37,15 @@ public class IPServiceImpl implements IPService { ...@@ -36,4 +37,15 @@ public class IPServiceImpl implements IPService {
return ipList; return ipList;
} }
// @Autowired(required=false)
// private LinuxCtlUtils linuxCtlUtils;
//
// public String getDockerId() {
// String cmd = "head -1 /proc/self/cgroup|cut -d/ -f3|cut -c1-12";
// String res = linuxCtlUtils.runCmd(cmd);
// if(res == null) {
// throw new RuntimeException("get Docker Id Error");
// }
// return res;
// }
} }
package top.jbxie.raft.util; package top.ninwoo.satellite.util;
import top.jbxie.raft.entity.AentryParam;
import top.jbxie.raft.entity.AentryResult; import top.ninwoo.satellite.entity.AentryParam;
import top.jbxie.raft.entity.RvoteParam; import top.ninwoo.satellite.entity.AentryResult;
import top.jbxie.raft.entity.RvoteResult; import top.ninwoo.satellite.entity.RvoteParam;
import top.ninwoo.satellite.entity.RvoteResult;
/** /**
* raft一致性模块 * raft一致性模块
......
package top.jbxie.raft.util; package top.ninwoo.satellite.util;
import top.jbxie.raft.entity.LogEntry; import top.ninwoo.satellite.entity.LogEntry;
/** /**
* @see top.jbxie.raft.entity.LogEntry; * @see LogEntry ;
*/ */
public interface LogModule { public interface LogModule {
void write(LogEntry logEntry); void write(LogEntry logEntry);
......
package top.jbxie.raft.util; package top.ninwoo.satellite.util;
public class LongConvert { public class LongConvert {
public static long convert(Long l) { public static long convert(Long l) {
......
package top.jbxie.raft.util; package top.ninwoo.satellite.util;
import top.jbxie.raft.client.ClientKVAck; import org.eclipse.paho.client.mqttv3.MqttException;
import top.jbxie.raft.client.ClientKVReq; import top.ninwoo.common.entity.ClientKVAck;
import top.jbxie.raft.entity.*; import top.ninwoo.common.entity.ClientKVReq;
import top.jbxie.raft.nodeCommon.NodeConfig; import top.ninwoo.satellite.entity.AentryParam;
import top.ninwoo.satellite.entity.AentryResult;
import top.ninwoo.satellite.entity.RvoteParam;
import top.ninwoo.satellite.entity.RvoteResult;
import top.ninwoo.satellite.nodeCommon.NodeConfig;
/** /**
* 节点功能 * 节点功能
...@@ -14,7 +18,7 @@ public interface Node<T> extends LifeCycle { ...@@ -14,7 +18,7 @@ public interface Node<T> extends LifeCycle {
* 设置配置文件. * 设置配置文件.
* @param config * @param config
*/ */
void setConfig(NodeConfig config); void setConfig(NodeConfig config) throws MqttException;
/** /**
* 处理请求投票 RPC. * 处理请求投票 RPC.
......
package top.jbxie.raft.util; package top.ninwoo.satellite.util;
import top.jbxie.raft.entity.LogEntry;
import top.ninwoo.satellite.entity.LogEntry;
/** /**
* 状态机接口 * 状态机接口
......
package top.jbxie.raft.util.impl; package top.ninwoo.satellite.util.impl;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import lombok.Data; import lombok.Data;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest; import top.ninwoo.satellite.entity.*;
import top.jbxie.raft.clusterService.Server; import top.ninwoo.satellite.nodeCommon.NodeStatus;
import top.jbxie.raft.entity.*; import top.ninwoo.satellite.nodeCommon.Peer;
import top.jbxie.raft.nodeCommon.NodeStatus; import top.ninwoo.satellite.util.Consensus;
import top.jbxie.raft.nodeCommon.Peer;
import top.jbxie.raft.util.Consensus;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
......
package top.jbxie.raft.util.impl; package top.ninwoo.satellite.util.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.rocksdb.Options; import org.rocksdb.Options;
...@@ -9,11 +8,8 @@ import org.rocksdb.RocksDB; ...@@ -9,11 +8,8 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import top.ninwoo.satellite.entity.LogEntry;
import org.springframework.core.env.Environment; import top.ninwoo.satellite.util.LogModule;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.util.LogModule;
import java.io.File; import java.io.File;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
......
package top.jbxie.raft.util.impl; package top.ninwoo.satellite.util.impl;
import lombok.Data; import lombok.Data;
import lombok.Getter; import org.eclipse.paho.client.mqttv3.MqttException;
import lombok.Setter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import top.jbxie.raft.client.ClientKVAck; import top.ninwoo.common.entity.ClientKVAck;
import top.jbxie.raft.client.ClientKVReq; import top.ninwoo.common.entity.ClientKVReq;
import top.jbxie.raft.clusterService.ClusterMembershipChanges; import top.ninwoo.satellite.clusterService.ClusterMembershipChanges;
import top.jbxie.raft.clusterService.Result; import top.ninwoo.satellite.clusterService.Result;
import top.jbxie.raft.clusterService.impl.ClusterMembershipChangesImpl; import top.ninwoo.satellite.current.RaftThreadPool;
import top.jbxie.raft.current.RaftThreadPool; import top.ninwoo.satellite.entity.*;
import top.jbxie.raft.entity.*; import top.ninwoo.satellite.exception.RaftRemotingException;
import top.jbxie.raft.exception.RaftRemotingException; import top.ninwoo.satellite.nodeCommon.NodeConfig;
import top.jbxie.raft.nodeCommon.NodeConfig; import top.ninwoo.satellite.nodeCommon.NodeStatus;
import top.jbxie.raft.nodeCommon.NodeStatus; import top.ninwoo.satellite.nodeCommon.Peer;
import top.jbxie.raft.nodeCommon.Peer; import top.ninwoo.satellite.nodeCommon.PeerSet;
import top.jbxie.raft.nodeCommon.PeerSet; import top.ninwoo.satellite.rpc.Request;
import top.jbxie.raft.rpc.Request; import top.ninwoo.satellite.rpc.Response;
import top.jbxie.raft.rpc.Response; import top.ninwoo.satellite.rpc.RpcClient;
import top.jbxie.raft.rpc.RpcClient; import top.ninwoo.satellite.rpc.RpcServer;
import top.jbxie.raft.rpc.RpcServer; import top.ninwoo.satellite.rpc.impl.DefaultRpcClient;
import top.jbxie.raft.rpc.impl.DefaultRpcClient; import top.ninwoo.satellite.rpc.impl.DefaultRpcServer;
import top.jbxie.raft.rpc.impl.DefaultRpcServer; import top.ninwoo.satellite.util.mqttUtils.MyMqttClient;
import top.jbxie.raft.util.*; import top.ninwoo.satellite.util.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static top.jbxie.raft.nodeCommon.NodeStatus.LEADER; import static top.ninwoo.satellite.nodeCommon.NodeStatus.LEADER;
/** /**
* 抽象机器节点, 初始为 follower, 角色随时变化. * 抽象机器节点, 初始为 follower, 角色随时变化.
...@@ -101,6 +100,8 @@ public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChan ...@@ -101,6 +100,8 @@ public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChan
public RpcClient rpcClient = new DefaultRpcClient(); public RpcClient rpcClient = new DefaultRpcClient();
public MyMqttClient myMqttClient;
public StateMachine stateMachine; public StateMachine stateMachine;
/* ============================== */ /* ============================== */
...@@ -128,16 +129,17 @@ public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChan ...@@ -128,16 +129,17 @@ public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChan
* @param config * @param config
*/ */
@Override @Override
public void setConfig(NodeConfig config) { public void setConfig(NodeConfig config) throws MqttException {
this.config = config; this.config = config;
stateMachine = DefaultStateMachine.getInstance(); stateMachine = DefaultStateMachine.getInstance();
// myMqttClient = new MyMqttClient();
logModule = DefaultLogModule.getInstance(); logModule = DefaultLogModule.getInstance();
peerSet = PeerSet.getInstance(); peerSet = PeerSet.getInstance();
for (String s : config.getPeerAddrs()) { for (String s : config.getPeerAddrs()) {
Peer peer = new Peer(s); Peer peer = new Peer(s);
peerSet.addPeer(peer); peerSet.addPeer(peer);
//if (s.equals("localhost:" + config.getSelfAddr())) { // if (s.equals("localhost:" + config.getSelfAddr())) {
if (s.equals(config.getSelfAddr())) { if (s.equals(config.getSelfAddr())) {
peerSet.setSelf(peer); peerSet.setSelf(peer);
} }
......
package top.jbxie.raft.util.impl; package top.ninwoo.satellite.util.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.Getter; import lombok.Getter;
...@@ -8,12 +8,9 @@ import org.rocksdb.RocksDB; ...@@ -8,12 +8,9 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import top.ninwoo.satellite.entity.Command;
import org.springframework.core.env.Environment; import top.ninwoo.satellite.entity.LogEntry;
import top.jbxie.raft.entity.Command; import top.ninwoo.satellite.util.StateMachine;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.util.StateMachine;
import java.io.File; import java.io.File;
......
package top.ninwoo.satellite.util.mqttUtils;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* public void connectionLost(Throwable cause)在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* 由 MqttClient.connect 激活此回调。
*
*/
@Slf4j
@Component
public class MqttClientCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttClientCallback.class);
@Override
public void connectionLost(Throwable arg0) {
logger.info("mqtt连接断开,可以做重连");
MyMqttClient.reConnect();
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String messages = new String(mqttMessage.getPayload());
if(!messages.equals("close")) {
System.out.println("{");
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + messages);
System.out.println("}");
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("接收时间========" + df.format(new Date()) + " " + System.currentTimeMillis());
}
}
public void perform(String topicP, JSONObject json) throws MqttException, UnsupportedEncodingException {
//你的业务模块
}
}
package top.ninwoo.satellite.util.mqttUtils;
import org.springframework.stereotype.Component;
import static top.ninwoo.satellite.util.mqttUtils.MyMqttClient.poll;
@Component
public class PublishThread implements Runnable {
@Override
public void run()
{
String[] obj = null;
System.out.println("mqtt publish thread start");
while (true) {
obj = poll();
if (obj != null) {
String topic = obj[0];
String content = obj[1];
int qos = Integer.valueOf(obj[2]);
System.out.println("mqtt从队列取出topic:" + topic + ",content:" + content + ",qos" + qos);
try {
MyMqttClient.pubMessage(1, true, topic, content);
System.out.println("发送mqtt消息,topic: "+ topic +" ,content: " + content);
} catch (Exception e) {
System.out.println("发消息给设备,topic:" + topic + ",content:" + content);
e.printStackTrace();
}
} else{
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
package top.jbxie.raft.impl; package top.ninwoo.satellite.impl;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import top.jbxie.raft.entity.Command; import top.ninwoo.satellite.entity.Command;
import top.jbxie.raft.entity.LogEntry; import top.ninwoo.satellite.entity.LogEntry;
import top.jbxie.raft.util.impl.DefaultLogModule; import top.ninwoo.satellite.util.impl.DefaultLogModule;
public class DefaultLogModuleTest { public class DefaultLogModuleTest {
static { static {
......
package top.jbxie.raft.impl; package top.ninwoo.satellite.impl;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import top.jbxie.raft.entity.Command; import top.ninwoo.satellite.entity.Command;
import top.jbxie.raft.entity.LogEntry; import top.ninwoo.satellite.entity.LogEntry;
import top.jbxie.raft.util.impl.DefaultStateMachine; import top.ninwoo.satellite.util.impl.DefaultStateMachine;
public class DefaultStateMachineTest { public class DefaultStateMachineTest {
static { static {
......
package top.jbxie.raft.impl; package top.ninwoo.satellite.impl;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.rocksdb.Options; import org.rocksdb.Options;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.springframework.beans.factory.annotation.Autowired;
import top.jbxie.raft.util.impl.DefaultLogModule;
import java.io.File; import java.io.File;
......
package top.jbxie.raft.rpc; package top.ninwoo.satellite.rpc;
import com.alipay.remoting.exception.RemotingException; import com.alipay.remoting.exception.RemotingException;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
public class RpcClientTest { public class RpcClientTest {
private final static com.alipay.remoting.rpc.RpcClient CLIENT = new com.alipay.remoting.rpc.RpcClient(); private final static com.alipay.remoting.rpc.RpcClient CLIENT = new com.alipay.remoting.rpc.RpcClient();
......
package top.jbxie.raft.rpc; package top.ninwoo.satellite.rpc;
import com.alipay.remoting.BizContext; import com.alipay.remoting.BizContext;
import top.jbxie.raft.rpc.RaftUserProcessor;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
public class RpcServerTest { public class RpcServerTest {
private static com.alipay.remoting.rpc.RpcServer rpcServer; private static com.alipay.remoting.rpc.RpcServer rpcServer;
......
...@@ -10,21 +10,18 @@ ...@@ -10,21 +10,18 @@
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>top.jbxie</groupId> <artifactId>cnf-spaceiot-shortConnection</artifactId>
<artifactId>cnf-space-iot</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
<module>space-iot-mqtt</module> <module>css-node-satellite</module>
<module>space-iot-flooding</module> <module>css-node-ground</module>
<module>space-iot-raft</module> <module>css-common-api</module>
</modules> </modules>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>top.ninwoo</groupId> <groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId> <artifactId>cnf-client-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
...@@ -60,13 +57,56 @@ ...@@ -60,13 +57,56 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId> <artifactId>spring-context</artifactId>
<version>${spring.version}</version> <version>${spring.version}</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>6.13.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>hessian</artifactId>
<version>4.0.4</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j2-api</artifactId>
<groupId>org.slf4j2</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
<build> <build>
...@@ -88,5 +128,4 @@ ...@@ -88,5 +128,4 @@
</plugins> </plugins>
</build> </build>
</project> </project>
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment