test-client: drop TestNmClient base class from tests

With the unit test framework, we define special methods, like setUp()
and test_*(). This is documented, but not obvious.

Previously, TestNmClient was the base class for our tests classes, and
it provided some functionality (and state). It was utterly confusing how
pieces fit together.

Instead, move the state to a new class NMTestContext(). That contains
most of the code from TestNmClient. Drop TestNmClient and let the test
classes directly descend from unittest.TestCase.

The difference is, when you now look at a certain test (test_001()), you
can easier understand which code runs when. First, the test class has a
setUp() method which runs, but that method is now trivial without extra
context. Second, there is the @nm_test attribute that wraps the
function. But that's it. It's all at one place, and we delegate instead
of inherit.
This commit is contained in:
Thomas Haller 2023-05-11 11:26:56 +02:00
parent 50f97307c5
commit efc14fcbec
No known key found for this signature in database
GPG key ID: 29C2366E4DFC5728

View file

@ -1050,21 +1050,25 @@ class AsyncProcess:
###############################################################################
MAX_JOBS = 15
class NMTestContext:
MAX_JOBS = 15
class TestNmClient(unittest.TestCase):
def __init__(self, *args, **kwargs):
def __init__(self, testMethodName):
self.testMethodName = testMethodName
self._calling_num = {}
self._skip_test_for_l10n_diff = []
self._async_jobs = []
self._results = []
self.ctx_results = []
self.srv = None
unittest.TestCase.__init__(self, *args, **kwargs)
def calling_num(self, calling_fcn):
calling_num = self._calling_num.get(calling_fcn, 0) + 1
self._calling_num[calling_fcn] = calling_num
return calling_num
def srv_start(self):
self.srv_shutdown()
self.srv = NMStubServer(self._testMethodName)
self.srv = NMStubServer(self.testMethodName)
def srv_shutdown(self):
if self.srv is not None:
@ -1077,13 +1081,13 @@ class TestNmClient(unittest.TestCase):
while True:
while True:
for async_job in list(self._async_jobs[0:MAX_JOBS]):
for async_job in list(self._async_jobs[0 : self.MAX_JOBS]):
async_job.start()
# start up to MAX_JOBS jobs, but poll() and complete those
# that are already exited. Retry, until there are no more
# jobs to start, or until MAX_JOBS are running.
jobs_running = []
for async_job in list(self._async_jobs[0:MAX_JOBS]):
for async_job in list(self._async_jobs[0 : self.MAX_JOBS]):
if async_job.poll() is not None:
self._async_jobs.remove(async_job)
async_job.wait_and_complete()
@ -1091,7 +1095,7 @@ class TestNmClient(unittest.TestCase):
jobs_running.append(async_job)
if len(jobs_running) >= len(self._async_jobs):
break
if len(jobs_running) >= MAX_JOBS:
if len(jobs_running) >= self.MAX_JOBS:
break
if not jobs_running:
@ -1111,7 +1115,10 @@ class TestNmClient(unittest.TestCase):
def async_wait(self):
return self.async_start(wait_all=True)
def _nm_test_post(self):
def async_append_job(self, async_job):
self._async_jobs.append(async_job)
def run_post(self):
self.async_wait()
@ -1119,8 +1126,8 @@ class TestNmClient(unittest.TestCase):
self._calling_num = None
results = self._results
self._results = None
results = self.ctx_results
self.ctx_results = None
if len(results) == 0:
return
@ -1128,12 +1135,10 @@ class TestNmClient(unittest.TestCase):
skip_test_for_l10n_diff = self._skip_test_for_l10n_diff
self._skip_test_for_l10n_diff = None
test_name = self._testMethodName
filename = os.path.abspath(
PathConfiguration.srcdir()
+ "/test-client.check-on-disk/"
+ test_name
+ self.testMethodName
+ ".expected"
)
@ -1221,12 +1226,16 @@ class TestNmClient(unittest.TestCase):
% (",".join(skip_test_for_l10n_diff))
)
###############################################################################
class TestNmcli(unittest.TestCase):
def setUp(self):
Util.skip_without_dbus_session()
Util.skip_without_NM()
self.ctx = NMTestContext(self._testMethodName)
class TestNmcli(TestNmClient):
def call_nmcli_l(
self,
args,
@ -1328,11 +1337,10 @@ class TestNmcli(TestNmClient):
):
if sync_barrier:
self.async_wait()
self.ctx.async_wait()
calling_fcn = frame.f_code.co_name
calling_num = self._calling_num.get(calling_fcn, 0) + 1
self._calling_num[calling_fcn] = calling_num
calling_num = self.ctx.calling_num(calling_fcn)
test_name = "%s-%03d" % (calling_fcn, calling_num)
@ -1401,8 +1409,8 @@ class TestNmcli(TestNmClient):
if expected_stderr is _DEFAULT_ARG:
expected_stderr = None
results_idx = len(self._results)
self._results.append(None)
results_idx = len(self.ctx.ctx_results)
self.ctx.ctx_results.append(None)
def complete_cb(async_job, returncode, stdout, stderr):
@ -1477,7 +1485,7 @@ class TestNmcli(TestNmClient):
)
content = ("size: %s\n" % (len(content))).encode("utf8") + content
self._results[results_idx] = {
self.ctx.ctx_results[results_idx] = {
"test_name": test_name,
"ignore_l10n_diff": ignore_l10n_diff,
"content": content,
@ -1486,38 +1494,38 @@ class TestNmcli(TestNmClient):
env = Util.cmd_create_env(lang, calling_num, fatal_warnings, extra_env)
async_job = AsyncProcess(args=args, env=env, complete_cb=complete_cb)
self._async_jobs.append(async_job)
self.ctx.async_append_job(async_job)
self.async_start(wait_all=sync_barrier)
self.ctx.async_start(wait_all=sync_barrier)
def nm_test(func):
def f(self):
self.srv_start()
self.ctx.srv_start()
func(self)
self._nm_test_post()
self.ctx.run_post()
return f
def nm_test_no_dbus(func):
def f(self):
func(self)
self._nm_test_post()
self.ctx.run_post()
return f
def init_001(self):
self.srv.op_AddObj("WiredDevice", iface="eth0")
self.srv.op_AddObj("WiredDevice", iface="eth1")
self.srv.op_AddObj("WifiDevice", iface="wlan0")
self.srv.op_AddObj("WifiDevice", iface="wlan1")
self.ctx.srv.op_AddObj("WiredDevice", iface="eth0")
self.ctx.srv.op_AddObj("WiredDevice", iface="eth1")
self.ctx.srv.op_AddObj("WifiDevice", iface="wlan0")
self.ctx.srv.op_AddObj("WifiDevice", iface="wlan1")
# add another device with an identical ifname. The D-Bus API itself
# does not enforce the ifnames are unique.
self.srv.op_AddObj("WifiDevice", ident="wlan1/x", iface="wlan1")
self.ctx.srv.op_AddObj("WifiDevice", ident="wlan1/x", iface="wlan1")
self.srv.op_AddObj("WifiAp", device="wlan0", rsnf=0x0)
self.ctx.srv.op_AddObj("WifiAp", device="wlan0", rsnf=0x0)
self.srv.op_AddObj("WifiAp", device="wlan0")
self.ctx.srv.op_AddObj("WifiAp", device="wlan0")
NM_AP_FLAGS = getattr(NM, "80211ApSecurityFlags")
rsnf = 0x0
@ -1526,11 +1534,11 @@ class TestNmcli(TestNmClient):
rsnf = rsnf | NM_AP_FLAGS.GROUP_TKIP
rsnf = rsnf | NM_AP_FLAGS.GROUP_CCMP
rsnf = rsnf | NM_AP_FLAGS.KEY_MGMT_SAE
self.srv.op_AddObj("WifiAp", device="wlan0", wpaf=0x0, rsnf=rsnf)
self.ctx.srv.op_AddObj("WifiAp", device="wlan0", wpaf=0x0, rsnf=rsnf)
self.srv.op_AddObj("WifiAp", device="wlan1")
self.ctx.srv.op_AddObj("WifiAp", device="wlan1")
self.srv.addConnection(
self.ctx.srv.addConnection(
{"connection": {"type": "802-3-ethernet", "id": "con-1"}}
)
@ -1590,7 +1598,7 @@ class TestNmcli(TestNmClient):
replace_uuids = []
replace_uuids.append(
self.srv.ReplaceTextConUuid(
self.ctx.srv.ReplaceTextConUuid(
"con-xx1", "UUID-con-xx1-REPLACED-REPLACED-REPLA"
)
)
@ -1605,7 +1613,7 @@ class TestNmcli(TestNmClient):
for con_name, apn in con_gsm_list:
replace_uuids.append(
self.srv.ReplaceTextConUuid(
self.ctx.srv.ReplaceTextConUuid(
con_name, "UUID-" + con_name + "-REPLACED-REPLACED-REPL"
)
)
@ -1637,7 +1645,7 @@ class TestNmcli(TestNmClient):
)
replace_uuids.append(
self.srv.ReplaceTextConUuid(
self.ctx.srv.ReplaceTextConUuid(
"ethernet", "UUID-ethernet-REPLACED-REPLACED-REPL"
)
)
@ -1722,9 +1730,9 @@ class TestNmcli(TestNmClient):
["-f", "ALL", "-t", "dev", "show", "eth0"], replace_stdout=replace_uuids
)
self.async_wait()
self.ctx.async_wait()
self.srv.setProperty(
self.ctx.srv.setProperty(
"/org/freedesktop/NetworkManager/ActiveConnection/1",
"State",
dbus.UInt32(NM.ActiveConnectionState.DEACTIVATING),
@ -1734,8 +1742,8 @@ class TestNmcli(TestNmClient):
for i in [0, 1]:
if i == 1:
self.async_wait()
self.srv.op_ConnectionSetVisible(False, con_id="ethernet")
self.ctx.async_wait()
self.ctx.srv.op_ConnectionSetVisible(False, con_id="ethernet")
for mode in Util.iter_nmcli_output_modes():
self.call_nmcli_l(
@ -1768,7 +1776,7 @@ class TestNmcli(TestNmClient):
replace_uuids = []
replace_uuids.append(
self.srv.ReplaceTextConUuid(
self.ctx.srv.ReplaceTextConUuid(
"con-xx1", "UUID-con-xx1-REPLACED-REPLACED-REPLA"
)
)
@ -1813,10 +1821,10 @@ class TestNmcli(TestNmClient):
)
self.call_nmcli_l(["con", "s", "con-xx1"], replace_stdout=replace_uuids)
self.async_wait()
self.ctx.async_wait()
replace_uuids.append(
self.srv.ReplaceTextConUuid(
self.ctx.srv.ReplaceTextConUuid(
"con-vpn-1", "UUID-con-vpn-1-REPLACED-REPLACED-REP"
)
)
@ -1849,16 +1857,16 @@ class TestNmcli(TestNmClient):
self.call_nmcli_l(["con", "s"], replace_stdout=replace_uuids)
self.call_nmcli_l(["con", "s", "con-vpn-1"], replace_stdout=replace_uuids)
self.async_wait()
self.ctx.async_wait()
self.srv.setProperty(
self.ctx.srv.setProperty(
"/org/freedesktop/NetworkManager/ActiveConnection/2",
"VpnState",
dbus.UInt32(NM.VpnConnectionState.ACTIVATED),
)
uuids = Util.replace_text_sort_list(
[c[1] for c in self.srv.findConnections()], replace_uuids
[c[1] for c in self.ctx.srv.findConnections()], replace_uuids
)
self.call_nmcli_l([], replace_stdout=replace_uuids)
@ -2165,10 +2173,10 @@ class TestNmcli(TestNmClient):
nmc = start_mon(self)
self.srv.op_AddObj("WiredDevice", iface="eth0")
self.ctx.srv.op_AddObj("WiredDevice", iface="eth0")
nmc.pexp.expect("eth0: device created\r\n")
self.srv.addConnection(
self.ctx.srv.addConnection(
{"connection": {"type": "802-3-ethernet", "id": "con-1"}}
)
nmc.pexp.expect("con-1: connection profile created\r\n")
@ -2176,7 +2184,7 @@ class TestNmcli(TestNmClient):
end_mon(self, nmc)
nmc = start_mon(self)
self.srv_shutdown()
self.ctx.srv_shutdown()
Util.pexpect_expect_all(
nmc.pexp,
"con-1: connection profile removed",
@ -2189,7 +2197,11 @@ class TestNmcli(TestNmClient):
###############################################################################
class TestNmCloudSetup(TestNmClient):
class TestNmCloudSetup(unittest.TestCase):
def setUp(self):
Util.skip_without_dbus_session()
Util.skip_without_NM()
self.ctx = NMTestContext(self._testMethodName)
_mac1 = "9e:c0:3e:92:24:2d"
_mac2 = "53:e9:7e:52:8d:a8"
@ -2240,12 +2252,12 @@ class TestNmCloudSetup(TestNmClient):
error = None
self.srv_start()
self.ctx.srv_start()
try:
func(self)
except Exception as e:
error = e
self._nm_test_post()
self.ctx.run_post()
self.md_conn.close()
p.stdin.close()
@ -2259,8 +2271,8 @@ class TestNmCloudSetup(TestNmClient):
def _mock_devices(self):
# Add a device with an active connection that has IPv4 configured
self.srv.op_AddObj("WiredDevice", iface="eth0", mac="9e:c0:3e:92:24:2d")
self.srv.addAndActivateConnection(
self.ctx.srv.op_AddObj("WiredDevice", iface="eth0", mac="9e:c0:3e:92:24:2d")
self.ctx.srv.addAndActivateConnection(
{
"connection": {"type": "802-3-ethernet", "id": "con-eth0"},
"ipv4": {"method": "auto"},
@ -2270,8 +2282,8 @@ class TestNmCloudSetup(TestNmClient):
)
# The second connection has no IPv4
self.srv.op_AddObj("WiredDevice", iface="eth1", mac="53:e9:7e:52:8d:a8")
self.srv.addAndActivateConnection(
self.ctx.srv.op_AddObj("WiredDevice", iface="eth1", mac="53:e9:7e:52:8d:a8")
self.ctx.srv.addAndActivateConnection(
{"connection": {"type": "802-3-ethernet", "id": "con-eth1"}},
"/org/freedesktop/NetworkManager/Devices/2",
"",