diff --git a/src/modules/Utilities.py b/src/modules/Utilities.py index c655133c..c14123dd 100644 --- a/src/modules/Utilities.py +++ b/src/modules/Utilities.py @@ -68,12 +68,55 @@ def ValidateEndpointAllowedIPs(IPs) -> tuple[bool, str] | tuple[bool, None]: return False, str(e) return True, None +_ALLOWED_COMMANDS = { + "wg": ["/usr/sbin/wg", "/usr/bin/wg"], + "wg-quick": ["/usr/sbin/wg-quick", "/usr/bin/wg-quick"], + "awg": ["/usr/sbin/awg", "/usr/bin/awg"], + "awg-quick": ["/usr/sbin/awg-quick", "/usr/bin/awg-quick"], +} +_ALLOWED_SUDO = ["/usr/sbin/sudo", "/usr/bin/sudo"] +_ALLOWED_ARG_RE = re.compile(r"^[A-Za-z0-9@._+:/=,-]+$") + + +def _resolve_allowed_exe(exe: str) -> str: + if not exe: + raise ValueError("Empty command") + exe_name = os.path.basename(exe) + if exe_name not in _ALLOWED_COMMANDS: + raise ValueError(f"Command not allowed: {exe_name}") + resolved = exe if os.path.isabs(exe) else shutil.which(exe_name) + if not resolved: + raise FileNotFoundError(f"{exe_name} not found in PATH") + resolved = os.path.realpath(resolved) + if resolved not in _ALLOWED_COMMANDS[exe_name]: + raise FileNotFoundError(f"Unexpected path for {exe_name}: {resolved}") + return resolved + + +def _validate_args(args: list[str]) -> list[str]: + cleaned = [] + for arg in args: + if arg is None: + raise ValueError("Invalid argument: None") + if not isinstance(arg, str): + arg = str(arg) + if "\x00" in arg or not _ALLOWED_ARG_RE.fullmatch(arg): + raise ValueError(f"Invalid argument: {arg}") + cleaned.append(arg) + return cleaned + + def RunCommand(cmd: list[str], input_data: bytes | None = None, require_root: bool = False) -> bytes: - exe = shutil.which(cmd[0]) if not os.path.isabs(cmd[0]) else cmd[0] - if exe: - cmd = [exe] + cmd[1:] + if not cmd: + raise ValueError("Empty command list") + exe = _resolve_allowed_exe(cmd[0]) + args = _validate_args(cmd[1:]) + cmd = [exe] + args if require_root and os.geteuid() != 0: sudo_path = shutil.which("sudo") or "/usr/sbin/sudo" + sudo_path = os.path.realpath(sudo_path) + if sudo_path not in _ALLOWED_SUDO: + raise FileNotFoundError(f"Unexpected sudo path: {sudo_path}") cmd = [sudo_path, "--non-interactive"] + cmd return subprocess.check_output(cmd, input=input_data, stderr=subprocess.STDOUT)