diff --git a/xttbinlog/config.yaml b/xttbinlog/config.yaml index eaad4e8..3847f06 100644 --- a/xttbinlog/config.yaml +++ b/xttbinlog/config.yaml @@ -1,7 +1,7 @@ mysql: addr: "127.0.0.1:3306" - user: "canal" - password: "canal_password" + user: "root" + password: "root" database: "test" sync: @@ -14,4 +14,8 @@ sync: # storage: # type: "redis" -# redis_url: "redis://localhost:6379/0" \ No newline at end of file +# redis_url: "redis://localhost:6379/0" + +storage: + type: "file" + file_path: "./position.json" \ No newline at end of file diff --git a/xttbinlog/main.go b/xttbinlog/main.go index f206d5f..80c1cbe 100644 --- a/xttbinlog/main.go +++ b/xttbinlog/main.go @@ -34,6 +34,7 @@ func main() { } } +// loadConfig 从文件加载配置 func loadConfig(filename string) (*config.Config, error) { data, err := os.ReadFile(filename) if err != nil { @@ -49,6 +50,7 @@ func loadConfig(filename string) (*config.Config, error) { return &cfg, nil } +// setupSignalHandler 设置信号处理 func setupSignalHandler(syncer *syncer.DataSyncer) { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) diff --git a/xttbinlog/storage/position_storage.go b/xttbinlog/storage/position_storage.go index ec49c91..8cba8bd 100644 --- a/xttbinlog/storage/position_storage.go +++ b/xttbinlog/storage/position_storage.go @@ -20,6 +20,8 @@ type FileStorage struct { mutex sync.RWMutex } +// NewFileStorage 创建一个新的文件存储实例 +// 1. filePath: 位置信息存储的文件路径 func NewFileStorage(filePath string) *FileStorage { return &FileStorage{filePath: filePath} } @@ -36,6 +38,9 @@ func (fs *FileStorage) Save(pos mysql.Position) error { return os.WriteFile(fs.filePath, data, 0644) } +// Load 从文件加载位置信息 +// 1. 如果文件不存在,返回空位置 +// 2. 如果文件存在,解析JSON数据 func (fs *FileStorage) Load() (mysql.Position, error) { fs.mutex.RLock() defer fs.mutex.RUnlock() diff --git a/xttbinlog/syncer/data_syncer.go b/xttbinlog/syncer/data_syncer.go index e07ea38..a3b673c 100644 --- a/xttbinlog/syncer/data_syncer.go +++ b/xttbinlog/syncer/data_syncer.go @@ -19,6 +19,10 @@ type DataSyncer struct { config *config.Config } +// NewDataSyncer 创建一个新的DataSyncer实例 +// 1. 创建canal实例 +// 2. 创建位置存储器 +// 3. 创建事件处理器 func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) { // 创建canal实例 c, err := canal.NewCanal(cfg.ToCanalConfig()) @@ -41,19 +45,23 @@ func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) { } // 创建事件处理器 - eventHandler := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers) + evehalder := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers) syncer := &DataSyncer{ canal: c, - handler: eventHandler, + handler: evehalder, storage: posStorage, config: cfg, } - c.SetEventHandler(eventHandler) + // 设置事件处理器 - 这个设置是正确的 + // c.SetEventHandler(evehalder) return syncer, nil } +// Start 启动数据同步 +// 1. 加载保存的位置 +// 2. 启动canal连接 func (ds *DataSyncer) Start() error { // 加载保存的位置 savedPos, err := ds.storage.Load() @@ -98,6 +106,10 @@ func (ds *DataSyncer) positionSaver() { } } +// Stop 停止数据同步 +// 1. 保存最后的位置 +// 2. 关闭事件处理器 +// 3. 关闭canal连接 func (ds *DataSyncer) Stop() { log.Printf("停止数据同步...")