From 45f7d9dd5589016c6c449bc91b1d0e5b235aee26 Mon Sep 17 00:00:00 2001 From: xc Date: Wed, 17 Sep 2025 17:04:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9B=E5=BB=BA=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E7=9A=84=E4=BD=BF=E7=94=A8=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scagent/core/FileRpc.go | 139 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 126 insertions(+), 13 deletions(-) diff --git a/scagent/core/FileRpc.go b/scagent/core/FileRpc.go index b48b186..55b8c0e 100644 --- a/scagent/core/FileRpc.go +++ b/scagent/core/FileRpc.go @@ -1,7 +1,11 @@ package core import ( + "context" + "fmt" "io" + "log" + "mime" "os" "path" "path/filepath" @@ -11,6 +15,7 @@ import ( "time" "go.uber.org/zap" + "google.golang.org/grpc" ) // FileService 提供文件传输服务 @@ -131,34 +136,142 @@ func (f *FileService) Compress(args *ZipBlock, reply *string) error { // 返回压缩包名称 *reply = ziprl // 发送文件到远程服务器上 - + err := clientUploadFile(args.remote, ziprl, "/www/wwwroot") + if err != nil { + logger.Error("Compress", zap.String("msg", "上传文件到远程服务器失败"), zap.Error(err)) + return err + } } return nil } -// 接收传送来的文件存储在指定的目录下 -func (f *FileService) Download(args *FileChunk, reply *bool) error { +// 按照1mb 大小切割分块 +const chunkSize = 1024 * 1024 // 1MB +// 客户端调用RPC传送文件的函数 +// 传入zip的实际路径,远程的服务器带端口,远程文件存放路径 +func clientUploadFile(remote string, filePath string, uploadPath string) error { + // 启用日志 logger := util.NewProductionLogger() defer logger.Sync() - // 实际路径 - realFilePath := filepath.Join(args.FileName) - logger.Info("Download", zap.String("realFilePath", realFilePath)) - // 打开或创建文件,使用追加模式 - file, err := os.OpenFile(realFilePath, os.O_WRONLY|os.O_CREATE, 0644) + // 将服务器的信息拆分为ip和端口 + harr := strings.Split(remote, ":") + dstip := harr[0] + dport := harr[1] + + // 连接到服务器 + service := fmt.Sprintf("%v:%v", dstip, dport) + conn, err := grpc.NewClient(service) + if err != nil { + logger.Error("clientUploadFile", zap.String("msg", "无法连接到服务器"), zap.Error(err)) + return err + } + defer conn.Close() + client := pb.NewFileUploadServiceClient(conn) + // 调用RPC方法 + stream, err := client.UploadFile(context.Background()) + if err != nil { + logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err)) + return err + } + + // 打开文件 + file, err := os.Open(filePath) if err != nil { + logger.Error("clientUploadFile", zap.String("msg", "无法打开文件"), zap.Error(err)) return err } defer file.Close() - // 移动到指定偏移量 - if _, err := file.Seek(args.Offset, io.SeekStart); err != nil { + + // 获取文件信息 + fileInfo, err := file.Stat() + if err != nil { + logger.Error("clientUploadFile", zap.String("msg", "无法获取文件信息"), zap.Error(err)) + return err + } + + // 获取文件 MIME 类型 + contentType := mime.TypeByExtension(filepath.Ext(filePath)) + if contentType == "" { + contentType = "application/octet-stream" + } + + // 计算总块数 + totalChunks := int32((fileInfo.Size() + int64(chunkSize) - 1) / int64(chunkSize)) + log.Printf("准备上传文件: %s, 大小: %d bytes, 总块数: %d", + fileInfo.Name(), fileInfo.Size(), totalChunks) + + // 创建流式上传 + stream, err = client.UploadFile(context.Background()) + if err != nil { + logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err)) return err } - // 写入数据 - if _, err := file.Write(args.Data); err != nil { + + // 分块读取并发送文件 + buffer := make([]byte, chunkSize) + var chunkIndex int32 = 0 + + // 同步等候文件的传送 + for { + // 读取文件块 + n, err := file.Read(buffer) + if err != nil && err != io.EOF { + logger.Error("clientUploadFile", zap.String("msg", "读取文件失败"), zap.Error(err)) + return err + } + + // 构建文件块消息 + chunk := &pb.FileChunk{ + Content: buffer[:n], + ChunkIndex: chunkIndex, + TotalChunks: totalChunks, + } + + // 第一个块需要包含文件元信息 + if chunkIndex == 0 { + chunk.Info = &pb.FileInfo{ + FileName: fileInfo.Name(), + FileSize: fileInfo.Size(), + ContentType: contentType, + UploadPath: uploadPath, + } + } + + // 发送文件块 + if err := stream.Send(chunk); err != nil { + logger.Error("clientUploadFile", zap.String("msg", "发送文件块失败"), zap.Error(err)) + return err + } + + // 打印上传进度 + if chunkIndex%10 == 0 { // 每10个块打印一次 + progress := float64(chunkIndex+1) / float64(totalChunks) * 100 + logger.Info("clientUploadFile", zap.String("msg", "上传进度"), zap.Float64("progress", progress), zap.Int32("chunkIndex", chunkIndex+1), zap.Int32("totalChunks", totalChunks)) + } + + // 检查是否到达文件末尾 + if err == io.EOF { + break + } + + chunkIndex++ + } + + // 接收服务器响应 + response, err := stream.CloseAndRecv() + if err != nil { + logger.Error("clientUploadFile", zap.String("msg", "接收服务器响应失败"), zap.Error(err)) return err } - *reply = true + + if response.Success { + logger.Info("clientUploadFile", zap.String("msg", "文件上传成功"), zap.String("filePath", response.FilePath), zap.Int64("receivedSize", response.ReceivedSize)) + } else { + logger.Error("clientUploadFile", zap.String("msg", "服务器返回错误"), zap.String("message", response.Message)) + return fmt.Errorf("服务器返回错误: %s", response.Message) + } + return nil }