package syncer import ( "log" "time" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" "xttbinlog/config" "xttbinlog/handler" "xttbinlog/storage" ) type DataSyncer struct { canal *canal.Canal handler *handler.EventHandler storage storage.PositionStorage config *config.Config } // NewDataSyncer 创建一个新的DataSyncer实例 // 1. 创建canal实例 // 2. 创建位置存储器 // 3. 创建事件处理器 func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) { // 创建canal实例 c, err := canal.NewCanal(cfg.ToCanalConfig()) if err != nil { return nil, err } // 创建位置存储器 var posStorage storage.PositionStorage switch cfg.Storage.Type { case "redis": posStorage, err = storage.NewRedisStorage(cfg.Storage.RedisURL, "binlog_position") if err != nil { return nil, err } case "file": posStorage = storage.NewFileStorage(cfg.Storage.FilePath) default: posStorage = storage.NewFileStorage("./binlog.position") } // 创建事件处理器 evehalder := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers) syncer := &DataSyncer{ canal: c, handler: evehalder, storage: posStorage, config: cfg, } // 设置事件处理器 - 这个设置是正确的 // c.SetEventHandler(evehalder) return syncer, nil } // Start 启动数据同步 // 1. 加载保存的位置 // 2. 启动canal连接 func (ds *DataSyncer) Start() error { // 加载保存的位置 savedPos, err := ds.storage.Load() if err != nil { return err } var startPos mysql.Position if savedPos.Name != "" { log.Printf("从保存的位置开始: %s", savedPos) startPos = savedPos } else { // 第一次启动,从当前位置开始 pos, err := ds.canal.GetMasterPos() if err != nil { return err } startPos = pos log.Printf("第一次启动,从当前位置开始: %s", startPos) } // 启动同步 go ds.positionSaver() log.Printf("开始数据同步...") return ds.canal.RunFrom(startPos) } // positionSaver 定期保存当前位置 func (ds *DataSyncer) positionSaver() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: pos := ds.canal.SyncedPosition() if err := ds.storage.Save(pos); err != nil { log.Printf("保存位置失败: %v", err) } } } } // Stop 停止数据同步 // 1. 保存最后的位置 // 2. 关闭事件处理器 // 3. 关闭canal连接 func (ds *DataSyncer) Stop() { log.Printf("停止数据同步...") // 保存最后的位置 pos := ds.canal.SyncedPosition() if err := ds.storage.Save(pos); err != nil { log.Printf("保存最后位置失败: %v", err) } ds.handler.Close() ds.canal.Close() }