Commit 6504913a authored by 谢建斌's avatar 谢建斌

raft单机多节点

parent 16b2a979
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>top.jbxie</groupId>
<artifactId>cnf-space-iot</artifactId>
<packaging>pom</packaging>
<modules>
<module>space-iot-mqtt</module>
<module>space-iot-flooding</module>
<module>space-iot-raft</module>
</modules>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-space-iot</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>space-iot-flooding</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.jbxie.spaceiotflooding;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FloodingMain {
public static void main(String[] args) {
}
}
package com.jbxie.spaceiotflooding.entity;
import lombok.Data;
import java.util.ArrayList;
@Data
public class DataPackage {
public int[][] nodeLink = {{0,1,1,0,0,1,0,0,0,0},
{1,0,1,0,0,0,0,0,0,1},
{1,1,0,0,0,0,0,0,0,0},
{0,0,0,0,0,1,0,0,0,0},
{0,0,0,0,0,1,0,0,0,0},
{1,0,0,1,1,0,0,1,0,0},
{0,0,0,0,0,0,0,1,0,0},
{0,0,0,0,0,1,1,0,1,1},
{0,0,0,0,0,0,0,1,0,0},
{0,1,0,0,0,0,0,1,0,0}};
// 计数器
private int counter;
// 传输路径
private ArrayList<String> route;
public DataPackage(int counter) {
this.counter = counter;
}
public void decrement() {
this.counter = this.counter - 1;
}
@SuppressWarnings("unchecked")
@Override
public DataPackage clone() {
DataPackage result = null;
try {
result = (DataPackage) super.clone();
result.route = (ArrayList<String>) this.route.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return result;
}
@Override
public String toString() {
return String.format("报文的传输路径为: %s", route);
}
//业务标识
private String businessName;
//订阅端ip
private String subIp;
//订阅主题
private String topic;
//消息qos
private String qos;
//发布端ip
private String pubIp;
}
package com.jbxie.spaceiotflooding.entity;
import lombok.Data;
import java.util.List;
@Data
public class Node {
// 结点名称
private String name;
// 当前节点已处理的业务列表
private List<String> businessList;
public Node(String name) {
this.name = name;
}
}
\ No newline at end of file
package com.jbxie.spaceiotflooding.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.ninwoo.bishe.starter.service.NetworkService;
import java.util.List;
@Service
public class IPService {
@Autowired
private NetworkService networkService;
public String getIpByAppName(String appName) {
List<String> ipList = networkService.getIpListByAppName(11111l, appName);
if (!ipList.isEmpty()) {
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0];
}
return null;
}
}
package com.jbxie.spaceiotflooding.service;
import com.jbxie.spaceiotflooding.entity.DataPackage;
import com.jbxie.spaceiotflooding.entity.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import top.ninwoo.common.entity.DockerContainer;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class NodeService {
private static final Logger logger = LoggerFactory.getLogger(NodeService.class);
@Autowired
DockerContainer dockerContainer;
@Autowired
IPService ipService;
/**
* 获取当前节点名
* (需要重写)
*/
public String getName() {
return dockerContainer.getName();
}
/**
* 当前节点接收到数据包进行处理
* 先写一个单向的
* @param dataPackage
*/
private void accept(DataPackage dataPackage) {
String currentName = getName();
int currentHop = getCurrentHop(currentName);
Node node = new Node(currentName);
ServerSocket serverSocket = null;
InputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
serverSocket = new ServerSocket(2021);
while (true) {
Socket client = serverSocket.accept();
inputStream = client.getInputStream();
objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
Object object = objectInputStream.readObject();
DataPackage dataPackage1 = (DataPackage) object;
List<String> businessList = node.getBusinessList();
String businessName = dataPackage.getBusinessName();
ArrayList<String> route = dataPackage.getRoute();
route.add("sate" + currentHop);
// 如果计数器仍然等于零或当前节点已经是最终节点,则打印路由信息
// 如果节点业务列表已包含当前传输的业务则终止
// 否则继续传输
if (dataPackage.getCounter() == 0) {
logger.info("传输失败,已超出生命周期:" + dataPackage);
continue;
} else if (dataPackage.getPubIp() == ipService.getIpByAppName(currentName)) {
logger.info("传输成功: " + dataPackage);
continue;
} else if (businessList.contains(businessName)){
logger.info("该节点已处理,该链路传输终止");
continue;
} else {
dataPackage.decrement();
businessList.add(businessName);
node.setBusinessList(businessList);
List<Integer> relativeNodesList = getLink(currentName, dataPackage);
for (int nextNode : relativeNodesList) {
transferPackage(dataPackage, nextNode);
}
continue;
}
}
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
try {
if (objectInputStream != null) {
objectInputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 根据节点名和拓扑获取当前节点连接关系
* @param name
* @param dataPackage
* @return
*/
private List<Integer> getLink(String name, DataPackage dataPackage) {
List<Integer> relativeNodesList = new ArrayList<>();
int[][] nodeLink = dataPackage.getNodeLink();
int currentHop = getCurrentHop(name);
for (int i = 0; i < nodeLink[currentHop - 1].length; i++) {
if (i == 1) {
relativeNodesList.add(i + 1);
}
}
return relativeNodesList;
}
/**
* 获取当前节点序列号
* @param name
* @return
*/
private int getCurrentHop (String name) {
if(name == null || name.length() == 0) {
throw new RuntimeException("节点名序号错误");
}
return Integer.valueOf(name.substring(4, name.length()));
}
/**
* 发送数据包
* @param dataPackage
* @param nextHop
*/
private void transferPackage(DataPackage dataPackage, int nextHop) {
//获取目标节点ip
String node_name = "node" + nextHop;
String IP = ipService.getIpByAppName(node_name);
//发送至下一节点
send(dataPackage, IP, 2021);
}
private void send(DataPackage dataPackage, String ip, int port) {
Socket socket = null;
try {
socket = new Socket(ip, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(dataPackage);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-space-iot</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>space-iot-mqtt</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.jbxie.spaceiotmqtt;
import com.jbxie.spaceiotmqtt.mqttUtils.MyMqttClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class IotMqttMain {
public static void main(String[] args) {
String broker = "tcp://192.168.31.198:1883";
// MyMqttClient mqttClient_sub = new MyMqttClient();
// MyMqttClient mqttClient_pub = new MyMqttClient();
//
// mqttClient_sub.createClient(broker);
// mqttClient_pub.createClient(broker);
//
// mqttClient_pub.pubMessage("world/test1", "hello:test_1", 1);
// mqttClient_sub.subTopic("world/test1");
MyMqttClient mqttClient = new MyMqttClient();
mqttClient.createClient(broker);
mqttClient.subTopic("world/test1", 1);
mqttClient.pubMessage(1, "world/test1", "hello:test_1");
SpringApplication.run(IotMqttMain.class, args);
}
}
package com.jbxie.spaceiotmqtt.common;
import lombok.Data;
import java.util.ArrayList;
@Data
public class DataPackage {
public int[][] nodeLink = {{0,1,1,0,0,1,0,0,0,0},
{1,0,1,0,0,0,0,0,0,1},
{1,1,0,0,0,0,0,0,0,0},
{0,0,0,0,0,1,0,0,0,0},
{0,0,0,0,0,1,0,0,0,0},
{1,0,0,1,1,0,0,1,0,0},
{0,0,0,0,0,0,0,1,0,0},
{0,0,0,0,0,1,1,0,1,1},
{0,0,0,0,0,0,0,1,0,0},
{0,1,0,0,0,0,0,1,0,0}};
// 计数器
private int counter;
// 传输路径
private ArrayList<String> route;
public DataPackage(int counter) {
this.counter = counter;
}
public void decrement() {
this.counter = this.counter - 1;
}
@SuppressWarnings("unchecked")
@Override
public DataPackage clone() {
DataPackage result = null;
try {
result = (DataPackage) super.clone();
result.route = (ArrayList<String>) this.route.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return result;
}
@Override
public String toString() {
return String.format("报文的传输路径为: %s", route);
}
//业务标识
private String businessName;
//订阅端ip
private String subIp;
//订阅主题
private String topic;
//消息qos
private String qos;
//发布端ip
private String pubIp;
}
package com.jbxie.spaceiotmqtt.common;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Data
public class Node {
// 结点名称
private String name;
// 当前节点已处理的业务列表
private List<String> businessList;
public Node(String name) {
this.name = name;
}
}
package com.jbxie.spaceiotmqtt.controller;
import com.jbxie.spaceiotmqtt.mqttUtils.MyMqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
@RestController
@RequestMapping("/")
public class MqttController {
@Autowired
MyMqttClient myMqttClient;
@GetMapping(value = "/publishTopic")
public String publishTopic() {
String topicString = "test";
myMqttClient.pubMessage(0, false, topicString, "测试一下发布消息");
return "ok";
}
// 发送自定义消息内容(使用默认主题)
@RequestMapping("/publishTopic/{data}")
public String test1(@PathVariable("data") String data) {
String topicString = "test";
myMqttClient.pubMessage(0,false,topicString, data);
return "ok";
}
// 发送自定义消息内容,且指定主题
@RequestMapping("/publishTopic/{topic}/{data}")
public String test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
myMqttClient.pubMessage(0,false,topic, data);
return "ok";
}
@RequestMapping("/mqttsub")
public String mqttsub() {
String TOPIC1="test_topic1";
String TOPIC2="test_topic2";
String TOPIC3="test_topic3";
String TOPIC4="test_topic4";
int Qos1=1;
int Qos2=1;
int Qos3=1;
int Qos4=1;
String[] topics={TOPIC1,TOPIC2,TOPIC3,TOPIC4};
int[] qos={Qos1,Qos2,Qos3,Qos4};
myMqttClient.subTopic(topics,qos);
return "订阅主题";
}
@RequestMapping("/mqttest")
public String mqtest(){
String TOPIC1="test_topic1";
String TOPIC2="test_topic2";
String TOPIC3="test_topic3";
String TOPIC4="test_topic4";
String[] topics={TOPIC1,TOPIC2,TOPIC3,TOPIC4};
myMqttClient.pubMessage(topics[new Random().nextInt(5)], "测试消息发送");
return "发送成功吗!!!!!!!!";
}
}
package com.jbxie.spaceiotmqtt.mqttUtils;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* public void connectionLost(Throwable cause)在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* 由 MqttClient.connect 激活此回调。
*
*/
@Slf4j
@Component
public class MqttClientCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttClientCallback.class);
@Override
public void connectionLost(Throwable arg0) {
//logger.info("连接断开,可以做重连");
System.out.println("mqtt失去了连接");
MyMqttClient.reConnect();
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
// logger.info("接收消息主题 : " + topic);
// logger.info("接收消息Qos : " + mqttMessage.getQos());
// logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
String messages = new String(mqttMessage.getPayload());
if(!messages.equals("close")){
System.out.println("{");
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + messages);
System.out.println("}");
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("接收时间========" + df.format(new Date()) + " " + System.currentTimeMillis());
// try {
// perform(topic,json);
// }catch (Exception e){
// }
}
// if (MyMqttClient.executorService != null) {
// MyMqttClient.executorService.execute(new HandlerThread(topic, content));
// }
}
public void perform(String topicP, JSONObject json) throws MqttException, UnsupportedEncodingException {
//你的业务模块
}
}
package com.jbxie.spaceiotmqtt.mqttUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* mqtt订阅消息客户端
*/
@Component
public class MyMqttClient {
private static final Logger logger = LoggerFactory.getLogger(MyMqttClient.class);
//@Autowired
private static MqttClientCallback mqttClientCallback = new MqttClientCallback();
public static MqttClient mqttClient = null;
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;
/**
* 要发布的消息队列
*/
private static LinkedBlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>();
public static ExecutorService executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors() + 1);
/**
* mqtt broker 连接配置,填自己的mqtt地址,及账号密码
*/
//private static String broker; //= "tcp://192.168.31.198:1883";
private static String clientId;
private static String username = "xjb";
private static String password = "123456";
/**
* 创建订阅客户端
*/
public void createClient(String broker) {
System.out.println("create mqttClient connect " + broker);
//String clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");
clientId = "mqttServer" + String.valueOf(System.currentTimeMillis());
System.out.println("mqtt clientId = " + clientId);
try {
memoryPersistence = new MemoryPersistence(); //设置持久化方式
mqttClient = new MqttClient(broker, clientId, memoryPersistence); //MemoryPersistence设置clientid的保存形式,默认为以内存保存
if (!mqttClient.isConnected()) {
//logger.info("客户端连接mqtt-broker: " + broker);
System.out.println("create mqtt clientConnection");
try {
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true); //true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
mqttConnectOptions.setMaxInflight(100000);
mqttConnectOptions.setConnectionTimeout(30); //设置连接超时,单位为秒
//mqttConnectOptions.setKeepAliveInterval(20); //保持连接数,设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
if (username != null && !"".equals(username)) {
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
}
mqttClient.setCallback(mqttClientCallback); //客户端添加回调函数
mqttClient.connect(mqttConnectOptions);
System.out.println(mqttClient.isConnected());
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 重新连接
*/
public static void reConnect() {
while (true) {
try {
if (mqttClient != null && !mqttClient.isConnected() && mqttConnectOptions != null) {
Thread.sleep(1000);
clientId = "mqttServer" + String.valueOf(System.currentTimeMillis());
mqttClient.connect(mqttConnectOptions);
logger.info("=======尝试重新连接==============");
break;
}
} catch (MqttException | InterruptedException e) {
logger.info("=======重新连接失败:{}==============", e.toString());
continue;
}
}
}
/**
* 关闭连接
*/
public void closeConnect() {
//关闭存储方式
if(null != memoryPersistence) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
e.printStackTrace();
}
} else {
System.out.println("memoryPersistence is null");
}
//关闭连接
if( mqttClient != null) {
if(mqttClient.isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException me) {
me.printStackTrace();
}
} else {
System.out.println("mqttClient is not connect");
}
} else {
System.out.println("mqttClient is null");
}
}
/**
* 订阅主题
*/
public static void subTopic(String topic, int qos) {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttConnectOptions.setWill(topic, "close".getBytes(), 2, true);
logger.info("clientId: " + mqttClient.getClientId() + "订阅主题" + topic);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("订阅时间========" + df.format(new Date()) + " " + System.currentTimeMillis());
mqttClient.subscribe(topic, qos);
mqttClient.setCallback(mqttClientCallback);
//logger.info("订阅成功");
} catch (MqttException me) {
me.printStackTrace();
}
} else {
System.out.println("mqttClient is Error");
}
}
/**
* 订阅主题
*/
public static void subTopic(String[] topic, int[] qos) {
if (mqttClient != null && mqttClient.isConnected()) {
try {
//logger.info("clientId: " + mqttClient.getClientId() + "订阅主题" + topic);
System.out.println("mqttClient subscribe topic : " + topic);
mqttClient.subscribe(topic, qos);
mqttClient.setCallback(mqttClientCallback);
//logger.info("订阅成功");
} catch (MqttException me) {
me.printStackTrace();
}
} else {
System.out.println("mqttClient is Error");
}
}
/**
* 清空主题
*/
public static void cleanTopic(String topic) {
if (mqttClient != null && !mqttClient.isConnected()) {
try {
mqttClient.unsubscribe(topic);
} catch (MqttException me) {
me.printStackTrace();
}
} else {
System.out.println("mqttClient is Error");
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param message
*/
public static void pubMessage(String topic, String message) {
pubMessage(0, false, topic, message);
}
/**
* 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复),
* retained 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
* @param topic
* @param message
*/
public static void pubMessage(int qos, String topic, String message){
pubMessage(qos, false, topic, message);
}
/**
* 发布消息
* @param pubTopic
* @param message
* @param qos
* @param retained
*/
public static void pubMessage(int qos, boolean retained, String pubTopic, String message) {
if (mqttClient != null && mqttClient.isConnected()) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
MqttTopic topic = mqttClient.getTopic(pubTopic);
if (topic == null) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
logger.info("发送时间========" + df.format(new Date()));
logger.info("clientId: " + mqttClient.getClientId() + "发送主题:" + topic + "成功,内容:" + message);
try {
token = topic.publish(mqttMessage);
token.waitForCompletion();
//logger.info("message is published completely!" + token.isComplete());
System.out.println("message is published completely! "
+ token.isComplete());
} catch (MqttPersistenceException me) {
logger.error("publish fail", me);
me.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}else {
reConnect();
}
}
/**
* 发布消息
*/
public static void pubMessage(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
MqttException {
if (topic == null) {
logger.error("topic not exist");
}
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
//logger.info("message is published completely!" + token.isComplete());
System.out.println("message is published completely! "
+ token.isComplete());
}
/**
* 循环从队列queue获取要发布的消息发布
*/
public void pubMessageFromQueue() {
//启动发布消息的线程, 循环从队列queue获取要发布的消息发布。也就是说要发消息,只要将消息写入队列queue。
new Thread(new PublishThread()).start();
}
/**
* 将要发布的消息添加到队列
* @param topic
* @param content
*/
public static void publish_common(String topic, String content, int qos) {
String[] array = {topic, content, String.valueOf(qos)};
//要发布的消息,入队列。
//PublishThread会循环从这个队列获取消息发布.
queue.offer(array);
}
/**
* 从队列获取要发布的信息
*/
public static String[] poll() {
return (String[]) queue.poll();
}
}
package com.jbxie.spaceiotmqtt.mqttUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Component;
import static com.jbxie.spaceiotmqtt.mqttUtils.MyMqttClient.*;
@Component
public class PublishThread implements Runnable {
@Override
public void run()
{
String[] obj = null;
System.out.println("mqtt publish thread start");
while (true) {
obj = poll();
if (obj != null) {
String topic = obj[0];
String content = obj[1];
int qos = Integer.valueOf(obj[2]);
System.out.println("mqtt从队列取出topic:" + topic + ",content:" + content + ",qos" + qos);
try {
MyMqttClient.pubMessage(1, true, topic, content);
System.out.println("发送mqtt消息,topic: "+ topic +" ,content: " + content);
} catch (Exception e) {
System.out.println("发消息给设备,topic:" + topic + ",content:" + content);
e.printStackTrace();
}
} else{
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
package com.jbxie.spaceiotmqtt.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.ninwoo.bishe.starter.service.NetworkService;
import java.util.List;
@Service
public class IPService {
@Autowired
private NetworkService networkService;
public String getIpByAppName(String appName) {
List<String> ipList = networkService.getIpListByAppName(11111l, appName);
if (!ipList.isEmpty()) {
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0];
}
return null;
}
}
package com.jbxie.spaceiotmqtt.service;
import com.jbxie.spaceiotmqtt.common.DataPackage;
import com.jbxie.spaceiotmqtt.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import top.ninwoo.bishe.starter.service.NetworkService;
import top.ninwoo.common.entity.DockerContainer;
import javax.xml.crypto.Data;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class NodeService {
private static final Logger logger = LoggerFactory.getLogger(NodeService.class);
@Autowired
DockerContainer dockerContainer;
@Autowired
IPService ipService;
/**
* 获取当前节点名
* (需要重写)
*/
public String getName() {
return dockerContainer.getName();
}
/**
* 当前节点接收到数据包进行处理
* 先写一个单向的
* @param dataPackage
*/
private void accept(DataPackage dataPackage) {
String currentName = getName();
int currentHop = getCurrentHop(currentName);
Node node = new Node(currentName);
ServerSocket serverSocket = null;
InputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
serverSocket = new ServerSocket(2021);
while (true) {
Socket client = serverSocket.accept();
inputStream = client.getInputStream();
objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
Object object = objectInputStream.readObject();
DataPackage dataPackage1 = (DataPackage) object;
List<String> businessList = node.getBusinessList();
String businessName = dataPackage.getBusinessName();
ArrayList<String> route = dataPackage.getRoute();
route.add("sate" + currentHop);
// 如果计数器仍然等于零或当前节点已经是最终节点,则打印路由信息
// 如果节点业务列表已包含当前传输的业务则终止
// 否则继续传输
if (dataPackage.getCounter() == 0) {
logger.info("传输失败,已超出生命周期:" + dataPackage);
continue;
} else if (dataPackage.getPubIp() == ipService.getIpByAppName(currentName)) {
logger.info("传输成功: " + dataPackage);
continue;
} else if (businessList.contains(businessName)){
logger.info("该节点已处理,该链路传输终止");
continue;
} else {
dataPackage.decrement();
businessList.add(businessName);
node.setBusinessList(businessList);
List<Integer> relativeNodesList = getLink(currentName, dataPackage);
for (int nextNode : relativeNodesList) {
transferPackage(dataPackage, nextNode);
}
continue;
}
}
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
try {
if (objectInputStream != null) {
objectInputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 根据节点名和拓扑获取当前节点连接关系
* @param name
* @param dataPackage
* @return
*/
private List<Integer> getLink(String name, DataPackage dataPackage) {
List<Integer> relativeNodesList = new ArrayList<>();
int[][] nodeLink = dataPackage.getNodeLink();
int currentHop = getCurrentHop(name);
for (int i = 0; i < nodeLink[currentHop - 1].length; i++) {
if (i == 1) {
relativeNodesList.add(i + 1);
}
}
return relativeNodesList;
}
/**
* 获取当前节点序列号
* @param name
* @return
*/
private int getCurrentHop (String name) {
if(name == null || name.length() == 0) {
throw new RuntimeException("节点名序号错误");
}
return Integer.valueOf(name.substring(4, name.length()));
}
/**
* 发送数据包
* @param dataPackage
* @param nextHop
*/
private void transferPackage(DataPackage dataPackage, int nextHop) {
//获取目标节点ip
String node_name = "node" + nextHop;
String IP = ipService.getIpByAppName(node_name);
//发送至下一节点
send(dataPackage, IP, 2021);
}
private void send(DataPackage dataPackage, String ip, int port) {
Socket socket = null;
try {
socket = new Socket(ip, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(dataPackage);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
#mq配置
#com:
# mqtt:
# host: tcp://192.168.31.198:1883
# clientid: mqtt_client1
# topic: good,test,yes
# username: xjb
# password: 123456
# timeout: 10
# keepalive: 20
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-space-iot</artifactId>
<groupId>top.jbxie</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>space-iot-raft</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>6.13.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>hessian</artifactId>
<version>4.0.4</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
## Raft-KV-Storage
这是一个 Java 版本的 Raft(CP) KV 分布式存储实现. 可用于 Raft 初学者深入学习 Raft 协议.
相关文章 http://thinkinjava.cn/2019/01/12/2019-01-12-lu-raft-kv/
为了尽可能的保证数据一致性,该实现的"性能"没有基于 AP 的实现好。
目前实现了 Raft 4 大核心功能的其中 2 个功能.
1. leader 选举
2. 日志复制
3. 成员变更(未测试)
4. 快照压缩(未实现)
## Design
完全是参照 RAFT 论文来写的. 没有任何妥协.
![image](https://user-images.githubusercontent.com/24973360/50371851-b13de880-05fd-11e9-958a-5813b3b6d761.png)
## quick start
#### 验证 "leader 选举"
1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
系统配置, 表示分布式环境下的 5 个机器节点.
2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
3. 观察控制台, 约 6 秒后, 会发生选举事件,此时,会产生一个 leader. 而 leader 会立刻发送心跳维持自己的地位.
4. 如果leader 的端口是 8775, 使用 idea 关闭 8775 端口,模拟节点挂掉, 大约 15 秒后, 会重新开始选举, 并且会在剩余的 4 个节点中,产生一个新的 leader. 并开始发送心跳日志。
#### 验证"日志复制"
##### 正常状态下
1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
3. 使用客户端写入 kv 数据.
4. 杀掉所有节点, 使用 junit test 读取每个 rocksDB 的值, 验证每个节点的数据是否一致.
##### 非正常状态下
1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
3. 使用客户端写入 kv 数据.
4. 杀掉 leader (假设是 8775).
5. 再次写入数据.
6. 重启 8775.
7. 关闭所有节点, 读取 RocksDB 验证数据一致性.
## And
欢迎提交 RP, issue. 加微信一起探讨 Raft。
本人微信:
![image](https://user-images.githubusercontent.com/24973360/50372024-5f975d00-0601-11e9-8247-139e145b1123.png)
## Acknowledgments
感谢 SOFA-Bolt 提供 RPC 网络框架 https://github.com/alipay/sofa-bolt
感谢 rocksDB 提供 KV 存储 https://github.com/facebook/rocksdb
package top.jbxie.raft;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.util.Node;
import top.jbxie.raft.util.impl.DefaultNode;
import java.util.Arrays;
/**
* -DserverPort=8775
* -DserverPort=8776
* -DserverPort=8777
* -DserverPort=8778
* -DserverPort=8779
*
* 启动类,对自身节点的ip port的包装
*/
@SpringBootApplication
public class RaftNodeBootStrap {
// public static void main(String[] args) {
// SpringApplication.run(RaftNodeBootStrap.class, args);
// }
public static void main(String[] args) throws Throwable {
main0();
}
public static void main0() throws Throwable {
String[] peerAddr = {"localhost:8775","localhost:8776","localhost:8777", "localhost:8778", "localhost:8779"};//集群列表
NodeConfig config = new NodeConfig();
//自身节点地址
config.setSelfAddr(Integer.valueOf(System.getProperty("serverPort")).intValue());
//config.setSelfAddr(8779);
//所有节点地址
config.setPeerAddrs(Arrays.asList(peerAddr));
Node node = DefaultNode.getInstance();
node.setConfig(config);
node.init();
// 为Java应用程序添加退出事件处理
// 通过Runtime.getRuntime().addShutdownHook(Thread hook)方法,事件监听,捕获系统退出消息到来,向Java虚拟机
// 注册一个shutdown钩子事件,这样程序一旦运行到此处,就运行线程hook。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
node.destroy();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}));
}
}
package top.jbxie.raft.client;
import lombok.Data;
import java.io.Serializable;
/**
* 客户端发起请求的返回值
*/
@Data
public class ClientKVAck implements Serializable {
Object result; // logEntry.getCommand();
public ClientKVAck(Object result) {
this.result = result;
}
private ClientKVAck(Builder builder) {
setResult(builder.result);
}
public static ClientKVAck ok() {
return new ClientKVAck("ok");
}
public static ClientKVAck fail() {
return new ClientKVAck("fail");
}
public static Builder newBuilder() {
return new Builder();
}
public static final class Builder {
private Object result;
private Builder() {
}
public Builder result(Object val) {
result = val;
return this;
}
public ClientKVAck build() {
return new ClientKVAck(this);
}
}
}
package top.jbxie.raft.client;
import lombok.Data;
import java.io.Serializable;
/**
* 客户端发起的请求
*/
@Data
public class ClientKVReq implements Serializable {
public static int PUT = 0;
public static int GET = 1;
int type;
String key;
String value;
private ClientKVReq(Builder builder) {
setType(builder.type);
setKey(builder.key);
setValue(builder.value);
}
public static Builder newBuilder() {
return new Builder();
}
public enum Type {
PUT(0), GET(1);
int code;
Type(int code) {
this.code = code;
}
public static Type value(int code ) {
for (Type type : values()) {
if (type.code == code) {
return type;
}
}
return null;
}
}
public static final class Builder {
private int type;
private String key;
private String value;
private Builder() {
}
public Builder type(int val) {
type = val;
return this;
}
public Builder key(String val) {
key = val;
return this;
}
public Builder value(String val) {
value = val;
return this;
}
public ClientKVReq build() {
return new ClientKVReq(this);
}
}
}
package top.jbxie.raft.client;
import com.alipay.remoting.exception.RemotingException;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.current.SleepHelper;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.rpc.RpcClient;
import top.jbxie.raft.rpc.impl.DefaultRpcClient;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class RaftClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftClient.class);
private final static RpcClient client = new DefaultRpcClient();
static String addr = "localhost:8775";
static List<String> list = Lists.newArrayList("localhost:8775","localhost:8776","localhost:8777", "localhost:8778", "localhost:8779");
public static void main(String[] args) throws RemotingException, InterruptedException {
AtomicLong count = new AtomicLong(5);
for (int i = 3; ; i++) {
try {
int index = (int) (count.incrementAndGet() % list.size());
addr = list.get(index);
ClientKVReq obj = ClientKVReq.newBuilder().key("hello:" + i).value("world:" + i).type(ClientKVReq.PUT).build();
Request<ClientKVReq> r = new Request<>();
r.setObj(obj);
r.setUrl(addr);
r.setCmd(Request.CLIENT_REQ);
Response<String> response;
try {
response = client.send(r);
} catch (Exception e) {
r.setUrl(list.get((int) ((count.incrementAndGet()) % list.size())));
response = client.send(r);
}
LOGGER.info("request content : {}, url : {}, put response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response.getResult());
SleepHelper.sleep(1000);
obj = ClientKVReq.newBuilder().key("hello:" + i).type(ClientKVReq.GET).build();
addr = list.get(index);
r.setUrl(addr);
r.setObj(obj);
Response<LogEntry> response2;
try {
response2 = client.send(r);
} catch (Exception e) {
r.setUrl(list.get((int) ((count.incrementAndGet()) % list.size())));
response2 = client.send(r);
}
LOGGER.info("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
} catch (Exception e) {
e.printStackTrace();
i = i - 1;
}
SleepHelper.sleep(5000);
}
}
}
package top.jbxie.raft.client;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.rpc.RpcClient;
import top.jbxie.raft.rpc.impl.DefaultRpcClient;
import java.util.List;
public class RaftClient2 {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftClient.class);
private final static RpcClient client = new DefaultRpcClient();
static String addr = "localhost:8778";
static List<String> list3 = Lists.newArrayList("localhost:8777", "localhost:8778", "localhost:8779");
static List<String> list2 = Lists.newArrayList( "localhost:8777", "localhost:8779");
static List<String> list1 = Lists.newArrayList( "localhost:8779");
public static void main(String[] args) throws InterruptedException {
for (int i = 3; ; i++) {
try {
Request<ClientKVReq> r = new Request<>();
int size = list2.size();
ClientKVReq obj = ClientKVReq.newBuilder().key("hello:" + i).type(ClientKVReq.GET).build();
int index = (i) % size;
addr = list2.get(index);
r.setUrl(addr);
r.setObj(obj);
r.setCmd(Request.CLIENT_REQ);
Response<LogEntry> response2 = client.send(r);
LOGGER.info("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
} catch (Exception e) {
e.printStackTrace();
} finally {
Thread.sleep(1000);
}
}
}
}
package top.jbxie.raft.client;
import com.alipay.remoting.exception.RemotingException;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.current.SleepHelper;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.rpc.RpcClient;
import top.jbxie.raft.rpc.impl.DefaultRpcClient;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class RaftClient3 {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftClient3.class);
private final static RpcClient client = new DefaultRpcClient();
static String addr = "localhost:8777";
static List<String> list = Lists.newArrayList("localhost:8777", "localhost:8778", "localhost:8779");
public static void main(String[] args) throws RemotingException, InterruptedException {
AtomicLong count = new AtomicLong(3);
int keyNum = 4;
try {
int index = (int) (count.incrementAndGet() % list.size());
index = 1;
addr = list.get(index);
ClientKVReq obj = ClientKVReq.newBuilder().key("hello:" + keyNum).value("world:" + keyNum).type(ClientKVReq.PUT).build();
Request<ClientKVReq> r = new Request<>();
r.setObj(obj);
r.setUrl(addr);
r.setCmd(Request.CLIENT_REQ);
Response<String> response = null;
try {
response = client.send(r);
} catch (Exception e) {
}
LOGGER.info("request content : {}, url : {}, put response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response.getResult());
SleepHelper.sleep(1000);
obj = ClientKVReq.newBuilder().key("hello:" + keyNum).type(ClientKVReq.GET).build();
addr = list.get(index);
addr = list.get(index);
r.setUrl(addr);
r.setObj(obj);
Response<LogEntry> response2;
try {
response2 = client.send(r);
} catch (Exception e) {
r.setUrl(list.get((int) ((count.incrementAndGet()) % list.size())));
response2 = client.send(r);
}
if (response.getResult() == null) {
LOGGER.error("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
System.exit(1);
return;
}
LOGGER.info("request content : {}, url : {}, get response : {}", obj.key + "=" + obj.getValue(), r.getUrl(), response2.getResult());
} catch (Exception e) {
e.printStackTrace();
}
System.exit(1);
}
}
package top.jbxie.raft.client;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.jbxie.raft.nodeCommon.NodeConfig;
@RestController
public class test {
// @Autowired
// private NodeConfig nodeConfig;
//
@Autowired
public static Environment environment;
// @RequestMapping("/bweather")
// private String getBWeather() {
// return String.format("weather.dt:%s ===== weather.humidity:%s ==== port:%s", nodeConfig.getDt(), nodeConfig.getHumidity(), environment.getProperty("server.port"));
// }
// public static void main(String[] args) {
// System.out.println(System.getProperty("server.port"));;
// }
}
package top.jbxie.raft.clusterService;
import top.jbxie.raft.nodeCommon.Peer;
/**
* 集群配置变更接口.
*/
public interface ClusterMembershipChanges {
/**
* 添加节点.
* @param newPeer
* @return
*/
Result addPeer(Peer newPeer);
/**
* 删除节点.
* @param oldPeer
* @return
*/
Result removePeer(Peer oldPeer);
}
package top.jbxie.raft.clusterService;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
public class Result {
public static final int FAIL = 0;
public static final int SUCCESS = 1;
int status;
String leaderHint;
public Result() {
}
public Result(Builder builder) {
setStatus(builder.status);
setLeaderHint(builder.leaderHint);
}
@Override
public String toString() {
return "Result{" +
"status=" + status +
", leaderHint='" + leaderHint + '\'' +
'}';
}
public static Builder newBuilder() {
return new Builder();
}
@Getter
public enum Status {
FAIL(0), SUCCESS(1);
int code;
Status(int code) {
this.code = code;
}
public static Status value(int v) {
for (Status i : values()) {
if (i.code == v) {
return i;
}
}
return null;
}
}
public static final class Builder {
private int status;
private String leaderHint;
private Builder() {
}
public Builder status(int val) {
status = val;
return this;
}
public Builder leaderHint(String val) {
leaderHint = val;
return this;
}
public Result build() {
return new Result(this);
}
}
}
package top.jbxie.raft.clusterService;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class Server {
String address;
}
package top.jbxie.raft.clusterService.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.clusterService.ClusterMembershipChanges;
import top.jbxie.raft.clusterService.Result;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.nodeCommon.NodeStatus;
import top.jbxie.raft.nodeCommon.Peer;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.util.impl.DefaultNode;
/**
* 集群配置变更接口默认实现.
*/
public class ClusterMembershipChangesImpl implements ClusterMembershipChanges {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMembershipChangesImpl.class);
private final DefaultNode node;
public ClusterMembershipChangesImpl(DefaultNode node) {
this.node = node;
}
/** 必须是同步的,一次只能添加一个节点
* @param newPeer*/
@Override
public synchronized Result addPeer(Peer newPeer) {
// 已经存在
if (node.peerSet.getPeersWithOutSelf().contains(newPeer)) {
return new Result();
}
node.peerSet.getPeersWithOutSelf().add(newPeer);
if (node.status == NodeStatus.LEADER) {
node.getNextIndexs().put(newPeer, 0L);
node.getMatchIndexs().put(newPeer, 0L);
// node.nextIndexs.put(newPeer, 0L);
// node.matchIndexs.put(newPeer, 0L);
for (long i = 0; i < node.getLogModule().getLastIndex(); i++) {
LogEntry e = node.getLogModule().read(i);
if (e != null) {
node.replication(newPeer, e);
}
}
for (Peer item : node.peerSet.getPeersWithOutSelf()) {
// TODO 同步到其他节点.
Request request = Request.newBuilder()
.cmd(Request.CHANGE_CONFIG_ADD)
.url(newPeer.getAddr())
.obj(newPeer)
.build();
Response response = node.rpcClient.send(request);
Result result = (Result) response.getResult();
if (result != null && result.getStatus() == Result.Status.SUCCESS.getCode()) {
LOGGER.info("replication config success, peer : {}, newServer : {}", newPeer, newPeer);
} else {
LOGGER.warn("replication config fail, peer : {}, newServer : {}", newPeer, newPeer);
}
}
}
return new Result();
}
/** 必须是同步的,一次只能删除一个节点
* @param oldPeer*/
@Override
public synchronized Result removePeer(Peer oldPeer) {
node.peerSet.getPeersWithOutSelf().remove(oldPeer);
node.getNextIndexs().remove(oldPeer);
node.getMatchIndexs().remove(oldPeer);
return new Result();
}
}
package top.jbxie.raft.current;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* raft线程异常捕获
*/
public class RaftThread extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftThread.class);
// 在Thread ApI中提供的UncaughtExceptionHandle,它能检测出某个由于未捕获的异常而终结的情况,有效地防止线程泄露问题。
// uncaughtException(Thread t, Throwable e);
private static final UncaughtExceptionHandler uncaughtExceptionHandler = (t, e)
-> LOGGER.warn("Exception occurred from thread {}", t.getName(), e);
public RaftThread(String threadName, Runnable r) {
super(r, threadName);
setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
}
package top.jbxie.raft.current;
import java.util.concurrent.*;
/**
* raft线程池配置
*/
public class RaftThreadPool {
private static int cup = Runtime.getRuntime().availableProcessors(); // 核心线程数
private static int maxPoolSize = cup * 2; // 最大线程池大小
private static final int queueSize = 1024; // 队列长度
private static final long keepTime = 1000 * 60; // 线程池维护线程所允许的空闲时间,当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间。
private static TimeUnit keepTimeUnit = TimeUnit.MILLISECONDS; // 时间单位---毫秒
// ScheduledExecutorService,是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。
// ScheduledThreadPoolExecutor处理定时任务
private static ScheduledExecutorService ss = new ScheduledThreadPoolExecutor(cup, new NameThreadFactory());
//定义线程池
private static ThreadPoolExecutor te = new RaftThreadPoolExecutor(
cup,
maxPoolSize,
keepTime,
keepTimeUnit,
new LinkedBlockingQueue<>(queueSize), //任务队列
new NameThreadFactory()); // 执行器创建新线程时要使用的工厂
// private static ThreadPoolExecutor getThreadPool() {
// return new RaftThreadPoolExecutor(
// cup,
// maxPoolSize,
// keepTime,
// keepTimeUnit,
// new LinkedBlockingQueue<>(queueSize),
// new NameThreadFactory());
// }
// private static ScheduledExecutorService getScheduled() {
// return new ScheduledThreadPoolExecutor(cup, new NameThreadFactory());
// }
/**
* scheduleAtFixedRate每次执行时间为上一次任务开始起向后推一个时间间隔
* @param r
* @param initDelay 首次执行任务的延迟时间
* @param delay 每次执行任务的间隔时间
*/
public static void scheduleAtFixedRate(Runnable r, long initDelay, long delay) {
ss.scheduleAtFixedRate(r, initDelay, delay, TimeUnit.MILLISECONDS);
}
/**
* scheduleWithFixedDelay每次执行时间为上一次任务结束起向后推一个时间间隔
* @param r
* @param delay
*/
public static void scheduleWithFixedDelay(Runnable r, long delay) {
ss.scheduleWithFixedDelay(r, 0, delay, TimeUnit.MILLISECONDS);
}
@SuppressWarnings("unchecked")
public static <T> Future<T> submit(Callable r) {
return te.submit(r);
}
/**
* 线程提交
* @param r Runnable接口实现类
*/
public static void execute(Runnable r) {
te.execute(r);
}
public static void execute(Runnable r, boolean sync) {
if (sync) {
r.run();
} else {
te.execute(r);
}
}
static class NameThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new RaftThread("Raft thread", r);
t.setDaemon(true); //守护线程
t.setPriority(5); // 优先级
return t;
}
}
}
package top.jbxie.raft.current;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* raft线程池
*/
public class RaftThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftThreadPoolExecutor.class);
// 用ThreadLocal作为容器,当每个线程访问这个COST_TIME_WATCH变量时,ThreadLocal会为每个线程提供一份变量,各个线程互不影响。
private static final ThreadLocal<Long> COST_TIME_WATCH = ThreadLocal.withInitial(System::currentTimeMillis);
public RaftThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RaftThreadPool.NameThreadFactory nameThreadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, nameThreadFactory);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
COST_TIME_WATCH.get();
LOGGER.debug("raft thread pool before Execute");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
LOGGER.debug("raft thread pool after Execute, cost time : {}", System.currentTimeMillis() - COST_TIME_WATCH.get());
COST_TIME_WATCH.remove();
}
@Override
protected void terminated() {
LOGGER.info("active count : {}, queueSize : {}, poolSize : {}", getActiveCount(), getQueue().size(), getPoolSize());
}
}
package top.jbxie.raft.current;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class SleepHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(SleepHelper.class);
/**
* 线程休眠,单位毫秒
* @param ms
*/
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
LOGGER.warn(e.getMessage());
}
}
/**
* 线程休眠,单位秒
* @param seconds
*/
public static void sleep2(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
LOGGER.warn(e.getMessage());
}
}
}
package top.jbxie.raft.entity;
import lombok.Getter;
import lombok.Setter;
import top.jbxie.raft.util.Consensus;
import java.util.Arrays;
/**
* 附加日志RPC参数. handlerAppendEntries
* 请求的参数(leader->follwer)
* 功能:日志复制
* @see Consensus#appendEntries(AentryParam)
*/
@Setter
@Getter
public class AentryParam extends BaseParam {
// 领导人的 Id,以便于跟随者重定向请求
String leaderId;
// 新的日志条目紧随之前的索引值
long prevLogIndex;
// prevLogIndex条目的任期号
long preLogTerm;
// 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率)
LogEntry[] entries;
// 领导人已经提交的日志的索引值
long leaderCommit;
public AentryParam() {
}
private AentryParam(Builder builder) {
setTerm(builder.term);
setServerId(builder.serverId);
setLeaderId(builder.leaderId);
setPrevLogIndex(builder.prevLogIndex);
setPreLogTerm(builder.preLogTerm);
setEntries(builder.entries);
setLeaderCommit(builder.leaderCommit);
}
@Override
public String toString() {
return "AentryParam{" +
"leaderId='" + leaderId + '\'' +
", prevLogIndex=" + prevLogIndex +
", preLogTerm=" + preLogTerm +
", entries=" + Arrays.toString(entries) +
", leaderCommit=" + leaderCommit +
", term=" + term +
", serverId='" + serverId + '\'' +
'}';
}
public static Builder newBuilder() {
return new Builder();
}
public static final class Builder {
private long term;
private String serverId;
private String leaderId;
private long prevLogIndex;
private long preLogTerm;
private LogEntry[] entries;
private long leaderCommit;
private Builder() {
}
public Builder term(long val) {
term = val;
return this;
}
public Builder serverId(String val) {
serverId = val;
return this;
}
public Builder leaderId(String val) {
leaderId = val;
return this;
}
public Builder prevLogIndex(long val) {
prevLogIndex = val;
return this;
}
public Builder preLogTerm(long val) {
preLogTerm = val;
return this;
}
public Builder entries(LogEntry[] val) {
entries = val;
return this;
}
public Builder leaderCommit(long val) {
leaderCommit = val;
return this;
}
public AentryParam build() {
return new AentryParam(this);
}
}
}
package top.jbxie.raft.entity;
import lombok.Data;
import java.io.Serializable;
/**
* 附加 RPC 日志返回值.
* 请求的返回值
* 功能:日志复制
*/
@Data
public class AentryResult implements Serializable {
// 当前的任期号,用于领导人去更新自己
long term;
// 跟随者包含了匹配上prevLogIndex和prevLogTerm的日志时为真
boolean success;
public AentryResult(long term) {
this.term = term;
}
public AentryResult(boolean success) {
this.success = success;
}
public AentryResult(long term, boolean success) {
this.term = term;
this.success = success;
}
private AentryResult(Builder builder) {
setTerm(builder.term);
setSuccess(builder.success);
}
public static AentryResult fail() {
return new AentryResult(false);
}
public static AentryResult ok() {
return new AentryResult(true);
}
public static Builder newBuilder() {
return new Builder();
}
public static final class Builder {
private long term;
private boolean success;
private Builder() {
}
public Builder term(long val) {
term = val;
return this;
}
public Builder success(boolean val) {
success = val;
return this;
}
public AentryResult build() {
return new AentryResult(this);
}
}
}
package top.jbxie.raft.entity;
import lombok.Data;
import java.io.Serializable;
/**
* 请求投票 RPC 参数(候选人)
* 基础共有参数
*/
@Data
public class BaseParam implements Serializable {
// 候选人的任期号
public long term;
// 被请求者 ID(ip:selfAddr)
public String serverId;
}
package top.jbxie.raft.entity;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
import java.util.Objects;
@Getter
@Setter
@ToString
public class Command implements Serializable {
String key;
String value;
public Command(String key, String value) {
this.key = key;
this.value = value;
}
private Command(Builder builder) {
setKey(builder.key);
setValue(builder.value);
}
public static Builder newBuilder() {
return new Builder();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Command command = (Command) o;
return Objects.equals(key, command.key) &&
Objects.equals(value, command.value);
}
@Override
public int hashCode() {
return Objects.hash(key, value);
}
public static final class Builder {
private String key;
private String value;
private Builder() {
}
public Builder key(String val) {
key = val;
return this;
}
public Builder value(String val) {
value = val;
return this;
}
public Command build() {
return new Command(this);
}
}
}
package top.jbxie.raft.entity;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.Objects;
import top.jbxie.raft.util.LogModule;
/**
* 日志条目
*
* @see LogModule
*/
@Getter
@Setter
public class LogEntry implements Serializable, Comparable {
// 日志索引值
private Long index;
// 日志任期号
private long term;
// 日志存储的内容(待执行命令)
private Command command;
public LogEntry() {
}
public LogEntry(long term, Command command) {
this.term = term;
this.command = command;
}
public LogEntry(Long index, long term, Command command) {
this.index = index;
this.term = term;
this.command = command;
}
private LogEntry(Builder builder) {
setIndex(builder.index);
setTerm(builder.term);
setCommand(builder.command);
}
public static Builder newBuilder() {
return new Builder();
}
@Override
public String toString() {
return "{" +
"index=" + index +
", term=" + term +
", command=" + command +
'}';
}
@Override
public int compareTo(Object o) {
if (o == null) {
return -1;
}
if (this.getIndex() > ((LogEntry) o).getIndex()) {
return 1;
}
return -1;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogEntry logEntry = (LogEntry) o;
return term == logEntry.term &&
Objects.equals(index, logEntry.index) &&
Objects.equals(command, logEntry.command);
}
@Override
public int hashCode() {
return Objects.hash(index, term, command);
}
public static final class Builder {
private Long index;
private long term;
private Command command;
private Builder() {
}
public Builder index(Long val) {
index = val;
return this;
}
public Builder term(long val) {
term = val;
return this;
}
public Builder command(Command val) {
command = val;
return this;
}
public LogEntry build() {
return new LogEntry(this);
}
}
}
package top.jbxie.raft.entity;
import top.jbxie.raft.nodeCommon.Peer;
import java.util.concurrent.Callable;
public class ReplicationFailModel {
static String count = "_count";
static String success = "_success";
public String countKey;
public String successKey;
public Callable callable;
public LogEntry logEntry;
public Peer peer;
public Long offerTime;
public ReplicationFailModel(Callable callable, LogEntry logEntry, Peer peer, Long offerTime) {
this.callable = callable;
this.logEntry = logEntry;
this.peer = peer;
this.offerTime = offerTime;
countKey = logEntry.getCommand().getKey() + count;
successKey = logEntry.getCommand().getKey() + success;
}
private ReplicationFailModel(Builder builder) {
countKey = builder.countKey;
successKey = builder.successKey;
callable = builder.callable;
logEntry = builder.logEntry;
peer = builder.peer;
offerTime = builder.offerTime;
}
public static Builder newBuilder() {
return new Builder();
}
public static final class Builder {
private String countKey;
private String successKey;
private Callable callable;
private LogEntry logEntry;
private Peer peer;
private Long offerTime;
private Builder() {
}
public Builder countKey(String val) {
countKey = val;
return this;
}
public Builder successKey(String val) {
successKey = val;
return this;
}
public Builder callable(Callable val) {
callable = val;
return this;
}
public Builder logEntry(LogEntry val) {
logEntry = val;
return this;
}
public Builder peer(Peer val) {
peer = val;
return this;
}
public Builder offerTime(Long val) {
offerTime = val;
return this;
}
public ReplicationFailModel build() {
return new ReplicationFailModel(this);
}
}
}
package top.jbxie.raft.entity;
import top.jbxie.raft.util.Consensus;
import lombok.Getter;
import lombok.Setter;
/**
* 请求投票RPC参数(候选人).
* 功能:领导人选举
* @see Consensus#requestVote(RvoteParam)
*/
@Getter
@Setter
public class RvoteParam extends BaseParam {
// 请求选票的候选人的 Id(ip:selfAddr)
String candidateId;
// 候选人的最后日志条目的索引值
long lastLogIndex;
// 候选人最后日志条目的任期号
long lastLogTerm;
private RvoteParam(Builder builder) {
setTerm(builder.term);
setServerId(builder.serverId);
setCandidateId(builder.candidateId);
setLastLogIndex(builder.lastLogIndex);
setLastLogTerm(builder.lastLogTerm);
}
@Override
public String toString() {
return "RvoteParam{" +
"candidateId='" + candidateId + '\'' +
", lastLogIndex=" + lastLogIndex +
", lastLogTerm=" + lastLogTerm +
", term=" + term +
", serverId='" + serverId + '\'' +
'}';
}
public static Builder newBuilder() {
return new Builder();
}
public static final class Builder {
private long term;
private String serverId;
private String candidateId;
private long lastLogIndex;
private long lastLogTerm;
private Builder() {
}
public Builder term(long val) {
term = val;
return this;
}
public Builder serverId(String val) {
serverId = val;
return this;
}
public Builder candidateId(String val) {
candidateId = val;
return this;
}
public Builder lastLogIndex(long val) {
lastLogIndex = val;
return this;
}
public Builder lastLogTerm(long val) {
lastLogTerm = val;
return this;
}
public RvoteParam build() {
return new RvoteParam(this);
}
}
}
package top.jbxie.raft.entity;
import lombok.Data;
import java.io.Serializable;
/**
* 候选人请求投票RPC返回值.
* 功能:领导人选举
*/
@Data
public class RvoteResult implements Serializable {
// 当前任期号,以便于候选人去更新自己的任期
long term;
// 候选人赢得了此张选票时为真
boolean voteGranted;
public RvoteResult(boolean voteGranted) {
this.voteGranted = voteGranted;
}
private RvoteResult(Builder builder) {
setTerm(builder.term);
setVoteGranted(builder.voteGranted);
}
public static RvoteResult fail() {
return new RvoteResult(false);
}
public static RvoteResult ok() {
return new RvoteResult(true);
}
public static Builder newBuilder() {
return new Builder();
}
public static final class Builder {
private long term;
private boolean voteGranted;
private Builder() {
}
public Builder term(long term) {
this.term = term;
return this;
}
public Builder voteGranted(boolean voteGranted) {
this.voteGranted = voteGranted;
return this;
}
public RvoteResult build() {
return new RvoteResult(this);
}
}
}
package top.jbxie.raft.exception;
public class RaftNotSupportException extends RuntimeException {
public RaftNotSupportException() {
}
public RaftNotSupportException(String message) {
super(message);
}
}
package top.jbxie.raft.exception;
public class RaftRemotingException extends RuntimeException {
public RaftRemotingException() {
super();
}
public RaftRemotingException(String message) {
super(message);
}
}
package top.jbxie.raft.nodeCommon;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 节点配置
*/
@Data
public class NodeConfig {
// 自身节点地址(目前为端口,后面修改为ip地址)
public int selfAddr;
//所有节点地址
public List<String> peerAddrs;
}
\ No newline at end of file
package top.jbxie.raft.nodeCommon;
import lombok.Getter;
/**
* 节点状态枚举
*/
public interface NodeStatus {
int FOLLOWER = 0;
int CANDIDATE = 1;
int LEADER = 2;
@Getter
enum Enum {
FOLLOWER(0), CANDIDATE(1), LEADER(2);
Enum(int code) {
this.code = code;
}
int code;
public static Enum value(int i) {
for (Enum value : Enum.values()) {
if (value.code == i) {
return value;
}
}
return null;
}
}
}
package top.jbxie.raft.nodeCommon;
import lombok.Getter;
import lombok.Setter;
import java.util.Objects;
/**
* 当前节点
*/
@Getter
@Setter
public class Peer {
/** ip:selfAddr */
private final String addr;
public Peer(String addr) {
this.addr = addr;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Peer peer = (Peer) o;
return Objects.equals(addr, peer.addr);
}
@Override
public int hashCode() {
return Objects.hash(addr);
}
@Override
public String toString() {
return "Peer{" +
"addr='" + addr + '\'' +
'}';
}
}
package top.jbxie.raft.nodeCommon;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 节点集合. 去重.
*/
public class PeerSet implements Serializable {
//节点集合
private List<Peer> list = new ArrayList<>();
//leader节点
private volatile Peer leader;
// final
private volatile Peer self;
private PeerSet() {
}
public static PeerSet getInstance() {
return PeerSetLazyHolder.INSTANCE;
}
private static class PeerSetLazyHolder {
private static final PeerSet INSTANCE = new PeerSet();
}
public void setSelf(Peer peer) {
self = peer;
}
public Peer getSelf() {
return self;
}
public void addPeer(Peer peer) {
list.add(peer);
}
public void removePeer(Peer peer) {
list.remove(peer);
}
public List<Peer> getPeers() {
return list;
}
public List<Peer> getPeersWithOutSelf() {
List<Peer> list2 = new ArrayList<>(list);
list2.remove(self);
return list2;
}
public Peer getLeader() {
return leader;
}
public void setLeader(Peer peer) {
leader = peer;
}
@Override
public String toString() {
return "PeerSet{" +
"list=" + list +
", leader=" + leader +
", self=" + self +
'}';
}
}
package top.jbxie.raft.rpc;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.rpc.protocol.AbstractUserProcessor;
import top.jbxie.raft.exception.RaftNotSupportException;
public abstract class RaftUserProcessor<T> extends AbstractUserProcessor<T> {
@Override
public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, T request) {
throw new RaftNotSupportException(
"Raft Server not support handleRequest(BizContext bizCtx, AsyncContext asyncCtx, T request) ");
}
@Override
public String interest() {
return Request.class.getName();
}
}
package top.jbxie.raft.rpc;
import lombok.Data;
import top.jbxie.raft.client.ClientKVReq;
import top.jbxie.raft.entity.AentryParam;
import top.jbxie.raft.entity.RvoteParam;
import java.io.Serializable;
/**
* 发起请求
* @param <T>
*/
@Data
public class Request<T> implements Serializable {
// 请求投票
public static final int R_VOTE = 0;
// 附加日志
public static final int A_ENTRIES = 1;
// 客户端
public static final int CLIENT_REQ = 2;
// 配置变更. add
public static final int CHANGE_CONFIG_ADD = 3;
// 配置变更. remove
public static final int CHANGE_CONFIG_REMOVE = 4;
// 请求类型
private int cmd = -1;
/** param 请求的参数
* @see AentryParam
* @see RvoteParam
* @see ClientKVReq
* */
private T obj;
String url;
public Request() {
}
public Request(T obj) {
this.obj = obj;
}
public Request(int cmd, T obj, String url) {
this.cmd = cmd;
this.obj = obj;
this.url = url;
}
private Request(Builder builder) {
setCmd(builder.cmd);
setObj((T) builder.obj);
setUrl(builder.url);
}
public static Builder newBuilder() {
return new Builder<>();
}
public final static class Builder<T> {
private int cmd;
private Object obj;
private String url;
private Builder() {
}
public Builder cmd(int val) {
cmd = val;
return this;
}
public Builder obj(Object val) {
obj = val;
return this;
}
public Builder url(String val) {
url = val;
return this;
}
public Request<T> build() {
return new Request<T>(this);
}
}
}
package top.jbxie.raft.rpc;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* 请求返回结果
* @param <T> AentryResult,RvoteResult
*/
@Getter
@Setter
public class Response<T> implements Serializable {
private T result;
public Response(T result) {
this.result = result;
}
private Response(Builder builder) {
setResult((T) builder.result);
}
public static Response ok() {
return new Response<>("ok");
}
public static Response fail() {
return new Response<>("fail");
}
public static Builder newBuilder() {
return new Builder();
}
@Override
public String toString() {
return "Response{" +
"result=" + result +
'}';
}
public static final class Builder {
private Object result;
private Builder() {
}
public Builder result(Object val) {
result = val;
return this;
}
public Response build() {
return new Response(this);
}
}
}
package top.jbxie.raft.rpc;
public interface RpcClient {
Response send(Request request);
}
package top.jbxie.raft.rpc;
public interface RpcServer {
void start();
void stop();
Response handlerRequest(Request request);
}
package top.jbxie.raft.rpc.impl;
import com.alipay.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.exception.RaftRemotingException;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.rpc.RpcClient;
public class DefaultRpcClient implements RpcClient {
public static Logger logger = LoggerFactory.getLogger(DefaultRpcClient.class.getName());
private final static com.alipay.remoting.rpc.RpcClient CLIENT = new com.alipay.remoting.rpc.RpcClient();
static {
CLIENT.init();
}
@Override
public Response send(Request request) {
Response result = null;
try {
result = (Response) CLIENT.invokeSync(request.getUrl(), request, 200000);
} catch (RemotingException e) {
e.printStackTrace();
logger.info("rpc RaftRemotingException ");
throw new RaftRemotingException();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return result;
}
}
}
package top.jbxie.raft.rpc.impl;
import com.alipay.remoting.BizContext;
import top.jbxie.raft.client.ClientKVReq;
import top.jbxie.raft.clusterService.ClusterMembershipChanges;
import top.jbxie.raft.entity.AentryParam;
import top.jbxie.raft.entity.RvoteParam;
import top.jbxie.raft.nodeCommon.Peer;
import top.jbxie.raft.rpc.RaftUserProcessor;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.rpc.RpcServer;
import top.jbxie.raft.util.impl.DefaultNode;
public class DefaultRpcServer implements RpcServer {
private volatile boolean flag;
private DefaultNode node;
private com.alipay.remoting.rpc.RpcServer rpcServer;
public DefaultRpcServer(String ip, int port, DefaultNode node) {
if (flag) {
return;
}
synchronized (this) {
if (flag) {
return;
}
rpcServer = new com.alipay.remoting.rpc.RpcServer(ip, port);
rpcServer.registerUserProcessor(new RaftUserProcessor<Request>() {
@Override
public Object handleRequest(BizContext bizCtx, Request request) throws Exception {
return handlerRequest(request);
}
});
this.node = node;
flag = true;
}
}
public DefaultRpcServer(int port, DefaultNode node) {
if (flag) {
return;
}
synchronized (this) {
if (flag) {
return;
}
rpcServer = new com.alipay.remoting.rpc.RpcServer(port, false, false);
rpcServer.registerUserProcessor(new RaftUserProcessor<Request>() {
@Override
public Object handleRequest(BizContext bizCtx, Request request) throws Exception {
return handlerRequest(request);
}
});
this.node = node;
flag = true;
}
}
@Override
public void start() {
rpcServer.start();
}
@Override
public void stop() {
rpcServer.stop();
}
@Override
public Response handlerRequest(Request request) {
if (request.getCmd() == Request.R_VOTE) {
return new Response(node.handlerRequestVote((RvoteParam) request.getObj()));
} else if (request.getCmd() == Request.A_ENTRIES) {
return new Response(node.handlerAppendEntries((AentryParam) request.getObj()));
} else if (request.getCmd() == Request.CLIENT_REQ) {
return new Response(node.handlerClientRequest((ClientKVReq) request.getObj()));
} else if (request.getCmd() == Request.CHANGE_CONFIG_REMOVE) {
return new Response(((ClusterMembershipChanges) node).removePeer((Peer) request.getObj()));
} else if (request.getCmd() == Request.CHANGE_CONFIG_ADD) {
return new Response(((ClusterMembershipChanges) node).addPeer((Peer) request.getObj()));
}
return null;
}
}
package top.jbxie.raft.util;
import top.jbxie.raft.entity.AentryParam;
import top.jbxie.raft.entity.AentryResult;
import top.jbxie.raft.entity.RvoteParam;
import top.jbxie.raft.entity.RvoteResult;
/**
* raft一致性模块
*/
public interface Consensus {
/**
* 功能:请求投票 RPC
*
* 接收者实现:
*
* 如果term < currentTerm返回 false (5.2 节)
* 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
* @return
*/
RvoteResult requestVote(RvoteParam param);
/**
* 功能:附加日志(多个日志,为了提高效率) RPC
*
* 接收者实现:
*
* 如果 term < currentTerm 就返回 false (5.1 节)
* 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
* 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节)
* 附加任何在已有的日志中不存在的条目
* 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
* @return
*/
AentryResult appendEntries(AentryParam param);
}
package top.jbxie.raft.util;
public interface LifeCycle {
void init() throws Throwable;
void destroy() throws Throwable;
}
package top.jbxie.raft.util;
import top.jbxie.raft.entity.LogEntry;
/**
* @see top.jbxie.raft.entity.LogEntry;
*/
public interface LogModule {
void write(LogEntry logEntry);
LogEntry read(Long index);
void removeOnStartIndex(Long startIndex);
LogEntry getLast(); //获取最后一条日志条目
Long getLastIndex();
}
package top.jbxie.raft.util;
public class LongConvert {
public static long convert(Long l) {
if (l == null) {
return 0;
}
return l;
}
}
package top.jbxie.raft.util;
import top.jbxie.raft.client.ClientKVAck;
import top.jbxie.raft.client.ClientKVReq;
import top.jbxie.raft.entity.*;
import top.jbxie.raft.nodeCommon.NodeConfig;
/**
* 节点功能
*/
public interface Node<T> extends LifeCycle {
/**
* 设置配置文件.
* @param config
*/
void setConfig(NodeConfig config);
/**
* 处理请求投票 RPC.
* 请求投票RPC参数RvoteParam
* 请求投票RPC返回值RvoteResult(候选人)
* 功能:领导者选举
* @param param
* @return
*/
RvoteResult handlerRequestVote(RvoteParam param);
/**
* 处理附加日志请求.
* 功能:日志复制
* @param param
* @return
*/
AentryResult handlerAppendEntries(AentryParam param);
/**
* 处理客户端请求.
*
* @param request
* @return
*/
ClientKVAck handlerClientRequest(ClientKVReq request);
/**
* 客户端发起的请求转发给 leader 节点.
* @param request
* @return
*/
ClientKVAck redirect(ClientKVReq request);
}
package top.jbxie.raft.util;
import top.jbxie.raft.entity.LogEntry;
/**
* 状态机接口
*/
public interface StateMachine {
/**
* 将数据应用到状态机.
*
* 原则上,只需这一个方法(apply). 其他的方法是为了更方便的使用状态机.
* @param logEntry 日志中的数据.
*/
void apply(LogEntry logEntry);
LogEntry get(String key);
String getString(String key);
void setString(String key, String value);
void delString(String... key);
}
package top.jbxie.raft.util.impl;
import io.netty.util.internal.StringUtil;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.entity.*;
import top.jbxie.raft.nodeCommon.NodeStatus;
import top.jbxie.raft.nodeCommon.Peer;
import top.jbxie.raft.util.Consensus;
import java.util.concurrent.locks.ReentrantLock;
/**
* 默认一致性模块实现
*/
@Data
public class DefaultConsensus implements Consensus {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultConsensus.class);
public final DefaultNode node;
public final ReentrantLock voteLock = new ReentrantLock();
public final ReentrantLock appendLock = new ReentrantLock();
public DefaultConsensus(DefaultNode node) {
this.node = node;
}
/**
* 功能:请求投票 RPC
* 接收者实现:
* 如果 param.term < node.currentTerm 返回 false (5.2 节)
* 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
*
* @param param
* @return
*/
@Override
public RvoteResult requestVote(RvoteParam param) {
try {
RvoteResult.Builder builder = RvoteResult.newBuilder();
if (!voteLock.tryLock()) {
return builder.term(node.getCurrentTerm()).voteGranted(false).build();
}
// 对方任期没有自己新
if (param.getTerm() < node.getCurrentTerm()) {
return builder.term(node.getCurrentTerm()).voteGranted(false).build();
}
// (当前节点并没有投票 或者 已经投票过了且是对方节点) && 对方日志和自己一样新
LOGGER.info("node {} current vote for [{}], param candidateId : {}", node.peerSet.getSelf(), node.getVotedFor(), param.getCandidateId());
LOGGER.info("node {} current term {}, peer term : {}", node.peerSet.getSelf(), node.getCurrentTerm(), param.getTerm());
if ((StringUtil.isNullOrEmpty(node.getVotedFor())) || node.getVotedFor().equals(param.getCandidateId())) {
if (node.getLogModule().getLast() != null) {
// 先比较term, term大的优先级大
if (node.getLogModule().getLast().getTerm() > param.getLastLogTerm()) {
return RvoteResult.fail();
}
// term >= 自己,再比较lastLongIndex
if (node.getLogModule().getLastIndex() > param.getLastLogIndex()) {
return RvoteResult.fail();
}
}
//切换状态
node.status = NodeStatus.FOLLOWER;
//更新
node.peerSet.setLeader(new Peer(param.getCandidateId()));
node.setCurrentTerm(param.getTerm());
node.setVotedFor(param.serverId);
//返回成功
return builder.term(node.currentTerm).voteGranted(true).build();
}
return builder.term(node.currentTerm).voteGranted(false).build();
} finally {
voteLock.unlock();
}
}
/**
* 功能:附加日志(多个日志,为了提高效率) RPC
* 接收者实现:
* 如果 term < currentTerm 就返回 false (5.1 节)
* 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
* 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节)
* 附加任何在已有的日志中不存在的条目
* 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
* @param param
* @return
*/
@Override
public AentryResult appendEntries(AentryParam param) {
AentryResult result = AentryResult.fail();
try {
if (!appendLock.tryLock()) {
return result;
}
result.setTerm(node.getCurrentTerm());
//不够格
if (param.getTerm() < node.getCurrentTerm()) {
return result;
}
node.preHeartBeatTime = System.currentTimeMillis();
node.preElectionTime = System.currentTimeMillis();
node.peerSet.setLeader(new Peer(param.getLeaderId()));
// 够格
if (param.getTerm() >= node.getCurrentTerm()) {
LOGGER.debug("node {} become FOLLOWER, currentTerm : {}, param Term : {}, param serverId",
node.peerSet.getSelf(), node.currentTerm, param.getTerm(), param.getServerId());
node.status = NodeStatus.FOLLOWER;
}
node.setCurrentTerm(param.getTerm());
// 心跳
if (param.getEntries()== null || param.getEntries().length == 0) {
LOGGER.info("node {} append heartbeat success , he's term : {}, my term : {}",
param.getLeaderId(), param.getTerm(), node.getCurrentTerm());
return AentryResult.newBuilder().term(node.getCurrentTerm()).success(true).build();
}
// 真实日志
// 第一次
if (node.getLogModule().getLastIndex() != 0 && param.getPrevLogIndex() != 0) {
LogEntry logEntry;
if ((logEntry = node.getLogModule().read(param.getPrevLogIndex())) != null) {
// 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false
// 需要减小 nextIndex 重试.
if (logEntry.getTerm() != param.getPreLogTerm()) {
return result;
}
} else {
// index 不对, 需要递减 nextIndex 重试.
return result;
}
}
// 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的
LogEntry existLog = node.getLogModule().read((param.getPrevLogIndex() + 1));
if (existLog != null && existLog.getTerm() != param.getEntries()[0].getTerm()) {
// 删除这一条和之后所有的, 然后写入日志和状态机.
node.getLogModule().removeOnStartIndex(param.getPrevLogIndex() + 1);
} else if (existLog != null) {
// 已经有日志了, 不能重复写入.
result.setSuccess(true);
return result;
}
// 写进日志并且应用到状态机
for (LogEntry entry : param.getEntries()) {
node.getLogModule().write(entry);
node.stateMachine.apply(entry);
result.setSuccess(true);
}
//如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
if (param.getLeaderCommit() > node.getCommitIndex()) {
int commitIndex = (int) Math.min(param.getLeaderCommit(), node.getLogModule().getLastIndex());
node.setCommitIndex(commitIndex);
node.setLastApplied(commitIndex);
}
result.setTerm(node.getCurrentTerm());
node.status = NodeStatus.FOLLOWER;
// TODO, 是否应当在成功回复之后, 才正式提交? 防止 leader "等待回复"过程中 挂掉.
return result;
} finally {
appendLock.unlock();
}
}
}
package top.jbxie.raft.util.impl;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.util.LogModule;
import java.io.File;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* 默认的日志实现. 日志模块不关心 key, 只关心 index.
*/
@Getter
@Setter
public class DefaultLogModule implements LogModule {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLogModule.class);
/** public just for test */
public static String dbDir;
public static String logsDir;
private static RocksDB logDb;
public final static byte[] LAST_INDEX_KEY = "LAST_INDEX_KEY".getBytes();
ReentrantLock lock = new ReentrantLock();
static {
if (dbDir == null) {
dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
}
if (logsDir == null) {
logsDir = dbDir + "/logModule";
}
RocksDB.loadLibrary(); // 因为RocksDB是由C++编写的,在Java中使用首先需要加载Native库, 默认这个方法会加压一个共享库到java.io.tmpdir
}
private DefaultLogModule() {
// 1. 打开数据库
// 1.1 创建数据库配置
Options options = new Options();
// 1.2 配置当数据库不存在时自动创建
options.setCreateIfMissing(true);
// 1.3 创建存储位置
File file = new File(logsDir);
boolean success = false;
if (!file.exists()) {
success = file.mkdirs();
}
if (success) {
LOGGER.warn("make a new dir : " + logsDir);
}
try {
// 1.4 打开数据库。因为RocksDB默认是保存在本地磁盘,所以需要指定位置
logDb = RocksDB.open(options, logsDir);
} catch (RocksDBException e) {
LOGGER.warn(e.getMessage());
}
}
public static DefaultLogModule getInstance() {
return DefaultLogsLazyHolder.INSTANCE;
}
private static class DefaultLogsLazyHolder {
private static final DefaultLogModule INSTANCE = new DefaultLogModule();
}
/**
* logEntry 的 index 就是 key. 严格保证递增.
*
* @param logEntry
*/
@Override
public void write(LogEntry logEntry) {
boolean success = false;
try {
lock.tryLock(3000, MILLISECONDS);
logEntry.setIndex(getLastIndex() + 1);
// 2. 写入数据
// RocksDB都是以字节流的方式写入数据库中,所以我们需要将字符串转换为字节流再写入。
// 调用put方法写入数据
logDb.put(logEntry.getIndex().toString().getBytes(), JSON.toJSONBytes(logEntry));
success = true;
LOGGER.info("DefaultLogModule write rocksDB success, logEntry info : [{}]", logEntry);
} catch (RocksDBException | InterruptedException e) {
LOGGER.warn(e.getMessage());
} finally {
if (success) {
updateLastIndex(logEntry.getIndex());
}
lock.unlock();
}
}
@Override
public LogEntry read(Long index) {
try {
// 3. 调用get方法读取数据
byte[] result = logDb.get(convert(index));
if (result == null) {
return null;
}
return JSON.parseObject(result, LogEntry.class);
} catch (RocksDBException e) {
e.printStackTrace();
}
return null;
}
@Override
public void removeOnStartIndex(Long startIndex) {
boolean success = false;
int count = 0;
try {
lock.tryLock(3000, MILLISECONDS);
for (long i = startIndex; i <= getLastIndex(); i++) {
// 4. 移除数据
logDb.delete(String.valueOf(i).getBytes());
++count;
}
success = true;
LOGGER.warn("rocksDB removeOnStartIndex success, count={} startIndex={}, lastIndex={}", count, startIndex, getLastIndex());
} catch (InterruptedException | RocksDBException e) {
LOGGER.warn(e.getMessage());
} finally {
if (success) {
updateLastIndex(getLastIndex() - count);
}
lock.unlock();
}
}
@Override
public LogEntry getLast() {
try {
byte[] result = logDb.get(convert(getLastIndex()));
if (result == null) {
return null;
}
return JSON.parseObject(result, LogEntry.class);
} catch (RocksDBException e) {
e.printStackTrace();
}
return null;
}
@Override
public Long getLastIndex() {
byte[] lastIndex = "-1".getBytes();
try {
lastIndex = logDb.get(LAST_INDEX_KEY);
if (lastIndex == null) {
lastIndex = "-1".getBytes();
}
} catch (RocksDBException e) {
e.printStackTrace();
}
return Long.valueOf(new String(lastIndex));
}
private byte[] convert(Long key) {
return key.toString().getBytes();
}
// 更新最新索引值
private void updateLastIndex(Long index) {
try {
// overWrite
logDb.put(LAST_INDEX_KEY, index.toString().getBytes());
} catch (RocksDBException e) {
e.printStackTrace();
}
}
}
package top.jbxie.raft.util.impl;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.jbxie.raft.client.ClientKVAck;
import top.jbxie.raft.client.ClientKVReq;
import top.jbxie.raft.clusterService.ClusterMembershipChanges;
import top.jbxie.raft.clusterService.Result;
import top.jbxie.raft.clusterService.impl.ClusterMembershipChangesImpl;
import top.jbxie.raft.current.RaftThreadPool;
import top.jbxie.raft.entity.*;
import top.jbxie.raft.exception.RaftRemotingException;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.nodeCommon.NodeStatus;
import top.jbxie.raft.nodeCommon.Peer;
import top.jbxie.raft.nodeCommon.PeerSet;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
import top.jbxie.raft.rpc.RpcClient;
import top.jbxie.raft.rpc.RpcServer;
import top.jbxie.raft.rpc.impl.DefaultRpcClient;
import top.jbxie.raft.rpc.impl.DefaultRpcServer;
import top.jbxie.raft.util.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static top.jbxie.raft.nodeCommon.NodeStatus.LEADER;
/**
* 抽象机器节点, 初始为 follower, 角色随时变化.
*/
@Data
public class DefaultNode<T> implements Node<T>, LifeCycle, ClusterMembershipChanges {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNode.class);
// 选举时间间隔基数(可修改为时隙时长)
public volatile long electionTime = 120 * 1000;
//上次选举时间
public volatile long preElectionTime = 0;
// 心跳间隔数
public final long heartBeatTick = 30 * 1000;
// 上次心跳时间戳
public volatile long preHeartBeatTime = 0;
private HeartBeatTask heartBeatTask = new HeartBeatTask();
private ElectionTask electionTask = new ElectionTask();
private ReplicationFailQueueConsumer replicationFailQueueConsumer = new ReplicationFailQueueConsumer();
private LinkedBlockingQueue<ReplicationFailModel> replicationFailQueue = new LinkedBlockingQueue<>(2048);
/**
* 节点当前状态
* @see NodeStatus
*/
public volatile int status = NodeStatus.FOLLOWER;
public PeerSet peerSet;
/* ============ 所有服务器上持久存在的 ============= */
// 服务器最后一次知道的任期号(初始化为 0,持续递增)
volatile long currentTerm = 0;
// 在当前获得选票的候选人的Id
volatile String votedFor;
// 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号
LogModule logModule;
/* ============ 所有服务器上经常变的 ============= */
// 已知的最大的已经被提交的日志条目的索引值
volatile long commitIndex;
// 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增)
volatile long lastApplied = 0;
/* ========== 在领导人里经常改变的(选举后重新初始化) ================== */
//对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)
Map<Peer, Long> nextIndexs;
// 对于每一个服务器,已经复制给他的日志的最高索引值
Map<Peer, Long> matchIndexs;
/* ======================================== */
public volatile boolean started;
public NodeConfig config;
public static RpcServer RPC_SERVER;
public RpcClient rpcClient = new DefaultRpcClient();
public StateMachine stateMachine;
/* ============================== */
/** 一致性模块实现 */
Consensus consensus;
ClusterMembershipChanges delegate;
/* ============================== */
private DefaultNode() {
}
public static DefaultNode getInstance() {
return DefaultNodeLazyHolder.INSTANCE;
}
private static class DefaultNodeLazyHolder {
private static final DefaultNode INSTANCE = new DefaultNode();
}
/* =============================Node接口================================== */
/**
* 设置配置文件.PeerSet节点集合
* 需修改
* @param config
*/
@Override
public void setConfig(NodeConfig config) {
this.config = config;
stateMachine = DefaultStateMachine.getInstance();
logModule = DefaultLogModule.getInstance();
peerSet = PeerSet.getInstance();
for (String s : config.getPeerAddrs()) {
Peer peer = new Peer(s);
peerSet.addPeer(peer);
if (s.equals("localhost:" + config.getSelfAddr())) {
peerSet.setSelf(peer);
}
}
// 开启了一个RPCServer,接收投票RPCVoteRequest, 和附加日志RPCLogAppendRequest(心跳也是日志附加Request,只是日志内容为null)
RPC_SERVER = new DefaultRpcServer(config.selfAddr, this);
}
/**
* 处理请求投票 RPC.
* 请求投票RPC参数RvoteParam
* 请求投票RPC返回值RvoteResult(候选人)
* 功能:领导者选举
*
* @param param
* @return
*/
@Override
public RvoteResult handlerRequestVote(RvoteParam param) {
LOGGER.warn("handlerRequestVote will be invoke, param info : {}", param);
return consensus.requestVote(param);
}
/**
* 处理附加日志请求.
* 功能:日志复制
*
* @param param
* @return
*/
@Override
public AentryResult handlerAppendEntries(AentryParam param) {
if (param.getEntries() != null) {
LOGGER.warn("node receive node {} append entry, entry content = {}", param.getLeaderId(), param.getEntries());
}
return consensus.appendEntries(param);
}
/**
* 处理客户端请求.
* 客户端的每一个请求都包含一条被复制状态机执行的指令。
* 领导人把这条指令作为一条新的日志条目附加到日志中去,然后并行的发起附加条目RPCs给其他的服务器,让他们复制这条日志条目。
* 当这条日志条目被安全的复制(下面会介绍),领导人会应用这条日志条目到它的状态机中然后把执行的结果返回给客户端。
* 如果跟随者崩溃或者运行缓慢,再或者网络丢包,
* 领导人会不断的重复尝试附加日志条目 RPCs (尽管已经回复了客户端)直到所有的跟随者都最终存储了所有的日志条目。
* @param request
* @return
*/
@Override
public ClientKVAck handlerClientRequest(ClientKVReq request) {
LOGGER.warn("handlerClientRequest handler {} operation, and key : [{}], value : [{}]",
ClientKVReq.Type.value(request.getType()), request.getKey(), request.getValue());
if (status != LEADER) {
LOGGER.warn("I am not leader , only invoke redirect method, leader addr : {}, my addr : {}",
peerSet.getLeader(), peerSet.getSelf().getAddr());
return redirect(request);
}
if (request.getType() == ClientKVReq.GET) {
LogEntry logEntry = stateMachine.get(request.getKey());
if (logEntry != null) {
return new ClientKVAck(logEntry.getCommand());
}
return new ClientKVAck(null);
}
LogEntry logEntry = LogEntry.newBuilder()
.command(Command.newBuilder().
key(request.getKey()).
value(request.getValue()).
build())
.term(currentTerm)
.build();
// 预提交到本地日志, TODO 预提交
logModule.write(logEntry);
LOGGER.info("write logModule success, logEntry info : {}, log index : {}", logEntry, logEntry.getIndex());
final AtomicInteger success = new AtomicInteger(0);
List<Future<Boolean>> futureList = new CopyOnWriteArrayList<>(); //CopyOnWriteArrayList这是一个ArrayList的线程安全的变体
int count = 0;
// 复制到其他机器
for (Peer peer : peerSet.getPeersWithOutSelf()) {
// TODO check self and RaftThreadPool
count++;
// 并行发起 RPC 复制.
futureList.add(replication(peer, logEntry));
}
CountDownLatch latch = new CountDownLatch(futureList.size());
List<Boolean> resultList = new CopyOnWriteArrayList<>();
getRPCAppendResult(futureList, latch, resultList);
try {
latch.await(4000, MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (Boolean aBoolean : resultList) {
if (aBoolean) {
success.incrementAndGet();
}
}
// 如果存在一个满足N > commitIndex的 N,并且大多数的matchIndex[i] ≥ N成立,
// 并且log[N].term == currentTerm成立,那么令 commitIndex 等于这个 N (5.3 和 5.4 节)
List<Long> matchIndexList = new ArrayList<Long>(matchIndexs.values());
// 小于 2, 没有意义
int median = 0;
if (matchIndexList.size() >= 2) {
Collections.sort(matchIndexList);
median = matchIndexList.size() / 2;
}
Long N = matchIndexList.get(median);
if (N > commitIndex) {
LogEntry entry = logModule.read(N);
if (entry != null && entry.getTerm() == currentTerm) {
commitIndex = N;
}
}
// 响应客户端(成功一半)
if (success.get() >= (count / 2)) {
// 更新
commitIndex = logEntry.getIndex();
// 应用到状态机
stateMachine.apply(logEntry);
lastApplied = commitIndex;
LOGGER.info("success apply local state machine, logEntry info : {}", logEntry);
// 返回成功.
return ClientKVAck.ok();
} else {
logModule.removeOnStartIndex(logEntry.getIndex());
LOGGER.warn("fail apply local state machine, logEntry info : {}", logEntry);
// TODO 不应用到状态机,但已经记录到日志中.由定时任务从重试队列取出,然后重复尝试,当达到条件时,应用到状态机.
// 这里应该返回错误, 因为没有成功复制过半机器.
return ClientKVAck.fail();
}
}
private void getRPCAppendResult(List<Future<Boolean>> futureList, CountDownLatch latch, List<Boolean> resultList) {
for (Future<Boolean> future : futureList) {
RaftThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
resultList.add(future.get(3000, MILLISECONDS));
} catch (CancellationException | TimeoutException | ExecutionException | InterruptedException e) {
e.printStackTrace();
resultList.add(false);
} finally {
latch.countDown();
}
}
});
}
}
/** 复制到其他机器 */
public Future<Boolean> replication(Peer peer, LogEntry entry) {
return RaftThreadPool.submit(new Callable() {
@Override
public Boolean call() throws Exception {
long start = System.currentTimeMillis(), end = start;
// 20 秒重试时间
while (end - start < 20 * 1000L) {
AentryParam aentryParam = new AentryParam();
aentryParam.setTerm(currentTerm);
aentryParam.setServerId(peer.getAddr());
aentryParam.setLeaderId(peerSet.getSelf().getAddr());
aentryParam.setLeaderCommit(commitIndex);
// 以我这边为准, 这个行为通常是成为 leader 后,首次进行 RPC 才有意义.
Long nextIndex = nextIndexs.get(peer);
LinkedList<LogEntry> logEntries = new LinkedList<>();
if (entry.getIndex() >= nextIndex) {
for (long i = nextIndex; i <= entry.getIndex(); i++) {
LogEntry l = logModule.read(i);
if (l != null) {
logEntries.add(l);
}
}
} else {
logEntries.add(entry);
}
// 最小的那个日志.
LogEntry preLog = getPreLog(logEntries.getFirst());
aentryParam.setPreLogTerm(preLog.getTerm());
aentryParam.setPrevLogIndex(preLog.getIndex());
aentryParam.setEntries(logEntries.toArray(new LogEntry[0]));
Request request = Request.newBuilder()
.cmd(Request.A_ENTRIES)
.obj(aentryParam)
.url(peer.getAddr())
.build();
try {
Response response = rpcClient.send(request);
if (response == null) {
return false;
}
AentryResult result = (AentryResult) response.getResult();
if (result != null && result.isSuccess()) {
LOGGER.info("append follower entry success , follower=[{}], entry=[{}]", peer, aentryParam.getEntries());
// update 这两个追踪值
nextIndexs.put(peer, entry.getIndex() + 1);
matchIndexs.put(peer, entry.getIndex());
return true;
} else if (result != null) {
// 对方比我大
if (result.getTerm() > currentTerm) {
LOGGER.warn("follower [{}] term [{}] than more self, and my term = [{}], so, I will become follower",
peer, result.getTerm(), currentTerm);
currentTerm = result.getTerm();
// 认怂, 变成跟随者
status = NodeStatus.FOLLOWER;
return false;
} // 没我大, 却失败了,说明 index 不对.或者 term 不对.
else {
// 递减
if (nextIndex == 0) {
nextIndex = 1L;
}
nextIndexs.put(peer, nextIndex - 1);
LOGGER.warn("follower {} nextIndex not match, will reduce nextIndex and retry RPC append, nextIndex : [{}]", peer.getAddr(),
nextIndex);
// 重来, 直到成功.
}
}
end = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.warn(e.getMessage(), e);
// TODO 到底要不要放队列重试?
// ReplicationFailModel model = ReplicationFailModel.newBuilder()
// .callable(this)
// .logEntry(entry)
// .peer(peer)
// .offerTime(System.currentTimeMillis())
// .build();
// replicationFailQueue.offer(model);
return false;
}
}
// 超时了,没办法了
return false;
}
});
}
private LogEntry getPreLog(LogEntry logEntry) {
LogEntry entry = logModule.read(logEntry.getIndex() - 1);
if (entry == null) {
LOGGER.warn("get perLog is null , parameter logEntry : {}", logEntry);
entry = LogEntry.newBuilder().index(0L).term(0).command(null).build();
}
return entry;
}
class ReplicationFailQueueConsumer implements Runnable {
/** 一分钟 */
long intervalTime = 1000 * 60;
@Override
public void run() {
for (; ; ) {
try {
ReplicationFailModel model = replicationFailQueue.take();
if (status != LEADER) {
// 应该清空?
replicationFailQueue.clear();
continue;
}
LOGGER.warn("replication Fail Queue Consumer take a task, will be retry replication, content detail : [{}]", model.logEntry);
long offerTime = model.offerTime;
if (System.currentTimeMillis() - offerTime > intervalTime) {
LOGGER.warn("replication Fail event Queue maybe full or handler slow");
}
Callable callable = model.callable;
Future<Boolean> future = RaftThreadPool.submit(callable);
Boolean r = future.get(3000, MILLISECONDS);
// 重试成功.
if (r) {
// 可能有资格应用到状态机.
tryApplyStateMachine(model);
}
} catch (InterruptedException e) {
// ignore
} catch (ExecutionException | TimeoutException e) {
LOGGER.warn(e.getMessage());
}
}
}
}
private void tryApplyStateMachine(ReplicationFailModel model) {
String success = stateMachine.getString(model.successKey);
stateMachine.setString(model.successKey, String.valueOf(Integer.valueOf(success) + 1));
String count = stateMachine.getString(model.countKey);
if (Integer.valueOf(success) >= Integer.valueOf(count) / 2) {
stateMachine.apply(model.logEntry);
stateMachine.delString(model.countKey, model.successKey);
}
}
/**
* 客户端发起的请求转发给 leader 节点.
*
* @SuppressWarnings("unchecked")
* @param request
* @return
*/
@Override
public ClientKVAck redirect(ClientKVReq request) {
Request<ClientKVReq> r = Request.newBuilder()
.obj(request)
.url(peerSet.getLeader()
.getAddr())
.cmd(Request.CLIENT_REQ)
.build();
Response response = rpcClient.send(r);
return (ClientKVAck)response.getResult();
}
/* =============================LifeCycle接口================================== */
@Override
public void init() throws Throwable {
if (started) {
return;
}
synchronized (this) {
if (started) {
return;
}
RPC_SERVER.start();
// consensus和delegate都是使用KV数据库实现的(这里使用的RocksDB)
consensus = new DefaultConsensus(this); // 启动一致性模块consensus
//delegate = new ClusterMembershipChangesImpl(this); // 初始化集群成员变更模块 delegate
RaftThreadPool.scheduleWithFixedDelay(heartBeatTask, 500); // 调度心跳任务
RaftThreadPool.scheduleAtFixedRate(electionTask, 6000, 500); // 调度选举任务
//RaftThreadPool.execute(replicationFailQueueConsumer);
LogEntry logEntry = logModule.getLast();
if (logEntry != null) {
currentTerm = logEntry.getTerm();
}
started = true;
LOGGER.info("start success, selfId :{}", peerSet.getSelf());
}
}
@Override
public void destroy() throws Throwable {
RPC_SERVER.stop();
}
/**
* 选举任务
* 1. 在转变成候选人后就立即开始选举过程
* 自增当前的任期号(currentTerm)
* 给自己投票
* 重置选举超时计时器
* 发送请求投票的 RPC 给其他所有服务器
* 2. 如果接收到大多数服务器的选票,那么就变成领导人
* 3. 如果接收到来自新的领导人的附加日志 RPC,转变成跟随者
* 4. 如果选举过程超时,再次发起一轮选举
*/
class ElectionTask implements Runnable {
@Override
public void run() {
if (status == LEADER) {
return;
}
long current = System.currentTimeMillis();
// 基于 RAFT 的随机时间,解决冲突.
electionTime = electionTime + ThreadLocalRandom.current().nextInt(50);
if (current - preElectionTime < electionTime) {
return;
}
status = NodeStatus.CANDIDATE;
LOGGER.error("node {} will become CANDIDATE and start election leader, current term : [{}], LastEntry : [{}]",
peerSet.getSelf(), currentTerm, logModule.getLast());
preElectionTime = System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(200) + 150;
currentTerm = currentTerm + 1;
// 推荐自己.
votedFor = peerSet.getSelf().getAddr();
List<Peer> peers = peerSet.getPeersWithOutSelf();
ArrayList<Future> futureArrayList = new ArrayList<>();
LOGGER.info("peerList size : {}, peer list content : {}", peers.size(), peers);
// 向所有的同伴 发送请求
for (Peer peer : peers) {
futureArrayList.add(RaftThreadPool.submit(new Callable() {
@Override
public Object call() throws Exception {
long lastTerm = 0L;
LogEntry last = logModule.getLast();
if (last != null) {
lastTerm = last.getTerm();
}
RvoteParam param = RvoteParam.newBuilder().
term(currentTerm).
candidateId(peerSet.getSelf().getAddr()).
lastLogIndex(LongConvert.convert(logModule.getLastIndex())).
lastLogTerm(lastTerm).
build();
Request request = Request.newBuilder()
.cmd(Request.R_VOTE)
.obj(param)
.url(peer.getAddr())
.build();
try {
@SuppressWarnings("unchecked")
Response<RvoteResult> response = rpcClient.send(request); //getRpcClient().send(request);
return response;
} catch (RaftRemotingException e) {
LOGGER.error("ElectionTask RPC Fail , URL : " + request.getUrl());
return null;
}
}
}));
}
AtomicInteger success2 = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(futureArrayList.size());
for (Future future : futureArrayList) {
RaftThreadPool.submit(new Callable() {
@Override
public Object call() throws Exception {
try {
@SuppressWarnings("unchecked")
Response<RvoteResult> response = (Response<RvoteResult>) future.get(3000, MILLISECONDS);
if (response == null) {
return -1;
}
boolean isVoteGranted = response.getResult().isVoteGranted();
if (isVoteGranted) {
success2.incrementAndGet();
} else {
// 更新自己的任期.
long resTerm = response.getResult().getTerm();
if (resTerm >= currentTerm) {
currentTerm = resTerm;
}
}
return 0;
} catch (Exception e) {
LOGGER.error("future.get exception , e : ", e);
return -1;
} finally {
latch.countDown();
}
}
});
}
try {
// 稍等片刻
latch.await(3500, MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("InterruptedException By Master election Task");
}
int success = success2.get();
LOGGER.info("node {} maybe become leader , success count = {} , status : {}", peerSet.getSelf(), success, NodeStatus.Enum.value(status));
// 如果投票期间,有其他合法的Leader(在RPCServer.hangdlerRequest()中处理的req) 发送appendEntry, 就可能变成 follower
if (status == NodeStatus.FOLLOWER) {
return;
}
// 加上自身.
if (success >= peers.size() / 2) {
LOGGER.warn("node {} become leader ", peerSet.getSelf());
status = LEADER;
peerSet.setLeader(peerSet.getSelf());
votedFor = "";
becomeLeaderToDoThing();
} else {
// else 重新选举
votedFor = "";
}
}
}
/**
* 初始化所有的 nextIndex 值为自己的最后一条日志的 index + 1. 如果下次 RPC 时, 跟随者和leader 不一致,就会失败.
* 那么 leader 尝试递减 nextIndex 并进行重试.最终将达成一致.
*/
private void becomeLeaderToDoThing() {
nextIndexs = new ConcurrentHashMap<>();
matchIndexs = new ConcurrentHashMap<>();
for (Peer peer : peerSet.getPeersWithOutSelf()) {
nextIndexs.put(peer, logModule.getLastIndex() + 1);
matchIndexs.put(peer, 0L);
}
}
/**
* 心跳测试任务,leader节点调用
* 只有在是Leader的状态的时候,才会向自己的同伴节点发送心跳。
*/
class HeartBeatTask implements Runnable {
@Override
public void run() {
// 判断是否为Leader节点
if (status != LEADER) {
return;
}
long current = System.currentTimeMillis();
// 判断是否达到心跳检测时间
if (current - preHeartBeatTime < heartBeatTick) {
return;
}
LOGGER.info("=========== NextIndex =============");
for (Peer peer : peerSet.getPeersWithOutSelf()) {
LOGGER.info("Peer {} nextIndex={}", peer.getAddr(), nextIndexs.get(peer));
}
preHeartBeatTime = System.currentTimeMillis();
// 心跳只关心term(任期)和leaderID
for (Peer peer : peerSet.getPeersWithOutSelf()) {
AentryParam param = AentryParam.newBuilder() //附加日志RPC参数,测试心跳,空日志
.entries(null)
.leaderId(peerSet.getSelf().getAddr())
.serverId(peer.getAddr())
.term(currentTerm)
.build();
Request<AentryParam> request = new Request<>(
Request.A_ENTRIES,
param,
peer.getAddr()); // 向集群内其他服务器发起请求
RaftThreadPool.execute(() -> {
try {
Response response = rpcClient.send(request);
AentryResult aentryResult = (AentryResult)response.getResult();
long term = aentryResult.getTerm();
if (term > currentTerm) {
LOGGER.error("self will become follower, he's term : {}, my term : {}", term, currentTerm);
currentTerm = term;
votedFor = "";
status = NodeStatus.FOLLOWER;
}
} catch (Exception e) {
LOGGER.error("HeartBeatTask RPC Fail, request URL : {} ", request.getUrl());
}
}, false);
}
}
}
/**
* 添加节点.
*
* @param newPeer
* @return
*/
@Override
public Result addPeer(Peer newPeer) {
return delegate.addPeer(newPeer);
}
/**
* 删除节点.
*
* @param oldPeer
* @return
*/
@Override
public Result removePeer(Peer oldPeer) {
return delegate.removePeer(oldPeer);
}
}
package top.jbxie.raft.util.impl;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import top.jbxie.raft.entity.Command;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.nodeCommon.NodeConfig;
import top.jbxie.raft.util.StateMachine;
import java.io.File;
/**
* 默认的状态机实现.
*/
@Getter
@Setter
public class DefaultStateMachine implements StateMachine {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStateMachine.class);
/** public just for test */
public static String dbDir;
public static String stateMachineDir;
public static RocksDB machineDb;
static {
if (dbDir == null) {
dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
//dbDir = "./rocksDB-raft/" + 8779;
}
if (stateMachineDir == null) {
stateMachineDir = dbDir + "/stateMachine";
}
RocksDB.loadLibrary();
}
private DefaultStateMachine() {
synchronized (this) {
try {
File file = new File(stateMachineDir);
boolean success = false;
if (!file.exists()) {
success = file.mkdirs();
}
if (success) {
LOGGER.warn("make a new dir : " + stateMachineDir);
}
Options options = new Options();
options.setCreateIfMissing(true);
machineDb = RocksDB.open(options, stateMachineDir);
} catch (RocksDBException e) {
LOGGER.info(e.getMessage());
}
}
}
public static DefaultStateMachine getInstance() {
return DefaultStateMachineLazyHolder.INSTANCE;
}
private static class DefaultStateMachineLazyHolder {
private static final DefaultStateMachine INSTANCE = new DefaultStateMachine();
}
/**
* key为command中的key,可定义为消息主题,而value可定义为对应节点的ip
* @param key
* @return
*/
@Override
public LogEntry get(String key) {
try {
byte[] result = machineDb.get(key.getBytes());
if (result == null) {
return null;
}
return JSON.parseObject(result, LogEntry.class);
} catch (RocksDBException e) {
LOGGER.info(e.getMessage());
}
return null;
}
@Override
public String getString(String key) {
try {
byte[] bytes = machineDb.get(key.getBytes());
if (bytes != null) {
return new String(bytes);
}
} catch (RocksDBException e) {
LOGGER.info(e.getMessage());
}
return "";
}
@Override
public void setString(String key, String value) {
try {
machineDb.put(key.getBytes(), value.getBytes());
} catch (RocksDBException e) {
LOGGER.info(e.getMessage());
}
}
@Override
public void delString(String... key) {
try {
for (String s : key) {
machineDb.delete(s.getBytes());
}
} catch (RocksDBException e) {
LOGGER.info(e.getMessage());
}
}
/**
* 将数据应用到状态机.
* 原则上,只需这一个方法(apply). 其他的方法是为了更方便的使用状态机.
*
* @param logEntry 日志中的数据.
*/
@Override
public void apply(LogEntry logEntry) {
try {
Command command = logEntry.getCommand();
if (command == null) {
throw new IllegalArgumentException("command can not be null, logEntry : " + logEntry.toString());
}
String key = command.getKey();
machineDb.put(key.getBytes(), JSON.toJSONBytes(logEntry));
} catch (RocksDBException e) {
LOGGER.info(e.getMessage());
}
}
}
#server:
# port: 8775
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %t %5p [%c:%M:%L] - %m%n"/>
</layout>
</appender>
<root>
<level value="INFO"/>
<appender-ref ref="CONSOLE"/>
</root>
</log4j:configuration>
\ No newline at end of file
package top.jbxie.raft.impl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import top.jbxie.raft.entity.Command;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.util.impl.DefaultLogModule;
public class DefaultLogModuleTest {
static {
System.setProperty("serverPort", "8775");
DefaultLogModule.dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
DefaultLogModule.logsDir = DefaultLogModule.dbDir + "/logModule";
}
DefaultLogModule defaultLogs;
@Before
public void setUp() throws Exception {
System.setProperty("serverPort", "8775");
defaultLogs = DefaultLogModule.getInstance();
}
@After
public void tearDown() throws Exception {
}
@Test
public void write() {
LogEntry entry = LogEntry.newBuilder().
term(1).
command(Command.newBuilder().key("hello").value("world").build()).
build();
defaultLogs.write(entry);
Assert.assertEquals(entry, defaultLogs.read(entry.getIndex()));
}
@Test
public void read() {
System.out.println(defaultLogs.getLastIndex());
}
@Test
public void remove() {
defaultLogs.removeOnStartIndex(3L);
}
@Test
public void getLast() {
}
@Test
public void getLastIndex() {
}
@Test
public void getDbDir() {
}
@Test
public void getLogsDir() {
}
@Test
public void setDbDir() {
}
}
package top.jbxie.raft.impl;
import org.junit.Before;
import org.junit.Test;
import org.rocksdb.RocksDBException;
import top.jbxie.raft.entity.Command;
import top.jbxie.raft.entity.LogEntry;
import top.jbxie.raft.util.impl.DefaultStateMachine;
public class DefaultStateMachineTest {
static {
System.setProperty("serverPort", "8776");
DefaultStateMachine.dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
DefaultStateMachine.stateMachineDir = DefaultStateMachine.dbDir + "/stateMachine";
}
DefaultStateMachine machine;
@Before
public void before() {
machine = DefaultStateMachine.getInstance();
}
@Test
public void apply() {
LogEntry logEntry = LogEntry.newBuilder().term(1).command(Command.newBuilder().key("hello").value("value1").build()).build();
machine.apply(logEntry);
}
@Test
public void applyRead() throws RocksDBException {
System.out.println(machine.get("hello:4"));
}
}
package top.jbxie.raft.impl;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.junit.Before;
import org.junit.Test;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.springframework.beans.factory.annotation.Autowired;
import top.jbxie.raft.util.impl.DefaultLogModule;
import java.io.File;
public class RocksDBTest {
public static String dbDir;
public static String stateMachineDir;
public static RocksDB machineDb;
static {
if (dbDir == null) {
dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
}
if (stateMachineDir == null) {
stateMachineDir = dbDir + "/stateMachine";
}
RocksDB.loadLibrary();
}
public RocksDBTest() {
try {
System.setProperty("serverPort", "8779");
File file = new File(stateMachineDir);
if (!file.exists()) {
file.mkdirs();
}
Options options = new Options();
options.setCreateIfMissing(true);
machineDb = RocksDB.open(options, stateMachineDir); // 打开数据库
} catch (RocksDBException e) {
e.printStackTrace();
}
}
public static RocksDBTest getInstance() {
return RocksDBTestLazyHolder.INSTANCE;
}
private static class RocksDBTestLazyHolder {
private static final RocksDBTest INSTANCE = new RocksDBTest();
}
@Test
public void testRead() throws RocksDBException {
System.out.println(machineDb.get("hello:4".getBytes()));
}
// private String dbDir = "./rocksDB-raft/" + System.getProperty("serverPort");
// private String stateMachineDir = dbDir + "/test";
//
// public RocksDB machineDb;
//
// static {
// RocksDB.loadLibrary(); // 加载jni
// }
//
// public byte[] lastIndexKey = "LAST_INDEX_KEY".getBytes();
//
// public RocksDBTest() {
// try {
// System.setProperty("serverPort", "8776");
// File file = new File(stateMachineDir);
// if (!file.exists()) {
// file.mkdirs();
// }
// Options options = new Options();
// options.setCreateIfMissing(true);
// machineDb = RocksDB.open(options, stateMachineDir); // 打开数据库
//
// } catch (RocksDBException e) {
// e.printStackTrace();
// }
// }
//
//
// public static RocksDBTest getInstance() {
// return RocksDBTestLazyHolder.INSTANCE;
// }
//
// private static class RocksDBTestLazyHolder {
// private static final RocksDBTest INSTANCE = new RocksDBTest();
// }
//
// RocksDBTest instance;
////
//// @Autowired
//// DefaultLogModule defaultLogModule;
//
//// @Test
//// public void testRead() throws RocksDBException {
//// System.out.println(defaultLogModule.getLastIndex());
//// System.out.println(defaultLogModule.read(defaultLogModule.getLastIndex()));
//// }
//
// @Before
// public void before() {
// instance = getInstance();
// }
//
// /**
// * key: lastIndexKey, value: lastIndex
// * key: lastIndex, value: Cmd
// * @throws RocksDBException
// */
// @Test
// public void test() throws RocksDBException {
// System.out.println(getLastIndex());
// System.out.println(get(getLastIndex()));
//
// write(new Cmd("hello", "value"));
//
// System.out.println(getLastIndex());
//
// System.out.println(get(getLastIndex()));
//
// deleteOnStartIndex(getLastIndex());
//
// write(new Cmd("hello", "value"));
//
// deleteOnStartIndex(1L);
//
// System.out.println(getLastIndex());
//
// System.out.println(get(getLastIndex()));
//
//
// }
//
// public synchronized void write(Cmd cmd) {
// try {
// cmd.setIndex(getLastIndex() + 1);
// machineDb.put(cmd.getIndex().toString().getBytes(), JSON.toJSONBytes(cmd));
// } catch (RocksDBException e) {
// e.printStackTrace();
// } finally {
// updateLastIndex(cmd.getIndex());
// }
// }
//
// public synchronized void deleteOnStartIndex(Long index) {
// try {
// for (long i = index; i <= getLastIndex(); i++) {
// try {
// machineDb.delete((i + "").getBytes());
// } catch (RocksDBException e) {
// e.printStackTrace();
// }
// }
//
// } finally {
// updateLastIndex(index - 1);
// }
// }
//
// public Cmd get(Long index) {
// try {
// if (index == null) {
// throw new IllegalArgumentException();
// }
// byte[] cmd = machineDb.get(index.toString().getBytes());
// if (cmd != null) {
// return JSON.parseObject(machineDb.get(index.toString().getBytes()), Cmd.class);
// }
// } catch (RocksDBException e) {
// e.printStackTrace();
// }
// return null;
// }
//
//
// public void updateLastIndex(Long index) {
// try {
// // overWrite
// machineDb.put(this.lastIndexKey, index.toString().getBytes());
// } catch (RocksDBException e) {
// e.printStackTrace();
// }
// }
//
// public Long getLastIndex() {
// byte[] lastIndex = new byte[0];
// try {
// lastIndex = machineDb.get(this.lastIndexKey);
// if (lastIndex == null) {
// lastIndex = "0".getBytes();
// }
// } catch (RocksDBException e) {
// e.printStackTrace();
// }
// return Long.valueOf(new String(lastIndex));
// }
//
// @Setter
// @Getter
// @ToString
// static class Cmd {
//
// Long index;
// String key;
// String value;
//
// public Cmd() {
// }
//
// public Cmd(String key, String value) {
// this.key = key;
// this.value = value;
// }
// }
}
package top.jbxie.raft.rpc;
import com.alipay.remoting.exception.RemotingException;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
public class RpcClientTest {
private final static com.alipay.remoting.rpc.RpcClient CLIENT = new com.alipay.remoting.rpc.RpcClient();
static {
CLIENT.init();
}
public static void main(String[] args) throws RemotingException, InterruptedException {
send();
}
public static void send() throws RemotingException, InterruptedException {
Response res = null;
for (int i = 0; i < 10; i++) {
Request request = Request.newBuilder()
.cmd(Request.A_ENTRIES)
.obj("req")
.build();
res = (Response) CLIENT.invokeSync("127.0.0.1:8080", request, 200000);
System.out.println("Client" + (i +1 ) + "次收到服务端回应" + res.getResult());
}
}
}
package top.jbxie.raft.rpc;
import com.alipay.remoting.BizContext;
import top.jbxie.raft.rpc.RaftUserProcessor;
import top.jbxie.raft.rpc.Request;
import top.jbxie.raft.rpc.Response;
public class RpcServerTest {
private static com.alipay.remoting.rpc.RpcServer rpcServer;
static {
rpcServer = new com.alipay.remoting.rpc.RpcServer(8080);
rpcServer.registerUserProcessor(new RaftUserProcessor<Request>() {
@Override
public Object handleRequest(BizContext bizCtx, Request request) throws Exception {
return handlerRequest(request);
}
});
}
public static void main(String[] args) {
start();
}
public static void start() {
rpcServer.start();
}
private static Response handlerRequest(Request req) {
System.out.println("dddddd");
return new Response("服务端返回");
}
}
......@@ -9,7 +9,6 @@
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>cnf-utils</module>
<module>cnf-common-api</module>
<module>cnf-cloud-ipservice</module>
......@@ -20,6 +19,7 @@
<module>apps/cnf-weixingsim</module>
<module>apps/cnf-case-dis</module>
<module>apps/cnf-app-demo</module>
<module>apps/cnf-space-iot</module>
</modules>
......@@ -29,6 +29,7 @@
<spring.version>5.1.4.RELEASE</spring.version>
<docker.version>8.16.0</docker.version>
<curator.version>2.12.0</curator.version>
<lombok.version>1.18.12</lombok.version>
</properties>
<dependencyManagement>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment