diff --git a/mblog/go.mod b/mblog/go.mod index 9fd35a3..e324f30 100644 --- a/mblog/go.mod +++ b/mblog/go.mod @@ -2,22 +2,4 @@ module mblog go 1.24.6 -require github.com/go-mysql-org/go-mysql v1.13.0 - -require ( - filippo.io/edwards25519 v1.1.0 // indirect - github.com/BurntSushi/toml v1.3.2 // indirect - github.com/goccy/go-json v0.10.2 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect - github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect - github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect - github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect - github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d // indirect - github.com/shopspring/decimal v1.2.0 // indirect - go.uber.org/atomic v1.11.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect - golang.org/x/text v0.24.0 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect -) +require github.com/go-sql-driver/mysql v1.7.1 diff --git a/mblog/go.sum b/mblog/go.sum index 458adf8..fd7ae07 100644 --- a/mblog/go.sum +++ b/mblog/go.sum @@ -1,79 +1,2 @@ -filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= -filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= -github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-mysql-org/go-mysql v1.13.0 h1:Hlsa5x1bX/wBFtMbdIOmb6YzyaVNBWnwrb8gSIEPMDc= -github.com/go-mysql-org/go-mysql v1.13.0/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec h1:3EiGmeJWoNixU+EwllIn26x6s4njiWRXewdx2zlYa84= -github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= -github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= -github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= -github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= -github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d h1:3Ej6eTuLZp25p3aH/EXdReRHY12hjZYs3RrGp7iLdag= -github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d/go.mod h1:+8feuexTKcXHZF/dkDfvCwEyBAmgb4paFc3/WeYV2eE= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= -github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= -golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= -gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mblog/main.go b/mblog/main.go index 59901ed..36a1c6e 100644 --- a/mblog/main.go +++ b/mblog/main.go @@ -1,164 +1,461 @@ package main import ( + "database/sql" + "encoding/csv" "flag" "fmt" "log" + "os" + "path/filepath" + "strconv" "strings" + "sync" + "time" - "github.com/go-mysql-org/go-mysql/canal" - "github.com/go-mysql-org/go-mysql/mysql" + _ "github.com/go-sql-driver/mysql" ) -// 自定义事件处理结构体 -type MyEventHandler struct { - canal.DummyEventHandler +// BackupConfig 备份配置 +type BackupConfig struct { + Host string + Port int + User string + Password string + Database string + BackupDir string + KeepDays int + Async bool } -// 处理行事件 -func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { - table := e.Table - sql := "" - - switch e.Action { - case canal.InsertAction: - // 处理插入事件 - columns := make([]string, len(table.Columns)) - for i, col := range table.Columns { - columns[i] = col.Name - } +// BackupResult 备份结果 +type BackupResult struct { + Success bool + Message string + StartTime time.Time + EndTime time.Time + Filename string +} + +// BackupManager 备份管理器 +type BackupManager struct { + config BackupConfig + db *sql.DB + mutex sync.Mutex + running bool +} + +// NewBackupManager 创建新的备份管理器 +func NewBackupManager(config BackupConfig) (*BackupManager, error) { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", + config.User, config.Password, config.Host, config.Port, config.Database) + + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("无法连接数据库: %v", err) + } - values := make([]string, len(e.Rows[0])) - for i, val := range e.Rows[0] { - values[i] = formatValue(val) + // 测试连接 + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("数据库连接失败: %v", err) + } + + return &BackupManager{ + config: config, + db: db, + }, nil +} + +// 获取数据库中所有表 +func (m *BackupManager) getTables() ([]string, error) { + query := "SHOW TABLES" + rows, err := m.db.Query(query) + if err != nil { + return nil, fmt.Errorf("查询表失败: %v", err) + } + defer rows.Close() + + var tables []string + for rows.Next() { + var table string + if err := rows.Scan(&table); err != nil { + return nil, fmt.Errorf("扫描表名失败: %v", err) } + tables = append(tables, table) + } - sql = fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s);", - table.Schema, table.Name, - strings.Join(columns, ", "), - strings.Join(values, ", ")) + return tables, nil +} - case canal.UpdateAction: - // 处理更新事件 - oldRow := e.Rows[0] - newRow := e.Rows[1] +// 获取表结构 +func (m *BackupManager) getTableSchema(table string) (string, error) { + query := fmt.Sprintf("SHOW CREATE TABLE `%s`", table) + rows, err := m.db.Query(query) + if err != nil { + return "", fmt.Errorf("查询表结构失败: %v", err) + } + defer rows.Close() - sets := make([]string, 0) - wheres := make([]string, 0) + var ( + tbl string + sql string + ) - for i, col := range table.Columns { - if oldRow[i] != newRow[i] { - sets = append(sets, fmt.Sprintf("`%s` = %s", col.Name, formatValue(newRow[i]))) - } - wheres = append(wheres, fmt.Sprintf("`%s` = %s", col.Name, formatValue(oldRow[i]))) + if rows.Next() { + if err := rows.Scan(&tbl, &sql); err != nil { + return "", fmt.Errorf("扫描表结构失败: %v", err) } + } + + return sql, nil +} - sql = fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s;", - table.Schema, table.Name, - strings.Join(sets, ", "), - strings.Join(wheres, " AND ")) +// 备份表数据到CSV文件 +func (m *BackupManager) backupTableData(table string, dataFile string) error { + // 获取表列信息 + columnsQuery := fmt.Sprintf("DESCRIBE `%s`", table) + rows, err := m.db.Query(columnsQuery) + if err != nil { + return fmt.Errorf("查询表列信息失败: %v", err) + } + defer rows.Close() - case canal.DeleteAction: - // 处理删除事件 - wheres := make([]string, len(table.Columns)) - for i, col := range table.Columns { - wheres[i] = fmt.Sprintf("`%s` = %s", col.Name, formatValue(e.Rows[0][i])) + var columns []string + for rows.Next() { + var ( + field, typ, null, key, extra string + defaultVal sql.NullString + ) + if err := rows.Scan(&field, &typ, &null, &key, &defaultVal, &extra); err != nil { + return fmt.Errorf("扫描表列信息失败: %v", err) } + columns = append(columns, field) + } - sql = fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s;", - table.Schema, table.Name, - strings.Join(wheres, " AND ")) + // 打开数据文件 + file, err := os.Create(dataFile) + if err != nil { + return fmt.Errorf("创建数据文件失败: %v", err) } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() - if sql != "" { - fmt.Println(sql) + // 写入列名 + if err := writer.Write(columns); err != nil { + return fmt.Errorf("写入列名失败: %v", err) } - return nil -} + // 分页查询数据 + limit := 1000 + offset := 0 + columnsStr := "`" + strings.Join(columns, "`, `") + "`" -// string -func (h *MyEventHandler) String() string { return "MyEventHandler" } + for { + dataQuery := fmt.Sprintf("SELECT %s FROM `%s` LIMIT %d OFFSET %d", + columnsStr, table, limit, offset) -// 处理DDL事件 -// func (h *MyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { -// sql := string(queryEvent.Query) -// if sql != "" { -// fmt.Println(sql + ";") -// } -// return nil -// } + dataRows, err := m.db.Query(dataQuery) + if err != nil { + return fmt.Errorf("查询表数据失败: %v", err) + } -// 格式化值为SQL表示形式 -func formatValue(value interface{}) string { + // 获取列类型信息 + columnTypes, err := dataRows.ColumnTypes() + if err != nil { + dataRows.Close() + return fmt.Errorf("获取列类型失败: %v", err) + } + + rowCount := 0 + for dataRows.Next() { + rowCount++ + // 创建一个接口切片来存储一行数据 + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for i := range values { + valuePtrs[i] = &values[i] + } + + // 扫描行数据 + if err := dataRows.Scan(valuePtrs...); err != nil { + dataRows.Close() + return fmt.Errorf("扫描表数据失败: %v", err) + } + + // 处理数据并写入CSV + csvRow := make([]string, len(columns)) + for i, val := range values { + colType := columnTypes[i].DatabaseTypeName() + csvRow[i] = formatValue(val, colType) + } + + if err := writer.Write(csvRow); err != nil { + dataRows.Close() + return fmt.Errorf("写入数据到CSV失败: %v", err) + } + } + + dataRows.Close() + + // 如果获取的行数小于limit,说明已经到最后一页 + if rowCount < limit { + break + } + + offset += limit + } + + return nil +} + +// 格式化值为字符串 +func formatValue(value interface{}, colType string) string { if value == nil { return "NULL" } switch v := value.(type) { - case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: - return fmt.Sprintf("%v", v) case []byte: // 处理二进制数据 - return fmt.Sprintf("X'%x'", v) + return string(v) case string: - // 转义单引号 - return fmt.Sprintf("'%s'", strings.ReplaceAll(v, "'", "''")) + return v + case int64: + return strconv.FormatInt(v, 10) + case float64: + return strconv.FormatFloat(v, 'f', -1, 64) + case bool: + if v { + return "1" + } + return "0" + case time.Time: + return v.Format("2006-01-02 15:04:05") default: - return fmt.Sprintf("'%v'", v) + return fmt.Sprintf("%v", v) } } +// 创建备份目录 +func (m *BackupManager) createBackupDir(timestamp string) (string, error) { + backupPath := filepath.Join(m.config.BackupDir, + fmt.Sprintf("%s_%s", m.config.Database, timestamp)) + + if err := os.MkdirAll(backupPath, 0755); err != nil { + return "", fmt.Errorf("创建备份目录失败: %v", err) + } + + return backupPath, nil +} + +// 备份表结构到文件 +func (m *BackupManager) backupTableSchema(table string, schemaFile string) error { + schema, err := m.getTableSchema(table) + if err != nil { + return err + } + + // 写入表结构到文件 + if err := os.WriteFile(schemaFile, []byte(schema+";\n"), 0644); err != nil { + return fmt.Errorf("写入表结构失败: %v", err) + } + + return nil +} + +// 清理旧备份 +func (m *BackupManager) cleanOldBackups() error { + if m.config.KeepDays <= 0 { + return nil + } + + cutoffTime := time.Now().AddDate(0, 0, -m.config.KeepDays) + + // 读取备份目录 + entries, err := os.ReadDir(m.config.BackupDir) + if err != nil { + return fmt.Errorf("读取备份目录失败: %v", err) + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + // 检查目录名是否符合备份命名规范 + if !strings.HasPrefix(entry.Name(), m.config.Database+"_") { + continue + } + + // 获取目录修改时间 + info, err := entry.Info() + if err != nil { + log.Printf("获取目录信息失败: %v", err) + continue + } + + // 如果目录超过保留天数,删除它 + if info.ModTime().Before(cutoffTime) { + path := filepath.Join(m.config.BackupDir, entry.Name()) + if err := os.RemoveAll(path); err != nil { + log.Printf("删除旧备份失败: %v", err) + } else { + log.Printf("已删除旧备份: %s", path) + } + } + } + + return nil +} + +// 执行备份 +func (m *BackupManager) performBackup() BackupResult { + result := BackupResult{ + StartTime: time.Now(), + Success: false, + } + + // 检查是否已经在运行备份 + m.mutex.Lock() + if m.running { + m.mutex.Unlock() + result.Message = "备份正在进行中" + return result + } + m.running = true + m.mutex.Unlock() + + defer func() { + m.mutex.Lock() + m.running = false + m.mutex.Unlock() + result.EndTime = time.Now() + }() + + // 获取当前时间戳作为备份标识 + timestamp := result.StartTime.Format("20060102150405") + + // 创建备份目录 + backupDir, err := m.createBackupDir(timestamp) + if err != nil { + result.Message = fmt.Sprintf("创建备份目录失败: %v", err) + return result + } + result.Filename = filepath.Base(backupDir) + + // 获取所有表 + tables, err := m.getTables() + if err != nil { + result.Message = fmt.Sprintf("获取表列表失败: %v", err) + return result + } + + // 备份每个表 + for _, table := range tables { + log.Printf("开始备份表: %s", table) + + // 备份表结构 + schemaFile := filepath.Join(backupDir, table+".sql") + if err := m.backupTableSchema(table, schemaFile); err != nil { + result.Message = fmt.Sprintf("备份表 %s 结构失败: %v", table, err) + return result + } + + // 备份表数据 + dataFile := filepath.Join(backupDir, table+".csv") + if err := m.backupTableData(table, dataFile); err != nil { + result.Message = fmt.Sprintf("备份表 %s 数据失败: %v", table, err) + return result + } + + log.Printf("表 %s 备份完成", table) + } + + // 清理旧备份 + if err := m.cleanOldBackups(); err != nil { + log.Printf("清理旧备份时出错: %v", err) + } + + result.Success = true + result.Message = fmt.Sprintf("数据库 %s 备份成功,存储在: %s", m.config.Database, backupDir) + return result +} + +// StartBackup 启动备份(异步或同步) +func (m *BackupManager) StartBackup() (BackupResult, chan BackupResult) { + if m.config.Async { + resultChan := make(chan BackupResult, 1) + go func() { + result := m.performBackup() + resultChan <- result + close(resultChan) + }() + return BackupResult{Success: true, Message: "异步备份已启动"}, resultChan + } + + result := m.performBackup() + return result, nil +} + +// Close 关闭数据库连接 +func (m *BackupManager) Close() error { + return m.db.Close() +} + func main() { // 解析命令行参数 host := flag.String("host", "localhost", "MySQL主机地址") - port := flag.Uint("port", 3306, "MySQL端口") + port := flag.Int("port", 3306, "MySQL端口") user := flag.String("user", "root", "MySQL用户名") password := flag.String("password", "", "MySQL密码") - serverID := flag.Uint("server-id", 1001, "客户端服务器ID") - flavor := flag.String("flavor", "mysql", "数据库类型 (mysql或mariadb)") - startFile := flag.String("start-file", "", "开始读取的binlog文件名") - startPos := flag.Uint("start-pos", 4, "开始读取的位置") + database := flag.String("db", "", "要备份的数据库名") + backupDir := flag.String("dir", "backups", "备份文件存储目录") + keepDays := flag.Int("keep", 7, "备份保留天数") + async := flag.Bool("async", false, "是否异步执行备份") flag.Parse() - // 创建canal配置 - cfg := canal.NewDefaultConfig() - cfg.Addr = fmt.Sprintf("%s:%d", *host, *port) - cfg.User = *user - cfg.Password = *password - cfg.ServerID = uint32(*serverID) - cfg.Flavor = *flavor + if *database == "" { + log.Fatal("必须指定要备份的数据库名 (-db 参数)") + } - // 设置需要监听的数据库和表,默认监听所有 - // cfg.Dump.TableDB = "test_db" - // cfg.Dump.Tables = []string{"test_table"} + // 创建备份配置 + config := BackupConfig{ + Host: *host, + Port: *port, + User: *user, + Password: *password, + Database: *database, + BackupDir: *backupDir, + KeepDays: *keepDays, + Async: *async, + } - // 创建canal实例 - c, err := canal.NewCanal(cfg) + // 创建备份管理器 + manager, err := NewBackupManager(config) if err != nil { - log.Fatalf("创建canal实例失败: %v", err) + log.Fatalf("初始化备份管理器失败: %v", err) } + defer manager.Close() - // 设置事件处理器 - c.SetEventHandler(&MyEventHandler{}) + // 启动备份 + log.Println("开始数据库备份...") + result, resultChan := manager.StartBackup() + log.Println(result.Message) - // 设置起始位置 - var pos mysql.Position - if *startFile != "" { - pos = mysql.Position{Name: *startFile, Pos: uint32(*startPos)} - } else { - // 如果未指定,从当前位置开始 - pos, err = c.GetMasterPos() - if err != nil { - log.Fatalf("获取主库位置失败: %v", err) - } - fmt.Printf("从binlog位置 %s:%d 开始读取\n", pos.Name, pos.Pos) - } + // 如果是异步备份,等待结果 + if *async && resultChan != nil { + log.Println("等待备份完成...") + backupResult := <-resultChan - // 开始同步 - err = c.RunFrom(pos) - if err != nil { - log.Fatalf("同步失败: %v", err) + if backupResult.Success { + log.Printf("备份成功,耗时: %v", backupResult.EndTime.Sub(backupResult.StartTime)) + log.Println(backupResult.Message) + } else { + log.Printf("备份失败: %s", backupResult.Message) + } } } diff --git a/mblog/main.go_1 b/mblog/main.go_1 new file mode 100644 index 0000000..59901ed --- /dev/null +++ b/mblog/main.go_1 @@ -0,0 +1,164 @@ +package main + +import ( + "flag" + "fmt" + "log" + "strings" + + "github.com/go-mysql-org/go-mysql/canal" + "github.com/go-mysql-org/go-mysql/mysql" +) + +// 自定义事件处理结构体 +type MyEventHandler struct { + canal.DummyEventHandler +} + +// 处理行事件 +func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { + table := e.Table + sql := "" + + switch e.Action { + case canal.InsertAction: + // 处理插入事件 + columns := make([]string, len(table.Columns)) + for i, col := range table.Columns { + columns[i] = col.Name + } + + values := make([]string, len(e.Rows[0])) + for i, val := range e.Rows[0] { + values[i] = formatValue(val) + } + + sql = fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s);", + table.Schema, table.Name, + strings.Join(columns, ", "), + strings.Join(values, ", ")) + + case canal.UpdateAction: + // 处理更新事件 + oldRow := e.Rows[0] + newRow := e.Rows[1] + + sets := make([]string, 0) + wheres := make([]string, 0) + + for i, col := range table.Columns { + if oldRow[i] != newRow[i] { + sets = append(sets, fmt.Sprintf("`%s` = %s", col.Name, formatValue(newRow[i]))) + } + wheres = append(wheres, fmt.Sprintf("`%s` = %s", col.Name, formatValue(oldRow[i]))) + } + + sql = fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s;", + table.Schema, table.Name, + strings.Join(sets, ", "), + strings.Join(wheres, " AND ")) + + case canal.DeleteAction: + // 处理删除事件 + wheres := make([]string, len(table.Columns)) + for i, col := range table.Columns { + wheres[i] = fmt.Sprintf("`%s` = %s", col.Name, formatValue(e.Rows[0][i])) + } + + sql = fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s;", + table.Schema, table.Name, + strings.Join(wheres, " AND ")) + } + + if sql != "" { + fmt.Println(sql) + } + + return nil +} + +// string +func (h *MyEventHandler) String() string { return "MyEventHandler" } + +// 处理DDL事件 +// func (h *MyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { +// sql := string(queryEvent.Query) +// if sql != "" { +// fmt.Println(sql + ";") +// } +// return nil +// } + +// 格式化值为SQL表示形式 +func formatValue(value interface{}) string { + if value == nil { + return "NULL" + } + + switch v := value.(type) { + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: + return fmt.Sprintf("%v", v) + case []byte: + // 处理二进制数据 + return fmt.Sprintf("X'%x'", v) + case string: + // 转义单引号 + return fmt.Sprintf("'%s'", strings.ReplaceAll(v, "'", "''")) + default: + return fmt.Sprintf("'%v'", v) + } +} + +func main() { + // 解析命令行参数 + host := flag.String("host", "localhost", "MySQL主机地址") + port := flag.Uint("port", 3306, "MySQL端口") + user := flag.String("user", "root", "MySQL用户名") + password := flag.String("password", "", "MySQL密码") + serverID := flag.Uint("server-id", 1001, "客户端服务器ID") + flavor := flag.String("flavor", "mysql", "数据库类型 (mysql或mariadb)") + startFile := flag.String("start-file", "", "开始读取的binlog文件名") + startPos := flag.Uint("start-pos", 4, "开始读取的位置") + + flag.Parse() + + // 创建canal配置 + cfg := canal.NewDefaultConfig() + cfg.Addr = fmt.Sprintf("%s:%d", *host, *port) + cfg.User = *user + cfg.Password = *password + cfg.ServerID = uint32(*serverID) + cfg.Flavor = *flavor + + // 设置需要监听的数据库和表,默认监听所有 + // cfg.Dump.TableDB = "test_db" + // cfg.Dump.Tables = []string{"test_table"} + + // 创建canal实例 + c, err := canal.NewCanal(cfg) + if err != nil { + log.Fatalf("创建canal实例失败: %v", err) + } + + // 设置事件处理器 + c.SetEventHandler(&MyEventHandler{}) + + // 设置起始位置 + var pos mysql.Position + if *startFile != "" { + pos = mysql.Position{Name: *startFile, Pos: uint32(*startPos)} + } else { + // 如果未指定,从当前位置开始 + pos, err = c.GetMasterPos() + if err != nil { + log.Fatalf("获取主库位置失败: %v", err) + } + fmt.Printf("从binlog位置 %s:%d 开始读取\n", pos.Name, pos.Pos) + } + + // 开始同步 + err = c.RunFrom(pos) + if err != nil { + log.Fatalf("同步失败: %v", err) + } +} diff --git a/mblog/mblog b/mblog/mblog index b01ddcc..b084454 100644 Binary files a/mblog/mblog and b/mblog/mblog differ diff --git a/mblog/readme.md b/mblog/readme.md new file mode 100644 index 0000000..df2bc9e --- /dev/null +++ b/mblog/readme.md @@ -0,0 +1,9 @@ +安装依赖 +go get github.com/go-sql-driver/mysql + + +# 同步备份 +./mysql_backup -host=localhost -port=3306 -user=your_user -password=your_pass -db=your_database + +# 异步备份 +./mysql_backup -host=localhost -port=3306 -user=your_user -password=your_pass -db=your_database -async=true \ No newline at end of file