From 6e7b999310bee751c27efffc58d922bbd0930e85 Mon Sep 17 00:00:00 2001
From: mahemium
Date: Wed, 25 Feb 2026 10:34:16 +0300
Subject: [PATCH] Fixes and improvements
---
 src/dashboard.py | 12 +++++++-----
 src/modules/AmneziaConfiguration.py | 8 ++++----
 src/modules/AmneziaPeer.py | 3 +--
 src/modules/Peer.py | 4 ++--
 src/modules/Utilities.py | 5 ++++-
 src/modules/WireguardConfiguration.py | 16 +++++++++-------
 6 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/src/dashboard.py b/src/dashboard.py
index b13fabe7..bc4a73a6 100644
--- a/src/dashboard.py
+++ b/src/dashboard.py
@@ -975,8 +975,11 @@ def API_addPeers(configName):
 for i in allowed_ips:
 found = False
 for subnet in availableIps.keys():
- network = ipaddress.ip_network(subnet, False)
- ap = ipaddress.ip_network(i)
+ try:
+ network = ipaddress.ip_network(subnet, False)
+ ap = ipaddress.ip_network(i)
+ except ValueError as e:
+ return ResponseObject(False, str(e))
 if network.version == ap.version and ap.subnet_of(network):
 found = True
@@ -1000,8 +1003,7 @@ def API_addPeers(configName):
 return ResponseObject(status=status, message=message, data=addedPeers)
 except Exception as e:
 app.logger.error("Add peers failed", e)
- return ResponseObject(False,
- f"Add peers failed. Reason: {message}")
+ return ResponseObject(False, f"Add peers failed.")
 return ResponseObject(False, "Configuration does not exist")
@@ -1734,4 +1736,4 @@ def index():
 if __name__ == "__main__":
 startThreads()
 DashboardPlugins.startThreads()
- app.run(host=app_ip, debug=False, port=app_port)
\ No newline at end of file
+ app.run(host=app_ip, debug=False, port=app_port)
diff --git a/src/modules/AmneziaConfiguration.py b/src/modules/AmneziaConfiguration.py
index 898bb500..71ca9b0f 100644
--- a/src/modules/AmneziaConfiguration.py
+++ b/src/modules/AmneziaConfiguration.py
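Review bot: In the `@@ -975` hunk of API_addPeers the new `try` wraps both parses, so a malformed `subnet` key from `availableIps` is reported to the caller exactly like a bad user-supplied entry in `allowed_ips`, and `str(e)` echoes the offending value back in the response. `ipaddress.ip_network(i)` is also re-parsed for every subnet; parsing it once before the inner loop, under its own `try`, would keep a user-input error separate from a server-side configuration error. `ip_network(i)` is strict by default, so an entry with host bits set (a constructed example: `10.0.0.2/24`) is now rejected with the library's message; if that is intended, a short comment saying so would help. The function body continues outside the hunk.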
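Review bot: With the reason removed from the response in the `@@ -1000` hunk, the log becomes the only record of the failure, but the context line `app.logger.error("Add peers failed", e)` passes `e` as a %-argument to a message that has no placeholder; assuming `app.logger` is a standard `logging.Logger`, the handler reports a formatting error instead of the exception. `app.logger.exception("Add peers failed")` would record the message together with the traceback. The new `f"Add peers failed."` has no placeholders and can be a plain string literal. The `try` this `except` belongs to starts outside the hunk.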
@@ -6,7 +6,7 @@ from flask import current_app
 from .PeerJobs import PeerJobs
 from .AmneziaPeer import AmneziaPeer
 from .PeerShareLinks import PeerShareLinks
-from .Utilities import RegexMatch
+from .Utilities import RegexMatch, CheckAddress
 from .WireguardConfiguration import WireguardConfiguration
 from .DashboardWebHooks import DashboardWebHooks
@@ -277,13 +277,13 @@ class AmneziaConfiguration(WireguardConfiguration):
 f.write(p['preshared_key'])
 newAllowedIPs = p['allowed_ip'].replace(" ", "")
- if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
+ if not CheckAddress(newAllowedIPs):
 return False, [], "Allowed IPs entry format is incorrect"
 if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", p["id"]):
 return False, [], "Peer key format is incorrect"
- command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else ""]
+ command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
 subprocess.check_output(command, stderr=subprocess.STDOUT)
 if presharedKeyExist:
@@ -311,4 +311,4 @@ class AmneziaConfiguration(WireguardConfiguration):
 with self.engine.connect() as conn:
 restricted = conn.execute(self.peersRestrictedTable.select()).mappings().fetchall()
 for i in restricted:
- self.RestrictedPeers.append(AmneziaPeer(i, self))
\ No newline at end of file
+ self.RestrictedPeers.append(AmneziaPeer(i, self))
diff --git a/src/modules/AmneziaPeer.py b/src/modules/AmneziaPeer.py
index 4dae3826..509f4305 100644
--- a/src/modules/AmneziaPeer.py
+++ b/src/modules/AmneziaPeer.py
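Review bot: In the `@@ -277` hunk of AmneziaConfiguration.py both validation failures return after the context line `f.write(p['preshared_key'])`, so a rejected request appears to leave the preshared key on disk in the file `uid`; the only cleanup visible in this patch is `os.remove(uid)` after a successful `subprocess.check_output` (see `@@ -611` in WireguardConfiguration.py). Running `CheckAddress` and the peer-key check before the key is written, or removing the file in a `finally`, would close that gap and would also cover a `CalledProcessError` from the command. The same ordering appears in AmneziaPeer.py `@@ -78`, Peer.py `@@ -115` and WireguardConfiguration.py `@@ -546` and `@@ -611`. The enclosing method starts and ends outside the hunk, so an outer handler that already removes the file cannot be ruled out.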
@@ -78,8 +78,7 @@ class AmneziaPeer(Peer):
 f.write(preshared_key)
 newAllowedIPs = allowed_ip.replace(" ", "")
-
- if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
+ if not CheckAddress(newAllowedIPs):
 return False, "Allowed IPs entry format is incorrect"
 command = [self.configuration.Protocol, "set", self.configuration.Name, "peer", self.id, "allowed-ips", newAllowedIPs, "preshared-key", uid if psk_exist else "/dev/null"]
diff --git a/src/modules/Peer.py b/src/modules/Peer.py
index b7d31d9b..8aa39e12 100644
--- a/src/modules/Peer.py
+++ b/src/modules/Peer.py
@@ -115,7 +115,7 @@ class Peer:
 f.write(preshared_key)
 newAllowedIPs = allowed_ip.replace(" ", "")
- if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
+ if not CheckAddress(newAllowedIPs):
 return False, "Allowed IPs entry format is incorrect"
 command = [self.configuration.Protocol, "set", self.configuration.Name, "peer", self.id, "allowed-ips", newAllowedIPs, "preshared-key", uid if psk_exist else "/dev/null"]
@@ -400,4 +400,4 @@ class Peer:
 hours, remainder = divmod(delta.total_seconds(), 3600)
 minutes, seconds = divmod(remainder, 60)
- return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}"
\ No newline at end of file
+ return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}"
diff --git a/src/modules/Utilities.py b/src/modules/Utilities.py
index 5e187869..661d3500 100644
--- a/src/modules/Utilities.py
+++ b/src/modules/Utilities.py
@@ -54,6 +54,9 @@ def CheckAddress(ips_str: str) -> bool:
 return False
 return True
+def CheckPeerKey(peer_key: str) -> bool:
+ return re.match(r"^[A-Za-z0-9+/]{43}=$", peer_key)
+
 def ValidateDNSAddress(addresses_str: str) -> tuple[bool, str | None]:
 if len(addresses_str) == 0:
 return False, "Got an empty list/string to check for valid DNS-addresses"
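Review bot: `CheckAddress` is called in the `@@ -78` hunk of AmneziaPeer.py, and again in Peer.py `@@ -115`, but the patch adds an import for it only in AmneziaConfiguration.py and WireguardConfiguration.py. Unless both modules already import it from `.Utilities`, the call raises `NameError` on every allowed-IPs update, after the preshared key has been written by the context line `f.write(preshared_key)`. Adding `from .Utilities import CheckAddress` to each file would settle it. The import sections of both files are outside the visible hunks.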
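Review bot: `CheckPeerKey` in the Utilities.py `@@ -54` hunk is annotated `-> bool` but returns the `re.Match` object or `None`, and with `re.match` the trailing `$` also matches just before a final newline, so a key followed by `\n` passes the check and is then placed in the `[self.Protocol, "set", ...]` argument list. `return re.fullmatch(r"[A-Za-z0-9+/]{43}=", peer_key) is not None` fixes both. A one-line docstring stating that this checks only the length and alphabet of the base64 text, not that it decodes canonically, would also help, since the new pattern accepts more final characters than the `[A-Ea-e0-9]` class it replaces.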
@@ -110,4 +113,4 @@ def ValidatePasswordStrength(password: str) -> tuple[bool, str] | tuple[bool, No
 if not re.search(r'[$&+,:;=?@#|\'<>.\-^*()%!~_-]', password):
 return False, "Password must contain at least 1 special character from $&+,:;=?@#|'<>.-^*()%!~_-"
- return True, None
\ No newline at end of file
+ return True, None
diff --git a/src/modules/WireguardConfiguration.py b/src/modules/WireguardConfiguration.py
index 939ebfe5..bd197aa1 100644
--- a/src/modules/WireguardConfiguration.py
+++ b/src/modules/WireguardConfiguration.py
@@ -19,7 +19,9 @@ from .Utilities import StringToBoolean, \
 GenerateWireguardPublicKey, \
 RegexMatch, \
 ValidateDNSAddress, \
- ValidateEndpointAllowedIPs
+ ValidateEndpointAllowedIPs, \
+ CheckAddress, \
+ CheckPeerKey
 from .WireguardConfigurationInfo import WireguardConfigurationInfo, PeerGroupsClass
 from .DashboardWebHooks import DashboardWebHooks
@@ -546,13 +548,13 @@ class WireguardConfiguration:
 f.write(p['preshared_key'])
 newAllowedIPs = p['allowed_ip'].replace(" ", "")
- if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
+ if not CheckAddress(newAllowedIPs):
 return False, [], "Allowed IPs entry format is incorrect"
- if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", p["id"]):
+ if not CheckPeerKey(p["id"]):
 return False, [], "Peer key format is incorrect"
- command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else ""]
+ command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
 subprocess.check_output(command, stderr=subprocess.STDOUT)
 if presharedKeyExist:
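Review bot: The peer-key check moves to `CheckPeerKey` in the `@@ -546` hunk of WireguardConfiguration.py and in `@@ -611`, but the matching check in AmneziaConfiguration.py `@@ -277` still uses the inline pattern `^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$`, so the two configuration classes now accept different sets of keys. Importing `CheckPeerKey` there and using `if not CheckPeerKey(p["id"]):` would keep them aligned. The `"/dev/null"` fallback is now repeated in each command list; a short comment such as `# /dev/null = no preshared key; an empty path is not a readable file` would record why the empty string was replaced. The enclosing method starts and ends outside the hunk.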
@@ -611,13 +613,13 @@ class WireguardConfiguration:
 f.write(restrictedPeer['preshared_key'])
 newAllowedIPs = restrictedPeer['allowed_ip'].replace(" ", "")
- if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
+ if not CheckAddress(newAllowedIPs):
 return False, "Allowed IPs entry format is incorrect"
- if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", restrictedPeer["id"]):
+ if not CheckPeerKey(restrictedPeer["id"]):
 return False, "Peer key format is incorrect"
- command = [self.Protocol, "set", self.Name, "peer", restrictedPeer["id"], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else ""]
+ command = [self.Protocol, "set", self.Name, "peer", restrictedPeer["id"], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
 subprocess.check_output(command, stderr=subprocess.STDOUT)
 if presharedKeyExist:
 os.remove(uid)