Browse Source

调整代码

master
xyiege 2 months ago
parent
commit
7faa937d47
  1. 8
      xttbinlog/config.yaml
  2. 2
      xttbinlog/main.go
  3. 5
      xttbinlog/storage/position_storage.go
  4. 18
      xttbinlog/syncer/data_syncer.go

8
xttbinlog/config.yaml

@ -1,7 +1,7 @@
mysql:
addr: "127.0.0.1:3306"
user: "canal"
password: "canal_password"
user: "root"
password: "root"
database: "test"
sync:
@ -15,3 +15,7 @@ sync:
# storage:
# type: "redis"
# redis_url: "redis://localhost:6379/0"
storage:
type: "file"
file_path: "./position.json"

2
xttbinlog/main.go

@ -34,6 +34,7 @@ func main() {
}
}
// loadConfig 从文件加载配置
func loadConfig(filename string) (*config.Config, error) {
data, err := os.ReadFile(filename)
if err != nil {
@ -49,6 +50,7 @@ func loadConfig(filename string) (*config.Config, error) {
return &cfg, nil
}
// setupSignalHandler 设置信号处理
func setupSignalHandler(syncer *syncer.DataSyncer) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

5
xttbinlog/storage/position_storage.go

@ -20,6 +20,8 @@ type FileStorage struct {
mutex sync.RWMutex
}
// NewFileStorage 创建一个新的文件存储实例
// 1. filePath: 位置信息存储的文件路径
func NewFileStorage(filePath string) *FileStorage {
return &FileStorage{filePath: filePath}
}
@ -36,6 +38,9 @@ func (fs *FileStorage) Save(pos mysql.Position) error {
return os.WriteFile(fs.filePath, data, 0644)
}
// Load 从文件加载位置信息
// 1. 如果文件不存在,返回空位置
// 2. 如果文件存在,解析JSON数据
func (fs *FileStorage) Load() (mysql.Position, error) {
fs.mutex.RLock()
defer fs.mutex.RUnlock()

18
xttbinlog/syncer/data_syncer.go

@ -19,6 +19,10 @@ type DataSyncer struct {
config *config.Config
}
// NewDataSyncer 创建一个新的DataSyncer实例
// 1. 创建canal实例
// 2. 创建位置存储器
// 3. 创建事件处理器
func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) {
// 创建canal实例
c, err := canal.NewCanal(cfg.ToCanalConfig())
@ -41,19 +45,23 @@ func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) {
}
// 创建事件处理器
eventHandler := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers)
evehalder := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers)
syncer := &DataSyncer{
canal: c,
handler: eventHandler,
handler: evehalder,
storage: posStorage,
config: cfg,
}
c.SetEventHandler(eventHandler)
// 设置事件处理器 - 这个设置是正确的
// c.SetEventHandler(evehalder)
return syncer, nil
}
// Start 启动数据同步
// 1. 加载保存的位置
// 2. 启动canal连接
func (ds *DataSyncer) Start() error {
// 加载保存的位置
savedPos, err := ds.storage.Load()
@ -98,6 +106,10 @@ func (ds *DataSyncer) positionSaver() {
}
}
// Stop 停止数据同步
// 1. 保存最后的位置
// 2. 关闭事件处理器
// 3. 关闭canal连接
func (ds *DataSyncer) Stop() {
log.Printf("停止数据同步...")

Loading…
Cancel
Save