From ac545346aebf3f1a47b2ca7898764f33084001fc Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Wed, 10 Mar 2021 11:28:51 -0500 Subject: [PATCH 01/15] adding args to connect to mongo adding username and password to mongo tsbs_load options --- pkg/targets/mongo/implemented_target.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/targets/mongo/implemented_target.go b/pkg/targets/mongo/implemented_target.go index d92242c16..991272819 100644 --- a/pkg/targets/mongo/implemented_target.go +++ b/pkg/targets/mongo/implemented_target.go @@ -21,6 +21,8 @@ func (t *mongoTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Flag flagSet.String(flagPrefix+"url", "localhost:27017", "Mongo URL.") flagSet.Duration(flagPrefix+"write-timeout", 10*time.Second, "Write timeout.") flagSet.Bool(flagPrefix+"document-per-event", false, "Whether to use one document per event or aggregate by hour") + flagSet.String(flagPrefix+"user", "", "User to connect") + flagSet.String(flagPrefix+"pass", "", "Password for user connecting to MongoDB (leave blank if not password protected)") } func (t *mongoTarget) TargetName() string { From 09adf9024f9eb336576a2ae56795abcbb654746f Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Tue, 16 Mar 2021 23:05:34 -0400 Subject: [PATCH 02/15] Changed the mongo files to use the official mongo golang driver : https://pkg.go.dev/go.mongodb.org/mongo-driver Which is stil being maintained instead of the unmaintained mgo driver which does not connect to mongo v.4.4 --- cmd/tsbs_load_mongo/aggregate_loader.go | 72 +++++++------- cmd/tsbs_load_mongo/creator.go | 106 ++++++++++++--------- cmd/tsbs_load_mongo/document_per_loader.go | 25 ++--- 3 files changed, 107 insertions(+), 96 deletions(-) diff --git a/cmd/tsbs_load_mongo/aggregate_loader.go b/cmd/tsbs_load_mongo/aggregate_loader.go index 00033b889..0e8581e22 100644 --- a/cmd/tsbs_load_mongo/aggregate_loader.go +++ b/cmd/tsbs_load_mongo/aggregate_loader.go @@ -1,18 +1,21 @@ package main import ( + "context" "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "hash/fnv" "log" "sync" "time" - - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + //"github.com/globalsign/mgo" + //"github.com/globalsign/mgo/bson" "github.com/timescale/tsbs/load" "github.com/timescale/tsbs/pkg/data" "github.com/timescale/tsbs/pkg/targets" - "github.com/timescale/tsbs/pkg/targets/mongo" + tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo" //to resolve name collision with mongo driver ) type hostnameIndexer struct { @@ -20,8 +23,8 @@ type hostnameIndexer struct { } func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { - p := item.Data.(*mongo.MongoPoint) - t := &mongo.MongoTag{} + p := item.Data.(*tsbsmongo.MongoPoint) + t := &tsbsmongo.MongoTag{} for j := 0; j < p.TagsLength(); j++ { p.Tags(t, j) key := string(t.Key()) @@ -29,7 +32,10 @@ func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { // the hostame is the defacto index for devops tags // the truck name is the defacto index for iot tags h := fnv.New32a() - h.Write([]byte(string(t.Value()))) + _, err := h.Write(t.Value()) + if err != nil { + panic("cannot write value to hash") + } return uint(h.Sum32()) % i.partitions } } @@ -78,7 +84,7 @@ var pPool = &sync.Pool{New: func() interface{} { return &point{} }} type aggProcessor struct { dbc *dbCreator - collection *mgo.Collection + collection *mongo.Collection createdDocs map[string]bool createQueue []interface{} @@ -86,13 +92,11 @@ type aggProcessor struct { func (p *aggProcessor) Init(_ int, doLoad, _ bool) { if doLoad { - sess := p.dbc.session.Copy() - db := sess.DB(loader.DatabaseName()) - p.collection = db.C(collectionName) + db := p.dbc.client.Database(loader.DatabaseName()) + p.collection = db.Collection(collectionName) } p.createdDocs = make(map[string]bool) p.createQueue = []interface{}{} - } // ProcessBatch receives a batch of bson.M documents (BSON maps) that @@ -127,7 +131,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 eventCnt := uint64(0) for _, event := range batch.arr { tagsMap := map[string]string{} - t := &mongo.MongoTag{} + t := &tsbsmongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) tagsMap[string(t.Key())] = string(t.Value()) @@ -161,7 +165,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 } x := pPool.Get().(*point) x.Fields = map[string]interface{}{} - f := &mongo.MongoReading{} + f := &tsbsmongo.MongoReading{} for j := 0; j < event.FieldsLength(); j++ { event.Fields(f, j) x.Fields[string(f.Key())] = f.Value() @@ -171,38 +175,36 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 docToEvents[docKey] = append(docToEvents[docKey], x) } + coll := p.dbc.client.Database("benchmark").Collection("test") if doLoad { // Checks if any new documents need to be made and does so - bulk := p.collection.Bulk() - bulk = insertNewAggregateDocs(p.collection, bulk, p.createQueue) + //bulk := p.collection.Bulk() + insertNewAggregateDocs(coll, p.dbc.ctx, p.createQueue) p.createQueue = p.createQueue[:0] - // For each document, create one 'set' command for all records // that belong to the document for docKey, events := range docToEvents { - selector := bson.M{aggDocID: docKey} updateMap := bson.M{} for _, event := range events { minKey := (event.Timestamp / (1e9 * 60)) % 60 secKey := (event.Timestamp / 1e9) % 60 key := fmt.Sprintf("events.%d.%d", minKey, secKey) val := event.Fields - val[timestampField] = event.Timestamp updateMap[key] = val } - update := bson.M{"$set": updateMap} - bulk.Update(selector, update) - } - - // All documents accounted for, finally run the operation - _, err := bulk.Run() - if err != nil { - log.Fatalf("Bulk aggregate update err: %s\n", err.Error()) + filter := bson.M{ + aggDocID: bson.M{ + "$eq": docKey, //check if aggDocID field has value of docKey + }, + } + _, err := coll.UpdateOne(p.dbc.ctx, filter, update) + if err != nil { + log.Fatalf(" Update err: %s\n", err.Error()) + } } - for _, events := range docToEvents { for _, e := range events { delete(e.Fields, timestampField) @@ -215,8 +217,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 // insertNewAggregateDocs handles creating new aggregated documents when new devices // or time periods are encountered -func insertNewAggregateDocs(collection *mgo.Collection, bulk *mgo.Bulk, createQueue []interface{}) *mgo.Bulk { - b := bulk +func insertNewAggregateDocs(collection *mongo.Collection, ctx context.Context, createQueue []interface{}) { if len(createQueue) > 0 { off := 0 for off < len(createQueue) { @@ -224,17 +225,12 @@ func insertNewAggregateDocs(collection *mgo.Collection, bulk *mgo.Bulk, createQu if l > len(createQueue) { l = len(createQueue) } - - b.Insert(createQueue[off:l]...) - _, err := b.Run() + opts := options.InsertMany().SetOrdered(false) + _, err := collection.InsertMany(ctx, createQueue[off:l], opts) if err != nil { - log.Fatalf("Bulk aggregate docs err: %s\n", err.Error()) + log.Fatalf("Insert many aggregate docs err: %s\n", err.Error()) } - b = collection.Bulk() - off = l } } - - return b } diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 0fbcc1e33..1b75d0bdc 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -1,29 +1,47 @@ package main import ( + "context" "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" "log" "strings" - - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" ) type dbCreator struct { - session *mgo.Session + client *mongo.Client + ctx context.Context + cancel context.CancelFunc } +//var daemonURL string = "mongodb+srv://dbadmin:QBrtK7nGNwryvSTs@cluster0.op8ed.mongodb.net"///benchmark?retryWrites=true&w=majority" +//var writeTimeout time.Duration = 30*time.Second +//tested - OK func (d *dbCreator) Init() { var err error - d.session, err = mgo.DialWithTimeout(daemonURL, writeTimeout) + d.ctx, d.cancel = context.WithTimeout(context.Background(), writeTimeout) + //defer d.cancel() + log.Println("TRYING TO CONNECT") + d.client, err = mongo.Connect(d.ctx, options.Client().ApplyURI(daemonURL)) if err != nil { + log.Println("DID NOT MANAGE TO CONNECT") log.Fatal(err) + } else { + err = d.client.Ping(d.ctx, readpref.Primary()) + if err != nil { + log.Println("DID NOT MANAGE TO CONNECT") + log.Fatal(err) + } else { + log.Println("MANAGED TO CONNECT") + } } - d.session.SetMode(mgo.Eventual, false) } func (d *dbCreator) DBExists(dbName string) bool { - dbs, err := d.session.DatabaseNames() + dbs, err := d.client.ListDatabaseNames(d.ctx, bson.D{}) if err != nil { log.Fatal(err) } @@ -36,74 +54,70 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) RemoveOldDB(dbName string) error { - collections, err := d.session.DB(dbName).CollectionNames() + collection_names, err := d.client.Database(dbName).ListCollectionNames(d.ctx, bson.D{}) + log.Printf("collection_names : %s", collection_names) if err != nil { return err } - for _, name := range collections { - d.session.DB(dbName).C(name).DropCollection() + for _, coll := range collection_names { + log.Printf("collection found : ", d.client.Database(dbName).Collection(coll)) + log.Println("deleting the previous collection") + err := d.client.Database(dbName).Collection(coll).Drop(d.ctx) + if err != nil { + log.Printf("Could not delete collection : ", err.Error()) + } } - return nil } func (d *dbCreator) CreateDB(dbName string) error { - cmd := make(bson.D, 0, 4) - cmd = append(cmd, bson.DocElem{Name: "create", Value: collectionName}) - - // wiredtiger settings - cmd = append(cmd, bson.DocElem{ - Name: "storageEngine", Value: map[string]interface{}{ - "wiredTiger": map[string]interface{}{ - "configString": "block_compressor=snappy", - }, - }, - }) - - err := d.session.DB(dbName).Run(cmd, nil) + //Starting in MongoDB 3.2, the WiredTiger storage engine is the default storage engine + err := d.client.Database(dbName).CreateCollection(d.ctx, collectionName) if err != nil { if strings.Contains(err.Error(), "already exists") { + log.Printf("collection %s already exists", dbName) return nil } + log.Printf("create collection err: %v", err) return fmt.Errorf("create collection err: %v", err) } - - collection := d.session.DB(dbName).C(collectionName) - var key []string + collection := d.client.Database(dbName).Collection(collectionName) + var key bson.D if documentPer { - key = []string{"measurement", "tags.hostname", timestampField} + key = bson.D{{"measurement", 1}, {"tags.hostname", 1}, {timestampField, 1}} } else { - key = []string{aggKeyID, "measurement", "tags.hostname"} + key = bson.D{{aggKeyID, 1}, {"measurement", 1}, {"tags.hostname", 1}} } - - index := mgo.Index{ - Key: key, - Unique: false, // Unique does not work on the entire array of tags! - Background: false, - Sparse: false, + index := mongo.IndexModel{ + Keys: key, + Options: options.Index().SetName("default_index"), } - err = collection.EnsureIndex(index) + idxview := collection.Indexes() + _, err = idxview.CreateOne(d.ctx, index) if err != nil { - return fmt.Errorf("create basic index err: %v", err) + log.Printf("create index err: %v", err) + panic(err) } - // To make updates for new records more efficient, we need a efficient doc // lookup index if !documentPer { - err = collection.EnsureIndex(mgo.Index{ - Key: []string{aggDocID}, - Unique: false, - Background: false, - Sparse: false, + _, err := idxview.CreateOne(d.ctx, mongo.IndexModel{ + Keys: bson.D{{aggDocID, 1}}, + Options: options.Index().SetName("default_index"), }) if err != nil { - return fmt.Errorf("create agg doc index err: %v", err) + log.Printf("create index err: %v", err) + panic(err) } } - return nil } func (d *dbCreator) Close() { - d.session.Close() + log.Println("losing database connection") + var err error + (d.cancel)() + if err = d.client.Disconnect(d.ctx); err != nil { + panic(err) + } } diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index c24c01f3b..5c82c9e2a 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -1,13 +1,14 @@ package main import ( + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "log" "sync" - - "github.com/globalsign/mgo" + //"github.com/globalsign/mgo" "github.com/timescale/tsbs/load" "github.com/timescale/tsbs/pkg/targets" - "github.com/timescale/tsbs/pkg/targets/mongo" + tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo" ) // naiveBenchmark allows you to run a benchmark using the naive, one document per @@ -39,16 +40,15 @@ var spPool = &sync.Pool{New: func() interface{} { return &singlePoint{} }} type naiveProcessor struct { dbc *dbCreator - collection *mgo.Collection + collection *mongo.Collection pvs []interface{} } func (p *naiveProcessor) Init(_ int, doLoad, _ bool) { if doLoad { - sess := p.dbc.session.Copy() - db := sess.DB(loader.DatabaseName()) - p.collection = db.C(collectionName) + db := p.dbc.client.Database(loader.DatabaseName()) + p.collection = db.Collection(collectionName) } p.pvs = []interface{}{} } @@ -70,12 +70,12 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin x.Timestamp = event.Timestamp() x.Fields = map[string]interface{}{} x.Tags = map[string]string{} - f := &mongo.MongoReading{} + f := &tsbsmongo.MongoReading{} for j := 0; j < event.FieldsLength(); j++ { event.Fields(f, j) x.Fields[string(f.Key())] = f.Value() } - t := &mongo.MongoTag{} + t := &tsbsmongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) x.Tags[string(t.Key())] = string(t.Value()) @@ -85,9 +85,10 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin } if doLoad { - bulk := p.collection.Bulk() - bulk.Insert(p.pvs...) - _, err := bulk.Run() + //bulk := p.collection.Bulk() + opts := options.InsertMany().SetOrdered(false) + //bulk.Insert(p.pvs...) + _, err := p.collection.InsertMany(p.dbc.ctx, p.pvs, opts) if err != nil { log.Fatalf("Bulk insert docs err: %s\n", err.Error()) } From 31ddb0f671b90130240975cb4a254d90cb3652c6 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Tue, 16 Mar 2021 23:17:28 -0400 Subject: [PATCH 03/15] removed credentials and changed them also --- cmd/tsbs_load_mongo/creator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 1b75d0bdc..f0b9b5e4a 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -17,9 +17,6 @@ type dbCreator struct { cancel context.CancelFunc } -//var daemonURL string = "mongodb+srv://dbadmin:QBrtK7nGNwryvSTs@cluster0.op8ed.mongodb.net"///benchmark?retryWrites=true&w=majority" -//var writeTimeout time.Duration = 30*time.Second -//tested - OK func (d *dbCreator) Init() { var err error d.ctx, d.cancel = context.WithTimeout(context.Background(), writeTimeout) From 2b490f05043a9f3b44848bfc9dc32bb2f0abbb76 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Wed, 17 Mar 2021 11:20:31 -0400 Subject: [PATCH 04/15] upgraded mongo-driver dependency --- go.mod | 1 + go.sum | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/go.mod b/go.mod index 1106116ed..df049189c 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/transceptor-technology/go-qpack v0.0.0-20190116123619-49a14b216a45 github.com/valyala/fasthttp v1.15.1 + go.mongodb.org/mongo-driver v1.5.0 go.uber.org/atomic v1.6.0 golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e diff --git a/go.sum b/go.sum index 6b3ac40ef..f5a24ab5c 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,7 @@ github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:o github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.35.13 h1:Y49GifH2czbooBMkVpoXwokur1JRBFKVLVCQzO0YsW8= github.com/aws/aws-sdk-go v1.35.13/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= @@ -298,6 +299,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-toolsmith/astcast v1.0.0/go.mod h1:mt2OdQTeAQcY4DQgPSArJjHCcOwlX+Wl/kwN+LbLGQ4= github.com/go-toolsmith/astcopy v1.0.0/go.mod h1:vrgyG+5Bxrnz4MZWPF+pI4R8h3qKRjjyvV/DSez4WVQ= @@ -743,6 +745,7 @@ github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtP github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= +github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= @@ -895,6 +898,7 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= github.com/testcontainers/testcontainers-go v0.5.1/go.mod h1:Oc/G02bjZiX0p3lzyh6b1GCELP0e4/6Cg3ciU/LnFvU= github.com/tetafro/godot v0.4.8/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/timakin/bodyclose v0.0.0-20190930140734-f7f2e9bca95e/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84 h1:jdJdzLyz0SNBuvt5rYyBxDqhgZ2EcbA7eWVBMqcyEHc= @@ -927,12 +931,15 @@ github.com/valyala/quicktemplate v1.6.2/go.mod h1:mtEJpQtUiBV0SHhMX6RtiJtqxncgrf github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cimt1uhCg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -945,7 +952,10 @@ go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qL go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.3.0/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= +go.mongodb.org/mongo-driver v1.3.2 h1:IYppNjEV/C+/3VPbhHVxQ4t04eVW0cLp0/pNdW++6Ug= go.mongodb.org/mongo-driver v1.3.2/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= +go.mongodb.org/mongo-driver v1.5.0 h1:REddm85e1Nl0JPXGGhgZkgJdG/yOe6xvpXUcYK5WLt0= +go.mongodb.org/mongo-driver v1.5.0/go.mod h1:boiGPFqyBs5R0R5qf2ErokGRekMfwn+MqKaUyHs7wy0= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -984,6 +994,7 @@ golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= @@ -1078,6 +1089,7 @@ golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From 558a111d90176cedd301d1e879eb0c5b4d614753 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Wed, 17 Mar 2021 11:28:55 -0400 Subject: [PATCH 05/15] added formatting --- cmd/tsbs_load_mongo/creator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index f0b9b5e4a..98e7d4cf6 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -57,11 +57,11 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { return err } for _, coll := range collection_names { - log.Printf("collection found : ", d.client.Database(dbName).Collection(coll)) + log.Printf("collection found : %s", d.client.Database(dbName).Collection(coll)) log.Println("deleting the previous collection") err := d.client.Database(dbName).Collection(coll).Drop(d.ctx) if err != nil { - log.Printf("Could not delete collection : ", err.Error()) + log.Printf("Could not delete collection : %s", err.Error()) } } return nil From 547f8f84d51e2cf7b2c4fc928c2a3a2b5c0bb6b9 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Wed, 17 Mar 2021 11:31:24 -0400 Subject: [PATCH 06/15] removed user and pass because we can pass it into the url instead --- pkg/targets/mongo/implemented_target.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/targets/mongo/implemented_target.go b/pkg/targets/mongo/implemented_target.go index 991272819..d92242c16 100644 --- a/pkg/targets/mongo/implemented_target.go +++ b/pkg/targets/mongo/implemented_target.go @@ -21,8 +21,6 @@ func (t *mongoTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Flag flagSet.String(flagPrefix+"url", "localhost:27017", "Mongo URL.") flagSet.Duration(flagPrefix+"write-timeout", 10*time.Second, "Write timeout.") flagSet.Bool(flagPrefix+"document-per-event", false, "Whether to use one document per event or aggregate by hour") - flagSet.String(flagPrefix+"user", "", "User to connect") - flagSet.String(flagPrefix+"pass", "", "Password for user connecting to MongoDB (leave blank if not password protected)") } func (t *mongoTarget) TargetName() string { From f6f4a7721f334fe0679838678f0f1607fb0b5fdf Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Wed, 17 Mar 2021 12:05:31 -0400 Subject: [PATCH 07/15] corrected collection logging bug --- cmd/tsbs_load_mongo/creator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 98e7d4cf6..08ba5eb8e 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -57,7 +57,7 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { return err } for _, coll := range collection_names { - log.Printf("collection found : %s", d.client.Database(dbName).Collection(coll)) + log.Printf("collection found : %s", coll) log.Println("deleting the previous collection") err := d.client.Database(dbName).Collection(coll).Drop(d.ctx) if err != nil { From 823f4931589845fcd71ee3caa2fbc72933f6ca2e Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Thu, 18 Mar 2021 20:17:31 -0400 Subject: [PATCH 08/15] tested, the mongo loader works when using pre-generated data --- cmd/tsbs_load_mongo/aggregate_loader.go | 2 +- cmd/tsbs_load_mongo/creator.go | 14 +++++++------- go.sum | 2 ++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cmd/tsbs_load_mongo/aggregate_loader.go b/cmd/tsbs_load_mongo/aggregate_loader.go index 0e8581e22..70dd30194 100644 --- a/cmd/tsbs_load_mongo/aggregate_loader.go +++ b/cmd/tsbs_load_mongo/aggregate_loader.go @@ -175,7 +175,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 docToEvents[docKey] = append(docToEvents[docKey], x) } - coll := p.dbc.client.Database("benchmark").Collection("test") + coll := p.dbc.client.Database("benchmark").Collection(collectionName) if doLoad { // Checks if any new documents need to be made and does so diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 08ba5eb8e..633bdbf9f 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -18,6 +18,7 @@ type dbCreator struct { } func (d *dbCreator) Init() { + //log.Println("tsbs_load_mongo/creator/Init") var err error d.ctx, d.cancel = context.WithTimeout(context.Background(), writeTimeout) //defer d.cancel() @@ -38,6 +39,7 @@ func (d *dbCreator) Init() { } func (d *dbCreator) DBExists(dbName string) bool { + //log.Println("tsbs_load_mongo/creator/DBExists") dbs, err := d.client.ListDatabaseNames(d.ctx, bson.D{}) if err != nil { log.Fatal(err) @@ -51,6 +53,7 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) RemoveOldDB(dbName string) error { + //log.Println("tsbs_load_mongo/creator/RemoveOldDB") collection_names, err := d.client.Database(dbName).ListCollectionNames(d.ctx, bson.D{}) log.Printf("collection_names : %s", collection_names) if err != nil { @@ -68,6 +71,7 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { } func (d *dbCreator) CreateDB(dbName string) error { + //log.Println("tsbs_load_mongo/creator/CreateDB") //Starting in MongoDB 3.2, the WiredTiger storage engine is the default storage engine err := d.client.Database(dbName).CreateCollection(d.ctx, collectionName) if err != nil { @@ -100,7 +104,7 @@ func (d *dbCreator) CreateDB(dbName string) error { if !documentPer { _, err := idxview.CreateOne(d.ctx, mongo.IndexModel{ Keys: bson.D{{aggDocID, 1}}, - Options: options.Index().SetName("default_index"), + Options: options.Index().SetName("doc_lookup_index"), }) if err != nil { log.Printf("create index err: %v", err) @@ -111,10 +115,6 @@ func (d *dbCreator) CreateDB(dbName string) error { } func (d *dbCreator) Close() { - log.Println("losing database connection") - var err error - (d.cancel)() - if err = d.client.Disconnect(d.ctx); err != nil { - panic(err) - } + //closing the database connection here + //causes an error in the bulk loading } diff --git a/go.sum b/go.sum index f5a24ab5c..5322e9b0d 100644 --- a/go.sum +++ b/go.sum @@ -745,6 +745,7 @@ github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtP github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= +github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= @@ -939,6 +940,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cimt1uhCg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= From 98a4c0c7cf9752c7ce97469d37546cca7e3ee58f Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Fri, 19 Mar 2021 19:50:36 -0400 Subject: [PATCH 09/15] changed to official Go mongo driver that can connect to the latest version of mongo (4.4) and added batchsize option to the query executor (going from 1 to 1000 batchsize can speed queries by a factor of 30) as well as an indicator of number of queries executed --- cmd/tsbs_run_queries_mongo/main.go | 90 +++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 25 deletions(-) diff --git a/cmd/tsbs_run_queries_mongo/main.go b/cmd/tsbs_run_queries_mongo/main.go index b00b68e02..c780fa9df 100644 --- a/cmd/tsbs_run_queries_mongo/main.go +++ b/cmd/tsbs_run_queries_mongo/main.go @@ -5,46 +5,53 @@ package main import ( + "context" "encoding/gob" "fmt" - "log" - "time" - "github.com/blagojts/viper" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + mgobson "github.com/globalsign/mgo/bson" "github.com/spf13/pflag" "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/pkg/query" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "log" + "time" ) // Program option vars: var ( daemonURL string timeout time.Duration + batchsize int32 ) // Global vars: var ( - runner *query.BenchmarkRunner - session *mgo.Session + runner *query.BenchmarkRunner + //session *mgo.Session\ + processedqueries int64 ) // Parse args: func init() { + log.Println("init") // needed for deserializing the mongo query from gob gob.Register([]interface{}{}) gob.Register(map[string]interface{}{}) gob.Register([]map[string]interface{}{}) gob.Register(bson.M{}) gob.Register([]bson.M{}) - + gob.Register(mgobson.M{}) + gob.Register([]mgobson.M{}) var config query.BenchmarkRunnerConfig config.AddToFlagSet(pflag.CommandLine) pflag.String("url", "mongodb://localhost:27017", "Daemon URL.") pflag.Duration("read-timeout", 30*time.Second, "Timeout value for individual queries") - + pflag.Int32("batchsize", 1, "Batch size for queries") pflag.Parse() err := utils.SetupConfigFile() @@ -59,42 +66,77 @@ func init() { daemonURL = viper.GetString("url") timeout = viper.GetDuration("read-timeout") - + batchsize = viper.GetInt32("batchsize") runner = query.NewBenchmarkRunner(config) + log.Println("exiting init") + processedqueries = 0 } func main() { - var err error - session, err = mgo.DialWithTimeout(daemonURL, timeout) - if err != nil { - log.Fatal(err) - } + log.Println("launching run") runner.Run(&query.MongoPool, newProcessor) } type processor struct { - collection *mgo.Collection + client *mongo.Client + collection *mongo.Collection + ctx context.Context + cancel context.CancelFunc } func newProcessor() query.Processor { return &processor{} } func (p *processor) Init(workerNumber int) { - sess := session.Copy() - db := sess.DB(runner.DatabaseName()) - p.collection = db.C("point_data") + var err error + p.ctx, p.cancel = context.WithTimeout(context.Background(), timeout) + log.Println("TRYING TO CONNECT") + p.client, err = mongo.Connect(p.ctx, options.Client().ApplyURI(daemonURL)) + db := p.client.Database(runner.DatabaseName()) + p.collection = db.Collection("point_data") + if err != nil { + log.Println("DID NOT MANAGE TO CONNECT") + log.Fatal(err) + } else { + err = p.client.Ping(p.ctx, readpref.Primary()) + if err != nil { + log.Println("DID NOT MANAGE TO CONNECT") + log.Fatal(err) + } else { + log.Println("MANAGED TO CONNECT") + } + } } func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { + processedqueries++ + log.Printf("processed queries : %d", processedqueries) mq := q.(*query.Mongo) start := time.Now().UnixNano() - pipe := p.collection.Pipe(mq.BsonDoc).AllowDiskUse() - iter := pipe.Iter() + //mgo.Collection.Pipe() + var opts options.AggregateOptions + opts.SetAllowDiskUse(true) + opts.SetBatchSize(batchsize) + //log.Println("creating cursor") + cursor, err := p.collection.Aggregate(p.ctx, mq.BsonDoc, &opts) + if err != nil { + log.Fatal(err.Error()) + } + //log.Println("cursor created") + //pipe := p.collection.Pipe(mq.BsonDoc).AllowDiskUse() + //defer cursor.Close(p.ctx) + //iter := pipe.Iter() if runner.DebugLevel() > 0 { fmt.Println(mq.BsonDoc) } - var result map[string]interface{} + //var result map[string]interface{} cnt := 0 - for iter.Next(&result) { + //iter.Next(&result) + //log.Println("going through cursor") + for cursor.Next(p.ctx) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Println(err) + } if runner.DoPrintResponses() { fmt.Printf("ID %d: %v\n", q.GetID(), result) } @@ -103,8 +145,6 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { if runner.DebugLevel() > 0 { fmt.Println(cnt) } - err := iter.Close() - took := time.Now().UnixNano() - start lag := float64(took) / 1e6 // milliseconds stat := query.GetStat() From df86d33aec3bcef599dcdc307dbb6b84ea811b3b Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Tue, 23 Mar 2021 17:18:22 -0400 Subject: [PATCH 10/15] added some logging and error handling to the timestream loader --- .../timestream/common_dimensions_processor.go | 1 + pkg/targets/timestream/db_creator.go | 37 ++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pkg/targets/timestream/common_dimensions_processor.go b/pkg/targets/timestream/common_dimensions_processor.go index d7b1ef786..c4926d2b4 100644 --- a/pkg/targets/timestream/common_dimensions_processor.go +++ b/pkg/targets/timestream/common_dimensions_processor.go @@ -54,6 +54,7 @@ func (c *commonDimensionsProcessor) expandDimensionBuffer(requiredDimensions int } } func (c *commonDimensionsProcessor) writeToTable(table string, rows []deserializedPoint) (metricCount uint64, err error) { + log.Printf("inserting into table %s the number of row %d ", table, len(rows)) for _, row := range rows { c.expandDimensionBuffer(len(row.tagKeys)) numDimensions := convertTagsToDimensions(row.tagKeys, row.tags, c._dimensionsBuffer) diff --git a/pkg/targets/timestream/db_creator.go b/pkg/targets/timestream/db_creator.go index 29fc2655a..dee688a4e 100644 --- a/pkg/targets/timestream/db_creator.go +++ b/pkg/targets/timestream/db_creator.go @@ -83,7 +83,9 @@ func (d *dbCreator) PostCreateDB(dbName string) error { log.Println("Creating Timestream tables") headers := d.ds.Headers() var requiredTables []string + log.Printf("We need the following tables %v", headers.FieldKeys) for tableName := range headers.FieldKeys { + log.Printf("trying to create table : %s", tableName) requiredTables = append(requiredTables, tableName) createTableInput := ×treamwrite.CreateTableInput{ DatabaseName: &dbName, @@ -94,13 +96,38 @@ func (d *dbCreator) PostCreateDB(dbName string) error { TableName: &tableName, } _, err := d.writeSvc.CreateTable(createTableInput) - if _, ok := err.(*timestreamwrite.ConflictException); !ok { - return errors.Wrap(err, "could not create table '"+tableName+"': ") - } else { - log.Println("Table " + tableName + " exists, skipping create") + switch err.(type) { + case *timestreamwrite.ConflictException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because it already exists: ") + case *timestreamwrite.ValidationException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because the request is malformed: ") + case *timestreamwrite.AccessDeniedException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because access denied: ") + case *timestreamwrite.ResourceNotFoundException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because resource not found : ") + case *timestreamwrite.ServiceQuotaExceededException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because service quota exceeded : ") + case *timestreamwrite.ThrottlingException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because too many requests were "+ + "made by a user exceeding service quotas. The request was throttled : ") + case *timestreamwrite.InternalServerException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because Timestream was unable "+ + "to fully process this request because of an internal server error : ") + case *timestreamwrite.InvalidEndpointException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because the requested endpoint "+ + "was invalid : ") + default: + log.Printf("success in creating : %s", tableName) } } - fmt.Println("DB created, checking table status") if err := d.waitForTables(dbName, requiredTables); err != nil { return errors.Wrap(err, "could not create timestream tables") From 249678ca3755b4fb1b8225612194c1cbf42b3ad9 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Mon, 17 May 2021 12:32:10 -0400 Subject: [PATCH 11/15] Update cmd/tsbs_load_mongo/creator.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jônatas Davi Paganini --- cmd/tsbs_load_mongo/creator.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 633bdbf9f..551247336 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -21,19 +21,10 @@ func (d *dbCreator) Init() { //log.Println("tsbs_load_mongo/creator/Init") var err error d.ctx, d.cancel = context.WithTimeout(context.Background(), writeTimeout) - //defer d.cancel() - log.Println("TRYING TO CONNECT") d.client, err = mongo.Connect(d.ctx, options.Client().ApplyURI(daemonURL)) if err != nil { - log.Println("DID NOT MANAGE TO CONNECT") + log.Println("Can't establish connection with", daemonURL) log.Fatal(err) - } else { - err = d.client.Ping(d.ctx, readpref.Primary()) - if err != nil { - log.Println("DID NOT MANAGE TO CONNECT") - log.Fatal(err) - } else { - log.Println("MANAGED TO CONNECT") } } } From b563ff5882d94b9457d913d21bf7956bdbf24d53 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Mon, 17 May 2021 12:32:38 -0400 Subject: [PATCH 12/15] Update cmd/tsbs_load_mongo/creator.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jônatas Davi Paganini --- cmd/tsbs_load_mongo/creator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 551247336..e4ad38dfe 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -62,7 +62,6 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { } func (d *dbCreator) CreateDB(dbName string) error { - //log.Println("tsbs_load_mongo/creator/CreateDB") //Starting in MongoDB 3.2, the WiredTiger storage engine is the default storage engine err := d.client.Database(dbName).CreateCollection(d.ctx, collectionName) if err != nil { From ddd88249cced3e25affded5f3803ae641ed96517 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Mon, 17 May 2021 12:32:57 -0400 Subject: [PATCH 13/15] Update cmd/tsbs_load_mongo/document_per_loader.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jônatas Davi Paganini --- cmd/tsbs_load_mongo/document_per_loader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index 5c82c9e2a..e5ae3ba93 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -5,7 +5,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "log" "sync" - //"github.com/globalsign/mgo" "github.com/timescale/tsbs/load" "github.com/timescale/tsbs/pkg/targets" tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo" From e9a575f098a6a0442322c8076de69933cb702cc9 Mon Sep 17 00:00:00 2001 From: Hadrien-Cornier Date: Mon, 17 May 2021 12:33:20 -0400 Subject: [PATCH 14/15] Update cmd/tsbs_load_mongo/document_per_loader.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jônatas Davi Paganini --- cmd/tsbs_load_mongo/document_per_loader.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index e5ae3ba93..59ce98d76 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -84,9 +84,7 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin } if doLoad { - //bulk := p.collection.Bulk() opts := options.InsertMany().SetOrdered(false) - //bulk.Insert(p.pvs...) _, err := p.collection.InsertMany(p.dbc.ctx, p.pvs, opts) if err != nil { log.Fatalf("Bulk insert docs err: %s\n", err.Error()) From a4b6940a3877988d56eea0cc2cd9b1f09f1d2677 Mon Sep 17 00:00:00 2001 From: Ramon Ribeiro Date: Thu, 8 Jun 2023 11:27:31 -0300 Subject: [PATCH 15/15] Fix closing brackets on tsbs_mongo_load --- cmd/tsbs_load_mongo/creator.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index e4ad38dfe..861f92a06 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -3,12 +3,12 @@ package main import ( "context" "fmt" + "log" + "strings" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readpref" - "log" - "strings" ) type dbCreator struct { @@ -25,7 +25,6 @@ func (d *dbCreator) Init() { if err != nil { log.Println("Can't establish connection with", daemonURL) log.Fatal(err) - } } }