diff --git a/aufs/core/sendzip.go b/aufs/core/sendzip.go index 4a9882c..28ee954 100644 --- a/aufs/core/sendzip.go +++ b/aufs/core/sendzip.go @@ -71,8 +71,6 @@ func SendZip(w http.ResponseWriter, r *http.Request) { RemotePath: remotepath[0], } - fmt.Printf("zipblock:%v", zipblock) - // 从serip 切割出ip 和端口 splitip := strings.Split(serip[0], ":") if len(splitip) != 2 { @@ -85,7 +83,8 @@ func SendZip(w http.ResponseWriter, r *http.Request) { service := fmt.Sprintf("%v:%v", srcip, sport) client, err := jsonrpc.Dial("tcp", service) if err != nil { - fmt.Fprintf(w, "jsonrpc dial faild %v", err) + // fmt.Fprintf(w, "jsonrpc dial faild %v", err) + http.Error(w, "jsonrpc dial faild "+err.Error(), http.StatusInternalServerError) return } // 调用远程方法 @@ -95,7 +94,8 @@ func SendZip(w http.ResponseWriter, r *http.Request) { defer client.Close() if err != nil { - fmt.Fprintf(w, "jsonrpc call faild %v", err) + // fmt.Fprintf(w, "jsonrpc call faild %v", err) + http.Error(w, "jsonrpc call faild "+err.Error(), http.StatusInternalServerError) return } // 输出内容 diff --git a/scagent/core/FileRpc.go b/scagent/core/FileRpc.go index f581bd8..163722d 100644 --- a/scagent/core/FileRpc.go +++ b/scagent/core/FileRpc.go @@ -1,9 +1,7 @@ package core import ( - "bufio" "fmt" - "io" "net/rpc" "os" "path" @@ -28,75 +26,6 @@ type ZipBlock struct { // FileTransferService 定义文件传输服务 type FileService struct{} -// FileInfo 包含文件的元信息 -type FileInfo struct { - FileName string - FileSize int64 -} - -// FileChunk 表示文件的一部分 -type FileChunk struct { - Data []byte - FileName string - Offset int64 - IsLast bool -} - -// SendFile 接收文件信息(名称和大小) -func (f *FileService) SendFileInfo(info FileInfo, reply *bool) error { - // 创建保存文件的目录 - // if err := os.MkdirAll("received_files", 0755); err != nil { - // return err - // } - - // 提取路径 /www/wwwroot/bidemo.com/BIU_20250918_183144.zip - dirPath := filepath.Dir(info.FileName) - // 创建目录 - if err := os.MkdirAll(dirPath, 0755); err != nil { - return err - } - - // 创建文件(如果存在则截断) - filePath := filepath.Join("received_files", info.FileName) - file, err := os.Create(filePath) - if err != nil { - return err - } - defer file.Close() - - *reply = true - return nil -} - -// SendFileChunk 接收文件块并写入文件 -func (f *FileService) SendFileChunk(chunk FileChunk, reply *bool) error { - // filePath := filepath.Join("received_files", chunk.FileName) - fmt.Printf("recive file :%s", chunk.FileName) - // 合并为实际路径 - // filePath := filepath.Join("received_files", chunk.FileName) - filePath := chunk.FileName - - // 打开文件,使用追加模式 - file, err := os.OpenFile(filePath, os.O_WRONLY, 0644) - if err != nil { - return err - } - defer file.Close() - - // 移动到指定偏移量 - if _, err := file.Seek(chunk.Offset, io.SeekStart); err != nil { - return err - } - - // 写入数据 - if _, err := file.Write(chunk.Data); err != nil { - return err - } - - *reply = true - return nil -} - // 压缩文件 // 返回压缩包的名称 func (f *FileService) Compress(args *ZipBlock, reply *string) error { @@ -139,7 +68,7 @@ func (f *FileService) Compress(args *ZipBlock, reply *string) error { // 客户端调用RPC传送文件的函数 // 传入zip的实际路径,远程的服务器带端口,远程文件存放路径 func clientUploadFile(remote string, filePath string, uploadPath string) error { - // fmt.Printf("remote: %s, filePath: %s, uploadPath: %s\n", remote, filePath, uploadPath) + fmt.Printf("remote: %s, filePath: %s, uploadPath: %s\n", remote, filePath, uploadPath) // 启用日志 logger := util.NewProductionLogger() defer logger.Sync() @@ -157,68 +86,31 @@ func clientUploadFile(remote string, filePath string, uploadPath string) error { return err } defer client.Close() + // 获取文件信息 fileInfo, err := os.Stat(filePath) if err != nil { - panic(err) + // panic(err) + fmt.Printf("获取文件信息失败: %v\n", err) + return err } // 提取文件名 - fileName := filepath.Base(filePath) + // 远程的文件名 + fileName := filepath.Base(uploadPath) + fmt.Printf("fileName: %s\n", fileName) // 发送文件信息 var reply bool err = client.Call("FileService.SendFileInfo", FileInfo{ FileName: fileName, FileSize: fileInfo.Size(), }, &reply) - if err != nil || !reply { - logger.Error("发送文件信息失败", zap.Error(err)) - } - // 打开文件准备读取 - file, err := os.Open(filePath) if err != nil { - panic(err) - } - defer file.Close() - - // 读取并发送文件块 - chunkSize := 1 * 1024 * 1024 // 1MB - buffer := make([]byte, chunkSize) // 4KB块大小 - reader := bufio.NewReader(file) - var offset int64 = 0 - - // logger.Info("开始发送文件: %s (大小: %d bytes)", zap.String("filename", fileName), zap.String("path", filePath)) - fmt.Printf("开始发送文件: %s (大小: %d bytes)\n", fileName, fileInfo.Size()) - - // 发送文件 - for { - n, err := reader.Read(buffer) - if err != nil { - break - } - - // 判断是否是最后一块 - isLast := offset+int64(n) >= fileInfo.Size() - - // 发送文件块 - err = client.Call("FileService.SendFileChunk", FileChunk{ - Data: buffer[:n], - FileName: uploadPath, - Offset: offset, - IsLast: isLast, - }, &reply) - - if err != nil || !reply { - logger.Error("发送文件块失败", zap.Error(err)) - } - - offset += int64(n) - - // 显示进度 - progress := float64(offset) / float64(fileInfo.Size()) * 100 - fmt.Printf("\r发送进度: %.2f%%", progress) + logger.Error("发送文件信息失败", zap.Error(err)) + fmt.Printf("发送文件信息失败: %v\n", err) + return err } - fmt.Println("\n文件发送完成!") + fmt.Printf("replay: %v\n", reply) return nil } diff --git a/scagent/core/UpFile.go b/scagent/core/UpFile.go new file mode 100644 index 0000000..dacde20 --- /dev/null +++ b/scagent/core/UpFile.go @@ -0,0 +1,80 @@ +package core + +import ( + "fmt" + "io" + "os" + "path/filepath" + "scagent/util" + + "go.uber.org/zap" +) + +type UpFileService struct{} + +// FileInfo 包含文件的元信息 +type FileInfo struct { + FileName string + FileSize int64 +} + +// FileChunk 表示文件的一部分 +type FileChunk struct { + Data []byte + FileName string + Offset int64 + IsLast bool +} + +// 发送文件的RPC +// SendFile 接收文件信息(名称和大小) +func (f *UpFileService) SendFileInfo(info FileInfo, reply *bool) error { + logger := util.NewProductionLogger() + defer logger.Sync() + // 创建保存文件的目录 + // if err := os.MkdirAll("received_files", 0755); err != nil { + // return err + // } + + // 提取路径 /www/wwwroot/bidemo.com/BIU_20250918_183144.zip + dirPath := filepath.Dir(info.FileName) + logger.Info("SendFileInfo", zap.String("dirPath", dirPath)) + // 检查文件夹是否存在 + if _, err := os.Stat(dirPath); os.IsNotExist(err) { + // 文件夹不存在,创建它 + if err := os.MkdirAll(dirPath, 0755); err != nil { + return err + } + } + *reply = true + return nil +} + +// SendFileChunk 接收文件块并写入文件 +func (f *UpFileService) SendFileChunk(chunk FileChunk, reply *bool) error { + // filePath := filepath.Join("received_files", chunk.FileName) + fmt.Printf("recive file :%s", chunk.FileName) + // 合并为实际路径 + // filePath := filepath.Join("received_files", chunk.FileName) + filePath := chunk.FileName + + // 打开文件,使用追加模式 + file, err := os.OpenFile(filePath, os.O_WRONLY, 0644) + if err != nil { + return err + } + defer file.Close() + + // 移动到指定偏移量 + if _, err := file.Seek(chunk.Offset, io.SeekStart); err != nil { + return err + } + + // 写入数据 + if _, err := file.Write(chunk.Data); err != nil { + return err + } + + *reply = true + return nil +} diff --git a/scagent/main.go b/scagent/main.go index 240da88..ad9b379 100644 --- a/scagent/main.go +++ b/scagent/main.go @@ -79,6 +79,13 @@ func main() { return } + // 文件上传的服务 + err = rpc.RegisterName("UpFileService", new(core.UpFileService)) + if err != nil { + logger.Error("Register failed", zap.Error(err)) + return + } + // sport := fmt.Sprintf(":%v", config.G.Port) logger.Info("Listen port", zap.String("port", sport))