diff --git a/mblog/go.mod b/mblog/go.mod index 214261a..43efd75 100644 --- a/mblog/go.mod +++ b/mblog/go.mod @@ -2,14 +2,16 @@ module mblog go 1.24.6 -require github.com/go-mysql-org/go-mysql v1.13.0 +require ( + github.com/go-mysql-org/go-mysql v1.13.0 + github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec +) require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/google/uuid v1.3.0 // indirect github.com/klauspost/compress v1.17.8 // indirect - github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d // indirect github.com/shopspring/decimal v1.2.0 // indirect diff --git a/mblog/mblog b/mblog/mblog new file mode 100644 index 0000000..f997d6a Binary files /dev/null and b/mblog/mblog differ diff --git a/mblog/mbmain.go b/mblog/mbmain.go index 7146a82..b97ca0a 100644 --- a/mblog/mbmain.go +++ b/mblog/mbmain.go @@ -1,49 +1,78 @@ package main import ( + "context" "flag" "fmt" - "mblog/app" "os" - "os/signal" - "runtime" - "syscall" + + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" ) -var myHost = flag.String("host", "127.0.0.1", "MySQL replication host") -var myPort = flag.Int("port", 3306, "MySQL replication port") -var myUser = flag.String("user", "root", "MySQL replication user") -var myPass = flag.String("pass", "****", "MySQL replication pass") -var serverId = flag.Int("server_id", 1111, "MySQL replication server id") +var host = flag.String("host", "127.0.0.1", "MySQL host") +var port = flag.Int("port", 3306, "MySQL port") +var user = flag.String("user", "root", "MySQL user, must have replication privilege") +var password = flag.String("password", "****", "MySQL password") + +var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb") + +var file = flag.String("file", "mysql-bin.000032", "Binlog filename") +var pos = flag.Int("pos", 3070, "Binlog position") + +var semiSync = flag.Bool("semisync", false, "Support semi sync") +var backupPath = flag.String("backup_path", "", "backup path to store binlog files") + +var rawMode = flag.Bool("raw", false, "Use raw mode") func main() { - sc := make(chan os.Signal, 1) - signal.Notify(sc, - os.Kill, - os.Interrupt, - syscall.SIGHUP, - syscall.SIGQUIT, - syscall.SIGINT, - syscall.SIGTERM, - ) - - runtime.GOMAXPROCS(runtime.NumCPU()/4 + 1) flag.Parse() - cfg := &app.Config{ - *myHost, - *myPort, - *myUser, - *myPass, - *serverId, - "mysql-bin.000032", - 3070, + + cfg := replication.BinlogSyncerConfig{ + ServerID: 101, + Flavor: *flavor, + + Host: *host, + Port: uint16(*port), + User: *user, + Password: *password, + RawModeEnabled: *rawMode, + SemiSyncEnabled: *semiSync, + UseDecimal: true, } - srv := &app.Server{Cfg: cfg} - go srv.Run() - select { - case n := <-sc: - srv.Quit() - fmt.Printf("receive signal %v, closing", n) + b := replication.NewBinlogSyncer(cfg) + + pos := mysql.Position{Name: *file, Pos: uint32(*pos)} + if len(*backupPath) > 0 { + // Backup will always use RawMode. + err := b.StartBackup(*backupPath, pos, 0) + if err != nil { + fmt.Printf("Start backup error: %v\n", errors.ErrorStack(err)) + return + } + } else { + s, err := b.StartSync(pos) + if err != nil { + fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err)) + return + } + + for { + e, err := s.GetEvent(context.Background()) + if err != nil { + // Try to output all left events + events := s.DumpEvents() + for _, e := range events { + e.Dump(os.Stdout) + } + fmt.Printf("Get event error: %v\n", errors.ErrorStack(err)) + return + } + + e.Dump(os.Stdout) + } } + }