Harden RunCommand with allowlist validation

pull/1091/head
Super User 2026-01-30 13:13:38 -05:00
parent 07630075a3
commit e98cfa8545
1 changed files with 46 additions and 3 deletions

View File

@ -68,12 +68,55 @@ def ValidateEndpointAllowedIPs(IPs) -> tuple[bool, str] | tuple[bool, None]:
return False, str(e)
return True, None
_ALLOWED_COMMANDS = {
"wg": ["/usr/sbin/wg", "/usr/bin/wg"],
"wg-quick": ["/usr/sbin/wg-quick", "/usr/bin/wg-quick"],
"awg": ["/usr/sbin/awg", "/usr/bin/awg"],
"awg-quick": ["/usr/sbin/awg-quick", "/usr/bin/awg-quick"],
}
_ALLOWED_SUDO = ["/usr/sbin/sudo", "/usr/bin/sudo"]
_ALLOWED_ARG_RE = re.compile(r"^[A-Za-z0-9@._+:/=,-]+$")
def _resolve_allowed_exe(exe: str) -> str:
if not exe:
raise ValueError("Empty command")
exe_name = os.path.basename(exe)
if exe_name not in _ALLOWED_COMMANDS:
raise ValueError(f"Command not allowed: {exe_name}")
resolved = exe if os.path.isabs(exe) else shutil.which(exe_name)
if not resolved:
raise FileNotFoundError(f"{exe_name} not found in PATH")
resolved = os.path.realpath(resolved)
if resolved not in _ALLOWED_COMMANDS[exe_name]:
raise FileNotFoundError(f"Unexpected path for {exe_name}: {resolved}")
return resolved
def _validate_args(args: list[str]) -> list[str]:
cleaned = []
for arg in args:
if arg is None:
raise ValueError("Invalid argument: None")
if not isinstance(arg, str):
arg = str(arg)
if "\x00" in arg or not _ALLOWED_ARG_RE.fullmatch(arg):
raise ValueError(f"Invalid argument: {arg}")
cleaned.append(arg)
return cleaned
def RunCommand(cmd: list[str], input_data: bytes | None = None, require_root: bool = False) -> bytes:
exe = shutil.which(cmd[0]) if not os.path.isabs(cmd[0]) else cmd[0]
if exe:
cmd = [exe] + cmd[1:]
if not cmd:
raise ValueError("Empty command list")
exe = _resolve_allowed_exe(cmd[0])
args = _validate_args(cmd[1:])
cmd = [exe] + args
if require_root and os.geteuid() != 0:
sudo_path = shutil.which("sudo") or "/usr/sbin/sudo"
sudo_path = os.path.realpath(sudo_path)
if sudo_path not in _ALLOWED_SUDO:
raise FileNotFoundError(f"Unexpected sudo path: {sudo_path}")
cmd = [sudo_path, "--non-interactive"] + cmd
return subprocess.check_output(cmd, input=input_data, stderr=subprocess.STDOUT)