-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathshell.py
More file actions
204 lines (169 loc) · 8.11 KB
/
Copy pathshell.py
File metadata and controls
204 lines (169 loc) · 8.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Communicates with the Shell on Linux and Windows."""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026061202'
import os
import shutil
import subprocess # nosec B404 - this library is the subprocess helper
from . import txt
# Human-readable descriptions keyed by integer code. Going by the name and the
# message texts these are the exit codes of `sshpass`; no consumer of this
# table is visible in this part of the file - TODO confirm against callers.
RETC_SSHPASS = {
    1: 'Invalid command line argument',
    2: 'Conflicting arguments given',
    3: 'General runtime error',
    4: 'Unrecognized response from ssh (parse error)',
    5: 'Invalid/incorrect password',
    6: 'Host public key is unknown. sshpass exits without confirming the new key.',
    7: 'IP public key changed. sshpass exits without confirming the new key.',
}
def shell_exec(cmd, env=None, stdin='', cwd=None, timeout=None, lc_all='C'):
    """
    Run a program given as an argument list (argv) and capture its output.

    The process is always started with `shell=False`. No shell ever sees the arguments, so
    pipes (`|`), redirections, globbing, variable expansion and every other shell metacharacter
    are handed to the program as literal text. An untrusted value such as `|reboot|` or
    `; rm -rf /` therefore stays a single argument and never becomes a command of its own.

    The flip side is that `cmd` has to be a sequence: a list (or tuple) like
    `['df', '--human-readable', mountpoint]`. A plain string is rejected with `TypeError`.
    Real pipelines (`a | b`) cannot be written as a shell string here; run the stages
    separately and connect them in code, or post-process the output of the first command.

    ### Parameters
    - **cmd** (`list`):
      Program and arguments as argv, e.g. `['ls', '-l', '/tmp']`. Element 0 is the program,
      the remaining elements are its arguments.
    - **env** (`dict`, optional):
      Extra environment variables, layered on top of a copy of the current OS environment.
      Defaults to None (current environment only).
    - **stdin** (`str`, optional):
      Text to feed to the command on standard input. An empty value (the default) sends
      nothing.
    - **cwd** (`str`, optional):
      Working directory for the command. Defaults to None (current directory).
    - **timeout** (`int` or `float`, optional):
      Maximum run time in seconds. When it is exceeded the process is killed.
      Defaults to None (no timeout).
    - **lc_all** (`str`, optional):
      Value written to the `LC_ALL` environment variable of the command, to pin the locale
      of its output. Defaults to `'C'`.

    ### Returns
    - **tuple**:
      - On success:
        `(True, (stdout, stderr, return_code))`
        - **stdout** (`str`): Captured standard output, converted with `txt.to_text()`.
        - **stderr** (`str`): Captured standard error, converted with `txt.to_text()`.
        - **return_code** (`int`): Exit status of the command.
      - On failure:
        `(False, error_message)`, where `error_message` is a string describing the problem.

    ### Raises
    - **TypeError**: If `cmd` is neither a list nor a tuple.

    ### Notes
    - `LC_ALL=<lc_all>` is applied last, so it wins over both the OS environment and `env`.
    - Any exception raised while creating the process (for example `OSError` or
      `ValueError`) is caught and reported as `(False, <error message>)`.
    - On timeout the process is killed, its pipes are drained, and the function returns
      `(False, "Timeout after <timeout> seconds.")`.
    """
    if not isinstance(cmd, (list, tuple)):
        raise TypeError(
            'shell_exec() requires cmd as a list of arguments '
            f'(for example ["df", "-h"]), got {type(cmd).__name__}.'
        )

    # Layering order matters: OS environment first, caller overrides second,
    # LC_ALL last so the requested locale always wins.
    overrides = env or {}
    child_env = {**os.environ, **overrides, 'LC_ALL': lc_all}

    try:
        proc = subprocess.Popen(  # nosec B603 - shell=False, cmd is an argv list, no shell interpretation
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=child_env,
            shell=False,
            cwd=cwd,
        )
    except Exception as e:
        # Deliberately broad: every start-up failure is turned into an error tuple.
        return False, f'Error "{e}" while calling command "{cmd}"'

    # Only send input when there is something to send.
    payload = txt.to_bytes(stdin) if stdin else None
    try:
        raw_out, raw_err = proc.communicate(input=payload, timeout=timeout)
    except subprocess.TimeoutExpired:
        # Kill the process, then call communicate() once more to collect what is left.
        proc.kill()
        proc.communicate()
        return False, f'Timeout after {timeout} seconds.'

    # Decode the captured bytes. On Windows, console programs (query, schtasks,
    # w32tm, ...) write their piped output in the OEM / console output code page
    # (e.g. cp437, cp850), not UTF-8 and not the ANSI code page, so a username
    # like "müller" would otherwise be mangled (Linuxfabrik/monitoring-plugins#681).
    on_windows = os.name == 'nt'
    codec = _windows_output_encoding() if on_windows else 'utf-8'
    on_error = 'replace' if on_windows else 'surrogateescape'

    text_out = txt.to_text(raw_out, encoding=codec, errors=on_error)
    text_err = txt.to_text(raw_err, encoding=codec, errors=on_error)
    return True, (text_out, text_err, proc.returncode)
def _windows_output_encoding():
    """
    Best-effort codec name for decoding a Windows subprocess's piped output.

    Console programs emit their pipe output in the OEM / console output code page, not UTF-8 and
    not the ANSI code page (`chcp 65001` has no effect on a pipe; see PEP 528). The console
    output code page is preferred; the OEM code page is tried next (for example when the process
    has no console because it is run headless by a monitoring agent, or when the console code
    page has no matching Python codec). If neither yields a codec Python knows, UTF-8 is used.

    The function never raises: a missing `ctypes` module, a platform without
    `ctypes.windll`, a failing API call, or a code page without a Python codec all end in the
    UTF-8 fallback. This guarantees that the returned name can be used for decoding.

    ### Returns
    - **str**: A Python codec name such as `'cp437'`, or `'utf-8'` as a last resort.
    """
    import codecs

    try:
        import ctypes
        kernel32 = ctypes.windll.kernel32
    except (ImportError, OSError, AttributeError, ValueError):
        # No ctypes, or not on Windows (`ctypes.windll` does not exist there).
        return 'utf-8'

    # Ask for the console output code page first, then for the OEM code page.
    for api_name in ('GetConsoleOutputCP', 'GetOEMCP'):
        try:
            code_page = getattr(kernel32, api_name)()
        except (OSError, AttributeError, ValueError):
            continue
        if not code_page:
            # 0 means the call did not provide a code page, e.g. there is no console.
            continue
        encoding = f'cp{code_page}'
        try:
            # Only hand out names Python can resolve; an unknown name would
            # otherwise fail later, at decode time, with a LookupError.
            codecs.lookup(encoding)
        except LookupError:
            continue
        return encoding
    return 'utf-8'
def safe_cli_value(value, name='value'):
    """
    Refuse a value that the called program could mistake for an option.

    Running a command as an argument list (argv) through `shell_exec()` rules out shell
    injection, but the program itself still parses its arguments: anything starting with `-`
    may be taken as an *option*. Examples are an ssh destination `-oProxyCommand=...` (remote
    code execution) or a `ping` target `-f` (flood). Apply this check to values that are passed
    as a positional argument or as a command target, where an option-style value has no
    legitimate meaning. A value that is bound to an explicit option (`--name=<value>` or
    `-H <value>`) does not need it.

    ### Parameters
    - **value** (`any`): The value to check. Only strings are inspected; anything else is
      returned unchanged.
    - **name** (`str`, optional): Human-readable name used in the error message. Defaults to
      `'value'`.

    ### Returns
    - **tuple**: `(True, value)` if the value is accepted, else `(False, error_message)`.
      The shape is suitable for `lib.base.coe()`.

    ### Example
    >>> host = lib.base.coe(lib.shell.safe_cli_value(args.HOSTNAME, '--hostname'))
    """
    option_like = isinstance(value, str) and value.startswith('-')
    if not option_like:
        return True, value
    return False, f'Refusing {name} that starts with "-": {value}'
def which(name):
    """
    Find an executable in the system PATH, like the `which` command does.

    This simply delegates to `shutil.which()`. It exists so that callers do not have to
    import `shutil` themselves and so that every consumer performs the lookup the same way.

    ### Parameters
    - **name** (`str`): Name of the program to look up (e.g. `lynis`).

    ### Returns
    - **str or None**: Whatever `shutil.which()` returns: the path to the executable, or
      `None` if it is not found in PATH.

    ### Example
    >>> which('sh')
    '/usr/bin/sh'
    """
    located = shutil.which(name)
    return located