Fixes and improvements

pull/1149/head
mahemium 2026-02-25 10:34:16 +03:00
parent 0c32e6073f
commit 6e7b999310
6 changed files with 27 additions and 21 deletions

View File

@ -975,8 +975,11 @@ def API_addPeers(configName):
for i in allowed_ips:
found = False
for subnet in availableIps.keys():
network = ipaddress.ip_network(subnet, False)
ap = ipaddress.ip_network(i)
try:
network = ipaddress.ip_network(subnet, False)
ap = ipaddress.ip_network(i)
except ValueError as e:
return ResponseObject(False, str(e))
if network.version == ap.version and ap.subnet_of(network):
found = True
@ -1000,8 +1003,7 @@ def API_addPeers(configName):
return ResponseObject(status=status, message=message, data=addedPeers)
except Exception as e:
app.logger.error("Add peers failed", e)
return ResponseObject(False,
f"Add peers failed. Reason: {message}")
return ResponseObject(False, f"Add peers failed.")
return ResponseObject(False, "Configuration does not exist")
@ -1734,4 +1736,4 @@ def index():
if __name__ == "__main__":
startThreads()
DashboardPlugins.startThreads()
app.run(host=app_ip, debug=False, port=app_port)
app.run(host=app_ip, debug=False, port=app_port)

View File

@ -6,7 +6,7 @@ from flask import current_app
from .PeerJobs import PeerJobs
from .AmneziaPeer import AmneziaPeer
from .PeerShareLinks import PeerShareLinks
from .Utilities import RegexMatch
from .Utilities import RegexMatch, CheckAddress
from .WireguardConfiguration import WireguardConfiguration
from .DashboardWebHooks import DashboardWebHooks
@ -277,13 +277,13 @@ class AmneziaConfiguration(WireguardConfiguration):
f.write(p['preshared_key'])
newAllowedIPs = p['allowed_ip'].replace(" ", "")
if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
if not CheckAddress(newAllowedIPs):
return False, [], "Allowed IPs entry format is incorrect"
if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", p["id"]):
return False, [], "Peer key format is incorrect"
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else ""]
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
subprocess.check_output(command, stderr=subprocess.STDOUT)
if presharedKeyExist:
@ -311,4 +311,4 @@ class AmneziaConfiguration(WireguardConfiguration):
with self.engine.connect() as conn:
restricted = conn.execute(self.peersRestrictedTable.select()).mappings().fetchall()
for i in restricted:
self.RestrictedPeers.append(AmneziaPeer(i, self))
self.RestrictedPeers.append(AmneziaPeer(i, self))

View File

@ -78,8 +78,7 @@ class AmneziaPeer(Peer):
f.write(preshared_key)
newAllowedIPs = allowed_ip.replace(" ", "")
if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
if not CheckAddress(newAllowedIPs):
return False, "Allowed IPs entry format is incorrect"
command = [self.configuration.Protocol, "set", self.configuration.Name, "peer", self.id, "allowed-ips", newAllowedIPs, "preshared-key", uid if psk_exist else "/dev/null"]

View File

@ -115,7 +115,7 @@ class Peer:
f.write(preshared_key)
newAllowedIPs = allowed_ip.replace(" ", "")
if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
if not CheckAddress(newAllowedIPs):
return False, "Allowed IPs entry format is incorrect"
command = [self.configuration.Protocol, "set", self.configuration.Name, "peer", self.id, "allowed-ips", newAllowedIPs, "preshared-key", uid if psk_exist else "/dev/null"]
@ -400,4 +400,4 @@ class Peer:
hours, remainder = divmod(delta.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)
return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}"
return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}"

View File

@ -54,6 +54,9 @@ def CheckAddress(ips_str: str) -> bool:
return False
return True
def CheckPeerKey(peer_key: str) -> bool:
return re.match(r"^[A-Za-z0-9+/]{43}=$", peer_key)
def ValidateDNSAddress(addresses_str: str) -> tuple[bool, str | None]:
if len(addresses_str) == 0:
return False, "Got an empty list/string to check for valid DNS-addresses"
@ -110,4 +113,4 @@ def ValidatePasswordStrength(password: str) -> tuple[bool, str] | tuple[bool, No
if not re.search(r'[$&+,:;=?@#|\'<>.\-^*()%!~_-]', password):
return False, "Password must contain at least 1 special character from $&+,:;=?@#|'<>.-^*()%!~_-"
return True, None
return True, None

View File

@ -19,7 +19,9 @@ from .Utilities import StringToBoolean, \
GenerateWireguardPublicKey, \
RegexMatch, \
ValidateDNSAddress, \
ValidateEndpointAllowedIPs
ValidateEndpointAllowedIPs, \
CheckAddress, \
CheckPeerKey
from .WireguardConfigurationInfo import WireguardConfigurationInfo, PeerGroupsClass
from .DashboardWebHooks import DashboardWebHooks
@ -546,13 +548,13 @@ class WireguardConfiguration:
f.write(p['preshared_key'])
newAllowedIPs = p['allowed_ip'].replace(" ", "")
if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
if not CheckAddress(newAllowedIPs):
return False, [], "Allowed IPs entry format is incorrect"
if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", p["id"]):
if not CheckPeerKey(p["id"]):
return False, [], "Peer key format is incorrect"
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else ""]
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
subprocess.check_output(command, stderr=subprocess.STDOUT)
if presharedKeyExist:
@ -611,13 +613,13 @@ class WireguardConfiguration:
f.write(restrictedPeer['preshared_key'])
newAllowedIPs = restrictedPeer['allowed_ip'].replace(" ", "")
if not re.match(r"^[0-9a-fA-F\.\,:/ ]+$", newAllowedIPs):
if not CheckAddress(newAllowedIPs):
return False, "Allowed IPs entry format is incorrect"
if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", restrictedPeer["id"]):
if not CheckPeerKey(restrictedPeer["id"]):
return False, "Peer key format is incorrect"
command = [self.Protocol, "set", self.Name, "peer", restrictedPeer["id"], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else ""]
command = [self.Protocol, "set", self.Name, "peer", restrictedPeer["id"], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
subprocess.check_output(command, stderr=subprocess.STDOUT)
if presharedKeyExist: os.remove(uid)