You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
377 lines
9.9 KiB
377 lines
9.9 KiB
/*
|
|
* Copyright 2020-2021 the original author(https://github.com/wj596)
|
|
*
|
|
* <p>
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
* </p>
|
|
*/
|
|
package endpoint
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/juju/errors"
|
|
"github.com/siddontang/go-mysql/canal"
|
|
"github.com/siddontang/go-mysql/mysql"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"go.mongodb.org/mongo-driver/mongo/readpref"
|
|
|
|
"go-mysql-transfer/global"
|
|
"go-mysql-transfer/metrics"
|
|
"go-mysql-transfer/model"
|
|
"go-mysql-transfer/service/luaengine"
|
|
"go-mysql-transfer/util/logs"
|
|
"go-mysql-transfer/util/stringutil"
|
|
)
|
|
|
|
type cKey struct {
|
|
database string
|
|
collection string
|
|
}
|
|
|
|
type MongoEndpoint struct {
|
|
options *options.ClientOptions
|
|
client *mongo.Client
|
|
lock sync.Mutex
|
|
collections map[cKey]*mongo.Collection
|
|
collLock sync.RWMutex
|
|
|
|
retryLock sync.Mutex
|
|
}
|
|
|
|
func newMongoEndpoint() *MongoEndpoint {
|
|
addrList := strings.Split(global.Cfg().MongodbAddr, ",")
|
|
opts := &options.ClientOptions{
|
|
Hosts: addrList,
|
|
}
|
|
|
|
if global.Cfg().MongodbUsername != "" && global.Cfg().MongodbPassword != "" {
|
|
opts.Auth = &options.Credential{
|
|
Username: global.Cfg().MongodbUsername,
|
|
Password: global.Cfg().MongodbPassword,
|
|
}
|
|
}
|
|
|
|
r := &MongoEndpoint{}
|
|
r.options = opts
|
|
r.collections = make(map[cKey]*mongo.Collection)
|
|
return r
|
|
}
|
|
|
|
func (s *MongoEndpoint) Connect() error {
|
|
client, err := mongo.Connect(context.Background(), s.options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.client = client
|
|
|
|
s.collLock.Lock()
|
|
for _, rule := range global.RuleInsList() {
|
|
cc := s.client.Database(rule.MongodbDatabase).Collection(rule.MongodbCollection)
|
|
s.collections[s.collectionKey(rule.MongodbDatabase, rule.MongodbCollection)] = cc
|
|
}
|
|
s.collLock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MongoEndpoint) Ping() error {
|
|
return s.client.Ping(context.Background(), readpref.Primary())
|
|
}
|
|
|
|
func (s *MongoEndpoint) isDuplicateKeyError(stack string) bool {
|
|
return strings.Contains(stack, "E11000 duplicate key error")
|
|
}
|
|
|
|
func (s *MongoEndpoint) collectionKey(database, collection string) cKey {
|
|
return cKey{
|
|
database: database,
|
|
collection: collection,
|
|
}
|
|
}
|
|
|
|
func (s *MongoEndpoint) collection(key cKey) *mongo.Collection {
|
|
s.collLock.RLock()
|
|
c, ok := s.collections[key]
|
|
s.collLock.RUnlock()
|
|
if ok {
|
|
return c
|
|
}
|
|
|
|
s.collLock.Lock()
|
|
c = s.client.Database(key.database).Collection(key.collection)
|
|
s.collections[key] = c
|
|
s.collLock.Unlock()
|
|
|
|
return c
|
|
}
|
|
|
|
func (s *MongoEndpoint) Consume(from mysql.Position, rows []*model.RowRequest) error {
|
|
models := make(map[cKey][]mongo.WriteModel, 0)
|
|
for _, row := range rows {
|
|
rule, _ := global.RuleIns(row.RuleKey)
|
|
if rule.TableColumnSize != len(row.Row) {
|
|
logs.Warnf("%s schema mismatching", row.RuleKey)
|
|
continue
|
|
}
|
|
|
|
metrics.UpdateActionNum(row.Action, row.RuleKey)
|
|
|
|
if rule.LuaEnable() {
|
|
kvm := rowMap(row, rule, true)
|
|
ls, err := luaengine.DoMongoOps(kvm, row.Action, rule)
|
|
if err != nil {
|
|
return errors.Errorf("lua 脚本执行失败 : %s ", errors.ErrorStack(err))
|
|
}
|
|
for _, resp := range ls {
|
|
var model mongo.WriteModel
|
|
switch resp.Action {
|
|
case canal.InsertAction:
|
|
model = mongo.NewInsertOneModel().SetDocument(resp.Table)
|
|
case canal.UpdateAction:
|
|
model = mongo.NewUpdateOneModel().SetFilter(bson.M{"_id": resp.Id}).SetUpdate(bson.M{"$set": resp.Table})
|
|
case global.UpsertAction:
|
|
model = mongo.NewUpdateOneModel().SetFilter(bson.M{"_id": resp.Id}).SetUpsert(true).SetUpdate(bson.M{"$set": resp.Table})
|
|
case canal.DeleteAction:
|
|
model = mongo.NewDeleteOneModel().SetFilter(bson.M{"_id": resp.Id})
|
|
}
|
|
|
|
key := s.collectionKey(rule.MongodbDatabase, resp.Collection)
|
|
array, ok := models[key]
|
|
if !ok {
|
|
array = make([]mongo.WriteModel, 0)
|
|
}
|
|
|
|
logs.Infof("action:%s, collection:%s, id:%v, data:%v", resp.Action, resp.Collection, resp.Id, resp.Table)
|
|
|
|
array = append(array, model)
|
|
models[key] = array
|
|
}
|
|
} else {
|
|
kvm := rowMap(row, rule, false)
|
|
id := primaryKey(row, rule)
|
|
kvm["_id"] = id
|
|
var model mongo.WriteModel
|
|
switch row.Action {
|
|
case canal.InsertAction:
|
|
model = mongo.NewInsertOneModel().SetDocument(kvm)
|
|
case canal.UpdateAction:
|
|
model = mongo.NewUpdateOneModel().SetFilter(bson.M{"_id": id}).SetUpdate(bson.M{"$set": kvm})
|
|
case canal.DeleteAction:
|
|
model = mongo.NewDeleteOneModel().SetFilter(bson.M{"_id": id})
|
|
}
|
|
|
|
ccKey := s.collectionKey(rule.MongodbDatabase, rule.MongodbCollection)
|
|
array, ok := models[ccKey]
|
|
if !ok {
|
|
array = make([]mongo.WriteModel, 0)
|
|
}
|
|
|
|
logs.Infof("action:%s, collection:%s, id:%v, data:%v", row.Action, rule.MongodbCollection, id, kvm)
|
|
|
|
array = append(array, model)
|
|
models[ccKey] = array
|
|
}
|
|
}
|
|
|
|
var slowly bool
|
|
for key, model := range models {
|
|
collection := s.collection(key)
|
|
_, err := collection.BulkWrite(context.Background(), model)
|
|
if err != nil {
|
|
if s.isDuplicateKeyError(err.Error()) {
|
|
slowly = true
|
|
} else {
|
|
return err
|
|
}
|
|
logs.Error(errors.ErrorStack(err))
|
|
break
|
|
}
|
|
}
|
|
if slowly {
|
|
_, err := s.doConsumeSlowly(rows)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
logs.Infof("处理完成 %d 条数据", len(rows))
|
|
return nil
|
|
}
|
|
|
|
func (s *MongoEndpoint) Stock(rows []*model.RowRequest) int64 {
|
|
expect := true
|
|
models := make(map[cKey][]mongo.WriteModel, 0)
|
|
for _, row := range rows {
|
|
rule, _ := global.RuleIns(row.RuleKey)
|
|
if rule.TableColumnSize != len(row.Row) {
|
|
logs.Warnf("%s schema mismatching", row.RuleKey)
|
|
continue
|
|
}
|
|
|
|
if rule.LuaEnable() {
|
|
kvm := rowMap(row, rule, true)
|
|
ls, err := luaengine.DoMongoOps(kvm, row.Action, rule)
|
|
if err != nil {
|
|
log.Println("Lua 脚本执行失败!!! ,详情请参见日志")
|
|
logs.Errorf("lua 脚本执行失败 : %s ", errors.ErrorStack(err))
|
|
expect = false
|
|
break
|
|
}
|
|
|
|
for _, resp := range ls {
|
|
ccKey := s.collectionKey(rule.MongodbDatabase, resp.Collection)
|
|
model := mongo.NewInsertOneModel().SetDocument(resp.Table)
|
|
array, ok := models[ccKey]
|
|
if !ok {
|
|
array = make([]mongo.WriteModel, 0)
|
|
}
|
|
array = append(array, model)
|
|
models[ccKey] = array
|
|
}
|
|
} else {
|
|
kvm := rowMap(row, rule, false)
|
|
id := primaryKey(row, rule)
|
|
kvm["_id"] = id
|
|
|
|
ccKey := s.collectionKey(rule.MongodbDatabase, rule.MongodbCollection)
|
|
model := mongo.NewInsertOneModel().SetDocument(kvm)
|
|
array, ok := models[ccKey]
|
|
if !ok {
|
|
array = make([]mongo.WriteModel, 0)
|
|
}
|
|
array = append(array, model)
|
|
models[ccKey] = array
|
|
}
|
|
}
|
|
|
|
if !expect {
|
|
return 0
|
|
}
|
|
|
|
var slowly bool
|
|
var sum int64
|
|
for key, vs := range models {
|
|
collection := s.collection(key)
|
|
rr, err := collection.BulkWrite(context.Background(), vs)
|
|
if err != nil {
|
|
if s.isDuplicateKeyError(err.Error()) {
|
|
slowly = true
|
|
}
|
|
logs.Error(errors.ErrorStack(err))
|
|
break
|
|
}
|
|
sum += rr.InsertedCount
|
|
}
|
|
|
|
if slowly {
|
|
logs.Info("do consume slowly ... ... ")
|
|
slowlySum, err := s.doConsumeSlowly(rows)
|
|
if err != nil {
|
|
logs.Warnf(err.Error())
|
|
}
|
|
return slowlySum
|
|
}
|
|
|
|
return sum
|
|
}
|
|
|
|
func (s *MongoEndpoint) doConsumeSlowly(rows []*model.RowRequest) (int64, error) {
|
|
var sum int64
|
|
for _, row := range rows {
|
|
rule, _ := global.RuleIns(row.RuleKey)
|
|
if rule.TableColumnSize != len(row.Row) {
|
|
logs.Warnf("%s schema mismatching", row.RuleKey)
|
|
continue
|
|
}
|
|
|
|
if rule.LuaEnable() {
|
|
kvm := rowMap(row, rule, true)
|
|
ls, err := luaengine.DoMongoOps(kvm, row.Action, rule)
|
|
if err != nil {
|
|
logs.Errorf("lua 脚本执行失败 : %s ", errors.ErrorStack(err))
|
|
return sum, err
|
|
}
|
|
for _, resp := range ls {
|
|
collection := s.collection(s.collectionKey(rule.MongodbDatabase, resp.Collection))
|
|
switch resp.Action {
|
|
case canal.InsertAction:
|
|
_, err := collection.InsertOne(context.Background(), resp.Table)
|
|
if err != nil {
|
|
if s.isDuplicateKeyError(err.Error()) {
|
|
logs.Warnf("duplicate key [ %v ]", stringutil.ToJsonString(resp.Table))
|
|
} else {
|
|
return sum, err
|
|
}
|
|
}
|
|
case canal.UpdateAction:
|
|
_, err := collection.UpdateOne(context.Background(), bson.M{"_id": resp.Id}, bson.M{"$set": resp.Table})
|
|
if err != nil {
|
|
return sum, err
|
|
}
|
|
case canal.DeleteAction:
|
|
_, err := collection.DeleteOne(context.Background(), bson.M{"_id": resp.Id})
|
|
if err != nil {
|
|
return sum, err
|
|
}
|
|
}
|
|
logs.Infof("action:%s, collection:%s, id:%v, data:%v",
|
|
row.Action, collection.Name(), resp.Id, resp.Table)
|
|
}
|
|
} else {
|
|
kvm := rowMap(row, rule, false)
|
|
id := primaryKey(row, rule)
|
|
kvm["_id"] = id
|
|
|
|
collection := s.collection(s.collectionKey(rule.MongodbDatabase, rule.MongodbCollection))
|
|
|
|
switch row.Action {
|
|
case canal.InsertAction:
|
|
_, err := collection.InsertOne(context.Background(), kvm)
|
|
if err != nil {
|
|
if s.isDuplicateKeyError(err.Error()) {
|
|
logs.Warnf("duplicate key [ %v ]", stringutil.ToJsonString(kvm))
|
|
} else {
|
|
return sum, err
|
|
}
|
|
}
|
|
case canal.UpdateAction:
|
|
_, err := collection.UpdateOne(context.Background(), bson.M{"_id": id}, bson.M{"$set": kvm})
|
|
if err != nil {
|
|
return sum, err
|
|
}
|
|
case canal.DeleteAction:
|
|
_, err := collection.DeleteOne(context.Background(), bson.M{"_id": id})
|
|
if err != nil {
|
|
return sum, err
|
|
}
|
|
}
|
|
|
|
logs.Infof("action:%s, collection:%s, id:%v, data:%v", row.Action, collection.Name(), id, kvm)
|
|
}
|
|
sum++
|
|
}
|
|
return sum, nil
|
|
}
|
|
|
|
func (s *MongoEndpoint) Close() {
|
|
if s.client != nil {
|
|
s.client.Disconnect(context.Background())
|
|
}
|
|
}
|
|
|