Commit 5582836a authored by ymwangya's avatar ymwangya

hadoop

parent b42b9fdd
...@@ -12,6 +12,11 @@ ...@@ -12,6 +12,11 @@
<artifactId>dbc-business-client</artifactId> <artifactId>dbc-business-client</artifactId>
<dependencies> <dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>top.ninwoo</groupId> <groupId>top.ninwoo</groupId>
<artifactId>dbc-commom-api</artifactId> <artifactId>dbc-commom-api</artifactId>
......
...@@ -126,6 +126,7 @@ public class FileService{ ...@@ -126,6 +126,7 @@ public class FileService{
index = this.getIndex(ipList.length); index = this.getIndex(ipList.length);
//设置切片的数量
int sliceNum = 5; int sliceNum = 5;
// todo 这里需要制定文件的fileId // todo 这里需要制定文件的fileId
......
...@@ -36,18 +36,14 @@ public class FileServiceImplement implements DistributedComService { ...@@ -36,18 +36,14 @@ public class FileServiceImplement implements DistributedComService {
} }
//对分割后的碎片,按照其数据量大小排序 //对分割后的碎片,按照其数据量大小排序
for (int i = 0; i < fileSliceList.size()-1; i++) { /* for (int i = 0; i < fileSliceList.size()-1; i++) {
for (int j = 0; j < fileSliceList.size()-i-1; j++) { for (int j = 0; j < fileSliceList.size()-i-1; j++) {
if (fileSliceList.get(j).getFileBytes().length<fileSliceList.get(j+1).getFileBytes().length) { if (fileSliceList.get(j).getFileBytes().length<fileSliceList.get(j+1).getFileBytes().length) {
/* int temp = fileSliceList.get(j).getSliceId();
nums[j] = nums[j+1];
nums[j+1] = temp;*/
fileSliceList.set(j,fileSliceList.get(j+1)); fileSliceList.set(j,fileSliceList.get(j+1));
fileSliceList.set(j+1,fileSliceList.get(j)); fileSliceList.set(j+1,fileSliceList.get(j));
} }
} }
} }*/
SplitResult splitResult = new SplitResult(); SplitResult splitResult = new SplitResult();
splitResult.setFileSliceList(fileSliceList); splitResult.setFileSliceList(fileSliceList);
return splitResult; return splitResult;
......
...@@ -28,6 +28,11 @@ ...@@ -28,6 +28,11 @@
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>top.ninwoo</groupId> <groupId>top.ninwoo</groupId>
<artifactId>dbc-commom-api</artifactId> <artifactId>dbc-commom-api</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-hadoop-computing</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cnf-hadoop-api</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package top.ninwoo.hadoop.api.po;
import lombok.Data;
import java.util.List;
@Data
public class ComputingResult {
private List<FileSliceComputing> fileSliceComputingList;
}
package top.ninwoo.hadoop.api.po;
import lombok.Data;
/**
* @author ymwang
* 文件的切片类
*/
@Data
public class FileSlice {
// 文件的唯一id
private Long fileId;
// 切片的id,唯一且有顺序要求
private int sliceId;
// 文件的比特数据
private byte[] fileBytes;
}
package top.ninwoo.hadoop.api.po;
import lombok.Data;
/**
* @author ymwang
* 文件的切片类
*/
@Data
public class FileSliceComputing {
// 文件的唯一id
private Long fileId;
// 切片的id,唯一且有顺序要求
private int sliceId;
// 文件的比特数据
private byte[] fileBytes;
}
package top.ninwoo.hadoop.api.po;
import lombok.Data;
import java.awt.image.BufferedImage;
@Data
public class MergeResult {
//private byte[] fileBytes;
private BufferedImage images;
}
package top.ninwoo.hadoop.api.po;
import lombok.Data;
import java.util.List;
@Data
public class SendResult {
//发送到云端的文件的包装
//private List<FileCloudSlice> fileCloudSliceList;
}
package top.ninwoo.hadoop.api.po;
import lombok.Data;
import java.util.List;
@Data
public class SplitResult {
private List<FileSlice> fileSliceList;
}
package top.ninwoo.hadoop.api.service;
import top.ninwoo.hadoop.api.po.*;
import java.awt.image.BufferedImage;
public interface HadoopComService {
/**
* 文件切片
* @param bufferedImage 图片
* @param sliceNum 切片数量
* @return
*/
SplitResult fileSplit(BufferedImage bufferedImage, int sliceNum);
//SplitResult fileSplit(String fileName, int sliceNum);
/**
* 切片处理
* @param splitResult
* @return
*/
ComputingResult sliceComputing(SplitResult splitResult);
/**
* 切片聚合接口
* @param computingResult
* @param sliceNum 切片数量
* @return
*/
MergeResult sliceMerge(ComputingResult computingResult, int sliceNum);
/**
* 切片发送服务
* @param targetIp
* @param fileSlice
* @return
*/
SendResult sendFileSlice(String targetIp, FileSlice fileSlice);
/**
* 指定地址的ip获取切片
* @param targetIp
* @param fileId
* @param sliceId
* @return
*/
FileSlice getFileSlice(String targetIp, Long fileId, int sliceId);
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-hadoop-computing</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cnf-hadoop-client</artifactId>
<dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-hadoop-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-hadoop-utils</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-thymeleaf -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<!--这里写上main方法所在类的路径-->
<configuration>
<mainClass>top.ninwoo.hadoop.client.HadoopClientStarter</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package top.ninwoo.hadoop.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
//这个注解告诉我们,这是一个springboot项目
@SpringBootApplication
@EnableAsync
public class HadoopClientStarter {
public static void main(String[] args) {
SpringApplication.run(HadoopClientStarter.class,args);
}
}
package top.ninwoo.hadoop.client.configure;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 开启异步调用 多线程
public class AsyncTaskConfig{
@Bean("taskExecutor")
public Executor taskExecutor() {
// 新建一个任务执行器
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);////核心线程池大小
taskExecutor.setMaxPoolSize(30);// 设置最大的线程数量
taskExecutor.setQueueCapacity(25);// 等待队列
taskExecutor.initialize();// 如果不初始化,导致找不到执行器
return taskExecutor;
}
}
package top.ninwoo.hadoop.client.configure;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import top.ninwoo.hadoop.api.service.HadoopComService;
import top.ninwoo.hadoop.utils.FileServiceImplement;
@Configuration
public class ClientConfigure {
@Primary
@Bean(name="fileRestTemplates")
public RestTemplate restTemplate(ClientHttpRequestFactory factory){
return new RestTemplate(factory);
}
@Bean
public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setReadTimeout(5000);//单位为ms
factory.setConnectTimeout(5000);//单位为ms
return factory;
}
@Bean
public HadoopComService hadoopComService() {
return new FileServiceImplement();
}
}
package top.ninwoo.hadoop.client.configure;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
@Configuration
public class StaticResourceConfiguration extends WebMvcConfigurerAdapter {
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/file/**").addResourceLocations("file:/tmp/static/");
//registry.addResourceHandler("/file/**").addResourceLocations("file:/F:/resources/static/");
super.addResourceHandlers(registry);
}
}
package top.ninwoo.hadoop.client.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import top.ninwoo.hadoop.api.po.FileSlice;
import top.ninwoo.hadoop.client.service.FileService;
import javax.annotation.Resource;
@RestController
public class ClientController {
/* @RequestMapping("/helloClient")
@ResponseBody
public String helloClient(){
return "hello,client";
}*/
@Resource
private FileService fileService;
@GetMapping("test1")
public String sendData(String name) {
FileSlice fileSlice = new FileSlice();
fileSlice.setFileBytes(name.getBytes());
fileSlice.setFileId(1111L);
String result = fileService.sendFileSlice("127.0.0.1:8080", fileSlice);
return result;
}
/* @GetMapping("sendFile")
public String sendFile(String name, Long fileId) {
BufferedImage image = fileService.readImage(name);
return fileService.sendFile(fileId, image);
}*/
@GetMapping("getFile")
public String getFile(String name, Long fileId) {
return fileService.getFile(name, fileId);
}
}
package top.ninwoo.hadoop.client.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;
import top.ninwoo.hadoop.client.service.FileService;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.FileNotFoundException;
import java.io.IOException;
@Controller
public class FileController {
@Autowired
private FileService fileService;
@Value("${dbc.directory.output}")
private String directoryOutput;
/*文件上传*/
// 访问路径为:http://ip:port/upload
@RequestMapping(value = "/upload",method = RequestMethod.GET)
public String upload() {
return "fileUpload";
}
@RequestMapping(value = "/upload", method = RequestMethod.POST)
@ResponseBody
public String upload(@RequestParam("file") MultipartFile file, @RequestParam("fileId") Long fileId) {
if (!file.isEmpty()) {
try {
//BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(new File(file.getOriginalFilename())));
//FileOutputStream out = new FileOutputStream(new File(file.getOriginalFilename()));
BufferedImage image = ImageIO.read(file.getInputStream());
fileService.sendFile(fileId,image);
/*out.flush();
out.close();*/
} catch (FileNotFoundException e) {
e.printStackTrace();
return "上传失败," + e.getMessage();
} catch (IOException e) {
e.printStackTrace();
return "上传失败," + e.getMessage();
}
return "上传成功!";
} else {
return "上传失败,因为文件是空的.";
}
}
/*文件下载*/
//访问路径为:http://ip:port/download?name=x.jpg&fileId=1111
@RequestMapping("download")
public String view(String name, Long fileId, Model model) {
fileService.getFile(name, fileId);
model.addAttribute("fileName", name);
return "fileDownload";
}
}
package top.ninwoo.hadoop.client.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import top.ninwoo.hadoop.api.po.ComputingResult;
import top.ninwoo.hadoop.api.po.FileSlice;
import top.ninwoo.hadoop.api.po.MergeResult;
import top.ninwoo.hadoop.api.po.SplitResult;
import top.ninwoo.hadoop.api.service.HadoopComService;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
//import top.ninwoo.bishe.starter.service.NetworkService;
/**
* 文件服务
*/
@Service
public class FileService {
@Value("${dbc.directory.output}")
private String directoryOutput;
@Qualifier(value="fileRestTemplates")
@Autowired
private RestTemplate restTemplate;
@Autowired
private HadoopComService hadoopComService;
//不需要了,我懒得改
Long clusterId = 11113L;//docker集群ID
String[] wxName_list = new String[]{"sate1","sate2","sate3","sate4","sate5","sate6","sate7"};//获取卫星名字
private String[] ipList = new String[29];
private List <Integer> index = new ArrayList<Integer>() {} ;
/* @PostConstruct
public void init() {
*//* ipList = getIpList(11113l,"dbc_server");
System.out.println(Arrays.toString(ipList));*//*
for (int k = 0; k < wxName_list.length; k++){
ipList[k] = getIpList(clusterId,wxName_list[k]);//通过卫星名字获取卫星ip
}
}
*/
/*获取容器ip*/
/* String[] ipList = new String[]{"127.0.0.1:8080","127.0.0.1:8081","127.0.0.1:8082","127.0.0.1:8083","127.0.0.1:8084",
"127.0.0.1:8085","127.0.0.1:8086"};*/
/* public String[] getIpList(Long clusterId, String appName) {
List<String> ipListS = networkService.getIpListByAppName(clusterId, appName);//用于存储容器ip的list集合
String[] containerIp = new String[ipListS.size()];//将集合中的元素存在数组里,因为下边用的是数组
for (int i = 0; i < ipListS.size(); i++) {
if(!ipListS.isEmpty()){
String ip_tmp = ipListS.get(i);
String rep = ip_tmp.replaceAll("[\\[\\]]","");
String[] split_list = rep.split("/");
containerIp[i] = split_list[0]+":8082";
}
}
return containerIp;
}*/
/* public String getIpByAppName(Long clusterId,String appName) {
List<String> ipList = networkService.getIpListByAppName(clusterId, appName);
if(!ipList.isEmpty()){
String ip_tmp = ipList.get(0);
String[] split_list = ip_tmp.split("/");
return split_list[0] + ":8082";
}
return null;
}*/
//todo 可能会有找不到文件的情况
public String getIpByAppName(Long clusterId,String appName) {
String str;
File file = new File(directoryOutput+"sateIP.txt");
try{
BufferedReader br = new BufferedReader(new FileReader(file));
while((str = br.readLine()) != null){
if (str.startsWith(appName)){
String sateIp = str.split(":")[1];
return sateIp + ":8082";
}
}
}catch(Exception e){
e.printStackTrace();
}
return null;
}
/**
* 发送文件到分布式节点
* @param fileId
* @param bufferedImage
* @return
*/
@Async("taskExecutor")
public String sendFile(Long fileId, BufferedImage bufferedImage) {
// 通过集群服务器接口获取当前集群的节点数量
//int sliceNum = ipList.length;
//ipList = getIpList(11113l,"dbc_server");
for (int i = 0; i < wxName_list.length; i++){
ipList[i] = getIpByAppName(clusterId,wxName_list[i]);//通过卫星名字获取卫星ip
}
//查看ip
System.out.println(ipList.length);
System.out.println(Arrays.toString(ipList));
index = this.getIndex(ipList.length);
//设置切片的数量
int sliceNum = 5;
// todo 这里需要制定文件的fileId
SplitResult splitResult = hadoopComService.fileSplit(bufferedImage, sliceNum);
//int ipIndex = 0;
int i = 0;
for (FileSlice slice : splitResult.getFileSliceList()) {
slice.setFileId(fileId);
//异常处理
while(true){
try {
sendFileSlice(ipList[index.get(i)], slice);
//
System.out.println("send "+wxName_list[index.get(i)]+" success");
i = i+1;
//ipIndex = (ipIndex + 1) % ipList.length;
break;
}catch (Exception e){
//ipIndex = (ipIndex + 1) % ipList.length;
//
System.out.println("send "+wxName_list[index.get(i)]+" failed");
}
}
}
return "success";
/*for (FileSlice slice : splitResult.getFileSliceList()) {
slice.setFileId(fileId);
String result = sendFileSlice(ipList[ipIndex], slice);
ipIndex = (ipIndex + 1) % ipList.length;
if(result.equals("failed")) {
return "fail";
}
}
return "success";*/
}
// 发送文件切片到目标地址
public String sendFileSlice(String targetIp, FileSlice fileSlice) {
ResponseEntity<String> response = restTemplate.postForEntity("http://" + targetIp + "/data/put/", fileSlice, String.class);
if (!response.getStatusCode().is2xxSuccessful()) {
return "failed!";
}
return response.getBody();
}
public Set<FileSlice> getFileSlice(String targetIp, Long fileId) {
FileSlice[] fileSlices = restTemplate.getForObject("http://" + targetIp + "/data/get/" + fileId, FileSlice[].class);
HashSet<FileSlice> fileSliceSet = new HashSet<>();
fileSliceSet.addAll(Arrays.asList(fileSlices));
return fileSliceSet;
}
public String getFile(String fileName, Long fileId) {
//ipList = getIpList(11113l,"dbc_server");
for (int i = 0; i < wxName_list.length; i++){
ipList[i] = getIpByAppName(clusterId,wxName_list[i]);//通过卫星名字获取卫星ip
}
int sliceNum = 5;
//int sliceNum = ipList.length;
Set<FileSlice> result = new HashSet<>();
/* for (String ip : ipList) {
Set<FileSlice> fileSliceSet = getFileSlice(ip, fileId); //将fileId=1的碎片收回,HashSet无序收回
result.addAll(fileSliceSet);
}*/
//异常处理
for (int k = 0; k < 5; k++) {
try {
Set<FileSlice> fileSliceSet = getFileSlice(ipList[index.get(k)], fileId);
System.out.println("recive "+wxName_list[index.get(k)]+" success");
result.addAll(fileSliceSet);
}catch (Exception e){
}
}
List<FileSlice> list = new ArrayList<>(result);
SplitResult splitResult = new SplitResult();
splitResult.setFileSliceList(sortList(list));
//merge
ComputingResult computingResult = hadoopComService.sliceComputing(splitResult);
MergeResult mergeResult = hadoopComService.sliceMerge(computingResult,sliceNum);
//save
try {
saveFile(fileName, mergeResult);
return "success";
} catch (Exception e) {
e.printStackTrace();
return "fail";
}
}
//给List集合里的元素进行排序,便于图片正常恢复
public List<FileSlice> sortList(List<FileSlice> list){
//List<FileSlice> list = new ArrayList<FileSlice>();
for (int i = 0; i < list.size(); i++) {
for (int j = list .size()-1; j > i; j--) {
int no= list.get(j).getSliceId();
int no_1= list.get(j-1).getSliceId();
if (no<no_1) {
//互换位置
FileSlice fil = list.get(j);
list.set(j, list.get(j-1));
list.set(j-1, fil );
}
}
}
return list;
}
//对分割后得到的碎片List按照碎片的大小进行排序---使用冒泡排序
/* public SplitResult sortSpiltList(){
return null;
}*/
public void saveFile(String fileName, MergeResult mergeResult) {
BufferedImage images = mergeResult.getImages();
//输出拼接后的图像
try {
ImageIO.write(images, "jpg", new File(directoryOutput + fileName));
} catch (IOException e) {
e.printStackTrace();
}
}
public List <Integer> getIndex(int num){
Random r = new Random();
List <Integer> v = new ArrayList<Integer>() {} ;
int count = 0;
while(count < 5){
int number = r.nextInt(num) + 1;
if(!v.contains(number)){
//不在集合中,就添加
v.add(number);
count++;
}
}
return v;
}
}
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>文件显示</title>
</head>
<body>
<hr/>
<!--<a href="jpg/1.jpg">预览图片</a>-->
<!--<a href="@{/getFile/(fileName=${fileName},fileId=${fileId})}">预览图片</a>-->
<img th:src="file+'/'+${fileName}">
</body>
</html>
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>文件上传</title>
</head>
<body>
<hr/>
<form method="POST" enctype="multipart/form-data" action="/upload" id="uploadForm">
<p>
文件:<input type="file" name="file" />
</p>
<p>
文件ID:<input type="text" name="fileId" palcegolder="请输入" />
</p>
<p>
<input type="submit" value="上传" />
</p>
</form>
</body>
</html>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-hadoop-computing</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cnf-hadoop-server</artifactId>
<dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-hadoop-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>dbc-business-utils</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<!--这里写上main方法所在类的路径-->
<configuration>
<mainClass>top.ninwoo.hadoop.server.HadoopServerStarter</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package top.ninwoo.hadoop.server;
import top.ninwoo.dbc.api.po.ComputingResult;
import top.ninwoo.dbc.api.po.FileSlice;
import java.io.IOException;
public interface FileService {
String fileComputing(FileSlice fileSlice) throws IOException;
void fileSave(ComputingResult computingResult);
}
package top.ninwoo.hadoop.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class HadoopServerStarter {
public static void main(String[] args) {
SpringApplication.run(HadoopServerStarter.class, args);
}
}
package top.ninwoo.hadoop.server.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import top.ninwoo.dbc.api.po.FileSlice;
import top.ninwoo.hadoop.server.FileService;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@RestController
public class ServerController {
/* @RequestMapping("/hello")
@ResponseBody
public String hello(){
return "hello,springboot";
}*/
@Autowired
private FileService fileService;
private static final Map<Long, Map<Integer, FileSlice>> fileSliceMap = new HashMap<>();
@GetMapping("/hello/{name}/")
public String hello(@PathVariable(name = "name") String name) {
return "Hello " + name;
}
@PostMapping("/data/put/")
public String putData(@RequestBody FileSlice fileSlice) throws IOException {
if(fileSlice == null) {
return "failed";
}
if(!fileSliceMap.containsKey(fileSlice.getFileId())) {
fileSliceMap.put(fileSlice.getFileId(), new HashMap<>());
}
Map<Integer, FileSlice> sliceMap = fileSliceMap.get(fileSlice.getFileId());
sliceMap.put(fileSlice.getSliceId(), fileSlice);
//完成分散计算
fileService.fileComputing(fileSlice);
System.out.println("image gray sucess");
return "success";
}
@GetMapping("/data/get/{fileId}/{sliceId}")
public FileSlice getData(@PathVariable("fileId") Long fileId, @PathVariable("sliceId") int sliceId) {
try {
return fileSliceMap.get(fileId).get(sliceId);
} catch (Exception e) {
throw new RuntimeException("未找到对应的文件");
}
}
@GetMapping("/data/get/{fileId}")
public Set<FileSlice> getData(@PathVariable("fileId") Long fileId) {
try {
return new HashSet<FileSlice>(fileSliceMap.get(fileId).values());
} catch (Exception e) {
throw new RuntimeException("未找到文件");
}
}
}
package top.ninwoo.hadoop.server.serviceImpl;
import org.springframework.stereotype.Service;
import top.ninwoo.dbc.api.po.ComputingResult;
import top.ninwoo.dbc.api.po.FileSlice;
import top.ninwoo.hadoop.server.FileService;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@Service
public class FileServiceImpl implements FileService {
/*@Value("${dbc.directory.output}")
private String directoryOutput;*/
//分散计算
@Override
public String fileComputing(FileSlice fileSlice) throws IOException {
if(this.greySlice(fileSlice)){
return "sucess";
}
else{
return "failed";
}
}
//存储模块
//todo
//目前仅是灰度处理,如果为多个计算,则需要添加
@Override
public void fileSave(ComputingResult computingResult){}
/**
* 具体任务
* 灰度
* todo 以后继续添加
*/
private boolean greySlice(FileSlice fileSlice) throws IOException {
//读图片
//npe,可能会出问题
if(fileSlice.getFileBytes().equals(null)){
return false;
}else{
ByteArrayInputStream bis = new ByteArrayInputStream(fileSlice.getFileBytes());
BufferedImage image = ImageIO.read(bis);
int width = image.getWidth();
int height = image.getHeight();
BufferedImage grayImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY);
for(int j= 0 ; j < width ; j++){
for(int k = 0 ; k < height; k++){
int rgb = image.getRGB(j, k);
grayImage.setRGB(j, k, rgb);
}
}
//这里需提供新的文件输出
ImageIO.write(grayImage, "jpg", new File("/tmp/static/out.jpg"));
System.out.println("download sucess");
return true;
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf-hadoop-computing</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cnf-hadoop-utils</artifactId>
<dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-hadoop-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package top.ninwoo.hadoop.utils;
import org.springframework.stereotype.Service;
import top.ninwoo.hadoop.api.po.*;
import top.ninwoo.hadoop.api.service.HadoopComService;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Service
public class FileServiceImplement implements HadoopComService {
ImageToArrayUtils itau = new ImageToArrayUtils();
// 图片切分
@Override
public SplitResult fileSplit(BufferedImage bufferedImage, int sliceNum) {
/* // 读入大图
BufferedImage image = itau.readImage(srcOrig);*/
// 分割图片,并将其转化为byte数组
List<byte[]> byteList = itau.imageSplit(bufferedImage,sliceNum);
List<FileSlice> fileSliceList = new ArrayList<>();
for (int i = 0; i < byteList.size(); i++) {
FileSlice fileSlice = new FileSlice();
fileSlice.setFileId(1L);//文件的唯一标识
fileSlice.setSliceId(i);//数据碎片的id
fileSlice.setFileBytes(byteList.get(i));//文件的比特数据
fileSliceList.add(fileSlice);
}
//对分割后的碎片,按照其数据量大小排序
/* for (int i = 0; i < fileSliceList.size()-1; i++) {
for (int j = 0; j < fileSliceList.size()-i-1; j++) {
if (fileSliceList.get(j).getFileBytes().length<fileSliceList.get(j+1).getFileBytes().length) {
fileSliceList.set(j,fileSliceList.get(j+1));
fileSliceList.set(j+1,fileSliceList.get(j));
}
}
}*/
SplitResult splitResult = new SplitResult();
splitResult.setFileSliceList(fileSliceList);
return splitResult;
}
//小图片处理---灰度处理
@Override
public ComputingResult sliceComputing(SplitResult splitResult){
List<FileSlice> fileSliceList = splitResult.getFileSliceList();
List<byte[]> grayByteList = itau.grayImage(fileSliceList);
//把每个小图片转为一个byte数组
//List<byte[]> grayByteList = itau.imageToByte(grayImage);
List<FileSliceComputing> grayfileSliceList = new ArrayList<>();
for (int i = 0; i < grayByteList.size(); i++) {
FileSliceComputing fileSliceComputing = new FileSliceComputing();
fileSliceComputing.setFileId(1L);//文件的唯一标识
fileSliceComputing.setSliceId(i);//数据碎片的id
fileSliceComputing.setFileBytes(grayByteList.get(i));//文件的比特数据
grayfileSliceList.add(fileSliceComputing);
}
ComputingResult computingResult = new ComputingResult();
computingResult.setFileSliceComputingList(grayfileSliceList);
return computingResult;
}
@Override
public MergeResult sliceMerge(ComputingResult computingResult, int sliceNum) {
List<FileSliceComputing> grayfileSliceList = computingResult.getFileSliceComputingList();
BufferedImage finalImg = itau.imageMerge(grayfileSliceList, sliceNum);
MergeResult mergeResult = new MergeResult();
mergeResult.setImages(finalImg);
return mergeResult;
}
public SendResult sendFileSlice(String targetIp, FileSlice fileSlice) {
return null;
}
public FileSlice getFileSlice(String targetIp, Long fileId, int sliceId) {
return null;
}
}
package top.ninwoo.hadoop.utils;
import top.ninwoo.hadoop.api.po.FileSlice;
import top.ninwoo.hadoop.api.po.FileSliceComputing;
import javax.imageio.ImageIO;
import java.awt.*;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ImageToArrayUtils {
public static int rows = 1 ;
//public static int sliceNum ;
//将image读为BufferedImage
/* public static BufferedImage readImage(String fileName){
//File file = new File(directoryInput+fileName);
File file = new File(fileName);
FileInputStream fis = null;
BufferedImage image = null;
try {
fis = new FileInputStream(file);
image = ImageIO.read(fis);
} catch (Exception e) {
if (fis != null) {
try {
fis.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
throw new RuntimeException("文件读取错误" + e.getMessage());
}
return image;
}*/
public static List<byte[]> imageSplit(BufferedImage images, int sliceNum){
//BufferedImage images = readImage(name);
int cols = sliceNum / rows;
// 计算每个小图的宽度和高度
int chunkWidth = images.getWidth() / cols;
int chunkHeight = images.getHeight() / rows;
int count = 0;
//BufferedImage imgs[] = new BufferedImage[chunks];
BufferedImage imgs[] = new BufferedImage[sliceNum];
for (int x = 0; x < rows; x++) {
for (int y = 0; y < cols; y++) {
//设置小图的大小和类型
imgs[count] = new BufferedImage(chunkWidth, chunkHeight, images.getType());
//写入图像内容
Graphics2D gr = imgs[count++].createGraphics();
gr.drawImage(images, 0, 0,
chunkWidth, chunkHeight,
chunkWidth* y, chunkHeight * x,
chunkWidth * y + chunkWidth,
chunkHeight * x + chunkHeight, null);
gr.dispose();
}
}
//将imgs[]转为相应的byte数组
List<byte[]> byteList = new ArrayList<byte[]>();
for (int i = 0; i < imgs.length; i++) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
ImageIO.write(imgs[i],"jpg",bos);
} catch (IOException e) {
e.printStackTrace();
}
try {
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
byteList.add(bos.toByteArray());
}
// 输出小图
/* for (int i = 0; i < imgs.length; i++) {
try {
ImageIO.write(imgs[i], "jpg", new File("F:\\images\\split\\image" + i + ".jpg"));
} catch (IOException e) {
e.printStackTrace();
}
}*/
return byteList;
}
//图片灰度化,同时将grayImage[]转为byte[]数组
public static List<byte[]> grayImage(List<FileSlice> fileSliceList){
BufferedImage images[] = new BufferedImage[fileSliceList.size()];
BufferedImage grayImage[] = new BufferedImage[fileSliceList.size()];
int[] width = new int[fileSliceList.size()];
int[] height = new int[fileSliceList.size()];
for (int i = 0; i < fileSliceList.size(); i++) {
ByteArrayInputStream bis = new ByteArrayInputStream(fileSliceList.get(i).getFileBytes());
try {
images[i] = ImageIO.read(bis);
width[i] = images[i].getWidth();
height[i] = images[i].getHeight();
grayImage[i] = new BufferedImage(width[i], height[i], BufferedImage.TYPE_BYTE_GRAY);//重点,技巧在这个参数BufferedImage.TYPE_BYTE_GRAY
for(int j= 0 ; j < width[i] ; j++){
for(int k = 0 ; k < height[i]; k++){
int rgb = images[i].getRGB(j, k);
grayImage[i].setRGB(j, k, rgb);
}
}
ImageIO.write(grayImage[i], "jpg", new File("\\tmp\\statics\\image" + i + ".jpg"));
} catch (IOException e) {
e.printStackTrace();
}
try {
bis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
List<byte[]> byteList = new ArrayList<>();
for (int i = 0; i < grayImage.length; i++) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
ImageIO.write(grayImage[i],"jpg",bos);
} catch (IOException e) {
e.printStackTrace();
}
try {
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
byteList.add(bos.toByteArray());
}
return byteList;
}
//小图片灰度化后,进行合并
public static BufferedImage imageMerge(List<FileSliceComputing> fileSliceComputingList, int sliceNum){
//byte[]--->BufferedImage
BufferedImage images[] = new BufferedImage[fileSliceComputingList.size()];
for (int i = 0; i < fileSliceComputingList.size(); i++) {
ByteArrayInputStream bis = new ByteArrayInputStream(fileSliceComputingList.get(i).getFileBytes());
try {
images[i] = ImageIO.read(bis);
//ImageIO.write(images[i], "jpg", new File("F:\\images\\grayImage\\image" + i + ".jpg"));
ImageIO.write(images[i], "jpg", new File("\\tmp\\statics\\image" + i + ".jpg"));
} catch (IOException e) {
e.printStackTrace();
}
try {
bis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
int cols = sliceNum/rows;
int chunks = sliceNum;
int chunkWidth, chunkHeight;
int type;
//读入小图
File[] imgFiles = new File[chunks];
for (int i = 0; i < chunks; i++) {
//imgFiles[i] = new File("F:\\images\\grayImage\\image" + i + ".jpg");
imgFiles[i] = new File("\\tmp\\statics\\image" + i + ".jpg");
}
//创建BufferedImage
BufferedImage[] buffImages = new BufferedImage[chunks];
for (int i = 0; i < chunks; i++) {
try {
buffImages[i] = ImageIO.read(imgFiles[i]);
} catch (IOException e) {
e.printStackTrace();
}
}
type = buffImages[0].getType();
chunkWidth = buffImages[0].getWidth();
chunkHeight = buffImages[0].getHeight();
//设置拼接后图的大小和类型
BufferedImage finalImg = new BufferedImage(chunkWidth * cols, chunkHeight * rows, type);
//写入图像内容
int num = 0;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
finalImg.createGraphics().drawImage(buffImages[num], chunkWidth * j, chunkHeight * i, null);
num++;
}
}
//输出拼接后的图像
/* try {
ImageIO.write(finalImg, "jpeg", new File("D:\\vx-ymwang\\images\\mergeImage\\finalImg.jpg"));
} catch (IOException e) {
e.printStackTrace();
}*/
return finalImg;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cnf</artifactId>
<groupId>top.ninwoo</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cnf-hadoop-computing</artifactId>
<packaging>pom</packaging>
<modules>
<module>cnf-hadoop-client</module>
<module>cnf-hadoop-server</module>
<module>cnf-hadoop-utils</module>
<module>cnf-hadoop-api</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-client-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-hadoop-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>top.ninwoo</groupId>
<artifactId>cnf-hadoop-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-thymeleaf -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>${thymeleaf.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
<module>apps/cnf-app-demo</module> <module>apps/cnf-app-demo</module>
<module>apps/cnf-distributed-file-transfer</module> <module>apps/cnf-distributed-file-transfer</module>
<module>apps/cnf-distributed-business-computing</module> <module>apps/cnf-distributed-business-computing</module>
<module>apps/cnf-hadoop-computing</module>
<module>apps/cnf-hadoop-computing</module>
</modules> </modules>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment