|
|
|
@ -1,7 +1,11 @@ |
|
|
|
package core |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"io" |
|
|
|
"log" |
|
|
|
"mime" |
|
|
|
"os" |
|
|
|
"path" |
|
|
|
"path/filepath" |
|
|
|
@ -11,6 +15,7 @@ import ( |
|
|
|
"time" |
|
|
|
|
|
|
|
"go.uber.org/zap" |
|
|
|
"google.golang.org/grpc" |
|
|
|
) |
|
|
|
|
|
|
|
// FileService 提供文件传输服务
|
|
|
|
@ -131,34 +136,142 @@ func (f *FileService) Compress(args *ZipBlock, reply *string) error { |
|
|
|
// 返回压缩包名称
|
|
|
|
*reply = ziprl |
|
|
|
// 发送文件到远程服务器上
|
|
|
|
|
|
|
|
err := clientUploadFile(args.remote, ziprl, "/www/wwwroot") |
|
|
|
if err != nil { |
|
|
|
logger.Error("Compress", zap.String("msg", "上传文件到远程服务器失败"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// 接收传送来的文件存储在指定的目录下
|
|
|
|
func (f *FileService) Download(args *FileChunk, reply *bool) error { |
|
|
|
// 按照1mb 大小切割分块
|
|
|
|
const chunkSize = 1024 * 1024 // 1MB
|
|
|
|
// 客户端调用RPC传送文件的函数
|
|
|
|
// 传入zip的实际路径,远程的服务器带端口,远程文件存放路径
|
|
|
|
func clientUploadFile(remote string, filePath string, uploadPath string) error { |
|
|
|
|
|
|
|
// 启用日志
|
|
|
|
logger := util.NewProductionLogger() |
|
|
|
defer logger.Sync() |
|
|
|
// 实际路径
|
|
|
|
realFilePath := filepath.Join(args.FileName) |
|
|
|
logger.Info("Download", zap.String("realFilePath", realFilePath)) |
|
|
|
// 打开或创建文件,使用追加模式
|
|
|
|
file, err := os.OpenFile(realFilePath, os.O_WRONLY|os.O_CREATE, 0644) |
|
|
|
// 将服务器的信息拆分为ip和端口
|
|
|
|
harr := strings.Split(remote, ":") |
|
|
|
dstip := harr[0] |
|
|
|
dport := harr[1] |
|
|
|
|
|
|
|
// 连接到服务器
|
|
|
|
service := fmt.Sprintf("%v:%v", dstip, dport) |
|
|
|
conn, err := grpc.NewClient(service) |
|
|
|
if err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "无法连接到服务器"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
defer conn.Close() |
|
|
|
client := pb.NewFileUploadServiceClient(conn) |
|
|
|
// 调用RPC方法
|
|
|
|
stream, err := client.UploadFile(context.Background()) |
|
|
|
if err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// 打开文件
|
|
|
|
file, err := os.Open(filePath) |
|
|
|
if err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "无法打开文件"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
defer file.Close() |
|
|
|
// 移动到指定偏移量
|
|
|
|
if _, err := file.Seek(args.Offset, io.SeekStart); err != nil { |
|
|
|
|
|
|
|
// 获取文件信息
|
|
|
|
fileInfo, err := file.Stat() |
|
|
|
if err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "无法获取文件信息"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// 获取文件 MIME 类型
|
|
|
|
contentType := mime.TypeByExtension(filepath.Ext(filePath)) |
|
|
|
if contentType == "" { |
|
|
|
contentType = "application/octet-stream" |
|
|
|
} |
|
|
|
|
|
|
|
// 计算总块数
|
|
|
|
totalChunks := int32((fileInfo.Size() + int64(chunkSize) - 1) / int64(chunkSize)) |
|
|
|
log.Printf("准备上传文件: %s, 大小: %d bytes, 总块数: %d", |
|
|
|
fileInfo.Name(), fileInfo.Size(), totalChunks) |
|
|
|
|
|
|
|
// 创建流式上传
|
|
|
|
stream, err = client.UploadFile(context.Background()) |
|
|
|
if err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 写入数据
|
|
|
|
if _, err := file.Write(args.Data); err != nil { |
|
|
|
|
|
|
|
// 分块读取并发送文件
|
|
|
|
buffer := make([]byte, chunkSize) |
|
|
|
var chunkIndex int32 = 0 |
|
|
|
|
|
|
|
// 同步等候文件的传送
|
|
|
|
for { |
|
|
|
// 读取文件块
|
|
|
|
n, err := file.Read(buffer) |
|
|
|
if err != nil && err != io.EOF { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "读取文件失败"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// 构建文件块消息
|
|
|
|
chunk := &pb.FileChunk{ |
|
|
|
Content: buffer[:n], |
|
|
|
ChunkIndex: chunkIndex, |
|
|
|
TotalChunks: totalChunks, |
|
|
|
} |
|
|
|
|
|
|
|
// 第一个块需要包含文件元信息
|
|
|
|
if chunkIndex == 0 { |
|
|
|
chunk.Info = &pb.FileInfo{ |
|
|
|
FileName: fileInfo.Name(), |
|
|
|
FileSize: fileInfo.Size(), |
|
|
|
ContentType: contentType, |
|
|
|
UploadPath: uploadPath, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 发送文件块
|
|
|
|
if err := stream.Send(chunk); err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "发送文件块失败"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// 打印上传进度
|
|
|
|
if chunkIndex%10 == 0 { // 每10个块打印一次
|
|
|
|
progress := float64(chunkIndex+1) / float64(totalChunks) * 100 |
|
|
|
logger.Info("clientUploadFile", zap.String("msg", "上传进度"), zap.Float64("progress", progress), zap.Int32("chunkIndex", chunkIndex+1), zap.Int32("totalChunks", totalChunks)) |
|
|
|
} |
|
|
|
|
|
|
|
// 检查是否到达文件末尾
|
|
|
|
if err == io.EOF { |
|
|
|
break |
|
|
|
} |
|
|
|
|
|
|
|
chunkIndex++ |
|
|
|
} |
|
|
|
|
|
|
|
// 接收服务器响应
|
|
|
|
response, err := stream.CloseAndRecv() |
|
|
|
if err != nil { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "接收服务器响应失败"), zap.Error(err)) |
|
|
|
return err |
|
|
|
} |
|
|
|
*reply = true |
|
|
|
|
|
|
|
if response.Success { |
|
|
|
logger.Info("clientUploadFile", zap.String("msg", "文件上传成功"), zap.String("filePath", response.FilePath), zap.Int64("receivedSize", response.ReceivedSize)) |
|
|
|
} else { |
|
|
|
logger.Error("clientUploadFile", zap.String("msg", "服务器返回错误"), zap.String("message", response.Message)) |
|
|
|
return fmt.Errorf("服务器返回错误: %s", response.Message) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|