package storage import ( "encoding/json" "os" "sync" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-redis/redis/v8" ) type PositionStorage interface { Save(position mysql.Position) error Load() (mysql.Position, error) } // 文件存储 type FileStorage struct { filePath string mutex sync.RWMutex } // NewFileStorage 创建一个新的文件存储实例 // 1. filePath: 位置信息存储的文件路径 func NewFileStorage(filePath string) *FileStorage { return &FileStorage{filePath: filePath} } func (fs *FileStorage) Save(pos mysql.Position) error { fs.mutex.Lock() defer fs.mutex.Unlock() data, err := json.Marshal(pos) if err != nil { return err } return os.WriteFile(fs.filePath, data, 0644) } // Load 从文件加载位置信息 // 1. 如果文件不存在,返回空位置 // 2. 如果文件存在,解析JSON数据 func (fs *FileStorage) Load() (mysql.Position, error) { fs.mutex.RLock() defer fs.mutex.RUnlock() data, err := os.ReadFile(fs.filePath) if err != nil { if os.IsNotExist(err) { return mysql.Position{}, nil } return mysql.Position{}, err } var pos mysql.Position err = json.Unmarshal(data, &pos) return pos, err } // Redis存储 type RedisStorage struct { client *redis.Client key string } func NewRedisStorage(redisURL, key string) (*RedisStorage, error) { opt, err := redis.ParseURL(redisURL) if err != nil { return nil, err } client := redis.NewClient(opt) return &RedisStorage{client: client, key: key}, nil } func (rs *RedisStorage) Save(pos mysql.Position) error { data, err := json.Marshal(pos) if err != nil { return err } return rs.client.Set(rs.client.Context(), rs.key, data, 0).Err() } func (rs *RedisStorage) Load() (mysql.Position, error) { data, err := rs.client.Get(rs.client.Context(), rs.key).Bytes() if err == redis.Nil { return mysql.Position{}, nil } if err != nil { return mysql.Position{}, err } var pos mysql.Position err = json.Unmarshal(data, &pos) return pos, err }