Commit 245e984e authored by wutu's avatar wutu

终于开发完了逻辑topo更新的模块

parent 81831dd5
...@@ -15,4 +15,6 @@ public interface CloudService { ...@@ -15,4 +15,6 @@ public interface CloudService {
List<SeparatedClusterConfig> sendClusterConfigToEdgeNode(List<SeparatedClusterConfig> clusterConfigs); List<SeparatedClusterConfig> sendClusterConfigToEdgeNode(List<SeparatedClusterConfig> clusterConfigs);
boolean deleteClusterFromEdgeNode(long clusterId); boolean deleteClusterFromEdgeNode(long clusterId);
void updateLogicalTopology(long clusterId, NetworkTopology topology);
} }
...@@ -22,8 +22,10 @@ public class CloudServiceImpl implements CloudService { ...@@ -22,8 +22,10 @@ public class CloudServiceImpl implements CloudService {
private static final Random randomInt = new Random(14); private static final Random randomInt = new Random(14);
private static final String CREATE_CLUSTER = "/createCluster"; private static final String CREATE_CLUSTER = "/createCluster";
private static final String DELETE_CLUSTER = "/delCluster"; private static final String DELETE_CLUSTER = "/delCluster";
private static final String DROP_DOCKER_NETWORK = "dropDockerNetwork"; private static final String DROP_DOCKER_NETWORK = "/dropDockerNetwork?clusterId={clusterId}&appName={appName}&ipList={ipList}";
private static final String CANCEL_DROP_DOCKER_NETWORK = "cancdelDropDockerNetwork"; private static final String CANCEL_DROP_DOCKER_NETWORK = "/cancelDropDockerNetwork?clusterId={clusterId}&appName={appName}&ipList={ipList}";
private static final String REMOTE_IP_LIST_BY_APPNAME = "http://127.0.0.1:23333/getIpListByAppName?clusterId={clusterId}&appName={appName}";
// 全部的逻辑拓扑 // 全部的逻辑拓扑
// 全部的逻辑拓扑 // 全部的逻辑拓扑
private ConcurrentHashMap<Long, NetworkTopology> allLogicalTopo = new ConcurrentHashMap<>(); private ConcurrentHashMap<Long, NetworkTopology> allLogicalTopo = new ConcurrentHashMap<>();
...@@ -61,6 +63,18 @@ public class CloudServiceImpl implements CloudService { ...@@ -61,6 +63,18 @@ public class CloudServiceImpl implements CloudService {
return ""; return "";
} }
private Set<String> getIpListFromRemoteIpServiceByAppName(long clusterId, String appName) {
Map<String, Object> param = new HashMap<>();
param.put("clusterId", clusterId);
param.put("appName", appName);
ResponseEntity<Set> restEntity = restTemplate.getForEntity(REMOTE_IP_LIST_BY_APPNAME, Set.class, param);
if(!restEntity.getStatusCode().is2xxSuccessful()) {
LOG.error("远程IP服务无法连接");
throw new RuntimeException("远程服务无法连接:" + REMOTE_IP_LIST_BY_APPNAME);
}
return restEntity.getBody();
}
private ClusterConfig registerClusterInDataBase(ClusterConfig clusterConfig) { private ClusterConfig registerClusterInDataBase(ClusterConfig clusterConfig) {
clusterConfig.setId(11111); clusterConfig.setId(11111);
return clusterConfig; return clusterConfig;
...@@ -201,7 +215,7 @@ public class CloudServiceImpl implements CloudService { ...@@ -201,7 +215,7 @@ public class CloudServiceImpl implements CloudService {
LOG.warn("下发的集群配置为空。"); LOG.warn("下发的集群配置为空。");
return clusterConfigs; return clusterConfigs;
} }
Set<String> allAppNames = new HashSet<>(); List<String> allAppNames = new ArrayList<>();
// 这里应该做一个判断,edgeNode的ip是否可用,如果不可用,或者未设定,将随机挑选一个进行设置 // 这里应该做一个判断,edgeNode的ip是否可用,如果不可用,或者未设定,将随机挑选一个进行设置
clusterConfigs.forEach(c -> { clusterConfigs.forEach(c -> {
if (c.getEdgeNodeId() == null || "".equals(c.getEdgeNodeId())) { if (c.getEdgeNodeId() == null || "".equals(c.getEdgeNodeId())) {
...@@ -242,7 +256,7 @@ public class CloudServiceImpl implements CloudService { ...@@ -242,7 +256,7 @@ public class CloudServiceImpl implements CloudService {
return clusterConfigs; return clusterConfigs;
} }
private NetworkTopology initNetworkTopology(Set<String> allAppNames) { private NetworkTopology initNetworkTopology(List<String> allAppNames) {
NetworkTopology networkTopology = new NetworkTopology(); NetworkTopology networkTopology = new NetworkTopology();
networkTopology.setTopologyId(1); networkTopology.setTopologyId(1);
networkTopology.setAppNames(allAppNames.toArray(new String[allAppNames.size()])); networkTopology.setAppNames(allAppNames.toArray(new String[allAppNames.size()]));
...@@ -283,10 +297,11 @@ public class CloudServiceImpl implements CloudService { ...@@ -283,10 +297,11 @@ public class CloudServiceImpl implements CloudService {
} }
/** /**
* 设置网络的逻辑拓扑 * 设置网络的逻辑拓扑,
* @param clusterId * @param clusterId
* @param topology * @param topology 这里指的是逻辑拓扑
*/ */
@Override
public void updateLogicalTopology(long clusterId, NetworkTopology topology) { public void updateLogicalTopology(long clusterId, NetworkTopology topology) {
// 校验下topo是否合法 // 校验下topo是否合法
for (String appName : topology.getAppNames()) { for (String appName : topology.getAppNames()) {
...@@ -306,7 +321,8 @@ public class CloudServiceImpl implements CloudService { ...@@ -306,7 +321,8 @@ public class CloudServiceImpl implements CloudService {
throw new RuntimeException("更新的topo不一致"); throw new RuntimeException("更新的topo不一致");
} }
for (int i = 0; i < appNames.length; i++) { for (int i = 0; i < appNames.length; i++) {
if (appNames[i] != topology.getAppNames()[i]) { // 这里的名称还需要在做处理
if (!appNames[i].equals(topology.getAppNames()[i])) {
throw new RuntimeException("更新的名称不一致"); throw new RuntimeException("更新的名称不一致");
} }
} }
...@@ -315,24 +331,82 @@ public class CloudServiceImpl implements CloudService { ...@@ -315,24 +331,82 @@ public class CloudServiceImpl implements CloudService {
int[][] newTopo = topology.getTopology(); int[][] newTopo = topology.getTopology();
// 对比更新 // 对比更新
for (int i = 0; i < appNames.length; i++) { for (int i = 1; i < appNames.length; i++) {
for (int j = 0; j < appNames.length; j++) { for (int j = 0; j < i; j++) {
if(oldTopo[i][j] == 1 && newTopo[i][j] == 0) { if(oldTopo[i][j] == 1 && newTopo[i][j] == 0) {
// 阻断连接 // 阻断连接
// TODO step1 找到appNames对应的边缘服务器 // 找到appNames对应的边缘服务器
Set<String> edgeNodesIdSrc = allAppStatus.get(clusterId).get(appNames[i]); Set<String> edgeNodesIdSrc = allAppStatus.get(clusterId).get(appNames[i]);
// TODO step2 更新网络
Set<String> edgeNodesIdDst = allAppStatus.get(clusterId).get(appNames[j]); Set<String> edgeNodesIdDst = allAppStatus.get(clusterId).get(appNames[j]);
// 获取全部的Dst 容器网络 // 获取AppName对应的全部容器IP地址
Set<String> appName1List = getIpListFromRemoteIpServiceByAppName(clusterId, appNames[i]);
Set<String> appName2List = getIpListFromRemoteIpServiceByAppName(clusterId, appNames[j]);
String dstAppName = appNames[i];
edgeNodesIdSrc.forEach(edgeIp -> {
// 调用远程阻断连接端口
Map<String, Object> param = new HashMap<>();
param.put("clusterId", clusterId);
param.put("appName", dstAppName);
param.put("ipList", appName2List.toArray());
ResponseEntity<String> forEntity = restTemplate.getForEntity("http://" + edgeIp + DROP_DOCKER_NETWORK, String.class, param);
if (!forEntity.getStatusCode().is2xxSuccessful()){
LOG.error("下发逻辑拓扑修改出现错误");
}
});
String srcAppName = appNames[j];
edgeNodesIdDst.forEach(edgeIp -> {
// 调用远程阻断连接端口
Map<String, Object> param = new HashMap<>();
param.put("clusterId", clusterId);
param.put("appName", srcAppName);
param.put("ipList", appName1List.toArray());
ResponseEntity<String> forEntity = restTemplate.getForEntity("http://" + edgeIp + DROP_DOCKER_NETWORK, String.class, param);
if (!forEntity.getStatusCode().is2xxSuccessful()){
LOG.error("下发逻辑拓扑修改出现错误");
}
});
// step2 调用远程接口,添加阻断连接 // step2 调用远程接口,添加阻断连接
} }
else if(oldTopo[i][j] == 0 && newTopo[i][j] == 1) { else if(oldTopo[i][j] == 0 && newTopo[i][j] == 1) {
// 创建连接 // 创建连接
// step1 找到appNames对应的边缘服务器 // step1 找到appNames对应的边缘服务器
// step2 调用远程接口,删除阻断连接 // step2 调用远程接口,删除阻断连接
Set<String> edgeNodesIdSrc = allAppStatus.get(clusterId).get(appNames[i]);
Set<String> edgeNodesIdDst = allAppStatus.get(clusterId).get(appNames[j]);
// 获取AppName对应的全部容器IP地址
Set<String> appName1List = getIpListFromRemoteIpServiceByAppName(clusterId, appNames[i]);
Set<String> appName2List = getIpListFromRemoteIpServiceByAppName(clusterId, appNames[j]);
String dstAppName = appNames[i];
edgeNodesIdSrc.forEach(edgeIp -> {
// 调用远程阻断连接端口
Map<String, Object> param = new HashMap<>();
param.put("clusterId", clusterId);
param.put("appName", dstAppName);
param.put("ipList", appName2List.toArray());
ResponseEntity<String> forEntity = restTemplate.getForEntity("http://" + edgeIp + CANCEL_DROP_DOCKER_NETWORK, String.class, param);
if (!forEntity.getStatusCode().is2xxSuccessful()){
LOG.error("下发逻辑拓扑修改出现错误");
}
});
String srcAppName = appNames[j];
edgeNodesIdDst.forEach(edgeIp -> {
// 调用远程阻断连接端口
Map<String, Object> param = new HashMap<>();
param.put("clusterId", clusterId);
param.put("appName", srcAppName);
param.put("ipList", appName1List.toArray());
ResponseEntity<String> forEntity = restTemplate.getForEntity("http://" + edgeIp + CANCEL_DROP_DOCKER_NETWORK, String.class, param);
if (!forEntity.getStatusCode().is2xxSuccessful()){
LOG.error("下发逻辑拓扑修改出现错误");
}
});
} }
} }
} }
networkTopology.setTopology(newTopo);
allLogicalTopo.put(clusterId, networkTopology);
} }
} }
package top.ninwoo.cloud; package top.ninwoo.cloud;
import com.spotify.docker.client.messages.BlockIoStats;
import com.spotify.docker.client.messages.Network;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import top.ninwoo.cloudcenter.CloudCenterMain; import top.ninwoo.cloudcenter.CloudCenterMain;
import top.ninwoo.cloudcenter.entity.SeparatedClusterConfig; import top.ninwoo.cloudcenter.entity.SeparatedClusterConfig;
import top.ninwoo.cloudcenter.service.CloudService; import top.ninwoo.cloudcenter.service.CloudService;
...@@ -13,8 +20,7 @@ import top.ninwoo.common.entity.ContainerDescription; ...@@ -13,8 +20,7 @@ import top.ninwoo.common.entity.ContainerDescription;
import top.ninwoo.common.entity.DockerContainer; import top.ninwoo.common.entity.DockerContainer;
import top.ninwoo.common.entity.NetworkTopology; import top.ninwoo.common.entity.NetworkTopology;
import java.util.ArrayList; import java.util.*;
import java.util.List;
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest(classes = CloudCenterMain.class) @SpringBootTest(classes = CloudCenterMain.class)
...@@ -22,11 +28,15 @@ public class CloudServiceImplTest { ...@@ -22,11 +28,15 @@ public class CloudServiceImplTest {
@Autowired @Autowired
private CloudService cloudService; private CloudService cloudService;
@Test @Autowired
private RestTemplate restTemplate;
@Before
public void sendClusterConfigToEdgeNodeTest() { public void sendClusterConfigToEdgeNodeTest() {
ArrayList<SeparatedClusterConfig> clusterConfigs = new ArrayList<>(); ArrayList<SeparatedClusterConfig> clusterConfigs = new ArrayList<>();
SeparatedClusterConfig separatedClusterConfig = new SeparatedClusterConfig(); SeparatedClusterConfig separatedClusterConfig = new SeparatedClusterConfig();
separatedClusterConfig.setEdgeNodeId("10.174.22.207:8081"); // TODO: 这个ID应该是从借口获取的
separatedClusterConfig.setEdgeNodeId("127.0.0.1:8081");
ClusterConfig clusterConfig = new ClusterConfig(); ClusterConfig clusterConfig = new ClusterConfig();
clusterConfig.setId(11111l); clusterConfig.setId(11111l);
clusterConfig.setOwner("joliu"); clusterConfig.setOwner("joliu");
...@@ -63,12 +73,42 @@ public class CloudServiceImplTest { ...@@ -63,12 +73,42 @@ public class CloudServiceImplTest {
separatedClusterConfig.setClusterConfig(clusterConfig); separatedClusterConfig.setClusterConfig(clusterConfig);
clusterConfigs.add(separatedClusterConfig); clusterConfigs.add(separatedClusterConfig);
// 下发配置到边缘服务器
cloudService.sendClusterConfigToEdgeNode(clusterConfigs); cloudService.sendClusterConfigToEdgeNode(clusterConfigs);
cloudService.deleteClusterFromEdgeNode(11111l);
} }
@After
public void close() {
// 清除集群
cloudService.deleteClusterFromEdgeNode(11111l);
}
@Test @Test
public void deleteClusterFromEdgeNodeTest() { public void deleteClusterFromEdgeNodeTest() {
} }
@Test
public void getIpListByAppNameFromRemoteIpService() {
final String remoteIpService = "http://127.0.0.1:23333/getIpListByAppName?clusterId={clusterId}&appName={appName}";
Map<String, Object> map = new HashMap<>();
map.put("clusterId", 11111);
map.put("appName", "test");
ResponseEntity<HashSet> forEntity = restTemplate.getForEntity(remoteIpService, HashSet.class, map);
Assert.assertTrue(forEntity.getStatusCode().is2xxSuccessful());
}
;
/**
* 测试更新网络拓扑的功能模块
*/
@Test
public void updateTopologyTest() {
NetworkTopology logicalTopology = new NetworkTopology();
logicalTopology.setAppNames(new String[]{"Run", "APP"});
logicalTopology.setTopology(new int[][]{{0,0},{0,0}});
cloudService.updateLogicalTopology(11111l, logicalTopology);
logicalTopology.setTopology(new int[][]{{0,0}, {1,0}});
cloudService.updateLogicalTopology(11111l, logicalTopology);
}
} }
...@@ -23,7 +23,13 @@ public class IpController { ...@@ -23,7 +23,13 @@ public class IpController {
return ipService.getContainerIp(cid); return ipService.getContainerIp(cid);
} }
@RequestMapping("/getIpListByAppName")
public Set<String> getIpListByAppName(long clusterId, String appName) { public Set<String> getIpListByAppName(long clusterId, String appName) {
return ipService.getIpListByAppName(clusterId, appName); return ipService.getIpListByAppName(clusterId, appName);
} }
@RequestMapping("/clearIpByClusterId")
public String clearIpByClusterId(long clusterId) {
return ipService.clearIpByClusterId(clusterId);
}
} }
\ No newline at end of file
...@@ -9,4 +9,6 @@ public interface IpService { ...@@ -9,4 +9,6 @@ public interface IpService {
String getContainerIp(String containerId); String getContainerIp(String containerId);
Set<String> getIpListByAppName(long clusterId, String appName); Set<String> getIpListByAppName(long clusterId, String appName);
String clearIpByClusterId(long clusterId);
} }
...@@ -67,7 +67,7 @@ public class IpServiceImpl implements IpService { ...@@ -67,7 +67,7 @@ public class IpServiceImpl implements IpService {
@Override @Override
public String getContainerIp(String containerId) { public String getContainerIp(String containerId) {
if(!ipMap.containsKey(containerId)) { if(!ipMap.containsKey(containerId)) {
throw new RuntimeException("docker容器并没有设置ip地址"); return "";
} }
return ipMap.get(containerId); return ipMap.get(containerId);
} }
...@@ -87,4 +87,13 @@ public class IpServiceImpl implements IpService { ...@@ -87,4 +87,13 @@ public class IpServiceImpl implements IpService {
} }
// TODO: 差一个把ip还回去的功能模块 // TODO: 差一个把ip还回去的功能模块
@Override
public String clearIpByClusterId(long clusterId) {
ipMap.clear();
appIp.clear();
countMap.clear();
return "success";
}
} }
\ No newline at end of file
...@@ -220,14 +220,15 @@ public class IndexController { ...@@ -220,14 +220,15 @@ public class IndexController {
} }
@RequestMapping(value = "/dropDockerNetwork") @RequestMapping(value = "/dropDockerNetwork")
public String dropDockerNetwork(long clusterId, String appName, List<String> ipList) { public String dropDockerNetwork(long clusterId, String appName, String[] ipList) {
topologyService.dropDockerNetwork(clusterId, appName, ipList); List<String> ips = Arrays.asList(ipList);
topologyService.dropDockerNetwork(clusterId, appName, ips);
return "success"; return "success";
} }
@RequestMapping(value = "/cancelDropDockerNetwork") @RequestMapping(value = "/cancelDropDockerNetwork")
public String cancelDropDockerNetwork(long clusterId, String appName, List<String> ipList) { public String cancelDropDockerNetwork(long clusterId, String appName, String[] ipList) {
topologyService.cancelDropDockerNetwork(clusterId, appName, ipList); topologyService.cancelDropDockerNetwork(clusterId, appName, Arrays.asList(ipList));
return "success"; return "success";
} }
......
package top.ninwoo.edgecenter.service.impl; package top.ninwoo.edgecenter.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
...@@ -20,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -20,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger;
@Service @Service
public class IpServiceImpl implements IpService { public class IpServiceImpl implements IpService {
private static final String ASSIGN_IP = "/assignIp"; private static final Logger LOG = LoggerFactory.getLogger(IpServiceImpl.class);
private static final String GET_IP_BY_CONTAINERID = "/getIpByContainerId"; // clusterId, String appName, String cid, String ipRange
private static final String ASSIGN_IP = "/assignIp?clusterId={clusterId}&appName={appName}&cid={cid}&ipRange={ipRange}";
private static final String GET_IP_BY_CONTAINERID = "/getIpByContainerId?cid=";
@Value("${bs.cloud.ip}") @Value("${bs.ipservice.ip}")
private String ipServiceHost; private String ipServiceHost;
@Autowired @Autowired
...@@ -51,9 +55,11 @@ public class IpServiceImpl implements IpService { ...@@ -51,9 +55,11 @@ public class IpServiceImpl implements IpService {
@Override @Override
public String getContainerIp(String containerId) { public String getContainerIp(String containerId) {
if(!ipMap.containsKey(containerId)) { // 更换为远程接口
throw new RuntimeException("docker容器并没有设置ip地址"); String ip = restTemplate.getForObject("http://" + ipServiceHost + GET_IP_BY_CONTAINERID + containerId, String.class);
if("".equals(ip)) {
LOG.warn("容器 {} 没有查询到已分配的ip", containerId);
} }
return ipMap.get(containerId); return ip;
} }
} }
...@@ -20,7 +20,8 @@ bs: ...@@ -20,7 +20,8 @@ bs:
edgenode: edgenode:
name: random name: random
ip-prefix: 192 ip-prefix: 192
ipservice:
ip: 127.0.0.1:23333
zookeeper: zookeeper:
url: 127.0.0.1:2181 url: 127.0.0.1:2181
...@@ -7,6 +7,8 @@ import org.junit.runner.RunWith; ...@@ -7,6 +7,8 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import top.ninwoo.common.entity.ClusterConfig;
import top.ninwoo.common.entity.ContainerDescription;
import top.ninwoo.common.entity.DockerContainer; import top.ninwoo.common.entity.DockerContainer;
import top.ninwoo.edgecenter.service.ClusterService; import top.ninwoo.edgecenter.service.ClusterService;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment