diff --git a/cmd/tsbs_load_mongo/aggregate_loader.go b/cmd/tsbs_load_mongo/aggregate_loader.go index 00033b889..70dd30194 100644 --- a/cmd/tsbs_load_mongo/aggregate_loader.go +++ b/cmd/tsbs_load_mongo/aggregate_loader.go @@ -1,18 +1,21 @@ package main import ( + "context" "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "hash/fnv" "log" "sync" "time" - - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + //"github.com/globalsign/mgo" + //"github.com/globalsign/mgo/bson" "github.com/timescale/tsbs/load" "github.com/timescale/tsbs/pkg/data" "github.com/timescale/tsbs/pkg/targets" - "github.com/timescale/tsbs/pkg/targets/mongo" + tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo" //to resolve name collision with mongo driver ) type hostnameIndexer struct { @@ -20,8 +23,8 @@ type hostnameIndexer struct { } func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { - p := item.Data.(*mongo.MongoPoint) - t := &mongo.MongoTag{} + p := item.Data.(*tsbsmongo.MongoPoint) + t := &tsbsmongo.MongoTag{} for j := 0; j < p.TagsLength(); j++ { p.Tags(t, j) key := string(t.Key()) @@ -29,7 +32,10 @@ func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { // the hostame is the defacto index for devops tags // the truck name is the defacto index for iot tags h := fnv.New32a() - h.Write([]byte(string(t.Value()))) + _, err := h.Write(t.Value()) + if err != nil { + panic("cannot write value to hash") + } return uint(h.Sum32()) % i.partitions } } @@ -78,7 +84,7 @@ var pPool = &sync.Pool{New: func() interface{} { return &point{} }} type aggProcessor struct { dbc *dbCreator - collection *mgo.Collection + collection *mongo.Collection createdDocs map[string]bool createQueue []interface{} @@ -86,13 +92,11 @@ type aggProcessor struct { func (p *aggProcessor) Init(_ int, doLoad, _ bool) { if doLoad { - sess := p.dbc.session.Copy() - db := sess.DB(loader.DatabaseName()) - p.collection = db.C(collectionName) + db := p.dbc.client.Database(loader.DatabaseName()) + p.collection = db.Collection(collectionName) } p.createdDocs = make(map[string]bool) p.createQueue = []interface{}{} - } // ProcessBatch receives a batch of bson.M documents (BSON maps) that @@ -127,7 +131,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 eventCnt := uint64(0) for _, event := range batch.arr { tagsMap := map[string]string{} - t := &mongo.MongoTag{} + t := &tsbsmongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) tagsMap[string(t.Key())] = string(t.Value()) @@ -161,7 +165,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 } x := pPool.Get().(*point) x.Fields = map[string]interface{}{} - f := &mongo.MongoReading{} + f := &tsbsmongo.MongoReading{} for j := 0; j < event.FieldsLength(); j++ { event.Fields(f, j) x.Fields[string(f.Key())] = f.Value() @@ -171,38 +175,36 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 docToEvents[docKey] = append(docToEvents[docKey], x) } + coll := p.dbc.client.Database("benchmark").Collection(collectionName) if doLoad { // Checks if any new documents need to be made and does so - bulk := p.collection.Bulk() - bulk = insertNewAggregateDocs(p.collection, bulk, p.createQueue) + //bulk := p.collection.Bulk() + insertNewAggregateDocs(coll, p.dbc.ctx, p.createQueue) p.createQueue = p.createQueue[:0] - // For each document, create one 'set' command for all records // that belong to the document for docKey, events := range docToEvents { - selector := bson.M{aggDocID: docKey} updateMap := bson.M{} for _, event := range events { minKey := (event.Timestamp / (1e9 * 60)) % 60 secKey := (event.Timestamp / 1e9) % 60 key := fmt.Sprintf("events.%d.%d", minKey, secKey) val := event.Fields - val[timestampField] = event.Timestamp updateMap[key] = val } - update := bson.M{"$set": updateMap} - bulk.Update(selector, update) - } - - // All documents accounted for, finally run the operation - _, err := bulk.Run() - if err != nil { - log.Fatalf("Bulk aggregate update err: %s\n", err.Error()) + filter := bson.M{ + aggDocID: bson.M{ + "$eq": docKey, //check if aggDocID field has value of docKey + }, + } + _, err := coll.UpdateOne(p.dbc.ctx, filter, update) + if err != nil { + log.Fatalf(" Update err: %s\n", err.Error()) + } } - for _, events := range docToEvents { for _, e := range events { delete(e.Fields, timestampField) @@ -215,8 +217,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 // insertNewAggregateDocs handles creating new aggregated documents when new devices // or time periods are encountered -func insertNewAggregateDocs(collection *mgo.Collection, bulk *mgo.Bulk, createQueue []interface{}) *mgo.Bulk { - b := bulk +func insertNewAggregateDocs(collection *mongo.Collection, ctx context.Context, createQueue []interface{}) { if len(createQueue) > 0 { off := 0 for off < len(createQueue) { @@ -224,17 +225,12 @@ func insertNewAggregateDocs(collection *mgo.Collection, bulk *mgo.Bulk, createQu if l > len(createQueue) { l = len(createQueue) } - - b.Insert(createQueue[off:l]...) - _, err := b.Run() + opts := options.InsertMany().SetOrdered(false) + _, err := collection.InsertMany(ctx, createQueue[off:l], opts) if err != nil { - log.Fatalf("Bulk aggregate docs err: %s\n", err.Error()) + log.Fatalf("Insert many aggregate docs err: %s\n", err.Error()) } - b = collection.Bulk() - off = l } } - - return b } diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 0fbcc1e33..861f92a06 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -1,29 +1,36 @@ package main import ( + "context" "fmt" "log" "strings" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) type dbCreator struct { - session *mgo.Session + client *mongo.Client + ctx context.Context + cancel context.CancelFunc } func (d *dbCreator) Init() { + //log.Println("tsbs_load_mongo/creator/Init") var err error - d.session, err = mgo.DialWithTimeout(daemonURL, writeTimeout) + d.ctx, d.cancel = context.WithTimeout(context.Background(), writeTimeout) + d.client, err = mongo.Connect(d.ctx, options.Client().ApplyURI(daemonURL)) if err != nil { + log.Println("Can't establish connection with", daemonURL) log.Fatal(err) } - d.session.SetMode(mgo.Eventual, false) } func (d *dbCreator) DBExists(dbName string) bool { - dbs, err := d.session.DatabaseNames() + //log.Println("tsbs_load_mongo/creator/DBExists") + dbs, err := d.client.ListDatabaseNames(d.ctx, bson.D{}) if err != nil { log.Fatal(err) } @@ -36,74 +43,67 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) RemoveOldDB(dbName string) error { - collections, err := d.session.DB(dbName).CollectionNames() + //log.Println("tsbs_load_mongo/creator/RemoveOldDB") + collection_names, err := d.client.Database(dbName).ListCollectionNames(d.ctx, bson.D{}) + log.Printf("collection_names : %s", collection_names) if err != nil { return err } - for _, name := range collections { - d.session.DB(dbName).C(name).DropCollection() + for _, coll := range collection_names { + log.Printf("collection found : %s", coll) + log.Println("deleting the previous collection") + err := d.client.Database(dbName).Collection(coll).Drop(d.ctx) + if err != nil { + log.Printf("Could not delete collection : %s", err.Error()) + } } - return nil } func (d *dbCreator) CreateDB(dbName string) error { - cmd := make(bson.D, 0, 4) - cmd = append(cmd, bson.DocElem{Name: "create", Value: collectionName}) - - // wiredtiger settings - cmd = append(cmd, bson.DocElem{ - Name: "storageEngine", Value: map[string]interface{}{ - "wiredTiger": map[string]interface{}{ - "configString": "block_compressor=snappy", - }, - }, - }) - - err := d.session.DB(dbName).Run(cmd, nil) + //Starting in MongoDB 3.2, the WiredTiger storage engine is the default storage engine + err := d.client.Database(dbName).CreateCollection(d.ctx, collectionName) if err != nil { if strings.Contains(err.Error(), "already exists") { + log.Printf("collection %s already exists", dbName) return nil } + log.Printf("create collection err: %v", err) return fmt.Errorf("create collection err: %v", err) } - - collection := d.session.DB(dbName).C(collectionName) - var key []string + collection := d.client.Database(dbName).Collection(collectionName) + var key bson.D if documentPer { - key = []string{"measurement", "tags.hostname", timestampField} + key = bson.D{{"measurement", 1}, {"tags.hostname", 1}, {timestampField, 1}} } else { - key = []string{aggKeyID, "measurement", "tags.hostname"} + key = bson.D{{aggKeyID, 1}, {"measurement", 1}, {"tags.hostname", 1}} } - - index := mgo.Index{ - Key: key, - Unique: false, // Unique does not work on the entire array of tags! - Background: false, - Sparse: false, + index := mongo.IndexModel{ + Keys: key, + Options: options.Index().SetName("default_index"), } - err = collection.EnsureIndex(index) + idxview := collection.Indexes() + _, err = idxview.CreateOne(d.ctx, index) if err != nil { - return fmt.Errorf("create basic index err: %v", err) + log.Printf("create index err: %v", err) + panic(err) } - // To make updates for new records more efficient, we need a efficient doc // lookup index if !documentPer { - err = collection.EnsureIndex(mgo.Index{ - Key: []string{aggDocID}, - Unique: false, - Background: false, - Sparse: false, + _, err := idxview.CreateOne(d.ctx, mongo.IndexModel{ + Keys: bson.D{{aggDocID, 1}}, + Options: options.Index().SetName("doc_lookup_index"), }) if err != nil { - return fmt.Errorf("create agg doc index err: %v", err) + log.Printf("create index err: %v", err) + panic(err) } } - return nil } func (d *dbCreator) Close() { - d.session.Close() + //closing the database connection here + //causes an error in the bulk loading } diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index c24c01f3b..59ce98d76 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -1,13 +1,13 @@ package main import ( + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "log" "sync" - - "github.com/globalsign/mgo" "github.com/timescale/tsbs/load" "github.com/timescale/tsbs/pkg/targets" - "github.com/timescale/tsbs/pkg/targets/mongo" + tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo" ) // naiveBenchmark allows you to run a benchmark using the naive, one document per @@ -39,16 +39,15 @@ var spPool = &sync.Pool{New: func() interface{} { return &singlePoint{} }} type naiveProcessor struct { dbc *dbCreator - collection *mgo.Collection + collection *mongo.Collection pvs []interface{} } func (p *naiveProcessor) Init(_ int, doLoad, _ bool) { if doLoad { - sess := p.dbc.session.Copy() - db := sess.DB(loader.DatabaseName()) - p.collection = db.C(collectionName) + db := p.dbc.client.Database(loader.DatabaseName()) + p.collection = db.Collection(collectionName) } p.pvs = []interface{}{} } @@ -70,12 +69,12 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin x.Timestamp = event.Timestamp() x.Fields = map[string]interface{}{} x.Tags = map[string]string{} - f := &mongo.MongoReading{} + f := &tsbsmongo.MongoReading{} for j := 0; j < event.FieldsLength(); j++ { event.Fields(f, j) x.Fields[string(f.Key())] = f.Value() } - t := &mongo.MongoTag{} + t := &tsbsmongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) x.Tags[string(t.Key())] = string(t.Value()) @@ -85,9 +84,8 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin } if doLoad { - bulk := p.collection.Bulk() - bulk.Insert(p.pvs...) - _, err := bulk.Run() + opts := options.InsertMany().SetOrdered(false) + _, err := p.collection.InsertMany(p.dbc.ctx, p.pvs, opts) if err != nil { log.Fatalf("Bulk insert docs err: %s\n", err.Error()) } diff --git a/cmd/tsbs_run_queries_mongo/main.go b/cmd/tsbs_run_queries_mongo/main.go index b00b68e02..c780fa9df 100644 --- a/cmd/tsbs_run_queries_mongo/main.go +++ b/cmd/tsbs_run_queries_mongo/main.go @@ -5,46 +5,53 @@ package main import ( + "context" "encoding/gob" "fmt" - "log" - "time" - "github.com/blagojts/viper" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + mgobson "github.com/globalsign/mgo/bson" "github.com/spf13/pflag" "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/pkg/query" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "log" + "time" ) // Program option vars: var ( daemonURL string timeout time.Duration + batchsize int32 ) // Global vars: var ( - runner *query.BenchmarkRunner - session *mgo.Session + runner *query.BenchmarkRunner + //session *mgo.Session\ + processedqueries int64 ) // Parse args: func init() { + log.Println("init") // needed for deserializing the mongo query from gob gob.Register([]interface{}{}) gob.Register(map[string]interface{}{}) gob.Register([]map[string]interface{}{}) gob.Register(bson.M{}) gob.Register([]bson.M{}) - + gob.Register(mgobson.M{}) + gob.Register([]mgobson.M{}) var config query.BenchmarkRunnerConfig config.AddToFlagSet(pflag.CommandLine) pflag.String("url", "mongodb://localhost:27017", "Daemon URL.") pflag.Duration("read-timeout", 30*time.Second, "Timeout value for individual queries") - + pflag.Int32("batchsize", 1, "Batch size for queries") pflag.Parse() err := utils.SetupConfigFile() @@ -59,42 +66,77 @@ func init() { daemonURL = viper.GetString("url") timeout = viper.GetDuration("read-timeout") - + batchsize = viper.GetInt32("batchsize") runner = query.NewBenchmarkRunner(config) + log.Println("exiting init") + processedqueries = 0 } func main() { - var err error - session, err = mgo.DialWithTimeout(daemonURL, timeout) - if err != nil { - log.Fatal(err) - } + log.Println("launching run") runner.Run(&query.MongoPool, newProcessor) } type processor struct { - collection *mgo.Collection + client *mongo.Client + collection *mongo.Collection + ctx context.Context + cancel context.CancelFunc } func newProcessor() query.Processor { return &processor{} } func (p *processor) Init(workerNumber int) { - sess := session.Copy() - db := sess.DB(runner.DatabaseName()) - p.collection = db.C("point_data") + var err error + p.ctx, p.cancel = context.WithTimeout(context.Background(), timeout) + log.Println("TRYING TO CONNECT") + p.client, err = mongo.Connect(p.ctx, options.Client().ApplyURI(daemonURL)) + db := p.client.Database(runner.DatabaseName()) + p.collection = db.Collection("point_data") + if err != nil { + log.Println("DID NOT MANAGE TO CONNECT") + log.Fatal(err) + } else { + err = p.client.Ping(p.ctx, readpref.Primary()) + if err != nil { + log.Println("DID NOT MANAGE TO CONNECT") + log.Fatal(err) + } else { + log.Println("MANAGED TO CONNECT") + } + } } func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { + processedqueries++ + log.Printf("processed queries : %d", processedqueries) mq := q.(*query.Mongo) start := time.Now().UnixNano() - pipe := p.collection.Pipe(mq.BsonDoc).AllowDiskUse() - iter := pipe.Iter() + //mgo.Collection.Pipe() + var opts options.AggregateOptions + opts.SetAllowDiskUse(true) + opts.SetBatchSize(batchsize) + //log.Println("creating cursor") + cursor, err := p.collection.Aggregate(p.ctx, mq.BsonDoc, &opts) + if err != nil { + log.Fatal(err.Error()) + } + //log.Println("cursor created") + //pipe := p.collection.Pipe(mq.BsonDoc).AllowDiskUse() + //defer cursor.Close(p.ctx) + //iter := pipe.Iter() if runner.DebugLevel() > 0 { fmt.Println(mq.BsonDoc) } - var result map[string]interface{} + //var result map[string]interface{} cnt := 0 - for iter.Next(&result) { + //iter.Next(&result) + //log.Println("going through cursor") + for cursor.Next(p.ctx) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Println(err) + } if runner.DoPrintResponses() { fmt.Printf("ID %d: %v\n", q.GetID(), result) } @@ -103,8 +145,6 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { if runner.DebugLevel() > 0 { fmt.Println(cnt) } - err := iter.Close() - took := time.Now().UnixNano() - start lag := float64(took) / 1e6 // milliseconds stat := query.GetStat() diff --git a/go.mod b/go.mod index 1106116ed..df049189c 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/transceptor-technology/go-qpack v0.0.0-20190116123619-49a14b216a45 github.com/valyala/fasthttp v1.15.1 + go.mongodb.org/mongo-driver v1.5.0 go.uber.org/atomic v1.6.0 golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e diff --git a/go.sum b/go.sum index 6b3ac40ef..5322e9b0d 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,7 @@ github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:o github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.35.13 h1:Y49GifH2czbooBMkVpoXwokur1JRBFKVLVCQzO0YsW8= github.com/aws/aws-sdk-go v1.35.13/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= @@ -298,6 +299,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-toolsmith/astcast v1.0.0/go.mod h1:mt2OdQTeAQcY4DQgPSArJjHCcOwlX+Wl/kwN+LbLGQ4= github.com/go-toolsmith/astcopy v1.0.0/go.mod h1:vrgyG+5Bxrnz4MZWPF+pI4R8h3qKRjjyvV/DSez4WVQ= @@ -743,6 +745,8 @@ github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtP github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= +github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= +github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= @@ -895,6 +899,7 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= github.com/testcontainers/testcontainers-go v0.5.1/go.mod h1:Oc/G02bjZiX0p3lzyh6b1GCELP0e4/6Cg3ciU/LnFvU= github.com/tetafro/godot v0.4.8/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/timakin/bodyclose v0.0.0-20190930140734-f7f2e9bca95e/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84 h1:jdJdzLyz0SNBuvt5rYyBxDqhgZ2EcbA7eWVBMqcyEHc= @@ -927,12 +932,16 @@ github.com/valyala/quicktemplate v1.6.2/go.mod h1:mtEJpQtUiBV0SHhMX6RtiJtqxncgrf github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cimt1uhCg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -945,7 +954,10 @@ go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qL go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.3.0/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= +go.mongodb.org/mongo-driver v1.3.2 h1:IYppNjEV/C+/3VPbhHVxQ4t04eVW0cLp0/pNdW++6Ug= go.mongodb.org/mongo-driver v1.3.2/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= +go.mongodb.org/mongo-driver v1.5.0 h1:REddm85e1Nl0JPXGGhgZkgJdG/yOe6xvpXUcYK5WLt0= +go.mongodb.org/mongo-driver v1.5.0/go.mod h1:boiGPFqyBs5R0R5qf2ErokGRekMfwn+MqKaUyHs7wy0= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -984,6 +996,7 @@ golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= @@ -1078,6 +1091,7 @@ golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/targets/timestream/common_dimensions_processor.go b/pkg/targets/timestream/common_dimensions_processor.go index d7b1ef786..c4926d2b4 100644 --- a/pkg/targets/timestream/common_dimensions_processor.go +++ b/pkg/targets/timestream/common_dimensions_processor.go @@ -54,6 +54,7 @@ func (c *commonDimensionsProcessor) expandDimensionBuffer(requiredDimensions int } } func (c *commonDimensionsProcessor) writeToTable(table string, rows []deserializedPoint) (metricCount uint64, err error) { + log.Printf("inserting into table %s the number of row %d ", table, len(rows)) for _, row := range rows { c.expandDimensionBuffer(len(row.tagKeys)) numDimensions := convertTagsToDimensions(row.tagKeys, row.tags, c._dimensionsBuffer) diff --git a/pkg/targets/timestream/db_creator.go b/pkg/targets/timestream/db_creator.go index 29fc2655a..dee688a4e 100644 --- a/pkg/targets/timestream/db_creator.go +++ b/pkg/targets/timestream/db_creator.go @@ -83,7 +83,9 @@ func (d *dbCreator) PostCreateDB(dbName string) error { log.Println("Creating Timestream tables") headers := d.ds.Headers() var requiredTables []string + log.Printf("We need the following tables %v", headers.FieldKeys) for tableName := range headers.FieldKeys { + log.Printf("trying to create table : %s", tableName) requiredTables = append(requiredTables, tableName) createTableInput := ×treamwrite.CreateTableInput{ DatabaseName: &dbName, @@ -94,13 +96,38 @@ func (d *dbCreator) PostCreateDB(dbName string) error { TableName: &tableName, } _, err := d.writeSvc.CreateTable(createTableInput) - if _, ok := err.(*timestreamwrite.ConflictException); !ok { - return errors.Wrap(err, "could not create table '"+tableName+"': ") - } else { - log.Println("Table " + tableName + " exists, skipping create") + switch err.(type) { + case *timestreamwrite.ConflictException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because it already exists: ") + case *timestreamwrite.ValidationException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because the request is malformed: ") + case *timestreamwrite.AccessDeniedException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because access denied: ") + case *timestreamwrite.ResourceNotFoundException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because resource not found : ") + case *timestreamwrite.ServiceQuotaExceededException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because service quota exceeded : ") + case *timestreamwrite.ThrottlingException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because too many requests were "+ + "made by a user exceeding service quotas. The request was throttled : ") + case *timestreamwrite.InternalServerException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because Timestream was unable "+ + "to fully process this request because of an internal server error : ") + case *timestreamwrite.InvalidEndpointException: + log.Printf("failure in creating : %s", tableName) + return errors.Wrap(err, "could not create table '"+tableName+"' because the requested endpoint "+ + "was invalid : ") + default: + log.Printf("success in creating : %s", tableName) } } - fmt.Println("DB created, checking table status") if err := d.waitForTables(dbName, requiredTables); err != nil { return errors.Wrap(err, "could not create timestream tables")