fix: peer key validation regex (#1158)

* fix: peer key validation regex

* refactor: cache cleaned AllowedIPs from validation pass to avoid duplication

---------

Co-authored-by: Dan Hollis <dh@redteam.sh>
pull/1165/head
Dan Hollis 2026-03-03 03:04:30 -05:00 committed by GitHub
parent b9c271ff4c
commit eeedf705aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 17 deletions

View File

@ -6,7 +6,7 @@ from flask import current_app
from .PeerJobs import PeerJobs
from .AmneziaPeer import AmneziaPeer
from .PeerShareLinks import PeerShareLinks
from .Utilities import RegexMatch, CheckAddress
from .Utilities import RegexMatch, CheckAddress, CheckPeerKey
from .WireguardConfiguration import WireguardConfiguration
from .DashboardWebHooks import DashboardWebHooks
@ -241,6 +241,15 @@ class AmneziaConfiguration(WireguardConfiguration):
"peers": []
}
try:
cleanedAllowedIPs = {}
for p in peers:
newAllowedIPs = p['allowed_ip'].replace(" ", "")
if not CheckAddress(newAllowedIPs):
return False, [], "Allowed IPs entry format is incorrect"
if not CheckPeerKey(p["id"]):
return False, [], "Peer key format is incorrect"
cleanedAllowedIPs[p["id"]] = newAllowedIPs
with self.engine.begin() as conn:
for i in peers:
newPeer = {
@ -276,14 +285,7 @@ class AmneziaConfiguration(WireguardConfiguration):
with open(uid, "w+") as f:
f.write(p['preshared_key'])
newAllowedIPs = p['allowed_ip'].replace(" ", "")
if not CheckAddress(newAllowedIPs):
return False, [], "Allowed IPs entry format is incorrect"
if not re.match(r"^[A-Za-z0-9+/]{42}[A-Ea-e0-9]=$", p["id"]):
return False, [], "Peer key format is incorrect"
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", cleanedAllowedIPs[p["id"]], "preshared-key", uid if presharedKeyExist else "/dev/null"]
subprocess.check_output(command, stderr=subprocess.STDOUT)
if presharedKeyExist:

View File

@ -512,6 +512,15 @@ class WireguardConfiguration:
"peers": []
}
try:
cleanedAllowedIPs = {}
for p in peers:
newAllowedIPs = p['allowed_ip'].replace(" ", "")
if not CheckAddress(newAllowedIPs):
return False, [], "Allowed IPs entry format is incorrect"
if not CheckPeerKey(p["id"]):
return False, [], "Peer key format is incorrect"
cleanedAllowedIPs[p["id"]] = newAllowedIPs
with self.engine.begin() as conn:
for i in peers:
newPeer = {
@ -547,14 +556,7 @@ class WireguardConfiguration:
with open(uid, "w+") as f:
f.write(p['preshared_key'])
newAllowedIPs = p['allowed_ip'].replace(" ", "")
if not CheckAddress(newAllowedIPs):
return False, [], "Allowed IPs entry format is incorrect"
if not CheckPeerKey(p["id"]):
return False, [], "Peer key format is incorrect"
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", newAllowedIPs, "preshared-key", uid if presharedKeyExist else "/dev/null"]
command = [self.Protocol, "set", self.Name, "peer", p['id'], "allowed-ips", cleanedAllowedIPs[p["id"]], "preshared-key", uid if presharedKeyExist else "/dev/null"]
subprocess.check_output(command, stderr=subprocess.STDOUT)
if presharedKeyExist: