Browse Source

mysql binlog日志读取

master
xyiege 6 months ago
parent
commit
0e5d3e25b2
  1. 28
      scagent/core/FileRpc.go
  2. 10
      scagent/core/UpFile.go

28
scagent/core/FileRpc.go

@ -97,6 +97,7 @@ func clientUploadFile(remote string, filePath string, uploadPath string) {
if err != nil { if err != nil {
logger.Error("NewUpFileClient failed", zap.Error(err)) logger.Error("NewUpFileClient failed", zap.Error(err))
} }
defer client.rpcClient.Close()
// 调用 transferFile // 调用 transferFile
err = client.TransferFile(filePath, uploadPath) err = client.TransferFile(filePath, uploadPath)
if err != nil { if err != nil {
@ -110,6 +111,7 @@ func clientUploadFile(remote string, filePath string, uploadPath string) {
// 传输文件 // 传输文件
func (c *UpFileClient) TransferFile(filePath string, uploadPath string) error { func (c *UpFileClient) TransferFile(filePath string, uploadPath string) error {
fmt.Printf("TransferFile filePath: %s, uploadPath: %s\n", filePath, uploadPath)
// 发送文件信息 // 发送文件信息
// 获取文件信息 // 获取文件信息
fileInfo, err := os.Stat(filePath) fileInfo, err := os.Stat(filePath)
@ -123,17 +125,23 @@ func (c *UpFileClient) TransferFile(filePath string, uploadPath string) error {
// 远程的文件名 // 远程的文件名
fileName := filepath.Base(uploadPath) fileName := filepath.Base(uploadPath)
fmt.Printf("fileName: %s\n", fileName) fmt.Printf("fileName: %s\n", fileName)
// 远程服务器的路径
dirpath := filepath.Dir(uploadPath)
fmt.Printf("remote dirpath: %s\n", dirpath)
fmt.Printf("file size: %d\n", fileInfo.Size())
// 发送文件信息 // 发送文件信息
var reply bool var reply string
err = c.rpcClient.Call("UpFileService.SendFileInfo", FileInfo{ // 异步
FileName: filePath, go func() {
FileSize: fileInfo.Size(), c.rpcClient.Call("UpFileService.SendFileInfo", FileInfo{
}, &reply) FileName: uploadPath,
if err != nil { FileSize: fileInfo.Size(),
fmt.Printf("发送文件信息失败: %v\n", err) }, &reply)
return err // 输出执行的结果
} fmt.Printf("SendFileInfo result: %s\n", reply)
fmt.Printf("replay: %v\n", reply) }()
return nil return nil
} }

10
scagent/core/UpFile.go

@ -28,7 +28,7 @@ type FileChunk struct {
// 发送文件的RPC // 发送文件的RPC
// SendFile 接收文件信息(名称和大小) // SendFile 接收文件信息(名称和大小)
func (f *UpFileService) SendFileInfo(info FileInfo, reply *bool) error { func (f *UpFileService) SendFileInfo(info FileInfo, reply *string) error {
logger := util.NewProductionLogger() logger := util.NewProductionLogger()
defer logger.Sync() defer logger.Sync()
// 创建保存文件的目录 // 创建保存文件的目录
@ -44,10 +44,14 @@ func (f *UpFileService) SendFileInfo(info FileInfo, reply *bool) error {
if _, err := os.Stat(dirPath); os.IsNotExist(err) { if _, err := os.Stat(dirPath); os.IsNotExist(err) {
// 文件夹不存在,创建它 // 文件夹不存在,创建它
if err := os.MkdirAll(dirPath, 0755); err != nil { if err := os.MkdirAll(dirPath, 0755); err != nil {
return err logger.Error("SendFileInfo mkdir failed", zap.Error(err))
*reply = fmt.Sprintf("mkdir folder: %s failed", dirPath)
// return err
} }
} }
*reply = true
*reply = fmt.Sprintf("folder: %s is exists", dirPath)
// *reply = true
return nil return nil
} }

Loading…
Cancel
Save