You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
164 lines
4.0 KiB
164 lines
4.0 KiB
package main
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
|
|
"github.com/go-mysql-org/go-mysql/canal"
|
|
"github.com/go-mysql-org/go-mysql/mysql"
|
|
)
|
|
|
|
// 自定义事件处理结构体
|
|
type MyEventHandler struct {
|
|
canal.DummyEventHandler
|
|
}
|
|
|
|
// 处理行事件
|
|
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
|
|
table := e.Table
|
|
sql := ""
|
|
|
|
switch e.Action {
|
|
case canal.InsertAction:
|
|
// 处理插入事件
|
|
columns := make([]string, len(table.Columns))
|
|
for i, col := range table.Columns {
|
|
columns[i] = col.Name
|
|
}
|
|
|
|
values := make([]string, len(e.Rows[0]))
|
|
for i, val := range e.Rows[0] {
|
|
values[i] = formatValue(val)
|
|
}
|
|
|
|
sql = fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s);",
|
|
table.Schema, table.Name,
|
|
strings.Join(columns, ", "),
|
|
strings.Join(values, ", "))
|
|
|
|
case canal.UpdateAction:
|
|
// 处理更新事件
|
|
oldRow := e.Rows[0]
|
|
newRow := e.Rows[1]
|
|
|
|
sets := make([]string, 0)
|
|
wheres := make([]string, 0)
|
|
|
|
for i, col := range table.Columns {
|
|
if oldRow[i] != newRow[i] {
|
|
sets = append(sets, fmt.Sprintf("`%s` = %s", col.Name, formatValue(newRow[i])))
|
|
}
|
|
wheres = append(wheres, fmt.Sprintf("`%s` = %s", col.Name, formatValue(oldRow[i])))
|
|
}
|
|
|
|
sql = fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s;",
|
|
table.Schema, table.Name,
|
|
strings.Join(sets, ", "),
|
|
strings.Join(wheres, " AND "))
|
|
|
|
case canal.DeleteAction:
|
|
// 处理删除事件
|
|
wheres := make([]string, len(table.Columns))
|
|
for i, col := range table.Columns {
|
|
wheres[i] = fmt.Sprintf("`%s` = %s", col.Name, formatValue(e.Rows[0][i]))
|
|
}
|
|
|
|
sql = fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s;",
|
|
table.Schema, table.Name,
|
|
strings.Join(wheres, " AND "))
|
|
}
|
|
|
|
if sql != "" {
|
|
fmt.Println(sql)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// string
|
|
func (h *MyEventHandler) String() string { return "MyEventHandler" }
|
|
|
|
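// DDL event handling (currently disabled). When enabled, it prints each DDL
// statement followed by a semicolon. NOTE(review): re-enabling it appears to
// require importing the go-mysql replication package, which the visible
// import block does not include.
//
// func (h *MyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
// 	sql := string(queryEvent.Query)
// 	if sql != "" {
// 		fmt.Println(sql + ";")
// 	}
// 	return nil
// }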
|
|
|
|
// sqlStringEscaper escapes the two characters that can terminate or alter a
// single-quoted SQL string literal: the backslash and the single quote.
// NOTE(review): doubling backslashes assumes the target server does not run
// with NO_BACKSLASH_ESCAPES in its sql_mode — confirm for your deployment.
var sqlStringEscaper = strings.NewReplacer(`\`, `\\`, `'`, `''`)

// quoteSQLString returns s as a single-quoted SQL string literal with
// backslashes and single quotes escaped.
func quoteSQLString(s string) string {
	return "'" + sqlStringEscaper.Replace(s) + "'"
}

// formatValue renders a decoded column value as a SQL literal.
//
//   - nil becomes NULL
//   - Go integer and floating-point values are printed unquoted
//   - []byte becomes a hexadecimal literal (X'...')
//   - a string becomes an escaped, single-quoted literal
//   - any other type is rendered with its default fmt representation and
//     then quoted and escaped the same way as a string
func formatValue(value interface{}) string {
	if value == nil {
		return "NULL"
	}

	switch v := value.(type) {
	case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
		return fmt.Sprintf("%v", v)
	case []byte:
		// Binary data is emitted as hex so it needs no escaping.
		return fmt.Sprintf("X'%x'", v)
	case string:
		return quoteSQLString(v)
	default:
		// The textual form of an unknown type may itself contain quotes or
		// backslashes, so it goes through the same escaping as strings.
		return quoteSQLString(fmt.Sprintf("%v", v))
	}
}
|
|
|
|
func main() {
|
|
// 解析命令行参数
|
|
host := flag.String("host", "localhost", "MySQL主机地址")
|
|
port := flag.Uint("port", 3306, "MySQL端口")
|
|
user := flag.String("user", "root", "MySQL用户名")
|
|
password := flag.String("password", "", "MySQL密码")
|
|
serverID := flag.Uint("server-id", 1001, "客户端服务器ID")
|
|
flavor := flag.String("flavor", "mysql", "数据库类型 (mysql或mariadb)")
|
|
startFile := flag.String("start-file", "", "开始读取的binlog文件名")
|
|
startPos := flag.Uint("start-pos", 4, "开始读取的位置")
|
|
|
|
flag.Parse()
|
|
|
|
// 创建canal配置
|
|
cfg := canal.NewDefaultConfig()
|
|
cfg.Addr = fmt.Sprintf("%s:%d", *host, *port)
|
|
cfg.User = *user
|
|
cfg.Password = *password
|
|
cfg.ServerID = uint32(*serverID)
|
|
cfg.Flavor = *flavor
|
|
|
|
// 设置需要监听的数据库和表,默认监听所有
|
|
// cfg.Dump.TableDB = "test_db"
|
|
// cfg.Dump.Tables = []string{"test_table"}
|
|
|
|
// 创建canal实例
|
|
c, err := canal.NewCanal(cfg)
|
|
if err != nil {
|
|
log.Fatalf("创建canal实例失败: %v", err)
|
|
}
|
|
|
|
// 设置事件处理器
|
|
c.SetEventHandler(&MyEventHandler{})
|
|
|
|
// 设置起始位置
|
|
var pos mysql.Position
|
|
if *startFile != "" {
|
|
pos = mysql.Position{Name: *startFile, Pos: uint32(*startPos)}
|
|
} else {
|
|
// 如果未指定,从当前位置开始
|
|
pos, err = c.GetMasterPos()
|
|
if err != nil {
|
|
log.Fatalf("获取主库位置失败: %v", err)
|
|
}
|
|
fmt.Printf("从binlog位置 %s:%d 开始读取\n", pos.Name, pos.Pos)
|
|
}
|
|
|
|
// 开始同步
|
|
err = c.RunFrom(pos)
|
|
if err != nil {
|
|
log.Fatalf("同步失败: %v", err)
|
|
}
|
|
}
|
|
|