Browse Source

代码提交

master
xyiege 2 months ago
parent
commit
7eeabbd93d
  1. 6
      mblog/go.mod
  2. BIN
      mblog/mblog
  3. 362
      mblog/mbmain.go
  4. 78
      mblog/mbmain.go_bak

6
mblog/go.mod

@ -2,16 +2,14 @@ module mblog
go 1.24.6
require (
github.com/go-mysql-org/go-mysql v1.13.0
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec
)
require github.com/go-mysql-org/go-mysql v1.13.0
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect
github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d // indirect
github.com/shopspring/decimal v1.2.0 // indirect

BIN
mblog/mblog

Binary file not shown.

362
mblog/mbmain.go

@ -2,77 +2,343 @@ package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/errors"
)
var host = flag.String("host", "127.0.0.1", "MySQL host")
var port = flag.Int("port", 3306, "MySQL port")
var user = flag.String("user", "root", "MySQL user, must have replication privilege")
var password = flag.String("password", "****", "MySQL password")
// SQLWriter 用于异步输出SQL语句
type SQLWriter struct {
sqlCh chan string
}
func NewSQLWriter() *SQLWriter {
return &SQLWriter{
sqlCh: make(chan string, 1000), // 缓冲通道
}
}
// Start 启动异步SQL写入器
func (w *SQLWriter) Start() {
go func() {
for sql := range w.sqlCh {
// 这里可以替换为实际写入文件、发送到消息队列等操作
fmt.Printf("[%s] %s\n", time.Now().Format("2006-01-02 15:04:05"), sql)
}
}()
}
var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
// Write 异步写入SQL
func (w *SQLWriter) Write(sql string) {
select {
case w.sqlCh <- sql:
// 成功写入通道
default:
log.Println("警告: SQL通道拥堵,丢弃SQL语句:", sql)
}
}
var file = flag.String("file", "mysql-bin.000032", "Binlog filename")
var pos = flag.Int("pos", 3070, "Binlog position")
// Stop 停止写入器
func (w *SQLWriter) Stop() {
close(w.sqlCh)
}
var semiSync = flag.Bool("semisync", false, "Support semi sync")
var backupPath = flag.String("backup_path", "", "backup path to store binlog files")
type PositionManager struct {
filePath string
}
var rawMode = flag.Bool("raw", false, "Use raw mode")
func (pm *PositionManager) Save(pos mysql.Position) error {
data, _ := json.Marshal(pos)
return os.WriteFile(pm.filePath, data, 0644)
}
func (pm *PositionManager) Load() (mysql.Position, error) {
data, err := os.ReadFile(pm.filePath)
if err != nil {
return mysql.Position{}, err
}
var pos mysql.Position
err = json.Unmarshal(data, &pos)
return pos, err
}
// 程序启动参数
var user = flag.String("user", "root", "MySQL user, must have replication privilege")
var password = flag.String("password", "****", "MySQL password")
// 从 arg参数中获取配置信息
func main() {
flag.Parse()
// 创建SQL写入器
sqlWriter := NewSQLWriter()
sqlWriter.Start()
defer sqlWriter.Stop()
// 配置Binlog同步器
cfg := replication.BinlogSyncerConfig{
ServerID: 101,
Flavor: *flavor,
Host: *host,
Port: uint16(*port),
User: *user,
Password: *password,
RawModeEnabled: *rawMode,
SemiSyncEnabled: *semiSync,
UseDecimal: true,
}
b := replication.NewBinlogSyncer(cfg)
pos := mysql.Position{Name: *file, Pos: uint32(*pos)}
if len(*backupPath) > 0 {
// Backup will always use RawMode.
err := b.StartBackup(*backupPath, pos, 0)
if err != nil {
fmt.Printf("Start backup error: %v\n", errors.ErrorStack(err))
return
}
} else {
s, err := b.StartSync(pos)
if err != nil {
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
return
}
ServerID: 100, // 唯一的ServerID
Flavor: "mysql",
Host: "localhost",
Port: 3306,
User: *user,
Password: *password,
Charset: "utf8mb4",
}
for {
e, err := s.GetEvent(context.Background())
syncer := replication.NewBinlogSyncer(cfg)
defer syncer.Close()
// 获取当前Binlog位置(可选)
// 也可以从指定的位置开始,如 mysql.Position{Name: "mysql-bin.000001", Pos: 4}
position := mysql.Position{Name: "", Pos: 4}
streamer, err := syncer.StartSync(position)
if err != nil {
log.Fatalf("Failed to start sync: %v", err)
}
log.Println("开始监听MySQL Binlog...")
// 处理优雅退出
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 启动事件处理循环
go eventLoop(ctx, streamer, sqlWriter)
<-signalCh
log.Println("收到退出信号,停止监听...")
}
// 事件处理循环
func eventLoop(ctx context.Context, streamer *replication.BinlogStreamer, writer *SQLWriter) {
for {
select {
case <-ctx.Done():
return
default:
ev, err := streamer.GetEvent(ctx)
if err != nil {
// Try to output all left events
events := s.DumpEvents()
for _, e := range events {
e.Dump(os.Stdout)
if err == context.Canceled {
return
}
fmt.Printf("Get event error: %v\n", errors.ErrorStack(err))
return
log.Printf("获取Binlog事件错误: %v", err)
continue
}
e.Dump(os.Stdout)
// 解析Binlog事件
if err := parseBinlogEvent(ev, writer); err != nil {
log.Printf("解析Binlog事件错误: %v", err)
}
}
}
}
// 解析Binlog事件并生成SQL语句
func parseBinlogEvent(ev *replication.BinlogEvent, writer *SQLWriter) error {
event := ev.Header.EventType
switch event {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
return handleWriteRows(ev, writer)
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
return handleDeleteRows(ev, writer)
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
return handleUpdateRows(ev, writer)
case replication.QUERY_EVENT:
return handleQueryEvent(ev, writer)
}
return nil
}
// 处理INSERT事件
func handleWriteRows(ev *replication.BinlogEvent, writer *SQLWriter) error {
rowsEvent, ok := ev.Event.(*replication.RowsEvent)
if !ok {
return fmt.Errorf("类型断言失败: 期望*replication.RowsEvent")
}
tableName := string(rowsEvent.Table.Table)
schemaName := string(rowsEvent.Table.Schema)
for _, row := range rowsEvent.Rows {
columns := make([]string, len(row))
values := make([]interface{}, len(row))
for i, value := range row {
columns[i] = fmt.Sprintf("column%d", i)
values[i] = value
}
sql := generateInsertSQL(schemaName, tableName, columns, values)
writer.Write(sql)
}
return nil
}
// 处理DELETE事件
func handleDeleteRows(ev *replication.BinlogEvent, writer *SQLWriter) error {
rowsEvent, ok := ev.Event.(*replication.RowsEvent)
if !ok {
return fmt.Errorf("类型断言失败: 期望*replication.RowsEvent")
}
tableName := string(rowsEvent.Table.Table)
schemaName := string(rowsEvent.Table.Schema)
for _, row := range rowsEvent.Rows {
whereClause := generateWhereClause(row)
sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s", schemaName, tableName, whereClause)
writer.Write(sql)
}
return nil
}
// 处理UPDATE事件
func handleUpdateRows(ev *replication.BinlogEvent, writer *SQLWriter) error {
rowsEvent, ok := ev.Event.(*replication.RowsEvent)
if !ok {
return fmt.Errorf("类型断言失败: 期望*replication.RowsEvent")
}
tableName := string(rowsEvent.Table.Table)
schemaName := string(rowsEvent.Table.Schema)
// Rows是成对出现的: [旧值, 新值]
for i := 0; i < len(rowsEvent.Rows); i += 2 {
if i+1 >= len(rowsEvent.Rows) {
break
}
oldRow := rowsEvent.Rows[i]
newRow := rowsEvent.Rows[i+1]
setClause := generateSetClause(oldRow, newRow)
whereClause := generateWhereClause(oldRow)
sql := fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s",
schemaName, tableName, setClause, whereClause)
writer.Write(sql)
}
return nil
}
// 处理QUERY事件(DDL语句)
func handleQueryEvent(ev *replication.BinlogEvent, writer *SQLWriter) error {
queryEvent, ok := ev.Event.(*replication.QueryEvent)
if !ok {
return fmt.Errorf("类型断言失败: 期望*replication.QueryEvent")
}
sql := string(queryEvent.Query)
writer.Write("-- DDL操作: " + sql)
return nil
}
// 生成INSERT SQL语句
func generateInsertSQL(schema, table string, columns []string, values []interface{}) string {
valueStrs := make([]string, len(values))
for i, v := range values {
valueStrs[i] = formatValue(v)
}
return fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%s);",
schema, table, joinValues(valueStrs))
}
// 生成WHERE子句
func generateWhereClause(row []interface{}) string {
parts := make([]string, len(row))
for i, value := range row {
parts[i] = fmt.Sprintf("column%d = %s", i, formatValue(value))
}
return joinValues(parts)
}
// 生成SET子句
func generateSetClause(oldRow, newRow []interface{}) string {
parts := make([]string, len(newRow))
for i, newValue := range newRow {
oldValue := oldRow[i]
// 只更新有变化的字段
if fmt.Sprintf("%v", oldValue) != fmt.Sprintf("%v", newValue) {
parts[i] = fmt.Sprintf("column%d = %s", i, formatValue(newValue))
}
}
// 过滤空值
var nonEmptyParts []string
for _, part := range parts {
if part != "" {
nonEmptyParts = append(nonEmptyParts, part)
}
}
return joinValues(nonEmptyParts)
}
// 格式化值
func formatValue(value interface{}) string {
if value == nil {
return "NULL"
}
switch v := value.(type) {
case string:
return fmt.Sprintf("'%s'", escapeString(v))
case []byte:
return fmt.Sprintf("'%s'", escapeString(string(v)))
default:
return fmt.Sprintf("%v", v)
}
}
// 转义字符串
func escapeString(s string) string {
// 简单的转义,实际应用中可能需要更完整的实现
return s
}
// 连接值
func joinValues(values []string) string {
if len(values) == 0 {
return ""
}
result := values[0]
for i := 1; i < len(values); i++ {
result += ", " + values[i]
}
return result
}
// withRetry 重试操作直到成功或超过最大重试次数
func withRetry(operation func() error, maxRetries int) error {
var err error
for i := 0; i < maxRetries; i++ {
if err = operation(); err == nil {
return nil
}
log.Printf("操作失败,尝试重连 (%d/%d): %v", i+1, maxRetries, err)
time.Sleep(time.Duration(i+1) * time.Second)
}
return fmt.Errorf("超过最大重试次数: %v", err)
}

78
mblog/mbmain.go_bak

@ -0,0 +1,78 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/errors"
)
var host = flag.String("host", "127.0.0.1", "MySQL host")
var port = flag.Int("port", 3306, "MySQL port")
var user = flag.String("user", "root", "MySQL user, must have replication privilege")
var password = flag.String("password", "****", "MySQL password")
var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
var file = flag.String("file", "mysql-bin.000032", "Binlog filename")
var pos = flag.Int("pos", 3070, "Binlog position")
var semiSync = flag.Bool("semisync", false, "Support semi sync")
var backupPath = flag.String("backup_path", "", "backup path to store binlog files")
var rawMode = flag.Bool("raw", false, "Use raw mode")
func main() {
flag.Parse()
cfg := replication.BinlogSyncerConfig{
ServerID: 101,
Flavor: *flavor,
Host: *host,
Port: uint16(*port),
User: *user,
Password: *password,
RawModeEnabled: *rawMode,
SemiSyncEnabled: *semiSync,
UseDecimal: true,
}
b := replication.NewBinlogSyncer(cfg)
pos := mysql.Position{Name: *file, Pos: uint32(*pos)}
if len(*backupPath) > 0 {
// Backup will always use RawMode.
err := b.StartBackup(*backupPath, pos, 0)
if err != nil {
fmt.Printf("Start backup error: %v\n", errors.ErrorStack(err))
return
}
} else {
s, err := b.StartSync(pos)
if err != nil {
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
return
}
for {
e, err := s.GetEvent(context.Background())
if err != nil {
// Try to output all left events
events := s.DumpEvents()
for _, e := range events {
e.Dump(os.Stdout)
}
fmt.Printf("Get event error: %v\n", errors.ErrorStack(err))
return
}
e.Dump(os.Stdout)
}
}
}
Loading…
Cancel
Save