Browse Source

调整输出函数

master
xyiege 4 months ago
parent
commit
13e65c38a9
  1. 275
      scagent/core/FileRpc.go

275
scagent/core/FileRpc.go

@ -1,38 +1,21 @@
package core
import (
"context"
"bufio"
"fmt"
"io"
"log"
"mime"
"net/rpc"
"os"
"path"
"path/filepath"
"scagent/config"
pb "scagent/proto/fileupload"
"scagent/util"
"strings"
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// FileService 提供文件传输服务
type FileService struct {
pb.UnimplementedFileUploadServiceServer
}
// FileChunk 表示文件分片
type FileChunk struct {
FileName string // 文件名
Data []byte // 数据分片
Offset int64 // 偏移量,用于重组
IsLast bool // 是否为最后一个分片
}
// 压缩文件
type ZipBlock struct {
Basepath string
@ -42,73 +25,65 @@ type ZipBlock struct {
RemotePath string // 远程主机的存储路径
}
// 处理文件上传
// dirpath 目标文件存放位置
func (f *FileService) UploadFile(stream pb.FileUploadService_UploadFileServer, dirpath string) error {
var (
file *os.File
fileInfo *pb.FileInfo
)
// 启用日志
logger := util.NewProductionLogger()
defer logger.Sync()
// FileTransferService 定义文件传输服务
type FileService struct{}
// 检查目录是否存在
if _, err := os.Stat(dirpath); os.IsNotExist(err) {
// 目录不存在,创建目录
if err := os.MkdirAll(dirpath, 0755); err != nil {
return err
}
// FileInfo 包含文件的元信息
type FileInfo struct {
FileName string
FileSize int64
}
// FileChunk 表示文件的一部分
type FileChunk struct {
Data []byte
FileName string
Offset int64
IsLast bool
}
// SendFile 接收文件信息(名称和大小)
func (f *FileService) SendFileInfo(info FileInfo, reply *bool) error {
// 创建保存文件的目录
if err := os.MkdirAll("received_files", 0755); err != nil {
return err
}
for {
// 接收客户端发送的文件块
chunk, err := stream.Recv()
if err == io.EOF {
// 所有块接收完毕,关闭文件并返回响应
if file != nil {
file.Close()
}
// 客户端关闭流,上传完成
logger.Info("UploadFile", zap.String("msg", "Client closed stream"))
return nil
}
if err != nil {
return err
}
// 处理第一个块,包含文件元信息
if chunk.ChunkIndex == 0 && fileInfo == nil {
fileInfo = chunk.Info
if fileInfo == nil {
logger.Error("UploadFile", zap.String("msg", "fileInfo is nil"))
return nil
}
// 创建文件保存路径
filePath := filepath.Join(dirpath, fileInfo.FileName)
// 打开或创建文件,使用追加模式
file, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
logger.Info("开始接收文件", zap.String("filename", fileInfo.FileName), zap.String("path", filePath))
}
// 写入文件内容
if file != nil && len(chunk.Content) > 0 {
_, err = file.Write(chunk.Content)
if err != nil {
logger.Error("UploadFile", zap.String("msg", "写入文件失败"), zap.Error(err))
return nil
}
// 创建文件(如果存在则截断)
filePath := filepath.Join("received_files", info.FileName)
file, err := os.Create(filePath)
if err != nil {
return err
}
defer file.Close()
// 打印上传进度
if chunk.ChunkIndex%10 == 0 { // 每10个块打印一次
progress := float64(chunk.ChunkIndex+1) / float64(chunk.TotalChunks) * 100
logger.Info("UploadFile", zap.String("msg", "文件上传进度"), zap.Float64("progress", progress), zap.Int("chunkIndex", int(chunk.ChunkIndex+1)), zap.Int("totalChunks", int(chunk.TotalChunks)))
}
}
*reply = true
return nil
}
// SendFileChunk 接收文件块并写入文件
func (f *FileService) SendFileChunk(chunk FileChunk, reply *bool) error {
filePath := filepath.Join("received_files", chunk.FileName)
// 打开文件,使用追加模式
file, err := os.OpenFile(filePath, os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
// 移动到指定偏移量
if _, err := file.Seek(chunk.Offset, io.SeekStart); err != nil {
return err
}
// 写入数据
if _, err := file.Write(chunk.Data); err != nil {
return err
}
*reply = true
return nil
}
// 压缩文件
@ -150,8 +125,6 @@ func (f *FileService) Compress(args *ZipBlock, reply *string) error {
return nil
}
// 按照1mb 大小切割分块
const chunkSize = 1024 * 1024 // 1MB
// 客户端调用RPC传送文件的函数
// 传入zip的实际路径,远程的服务器带端口,远程文件存放路径
func clientUploadFile(remote string, filePath string, uploadPath string) error {
@ -164,121 +137,73 @@ func clientUploadFile(remote string, filePath string, uploadPath string) error {
dstip := harr[0]
dport := harr[1]
// 连接到服务器
// 连接到RPC服务器
service := fmt.Sprintf("%v:%v", dstip, dport)
fmt.Printf("service: %s\n", service)
conn, err := grpc.NewClient(service, grpc.WithTransportCredentials(insecure.NewCredentials())) // 使用安全连接
// conn, err := grpc.NewClient(service)
client, err := rpc.Dial("tcp", service)
if err != nil {
logger.Error("clientUploadFile", zap.String("msg", "无法连接到服务器"), zap.Error(err))
return err
panic(err)
}
defer conn.Close()
client := pb.NewFileUploadServiceClient(conn)
// 调用RPC方法
// stream, err := client.UploadFile(context.Background())
// if err != nil {
// logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err))
// return err
// }
// 打开文件
file, err := os.Open(filePath)
if err != nil {
logger.Error("clientUploadFile", zap.String("msg", "无法打开文件"), zap.Error(err))
return err
}
defer file.Close()
defer client.Close()
// 获取文件信息
fileInfo, err := file.Stat()
fileInfo, err := os.Stat(filePath)
if err != nil {
logger.Error("clientUploadFile", zap.String("msg", "无法获取文件信息"), zap.Error(err))
return err
panic(err)
}
// 获取文件 MIME 类型
contentType := mime.TypeByExtension(filepath.Ext(filePath))
if contentType == "" {
contentType = "application/octet-stream"
// 提取文件名
fileName := filepath.Base(filePath)
// 发送文件信息
var reply bool
err = client.Call("FileService.SendFileInfo", FileInfo{
FileName: fileName,
FileSize: fileInfo.Size(),
}, &reply)
if err != nil || !reply {
logger.Error("发送文件信息失败", zap.Error(err))
}
// 计算总块数
totalChunks := int32((fileInfo.Size() + int64(chunkSize) - 1) / int64(chunkSize))
log.Printf("准备上传文件: %s, 大小: %d bytes, 总块数: %d",
fileInfo.Name(), fileInfo.Size(), totalChunks)
// 创建流式上传
// eg 函数为 远程的调用函数
stream, err := client.UploadFile(context.Background(), uploadPath)
// 打开文件准备读取
file, err := os.Open(filePath)
if err != nil {
logger.Error("clientUploadFile", zap.String("msg", "无法创建上传流"), zap.Error(err))
return err
panic(err)
}
defer file.Close()
// 分块读取并发送文件
buffer := make([]byte, chunkSize)
var chunkIndex int32 = 0
// 读取并发送文件块
buffer := make([]byte, 4096) // 4KB块大小
reader := bufio.NewReader(file)
var offset int64 = 0
// 同步等候文件的传送
for {
// 读取文件块
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
logger.Error("clientUploadFile", zap.String("msg", "读取文件失败"), zap.Error(err))
return err
}
logger.Info("开始发送文件: %s (大小: %d bytes)", zap.String("filename", fileName), zap.String("path", filePath))
// 构建文件块消息
chunk := &pb.FileChunk{
Content: buffer[:n],
ChunkIndex: chunkIndex,
TotalChunks: totalChunks,
// 发送文件
for {
n, err := reader.Read(buffer)
if err != nil {
break
}
// 第一个块需要包含文件元信息
if chunkIndex == 0 {
chunk.Info = &pb.FileInfo{
FileName: fileInfo.Name(),
FileSize: fileInfo.Size(),
ContentType: contentType,
UploadPath: uploadPath,
}
}
// 判断是否是最后一块
isLast := offset+int64(n) >= fileInfo.Size()
// 发送文件块
if err := stream.Send(chunk); err != nil {
logger.Error("clientUploadFile", zap.String("msg", "发送文件块失败"), zap.Error(err))
return err
}
// 打印上传进度
if chunkIndex%10 == 0 { // 每10个块打印一次
progress := float64(chunkIndex+1) / float64(totalChunks) * 100
logger.Info("clientUploadFile", zap.String("msg", "上传进度"), zap.Float64("progress", progress), zap.Int32("chunkIndex", chunkIndex+1), zap.Int32("totalChunks", totalChunks))
err = client.Call("FileService.SendFileChunk", FileChunk{
Data: buffer[:n],
FileName: fileName,
Offset: offset,
IsLast: isLast,
}, &reply)
if err != nil || !reply {
logger.Error("发送文件块失败", zap.Error(err))
}
// 检查是否到达文件末尾
if err == io.EOF {
break
}
chunkIndex++
}
// 接收服务器响应
response, err := stream.CloseAndRecv()
if err != nil {
logger.Error("clientUploadFile", zap.String("msg", "接收服务器响应失败"), zap.Error(err))
return err
}
offset += int64(n)
if response.Success {
logger.Info("clientUploadFile", zap.String("msg", "文件上传成功"), zap.String("filePath", response.FilePath), zap.Int64("receivedSize", response.ReceivedSize))
} else {
logger.Error("clientUploadFile", zap.String("msg", "服务器返回错误"), zap.String("message", response.Message))
return fmt.Errorf("服务器返回错误: %s", response.Message)
// 显示进度
progress := float64(offset) / float64(fileInfo.Size()) * 100
fmt.Printf("\r发送进度: %.2f%%", progress)
}
fmt.Println("\n文件发送完成!")
return nil
}

Loading…
Cancel
Save