diff --git a/Makefile b/Makefile index f6a73c0c3..76b8ef413 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,7 @@ loaders: tsbs_load \ tsbs_load_cassandra \ tsbs_load_clickhouse \ tsbs_load_cratedb \ + tsbs_load_doris \ tsbs_load_influx \ tsbs_load_mongo \ tsbs_load_prometheus \ @@ -32,6 +33,7 @@ runners: tsbs_run_queries_akumuli \ tsbs_run_queries_cassandra \ tsbs_run_queries_clickhouse \ tsbs_run_queries_cratedb \ + tsbs_run_queries_doris \ tsbs_run_queries_influx \ tsbs_run_queries_mongo \ tsbs_run_queries_siridb \ @@ -66,3 +68,8 @@ lint: fmt: $(GOFMT) ./... + +clean: + @echo 'clean you make directory "./bin"';\ + rm -rf bin + diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go index a10144bde..d9c0c2266 100644 --- a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go +++ b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go @@ -76,9 +76,11 @@ const clickhouseTimeStringFormat = "2006-01-02 15:04:05" // SELECT MAX(metric1), ..., MAX(metricN) // FROM cpu // WHERE -// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') -// AND time >= '$HOUR_START' -// AND time < '$HOUR_END' +// +// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// // GROUP BY hour // ORDER BY hour // @@ -137,7 +139,7 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { hostnameField := "hostname" joinClause := "" if d.UseTags { - joinClause = "ANY INNER JOIN tags USING (id)" + joinClause = "INNER JOIN tags USING (id)" } sql := fmt.Sprintf(` @@ -290,9 +292,11 @@ func (d *Devops) LastPointPerHost(qi query.Query) { // SELECT minute, max(metric1), ..., max(metricN) // FROM cpu // WHERE -// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') -// AND time >= '$HOUR_START' -// AND time < '$HOUR_END' +// +// (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// // GROUP BY minute // ORDER BY minute ASC // diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go b/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go index d127ea767..afbbdd06b 100644 --- a/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go +++ b/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go @@ -257,7 +257,7 @@ func TestGroupByTimeAndPrimaryTag(t *testing.T) { hour, id ) AS cpu_avg - ANY INNER JOIN tags USING (id) + INNER JOIN tags USING (id) ORDER BY hour ASC, hostname diff --git a/cmd/tsbs_generate_queries/databases/doris/common.go b/cmd/tsbs_generate_queries/databases/doris/common.go new file mode 100644 index 000000000..5602769d7 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/doris/common.go @@ -0,0 +1,44 @@ +package doris + +import ( + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/pkg/query" +) + +// BaseGenerator contains settings specific for Doris. +type BaseGenerator struct { + UseTags bool +} + +// GenerateEmptyQuery returns an empty query.Doris. +func (g *BaseGenerator) GenerateEmptyQuery() query.Query { + return query.NewDoris() +} + +// fill Query fills the query struct with data +func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, table, sql string) { + q := qi.(*query.Doris) + q.HumanLabel = []byte(humanLabel) + q.HumanDescription = []byte(humanDesc) + q.Table = []byte(table) + q.SqlQuery = []byte(sql) +} + +// NewDevops creates a new devops use case query generator. 
+func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := devops.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + d := &Devops{ + BaseGenerator: g, + Core: core, + } + + return d, nil +} diff --git a/cmd/tsbs_generate_queries/databases/doris/devops.go b/cmd/tsbs_generate_queries/databases/doris/devops.go new file mode 100644 index 000000000..8352fcc25 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/doris/devops.go @@ -0,0 +1,336 @@ +package doris + +import ( + "fmt" + "strings" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/pkg/query" +) + +func panicIfErr(err error) { + if err != nil { + panic(err.Error()) + } +} + +// Devops produces Doris-specific queries for all the devops query types. +type Devops struct { + *BaseGenerator + *devops.Core +} + +// getHostWhereWithHostnames creates WHERE SQL statement for multiple hostnames. +// NOTE: 'WHERE' itself is not included, just hostname filter clauses, ready to concatenate to 'WHERE' string +func (d *Devops) getHostWhereWithHostnames(hostnames []string) string { + hostnameSelectionClauses := []string{} + + if d.UseTags { + // Use separated table for Tags + // Need to prepare WHERE with `tags` table + // WHERE tags_id IN (SELECT those tag.id FROM separated tags table WHERE ) + for _, s := range hostnames { + hostnameSelectionClauses = append(hostnameSelectionClauses, fmt.Sprintf("'%s'", s)) + } + return fmt.Sprintf("tags_id IN (SELECT tags_id FROM tags WHERE hostname IN (%s))", strings.Join(hostnameSelectionClauses, ",")) + } + + // Here we DO NOT use tags as a separate table + // So hostname is embedded into processed table itself and we can build direct WHERE statement as + // ((hostname = 'H1') OR (hostname = 'H2') ...) 
+ + // All tags are included into one table + // Need to prepare WHERE (hostname = 'host1' OR hostname = 'host2') clause + for _, s := range hostnames { + hostnameSelectionClauses = append(hostnameSelectionClauses, fmt.Sprintf("hostname = '%s'", s)) + } + // (host=h1 OR host=h2) + return "(" + strings.Join(hostnameSelectionClauses, " OR ") + ")" +} + +// getHostWhereString gets multiple random hostnames and create WHERE SQL statement for these hostnames. +func (d *Devops) getHostWhereString(nhosts int) string { + hostnames, err := d.GetRandomHosts(nhosts) + panicIfErr(err) + return d.getHostWhereWithHostnames(hostnames) +} + +// getSelectClausesAggMetrics gets specified aggregate function clause for multiple memtrics +// Ex.: max(cpu_time) AS max_cpu_time +func (d *Devops) getSelectClausesAggMetrics(aggregateFunction string, metrics []string) []string { + selectAggregateClauses := make([]string, len(metrics)) + for i, metric := range metrics { + selectAggregateClauses[i] = fmt.Sprintf("%[1]s(%[2]s) AS %[1]s_%[2]s", aggregateFunction, metric) + } + return selectAggregateClauses +} + +// Doris understands and can compare time presented as strings of this format +const dorisTimeStringFormat = "2006-01-02 15:04:05" + +// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts, +// e.g. in pseudo-SQL: +// +// SELECT MAX(metric1), ..., MAX(metricN) +// FROM cpu +// WHERE +// +// (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// +// GROUP BY hour +// ORDER BY hour +// +// Resultsets: +// cpu-max-all-1 +// cpu-max-all-8 +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) { + interval := d.Interval.MustRandWindow(duration) + metrics := devops.GetAllCPUMetrics() + selectClauses := d.getSelectClausesAggMetrics("max", metrics) + + sql := fmt.Sprintf(` + SELECT + DATE_TRUNC('HOUR', created_at) AS hour, + %s + FROM cpu + WHERE + %s + AND (created_at >= '%s') + AND (created_at < '%s') + GROUP BY hour + ORDER BY hour ASC; + `, + strings.Join(selectClauses, ", "), + d.getHostWhereString(nHosts), + interval.Start().Format(dorisTimeStringFormat), + interval.End().Format(dorisTimeStringFormat)) + + humanLabel := devops.GetMaxAllLabel("Doris", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day, +// e.g. 
in pseudo-SQL: +// +// SELECT AVG(metric1), ..., AVG(metricN) +// FROM cpu +// WHERE time >= '$HOUR_START' AND time < '$HOUR_END' +// GROUP BY hour, hostname +// ORDER BY hour +// +// Resultsets: +// double-groupby-1 +// double-groupby-5 +// double-groupby-all +func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + + selectClauses := make([]string, numMetrics) + meanClauses := make([]string, numMetrics) + for i, m := range metrics { + meanClauses[i] = "mean_" + m + selectClauses[i] = fmt.Sprintf("avg(%s) AS %s", m, meanClauses[i]) + } + + hostnameField := "hostname" + joinClause := "" + if d.UseTags { + joinClause = "INNER JOIN tags ON cpu_avg.tags_id = tags.tags_id" + } + + sql := fmt.Sprintf(` + SELECT + hour, + %s + %s + FROM + ( + SELECT + DATE_TRUNC('HOUR', created_at) AS hour, + tags_id, + %s + FROM cpu + WHERE + created_at >= '%s' + AND created_at < '%s' + GROUP BY + hour, + tags_id + ) as cpu_avg + %s + ORDER BY + hour ASC, + %s + `, + hostnameField, // main SELECT %s + strings.Join(meanClauses, ", "), // main SELECT %s + strings.Join(selectClauses, ", "), // cpu_avg SELECT %s + interval.Start().Format(dorisTimeStringFormat), // cpu_avg time >= '%s' + interval.End().Format(dorisTimeStringFormat), // cpu_avg time < '%s' + joinClause, // JOIN clause + hostnameField) // ORDER BY %s + + humanLabel := devops.GetDoubleGroupByLabel("Doris", numMetrics) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit: +// SELECT time_bucket('1 minute', time) AS t, MAX(cpu) +// FROM cpu +// WHERE time < '$TIME' +// GROUP BY t +// ORDER BY t DESC +// LIMIT $LIMIT +// +// 
Resultsets: +// groupby-orderby-limit +func (d *Devops) GroupByOrderByLimit(qi query.Query) { + interval := d.Interval.MustRandWindow(time.Hour) + + sql := fmt.Sprintf(` + SELECT + DATE_TRUNC('MINUTE', CAST(created_at AS DATETIME)) AS minute, + MAX(usage_user) AS max_usage_user + FROM cpu + WHERE + created_at < '%s' + GROUP BY minute + ORDER BY minute DESC + LIMIT 5; + `, + interval.End().Format(dorisTimeStringFormat)) + + humanLabel := "Doris max cpu over last 5 min-intervals (random end)" + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high +// usage between a time period for a number of hosts (if 0, it will search all hosts), +// e.g. in pseudo-SQL: +// +// SELECT * FROM cpu +// WHERE usage_user > 90.0 +// AND time >= '$TIME_START' AND time < '$TIME_END' +// AND (hostname = '$HOST' OR hostname = '$HOST2'...) +// +// Resultsets: +// high-cpu-1 +// high-cpu-all +func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + var hostWhereClause string + if nHosts == 0 { + hostWhereClause = "" + } else { + hostWhereClause = fmt.Sprintf("AND (%s)", d.getHostWhereString(nHosts)) + } + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + + sql := fmt.Sprintf(` + SELECT * + FROM cpu + WHERE (usage_user > 90.0) + AND (created_at >= '%s') + AND (created_at < '%s') + %s + `, + interval.Start().Format(dorisTimeStringFormat), + interval.End().Format(dorisTimeStringFormat), + hostWhereClause) + + humanLabel, err := devops.GetHighCPULabel("Doris", nHosts) + panicIfErr(err) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +// LastPointPerHost finds the last row for every host in the dataset +// +// Resultsets: +// lastPoint +func (d *Devops) LastPointPerHost(qi query.Query) { + var sql string + if 
d.UseTags { + // get the lastest tags + sql = fmt.Sprintf(` + SELECT + c.*, + t.* + FROM ( + SELECT + cpu.* + FROM + cpu + INNER JOIN ( + SELECT + tags_id, + MAX(time) AS max_time + FROM + cpu + GROUP BY + tags_id + ) AS latest ON cpu.tags_id = latest.tags_id AND cpu.time = latest.max_time + ) c + INNER JOIN tags t ON c.tags_id = t.tags_id + ORDER BY + t.hostname ASC, + c.time DESC; + `) + } else { + sql = fmt.Sprintf(` + SELECT DISTINCT(hostname), * + FROM cpu + ORDER BY + hostname, + time DESC + `) + } + + humanLabel := "Doris last row per host" + humanDesc := humanLabel + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +// GroupByTime selects the MAX for numMetrics metrics under 'cpu', +// per minute for nhosts hosts +// Resultsets: +// single-groupby-1-1-12 +// single-groupby-1-1-1 +// single-groupby-1-8-1 +// single-groupby-5-1-12 +// single-groupby-5-1-1 +// single-groupby-5-8-1 +func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.MustRandWindow(timeRange) + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + selectClauses := d.getSelectClausesAggMetrics("max", metrics) + + sql := fmt.Sprintf(` + SELECT + DATE_TRUNC('MINUTE', CAST(created_at AS DATETIME)) AS minute, + %s + FROM cpu + WHERE + %s + AND (created_at >= '%s') + AND (created_at < '%s') + GROUP BY minute + ORDER BY minute; + `, + strings.Join(selectClauses, ", "), + d.getHostWhereString(nHosts), + interval.Start().Format(dorisTimeStringFormat), + interval.End().Format(dorisTimeStringFormat)) + + humanLabel := fmt.Sprintf("Doris %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} diff --git a/cmd/tsbs_load_doris/main.go b/cmd/tsbs_load_doris/main.go new file mode 100644 index 000000000..88ced16c8 --- /dev/null +++ 
b/cmd/tsbs_load_doris/main.go @@ -0,0 +1,57 @@ +// tsbs_load_doris loads a Doris instance with data from stdin. +// +// If the database exists beforehand, it will be *DROPPED*. +package main + +import ( + "fmt" + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/targets" + "github.com/timescale/tsbs/pkg/targets/doris" +) + +// Global vars +var ( + target targets.ImplementedTarget +) + +var loader load.BenchmarkRunner +var loaderConf load.BenchmarkRunnerConfig +var conf *doris.DorisConfig + +// Parse args: +func init() { + loaderConf = load.BenchmarkRunnerConfig{} + target := doris.NewTarget() + loaderConf.AddToFlagSet(pflag.CommandLine) + target.TargetSpecificFlags("", pflag.CommandLine) + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&loaderConf); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + conf = &doris.DorisConfig{ + Host: viper.GetString("host"), + Port: viper.GetInt("port"), + User: viper.GetString("user"), + Password: viper.GetString("password"), + LogBatches: viper.GetBool("log-batches"), + Debug: viper.GetInt("debug"), + DbName: loaderConf.DBName, + } + + loader = load.GetBenchmarkRunner(loaderConf) +} + +func main() { + loader.RunBenchmark(doris.NewBenchmark(loaderConf.FileName, loaderConf.HashWorkers, conf)) +} diff --git a/cmd/tsbs_run_queries_doris/main.go b/cmd/tsbs_run_queries_doris/main.go new file mode 100644 index 000000000..3883335b2 --- /dev/null +++ b/cmd/tsbs_run_queries_doris/main.go @@ -0,0 +1,181 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/blagojts/viper" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/pkg/query" +) + +// 
Program option vars: +var ( + chConnect string + hostsList []string + user string + password string + port int + showExplain bool +) + +// Global vars: +var ( + runner *query.BenchmarkRunner +) + +// Parse args: +func init() { + var config query.BenchmarkRunnerConfig + config.AddToFlagSet(pflag.CommandLine) + var hosts string + + pflag.String("additional-params", "sslmode=disable", + "String of additional Doris connection parameters, e.g., 'sslmode=disable'.") + pflag.String("hosts", "localhost", + "Comma separated list of Doris hosts (pass multiple values for sharding reads on a multi-node setup)") + pflag.String("port", "9030", "Port on which to listen for connections.") + pflag.String("user", "root", "User to connect to Doris as (default root)") + pflag.String("password", "", "Password to connect to Doris (default empty)") + + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + chConnect = viper.GetString("additional-params") + hosts = viper.GetString("hosts") + port = viper.GetInt("port") + user = viper.GetString("user") + password = viper.GetString("password") + + // Parse comma separated string of hosts and put in a slice (for multi-node setups) + for _, host := range strings.Split(hosts, ",") { + hostsList = append(hostsList, host) + } + + runner = query.NewBenchmarkRunner(config) +} + +func main() { + runner.Run(&query.DorisPool, newProcessor) +} + +// Get the connection string for a connection to PostgreSQL. + +// If we're running queries against multiple nodes we need to balance the queries +// across replicas. 
Each worker is assigned a sequence number -- we'll use that +// to evenly distribute hosts to worker connections +func getConnectString(workerNumber int) string { + // Round-robin the host/worker assignment by assigning a host based on workerNumber % totalNumberOfHosts + host := hostsList[workerNumber%len(hostsList)] + + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=UTC", user, password, host, port, runner.DatabaseName()) +} + +// prettyPrintResponse prints a Query and its response in JSON format with two +// keys: 'query' which has a value of the SQL used to generate the second key +// 'results' which is an array of each row in the return set. +func prettyPrintResponse(rows *sqlx.Rows, q *query.Doris) { + resp := make(map[string]interface{}) + resp["query"] = string(q.SqlQuery) + + results := []map[string]interface{}{} + for rows.Next() { + r := make(map[string]interface{}) + if err := rows.MapScan(r); err != nil { + panic(err) + } + results = append(results, r) + resp["results"] = results + } + + line, err := json.MarshalIndent(resp, "", " ") + if err != nil { + panic(err) + } + + fmt.Println(string(line) + "\n") +} + +type queryExecutorOptions struct { + showExplain bool + debug bool + printResponse bool +} + +// query.Processor interface implementation +type processor struct { + db *sqlx.DB + opts *queryExecutorOptions +} + +// query.Processor interface implementation +func newProcessor() query.Processor { + return &processor{} +} + +// Init query.Processor interface implementation +func (p *processor) Init(workerNumber int) { + p.db = sqlx.MustConnect("mysql", getConnectString(workerNumber)) + p.opts = &queryExecutorOptions{ + // Doris could not do EXPLAIN + showExplain: false, + debug: runner.DebugLevel() > 0, + printResponse: runner.DoPrintResponses(), + } +} + +// ProcessQuery query.Processor interface implementation +func (p *processor) ProcessQuery(q query.Query, isWarm bool) ([]*query.Stat, error) { + // No need to run again 
// Doris encodes a Doris query.
// This will be serialized for use by the tsbs_run_queries_doris program.
type Doris struct {
	HumanLabel       []byte
	HumanDescription []byte

	Table    []byte // e.g. "cpu"
	SqlQuery []byte
	id       uint64
}

// DorisPool is a sync.Pool of Doris Query types. New instances come with
// pre-sized, empty byte buffers.
var DorisPool = sync.Pool{
	New: func() interface{} {
		return &Doris{
			HumanLabel:       make([]byte, 0, 1024),
			HumanDescription: make([]byte, 0, 1024),
			Table:            make([]byte, 0, 1024),
			SqlQuery:         make([]byte, 0, 1024),
		}
	},
}

// NewDoris returns a Doris Query instance taken from DorisPool.
func NewDoris() *Doris {
	return DorisPool.Get().(*Doris)
}

// GetID returns the ID of this Query.
func (q *Doris) GetID() uint64 {
	return q.id
}

// SetID sets the ID for this Query.
func (q *Doris) SetID(n uint64) {
	q.id = n
}

// String produces a debug-ready description of a Query.
func (q *Doris) String() string {
	return fmt.Sprintf("HumanLabel: %s, HumanDescription: %s, Table: %s, Query: %s", q.HumanLabel, q.HumanDescription, q.Table, q.SqlQuery)
}

// HumanLabelName returns the human-readable name of this Query.
func (q *Doris) HumanLabelName() []byte {
	return q.HumanLabel
}

// HumanDescriptionName returns the human-readable description of this Query.
func (q *Doris) HumanDescriptionName() []byte {
	return q.HumanDescription
}

// Release resets this Query and returns it to its pool. The byte buffers are
// truncated (capacity is kept for reuse) and the ID is cleared so that a
// recycled instance never exposes the ID of its previous use.
func (q *Doris) Release() {
	q.HumanLabel = q.HumanLabel[:0]
	q.HumanDescription = q.HumanDescription[:0]
	q.id = 0

	q.Table = q.Table[:0]
	q.SqlQuery = q.SqlQuery[:0]

	DorisPool.Put(q)
}
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/influx" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/mongo" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/questdb" @@ -22,6 +23,9 @@ func InitQueryFactories(config *config.QueryGeneratorConfig) map[string]interfac factories[constants.FormatClickhouse] = &clickhouse.BaseGenerator{ UseTags: config.ClickhouseUseTags, } + factories[constants.FormatDoris] = &doris.BaseGenerator{ + UseTags: config.ClickhouseUseTags, + } factories[constants.FormatCrateDB] = &cratedb.BaseGenerator{} factories[constants.FormatInflux] = &influx.BaseGenerator{} factories[constants.FormatTimescaleDB] = ×caledb.BaseGenerator{ diff --git a/pkg/targets/clickhouse/creator.go b/pkg/targets/clickhouse/creator.go index 9e991d36b..3e38a2607 100644 --- a/pkg/targets/clickhouse/creator.go +++ b/pkg/targets/clickhouse/creator.go @@ -18,14 +18,14 @@ type dbCreator struct { config *ClickhouseConfig } -// loader.DBCreator interface implementation +// Init loader.DBCreator interface implementation func (d *dbCreator) Init() { // fills dbCreator struct with data structure (tables description) // specified at the beginning of the data file d.headers = d.ds.Headers() } -// loader.DBCreator interface implementation +// DBExists loader.DBCreator interface implementation func (d *dbCreator) DBExists(dbName string) bool { db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) defer db.Close() @@ -52,7 +52,7 @@ func (d *dbCreator) DBExists(dbName string) bool { return false } -// loader.DBCreator interface implementation +// RemoveOldDB loader.DBCreator interface implementation func (d *dbCreator) RemoveOldDB(dbName string) error { db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) defer db.Close() @@ -64,7 +64,7 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { return nil } -// loader.DBCreator interface implementation +// CreateDB loader.DBCreator interface implementation func (d 
*dbCreator) CreateDB(dbName string) error { // Connect to ClickHouse in general and CREATE DATABASE db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) @@ -143,7 +143,9 @@ func createMetricsTable(conf *ClickhouseConfig, db *sqlx.DB, tableName string, f tags_id UInt32, %s, additional_tags String DEFAULT '' - ) ENGINE = MergeTree(created_date, (tags_id, created_at), 8192) + ) ENGINE = MergeTree() + ORDER BY (created_date, (tags_id, created_at)) + SETTINGS index_granularity = 8192; `, tableName, strings.Join(columnsWithType, ",")) @@ -180,7 +182,9 @@ func generateTagsTableQuery(tagNames, tagTypes []string) string { "created_at DateTime DEFAULT now(),\n"+ "id UInt32,\n"+ "%s"+ - ") ENGINE = MergeTree(created_date, (%s), 8192)", + ") ENGINE = MergeTree()"+ + "ORDER BY (created_date, (%s))"+ + "SETTINGS index_granularity = 8192;", cols, index) } diff --git a/pkg/targets/clickhouse/processor.go b/pkg/targets/clickhouse/processor.go index c868592d4..094687456 100644 --- a/pkg/targets/clickhouse/processor.go +++ b/pkg/targets/clickhouse/processor.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" "github.com/timescale/tsbs/pkg/targets" + "log" "strconv" "strings" "sync" @@ -17,7 +18,7 @@ type processor struct { conf *ClickhouseConfig } -// load.Processor interface implementation +// Init load.Processor interface implementation func (p *processor) Init(workerNum int, doLoad, hashWorkers bool) { if doLoad { p.db = sqlx.MustConnect(dbType, getConnectString(p.conf, true)) @@ -29,14 +30,17 @@ func (p *processor) Init(workerNum int, doLoad, hashWorkers bool) { } } -// load.ProcessorCloser interface implementation +// Close load.ProcessorCloser interface implementation func (p *processor) Close(doLoad bool) { if doLoad { - p.db.Close() + err := p.db.Close() + if err != nil { + log.Fatal(err) + } } } -// load.Processor interface implementation +// ProcessBatch load.Processor interface implementation func (p *processor) ProcessBatch(b targets.Batch, doLoad 
bool) (uint64, uint64) { batches := b.(*tableArr) rowCnt := 0 @@ -84,13 +88,12 @@ func (p *processor) processCSI(tableName string, rows []*insertData) uint64 { dataRows := make([][]interface{}, 0, len(rows)) ret := uint64(0) commonTagsLen := len(tableCols["tags"]) - colLen := len(tableCols[tableName]) + 2 if p.conf.InTableTag { colLen++ } - var tagsIdPosition int = 0 + var tagsIdPosition = 0 for _, row := range rows { // Split the tags into individual common tags and @@ -142,7 +145,7 @@ func (p *processor) processCSI(tableName string, rows []*insertData) uint64 { panic(err) } timeUTC := time.Unix(0, timestampNano) - TimeUTCStr := timeUTC.Format("2006-01-02 15:04:05.999999 -0700") + TimeUTCStr := timeUTC.UTC().Format("2006-01-02 15:04:05.999999 -0700") // use nil at 2-nd position as placeholder for tagKey r := make([]interface{}, 0, colLen) @@ -183,12 +186,17 @@ func (p *processor) processCSI(tableName string, rows []*insertData) uint64 { // New tags in this batch, need to be inserted newTags := make([][]string, 0, len(rows)) p.csi.mutex.RLock() + // judge by device hostname(unique) + seen := make(map[string]bool) for _, tagRow := range tagRows { // tagRow contains what was called `tags` earlier - see one screen higher // tagRow[0] = hostname if _, ok := p.csi.m[tagRow[0]]; !ok { // Tags of this hostname are not listed as inserted - new tags line, add it for creation - newTags = append(newTags, tagRow) + if !seen[tagRow[0]] { + newTags = append(newTags, tagRow) + seen[tagRow[0]] = true + } } } p.csi.mutex.RUnlock() @@ -324,7 +332,7 @@ func insertTags(conf *ClickhouseConfig, db *sqlx.DB, startID int, rows [][]strin // more details on the item: // https://blog.learngoprogramming.com/golang-variadic-funcs-how-to-patterns-369408f19085 // Passing a slice to variadic param with an empty-interface - var variadicArgs []interface{} = make([]interface{}, len(row)+1) // +1 here for additional 'id' column value + var variadicArgs = make([]interface{}, len(row)+1) // +1 here for 
additional 'id' column value // Place id at the beginning variadicArgs[0] = id // And all the rest of column values afterwards diff --git a/pkg/targets/constants/constants.go b/pkg/targets/constants/constants.go index 516093d09..45689810d 100644 --- a/pkg/targets/constants/constants.go +++ b/pkg/targets/constants/constants.go @@ -4,6 +4,7 @@ package constants const ( FormatCassandra = "cassandra" FormatClickhouse = "clickhouse" + FormatDoris = "doris" FormatInflux = "influx" FormatMongo = "mongo" FormatSiriDB = "siridb" @@ -20,6 +21,7 @@ func SupportedFormats() []string { return []string{ FormatCassandra, FormatClickhouse, + FormatDoris, FormatInflux, FormatMongo, FormatSiriDB, diff --git a/pkg/targets/doris/benchmark.go b/pkg/targets/doris/benchmark.go new file mode 100644 index 000000000..8d2c29b16 --- /dev/null +++ b/pkg/targets/doris/benchmark.go @@ -0,0 +1,135 @@ +package doris + +import ( + "bufio" + "fmt" + "log" + + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/targets" +) + +const dbType = "mysql" + +type DorisConfig struct { + Host string + Port int + User string + Password string + + LogBatches bool + InTableTag bool + Debug int + DbName string +} + +// String values of tags and fields to insert - string representation +type insertData struct { + tags string // hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production + fields string // 1451606400000000000,58,2,24,61,22,63,6,44,80,38 +} + +var tableCols map[string][]string + +var tagColumnTypes []string + +// allows for testing +var fatal = log.Fatalf + +// getConnectString() builds connect string to Doris +// db - whether database specification should be added to the connection string +func getConnectString(conf *DorisConfig, db bool) string { + // connectString: tcp://127.0.0.1:9000?debug=true + // Doris ex.: + if db { + return 
fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=UTC", conf.User, conf.Password, conf.Host, conf.Port, conf.DbName) + } + + return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&parseTime=True&loc=UTC", conf.User, conf.Password, conf.Host, conf.Port) +} + +// Point is a single row of data keyed by which table it belongs +// Ex.: +// tags,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production +// cpu,1451606400000000000,58,2,24,61,22,63,6,44,80,38 +type point struct { + table string + row *insertData +} + +// scan.Batch interface implementation +type tableArr struct { + m map[string][]*insertData + cnt uint +} + +// scan.Batch interface implementation +func (ta *tableArr) Len() uint { + return ta.cnt +} + +// scan.Batch interface implementation +func (ta *tableArr) Append(item data.LoadedPoint) { + that := item.Data.(*point) + k := that.table + ta.m[k] = append(ta.m[k], that.row) + ta.cnt++ +} + +// scan.BatchFactory interface implementation +type factory struct{} + +// scan.BatchFactory interface implementation +func (f *factory) New() targets.Batch { + return &tableArr{ + m: map[string][]*insertData{}, + cnt: 0, + } +} + +const tagsPrefix = "tags" + +func NewBenchmark(file string, hashWorkers bool, conf *DorisConfig) targets.Benchmark { + return &benchmark{ + ds: &fileDataSource{ + scanner: bufio.NewScanner(load.GetBufferedReader(file)), + }, + hashWorkers: hashWorkers, + conf: conf, + } +} + +// targets.Benchmark interface implementation +type benchmark struct { + ds targets.DataSource + hashWorkers bool + conf *DorisConfig +} + +func (b *benchmark) GetDataSource() targets.DataSource { + return b.ds +} + +func (b *benchmark) GetBatchFactory() targets.BatchFactory { + return &factory{} +} + +func (b *benchmark) GetPointIndexer(maxPartitions uint) targets.PointIndexer { + if b.hashWorkers { + return &hostnameIndexer{ + partitions: maxPartitions, + 
} + } + return &targets.ConstantIndexer{} +} + +// loader.Benchmark interface implementation +func (b *benchmark) GetProcessor() targets.Processor { + return &processor{conf: b.conf} +} + +// loader.Benchmark interface implementation +func (b *benchmark) GetDBCreator() targets.DBCreator { + return &dbCreator{ds: b.GetDataSource(), config: b.conf} +} diff --git a/pkg/targets/doris/creator.go b/pkg/targets/doris/creator.go new file mode 100644 index 000000000..3c1508bdb --- /dev/null +++ b/pkg/targets/doris/creator.go @@ -0,0 +1,189 @@ +package doris + +import ( + "fmt" + "strings" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "github.com/timescale/tsbs/pkg/data/usecases/common" + "github.com/timescale/tsbs/pkg/targets" +) + +// loader.DBCreator interface implementation +type dbCreator struct { + ds targets.DataSource + headers *common.GeneratedDataHeaders + connStr string + config *DorisConfig +} + +// Init loader.DBCreator interface implementation +func (d *dbCreator) Init() { + // fills dbCreator struct with data structure (tables description) + // specified at the beginning of the data file + d.headers = d.ds.Headers() +} + +// DBExists loader.DBCreator interface implementation +func (d *dbCreator) DBExists(dbName string) bool { + db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) + defer db.Close() + + const sql = `SELECT SCHEMA_NAME + FROM information_schema.SCHEMATA + WHERE SCHEMA_NAME = ?` + + if d.config.Debug > 0 { + fmt.Printf("[DEBUG] Checking database existence: %s\n", dbName+" SQL: "+sql) + } + + var exists bool + // 直接查询是否存在目标数据库 + err := db.Get(&exists, `SELECT EXISTS(SELECT 1 FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?)`, dbName) + if err != nil { + panic(err) + } + + return exists +} + +// RemoveOldDB loader.DBCreator interface implementation +func (d *dbCreator) RemoveOldDB(dbName string) error { + db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) + defer db.Close() + + sql := 
fmt.Sprintf("DROP DATABASE IF EXISTS %s", dbName) + if _, err := db.Exec(sql); err != nil { + panic(err) + } + return nil +} + +// CreateDB loader.DBCreator interface implementation +func (d *dbCreator) CreateDB(dbName string) error { + // Connect to Doris in general and CREATE DATABASE + db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) + sql := fmt.Sprintf("CREATE DATABASE %s", dbName) + _, err := db.Exec(sql) + if err != nil { + panic(err) + } + db.Close() + db = nil + + // Connect to specified database within Doris + db = sqlx.MustConnect(dbType, getConnectString(d.config, true)) + defer db.Close() + + createTagsTable(d.config, db, d.headers.TagKeys, d.headers.TagTypes) + if tableCols == nil { + tableCols = make(map[string][]string) + } + tableCols["tags"] = d.headers.TagKeys + tagColumnTypes = d.headers.TagTypes + + for tableName, fieldColumns := range d.headers.FieldKeys { + //tableName: cpu + // fieldColumns content: + // usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice + createMetricsTable(d.config, db, tableName, fieldColumns) + } + + return nil +} + +// createTagsTable builds CREATE TABLE SQL statement and runs it +func createTagsTable(conf *DorisConfig, db *sqlx.DB, tagNames, tagTypes []string) { + sql := generateTagsTableQuery(tagNames, tagTypes) + if conf.Debug > 0 { + fmt.Printf(sql) + } + _, err := db.Exec(sql) + if err != nil { + panic(err) + } +} + +// createMetricsTable builds CREATE TABLE SQL statement and runs it +func createMetricsTable(conf *DorisConfig, db *sqlx.DB, tableName string, fieldColumns []string) { + tableCols[tableName] = fieldColumns + + // We'll have some service columns in table to be created and columnNames contains all column names to be created + var columnNames []string + + if conf.InTableTag { + // First column in the table - service column - partitioning field + partitioningColumn := tableCols["tags"][0] // would be 'hostname' + 
// generateTagsTableQuery returns the CREATE TABLE statement for the "tags"
// table.
//
// The table always has the service columns tags_id, created_date and
// created_at plus an inverted index on created_at. Every entry of tagNames
// becomes a varchar(30) column; tagTypes is only validated for length and does
// not influence the column type. The inverted index on hostname is emitted
// only when a tag called "hostname" is present, and no column line is emitted
// for an empty tag list, so the statement stays well-formed for data sets
// without a hostname tag or without tags at all.
//
// It panics when tagNames and tagTypes differ in length.
func generateTagsTableQuery(tagNames, tagTypes []string) string {
	if len(tagNames) != len(tagTypes) {
		panic("wrong number of tag names and tag types")
	}

	// defs collects every comma-separated entry of the CREATE TABLE body:
	// service columns first, then tag columns, then index definitions.
	defs := make([]string, 0, len(tagNames)+5)
	defs = append(defs,
		"tags_id BIGINT",
		"created_date DATE DEFAULT CURRENT_DATE",
		"created_at DATETIME DEFAULT CURRENT_TIMESTAMP",
	)

	hasHostname := false
	for _, tagName := range tagNames {
		defs = append(defs, fmt.Sprintf("%s %s", tagName, "varchar(30)"))
		if tagName == "hostname" {
			hasHostname = true
		}
	}

	// An index may only reference an existing column.
	if hasHostname {
		defs = append(defs, "INDEX idx_hostname(hostname) USING INVERTED")
	}
	defs = append(defs, "INDEX idx_created_at(created_at) USING INVERTED")

	return fmt.Sprintf(`
		CREATE TABLE tags(
			%s
		)
		DUPLICATE KEY(%s)
		DISTRIBUTED BY HASH(tags_id) BUCKETS 10
		PROPERTIES("replication_num" = "1")
		`,
		strings.Join(defs, ",\n\t\t\t"), "`tags_id`")
}
first element being "tags" + // Ex.: + // tags,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production + parts := strings.SplitN(d.scanner.Text(), ",", 2) // prefix & then rest of line + prefix := parts[0] + if prefix != tagsPrefix { + fatal("data file in invalid format; got %s expected %s", prefix, tagsPrefix) + return data.LoadedPoint{} + } + newPoint.tags = parts[1] + + // Scan again to get the data line + // cpu,1451606400000000000,58,2,24,61,22,63,6,44,80,38 + ok = d.scanner.Scan() + if !ok { + fatal("scan error: %v", d.scanner.Err()) + return data.LoadedPoint{} + } + parts = strings.SplitN(d.scanner.Text(), ",", 2) // prefix & then rest of line + prefix = parts[0] + newPoint.fields = parts[1] + + return data.NewLoadedPoint(&point{ + table: prefix, + row: newPoint, + }) +} + +func (d *fileDataSource) Headers() *common.GeneratedDataHeaders { + if d.headers != nil { + return d.headers + } + // First N lines are header, describing data structure. + // The first line containing tags table name ('tags') followed by list of tags, comma-separated. + // Ex.: tags,hostname,region,datacenter,rack,os,arch,team,service,service_version + // The second through N-1 line containing table name (ex.: 'cpu') followed by list of column names, + // comma-separated. 
Ex.: cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq + // The last line being blank to separate from the data + // + // Header example: + // tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment + // cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice + // disk,total,free,used,used_percent,inodes_total,inodes_free,inodes_used + // nginx,accepts,active,handled,reading,requests,waiting,writing + var tags string + var cols []string + i := 0 + for { + var line string + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + fatal("reached EOF, but not enough things scanned") + return nil + } else if !ok { + fatal("scan error: %v", d.scanner.Err()) + return nil + } + if i == 0 { + // read first line - list of tags + tags = d.scanner.Text() + tags = strings.TrimSpace(tags) + } else { + // read the second and further lines - metrics descriptions + line = d.scanner.Text() + line = strings.TrimSpace(line) + if len(line) == 0 { + // empty line - end of header + break + } + // append new table/columns set to the list of tables/columns set + cols = append(cols, line) + } + i++ + } + + // tags content: + //tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment + // + // Parts would contain + // 0: tags - reserved word - tags mark + // 1: + // N: actual tags + // so we'll use tags[1:] for tags specification + parts := strings.Split(tags, ",") + if parts[0] != "tags" { + fatal("input header in wrong format. 
// extractTagNamesAndTypes splits header entries of the form "<name> <type>"
// into two parallel slices: the tag names and the tag types, in input order.
//
// Each entry must contain exactly one space; anything else panics with
// "tag header has invalid format".
func extractTagNamesAndTypes(tags []string) ([]string, []string) {
	names := make([]string, 0, len(tags))
	types := make([]string, 0, len(tags))

	for _, entry := range tags {
		// Exactly one separator means exactly two parts.
		if strings.Count(entry, " ") != 1 {
			panic("tag header has invalid format")
		}
		sep := strings.IndexByte(entry, ' ')
		names = append(names, entry[:sep])
		types = append(types, entry[sep+1:])
	}

	return names, types
}
&dorisTarget{} +} + +type dorisTarget struct{} + +func (c dorisTarget) Benchmark(string, *source.DataSourceConfig, *viper.Viper) (targets.Benchmark, error) { + panic("you must implement me") +} + +func (c dorisTarget) Serializer() serialize.PointSerializer { + return ×caledb.Serializer{} +} + +func (c dorisTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet) { + flagSet.String(flagPrefix+"host", "localhost", "Hostname of Doris instance") + flagSet.String(flagPrefix+"port", "9030", "Port of Doris's mysql client") + flagSet.String(flagPrefix+"user", "root", "User to connect to Doris as(default: root)") + flagSet.String(flagPrefix+"password", "", "Password for user connecting to Doris(default: null)") + flagSet.Bool(flagPrefix+"log-batches", false, "Whether to time individual batches.") + flagSet.Int(flagPrefix+"debug", 0, "Debug printing (choices: 0, 1, 2). (default 0)") +} + +func (c dorisTarget) TargetName() string { + return constants.FormatDoris +} diff --git a/pkg/targets/doris/indexer.go b/pkg/targets/doris/indexer.go new file mode 100644 index 000000000..9f56f75e7 --- /dev/null +++ b/pkg/targets/doris/indexer.go @@ -0,0 +1,22 @@ +package doris + +import ( + "hash/fnv" + "strings" + + "github.com/timescale/tsbs/pkg/data" +) + +// hostnameIndexer is used to consistently send the same hostnames to the same queue +type hostnameIndexer struct { + partitions uint +} + +// GetIndex scan.PointIndexer interface implementation +func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { + p := item.Data.(*point) + hostname := strings.SplitN(p.row.tags, ",", 2)[0] + h := fnv.New32a() + h.Write([]byte(hostname)) + return uint(h.Sum32()) % i.partitions +} diff --git a/pkg/targets/doris/processor.go b/pkg/targets/doris/processor.go new file mode 100644 index 000000000..c638bef69 --- /dev/null +++ b/pkg/targets/doris/processor.go @@ -0,0 +1,373 @@ +package doris + +import ( + "fmt" + "github.com/jmoiron/sqlx" + "github.com/timescale/tsbs/pkg/targets" + 
"strconv" + "strings" + "sync" + "time" +) + +// load.Processor interface implementation +type processor struct { + db *sqlx.DB + csi *syncCSI + conf *DorisConfig +} + +// Init load.Processor interface implementation +func (p *processor) Init(workerNum int, doLoad, hashWorkers bool) { + if doLoad { + p.db = sqlx.MustConnect(dbType, getConnectString(p.conf, true)) + if hashWorkers { + p.csi = newSyncCSI() + } else { + p.csi = globalSyncCSI + } + } +} + +// Close load.ProcessorCloser interface implementation +func (p *processor) Close(doLoad bool) { + if doLoad { + err := p.db.Close() + if err != nil { + return + } + } +} + +// ProcessBatch load.Processor interface implementation +func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) { + batches := b.(*tableArr) + rowCnt := 0 + metricCnt := uint64(0) + + for tableName, rows := range batches.m { + rowCnt += len(rows) + if doLoad { + start := time.Now() + metricCnt += p.processCSI(tableName, rows) + if p.conf.LogBatches { + now := time.Now() + took := now.Sub(start) + batchSize := len(rows) + fmt.Printf("BATCH: batchsize %d row rate %f/sec (took %v)\n", batchSize, float64(batchSize)/took.Seconds(), took) + } + } + } + batches.m = map[string][]*insertData{} + batches.cnt = 0 + + return metricCnt, uint64(rowCnt) +} + +func newSyncCSI() *syncCSI { + return &syncCSI{ + m: make(map[string]int64), + mutex: &sync.RWMutex{}, + } +} + +type syncCSI struct { + // Map hostname to tags.id for this host + m map[string]int64 + mutex *sync.RWMutex +} + +// globalSyncCSI is used when data is not hashed by some function to a worker consistently so +// therefore all workers need to know about the same map from hostname -> tags_id +var globalSyncCSI = newSyncCSI() + +// Process part of incoming data - insert into tables +func (p *processor) processCSI(tableName string, rows []*insertData) uint64 { + // start doris group commit suit + dorisGroupCommitSql := "set group_commit = async_mode;" + _, err := 
p.db.Exec(dorisGroupCommitSql) + if err != nil { + panic(err) + } + tagRows := make([][]string, 0, len(rows)) + dataRows := make([][]interface{}, 0, len(rows)) + ret := uint64(0) + commonTagsLen := len(tableCols["tags"]) + + colLen := len(tableCols[tableName]) + 2 + if p.conf.InTableTag { + colLen++ + } + + for _, row := range rows { + // Split the tags into individual common tags and + // an extra bit leftover for non-common tags that need to be added separately. + // For each of the common tags, remove everything after = in the form