package core import ( "context" "fmt" "io" "log" "mime" "os" "path" "path/filepath" "scagent/config" pb "scagent/proto/fileupload" "scagent/util" "strings" "time" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // FileService 提供文件传输服务 type FileService struct { pb.UnimplementedFileUploadServiceServer } // FileChunk 表示文件分片 type FileChunk struct { FileName string // 文件名 Data []byte // 数据分片 Offset int64 // 偏移量,用于重组 IsLast bool // 是否为最后一个分片 } // 压缩文件 type ZipBlock struct { Basepath string Curpath string // 当前路径 SelFile []string // 选中的文件 RemoteAddr string // 远程主机地址,带端口 RemotePath string // 远程主机的存储路径 } // 处理文件上传 // dirpath 目标文件存放位置 func (f *FileService) Upload(stream pb.FileUploadService_UploadFileServer, dirpath string) error { var ( file *os.File fileInfo *pb.FileInfo ) // 启用日志 logger := util.NewProductionLogger() defer logger.Sync() // 检查目录是否存在 if _, err := os.Stat(dirpath); os.IsNotExist(err) { // 目录不存在,创建目录 if err := os.MkdirAll(dirpath, 0755); err != nil { return err } } for { // 接收客户端发送的文件块 chunk, err := stream.Recv() if err == io.EOF { // 所有块接收完毕,关闭文件并返回响应 if file != nil { file.Close() } // 客户端关闭流,上传完成 logger.Info("UploadFile", zap.String("msg", "Client closed stream")) return nil } if err != nil { return err } // 处理第一个块,包含文件元信息 if chunk.ChunkIndex == 0 && fileInfo == nil { fileInfo = chunk.Info if fileInfo == nil { logger.Error("UploadFile", zap.String("msg", "fileInfo is nil")) return nil } // 创建文件保存路径 filePath := filepath.Join(dirpath, fileInfo.FileName) // 打开或创建文件,使用追加模式 file, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0644) if err != nil { return err } logger.Info("开始接收文件", zap.String("filename", fileInfo.FileName), zap.String("path", filePath)) } // 写入文件内容 if file != nil && len(chunk.Content) > 0 { _, err = file.Write(chunk.Content) if err != nil { logger.Error("UploadFile", zap.String("msg", "写入文件失败"), zap.Error(err)) return nil } // 打印上传进度 if chunk.ChunkIndex%10 == 0 { // 每10个块打印一次 progress := float64(chunk.ChunkIndex+1) / float64(chunk.TotalChunks) * 100 logger.Info("UploadFile", zap.String("msg", "文件上传进度"), zap.Float64("progress", progress), zap.Int("chunkIndex", int(chunk.ChunkIndex+1)), zap.Int("totalChunks", int(chunk.TotalChunks))) } } } } // 压缩文件 // 返回压缩包的名称 func (f *FileService) Compress(args *ZipBlock, reply *string) error { // fmt.Printf("compress args: %v\n", args) // 启用日志 logger := util.NewProductionLogger() defer logger.Sync() // 实际路径 realFilePath := filepath.Join(args.Basepath, args.Curpath) logger.Info("Compress", zap.String("realFilePath", realFilePath)) // zip 文件名 zpFileName := "BIU_" + time.Now().Format("20060102_150405") + ".zip" // 创建zip 异步? taskId := make(chan string) // 异步压缩 go func() { util.CompressToZip(zpFileName, realFilePath, args.SelFile) taskId <- "arcok" // 日志输出 logger.Info("压缩文件", zap.String("filename", zpFileName), zap.String("path", realFilePath)) }() // ZIP 文件的实际路径 ziprl := path.Join(config.G.FilePath, "/sync_zips", zpFileName) // zip 创建成功后 rest := <-taskId // 如果压缩包生成成功以后再执行 if strings.EqualFold(strings.ToLower(rest), "arcok") { // 合并为实际路径 remotePath := path.Join("/www/wwwroot", args.RemotePath, zpFileName) // 异步发送文件到远程服务器上 go clientUploadFile(args.RemoteAddr, ziprl, remotePath) // 返回压缩包名称 *reply = ziprl } return nil } // 按照1mb 大小切割分块 const chunkSize = 1024 * 1024 // 1MB // 客户端调用RPC传送文件的函数 // 传入zip的实际路径,远程的服务器带端口,远程文件存放路径 func clientUploadFile(remote string, filePath string, uploadPath string) error { // fmt.Printf("remote: %s, filePath: %s, uploadPath: %s\n", remote, filePath, uploadPath) // 启用日志 logger := util.NewProductionLogger() defer logger.Sync() // 将服务器的信息拆分为ip和端口 harr := strings.Split(remote, ":") dstip := harr[0] dport := harr[1] // 连接到服务器 service := fmt.Sprintf("%v:%v", dstip, dport) fmt.Printf("service: %s\n", service) conn, err := grpc.Dial(service, grpc.WithTransportCredentials(insecure.NewCredentials())) // 使用安全连接 // conn, err := grpc.NewClient(service) if err != nil { logger.Error("clientUploadFile", zap.String("msg", "无法连接到服务器"), zap.Error(err)) return err } defer conn.Close() client := pb.NewFileUploadServiceClient(conn) // 调用RPC方法 stream, err := client.UploadFile(context.Background()) if err != nil { logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err)) return err } // 打开文件 file, err := os.Open(filePath) if err != nil { logger.Error("clientUploadFile", zap.String("msg", "无法打开文件"), zap.Error(err)) return err } defer file.Close() // 获取文件信息 fileInfo, err := file.Stat() if err != nil { logger.Error("clientUploadFile", zap.String("msg", "无法获取文件信息"), zap.Error(err)) return err } // 获取文件 MIME 类型 contentType := mime.TypeByExtension(filepath.Ext(filePath)) if contentType == "" { contentType = "application/octet-stream" } // 计算总块数 totalChunks := int32((fileInfo.Size() + int64(chunkSize) - 1) / int64(chunkSize)) log.Printf("准备上传文件: %s, 大小: %d bytes, 总块数: %d", fileInfo.Name(), fileInfo.Size(), totalChunks) // 创建流式上传 stream, err = client.UploadFile(context.Background()) if err != nil { logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err)) return err } // 分块读取并发送文件 buffer := make([]byte, chunkSize) var chunkIndex int32 = 0 // 同步等候文件的传送 for { // 读取文件块 n, err := file.Read(buffer) if err != nil && err != io.EOF { logger.Error("clientUploadFile", zap.String("msg", "读取文件失败"), zap.Error(err)) return err } // 构建文件块消息 chunk := &pb.FileChunk{ Content: buffer[:n], ChunkIndex: chunkIndex, TotalChunks: totalChunks, } // 第一个块需要包含文件元信息 if chunkIndex == 0 { chunk.Info = &pb.FileInfo{ FileName: fileInfo.Name(), FileSize: fileInfo.Size(), ContentType: contentType, UploadPath: uploadPath, } } // 发送文件块 if err := stream.Send(chunk); err != nil { logger.Error("clientUploadFile", zap.String("msg", "发送文件块失败"), zap.Error(err)) return err } // 打印上传进度 if chunkIndex%10 == 0 { // 每10个块打印一次 progress := float64(chunkIndex+1) / float64(totalChunks) * 100 logger.Info("clientUploadFile", zap.String("msg", "上传进度"), zap.Float64("progress", progress), zap.Int32("chunkIndex", chunkIndex+1), zap.Int32("totalChunks", totalChunks)) } // 检查是否到达文件末尾 if err == io.EOF { break } chunkIndex++ } // 接收服务器响应 response, err := stream.CloseAndRecv() if err != nil { logger.Error("clientUploadFile", zap.String("msg", "接收服务器响应失败"), zap.Error(err)) return err } if response.Success { logger.Info("clientUploadFile", zap.String("msg", "文件上传成功"), zap.String("filePath", response.FilePath), zap.Int64("receivedSize", response.ReceivedSize)) } else { logger.Error("clientUploadFile", zap.String("msg", "服务器返回错误"), zap.String("message", response.Message)) return fmt.Errorf("服务器返回错误: %s", response.Message) } return nil }