Browse Source

抽取upfile

master
xyiege 2 months ago
parent
commit
efbf3fbb98
  1. 8
      aufs/core/sendzip.go
  2. 132
      scagent/core/FileRpc.go
  3. 80
      scagent/core/UpFile.go
  4. 7
      scagent/main.go

8
aufs/core/sendzip.go

@ -71,8 +71,6 @@ func SendZip(w http.ResponseWriter, r *http.Request) {
RemotePath: remotepath[0],
}
fmt.Printf("zipblock:%v", zipblock)
// 从serip 切割出ip 和端口
splitip := strings.Split(serip[0], ":")
if len(splitip) != 2 {
@ -85,7 +83,8 @@ func SendZip(w http.ResponseWriter, r *http.Request) {
service := fmt.Sprintf("%v:%v", srcip, sport)
client, err := jsonrpc.Dial("tcp", service)
if err != nil {
fmt.Fprintf(w, "jsonrpc dial faild %v", err)
// fmt.Fprintf(w, "jsonrpc dial faild %v", err)
http.Error(w, "jsonrpc dial faild "+err.Error(), http.StatusInternalServerError)
return
}
// 调用远程方法
@ -95,7 +94,8 @@ func SendZip(w http.ResponseWriter, r *http.Request) {
defer client.Close()
if err != nil {
fmt.Fprintf(w, "jsonrpc call faild %v", err)
// fmt.Fprintf(w, "jsonrpc call faild %v", err)
http.Error(w, "jsonrpc call faild "+err.Error(), http.StatusInternalServerError)
return
}
// 输出内容

132
scagent/core/FileRpc.go

@ -1,9 +1,7 @@
package core
import (
"bufio"
"fmt"
"io"
"net/rpc"
"os"
"path"
@ -28,75 +26,6 @@ type ZipBlock struct {
// FileTransferService 定义文件传输服务
type FileService struct{}
// FileInfo 包含文件的元信息
type FileInfo struct {
FileName string
FileSize int64
}
// FileChunk 表示文件的一部分
type FileChunk struct {
Data []byte
FileName string
Offset int64
IsLast bool
}
// SendFile 接收文件信息(名称和大小)
func (f *FileService) SendFileInfo(info FileInfo, reply *bool) error {
// 创建保存文件的目录
// if err := os.MkdirAll("received_files", 0755); err != nil {
// return err
// }
// 提取路径 /www/wwwroot/bidemo.com/BIU_20250918_183144.zip
dirPath := filepath.Dir(info.FileName)
// 创建目录
if err := os.MkdirAll(dirPath, 0755); err != nil {
return err
}
// 创建文件(如果存在则截断)
filePath := filepath.Join("received_files", info.FileName)
file, err := os.Create(filePath)
if err != nil {
return err
}
defer file.Close()
*reply = true
return nil
}
// SendFileChunk 接收文件块并写入文件
func (f *FileService) SendFileChunk(chunk FileChunk, reply *bool) error {
// filePath := filepath.Join("received_files", chunk.FileName)
fmt.Printf("recive file :%s", chunk.FileName)
// 合并为实际路径
// filePath := filepath.Join("received_files", chunk.FileName)
filePath := chunk.FileName
// 打开文件,使用追加模式
file, err := os.OpenFile(filePath, os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
// 移动到指定偏移量
if _, err := file.Seek(chunk.Offset, io.SeekStart); err != nil {
return err
}
// 写入数据
if _, err := file.Write(chunk.Data); err != nil {
return err
}
*reply = true
return nil
}
// 压缩文件
// 返回压缩包的名称
func (f *FileService) Compress(args *ZipBlock, reply *string) error {
@ -139,7 +68,7 @@ func (f *FileService) Compress(args *ZipBlock, reply *string) error {
// 客户端调用RPC传送文件的函数
// 传入zip的实际路径,远程的服务器带端口,远程文件存放路径
func clientUploadFile(remote string, filePath string, uploadPath string) error {
// fmt.Printf("remote: %s, filePath: %s, uploadPath: %s\n", remote, filePath, uploadPath)
fmt.Printf("remote: %s, filePath: %s, uploadPath: %s\n", remote, filePath, uploadPath)
// 启用日志
logger := util.NewProductionLogger()
defer logger.Sync()
@ -157,68 +86,31 @@ func clientUploadFile(remote string, filePath string, uploadPath string) error {
return err
}
defer client.Close()
// 获取文件信息
fileInfo, err := os.Stat(filePath)
if err != nil {
panic(err)
// panic(err)
fmt.Printf("获取文件信息失败: %v\n", err)
return err
}
// 提取文件名
fileName := filepath.Base(filePath)
// 远程的文件名
fileName := filepath.Base(uploadPath)
fmt.Printf("fileName: %s\n", fileName)
// 发送文件信息
var reply bool
err = client.Call("FileService.SendFileInfo", FileInfo{
FileName: fileName,
FileSize: fileInfo.Size(),
}, &reply)
if err != nil || !reply {
logger.Error("发送文件信息失败", zap.Error(err))
}
// 打开文件准备读取
file, err := os.Open(filePath)
if err != nil {
panic(err)
}
defer file.Close()
// 读取并发送文件块
chunkSize := 1 * 1024 * 1024 // 1MB
buffer := make([]byte, chunkSize) // 4KB块大小
reader := bufio.NewReader(file)
var offset int64 = 0
// logger.Info("开始发送文件: %s (大小: %d bytes)", zap.String("filename", fileName), zap.String("path", filePath))
fmt.Printf("开始发送文件: %s (大小: %d bytes)\n", fileName, fileInfo.Size())
// 发送文件
for {
n, err := reader.Read(buffer)
if err != nil {
break
}
// 判断是否是最后一块
isLast := offset+int64(n) >= fileInfo.Size()
// 发送文件块
err = client.Call("FileService.SendFileChunk", FileChunk{
Data: buffer[:n],
FileName: uploadPath,
Offset: offset,
IsLast: isLast,
}, &reply)
if err != nil || !reply {
logger.Error("发送文件块失败", zap.Error(err))
}
offset += int64(n)
// 显示进度
progress := float64(offset) / float64(fileInfo.Size()) * 100
fmt.Printf("\r发送进度: %.2f%%", progress)
logger.Error("发送文件信息失败", zap.Error(err))
fmt.Printf("发送文件信息失败: %v\n", err)
return err
}
fmt.Println("\n文件发送完成!")
fmt.Printf("replay: %v\n", reply)
return nil
}

80
scagent/core/UpFile.go

@ -0,0 +1,80 @@
package core
import (
"fmt"
"io"
"os"
"path/filepath"
"scagent/util"
"go.uber.org/zap"
)
type UpFileService struct{}
// FileInfo 包含文件的元信息
type FileInfo struct {
FileName string
FileSize int64
}
// FileChunk 表示文件的一部分
type FileChunk struct {
Data []byte
FileName string
Offset int64
IsLast bool
}
// 发送文件的RPC
// SendFile 接收文件信息(名称和大小)
func (f *UpFileService) SendFileInfo(info FileInfo, reply *bool) error {
logger := util.NewProductionLogger()
defer logger.Sync()
// 创建保存文件的目录
// if err := os.MkdirAll("received_files", 0755); err != nil {
// return err
// }
// 提取路径 /www/wwwroot/bidemo.com/BIU_20250918_183144.zip
dirPath := filepath.Dir(info.FileName)
logger.Info("SendFileInfo", zap.String("dirPath", dirPath))
// 检查文件夹是否存在
if _, err := os.Stat(dirPath); os.IsNotExist(err) {
// 文件夹不存在,创建它
if err := os.MkdirAll(dirPath, 0755); err != nil {
return err
}
}
*reply = true
return nil
}
// SendFileChunk 接收文件块并写入文件
func (f *UpFileService) SendFileChunk(chunk FileChunk, reply *bool) error {
// filePath := filepath.Join("received_files", chunk.FileName)
fmt.Printf("recive file :%s", chunk.FileName)
// 合并为实际路径
// filePath := filepath.Join("received_files", chunk.FileName)
filePath := chunk.FileName
// 打开文件,使用追加模式
file, err := os.OpenFile(filePath, os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
// 移动到指定偏移量
if _, err := file.Seek(chunk.Offset, io.SeekStart); err != nil {
return err
}
// 写入数据
if _, err := file.Write(chunk.Data); err != nil {
return err
}
*reply = true
return nil
}

7
scagent/main.go

@ -79,6 +79,13 @@ func main() {
return
}
// 文件上传的服务
err = rpc.RegisterName("UpFileService", new(core.UpFileService))
if err != nil {
logger.Error("Register failed", zap.Error(err))
return
}
//
sport := fmt.Sprintf(":%v", config.G.Port)
logger.Info("Listen port", zap.String("port", sport))

Loading…
Cancel
Save