selftests/net: Add env for container based tests

Add an env NetDrvContEnv for container based selftests. This automates
the setup of a netns, netkit pair with one inside the netns, and a BPF
program that forwards skbs from the NETIF host inside the container.

Currently only netkit is used, but other virtual netdevs e.g. veth can
be used too.

Expect netkit container datapath selftests to have a publicly routable
IP prefix to assign to netkit in a container, such that packets will
land on eth0. The BPF skb forward program will then forward such packets
from the host netns to the container netns.

Signed-off-by: David Wei <dw@davidwei.uk>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Acked-by: Stanislav Fomichev <sdf@fomichev.me>
Reviewed-by: Nikolay Aleksandrov <razor@blackwall.org>
Link: https://patch.msgid.link/20260115082603.219152-15-daniel@iogearbox.net
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
master
David Wei 2026-01-15 09:26:01 +01:00 committed by Paolo Abeni
parent 61d99ce3df
commit 6be87fbb27
4 changed files with 127 additions and 6 deletions

View File

@ -62,6 +62,13 @@ LOCAL_V4, LOCAL_V6, REMOTE_V4, REMOTE_V6
Local and remote endpoint IP addresses.
LOCAL_PREFIX_V4, LOCAL_PREFIX_V6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Local IP prefix/subnet which can be used to allocate extra IP addresses (for
network name spaces behind macvlan, veth, netkit devices). DUT must be
reachable using these addresses from the endpoint.
REMOTE_TYPE
~~~~~~~~~~~

View File

@ -3,6 +3,7 @@
"""
Driver test environment (hardware-only tests).
NetDrvEnv and NetDrvEpEnv are the main environment classes.
NetDrvContEnv extends NetDrvEpEnv with netkit container support.
Former is for local host only tests, latter creates / connects
to a remote endpoint. See NIPA wiki for more information about
running and writing driver tests.
@ -29,7 +30,7 @@ try:
from net.lib.py import ksft_eq, ksft_ge, ksft_in, ksft_is, ksft_lt, \
ksft_ne, ksft_not_in, ksft_raises, ksft_true, ksft_gt, ksft_not_none
from drivers.net.lib.py import GenerateTraffic, Remote, Iperf3Runner
from drivers.net.lib.py import NetDrvEnv, NetDrvEpEnv
from drivers.net.lib.py import NetDrvEnv, NetDrvEpEnv, NetDrvContEnv
__all__ = ["NetNS", "NetNSEnter", "NetdevSimDev",
"EthtoolFamily", "NetdevFamily", "NetshaperFamily",
@ -44,8 +45,8 @@ try:
"ksft_eq", "ksft_ge", "ksft_in", "ksft_is", "ksft_lt",
"ksft_ne", "ksft_not_in", "ksft_raises", "ksft_true", "ksft_gt",
"ksft_not_none", "ksft_not_none",
"NetDrvEnv", "NetDrvEpEnv", "GenerateTraffic", "Remote",
"Iperf3Runner"]
"NetDrvEnv", "NetDrvEpEnv", "NetDrvContEnv", "GenerateTraffic",
"Remote", "Iperf3Runner"]
except ModuleNotFoundError as e:
print("Failed importing `net` library from kernel sources")
print(str(e))

View File

@ -3,6 +3,7 @@
"""
Driver test environment.
NetDrvEnv and NetDrvEpEnv are the main environment classes.
NetDrvContEnv extends NetDrvEpEnv with netkit container support.
Former is for local host only tests, latter creates / connects
to a remote endpoint. See NIPA wiki for more information about
running and writing driver tests.
@ -43,12 +44,12 @@ try:
"ksft_ne", "ksft_not_in", "ksft_raises", "ksft_true", "ksft_gt",
"ksft_not_none", "ksft_not_none"]
from .env import NetDrvEnv, NetDrvEpEnv
from .env import NetDrvEnv, NetDrvEpEnv, NetDrvContEnv
from .load import GenerateTraffic, Iperf3Runner
from .remote import Remote
__all__ += ["NetDrvEnv", "NetDrvEpEnv", "GenerateTraffic", "Remote",
"Iperf3Runner"]
__all__ += ["NetDrvEnv", "NetDrvEpEnv", "NetDrvContEnv", "GenerateTraffic",
"Remote", "Iperf3Runner"]
except ModuleNotFoundError as e:
print("Failed importing `net` library from kernel sources")
print(str(e))

View File

@ -1,6 +1,8 @@
# SPDX-License-Identifier: GPL-2.0
import ipaddress
import os
import re
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
@ -8,6 +10,7 @@ from lib.py import ksft_setup, wait_file
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote
from . import bpftool
class NetDrvEnvBase:
@ -289,3 +292,112 @@ class NetDrvEpEnv(NetDrvEnvBase):
data.get('stats-block-usecs', 0) / 1000 / 1000
time.sleep(self._stats_settle_time)
class NetDrvContEnv(NetDrvEpEnv):
"""
Class for an environment with a netkit pair setup for forwarding traffic
between the physical interface and a network namespace.
"""
def __init__(self, src_path, nk_rxqueues=1, **kwargs):
super().__init__(src_path, **kwargs)
self.require_ipver("6")
local_prefix = self.env.get("LOCAL_PREFIX_V6")
if not local_prefix:
raise KsftSkipEx("LOCAL_PREFIX_V6 required")
local_prefix = local_prefix.rstrip("/64").rstrip("::").rstrip(":")
self.ipv6_prefix = f"{local_prefix}::"
self.nk_host_ipv6 = f"{local_prefix}::2:1"
self.nk_guest_ipv6 = f"{local_prefix}::2:2"
self.netns = None
self._nk_host_ifname = None
self._nk_guest_ifname = None
self._tc_attached = False
self._bpf_prog_pref = None
self._bpf_prog_id = None
ip(f"link add type netkit mode l2 forward peer forward numrxqueues {nk_rxqueues}")
all_links = ip("-d link show", json=True)
netkit_links = [link for link in all_links
if link.get('linkinfo', {}).get('info_kind') == 'netkit'
and 'UP' not in link.get('flags', [])]
if len(netkit_links) != 2:
raise KsftSkipEx("Failed to create netkit pair")
netkit_links.sort(key=lambda x: x['ifindex'])
self._nk_host_ifname = netkit_links[1]['ifname']
self._nk_guest_ifname = netkit_links[0]['ifname']
self.nk_host_ifindex = netkit_links[1]['ifindex']
self.nk_guest_ifindex = netkit_links[0]['ifindex']
self._setup_ns()
self._attach_bpf()
def __del__(self):
if self._tc_attached:
cmd(f"tc filter del dev {self.ifname} ingress pref {self._bpf_prog_pref}")
self._tc_attached = False
if self._nk_host_ifname:
cmd(f"ip link del dev {self._nk_host_ifname}")
self._nk_host_ifname = None
self._nk_guest_ifname = None
if self.netns:
del self.netns
self.netns = None
super().__del__()
def _setup_ns(self):
self.netns = NetNS()
ip(f"link set dev {self._nk_guest_ifname} netns {self.netns.name}")
ip(f"link set dev {self._nk_host_ifname} up")
ip(f"-6 addr add fe80::1/64 dev {self._nk_host_ifname} nodad")
ip(f"-6 route add {self.nk_guest_ipv6}/128 via fe80::2 dev {self._nk_host_ifname}")
ip("link set lo up", ns=self.netns)
ip(f"link set dev {self._nk_guest_ifname} up", ns=self.netns)
ip(f"-6 addr add fe80::2/64 dev {self._nk_guest_ifname}", ns=self.netns)
ip(f"-6 addr add {self.nk_guest_ipv6}/64 dev {self._nk_guest_ifname} nodad", ns=self.netns)
ip(f"-6 route add default via fe80::1 dev {self._nk_guest_ifname}", ns=self.netns)
def _attach_bpf(self):
bpf_obj = self.test_dir / "nk_forward.bpf.o"
if not bpf_obj.exists():
raise KsftSkipEx("BPF prog not found")
cmd(f"tc filter add dev {self.ifname} ingress bpf obj {bpf_obj} sec tc/ingress direct-action")
self._tc_attached = True
tc_info = cmd(f"tc filter show dev {self.ifname} ingress").stdout
match = re.search(r'pref (\d+).*nk_forward\.bpf.*id (\d+)', tc_info)
if not match:
raise Exception("Failed to get BPF prog ID")
self._bpf_prog_pref = int(match.group(1))
self._bpf_prog_id = int(match.group(2))
prog_info = bpftool(f"prog show id {self._bpf_prog_id}", json=True)
map_ids = prog_info.get("map_ids", [])
bss_map_id = None
for map_id in map_ids:
map_info = bpftool(f"map show id {map_id}", json=True)
if map_info.get("name").endswith("bss"):
bss_map_id = map_id
if bss_map_id is None:
raise Exception("Failed to find .bss map")
ipv6_addr = ipaddress.IPv6Address(self.ipv6_prefix)
ipv6_bytes = ipv6_addr.packed
ifindex_bytes = self.nk_host_ifindex.to_bytes(4, byteorder='little')
value = ipv6_bytes + ifindex_bytes
value_hex = ' '.join(f'{b:02x}' for b in value)
bpftool(f"map update id {bss_map_id} key hex 00 00 00 00 value hex {value_hex}")