Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugins/inputs/exec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Read metrics from one or more commands that can output to stdout
[[inputs.exec]]
## Commands array
## Commands array with each command being an array of the command and its
## flags, e.g.
## commands = [['C:\Windows\System32\w32tm.exe', "/stripchart", "/computer:SERVER1", "/dataonly", "/samples:3"]]
## or
## commands = [["echo", "value 1"], ["echo", "value 2"]]
commands = []

## Environment variables
Expand Down
26 changes: 0 additions & 26 deletions plugins/inputs/exec/dev/telegraf.conf

This file was deleted.

103 changes: 80 additions & 23 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import (
"bufio"
"bytes"
_ "embed"
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/kballard/go-shellquote"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
Expand All @@ -27,7 +31,7 @@ var once sync.Once
const maxStderrBytes int = 512

type Exec struct {
Commands []string `toml:"commands"`
Commands []interface{} `toml:"commands"`
Command string `toml:"command"`
Environment []string `toml:"environment"`
IgnoreError bool `toml:"ignore_error"`
Expand All @@ -38,6 +42,7 @@ type Exec struct {
parser telegraf.Parser

runner runner
cmds [][]string

// Allow post-processing of command exit codes
exitCodeHandler exitCodeHandlerFunc
Expand All @@ -47,7 +52,7 @@ type Exec struct {
type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric

type runner interface {
run(string) ([]byte, []byte, error)
run([]string) ([]byte, []byte, error)
}

type commandRunner struct {
Expand All @@ -66,6 +71,64 @@ func (e *Exec) Init() error {
e.Commands = append(e.Commands, e.Command)
}

if len(e.Commands) == 0 {
return errors.New("no command specified")
}

e.cmds = make([][]string, 0, len(e.Commands))
for _, raw := range e.Commands {
switch c := raw.(type) {
case string:
// Legacy single string command setting
if c == "" {
return errors.New("command string cannot be empty")
}

// Convert the legacy command string to a string list and output
// deprecation notice
cmd, err := shellquote.Split(c)
if err != nil {
return fmt.Errorf("unable to parse command %q: %w", c, err)
}
if len(cmd) == 0 {
return errors.New("command cannot be empty")
}
// Create the corresponding command in the new syntax to ease migration
suggestion := make([]string, 0, len(cmd))
for _, a := range cmd {
suggestion = append(suggestion, strconv.Quote(a))
}
config.PrintOptionValueDeprecationNotice("inputs.exec", "command", c, telegraf.DeprecationInfo{
Since: "1.39.0",
RemovalIn: "1.45.0",
Notice: fmt.Sprintf("Use array syntax instead: [%s]", strings.Join(suggestion, ",")),
})
e.cmds = append(e.cmds, cmd)
case []string:
if len(c) == 0 {
return errors.New("command cannot be empty")
}
e.cmds = append(e.cmds, c)
case []interface{}:
if len(c) == 0 {
return errors.New("command cannot be empty")
}

// Convert the entries to a string list
converted := make([]string, 0, len(c))
for _, r := range c {
v, ok := r.(string)
if !ok {
return fmt.Errorf("command %v has invalid entry %v of type %T, expected string", raw, r, r)
}
converted = append(converted, v)
}
e.cmds = append(e.cmds, converted)
default:
return fmt.Errorf("command %v has invalid type %T, expected string list", raw, raw)
}
}

e.runner = &commandRunner{
environment: e.Environment,
timeout: time.Duration(e.Timeout),
Expand All @@ -92,56 +155,50 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
commands := e.updateRunners()

var wg sync.WaitGroup
for _, cmd := range commands {
for _, item := range commands {
wg.Add(1)

go func(c string) {
go func(c []string) {
defer wg.Done()
acc.AddError(e.processCommand(acc, c))
}(cmd)
}(item)
}
wg.Wait()
return nil
}

func (e *Exec) updateRunners() []string {
commands := make([]string, 0, len(e.Commands))
for _, pattern := range e.Commands {
if pattern == "" {
continue
}

func (e *Exec) updateRunners() [][]string {
commands := make([][]string, 0, len(e.cmds))
for _, cmd := range e.cmds {
// Try to expand globbing expressions
cmd, args, found := strings.Cut(pattern, " ")
matches, err := filepath.Glob(cmd)
matches, err := filepath.Glob(cmd[0])
if err != nil {
e.Log.Errorf("Matching command %q failed: %v", cmd, err)
e.Log.Errorf("Matching command %q failed: %v", cmd[0], err)
continue
}

if len(matches) == 0 {
// There were no matches with the glob pattern, so let's assume
// the command is in PATH and just run it as it is
commands = append(commands, pattern)
commands = append(commands, cmd)
} else {
// There were matches, so we'll append each match together with
// the arguments to the commands slice
for _, match := range matches {
if found {
match += " " + args
}
commands = append(commands, match)
expanded := make([]string, 0, len(cmd))
expanded = append(expanded, match)
expanded = append(expanded, cmd[1:]...)
commands = append(commands, expanded)
}
}
}

return commands
}

func (e *Exec) processCommand(acc telegraf.Accumulator, cmd string) error {
func (e *Exec) processCommand(acc telegraf.Accumulator, cmd []string) error {
out, errBuf, runErr := e.runner.run(cmd)
if !e.IgnoreError && !e.parseDespiteError && runErr != nil {
return fmt.Errorf("exec: %w for command %q: %s", runErr, cmd, string(errBuf))
return fmt.Errorf("exec: %w for command %q: %s", runErr, strings.Join(cmd, " "), string(errBuf))
}

// Log output in stderr
Expand Down
Loading
Loading