Harden config import path handling
- Validate configuration and backup filenames - Ensure paths stay within configured base directories - Reject invalid/binary uploads earlierpull/1111/head
parent
6f71fc201f
commit
d91ebdcf7c
|
|
@ -69,6 +69,23 @@ def ResponseObject(status=True, message=None, data=None, status_code = 200) -> F
|
|||
response.content_type = "application/json"
|
||||
return response
|
||||
|
||||
def _safe_basename(value: str) -> str | None:
|
||||
if not value or value in (".", ".."):
|
||||
return None
|
||||
if "\x00" in value:
|
||||
return None
|
||||
if os.path.basename(value) != value:
|
||||
return None
|
||||
if os.sep in value or (os.altsep and os.altsep in value):
|
||||
return None
|
||||
return value
|
||||
|
||||
def _is_path_within_base(path: str, base: str) -> bool:
|
||||
try:
|
||||
return os.path.commonpath([os.path.realpath(path), os.path.realpath(base)]) == os.path.realpath(base)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
'''
|
||||
Flask App
|
||||
'''
|
||||
|
|
@ -375,6 +392,11 @@ def API_addWireguardConfiguration():
|
|||
if data.get("Protocol") not in ProtocolsEnabled():
|
||||
return ResponseObject(False, "Please provide a valid protocol: wg / awg.")
|
||||
|
||||
config_name = _safe_basename(data.get("ConfigurationName", ""))
|
||||
if not config_name:
|
||||
return ResponseObject(False, "Invalid configuration name", "ConfigurationName")
|
||||
data["ConfigurationName"] = config_name
|
||||
|
||||
# Check duplicate names, ports, address
|
||||
for i in WireguardConfigurations.values():
|
||||
if i.Name == data['ConfigurationName']:
|
||||
|
|
@ -398,18 +420,27 @@ def API_addWireguardConfiguration():
|
|||
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
|
||||
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
|
||||
}
|
||||
backup_name = _safe_basename(data["Backup"])
|
||||
if not backup_name or not backup_name.endswith(".conf"):
|
||||
return ResponseObject(False, "Invalid backup filename")
|
||||
backup_dir_wg = os.path.join(path['wg'], 'WGDashboard_Backup')
|
||||
backup_dir_awg = os.path.join(path['awg'], 'WGDashboard_Backup')
|
||||
wg_conf = os.path.join(backup_dir_wg, backup_name)
|
||||
wg_sql = os.path.join(backup_dir_wg, backup_name.replace('.conf', '.sql'))
|
||||
awg_conf = os.path.join(backup_dir_awg, backup_name)
|
||||
awg_sql = os.path.join(backup_dir_awg, backup_name.replace('.conf', '.sql'))
|
||||
if not (_is_path_within_base(wg_conf, backup_dir_wg) and _is_path_within_base(awg_conf, backup_dir_awg)):
|
||||
return ResponseObject(False, "Invalid backup filename")
|
||||
|
||||
if (os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"])) and
|
||||
os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
|
||||
if (os.path.exists(wg_conf) and os.path.exists(wg_sql)):
|
||||
protocol = "wg"
|
||||
elif (os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"])) and
|
||||
os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
|
||||
elif (os.path.exists(awg_conf) and os.path.exists(awg_sql)):
|
||||
protocol = "awg"
|
||||
else:
|
||||
return ResponseObject(False, "Backup does not exist")
|
||||
|
||||
shutil.copy(
|
||||
os.path.join(path[protocol], 'WGDashboard_Backup', data["Backup"]),
|
||||
os.path.join(path[protocol], 'WGDashboard_Backup', backup_name),
|
||||
os.path.join(path[protocol], f'{data["ConfigurationName"]}.conf')
|
||||
)
|
||||
WireguardConfigurations[data['ConfigurationName']] = (
|
||||
|
|
|
|||
|
|
@ -64,6 +64,8 @@ class WireguardConfiguration:
|
|||
self.AllPeerShareLinks = AllPeerShareLinks
|
||||
self.DashboardWebHooks = DashboardWebHooks
|
||||
self.configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf')
|
||||
if not self.__isPathWithinBase(self.configPath, self.__getProtocolPath()):
|
||||
raise self.InvalidConfigurationFileException("Configuration path is invalid")
|
||||
self.engine: sqlalchemy.Engine = sqlalchemy.create_engine(ConnectionString("wgdashboard"))
|
||||
self.metadata: sqlalchemy.MetaData = sqlalchemy.MetaData()
|
||||
self.dbType = self.DashboardConfig.GetConfig("Database", "type")[1]
|
||||
|
|
@ -83,6 +85,8 @@ class WireguardConfiguration:
|
|||
else:
|
||||
self.Name = data["ConfigurationName"]
|
||||
self.configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf')
|
||||
if not self.__isPathWithinBase(self.configPath, self.__getProtocolPath()):
|
||||
raise self.InvalidConfigurationFileException("Configuration path is invalid")
|
||||
|
||||
for i in dir(self):
|
||||
if str(i) in data.keys():
|
||||
|
|
|
|||
Loading…
Reference in New Issue