package main import ( "flag" "fmt" "log" "strings" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" ) // 自定义事件处理结构体 type MyEventHandler struct { canal.DummyEventHandler } // 处理行事件 func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { table := e.Table sql := "" switch e.Action { case canal.InsertAction: // 处理插入事件 columns := make([]string, len(table.Columns)) for i, col := range table.Columns { columns[i] = col.Name } values := make([]string, len(e.Rows[0])) for i, val := range e.Rows[0] { values[i] = formatValue(val) } sql = fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s);", table.Schema, table.Name, strings.Join(columns, ", "), strings.Join(values, ", ")) case canal.UpdateAction: // 处理更新事件 oldRow := e.Rows[0] newRow := e.Rows[1] sets := make([]string, 0) wheres := make([]string, 0) for i, col := range table.Columns { if oldRow[i] != newRow[i] { sets = append(sets, fmt.Sprintf("`%s` = %s", col.Name, formatValue(newRow[i]))) } wheres = append(wheres, fmt.Sprintf("`%s` = %s", col.Name, formatValue(oldRow[i]))) } sql = fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s;", table.Schema, table.Name, strings.Join(sets, ", "), strings.Join(wheres, " AND ")) case canal.DeleteAction: // 处理删除事件 wheres := make([]string, len(table.Columns)) for i, col := range table.Columns { wheres[i] = fmt.Sprintf("`%s` = %s", col.Name, formatValue(e.Rows[0][i])) } sql = fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s;", table.Schema, table.Name, strings.Join(wheres, " AND ")) } if sql != "" { fmt.Println(sql) } return nil } // string func (h *MyEventHandler) String() string { return "MyEventHandler" } // 处理DDL事件 // func (h *MyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { // sql := string(queryEvent.Query) // if sql != "" { // fmt.Println(sql + ";") // } // return nil // } // 格式化值为SQL表示形式 func formatValue(value interface{}) string { if value == nil { return "NULL" } switch v := value.(type) { case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: return fmt.Sprintf("%v", v) case []byte: // 处理二进制数据 return fmt.Sprintf("X'%x'", v) case string: // 转义单引号 return fmt.Sprintf("'%s'", strings.ReplaceAll(v, "'", "''")) default: return fmt.Sprintf("'%v'", v) } } func main() { // 解析命令行参数 host := flag.String("host", "localhost", "MySQL主机地址") port := flag.Uint("port", 3306, "MySQL端口") user := flag.String("user", "root", "MySQL用户名") password := flag.String("password", "", "MySQL密码") serverID := flag.Uint("server-id", 1001, "客户端服务器ID") flavor := flag.String("flavor", "mysql", "数据库类型 (mysql或mariadb)") startFile := flag.String("start-file", "", "开始读取的binlog文件名") startPos := flag.Uint("start-pos", 4, "开始读取的位置") flag.Parse() // 创建canal配置 cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", *host, *port) cfg.User = *user cfg.Password = *password cfg.ServerID = uint32(*serverID) cfg.Flavor = *flavor // 设置需要监听的数据库和表,默认监听所有 // cfg.Dump.TableDB = "test_db" // cfg.Dump.Tables = []string{"test_table"} // 创建canal实例 c, err := canal.NewCanal(cfg) if err != nil { log.Fatalf("创建canal实例失败: %v", err) } // 设置事件处理器 c.SetEventHandler(&MyEventHandler{}) // 设置起始位置 var pos mysql.Position if *startFile != "" { pos = mysql.Position{Name: *startFile, Pos: uint32(*startPos)} } else { // 如果未指定,从当前位置开始 pos, err = c.GetMasterPos() if err != nil { log.Fatalf("获取主库位置失败: %v", err) } fmt.Printf("从binlog位置 %s:%d 开始读取\n", pos.Name, pos.Pos) } // 开始同步 err = c.RunFrom(pos) if err != nil { log.Fatalf("同步失败: %v", err) } }