// Package handler converts MySQL binlog row events delivered by canal into
// DataEvent values and hands them to a pool of batching worker goroutines.
package handler

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
)

const (
	// eventQueueSize is the capacity of the buffered channel between OnRow
	// and the workers.
	eventQueueSize = 1000

	// flushInterval is how often a worker flushes a partially filled batch.
	flushInterval = 100 * time.Millisecond
)

// DataEvent is a single row change captured from the binlog.
type DataEvent struct {
	Action    string        `json:"action"`
	Schema    string        `json:"schema"`
	Table     string        `json:"table"`
	Timestamp time.Time     `json:"timestamp"` // time the event was queued, not the binlog time
	Data      []interface{} `json:"data"`
	OldData   []interface{} `json:"old_data,omitempty"` // previous row image, set for updates only
}

// EventHandler receives row events from canal, queues them, and lets a fixed
// number of workers process them in batches.
type EventHandler struct {
	canal.DummyEventHandler

	eventChan chan *DataEvent
	batchSize int
	workers   int
	wg        sync.WaitGroup
	ctx       context.Context
	cancel    context.CancelFunc

	// sendMu is held for reading while an event is being queued and for
	// writing while eventChan is closed, so a send can never hit a closed
	// channel.
	sendMu sync.RWMutex
	// closeOnce makes Close safe to call more than once.
	closeOnce sync.Once
}

// NewEventHandler creates a handler and starts its workers.
//
// batchSize is the number of events that triggers an immediate flush and
// workers is the number of consumer goroutines. Values below 1 are raised
// to 1: a batch size below 1 already flushed every event individually, and
// a worker count below 1 would leave the queue without any consumer, which
// blocks OnRow forever once the buffer is full.
func NewEventHandler(batchSize, workers int) *EventHandler {
	if batchSize < 1 {
		batchSize = 1
	}
	if workers < 1 {
		workers = 1
	}

	ctx, cancel := context.WithCancel(context.Background())
	handler := &EventHandler{
		eventChan: make(chan *DataEvent, eventQueueSize),
		batchSize: batchSize,
		workers:   workers,
		ctx:       ctx,
		cancel:    cancel,
	}
	handler.startWorkers()
	return handler
}

// startWorkers launches h.workers goroutines, each registered with h.wg so
// that Close can wait for them.
func (h *EventHandler) startWorkers() {
	for i := 0; i < h.workers; i++ {
		h.wg.Add(1)
		go h.worker(i)
	}
}

// worker collects events into a batch and flushes it when it reaches
// h.batchSize or when the flush timer fires. It runs until eventChan is
// closed and fully drained, so every event accepted by OnRow is processed
// before Close returns.
func (h *EventHandler) worker(id int) {
	defer h.wg.Done()

	var batch []*DataEvent
	flush := func() {
		if len(batch) > 0 {
			h.processBatch(batch)
			batch = nil
		}
	}

	timer := time.NewTimer(flushInterval)
	defer timer.Stop()

	for {
		select {
		case event, ok := <-h.eventChan:
			if !ok {
				// Channel closed and drained: flush what is left and exit.
				flush()
				return
			}
			batch = append(batch, event)
			if len(batch) >= h.batchSize {
				flush()
				// Stop and drain before Reset so a tick that already fired
				// does not cause a spurious early flush.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(flushInterval)
			}
		case <-timer.C:
			flush()
			timer.Reset(flushInterval)
		}
	}
}

// processBatch handles one batch of events. It is the place to implement the
// actual synchronization logic; the current implementation only logs.
func (h *EventHandler) processBatch(events []*DataEvent) {
	// Implement the data synchronization logic here, for example writing to
	// another database, a message queue or Elasticsearch.
	log.Printf("处理批次: %d 个事件", len(events))

	for _, event := range events {
		// Example: print the event information.
		log.Printf("同步事件: %s.%s %s", event.Schema, event.Table, event.Action)

		// In a real application the event could be synchronized to:
		// 1. another MySQL database
		// 2. Elasticsearch
		// 3. a Redis cache
		// 4. a Kafka message queue
		// 5. other data stores
	}
}

// newDataEvent builds the DataEvent for one row of e. oldData is the previous
// row image for updates and nil otherwise.
func newDataEvent(e *canal.RowsEvent, data, oldData []interface{}) *DataEvent {
	return &DataEvent{
		Action:    e.Action,
		Schema:    e.Table.Schema,
		Table:     e.Table.Name,
		Timestamp: time.Now(),
		Data:      data,
		OldData:   oldData,
	}
}

// enqueue queues event for the workers, blocking while the queue is full.
// It returns the context error once Close has been called instead of sending
// on a closed channel.
func (h *EventHandler) enqueue(event *DataEvent) error {
	h.sendMu.RLock()
	defer h.sendMu.RUnlock()

	// Close cancels the context before it closes the channel, so this check
	// keeps late callers away from the closed channel.
	if err := h.ctx.Err(); err != nil {
		return err
	}

	select {
	case h.eventChan <- event:
		return nil
	case <-h.ctx.Done():
		return h.ctx.Err()
	}
}

// OnRow converts a canal rows event into DataEvents and queues them.
//
// Insert and delete events produce one DataEvent per row. Update events are
// read as (before, after) row pairs and produce one DataEvent per pair.
// Events with any other action are ignored. A non-nil error is returned only
// when the handler has been closed.
func (h *EventHandler) OnRow(e *canal.RowsEvent) error {
	switch e.Action {
	case canal.InsertAction, canal.DeleteAction:
		for _, row := range e.Rows {
			if err := h.enqueue(newDataEvent(e, row, nil)); err != nil {
				return err
			}
		}
	case canal.UpdateAction:
		if len(e.Rows)%2 != 0 {
			// An unpaired trailing row cannot form an update; skip it rather
			// than index past the end of e.Rows.
			log.Printf("update 事件的行数为奇数 (%d),忽略末尾未配对的行", len(e.Rows))
		}
		for i := 0; i+1 < len(e.Rows); i += 2 {
			if err := h.enqueue(newDataEvent(e, e.Rows[i+1], e.Rows[i])); err != nil {
				return err
			}
		}
	}
	return nil
}

// OnPosSynced logs the synced binlog position.
//
// NOTE(review): go-mysql releases differ in the parameters OnPosSynced takes;
// confirm this signature matches canal.EventHandler in the version in use.
func (h *EventHandler) OnPosSynced(pos mysql.Position, force bool) error {
	log.Printf("位置已同步: %s, force: %t", pos, force)
	return nil
}

// Close stops accepting new events, waits for the workers to process
// everything already queued, and returns. It may be called more than once
// and concurrently with OnRow.
func (h *EventHandler) Close() {
	h.closeOnce.Do(func() {
		// Cancel first: this releases senders blocked on a full queue and
		// makes later OnRow calls fail fast.
		h.cancel()

		// Wait for in-flight senders to leave before closing the channel.
		h.sendMu.Lock()
		close(h.eventChan)
		h.sendMu.Unlock()
	})
	h.wg.Wait()
}