Commit 45845869 authored by wutu's avatar wutu

添加了多线程构建容器集群,但是好像没起作用

parent a8028b50
package top.ninwoo.cloudcenter.config; package top.ninwoo.cloudcenter.config;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
@Configuration @Configuration
public class RestTemplateConfiguration { public class RestTemplateConfiguration {
@Bean @Bean
public RestTemplate restTemplate() { public RestTemplate restTemplate() {
return new RestTemplate(); return new RestTemplate(clientHttpRequestFactory());
}
@Bean
public HttpClientConnectionManager poolingConnectionManager() {
PoolingHttpClientConnectionManager poolingConnectionManager = new PoolingHttpClientConnectionManager();
poolingConnectionManager.setMaxTotal(1000); // 最大连接数
poolingConnectionManager.setDefaultMaxPerRoute(100); // 每个主机的并发
return poolingConnectionManager;
}
@Bean
public HttpClientBuilder httpClientBuilder() {
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(poolingConnectionManager());
return httpClientBuilder;
}
@Bean
public ClientHttpRequestFactory clientHttpRequestFactory() {
HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory();
clientHttpRequestFactory.setHttpClient(httpClientBuilder().build());
//clientHttpRequestFactory.setConnectTimeout(2000); // 连接超时
//clientHttpRequestFactory.setReadTimeout(2000); // 读写超时,毫秒
return clientHttpRequestFactory;
} }
} }
...@@ -17,7 +17,8 @@ public class ClusterController { ...@@ -17,7 +17,8 @@ public class ClusterController {
@RequestMapping(value = "/sendClusterConfigToEdgeNode", method = RequestMethod.POST) @RequestMapping(value = "/sendClusterConfigToEdgeNode", method = RequestMethod.POST)
public List<SeparatedClusterConfig> sendClusterConfigToEdgeNode(@RequestBody List<SeparatedClusterConfig> clusterConfigs) { public List<SeparatedClusterConfig> sendClusterConfigToEdgeNode(@RequestBody List<SeparatedClusterConfig> clusterConfigs) {
return cloudService.sendClusterConfigToEdgeNode(clusterConfigs); // return cloudService.sendClusterConfigToEdgeNode(clusterConfigs);
return cloudService.sendClusterConfigToEdgeNodeByThreads(clusterConfigs);
} }
@RequestMapping(value = "/removeClusterFromEdgeNode") @RequestMapping(value = "/removeClusterFromEdgeNode")
......
...@@ -18,6 +18,8 @@ public interface CloudService { ...@@ -18,6 +18,8 @@ public interface CloudService {
List<SeparatedClusterConfig> sendClusterConfigToEdgeNode(List<SeparatedClusterConfig> clusterConfigs); List<SeparatedClusterConfig> sendClusterConfigToEdgeNode(List<SeparatedClusterConfig> clusterConfigs);
List<SeparatedClusterConfig> sendClusterConfigToEdgeNodeByThreads(List<SeparatedClusterConfig> clusterConfigs);
boolean deleteClusterFromEdgeNode(long clusterId); boolean deleteClusterFromEdgeNode(long clusterId);
void updateLogicalTopology(long clusterId, NetworkTopology topology); void updateLogicalTopology(long clusterId, NetworkTopology topology);
......
...@@ -12,7 +12,7 @@ import top.ninwoo.common.entity.*; ...@@ -12,7 +12,7 @@ import top.ninwoo.common.entity.*;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
@Service @Service
public class CloudServiceImpl implements CloudService { public class CloudServiceImpl implements CloudService {
...@@ -25,6 +25,10 @@ public class CloudServiceImpl implements CloudService { ...@@ -25,6 +25,10 @@ public class CloudServiceImpl implements CloudService {
private static final String REMOTE_IP_LIST_BY_APPNAME = "/getIpListByAppName?clusterId={clusterId}&appName={appName}"; private static final String REMOTE_IP_LIST_BY_APPNAME = "/getIpListByAppName?clusterId={clusterId}&appName={appName}";
private static final String ENABLE_NETWORK_MONITOR = "/enableContainerMonitor?clusterId={clusterId}&containerName={appName}"; private static final String ENABLE_NETWORK_MONITOR = "/enableContainerMonitor?clusterId={clusterId}&containerName={appName}";
private static final String CANCEL_NETWORK_MONITOR = "/cancelNetworkMonitor?clusterId={clusterId}&containerName={appName}"; private static final String CANCEL_NETWORK_MONITOR = "/cancelNetworkMonitor?clusterId={clusterId}&containerName={appName}";
private static final ExecutorService initEdgeExecutorPool = Executors.newFixedThreadPool(6 * 2);
private static final CompletionService<ThreadResult> initEdgeExcutorCompletionPool = new ExecutorCompletionService<>(initEdgeExecutorPool);
@Value("${bs.ipservice.url}") @Value("${bs.ipservice.url}")
private String ipServiceUrl; private String ipServiceUrl;
// 全部的逻辑拓扑 // 全部的逻辑拓扑
...@@ -220,6 +224,7 @@ public class CloudServiceImpl implements CloudService { ...@@ -220,6 +224,7 @@ public class CloudServiceImpl implements CloudService {
} }
List<String> allAppNames = new ArrayList<>(); List<String> allAppNames = new ArrayList<>();
// 这里应该做一个判断,edgeNode的ip是否可用,如果不可用,或者未设定,将随机挑选一个进行设置 // 这里应该做一个判断,edgeNode的ip是否可用,如果不可用,或者未设定,将随机挑选一个进行设置
// TODO:这里使用一个线程进行创建,加快docker启动速度
clusterConfigs.forEach(c -> { clusterConfigs.forEach(c -> {
if (c.getEdgeNodeId() == null || "".equals(c.getEdgeNodeId())) { if (c.getEdgeNodeId() == null || "".equals(c.getEdgeNodeId())) {
String edgeNode = chooseEdgeNodes(1).get(0); String edgeNode = chooseEdgeNodes(1).get(0);
...@@ -277,6 +282,132 @@ public class CloudServiceImpl implements CloudService { ...@@ -277,6 +282,132 @@ public class CloudServiceImpl implements CloudService {
return clusterConfigs; return clusterConfigs;
} }
@Override
public List<SeparatedClusterConfig> sendClusterConfigToEdgeNodeByThreads(List<SeparatedClusterConfig> clusterConfigs) {
if(clusterConfigs.size() == 0) {
LOG.warn("下发的集群配置为空。");
return clusterConfigs;
}
List<String> allAppNames = new ArrayList<>();
// 这里应该做一个判断,edgeNode的ip是否可用,如果不可用,或者未设定,将随机挑选一个进行设置
// TODO:这里使用一个线程进行创建,加快docker启动速度
clusterConfigs.forEach(c -> {
// 如果edgeId为空,随机挑选一个
if (c.getEdgeNodeId() == null || "".equals(c.getEdgeNodeId())) {
String edgeNode = chooseEdgeNodes(1).get(0);
c.setEdgeNodeId(edgeNode);
}
// 创建工作线程
InitEdgeThread initEdgeThread = new InitEdgeThread(restTemplate, c);
// 提交到工作线程中
initEdgeExcutorCompletionPool.submit(initEdgeThread);
});
// 获取返回值
for (int i = 0; i < clusterConfigs.size(); i++) {
try {
Future<ThreadResult> result = initEdgeExcutorCompletionPool.take();
ThreadResult threadResult = result.get();
SeparatedClusterConfig c = threadResult.getSeparatedClusterConfig();
if (threadResult.getResult() == null) {
continue;
}
// 更新配置
//c.setClusterConfig(response.getBody());
// TODO: 存储appName和containerId的映射
if (!allContainerIds.containsKey(c.getClusterConfig().getId())) {
allContainerIds.put(c.getClusterConfig().getId(), new HashMap<>());
}
Map<String, Map<String, Set<String>>> allNames = allContainerIds.get(c.getClusterConfig().getId());
if (!allNames.containsKey(c.getEdgeNodeId())) {
allNames.put(c.getEdgeNodeId(), new HashMap<>());
}
Map<String, Set<String>> edgeContainerIds = allNames.get(c.getEdgeNodeId());
Map<String, List<String>> nameToIds = threadResult.getResult();
nameToIds.forEach((name, v) -> {
if (!edgeContainerIds.containsKey(name)) {
edgeContainerIds.put(name, new HashSet<>());
}
edgeContainerIds.get(name).addAll(v);
});
// TODO: 对应需要设计一个删除这个配置的操作
// 构建初始化的逻辑拓扑
for (String appName : c.getClusterConfig().getTopology().getAppNames()) {
if (!appName.startsWith("br")) {
allAppNames.add(appName);
if (!allAppStatus.containsKey(c.getClusterConfig().getId())) {
allAppStatus.put(c.getClusterConfig().getId(), new HashMap<>());
}
// 保存appnames
Map<String, Set<String>> clusterAppConfig = allAppStatus.get(c.getClusterConfig().getId());
if (!clusterAppConfig.containsKey(appName)) {
clusterAppConfig.put(appName, new HashSet<>());
}
Set<String> edgeNodes = clusterAppConfig.get(appName);
edgeNodes.add(c.getEdgeNodeId());
}
}
if (!allClusterConfig.containsKey(c.getClusterConfig().getId())) {
allClusterConfig.put(c.getClusterConfig().getId(), new ArrayList<>());
}
allClusterConfig.get(c.getClusterConfig().getId()).add(c);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
allLogicalTopo.put(clusterConfigs.get(0).getClusterConfig().getId(), initNetworkTopology(allAppNames));
return clusterConfigs;
}
private static class ThreadResult {
private SeparatedClusterConfig separatedClusterConfig;
private Map result;
public ThreadResult(SeparatedClusterConfig separatedClusterConfig, Map result) {
this.separatedClusterConfig = separatedClusterConfig;
this.result = result;
}
public SeparatedClusterConfig getSeparatedClusterConfig() {
return separatedClusterConfig;
}
public Map getResult() {
return result;
}
}
private static class InitEdgeThread implements Callable<ThreadResult> {
private static final Logger LOG = LoggerFactory.getLogger(InitEdgeThread.class);
private RestTemplate restTemplate;
// 实际上为EdgeNode的Ip地址
private SeparatedClusterConfig separatedClusterConfig;
public InitEdgeThread(RestTemplate restTemplate, SeparatedClusterConfig separatedClusterConfig) {
this.restTemplate = new RestTemplate();
this.separatedClusterConfig = separatedClusterConfig;
}
@Override
public ThreadResult call() throws Exception {
LOG.info("边缘节点:{}正在初始化", separatedClusterConfig.getEdgeNodeId());
ResponseEntity<Map> response = restTemplate.postForEntity("http://" + separatedClusterConfig.getEdgeNodeId()
+ CREATE_CLUSTER, separatedClusterConfig.getClusterConfig(), Map.class);
if(!response.getStatusCode().is2xxSuccessful()) {
LOG.error("集群配置下发失败:{}:{}", separatedClusterConfig.getEdgeNodeId(), response.getBody());
return null;
}
LOG.info("边缘节点:{}初始化完成", separatedClusterConfig.getEdgeNodeId());
Map body = response.getBody();
LOG.info("执行完成!");
return new ThreadResult(separatedClusterConfig, body);
}
}
private NetworkTopology initNetworkTopology(List<String> allAppNames) { private NetworkTopology initNetworkTopology(List<String> allAppNames) {
NetworkTopology networkTopology = new NetworkTopology(); NetworkTopology networkTopology = new NetworkTopology();
networkTopology.setTopologyId(1); networkTopology.setTopologyId(1);
......
...@@ -30,7 +30,7 @@ public class BisheMultiNodeTests { ...@@ -30,7 +30,7 @@ public class BisheMultiNodeTests {
ArrayList<SeparatedClusterConfig> clusterConfigs = new ArrayList<>(); ArrayList<SeparatedClusterConfig> clusterConfigs = new ArrayList<>();
SeparatedClusterConfig separatedClusterConfig = new SeparatedClusterConfig(); SeparatedClusterConfig separatedClusterConfig = new SeparatedClusterConfig();
// TODO: 这个ID应该是从借口获取的 // TODO: 这个ID应该是从借口获取的
separatedClusterConfig.setEdgeNodeId("192.168.31.154:8081"); separatedClusterConfig.setEdgeNodeId("192.168.31.52:8081");
ClusterConfig clusterConfig = new ClusterConfig(); ClusterConfig clusterConfig = new ClusterConfig();
clusterConfig.setId(11111l); clusterConfig.setId(11111l);
clusterConfig.setOwner("joliu"); clusterConfig.setOwner("joliu");
...@@ -59,7 +59,7 @@ public class BisheMultiNodeTests { ...@@ -59,7 +59,7 @@ public class BisheMultiNodeTests {
clusterConfig.setDockers(cds); clusterConfig.setDockers(cds);
NetworkTopology topo = new NetworkTopology(); NetworkTopology topo = new NetworkTopology();
topo.setAppNames(new String[]{"APP1", "APP2", "br:ovs1", "br:remote:ovs2:192.168.31.16"}); topo.setAppNames(new String[]{"APP1", "APP2", "br:ovs1", "br:remote:ovs2:192.168.31.50"});
// 这个参数好像没啥用 // 这个参数好像没啥用
topo.setTopologyId(11); topo.setTopologyId(11);
topo.setTopology(new int[][]{{0,0,0,0},{0,0,0,0},{1,1,0,0},{0,0,1,0}}); topo.setTopology(new int[][]{{0,0,0,0},{0,0,0,0},{1,1,0,0},{0,0,1,0}});
...@@ -70,7 +70,7 @@ public class BisheMultiNodeTests { ...@@ -70,7 +70,7 @@ public class BisheMultiNodeTests {
// 构建第二个节点 // 构建第二个节点
SeparatedClusterConfig separatedClusterConfig1 = new SeparatedClusterConfig(); SeparatedClusterConfig separatedClusterConfig1 = new SeparatedClusterConfig();
// TODO: 这个ID应该是从借口获取的 // TODO: 这个ID应该是从借口获取的
separatedClusterConfig1.setEdgeNodeId("192.168.31.16:8081"); separatedClusterConfig1.setEdgeNodeId("192.168.31.50:8081");
ClusterConfig clusterConfig1 = new ClusterConfig(); ClusterConfig clusterConfig1 = new ClusterConfig();
clusterConfig1.setId(11111l); clusterConfig1.setId(11111l);
clusterConfig1.setOwner("joliu"); clusterConfig1.setOwner("joliu");
...@@ -99,7 +99,7 @@ public class BisheMultiNodeTests { ...@@ -99,7 +99,7 @@ public class BisheMultiNodeTests {
clusterConfig1.setDockers(cds1); clusterConfig1.setDockers(cds1);
NetworkTopology topo1 = new NetworkTopology(); NetworkTopology topo1 = new NetworkTopology();
topo1.setAppNames(new String[]{"APP3", "APP4", "br:ovs2", "br:remote:ovs1:192.168.31.154"}); topo1.setAppNames(new String[]{"APP3", "APP4", "br:ovs2", "br:remote:ovs1:192.168.31.52"});
// 这个参数好像没啥用 // 这个参数好像没啥用
topo1.setTopologyId(11); topo1.setTopologyId(11);
topo1.setTopology(new int[][]{{0,0,0,0},{0,0,0,0},{1,1,0,0},{0,0,1,0}}); topo1.setTopology(new int[][]{{0,0,0,0},{0,0,0,0},{1,1,0,0},{0,0,1,0}});
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment