自动更新管控端
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

167 lines
3.3 KiB

package handler
import (
"context"
"log"
"sync"
"time"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
)
type DataEvent struct {
Action string `json:"action"`
Schema string `json:"schema"`
Table string `json:"table"`
Timestamp time.Time `json:"timestamp"`
Data []interface{} `json:"data"`
OldData []interface{} `json:"old_data,omitempty"`
}
type EventHandler struct {
canal.DummyEventHandler
eventChan chan *DataEvent
batchSize int
workers int
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewEventHandler(batchSize, workers int) *EventHandler {
ctx, cancel := context.WithCancel(context.Background())
handler := &EventHandler{
eventChan: make(chan *DataEvent, 1000),
batchSize: batchSize,
workers: workers,
ctx: ctx,
cancel: cancel,
}
handler.startWorkers()
return handler
}
func (h *EventHandler) startWorkers() {
for i := 0; i < h.workers; i++ {
h.wg.Add(1)
go h.worker(i)
}
}
func (h *EventHandler) worker(id int) {
defer h.wg.Done()
var batch []*DataEvent
batchTimer := time.NewTimer(100 * time.Millisecond)
defer batchTimer.Stop()
for {
select {
case event, ok := <-h.eventChan:
if !ok {
// 通道关闭,处理剩余数据
if len(batch) > 0 {
h.processBatch(batch)
}
return
}
batch = append(batch, event)
if len(batch) >= h.batchSize {
h.processBatch(batch)
batch = nil
batchTimer.Reset(100 * time.Millisecond)
}
case <-batchTimer.C:
if len(batch) > 0 {
h.processBatch(batch)
batch = nil
}
batchTimer.Reset(100 * time.Millisecond)
case <-h.ctx.Done():
if len(batch) > 0 {
h.processBatch(batch)
}
return
}
}
}
func (h *EventHandler) processBatch(events []*DataEvent) {
// 这里实现数据同步逻辑
// 可以同步到其他数据库、消息队列、ES等
log.Printf("处理批次: %d 个事件", len(events))
for _, event := range events {
// 示例:打印事件信息
log.Printf("同步事件: %s.%s %s", event.Schema, event.Table, event.Action)
// 实际应用中,这里可以同步到:
// 1. 其他MySQL数据库
// 2. Elasticsearch
// 3. Redis缓存
// 4. Kafka消息队列
// 5. 其他数据存储
}
}
func (h *EventHandler) OnRow(e *canal.RowsEvent) error {
var event *DataEvent
switch e.Action {
case "insert":
for _, row := range e.Rows {
event = &DataEvent{
Action: "insert",
Schema: e.Table.Schema,
Table: e.Table.Name,
Timestamp: time.Now(),
Data: row,
}
h.eventChan <- event
}
case "update":
for i := 0; i < len(e.Rows); i += 2 {
event = &DataEvent{
Action: "update",
Schema: e.Table.Schema,
Table: e.Table.Name,
Timestamp: time.Now(),
OldData: e.Rows[i],
Data: e.Rows[i+1],
}
h.eventChan <- event
}
case "delete":
for _, row := range e.Rows {
event = &DataEvent{
Action: "delete",
Schema: e.Table.Schema,
Table: e.Table.Name,
Timestamp: time.Now(),
Data: row,
}
h.eventChan <- event
}
}
return nil
}
func (h *EventHandler) OnPosSynced(pos mysql.Position, force bool) error {
log.Printf("位置已同步: %s, force: %t", pos, force)
return nil
}
func (h *EventHandler) Close() {
h.cancel()
close(h.eventChan)
h.wg.Wait()
}