Commit 7cb45ac8 authored by wutu's avatar wutu

基础的边缘节点注册模块

parent 2bc4f92c
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bishe</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bishe-cloud-center</artifactId>
<description>用作集群工的汇总</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.2.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 对zookeeper的底层api的一些封装 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!--日志模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>bishe-utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package top.ninwoo.cloudcenter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CloudCenterMain {
public static void main(String[] args) {
SpringApplication.run(CloudCenterMain.class, args);
}
}
package top.ninwoo.cloudcenter.register;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import top.ninwoo.utils.util.impl.IpUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
@Component
public class CloudRegisterCenter implements Register {
private final static Logger LOG = LoggerFactory.getLogger(CloudRegisterCenter.class);
private final static String hostAddress = IpUtils.getHostIp("10.");
@Value("${bs.cloudcenter.name}")
private String cloudCenterName;
@Resource
private CuratorFramework zkClient;
// 边缘计算节点的监控器
private PathChildrenCache edgeNodesWatcher = null;
@PostConstruct
public void init() {
if("".equals(cloudCenterName)) {
cloudCenterName = "default-cloud-center-name";
}
// 注册节点
registerNode(cloudCenterName);
// 注册监听节点
edgeNodesWatcher = createEdgeNodesWatcher(cloudCenterName);
}
@PreDestroy
public void close() {
if(edgeNodesWatcher != null) {
try {
edgeNodesWatcher.close();
} catch (IOException e) {
LOG.warn("监听器关闭失败");
e.printStackTrace();
}
}
}
@Override
public void registerNode(final String nodeId) {
try {
List<String> homeNodes = zkClient.getChildren().forPath("/");
if(!homeNodes.contains(nodeId)) {
LOG.info("创建初始化的云节点");
zkClient.create()
.withMode(CreateMode.PERSISTENT) // 这里选择创建持久节点
.forPath("/" + nodeId, hostAddress.getBytes());
} else {
// 更新IP地址
LOG.info("Cloud 地址更新");
zkClient.setData().forPath("/" + nodeId, hostAddress.getBytes());
}
} catch (Exception e) {
LOG.error("创建cloud节点{}失败", nodeId);
e.printStackTrace();
}
}
/**
* 创建节点监听器
* @param nodeId
* @return
*/
public PathChildrenCache createEdgeNodesWatcher(final String nodeId) {
PathChildrenCache watcher = new PathChildrenCache(zkClient, "/" + nodeId, true);
watcher.getListenable().addListener((client, event) -> {
if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
LOG.info("新的节点{}已接入", event.getData().getPath());
}
else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
LOG.info("节点{}已离线", event.getData().getPath());
}
});
try {
watcher.start();
LOG.info("节点监听器已启动");
return watcher;
} catch (Exception e) {
LOG.warn("节点监听器启动失败");
e.printStackTrace();
return null;
}
}
}
package top.ninwoo.cloudcenter.register;
public interface Register {
/**
* 根据nodeId注册节点在zookeeper
* @param nodeId
*/
void registerNode(String nodeId);
}
package top.ninwoo.cloudcenter.register;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookeeperConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperConfiguration.class);
@Value("${zookeeper.url}")
private String url;
@Bean(destroyMethod = "close")
public CuratorFramework zkClient() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(url, retryPolicy);
try {
LOG.info("Zookeeper客户端[{}]已经连接", url);
client.start();
} catch (Exception e) {
LOG.error("Zookeeper客户端[{}]连接失败", url);
e.printStackTrace();
}
return client;
}
}
package top.ninwoo.cloudcenter.utils;
/**
* 用于发送消息的通用接口
*/
public interface Transporter {
}
server:
port: 9090
zookeeper:
url: 127.0.0.1:2181
bs:
cloudcenter:
name: my-bs-cloud-center
\ No newline at end of file
package top.ninwoo.edgecenter.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookeeperConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperConfiguration.class);
@Value("${zookeeper.url}")
private String url;
@Bean(destroyMethod = "close")
public CuratorFramework zkClient() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(url, retryPolicy);
try {
LOG.info("Zookeeper客户端[{}]已经连接", url);
client.start();
} catch (Exception e) {
LOG.error("Zookeeper客户端[{}]连接失败", url);
e.printStackTrace();
}
return client;
}
}
package top.ninwoo.edgecenter.register;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import top.ninwoo.utils.util.impl.IpUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;
/**
* 负责将边缘服务器的信息注册到Zookeeper上
*/
@Component
public class EdgeNodeRegister implements Register {
private final static Logger LOG = LoggerFactory.getLogger(EdgeNodeRegister.class);
@Resource
CuratorFramework zkClient;
@Value("${bs.cloudcenter.name}")
private String cloudCenterName;
@Value("${bs.edgenode.name}")
private String edgeNodeId;
@PostConstruct
public void init() {
LOG.info("开始注册边缘节点{}", edgeNodeId);
regsiterNode(edgeNodeId);
}
@Override
public void regsiterNode(String nodeId) {
if(nodeId.equals("random")) {
nodeId = UUID.randomUUID().toString();
}
try {
zkClient.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/" + cloudCenterName + "/" + nodeId, IpUtils.getHostIp("10.").getBytes());
LOG.info("临时节点【{}】已创建", nodeId);
} catch (Exception e) {
LOG.error("临时节点【{}】创建失败", nodeId);
e.printStackTrace();
}
}
}
package top.ninwoo.edgecenter.register;
public interface Register {
void regsiterNode(String nodeId);
}
......@@ -13,3 +13,13 @@ mybatis:
debug: false
server:
port: 8081
bs:
cloudcenter:
name: my-bs-cloud-center
edgenode:
name: random
zookeeper:
url: 127.0.0.1:2181
......@@ -43,7 +43,6 @@
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package top.ninwoo.utils.util.impl;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
public class IpUtils {
public static String getHostIp(String prefixString) {
Enumeration<NetworkInterface> netInterfaces = null;
try {
netInterfaces = NetworkInterface.getNetworkInterfaces();
while (netInterfaces.hasMoreElements()) {
NetworkInterface nif = netInterfaces.nextElement();
Enumeration<InetAddress> InetAddress = nif.getInetAddresses();
while (InetAddress.hasMoreElements()) {
String ip = InetAddress.nextElement().getHostAddress();
if (ip.startsWith(prefixString)) {
return ip;
}
}
}
} catch (SocketException e) {
e.printStackTrace();
}
return "";
}
}
......@@ -11,6 +11,7 @@
<modules>
<module>bishe-edge-center</module>
<module>bishe-utils</module>
<module>bishe-cloud-center</module>
</modules>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment