Commit 8ea4e48b authored by wutu's avatar wutu

完成容器集群的动态扩缩容模块

parent 2e48bc41
...@@ -114,13 +114,6 @@ public class IndexController { ...@@ -114,13 +114,6 @@ public class IndexController {
return clusterService.getContainerIdsByClusterId(clusterId, containerName); return clusterService.getContainerIdsByClusterId(clusterId, containerName);
} }
@RequestMapping(value = "/createNetwork")
public String createNetwork(long clusterId, String appName) {
// TODO: 测试用
// 判断是否存在
return clusterService.createNetwork(clusterId, appName);
}
private static class NetworkMonitorThread implements Runnable { private static class NetworkMonitorThread implements Runnable {
private String containerId; private String containerId;
private ConcurrentHashMap<String, ContainerInfo> map; private ConcurrentHashMap<String, ContainerInfo> map;
...@@ -176,6 +169,11 @@ public class IndexController { ...@@ -176,6 +169,11 @@ public class IndexController {
return "success"; return "success";
} }
@RequestMapping(value = "/forwardTraffic")
public boolean forwardTraffic(String forwardContainer, String targetContainer, int port) {
return topologyService.forwardTraffic(forwardContainer, targetContainer, port);
}
private int[][] parseString(String topology) { private int[][] parseString(String topology) {
String[] lines = topology.split(";"); String[] lines = topology.split(";");
int x = lines.length; int x = lines.length;
...@@ -190,4 +188,14 @@ public class IndexController { ...@@ -190,4 +188,14 @@ public class IndexController {
} }
return topo; return topo;
} }
@RequestMapping(value = "/adjustCluster", method = RequestMethod.POST)
public String adjustCluster(@RequestBody ClusterConfig clusterConfig) {
// 判断id
if (clusterConfig.getId() == 0) {
return "没有设置clusterid";
}
String res = clusterService.adjustCluster(clusterConfig);
return res;
}
} }
\ No newline at end of file
...@@ -20,6 +20,10 @@ public interface ClusterService { ...@@ -20,6 +20,10 @@ public interface ClusterService {
// 创建容器组 // 创建容器组
long initContainers(ClusterConfig clusterConfig); long initContainers(ClusterConfig clusterConfig);
// 动态调整容器规模,涉及到网络的一个调整
String adjustCluster(ClusterConfig clusterConfig);
// 获取集群列表 // 获取集群列表
Set<Long> getClusterIds(); Set<Long> getClusterIds();
// 通过集群Id获取集群中包含的容器IDs // 通过集群Id获取集群中包含的容器IDs
......
...@@ -14,4 +14,6 @@ public interface IpService { ...@@ -14,4 +14,6 @@ public interface IpService {
* @return Ipv4的地址 * @return Ipv4的地址
*/ */
String assignIpString(String networkSegment, String containerId); String assignIpString(String networkSegment, String containerId);
String getContainerIp(String containerId);
} }
...@@ -18,4 +18,10 @@ public interface TopologyService { ...@@ -18,4 +18,10 @@ public interface TopologyService {
void initTopo(ClusterConfig clusterConfig); void initTopo(ClusterConfig clusterConfig);
void createTopo(long clusterId, NetworkTopology topology); void createTopo(long clusterId, NetworkTopology topology);
boolean forwardTraffic(String containerId1, String containerId2, int port);
NetworkTopology getTopology(Long clusterId);
void createLinkForNewer(Long clusterId, String appName, String containerId);
} }
...@@ -7,6 +7,7 @@ import org.springframework.stereotype.Service; ...@@ -7,6 +7,7 @@ import org.springframework.stereotype.Service;
import top.ninwoo.edgecenter.entity.ClusterConfig; import top.ninwoo.edgecenter.entity.ClusterConfig;
import top.ninwoo.edgecenter.entity.ContainerDescription; import top.ninwoo.edgecenter.entity.ContainerDescription;
import top.ninwoo.edgecenter.service.ClusterService; import top.ninwoo.edgecenter.service.ClusterService;
import top.ninwoo.edgecenter.service.TopologyService;
import top.ninwoo.utils.entity.DockerContainer; import top.ninwoo.utils.entity.DockerContainer;
import top.ninwoo.utils.entity.NetworkInfo; import top.ninwoo.utils.entity.NetworkInfo;
import top.ninwoo.utils.entity.OvsBridge; import top.ninwoo.utils.entity.OvsBridge;
...@@ -46,6 +47,9 @@ public class ClusterServiceImpl implements ClusterService { ...@@ -46,6 +47,9 @@ public class ClusterServiceImpl implements ClusterService {
@Autowired @Autowired
OSUtils osUtils; OSUtils osUtils;
@Autowired
TopologyService topologyService;
/** /**
* 定时更新容器列表 * 定时更新容器列表
*/ */
...@@ -121,6 +125,65 @@ public class ClusterServiceImpl implements ClusterService { ...@@ -121,6 +125,65 @@ public class ClusterServiceImpl implements ClusterService {
return clusterConfig.getId(); return clusterConfig.getId();
} }
// 动态调整容器规模,涉及到网络的一个调整
@Override
public String adjustCluster(ClusterConfig clusterConfig) {
// 这里要求集群的配置在调整之前必须存在
if(!clustersInfo.containsKey(clusterConfig.getId())) {
return "集群配置不存在,请先初始化";
}
// 拿到当前的集群情况
Map<String, Set<String>> apps = clustersInfo.get(clusterConfig.getId());
// 与新的配置文件进行一一比较
// TODO:这里实际上应该访问数据库,对比数据库中的配置数据
List<ContainerDescription> dockers = clusterConfig.getDockers();
for (ContainerDescription docker : dockers) {
String appName = docker.getDockerContainer().getName();
if(!apps.containsKey(appName)) {
// 如果不包含这个app的名称,暂时不支持创建新的节点
return "暂不支持创建新的应用集群!";
}
// 对比docker的容器数量
int change = docker.getReplicas() - apps.get(appName).size();
// 执行扩缩容
updateContainerScale(docker.getDockerContainer(), change, apps.get(appName), clusterConfig.getId());
}
return "success";
}
public void updateContainerScale(DockerContainer dockerContainer, int change, Set<String> cids, Long clusterId) {
// 备份原来的容器名称
String name = dockerContainer.getName();
if(change > 0) {
// 扩容
for (int i = 0; i < change; i++) {
// 先修改容器的名称
dockerContainer.setName(name + "_" + cids.size());
DockerContainer newContainer = dockerService.runDocker(dockerContainer);
cids.add(newContainer.getId());
dockerService.addNewContainerId(newContainer);
// 创建相同的网络
topologyService.createLinkForNewer(clusterId, name, newContainer.getId());
}
} else if(change < 0) {
// 缩容
int size = cids.size();
for (int i = 0; i < (-change); i++) {
// step1 通过容器名称进行删除
String cName = name + "_" + (size - 1 - i);
String cid = dockerService.deleteDockerByName(cName);
// 从容器列表中移除
cids.remove(cid);
// step 2 删除ovs上多余的网络
ovsDockerService.deleteContainerPorts(cid);
}
}
// 把docker容器配置恢复初始状态
dockerContainer.setName(name);
}
/** /**
* @description 保存配置文件到数据库 * @description 保存配置文件到数据库
* @param clusterConfig * @param clusterConfig
......
...@@ -31,4 +31,12 @@ public class IpServiceImpl implements IpService { ...@@ -31,4 +31,12 @@ public class IpServiceImpl implements IpService {
ipMap.put(containerId, ip); ipMap.put(containerId, ip);
return ip; return ip;
} }
@Override
public String getContainerIp(String containerId) {
if(!ipMap.containsKey(containerId)) {
throw new RuntimeException("docker容器并没有设置ip地址");
}
return ipMap.get(containerId);
}
} }
...@@ -9,10 +9,12 @@ import top.ninwoo.edgecenter.entity.NetworkTopology; ...@@ -9,10 +9,12 @@ import top.ninwoo.edgecenter.entity.NetworkTopology;
import top.ninwoo.edgecenter.service.ClusterService; import top.ninwoo.edgecenter.service.ClusterService;
import top.ninwoo.edgecenter.service.IpService; import top.ninwoo.edgecenter.service.IpService;
import top.ninwoo.edgecenter.service.TopologyService; import top.ninwoo.edgecenter.service.TopologyService;
import top.ninwoo.utils.service.IptablesService;
import top.ninwoo.utils.service.OVSService; import top.ninwoo.utils.service.OVSService;
import top.ninwoo.utils.service.OvsDockerService; import top.ninwoo.utils.service.OvsDockerService;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* @Author joliu * @Author joliu
...@@ -22,6 +24,8 @@ import java.util.Set; ...@@ -22,6 +24,8 @@ import java.util.Set;
@Service @Service
public class TopologyServiceImpl implements TopologyService { public class TopologyServiceImpl implements TopologyService {
private final static Logger LOG = LoggerFactory.getLogger(TopologyServiceImpl.class); private final static Logger LOG = LoggerFactory.getLogger(TopologyServiceImpl.class);
// 用于存储集群的拓扑
private ConcurrentHashMap<Long, NetworkTopology> clustersTopo = new ConcurrentHashMap<>();
@Autowired @Autowired
OVSService ovsService; OVSService ovsService;
...@@ -36,6 +40,9 @@ public class TopologyServiceImpl implements TopologyService { ...@@ -36,6 +40,9 @@ public class TopologyServiceImpl implements TopologyService {
@Autowired @Autowired
IpService ipService; IpService ipService;
@Autowired
IptablesService iptablesService;
@Override @Override
public void initTopo(ClusterConfig clusterConfig) { public void initTopo(ClusterConfig clusterConfig) {
// 获取topology // 获取topology
...@@ -66,6 +73,7 @@ public class TopologyServiceImpl implements TopologyService { ...@@ -66,6 +73,7 @@ public class TopologyServiceImpl implements TopologyService {
} }
} }
} }
clustersTopo.put(clusterId, topology);
} }
public void addLink(long clusterId, String appName1, String appName2) { public void addLink(long clusterId, String appName1, String appName2) {
...@@ -153,4 +161,75 @@ public class TopologyServiceImpl implements TopologyService { ...@@ -153,4 +161,75 @@ public class TopologyServiceImpl implements TopologyService {
throw new RuntimeException("Physical topology does not support c2c link."); throw new RuntimeException("Physical topology does not support c2c link.");
} }
} }
/**
* 将从container1中的流量转发到container2中
* @param containerId1
* @param containerId2
* @param port
* @return
*/
@Override
public boolean forwardTraffic(String containerId1, String containerId2, int port) {
// 查询到container1的ip地址
String ip1 = ipService.getContainerIp(containerId1);
// 查询到container2的ip地址
String ip2 = ipService.getContainerIp(containerId2);
// 在container1中设置转发
return iptablesService.forwardTraffic(containerId1, ip1, port, ip2, port);
}
@Override
public NetworkTopology getTopology(Long clusterId) {
if(!clustersTopo.containsKey(clusterId)) {
return null;
}
return clustersTopo.get(clusterId);
}
/**
* 为新加入的容器创建网络
* @param clusterId
* @param appName
* @param containerId
*/
@Override
public void createLinkForNewer(Long clusterId, String appName, String containerId) {
// 找到topo
if(!clustersTopo.containsKey(clusterId)) {
LOG.warn("clusterId[" + clusterId + "] 拓扑不存在!");
return;
}
NetworkTopology networkTopology = clustersTopo.get(clusterId);
String[] appNames = networkTopology.getAppNames();
// 判断appName的坐标
int index = 0;
for (String name : appNames) {
if(name.equals(appName)) {
break;
}
index++;
}
if(index == appNames.length) {
LOG.warn("appName[" + appName + "]不存在!");
return;
}
// 遍历第topo[index]行,根据连接关系创建网络
int[][] topology = networkTopology.getTopology();
for (int i = 0; i < topology[index].length; i++) {
if(topology[i][index] == 1) {
// 创建连接
// 一端已经确定是容器
if(appNames[i].startsWith("br:")) {
String ovsName = appNames[i].substring(3);
// 添加和交换机的网络链接
String ip = ipService.assignIpString("10.10.1.0/24", containerId);
// ovsDockerService.addPort(ovsName, "eth1", containerName, "ip");
ovsDockerService.addPort(ovsName, "eth1", containerId, ip);
}
// 容器跟容器不能相连, 容器跟远端交换机也不能相连
}
}
}
} }
...@@ -18,6 +18,8 @@ public class ChainEntity { ...@@ -18,6 +18,8 @@ public class ChainEntity {
private String destination; private String destination;
private ChainType type; private ChainType type;
private String more;
private ChainEntity() { private ChainEntity() {
} }
...@@ -33,6 +35,7 @@ public class ChainEntity { ...@@ -33,6 +35,7 @@ public class ChainEntity {
this.target = builder.target; this.target = builder.target;
this.type = builder.type; this.type = builder.type;
this.id = builder.id; this.id = builder.id;
this.more = builder.more;
} }
public static class Builder { public static class Builder {
...@@ -46,7 +49,7 @@ public class ChainEntity { ...@@ -46,7 +49,7 @@ public class ChainEntity {
private String source; private String source;
private String destination; private String destination;
private String opt; private String opt;
private String more;
private ChainType type; private ChainType type;
public Builder() { public Builder() {
...@@ -146,6 +149,16 @@ public class ChainEntity { ...@@ -146,6 +149,16 @@ public class ChainEntity {
return this.type; return this.type;
} }
public String more() {
return this.more;
}
public Builder more(String more) {
this.more = more;
return this;
}
public Builder type(ChainType type) { public Builder type(ChainType type) {
this.type = type; this.type = type;
return this; return this;
...@@ -241,6 +254,14 @@ public class ChainEntity { ...@@ -241,6 +254,14 @@ public class ChainEntity {
this.type = type; this.type = type;
} }
public String getMore() {
return more;
}
public void setMore(String more) {
this.more = more;
}
@Override @Override
public String toString() { public String toString() {
return "ChainEntity{" + return "ChainEntity{" +
......
...@@ -6,7 +6,8 @@ package top.ninwoo.utils.entity; ...@@ -6,7 +6,8 @@ package top.ninwoo.utils.entity;
* @Date Create in 下午10:47 2019/10/28 * @Date Create in 下午10:47 2019/10/28
*/ */
public enum ChainType { public enum ChainType {
INPUT("INPUT"), FORWARD("FORWARD"), OUTPUT("OUTPUT"); INPUT("INPUT"), FORWARD("FORWARD"), OUTPUT("OUTPUT"),
PREROUTING("PREROUTING"), POSTROUTING("POSTROUTING");
private String name; private String name;
ChainType(String name) { ChainType(String name) {
this.name = name; this.name = name;
......
...@@ -9,6 +9,8 @@ import java.util.Map; ...@@ -9,6 +9,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
public interface DockerService { public interface DockerService {
void addNewContainerId(DockerContainer newContainer);
/** /**
* 启动一个Docker容器 * 启动一个Docker容器
* @param container * @param container
...@@ -38,6 +40,8 @@ public interface DockerService { ...@@ -38,6 +40,8 @@ public interface DockerService {
*/ */
int getContainersNumber(boolean isAll); int getContainersNumber(boolean isAll);
String deleteDockerByName(String name);
/** /**
* 通过容器id关闭容器 * 通过容器id关闭容器
* @param id * @param id
......
...@@ -17,6 +17,10 @@ public interface IptablesService { ...@@ -17,6 +17,10 @@ public interface IptablesService {
List<ChainEntity> getIptablesByContainerId(String containerId, ChainType chainType); List<ChainEntity> getIptablesByContainerId(String containerId, ChainType chainType);
List<ChainEntity> getIptablesByContainerId(String containerId, ChainType chainType, String type);
ChainEntity getIptablesByContainerId(String containerId, ChainType chainType, int lineNumber);
void checkCache(String containerId); void checkCache(String containerId);
void deleteIptable(String containerId, ChainEntity chainEntity); void deleteIptable(String containerId, ChainEntity chainEntity);
...@@ -24,4 +28,6 @@ public interface IptablesService { ...@@ -24,4 +28,6 @@ public interface IptablesService {
void deleteIptablesFromList(String containerId, ChainEntity chainEntity); void deleteIptablesFromList(String containerId, ChainEntity chainEntity);
void deleteIptableById(String containerId, ChainType type, int lineNumber); void deleteIptableById(String containerId, ChainType type, int lineNumber);
boolean forwardTraffic(String containerId, String selfIp, int selfPort, String toIp, int toPort);
} }
...@@ -16,4 +16,6 @@ public interface OVSService { ...@@ -16,4 +16,6 @@ public interface OVSService {
boolean linkOvs(String br1, String br2); boolean linkOvs(String br1, String br2);
boolean setVxlan(String bridgeName, String remoteIp); boolean setVxlan(String bridgeName, String remoteIp);
boolean delBridgePort(String bridgeName, String portName);
} }
package top.ninwoo.utils.service; package top.ninwoo.utils.service;
import java.util.Set;
/** /**
* @author joliu * @author joliu
* @date 2019-10-16 * @date 2019-10-16
*/ */
public interface OvsDockerService { public interface OvsDockerService {
String addPort(String bridgeName, String devName, String containerId, String ip); String addPort(String bridgeName, String devName, String containerId, String ip);
Set<String> getContainerPorts(String containerId);
void deleteContainerPorts(String containerId);
} }
\ No newline at end of file
...@@ -31,7 +31,12 @@ public class DockerServiceImpl implements DockerService, InitializingBean { ...@@ -31,7 +31,12 @@ public class DockerServiceImpl implements DockerService, InitializingBean {
// 停止的Docker // 停止的Docker
private ConcurrentHashMap<String, DockerContainer> allContainers = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, DockerContainer> allContainers = new ConcurrentHashMap<>();
private ReentrantLock lock = new ReentrantLock(); // 可能有线程安全问题
@Override
public void addNewContainerId(DockerContainer newContainer) {
runingContainers.put(newContainer.getId(), newContainer);
allContainers.put(newContainer.getId(), newContainer);
}
/** /**
* 将DockerClient获取的到结果封装成为我们自定义的数据结构 * 将DockerClient获取的到结果封装成为我们自定义的数据结构
...@@ -88,6 +93,16 @@ public class DockerServiceImpl implements DockerService, InitializingBean { ...@@ -88,6 +93,16 @@ public class DockerServiceImpl implements DockerService, InitializingBean {
return dockerContainer; return dockerContainer;
} }
@Override
public String deleteDockerByName(String name) {
String cid = dockerUtils.getDockerIdByName(name);
if("".equals(cid)) {
throw new RuntimeException("不存在这个容器名称[" + name + "]");
}
dockerUtils.deleteDockerById(name);
return cid;
}
@Override @Override
public boolean deleteDockerById(String id) { public boolean deleteDockerById(String id) {
return dockerUtils.deleteDockerById(id); return dockerUtils.deleteDockerById(id);
......
...@@ -24,31 +24,52 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; ...@@ -24,31 +24,52 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
@Service @Service
public class IptablesServiceImpl implements IptablesService { public class IptablesServiceImpl implements IptablesService {
// 以容器为单位维护着每个容器的iptables // 以容器为单位维护着每个容器的iptables
// 这里里边只是维护了filter的表结构
private ConcurrentHashMap<String, Map<String, List<ChainEntity>>> allIptables private ConcurrentHashMap<String, Map<String, List<ChainEntity>>> allIptables
= new ConcurrentHashMap<>(); = new ConcurrentHashMap<>();
// 维护nat表结构
private ConcurrentHashMap<String, Map<String, List<ChainEntity>>> natIpatbles
= new ConcurrentHashMap<>();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@Autowired @Autowired
IptablesUtils iptablesUtils; IptablesUtils iptablesUtils;
// 这个服务中不需要进行任务的定时更新,默认其他人无法访问docker容器中的配置 // 这个服务中不需要进行任务的定时更新,默认其他人无法访问docker容器中的配置
/** /**
* 根据容器Id查询全部的iptables信息 * 根据容器Id查询全部的iptables信息,默认查询的是Filter
* 这里的容器id,要求必须存在,需要在调用前进行验证 * 这里的容器id,要求必须存在,需要在调用前进行验证
* @param containerId * @param containerId
* @return * @return
*/ */
@Override @Override
public Map<String, List<ChainEntity>> getIptablesByContainerId(String containerId) { public Map<String, List<ChainEntity>> getIptablesByContainerId(String containerId) {
checkCache(containerId); return getIpatblesByContainerId(containerId, "filter");
}
/**
* 查看iptables信息
* @param containerId
* @param type
* @return
*/
public Map<String, List<ChainEntity>> getIpatblesByContainerId(String containerId, String type) {
Map<String, List<ChainEntity>> res = null;
try { try {
rwLock.readLock().lock(); rwLock.readLock().lock();
return allIptables.get(containerId); switch (type) {
case "filter":
res = allIptables.get(containerId);
break;
case "nat":
res = natIpatbles.get(containerId);
break;
}
} finally { } finally {
rwLock.readLock().unlock(); rwLock.readLock().unlock();
} }
return res;
} }
/** /**
...@@ -59,16 +80,30 @@ public class IptablesServiceImpl implements IptablesService { ...@@ -59,16 +80,30 @@ public class IptablesServiceImpl implements IptablesService {
*/ */
@Override @Override
public List<ChainEntity> getIptablesByContainerId(String containerId, ChainType chainType) { public List<ChainEntity> getIptablesByContainerId(String containerId, ChainType chainType) {
return getIptablesByContainerId(containerId, chainType, "filter");
}
@Override
public List<ChainEntity> getIptablesByContainerId(String containerId, ChainType chainType, String type) {
List<ChainEntity> res = null;
checkCache(containerId); checkCache(containerId);
try { try {
rwLock.readLock().lock(); rwLock.readLock().lock();
return allIptables.get(containerId).get(chainType.toString()); switch (type) {
case "filter":
res = allIptables.get(containerId).get(chainType.toString());
break;
case "nat":
res = natIpatbles.get(containerId).get(chainType.toString());
break;
}
} finally { } finally {
rwLock.readLock().unlock(); rwLock.readLock().unlock();
} }
return res;
} }
@Override
public ChainEntity getIptablesByContainerId(String containerId, ChainType chainType, int lineNumber) { public ChainEntity getIptablesByContainerId(String containerId, ChainType chainType, int lineNumber) {
checkCache(containerId); checkCache(containerId);
List<ChainEntity> chains = getIptablesByContainerId(containerId, chainType); List<ChainEntity> chains = getIptablesByContainerId(containerId, chainType);
...@@ -94,6 +129,14 @@ public class IptablesServiceImpl implements IptablesService { ...@@ -94,6 +129,14 @@ public class IptablesServiceImpl implements IptablesService {
} }
} }
} }
if(!natIpatbles.containsKey(containerId)) {
synchronized (natIpatbles) {
if(!natIpatbles.containsKey(containerId)) {
Map<String, List<ChainEntity>> nat = iptablesUtils.showIptablesDetail(containerId, "nat");
natIpatbles.put(containerId, nat);
}
}
}
} }
/** /**
...@@ -177,7 +220,7 @@ public class IptablesServiceImpl implements IptablesService { ...@@ -177,7 +220,7 @@ public class IptablesServiceImpl implements IptablesService {
* @param chainEntity * @param chainEntity
*/ */
private void insertIntoIptables(String containerId, ChainEntity chainEntity) { private void insertIntoIptables(String containerId, ChainEntity chainEntity) {
List<ChainEntity> chainEntities = allIptables.get(containerId).get(chainEntity.getType()); List<ChainEntity> chainEntities = allIptables.get(containerId).get(chainEntity.getType().toString());
chainEntities.forEach(c -> {c.setId(c.getId() + 1);}); chainEntities.forEach(c -> {c.setId(c.getId() + 1);});
chainEntities.add(0, chainEntity); chainEntities.add(0, chainEntity);
} }
...@@ -197,7 +240,7 @@ public class IptablesServiceImpl implements IptablesService { ...@@ -197,7 +240,7 @@ public class IptablesServiceImpl implements IptablesService {
iptablesUtils.addIptable(containerId, "append", chainEntity.getType(), iptablesUtils.addIptable(containerId, "append", chainEntity.getType(),
chainEntity.getSource(), chainEntity.getDestination(), chainEntity.getTarget()); chainEntity.getSource(), chainEntity.getDestination(), chainEntity.getTarget());
// 再添加alliptables最后一个 // 再添加alliptables最后一个
List<ChainEntity> chainEntities = allIptables.get(containerId).get(chainEntity.getType()); List<ChainEntity> chainEntities = allIptables.get(containerId).get(chainEntity.getType().toString());
chainEntity.setId(chainEntities.size()); chainEntity.setId(chainEntities.size());
chainEntities.add(chainEntity); chainEntities.add(chainEntity);
} finally { } finally {
...@@ -205,4 +248,92 @@ public class IptablesServiceImpl implements IptablesService { ...@@ -205,4 +248,92 @@ public class IptablesServiceImpl implements IptablesService {
} }
return chainEntity; return chainEntity;
} }
/**
* 转发流量到其他容器
* @param containerId 执行转发操作的容器id
* @param selfIp 执行转发操作的容器ip
* @param selfPort 执行转发操作的容器端口
* @param toIp 转发的目的ip地址
* @param toPort 转发的目的端口
* @return
*/
@Override
public boolean forwardTraffic(String containerId, String selfIp, int selfPort, String toIp, int toPort) {
/**
*
Chain PREROUTING (policy ACCEPT 0 packets, 0 bytes)
pkts bytes target prot opt in out source destination
1 60 DNAT tcp -- * * 0.0.0.0/0 172.17.0.3 tcp dpt:5001 to:172.17.0.4:5001
Chain INPUT (policy ACCEPT 0 packets, 0 bytes)
pkts bytes target prot opt in out source destination
Chain POSTROUTING (policy ACCEPT 0 packets, 0 bytes)
pkts bytes target prot opt in out source destination
1 60 SNAT tcp -- * * 0.0.0.0/0 172.17.0.4 tcp dpt:5001 to:172.17.0.3
Chain OUTPUT (policy ACCEPT 0 packets, 0 bytes)
pkts bytes target prot opt in out source destination
*/
// TODO: 这个实现借口还需要进一步的封装,这里采用一种不是很合理的结构创建
// 正向
selfIp = selfIp.substring(0, selfIp.length() - 3);
toIp = toIp.substring(0, toIp.length() - 3);
String preRouting = "iptables -t nat -A PREROUTING -d " + selfIp + " -p tcp --dport "
+ selfPort + " -j DNAT --to-destination " + toIp + ":" + toPort;
// 回路
String postRouting = "iptables -t nat -A POSTROUTING -d " + toIp + " -p tcp --dport " + toPort + " -j SNAT --to-source " + selfIp;
// 下发iptables
String s = iptablesUtils.supperiseCmd(containerId, preRouting);
if (!"".equals(s)) {
System.out.println(s);
return false;
}
// 这里构建一个iptables
ChainEntity chainEntity = new ChainEntity.Builder()
.type(ChainType.PREROUTING)
.target("DNAT")
.prot("tcp")
.opt("--")
.in("*")
.out("*")
.source("0.0.0.0/0")
.destination(selfIp)
.more("tcp dpt:" + selfIp + " to:" + toIp + ":" + toPort)
.build();
checkCache(containerId);
List<ChainEntity> chainEntities = natIpatbles.get(containerId).get(chainEntity.getType().toString());
chainEntity.setId(chainEntities.size());
chainEntities.add(chainEntity);
// 插入到natIptables中
s = iptablesUtils.supperiseCmd(containerId, postRouting);
if(!"".equals(s)) {
System.out.println(s);
return false;
}
// 构建一个iptables
// 插入到natIptables中
//pkts bytes target prot opt in out source destination
// 1 60 SNAT tcp -- * * 0.0.0.0/0 172.17.0.4 tcp dpt:5001 to:172.17.0.3
ChainEntity chainEntity2 = new ChainEntity.Builder()
.type(ChainType.POSTROUTING)
.target("DNAT")
.prot("tcp")
.opt("--")
.in("*")
.out("*")
.source("0.0.0.0/0")
.destination(selfIp)
.more("tcp dpt:" + selfIp + " to:" + toIp + ":" + toPort)
.build();
List<ChainEntity> chainEntities2 = natIpatbles.get(containerId).get(chainEntity2.getType().toString());
chainEntity2.setId(chainEntities2.size());
chainEntities2.add(chainEntity2);
return true;
}
} }
...@@ -186,11 +186,25 @@ public class OVSServiceImpl implements OVSService, InitializingBean { ...@@ -186,11 +186,25 @@ public class OVSServiceImpl implements OVSService, InitializingBean {
@Override @Override
public boolean setVxlan(String bridgeName, String remoteIp) { public boolean setVxlan(String bridgeName, String remoteIp) {
// TODO: 最好再校验一下RemoteIp是否合法 // TODO: 最好再校验一下RemoteIp是否合法
if(!isBridge(bridgeName)) { if (!isBridge(bridgeName)) {
LOG.warn("bridge[" + bridgeName + "] 不存在"); LOG.warn("bridge[" + bridgeName + "] 不存在");
return false; return false;
} }
return ovsUtils.setVxlan(bridgeName, remoteIp); return ovsUtils.setVxlan(bridgeName, remoteIp);
} }
/**
* 删除ovs port
* @param bridgeName
* @param portName
* @return
*/
@Override
public boolean delBridgePort(String bridgeName, String portName) {
if(!isBridge(bridgeName)) {
LOG.warn("bridge[" + bridgeName + "]不存在");
return false;
}
return ovsUtils.delBridgePort(bridgeName, portName);
}
} }
...@@ -8,8 +8,15 @@ import top.ninwoo.utils.service.OvsDockerService; ...@@ -8,8 +8,15 @@ import top.ninwoo.utils.service.OvsDockerService;
import top.ninwoo.utils.util.LinuxCtlUtils; import top.ninwoo.utils.util.LinuxCtlUtils;
import top.ninwoo.utils.util.OvsDockerUtils; import top.ninwoo.utils.util.OvsDockerUtils;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Service @Service
public class OvsDockerServiceImpl implements OvsDockerService { public class OvsDockerServiceImpl implements OvsDockerService {
// 在这里维护docker 和 ovs port之间的关联
private ConcurrentHashMap<String, Set<String>> dockerPorts = new ConcurrentHashMap<>();
@Autowired @Autowired
OvsDockerUtils ovsDockerUtils; OvsDockerUtils ovsDockerUtils;
...@@ -29,6 +36,37 @@ public class OvsDockerServiceImpl implements OvsDockerService { ...@@ -29,6 +36,37 @@ public class OvsDockerServiceImpl implements OvsDockerService {
throw new RuntimeException("容器[" + containerId + "]不存在"); throw new RuntimeException("容器[" + containerId + "]不存在");
} }
return ovsDockerUtils.addPort(bridgeName, devName, containerId, ip); String portId = ovsDockerUtils.addPort(bridgeName, devName, containerId, ip);
if (!dockerPorts.containsKey(containerId)) {
dockerPorts.put(containerId, new HashSet<String>());
}
dockerPorts.get(containerId).add(bridgeName + ":" + portId);
return portId;
}
/**
* 获取docker的ovs网络端口
* @param containerId
* @return
*/
@Override
public Set<String> getContainerPorts(String containerId) {
if(!dockerPorts.containsKey(containerId)) {
return new HashSet<>();
}
return dockerPorts.get(containerId);
}
@Override
public void deleteContainerPorts(String containerId) {
// 获取docker端口
Set<String> containerPorts = getContainerPorts(containerId);
containerPorts.forEach( p -> {
String br = p.split(":")[0];
String port = p.split(":")[1];
ovsService.delBridgePort(br, port);
});
} }
} }
...@@ -40,4 +40,6 @@ public interface DockerUtils { ...@@ -40,4 +40,6 @@ public interface DockerUtils {
String execInDocker(String containerId, String... args); String execInDocker(String containerId, String... args);
boolean startContainer(String containerId); boolean startContainer(String containerId);
String getDockerIdByName(String name);
} }
...@@ -15,12 +15,16 @@ import java.util.Map; ...@@ -15,12 +15,16 @@ import java.util.Map;
public interface IptablesUtils { public interface IptablesUtils {
Map<String, List<ChainEntity>> showIptablesDetail(String containerId); Map<String, List<ChainEntity>> showIptablesDetail(String containerId);
Map<String, List<ChainEntity>> showIptablesDetail(String containerId, String type);
String addIptable(String containerId, String kind, ChainType chainType, String source, String destination, String policy); String addIptable(String containerId, String kind, ChainType chainType, String source, String destination, String policy);
String delIptable(String containerId, ChainType chainType, String source, String destination, String policy); String delIptable(String containerId, ChainType chainType, String source, String destination, String policy);
String delIptable(String containerId, TableType tableType, ChainType chainType, int lineNumber); String delIptable(String containerId, TableType tableType, ChainType chainType, int lineNumber);
String supperiseCmd(String containerId, String cmd);
boolean modifyIptable(String containerId, ChainType chainType, String source, String destination, String policy, int lineNumber); boolean modifyIptable(String containerId, ChainType chainType, String source, String destination, String policy, int lineNumber);
boolean flushIptables(String containerId, ChainType chainType); boolean flushIptables(String containerId, ChainType chainType);
......
...@@ -59,12 +59,7 @@ public interface OvsUtils { ...@@ -59,12 +59,7 @@ public interface OvsUtils {
*/ */
boolean addBridgePort(String bridgeName, String port); boolean addBridgePort(String bridgeName, String port);
/** boolean delBridgePort(String bridgeName, String port);
* 删除指定的端口
* @param bridgeName
* @param port
*/
boolean delBridgePort(String bridgeName, int port);
String[] createVethPair(String br1, String br2); String[] createVethPair(String br1, String br2);
......
...@@ -195,4 +195,16 @@ public class DockerUtilsImpl implements DockerUtils { ...@@ -195,4 +195,16 @@ public class DockerUtilsImpl implements DockerUtils {
} }
return false; return false;
} }
@Override
public String getDockerIdByName(String name) {
try {
return dockerClient.inspectContainer(name).id();
} catch (DockerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}
} }
...@@ -30,6 +30,11 @@ public class IptablesUtilsImpl implements IptablesUtils { ...@@ -30,6 +30,11 @@ public class IptablesUtilsImpl implements IptablesUtils {
@Override @Override
public Map<String, List<ChainEntity>> showIptablesDetail(String containerId) { public Map<String, List<ChainEntity>> showIptablesDetail(String containerId) {
return showIptablesDetail(containerId, "filter");
}
@Override
public Map<String, List<ChainEntity>> showIptablesDetail(String containerId, String type) {
/** /**
* Chain INPUT (policy ACCEPT 0 packets, 0 bytes) * Chain INPUT (policy ACCEPT 0 packets, 0 bytes)
* pkts bytes target prot opt in out source destination * pkts bytes target prot opt in out source destination
...@@ -45,7 +50,19 @@ public class IptablesUtilsImpl implements IptablesUtils { ...@@ -45,7 +50,19 @@ public class IptablesUtilsImpl implements IptablesUtils {
map.put("INPUT", new ArrayList<>()); map.put("INPUT", new ArrayList<>());
map.put("FORWARD", new ArrayList<>()); map.put("FORWARD", new ArrayList<>());
map.put("OUTPUT", new ArrayList<>()); map.put("OUTPUT", new ArrayList<>());
String result = dockerUtils.execInDocker(containerId, new String[]{"iptables", "--line", "-vnL"}); map.put("PREROUTING", new ArrayList<>());
map.put("POSTROUTING", new ArrayList<>());
String cmd = "";
switch (type) {
case "filter":
cmd = "iptables --lines -vnL";
break;
case "nat":
cmd = "iptables -t nat --lines -vnL";
break;
}
String result = dockerUtils.execInDocker(containerId, cmd.split(" "));
String[] lines = result.split("\n"); String[] lines = result.split("\n");
ChainEntity.Builder builder = ChainEntity.builder(); ChainEntity.Builder builder = ChainEntity.builder();
String key = ""; String key = "";
...@@ -74,7 +91,7 @@ public class IptablesUtilsImpl implements IptablesUtils { ...@@ -74,7 +91,7 @@ public class IptablesUtilsImpl implements IptablesUtils {
String[] s = lines[i].replaceAll(" +", " ").trim().split(" "); String[] s = lines[i].replaceAll(" +", " ").trim().split(" ");
ChainEntity chain; ChainEntity chain;
if(s.length == 10) { if(s.length >= 10) {
chain = builder.id(Integer.parseInt(s[0])) chain = builder.id(Integer.parseInt(s[0]))
.pkts(Long.parseLong(s[1])) .pkts(Long.parseLong(s[1]))
.bytes(Long.parseLong(s[2])) .bytes(Long.parseLong(s[2]))
...@@ -86,6 +103,13 @@ public class IptablesUtilsImpl implements IptablesUtils { ...@@ -86,6 +103,13 @@ public class IptablesUtilsImpl implements IptablesUtils {
.source(s[8]) .source(s[8])
.destination(s[9]) .destination(s[9])
.build(); .build();
if(s.length > 10) {
StringBuilder more = new StringBuilder();
for (int j = 10; j < s.length ; j++) {
more.append(s[j]).append(" ");
}
chain.setMore(more.toString());
}
map.get(key).add(chain); map.get(key).add(chain);
} }
...@@ -114,6 +138,17 @@ public class IptablesUtilsImpl implements IptablesUtils { ...@@ -114,6 +138,17 @@ public class IptablesUtilsImpl implements IptablesUtils {
return basicCommand(containerId, null, "delete", chainType, "", "", "", lineNumber); return basicCommand(containerId, null, "delete", chainType, "", "", "", lineNumber);
} }
/**
* 这是一个临时偷懒的借口
* @param containerId
* @param cmd
* @return
*/
@Override
public String supperiseCmd(String containerId, String cmd) {
return dockerUtils.execInDocker(containerId, cmd.split(" "));
}
/** /**
* 修改table条目 * 修改table条目
* @param containerId * @param containerId
......
...@@ -180,7 +180,7 @@ public class OvsUtilsImpl implements OvsUtils { ...@@ -180,7 +180,7 @@ public class OvsUtilsImpl implements OvsUtils {
} }
@Override @Override
public boolean delBridgePort(String bridgeName, int port) { public boolean delBridgePort(String bridgeName, String port) {
String cmd = "echo 'Vudo3423' | sudo -S ovs-vsctl del-port " + bridgeName + " " + port; String cmd = "echo 'Vudo3423' | sudo -S ovs-vsctl del-port " + bridgeName + " " + port;
String res = linuxCtlUtils.runCmd(cmd); String res = linuxCtlUtils.runCmd(cmd);
if(!"".equals(res)) { if(!"".equals(res)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment