diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index c07e37e2a3329..70114e65add5d 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -23,7 +23,11 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Read metrics from one or more commands that can output to stdout [[inputs.exec]] - ## Commands array + ## Commands array with each command being an array of the command and its + ## flags, e.g. + ## commands = [['C:\Windows\System32\w32tm.exe', "/stripchart", "/computer:SERVER1", "/dataonly", "/samples:3"]] + ## or + ## commands = [["echo", "value 1"], ["echo", "value 2"]] commands = [] ## Environment variables diff --git a/plugins/inputs/exec/dev/telegraf.conf b/plugins/inputs/exec/dev/telegraf.conf deleted file mode 100644 index 04433410ed5ec..0000000000000 --- a/plugins/inputs/exec/dev/telegraf.conf +++ /dev/null @@ -1,26 +0,0 @@ -[agent] - interval="1s" - flush_interval="1s" - -[[inputs.exec]] - timeout = "1s" - data_format = "influx" - commands = [ - "echo 'deal,computer_name=hosta message=\"stuff\" 1530654676316265790'", - "echo 'deal,computer_name=hostb message=\"stuff\" 1530654676316265790'", - ] - -[[processors.regex]] - [[processors.regex.tags]] - key = "computer_name" - pattern = "^(.*?)a$" - replacement = "${1}" - result_key = "server_name" - [[processors.regex.tags]] - key = "computer_name" - pattern = "^(.*?)b$" - replacement = "${1}" - result_key = "server_name" - -[[outputs.file]] - files = ["stdout"] diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 36a7c22f88fde..4a73e651178c6 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -5,12 +5,16 @@ import ( "bufio" "bytes" _ "embed" + "errors" "fmt" "path/filepath" + "strconv" "strings" "sync" "time" + "github.com/kballard/go-shellquote" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" @@ -27,7 +31,7 @@ var once sync.Once const maxStderrBytes int = 512 type Exec struct { - Commands []string `toml:"commands"` + Commands []interface{} `toml:"commands"` Command string `toml:"command"` Environment []string `toml:"environment"` IgnoreError bool `toml:"ignore_error"` @@ -38,6 +42,7 @@ type Exec struct { parser telegraf.Parser runner runner + cmds [][]string // Allow post-processing of command exit codes exitCodeHandler exitCodeHandlerFunc @@ -47,7 +52,7 @@ type Exec struct { type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric type runner interface { - run(string) ([]byte, []byte, error) + run([]string) ([]byte, []byte, error) } type commandRunner struct { @@ -66,6 +71,64 @@ func (e *Exec) Init() error { e.Commands = append(e.Commands, e.Command) } + if len(e.Commands) == 0 { + return errors.New("no command specified") + } + + e.cmds = make([][]string, 0, len(e.Commands)) + for _, raw := range e.Commands { + switch c := raw.(type) { + case string: + // Legacy single string command setting + if c == "" { + return errors.New("command string cannot be empty") + } + + // Convert the legacy command string to a string list and output + // deprecation notice + cmd, err := shellquote.Split(c) + if err != nil { + return fmt.Errorf("unable to parse command %q: %w", c, err) + } + if len(cmd) == 0 { + return errors.New("command cannot be empty") + } + // Create the corresponding command in the new syntax to ease migration + suggestion := make([]string, 0, len(cmd)) + for _, a := range cmd { + suggestion = append(suggestion, strconv.Quote(a)) + } + config.PrintOptionValueDeprecationNotice("inputs.exec", "command", c, telegraf.DeprecationInfo{ + Since: "1.39.0", + RemovalIn: "1.45.0", + Notice: fmt.Sprintf("Use array syntax instead: [%s]", strings.Join(suggestion, ",")), + }) + e.cmds = append(e.cmds, cmd) + case []string: + if len(c) == 0 { + return errors.New("command cannot be empty") + } + e.cmds = append(e.cmds, c) + case []interface{}: + if len(c) == 0 { + return errors.New("command cannot be empty") + } + + // Convert the entries to a string list + converted := make([]string, 0, len(c)) + for _, r := range c { + v, ok := r.(string) + if !ok { + return fmt.Errorf("command %v has invalid entry %v of type %T, expected string", raw, r, r) + } + converted = append(converted, v) + } + e.cmds = append(e.cmds, converted) + default: + return fmt.Errorf("command %v has invalid type %T, expected string list", raw, raw) + } + } + e.runner = &commandRunner{ environment: e.Environment, timeout: time.Duration(e.Timeout), @@ -92,45 +155,39 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { commands := e.updateRunners() var wg sync.WaitGroup - for _, cmd := range commands { + for _, item := range commands { wg.Add(1) - - go func(c string) { + go func(c []string) { defer wg.Done() acc.AddError(e.processCommand(acc, c)) - }(cmd) + }(item) } wg.Wait() return nil } -func (e *Exec) updateRunners() []string { - commands := make([]string, 0, len(e.Commands)) - for _, pattern := range e.Commands { - if pattern == "" { - continue - } - +func (e *Exec) updateRunners() [][]string { + commands := make([][]string, 0, len(e.cmds)) + for _, cmd := range e.cmds { // Try to expand globbing expressions - cmd, args, found := strings.Cut(pattern, " ") - matches, err := filepath.Glob(cmd) + matches, err := filepath.Glob(cmd[0]) if err != nil { - e.Log.Errorf("Matching command %q failed: %v", cmd, err) + e.Log.Errorf("Matching command %q failed: %v", cmd[0], err) continue } if len(matches) == 0 { // There were no matches with the glob pattern, so let's assume // the command is in PATH and just run it as it is - commands = append(commands, pattern) + commands = append(commands, cmd) } else { // There were matches, so we'll append each match together with // the arguments to the commands slice for _, match := range matches { - if found { - match += " " + args - } - commands = append(commands, match) + expanded := make([]string, 0, len(cmd)) + expanded = append(expanded, match) + expanded = append(expanded, cmd[1:]...) + commands = append(commands, expanded) } } } @@ -138,10 +195,10 @@ func (e *Exec) updateRunners() []string { return commands } -func (e *Exec) processCommand(acc telegraf.Accumulator, cmd string) error { +func (e *Exec) processCommand(acc telegraf.Accumulator, cmd []string) error { out, errBuf, runErr := e.runner.run(cmd) if !e.IgnoreError && !e.parseDespiteError && runErr != nil { - return fmt.Errorf("exec: %w for command %q: %s", runErr, cmd, string(errBuf)) + return fmt.Errorf("exec: %w for command %q: %s", runErr, strings.Join(cmd, " "), string(errBuf)) } // Log output in stderr diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index b662fd59aff96..ae8a29c81ed54 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -44,14 +44,56 @@ const malformedJSON = ` "status": "green", ` -type runnerMock struct { - out []byte - errout []byte - err error -} +func TestInitFail(t *testing.T) { + tests := []struct { + name string + config string + expected string + }{ + { + name: "no command", + config: ` + [[inputs.exec]] + commands = [] + `, + expected: "no command specified", + }, + { + name: "empty command", + config: ` + [[inputs.exec]] + commands = [[]] + `, + expected: "command cannot be empty", + }, + { + name: "invalid type in command", + config: ` + [[inputs.exec]] + commands = [[21, 42]] + `, + expected: "command [21 42] has invalid entry 21 of type int64", + }, + } -func (r runnerMock) run(string) (out, errout []byte, err error) { - return r.out, r.errout, r.err + // Register the plugin + inputs.Add("exec", func() telegraf.Input { + return &Exec{ + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfigData([]byte(tt.config), config.EmptySourcePath)) + require.Len(t, cfg.Inputs, 1) + plugin := cfg.Inputs[0] + require.ErrorContains(t, plugin.Init(), tt.expected) + }) + } } func TestExec(t *testing.T) { @@ -61,7 +103,7 @@ func TestExec(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"testcommand arg1"}, + Commands: []interface{}{[]string{"testcommand", "arg1"}}, Log: testutil.Logger{}, } plugin.SetParser(parser) @@ -99,7 +141,7 @@ func TestExecMalformed(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"badcommand arg1"}, + Commands: []interface{}{[]string{"badcommand", "arg1"}}, Log: testutil.Logger{}, } plugin.SetParser(parser) @@ -119,7 +161,7 @@ func TestCommandError(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"badcommand"}, + Commands: []interface{}{[]string{"badcommand"}}, Log: testutil.Logger{}, } plugin.SetParser(parser) @@ -139,7 +181,7 @@ func TestCommandIgnoreError(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"badcommand"}, + Commands: []interface{}{[]string{"badcommand"}}, IgnoreError: true, Log: testutil.Logger{}, } @@ -185,7 +227,7 @@ func TestExecCommandWithGlob(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"/bin/ech[o] metric_value"}, + Commands: []interface{}{[]string{"/bin/ech[o]", "metric_value"}}, Timeout: config.Duration(5 * time.Second), Log: testutil.Logger{}, } @@ -219,7 +261,7 @@ func TestExecCommandWithoutGlob(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"/bin/echo metric_value"}, + Commands: []interface{}{[]string{"/bin/echo", "metric_value"}}, Timeout: config.Duration(5 * time.Second), Log: testutil.Logger{}, } @@ -253,7 +295,7 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"echo metric_value"}, + Commands: []interface{}{[]string{"echo", "metric_value"}}, Timeout: config.Duration(5 * time.Second), Log: testutil.Logger{}, } @@ -287,7 +329,7 @@ func TestExecCommandWithEnv(t *testing.T) { // Setup plugin plugin := &Exec{ - Commands: []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}, + Commands: []interface{}{[]string{"/bin/sh", "-c", "echo ${METRIC_NAME}"}}, Environment: []string{"METRIC_NAME=metric_value"}, Timeout: config.Duration(5 * time.Second), Log: testutil.Logger{}, @@ -312,6 +354,74 @@ func TestExecCommandWithEnv(t *testing.T) { testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } +func TestDeprecatedSingleCommand(t *testing.T) { + // Setup parser + parser := value.Parser{ + MetricName: "metric", + DataType: "string", + } + require.NoError(t, parser.Init()) + + // Setup plugin + plugin := &Exec{ + Command: "/bin/sh -c 'echo metric_value'", + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + plugin.SetParser(&parser) + require.NoError(t, plugin.Init()) + + // Gather the metrics and check the result + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "metric", + map[string]string{}, + map[string]interface{}{ + "value": "metric_value", + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +} + +func TestDeprecatedStringBasedCommands(t *testing.T) { + // Setup parser + parser := value.Parser{ + MetricName: "metric", + DataType: "string", + } + require.NoError(t, parser.Init()) + + // Setup plugin + plugin := &Exec{ + Commands: []interface{}{"/bin/sh -c 'echo metric_value'"}, + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + plugin.SetParser(&parser) + require.NoError(t, plugin.Init()) + + // Gather the metrics and check the result + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "metric", + map[string]string{}, + map[string]interface{}{ + "value": "metric_value", + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +} + func TestStderrLogging(t *testing.T) { tests := []struct { name string @@ -432,7 +542,7 @@ T! very detailed details // Setup plugin plugin := &Exec{ - Commands: []string{"echo 42"}, + Commands: []interface{}{[]string{"echo", "42"}}, LogStdErr: true, Log: logger, } @@ -518,7 +628,7 @@ func TestCSVBehavior(t *testing.T) { // Setup the plugin plugin := &Exec{ - Commands: []string{"echo \"a,b\n1,2\n3,4\""}, + Commands: []interface{}{[]string{"echo", "a,b\n1,2\n3,4"}}, Timeout: config.Duration(5 * time.Second), Log: testutil.Logger{}, } @@ -585,6 +695,154 @@ func TestCSVBehavior(t *testing.T) { } func TestCases(t *testing.T) { + tests := []struct { + name string + config string + expected []telegraf.Metric + }{ + { + name: "deprecated single command", + config: ` + [[inputs.exec]] + command = "echo \"a,b\n1,2\n3,4\"" + data_format = "csv" + csv_header_row_count = 1 + `, + expected: []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + }, + }, + { + name: "deprecated commands string", + config: ` + [[inputs.exec]] + commands = ["echo \"a,b\n1,2\n3,4\""] + data_format = "csv" + csv_header_row_count = 1 + `, + expected: []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + }, + }, + { + name: "native", + config: ` + [[inputs.exec]] + commands = [["echo", "a,b\n1,2\n3,4"]] + data_format = "csv" + csv_header_row_count = 1 + `, + expected: []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + }, + }, + } + // Register the plugin inputs.Add("exec", func() telegraf.Input { return &Exec{ @@ -593,73 +851,43 @@ func TestCases(t *testing.T) { } }) - // Setup the plugin - cfg := config.NewConfig() - require.NoError(t, cfg.LoadConfigData([]byte(` - [[inputs.exec]] - commands = [ "echo \"a,b\n1,2\n3,4\"" ] - data_format = "csv" - csv_header_row_count = 1 -`), config.EmptySourcePath)) - require.Len(t, cfg.Inputs, 1) - plugin := cfg.Inputs[0] - require.NoError(t, plugin.Init()) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfigData([]byte(tt.config), config.EmptySourcePath)) + require.Len(t, cfg.Inputs, 1) + plugin := cfg.Inputs[0] + require.NoError(t, plugin.Init()) - expected := []telegraf.Metric{ - metric.New( - "exec", - map[string]string{}, - map[string]interface{}{ - "a": int64(1), - "b": int64(2), - }, - time.Unix(0, 1), - ), - metric.New( - "exec", - map[string]string{}, - map[string]interface{}{ - "a": int64(3), - "b": int64(4), - }, - time.Unix(0, 2), - ), - metric.New( - "exec", - map[string]string{}, - map[string]interface{}{ - "a": int64(1), - "b": int64(2), - }, - time.Unix(0, 3), - ), - metric.New( - "exec", - map[string]string{}, - map[string]interface{}{ - "a": int64(3), - "b": int64(4), - }, - time.Unix(0, 4), - ), + // Run gather twice to collect metrics + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, plugin.Gather(&acc)) + + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(tt.expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(tt.expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, tt.expected, actual, options...) + }) } +} - var acc testutil.Accumulator - // Run gather once - require.NoError(t, plugin.Gather(&acc)) - // Run gather a second time - require.NoError(t, plugin.Gather(&acc)) - require.Eventuallyf(t, func() bool { - acc.Lock() - defer acc.Unlock() - return acc.NMetrics() >= uint64(len(expected)) - }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) +type runnerMock struct { + out []byte + errout []byte + err error +} - // Check the result - options := []cmp.Option{ - testutil.SortMetrics(), - testutil.IgnoreTime(), - } - actual := acc.GetTelegrafMetrics() - testutil.RequireMetricsEqual(t, expected, actual, options...) +func (r runnerMock) run([]string) (out, errout []byte, err error) { + return r.out, r.errout, r.err } diff --git a/plugins/inputs/exec/run_notwindows.go b/plugins/inputs/exec/run_notwindows.go index 1457f0a201518..8d6de0f6208ad 100644 --- a/plugins/inputs/exec/run_notwindows.go +++ b/plugins/inputs/exec/run_notwindows.go @@ -4,22 +4,14 @@ package exec import ( "bytes" - "fmt" "os" "os/exec" "syscall" - "github.com/kballard/go-shellquote" - "github.com/influxdata/telegraf/internal" ) -func (c *commandRunner) run(command string) (out, errout []byte, err error) { - splitCmd, err := shellquote.Split(command) - if err != nil || len(splitCmd) == 0 { - return nil, nil, fmt.Errorf("exec: unable to parse command %q: %w", command, err) - } - +func (c *commandRunner) run(splitCmd []string) (out, errout []byte, err error) { cmd := exec.Command(splitCmd[0], splitCmd[1:]...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} diff --git a/plugins/inputs/exec/run_windows.go b/plugins/inputs/exec/run_windows.go index c07146fe9353a..96c745ba2d485 100644 --- a/plugins/inputs/exec/run_windows.go +++ b/plugins/inputs/exec/run_windows.go @@ -5,23 +5,15 @@ package exec import ( "bytes" "errors" - "fmt" "io" "os" "os/exec" "syscall" - "github.com/kballard/go-shellquote" - "github.com/influxdata/telegraf/internal" ) -func (c *commandRunner) run(command string) (out, errout []byte, err error) { - splitCmd, err := shellquote.Split(command) - if err != nil || len(splitCmd) == 0 { - return nil, nil, fmt.Errorf("exec: unable to parse command: %w", err) - } - +func (c *commandRunner) run(splitCmd []string) (out, errout []byte, err error) { cmd := exec.Command(splitCmd[0], splitCmd[1:]...) cmd.SysProcAttr = &syscall.SysProcAttr{ CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, diff --git a/plugins/inputs/exec/sample.conf b/plugins/inputs/exec/sample.conf index 7e9d92390719e..09612901c871e 100644 --- a/plugins/inputs/exec/sample.conf +++ b/plugins/inputs/exec/sample.conf @@ -1,6 +1,10 @@ # Read metrics from one or more commands that can output to stdout [[inputs.exec]] - ## Commands array + ## Commands array with each command being an array of the command and its + ## flags, e.g. + ## commands = [['C:\Windows\System32\w32tm.exe', "/stripchart", "/computer:SERVER1", "/dataonly", "/samples:3"]] + ## or + ## commands = [["echo", "value 1"], ["echo", "value 2"]] commands = [] ## Environment variables