diff --git a/clients/tests/test-client.py b/clients/tests/test-client.py
index bafbc17bd6..03e19ac5fa 100755
--- a/clients/tests/test-client.py
+++ b/clients/tests/test-client.py
@@ -93,6 +93,7 @@ import shlex
 import re
 import dbus
 import time
+import random
 import dbus.service
 import dbus.mainloop.glib
 
@@ -171,20 +172,54 @@ class Util:
         return "'" + s.replace("'", "'\"'\"'") + "'"
 
     @staticmethod
-    def popen_wait(p, timeout = None):
-        # wait() has a timeout argument only since 3.3
+    def popen_wait(p, timeout = 0):
+        # Wait up to @timeout seconds for @p to exit. Returns the exit code,
+        # or None if @p is still running. With timeout 0 this only polls.
         if Util.python_has_version(3, 3):
-            return p.wait(timeout)
-        if timeout is None:
-            return p.wait()
+            if timeout == 0:
+                return p.poll()
+            try:
+                return p.wait(timeout)
+            except subprocess.TimeoutExpired:
+                return None
         start = NM.utils_get_timestamp_msec()
         while True:
             if p.poll() is not None:
                 return p.returncode
-            if start + (timeout * 1000) < NM.utils_get_timestamp_msec():
-                raise Exception("timeout expired")
+            if timeout == 0 or start + (timeout * 1000) < NM.utils_get_timestamp_msec():
+                return None
             time.sleep(0.05)
 
+    @staticmethod
+    def random_job(jobs):
+        # Endless generator over @jobs (which must not be empty); the caller
+        # is expected to break out of the iteration.
+        jobs = list(jobs)
+        l = len(jobs)
+        t = l * (l + 1) / 2
+        while True:
+            # we return a random job from the list, but the indexes at the front of
+            # the list are more likely. The idea is, that those jobs were started first,
+            # and are expected to complete first. As we poll, we want to check more frequently
+            # on the elements at the beginning of the list...
+            #
+            # Let's assign probabilities with an arithmetic series.
+            # That is, if there are 16 jobs, then the first gets weighted
+            # with 16, the second with 15, then 14, and so on, until the
+            # last has weight 1. That means, the first element is 16 times
+            # more probable than the last.
+            # Element at idx (starting with 0) is picked with probability
+            #    1 / (l*(l+1)/2) * (l - idx)
+            r = random.random() * t
+            idx = 0
+            rx = 0
+            while True:
+                rx += (l - idx)
+                if rx >= r or idx == l - 1:
+                    yield jobs[idx]
+                    break
+                idx += 1
+
     @staticmethod
     def iter_single(itr, min_num = 1, max_num = 1):
         itr = list(itr)
@@ -336,7 +371,7 @@ class NMStubServer:
             if (NM.utils_get_timestamp_msec() - start) >= 4000:
                 p.stdin.close()
                 p.kill()
-                Util.popen_wait(p, 1000)
+                Util.popen_wait(p, 1)
                 raise Exception("after starting stub service the D-Bus name was not claimed in time")
 
         self._nmobj = nmobj
@@ -344,14 +379,17 @@ class NMStubServer:
         self._p = p
 
     def shutdown(self):
+        conn = self._conn
+        p = self._p
         self._nmobj = None
         self._nmiface = None
         self._conn = None
-        self._p.stdin.close()
-        self._p.kill()
-        Util.popen_wait(self._p, 1000)
         self._p = None
-        if self._conn_get_main_object(self._conn) is not None:
+        p.stdin.close()
+        p.kill()
+        if Util.popen_wait(p, 1) is None:
+            raise Exception("Stub service did not exit in time")
+        if self._conn_get_main_object(conn) is not None:
             raise Exception("Stub service is not still here although it should shut down")
 
     class _MethodProxy:
@@ -409,51 +447,70 @@ class AsyncProcess():
     def __init__(self,
                  args,
                  env,
-                 complete_cb):
-        self._args = args
+                 complete_cb,
+                 max_waittime_msec = 2000):
+        self._args = list(args)
         self._env = env
         self._complete_cb = complete_cb
+        self._max_waittime_msec = max_waittime_msec
 
     def start(self):
         if not hasattr(self, '_p'):
+            self._p_start_timestamp = NM.utils_get_timestamp_msec()
             self._p = subprocess.Popen(self._args,
                                        stdout = subprocess.PIPE,
                                        stderr = subprocess.PIPE,
                                        env = self._env)
 
-    def wait(self):
+    def _timeout_remaining_time(self):
+        # note that we call this during poll() and wait_and_complete().
+        # we don't know the exact time when the process terminated,
+        # so this is only approximately correct, if we call poll/wait
+        # frequently.
+        # Worst case, we will think that the process did not time out,
+        # when in fact it was running longer than max-waittime.
+        return self._max_waittime_msec - (NM.utils_get_timestamp_msec() - self._p_start_timestamp)
 
+    def poll(self, timeout = 0):
+        # Returns the exit code, or None while the process is still running.
+        # Raises if the process is still running after max_waittime_msec.
         self.start()
 
-        error = False
-        try:
-            Util.popen_wait(self._p, 2000)
-        except Exception as e:
-            error = True
-            raise e
-        finally:
-            (returncode, stdout, stderr) = (self._p.returncode,
-                                            self._p.stdout.read(),
-                                            self._p.stderr.read())
+        return_code = Util.popen_wait(self._p, timeout)
+        if return_code is None \
+           and self._timeout_remaining_time() <= 0:
+            raise Exception("process is still running after timeout: %s" % (' '.join(self._args)))
+        return return_code
 
-        self._p.stdout.close()
-        self._p.stderr.close()
-        self._p = None
+    def wait_and_complete(self):
+        self.start()
 
-        if error:
-            print(stdout)
-            print(stderr)
+        p = self._p
+        self._p = None
 
-        try:
-            self._complete_cb(self, returncode, stdout, stderr)
-        except Exception as e:
-            raise e
+        return_code = Util.popen_wait(p, max(0, self._timeout_remaining_time()) / 1000)
+        if return_code is None:
+            # still running past the deadline: kill the process, otherwise
+            # the read() calls below block until it closes its pipes.
+            p.kill()
+        (stdout, stderr) = (p.stdout.read(), p.stderr.read())
+        p.stdout.close()
+        p.stderr.close()
+
+        if return_code is None:
+            print(stdout)
+            print(stderr)
+            raise Exception("process did not complete in time: %s" % (' '.join(self._args)))
+
+        self._complete_cb(self, return_code, stdout, stderr)
 
 ###############################################################################
 
 class NmTestBase(unittest.TestCase):
     pass
 
+MAX_JOBS = 15
+
 class TestNmcli(NmTestBase):
 
     @staticmethod
@@ -638,6 +695,9 @@ class TestNmcli(NmTestBase):
         if expected_stderr is _DEFAULT_ARG:
             expected_stderr = None
 
+        results_idx = len(self._results)
+        self._results.append(None)
+
         def complete_cb(async_job,
                         returncode,
                         stdout,
@@ -699,11 +759,11 @@ class TestNmcli(NmTestBase):
             content = ('size: %s\n' % (len(content))).encode('utf8') + \
                       content
 
-            self._results.append({
+            self._results[results_idx] = {
                 'test_name'        : test_name,
                 'ignore_l10n_diff' : ignore_l10n_diff,
                 'content'          : content,
-            })
+            }
 
         async_job = AsyncProcess(args = args,
                                  env = env,
@@ -711,20 +771,46 @@ class TestNmcli(NmTestBase):
 
         self._async_jobs.append(async_job)
 
-        if sync_barrier:
-            self.async_wait()
-        else:
-            self.async_start()
+        self.async_start(wait_all = sync_barrier)
 
-    def async_start(self):
-        # limit number parallel running jobs
-        for async_job in self._async_jobs[0:15]:
-            async_job.start()
+    def async_start(self, wait_all = False):
+
+        while True:
+
+            while True:
+                for async_job in list(self._async_jobs[0:MAX_JOBS]):
+                    async_job.start()
+                # start up to MAX_JOBS jobs, but poll() and complete those
+                # that are already exited. Retry, until there are no more
+                # jobs to start, or until MAX_JOBS are running.
+                jobs_running = []
+                for async_job in list(self._async_jobs[0:MAX_JOBS]):
+                    if async_job.poll() is not None:
+                        self._async_jobs.remove(async_job)
+                        async_job.wait_and_complete()
+                        continue
+                    jobs_running.append(async_job)
+                if len(jobs_running) >= len(self._async_jobs):
+                    break
+                if len(jobs_running) >= MAX_JOBS:
+                    break
+
+            if not jobs_running:
+                return
+            if not wait_all:
+                return
+
+            # in a loop, indefinitely poll the running jobs until we find one that
+            # completes. Note that poll() itself will raise an exception if a
+            # job times out.
+            for async_job in Util.random_job(jobs_running):
+                if async_job.poll(timeout = 0.03) is not None:
+                    self._async_jobs.remove(async_job)
+                    async_job.wait_and_complete()
+                    break
 
     def async_wait(self):
-        while self._async_jobs:
-            self.async_start()
-            self._async_jobs.pop(0).wait()
+        return self.async_start(wait_all = True)
 
     def _nm_test_pre(self):
         self._calling_num = {}