自动更新管控端
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

125 lines
2.7 KiB

package syncer
import (
"log"
"time"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"xttbinlog/config"
"xttbinlog/handler"
"xttbinlog/storage"
)
type DataSyncer struct {
canal *canal.Canal
handler *handler.EventHandler
storage storage.PositionStorage
config *config.Config
}
// NewDataSyncer 创建一个新的DataSyncer实例
// 1. 创建canal实例
// 2. 创建位置存储器
// 3. 创建事件处理器
func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) {
// 创建canal实例
c, err := canal.NewCanal(cfg.ToCanalConfig())
if err != nil {
return nil, err
}
// 创建位置存储器
var posStorage storage.PositionStorage
switch cfg.Storage.Type {
case "redis":
posStorage, err = storage.NewRedisStorage(cfg.Storage.RedisURL, "binlog_position")
if err != nil {
return nil, err
}
case "file":
posStorage = storage.NewFileStorage(cfg.Storage.FilePath)
default:
posStorage = storage.NewFileStorage("./binlog.position")
}
// 创建事件处理器
evehalder := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers)
syncer := &DataSyncer{
canal: c,
handler: evehalder,
storage: posStorage,
config: cfg,
}
// 设置事件处理器 - 这个设置是正确的
// c.SetEventHandler(evehalder)
return syncer, nil
}
// Start 启动数据同步
// 1. 加载保存的位置
// 2. 启动canal连接
func (ds *DataSyncer) Start() error {
// 加载保存的位置
savedPos, err := ds.storage.Load()
if err != nil {
return err
}
var startPos mysql.Position
if savedPos.Name != "" {
log.Printf("从保存的位置开始: %s", savedPos)
startPos = savedPos
} else {
// 第一次启动,从当前位置开始
pos, err := ds.canal.GetMasterPos()
if err != nil {
return err
}
startPos = pos
log.Printf("第一次启动,从当前位置开始: %s", startPos)
}
// 启动同步
go ds.positionSaver()
log.Printf("开始数据同步...")
return ds.canal.RunFrom(startPos)
}
// positionSaver 定期保存当前位置
func (ds *DataSyncer) positionSaver() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pos := ds.canal.SyncedPosition()
if err := ds.storage.Save(pos); err != nil {
log.Printf("保存位置失败: %v", err)
}
}
}
}
// Stop 停止数据同步
// 1. 保存最后的位置
// 2. 关闭事件处理器
// 3. 关闭canal连接
func (ds *DataSyncer) Stop() {
log.Printf("停止数据同步...")
// 保存最后的位置
pos := ds.canal.SyncedPosition()
if err := ds.storage.Save(pos); err != nil {
log.Printf("保存最后位置失败: %v", err)
}
ds.handler.Close()
ds.canal.Close()
}