You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
125 lines
2.7 KiB
125 lines
2.7 KiB
package syncer
|
|
|
|
import (
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/go-mysql-org/go-mysql/canal"
|
|
"github.com/go-mysql-org/go-mysql/mysql"
|
|
|
|
"xttbinlog/config"
|
|
"xttbinlog/handler"
|
|
"xttbinlog/storage"
|
|
)
|
|
|
|
type DataSyncer struct {
|
|
canal *canal.Canal
|
|
handler *handler.EventHandler
|
|
storage storage.PositionStorage
|
|
config *config.Config
|
|
}
|
|
|
|
// NewDataSyncer 创建一个新的DataSyncer实例
|
|
// 1. 创建canal实例
|
|
// 2. 创建位置存储器
|
|
// 3. 创建事件处理器
|
|
func NewDataSyncer(cfg *config.Config) (*DataSyncer, error) {
|
|
// 创建canal实例
|
|
c, err := canal.NewCanal(cfg.ToCanalConfig())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 创建位置存储器
|
|
var posStorage storage.PositionStorage
|
|
switch cfg.Storage.Type {
|
|
case "redis":
|
|
posStorage, err = storage.NewRedisStorage(cfg.Storage.RedisURL, "binlog_position")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
case "file":
|
|
posStorage = storage.NewFileStorage(cfg.Storage.FilePath)
|
|
default:
|
|
posStorage = storage.NewFileStorage("./binlog.position")
|
|
}
|
|
|
|
// 创建事件处理器
|
|
evehalder := handler.NewEventHandler(cfg.Sync.BatchSize, cfg.Sync.Workers)
|
|
|
|
syncer := &DataSyncer{
|
|
canal: c,
|
|
handler: evehalder,
|
|
storage: posStorage,
|
|
config: cfg,
|
|
}
|
|
|
|
// 设置事件处理器 - 这个设置是正确的
|
|
// c.SetEventHandler(evehalder)
|
|
return syncer, nil
|
|
}
|
|
|
|
// Start 启动数据同步
|
|
// 1. 加载保存的位置
|
|
// 2. 启动canal连接
|
|
func (ds *DataSyncer) Start() error {
|
|
// 加载保存的位置
|
|
savedPos, err := ds.storage.Load()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var startPos mysql.Position
|
|
|
|
if savedPos.Name != "" {
|
|
log.Printf("从保存的位置开始: %s", savedPos)
|
|
startPos = savedPos
|
|
} else {
|
|
// 第一次启动,从当前位置开始
|
|
pos, err := ds.canal.GetMasterPos()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
startPos = pos
|
|
log.Printf("第一次启动,从当前位置开始: %s", startPos)
|
|
}
|
|
|
|
// 启动同步
|
|
go ds.positionSaver()
|
|
|
|
log.Printf("开始数据同步...")
|
|
return ds.canal.RunFrom(startPos)
|
|
}
|
|
|
|
// positionSaver 定期保存当前位置
|
|
func (ds *DataSyncer) positionSaver() {
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
pos := ds.canal.SyncedPosition()
|
|
if err := ds.storage.Save(pos); err != nil {
|
|
log.Printf("保存位置失败: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop 停止数据同步
|
|
// 1. 保存最后的位置
|
|
// 2. 关闭事件处理器
|
|
// 3. 关闭canal连接
|
|
func (ds *DataSyncer) Stop() {
|
|
log.Printf("停止数据同步...")
|
|
|
|
// 保存最后的位置
|
|
pos := ds.canal.SyncedPosition()
|
|
if err := ds.storage.Save(pos); err != nil {
|
|
log.Printf("保存最后位置失败: %v", err)
|
|
}
|
|
|
|
ds.handler.Close()
|
|
ds.canal.Close()
|
|
}
|
|
|