Commit 1adf8cee authored by wutu's avatar wutu

完善容器监控模块

parent 9ad2cfc3
...@@ -3,19 +3,18 @@ package top.ninwoo.edgecenter.controller; ...@@ -3,19 +3,18 @@ package top.ninwoo.edgecenter.controller;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import top.ninwoo.edgecenter.entity.ClusterConfig; import top.ninwoo.edgecenter.entity.ClusterConfig;
import top.ninwoo.edgecenter.entity.ContainerInfo;
import top.ninwoo.edgecenter.service.ClusterService; import top.ninwoo.edgecenter.service.ClusterService;
import top.ninwoo.utils.entity.DockerContainer; import top.ninwoo.utils.entity.DockerContainer;
import top.ninwoo.utils.entity.NetworkInfo; import top.ninwoo.utils.entity.NetworkInfo;
import top.ninwoo.utils.entity.OvsBridge; import top.ninwoo.utils.entity.OvsBridge;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
@RestController @RestController
public class IndexController { public class IndexController {
ConcurrentHashMap<String, NetworkInfo> networkInfoMap = new ConcurrentHashMap<>(); ConcurrentHashMap<String, ContainerInfo> containerInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Future> scheduledFutureMap = new ConcurrentHashMap<>(); ConcurrentHashMap<String, Future> scheduledFutureMap = new ConcurrentHashMap<>();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(8); ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(8);
...@@ -42,25 +41,25 @@ public class IndexController { ...@@ -42,25 +41,25 @@ public class IndexController {
* @return * @return
*/ */
// 这里一次查询需要两秒,需要采用异步进行查询 // 这里一次查询需要两秒,需要采用异步进行查询
@RequestMapping("/networkMonitor") @RequestMapping("/containerMonitor")
public List<NetworkInfo> getNetworkMonitor(@RequestParam("clusterId") Long clusterId,@RequestParam("containerId") String containerId) { public List<ContainerInfo> getContainerMonitor(@RequestParam("clusterId") Long clusterId,@RequestParam("containerId") String containerId) {
// 获取全部的容器id // 获取全部的容器id
Set<String> cids = clusterService.getContainerIdsByClusterId(clusterId, containerId); Set<String> cids = clusterService.getContainerIdsByClusterId(clusterId, containerId);
// 根据容器id获取全部的 // 根据容器id获取全部的
//List<NetworkInfo> networkTraffic = clusterService.getNetworkTraffic(containerIdsByClusterId); //List<NetworkInfo> networkTraffic = clusterService.getNetworkTraffic(containerIdsByClusterId);
List<NetworkInfo> networkInfos = new ArrayList<>(); List<ContainerInfo> containerInfos = new ArrayList<>();
cids.forEach( cid -> { cids.forEach( cid -> {
if(networkInfoMap.containsKey(cid)) { if(containerInfoMap.containsKey(cid)) {
NetworkInfo networkInfo = networkInfoMap.get(cid); ContainerInfo networkInfo = containerInfoMap.get(cid);
networkInfos.add(networkInfo); containerInfos.add(networkInfo);
} }
// TODO: 这里还可以再添加新的处理方式 // TODO: 这里还可以再添加新的处理方式
}); });
return networkInfos; return containerInfos;
} }
@RequestMapping("/enableNetworkMonitor") @RequestMapping("/enableContainerMonitor")
public boolean enableNetworkMonitor(long clusterId, String containerName) { public boolean enableNetworkMonitor(long clusterId, String containerName) {
// 先获取全部的容器id // 先获取全部的容器id
Set<String> cids = clusterService.getContainerIdsByClusterId(clusterId, containerName); Set<String> cids = clusterService.getContainerIdsByClusterId(clusterId, containerName);
...@@ -69,9 +68,9 @@ public class IndexController { ...@@ -69,9 +68,9 @@ public class IndexController {
} }
cids.forEach(cid -> { cids.forEach(cid -> {
// 为每一个cid容器创建一个定时任务 // 为每一个cid容器创建一个定时任务
NetworkMonitorThread networkMonitorThread = new NetworkMonitorThread(cid, networkInfoMap, clusterService); NetworkMonitorThread networkMonitorThread = new NetworkMonitorThread(cid, containerInfoMap, clusterService);
// 放入线程池执行任务 // 放入线程池执行任务
ScheduledFuture<?> future = scheduledExecutorService.schedule(networkMonitorThread, 1, TimeUnit.SECONDS); ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(networkMonitorThread, 0, 1, TimeUnit.SECONDS);
// 保存任务,以方便进行定时任务的关闭 // 保存任务,以方便进行定时任务的关闭
scheduledFutureMap.put(cid, future); scheduledFutureMap.put(cid, future);
}); });
...@@ -79,6 +78,23 @@ public class IndexController { ...@@ -79,6 +78,23 @@ public class IndexController {
return true; return true;
} }
@RequestMapping("/cancelNetworkMonitor")
public boolean cancelNetworkMonitor(long clusterId, String containerName) {
// 先获取全部的容器id
Set<String> cids = clusterService.getContainerIdsByClusterId(clusterId, containerName);
if(cids == null || cids.isEmpty()) {
return false;
}
cids.forEach(cid -> {
scheduledFutureMap.get(cid).cancel(true);
scheduledFutureMap.remove(cid);
containerInfoMap.remove(cid);
});
return true;
}
@RequestMapping(value = "/createCluster", method = RequestMethod.POST) @RequestMapping(value = "/createCluster", method = RequestMethod.POST)
public long createCluster(@RequestBody ClusterConfig clusterConfig) { public long createCluster(@RequestBody ClusterConfig clusterConfig) {
...@@ -93,12 +109,19 @@ public class IndexController { ...@@ -93,12 +109,19 @@ public class IndexController {
return clusterService.getContainerIdsByClusterId(clusterId, containerName); return clusterService.getContainerIdsByClusterId(clusterId, containerName);
} }
@RequestMapping(value = "/createNetwork")
public String createNetwork(long clusterId, String appName) {
// TODO: 测试用
// 判断是否存在
return clusterService.createNetwork(clusterId, appName);
}
private static class NetworkMonitorThread implements Runnable { private static class NetworkMonitorThread implements Runnable {
private String containerId; private String containerId;
private ConcurrentHashMap<String, NetworkInfo> map; private ConcurrentHashMap<String, ContainerInfo> map;
private ClusterService clusterService; private ClusterService clusterService;
public NetworkMonitorThread(String containerId, ConcurrentHashMap<String, NetworkInfo> map, ClusterService clusterService) { public NetworkMonitorThread(String containerId, ConcurrentHashMap<String, ContainerInfo> map, ClusterService clusterService) {
this.containerId = containerId; this.containerId = containerId;
this.map = map; this.map = map;
this.clusterService = clusterService; this.clusterService = clusterService;
...@@ -107,11 +130,26 @@ public class IndexController { ...@@ -107,11 +130,26 @@ public class IndexController {
@Override @Override
public void run() { public void run() {
NetworkInfo networkInfo = clusterService.getNetworkInfoByContainerId(containerId); NetworkInfo networkInfo = clusterService.getNetworkInfoByContainerId(containerId);
map.put(containerId, networkInfo); String cpu = clusterService.cpu(containerId);
String mem = clusterService.mem(containerId);
ContainerInfo containerInfo = new ContainerInfo();
containerInfo.setCpu(cpu);
containerInfo.setId(containerId);
containerInfo.setNetworkInfo(networkInfo);
containerInfo.setMem(mem);
map.put(containerId, containerInfo);
} }
public void setContainerId(String containerId) { public void setContainerId(String containerId) {
this.containerId = containerId; this.containerId = containerId;
} }
} }
@RequestMapping("/edgecenter")
public Map<String, String> edgeCenter() {
HashMap<String, String> edgeInfo = new HashMap<>();
edgeInfo.put("cpu", clusterService.cpu());
edgeInfo.put("mem", clusterService.mem());
return edgeInfo;
}
} }
package top.ninwoo.edgecenter.entity;
import lombok.Data;
import top.ninwoo.utils.entity.NetworkInfo;
/**
* @Author joliu
* @Description
* @Date Create in 下午4:36 2019/11/6
*/
@Data
public class ContainerInfo {
private String id;
private NetworkInfo networkInfo;
private String cpu;
private String mem;
}
...@@ -35,4 +35,16 @@ public interface ClusterService { ...@@ -35,4 +35,16 @@ public interface ClusterService {
List<NetworkInfo> getNetworkTraffic(Set<String> containerIds); List<NetworkInfo> getNetworkTraffic(Set<String> containerIds);
NetworkInfo getNetworkInfoByContainerId(String containerId); NetworkInfo getNetworkInfoByContainerId(String containerId);
// TODO: 临时的服务接口,待移除
String createNetwork(long clusterId, String appName);
String cpu();
String mem();
String cpu(String containerId);
String mem(String containerId);
} }
...@@ -12,10 +12,13 @@ import top.ninwoo.utils.entity.NetworkInfo; ...@@ -12,10 +12,13 @@ import top.ninwoo.utils.entity.NetworkInfo;
import top.ninwoo.utils.entity.OvsBridge; import top.ninwoo.utils.entity.OvsBridge;
import top.ninwoo.utils.service.DockerService; import top.ninwoo.utils.service.DockerService;
import top.ninwoo.utils.service.OVSService; import top.ninwoo.utils.service.OVSService;
import top.ninwoo.utils.service.OvsDockerService;
import top.ninwoo.utils.service.TcService; import top.ninwoo.utils.service.TcService;
import top.ninwoo.utils.util.OSUtils;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author joliu * @author joliu
...@@ -36,6 +39,13 @@ public class ClusterServiceImpl implements ClusterService { ...@@ -36,6 +39,13 @@ public class ClusterServiceImpl implements ClusterService {
@Autowired @Autowired
TcService tcService; TcService tcService;
// TODO: 这个接口临时使用
@Autowired
OvsDockerService ovsDockerService;
@Autowired
OSUtils osUtils;
/** /**
* 定时更新容器列表 * 定时更新容器列表
*/ */
...@@ -208,8 +218,71 @@ public class ClusterServiceImpl implements ClusterService { ...@@ -208,8 +218,71 @@ public class ClusterServiceImpl implements ClusterService {
try { try {
return tcService.networkUsage(containerId); return tcService.networkUsage(containerId);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); // 查询已经中断
LOG.info("[" + containerId + "]网络监控以中断");
} }
return null; return null;
} }
// TODO: 临时的服务接口,待移除
@Override
public String createNetwork(long clusterId, String appName) {
if(!clustersInfo.containsKey(clusterId)) {
return "failed!";
}
if(!clustersInfo.get(clusterId).containsKey(appName)) {
return "failed!";
}
Set<String> containerIdsByClusterId = getContainerIdsByClusterId(clusterId, appName);
ovsService.addBridge(appName + "br");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 2;
for (String cid : containerIdsByClusterId) {
ovsDockerService.addPort(appName + "br", "eth1", cid, "10.0.1." + (i++) + "/24");
}
return "success!";
}
/**
* 查询边缘服务器CPU情况
* @return
*/
@Override
public String cpu() {
return osUtils.cpuUsage() + "%";
}
/**
* 查询边缘服务器的内存使用情况
* @return
*/
@Override
public String mem() {
return osUtils.memoryUsage() + "%";
}
/**
* 查询Docker cpu使用情况
* @param containerId
* @return
*/
@Override
public String cpu(String containerId) {
return osUtils.cpuUsage(containerId) + "%";
}
@Override
public String mem(String containerId) {
return osUtils.memoryUsage(containerId) + "%";
}
} }
\ No newline at end of file
...@@ -63,6 +63,15 @@ public class ClusterServiceTest { ...@@ -63,6 +63,15 @@ public class ClusterServiceTest {
Assert.assertTrue(clusterService.getClusterIds().contains(clusterId)); Assert.assertTrue(clusterService.getClusterIds().contains(clusterId));
System.out.println(clusterService.getContainerIdsByClusterId(clusterId, "APP")); System.out.println(clusterService.getContainerIdsByClusterId(clusterId, "APP"));
clusterService.createNetwork(clusterId, "APP");
}
@Test
public void testCreateOvsNetwork() {
// TODO: 这里只是使用了底层的网络组件,还应该使用配置文件格式的参数创建网络
// 创建一个ovs网桥
// 创建一个
} }
@After @After
......
...@@ -123,6 +123,7 @@ public class TcServiceImpl implements TcService { ...@@ -123,6 +123,7 @@ public class TcServiceImpl implements TcService {
networkInfo.setCurRate(curRate); networkInfo.setCurRate(curRate);
networkInfo.setDropRate(dropRate); networkInfo.setDropRate(dropRate);
networkInfo.setErrorRate(errorRate); networkInfo.setErrorRate(errorRate);
networkInfo.setContainerId(containerId);
} }
return networkInfo; return networkInfo;
} }
......
...@@ -10,7 +10,13 @@ public interface OSUtils { ...@@ -10,7 +10,13 @@ public interface OSUtils {
int cpuUsage(); int cpuUsage();
int cpuUsage(String containerId);
int cpuUsage(boolean inDocker, String containerId);
int memoryUsage(); int memoryUsage();
int memoryUsage(String containerId);
float networkUsage(); float networkUsage();
} }
package top.ninwoo.utils.util.impl; package top.ninwoo.utils.util.impl;
import org.apache.commons.io.FileSystemUtils; import org.apache.commons.io.FileSystemUtils;
import org.springframework.beans.factory.annotation.Autowired;
import top.ninwoo.utils.util.DockerUtils;
import top.ninwoo.utils.util.OSUtils; import top.ninwoo.utils.util.OSUtils;
import top.ninwoo.utils.util.Utils; import top.ninwoo.utils.util.Utils;
...@@ -14,6 +16,9 @@ import java.util.*; ...@@ -14,6 +16,9 @@ import java.util.*;
*/ */
@Utils @Utils
public class OSUtilsImpl implements OSUtils { public class OSUtilsImpl implements OSUtils {
@Autowired
DockerUtils dockerUtils;
float TotalBandwidth = 20; float TotalBandwidth = 20;
/** /**
* 查看宿主机硬盘资源 * 查看宿主机硬盘资源
...@@ -31,15 +36,32 @@ public class OSUtilsImpl implements OSUtils { ...@@ -31,15 +36,32 @@ public class OSUtilsImpl implements OSUtils {
return 0; return 0;
} }
@Override
public int cpuUsage() {
return cpuUsage(false, "");
}
@Override
public int cpuUsage(String containerId) {
return cpuUsage(true, containerId);
}
/** /**
* 功能:获取Linux系统cpu使用率 * 功能:获取Linux系统cpu使用率
* @param inDocker 是否在容器中
* @param containerId 容器id
* */ * */
@Override @Override
public int cpuUsage() { public int cpuUsage(boolean inDocker, String containerId) {
try { try {
Map<?, ?> map1 = cpuinfo(); Map<?, ?> map1 = inDocker ? cpuinfoInDocker(containerId) : cpuinfo();
// 这里防止容器不存在,map数组为空
if(map1.size() == 0) {
return -1;
}
Thread.sleep(5 * 1000); Thread.sleep(5 * 1000);
Map<?, ?> map2 = cpuinfo(); Map<?, ?> map2 = inDocker ? cpuinfoInDocker(containerId) : cpuinfo();
long user1 = Long.parseLong(map1.get("user").toString()); long user1 = Long.parseLong(map1.get("user").toString());
long nice1 = Long.parseLong(map1.get("nice").toString()); long nice1 = Long.parseLong(map1.get("nice").toString());
...@@ -118,6 +140,65 @@ public class OSUtilsImpl implements OSUtils { ...@@ -118,6 +140,65 @@ public class OSUtilsImpl implements OSUtils {
return 0; return 0;
} }
@Override
public int memoryUsage(String containerId) {
Map<String, Object> map = new HashMap<String, Object>();
String res = dockerUtils.execInDocker(containerId,"cat /proc/meminfo");
String[] lines = res.split("\n");
for (String line : lines) {
int beginIndex = 0;
int endIndex = line.indexOf(":");
if (endIndex != -1) {
String key = line.substring(beginIndex, endIndex);
beginIndex = endIndex + 1;
endIndex = line.length();
String memory = line.substring(beginIndex, endIndex);
String value = memory.replace("kB", "").trim();
map.put(key, value);
}
}
if(map.size() == 0) {
return 0;
}
long memTotal = Long.parseLong(map.get("MemTotal").toString());
long memFree = Long.parseLong(map.get("MemFree").toString());
long memused = memTotal - memFree;
long buffers = Long.parseLong(map.get("Buffers").toString());
long cached = Long.parseLong(map.get("Cached").toString());
double usage = (double) (memused - buffers - cached) / memTotal * 100;
return (int) usage;
}
public Map<?, ?> cpuinfoInDocker(String containerId) {
Map<String, Object> map = new HashMap<String, Object>();
String res = dockerUtils.execInDocker(containerId, "cat /proc/stat");
String[] lines = res.split("\n");
for (int i = 0; i < lines.length; i++) {
String line = lines[i];
if (line.startsWith("cpu")) {
StringTokenizer tokenizer = new StringTokenizer(line);
List<String> temp = new ArrayList<String>();
while (tokenizer.hasMoreElements()) {
String value = tokenizer.nextToken();
temp.add(value);
}
map.put("user", temp.get(1));
map.put("nice", temp.get(2));
map.put("system", temp.get(3));
map.put("idle", temp.get(4));
map.put("iowait", temp.get(5));
map.put("irq", temp.get(6));
map.put("softirq", temp.get(7));
map.put("stealstolen", temp.get(8));
break;
}
}
return map;
}
public Map<?, ?> cpuinfo() { public Map<?, ?> cpuinfo() {
InputStreamReader inputs = null; InputStreamReader inputs = null;
BufferedReader buffer = null; BufferedReader buffer = null;
...@@ -241,4 +322,4 @@ public class OSUtilsImpl implements OSUtils { ...@@ -241,4 +322,4 @@ public class OSUtilsImpl implements OSUtils {
} }
return netUsage; return netUsage;
} }
} }
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment