You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
98 lines
2.0 KiB
98 lines
2.0 KiB
package storage
|
|
|
|
import (
|
|
"encoding/json"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/go-mysql-org/go-mysql/mysql"
|
|
"github.com/go-redis/redis/v8"
|
|
)
|
|
|
|
type PositionStorage interface {
|
|
Save(position mysql.Position) error
|
|
Load() (mysql.Position, error)
|
|
}
|
|
|
|
// FileStorage is the file-backed position storage: the position is kept in a
// single file on the local filesystem.
type FileStorage struct {
	// filePath is the path of the file that holds the serialized position.
	filePath string

	// mutex guards access to the file: Save takes the write lock, Load the
	// read lock.
	mutex sync.RWMutex
}
|
|
|
|
// NewFileStorage 创建一个新的文件存储实例
|
|
// 1. filePath: 位置信息存储的文件路径
|
|
func NewFileStorage(filePath string) *FileStorage {
|
|
return &FileStorage{filePath: filePath}
|
|
}
|
|
|
|
func (fs *FileStorage) Save(pos mysql.Position) error {
|
|
fs.mutex.Lock()
|
|
defer fs.mutex.Unlock()
|
|
|
|
data, err := json.Marshal(pos)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.WriteFile(fs.filePath, data, 0644)
|
|
}
|
|
|
|
// Load 从文件加载位置信息
|
|
// 1. 如果文件不存在,返回空位置
|
|
// 2. 如果文件存在,解析JSON数据
|
|
func (fs *FileStorage) Load() (mysql.Position, error) {
|
|
fs.mutex.RLock()
|
|
defer fs.mutex.RUnlock()
|
|
|
|
data, err := os.ReadFile(fs.filePath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return mysql.Position{}, nil
|
|
}
|
|
return mysql.Position{}, err
|
|
}
|
|
|
|
var pos mysql.Position
|
|
err = json.Unmarshal(data, &pos)
|
|
return pos, err
|
|
}
|
|
|
|
// Redis存储
|
|
type RedisStorage struct {
|
|
client *redis.Client
|
|
key string
|
|
}
|
|
|
|
func NewRedisStorage(redisURL, key string) (*RedisStorage, error) {
|
|
opt, err := redis.ParseURL(redisURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client := redis.NewClient(opt)
|
|
return &RedisStorage{client: client, key: key}, nil
|
|
}
|
|
|
|
func (rs *RedisStorage) Save(pos mysql.Position) error {
|
|
data, err := json.Marshal(pos)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return rs.client.Set(rs.client.Context(), rs.key, data, 0).Err()
|
|
}
|
|
|
|
func (rs *RedisStorage) Load() (mysql.Position, error) {
|
|
data, err := rs.client.Get(rs.client.Context(), rs.key).Bytes()
|
|
if err == redis.Nil {
|
|
return mysql.Position{}, nil
|
|
}
|
|
if err != nil {
|
|
return mysql.Position{}, err
|
|
}
|
|
|
|
var pos mysql.Position
|
|
err = json.Unmarshal(data, &pos)
|
|
return pos, err
|
|
}
|
|
|