diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7cb3a8937..7c96fb5e0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -21,7 +21,7 @@ variables: REPO_URL_XORGPROTO: 'https://gitlab.freedesktop.org/xorg/proto/xorgproto.git' XORG_DEBIAN_VERSION: 'bookworm-slim' XORG_DEBIAN_EXEC: 'env FDO_CI_CONCURRENT=${FDO_CI_CONCURRENT} bash .gitlab-ci/debian-install.sh' - XORG_DEBIAN_TAG: '2026-03-16-image-update' + XORG_DEBIAN_TAG: '2026-04-28-pytest' XORG_FREEBSD_VERSION: '14.2' XORG_FREEBSD_EXEC: '' XORG_FREEBSD_TAG: '2025-02-18-vm-image' @@ -210,6 +210,21 @@ xwayland-nolibdecor: BUILD_XVFB: false MESON_EXTRA_ARGS: -Dlibdecor=false ${MESON_DDX_BUILD_ARGS} +meson-asan: + extends: .common-build-and-test + script: + - .gitlab-ci/meson-build.sh --run-test + variables: + MESON_EXTRA_ARGS: > + -Db_sanitize=address + -Db_lundef=false + -Dxwayland=false + -Dxorg=false + -Dxephyr=false + -Dxnest=false + -Dxvfb=true + MESON_TEST_ARGS: --suite pyxtest + mingw-cross-build: extends: .common-build-and-test script: @@ -341,6 +356,18 @@ check-whitespace: script: - .gitlab-ci/whitespace-check.py $(git ls-files hw/xwayland) +ruff-format-pyxtest: + extends: .fdo.ci-fairy + stage: build-and-test + script: + - uvx ruff format --check --diff test/pyxtest + +ruff-check-pyxtest: + extends: .fdo.ci-fairy + stage: build-and-test + script: + - uvx ruff check test/pyxtest + # # Workflow rules needed due to: # https://gitlab.freedesktop.org/freedesktop/freedesktop/-/issues/438 diff --git a/.gitlab-ci/debian-install.sh b/.gitlab-ci/debian-install.sh index a2498f943..757c91f67 100644 --- a/.gitlab-ci/debian-install.sh +++ b/.gitlab-ci/debian-install.sh @@ -112,6 +112,7 @@ apt-get install -y \ pkg-config \ python3-attr \ python3-jinja2 \ + python3-pytest \ python3-mako \ python3-numpy \ python3-six \ diff --git a/test/meson.build b/test/meson.build index 199431225..4fea75945 100644 --- a/test/meson.build +++ b/test/meson.build @@ -166,6 +166,7 @@ subdir('bigreq') subdir('damage') subdir('sync') subdir('bugs') +subdir('pyxtest') if build_xorg # Tests that require at least some DDX functions in order to fully link diff --git a/test/pyxtest/README.md b/test/pyxtest/README.md new file mode 100644 index 000000000..66f5a3243 --- /dev/null +++ b/test/pyxtest/README.md @@ -0,0 +1,180 @@ +# pyxtest - pytest-based X server test suite + +This is a pytest-based test suite that launches X servers and sends crafted +protocol requests to verify that security vulnerabilities and other bugs +are properly handled. + +It can be run against Xvfb, Xwayland, or Xorg but the latter is potentially +flaky and requires some setup outside the test suite. The test suite +uses both AddressSanitizer (ASAN) and valgrind for detecting +memory errors such as out-of-bounds reads/writes and use-after-free. + + +## Running tests + +### Via meson +The test suite (via Xvfb) is integrated into the meson tests and can be +run with normal meson commands. + +```sh +# run the python test suite +meson test --suite pyxtest +# run a set of tests +meson test pyxtest-test_randr.py +``` +Consult the meson documentation for further details. + +### Directly with pytest + +For running against a custom path, point the test suite at the server binary to +test using environment variables or CLI options: + +```sh +# Using environment variable +XVFB_PATH=build/hw/vfb/Xvfb pytest test/pyxtest/ -v + +# Using --server-path +pytest test/pyxtest/ -v --server-path=build/hw/vfb/Xvfb + +# Using the system Xvfb (fallback if no path is set) +pytest test/pyxtest/ -v +``` + +The normal pytest options work as expected (`-k` for test selection, etc.) + +### Running with AddressSanitizer (ASAN) + +ASAN is a compile-time instrumentation that detects memory errors such as +heap buffer overflows and use-after-free. To use ASAN, build the server with +sanitizer support: + +```sh +meson setup build-asan -Db_sanitize=address -Db_lundef=false +meson compile -C build-asan +``` + +Then run the tests against the ASAN-built binary: + +```sh +XSERVER_ASAN=1 XVFB_PATH=build-asan/hw/vfb/Xvfb pytest test/pyxtest/ -v +``` + +When using meson test, `XSERVER_ASAN` is set automatically if the build +was configured with `-Db_sanitize=address`. + +Tests marked with `@pytest.mark.asan` are skipped unless `XSERVER_ASAN=1` +is set. When ASAN detects an error, the server process is killed and the +ASAN error report is included in the test failure message. + +**Note:** ASAN and valgrind are mutually incompatible. When `XSERVER_ASAN=1` +is set, valgrind wrapping is automatically disabled even if `--valgrind` is +passed. + +### Running with valgrind + +The `--valgrind` flag runs **all** servers under valgrind: + +```sh +pytest test/pyxtest/ -v --valgrind +``` + +Tests marked with `@pytest.mark.valgrind` automatically run their server +under valgrind even without the `--valgrind` flag. This is useful for +bugs that are only detectable via valgrind (e.g. use of uninitialised +values). + +### Testing multiple server types + +By default only `Xvfb` is tested. Use `--server-type` to test additional +servers. Tests using the `xserver` fixture are automatically run once per +server type: + +```sh +pytest test/pyxtest/ -v --server-type=xvfb --server-type=xwayland +``` + +## CLI options + +| Option | Description | +|--------------------------------|-------------------------------------------| +| `--valgrind` | Run all X servers under valgrind memcheck | +| `--valgrind-suppressions=PATH` | Path to a valgrind suppressions file | +| `--server-type=TYPE` | Server type to test (`xvfb`, `xwayland`, `xorg`). Repeatable. Default: `xvfb` | +| `--server-path=PATH` | Explicit path to the X server binary | + +## Environment variables + +The server binary is located by checking, in order: + +1. `--server-path` CLI option +2. `XVFB_PATH` / `XWAYLAND_PATH` / `XORG_PATH` environment variable +3. `XSERVER_BUILDDIR` environment variable (looks for `hw/vfb/Xvfb` etc.) +4. `build/` directory relative to the source root +5. System `PATH` (prints a warning) + +`VALGRIND_SUPPRESSIONS` can point to a suppressions file. + +`XSERVER_ASAN` set to `1` indicates the server binary was built with +AddressSanitizer. This is set automatically by meson when +`-Db_sanitize=address` is used. It can also be set manually. + +## Test markers + +| Marker | Effect | +|-------------------------------|-------------------------------------------------------------| +| `@pytest.mark.asan` | Test requires ASAN (`XSERVER_ASAN=1` must be set | +| `@pytest.mark.valgrind` | Test requires valgrind (skipped if `XSERVER_ASAN` is set) | +| `@pytest.mark.xwayland_only` | Test is skipped unless `--server-type=xwayland` | +| `@pytest.mark.xorg_only` | Test is skipped unless `--server-type=xorg` | +| `@pytest.mark.swapped_client` | Test uses a byte-swapped (big-endian) client connection | + + +## Writing a new test + +1. Create or edit a `test_*.py` file. + +2. Use the `xserver` and `xclient` or `xclient_swapped` fixtures to get a + running server and connection: + + ```python + def test_something(self, xserver, xclient): + # xclient is a RawX11Connection to xserver + ... + assert xserver.is_alive, "Server crashed" + ``` + +3. If the test needs an extension, create a fixture that handles + negotiation: + + ```python + @pytest.fixture + def render_client(xclient): + ext = xclient.query_extension(Extension.RENDER) + if not ext: + pytest.skip("RENDER not available") + # ... send version negotiation ... + return xclient + ``` + +4. Build protocol requests using dataclasses from `proto/`: + + ```python + from proto import xi + + req = xi.XIChangeHierarchyRequest( + opcode=opcode, + num_changes=1, + changes_data=change_data, + ) + xclient.send_request(req.to_bytes()) + ``` + +5. If a new extension module is needed, create `proto/myext.py` with + constants and `@dataclass` request builders following the existing + pattern. + +6. If the bug is only detectable via a memory sanitizer (OOB reads, + use-after-free), mark the test with `@pytest.mark.asan`. Use + `@pytest.mark.valgrind` only for bugs that specifically require + valgrind (e.g. use of uninitialised values that ASAN does not + detect). diff --git a/test/pyxtest/asan.py b/test/pyxtest/asan.py new file mode 100644 index 000000000..f241df3a2 --- /dev/null +++ b/test/pyxtest/asan.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: MIT +# +# AddressSanitizer (ASAN) output parser for extracting memory errors. + +from __future__ import annotations + +import re +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class AsanError: + """Represents a single ASAN error extracted from log/stderr output.""" + + kind: str # e.g. "heap-buffer-overflow", "heap-use-after-free" + description: str # The full ERROR line + stack_frames: list[tuple[str, str | None, str | None]] # (func, file, line) + + def __str__(self): + lines = [f"{self.kind}: {self.description}"] + for func, srcfile, line in self.stack_frames[:8]: + loc = f" ({srcfile}:{line})" if srcfile else "" + lines.append(f" at {func}{loc}") + return "\n".join(lines) + + @classmethod + def from_log(cls, log_path: Path) -> list[AsanError]: + """Parse ASAN output from a log file. + + ASAN may append a PID suffix to the log_path, so we glob for + matching files (e.g. log_path.1234). + """ + errors: list[AsanError] = [] + + # ASAN appends . to the log_path + candidates = list(log_path.parent.glob(f"{log_path.name}.*")) + if log_path.is_file(): + candidates.append(log_path) + + for path in candidates: + try: + text = path.read_text(errors="replace") + except OSError: + continue + errors.extend(cls.from_text(text)) + + return errors + + @classmethod + def from_text(cls, text: str) -> list[AsanError]: + """Parse ASAN errors from text output (stderr or log file). + + Recognises the standard ASAN report format:: + + ==PID==ERROR: AddressSanitizer: heap-buffer-overflow on ... + READ of size 4 at 0x... thread T0 + #0 0xaddr in func file.c:123 + #1 0xaddr in func2 file2.c:456 + + SUMMARY: AddressSanitizer: heap-buffer-overflow ... + """ + errors: list[AsanError] = [] + + # Pattern for the ERROR line + error_re = re.compile(r"==\d+==ERROR: AddressSanitizer: ([\w-]+)(.*)") + # Pattern for stack frames: + # #0 0x... in function_name file.c:123:45 + # #0 0x... in function_name (module+0xoffset) + # #0 0x... (module+0xoffset) + frame_re = re.compile( + r"\s+#\d+\s+\S+\s+in\s+(\S+)\s+(\S+?)(?::(\d+))?(?::\d+)?\s*$" + ) + # Frame without source info (just address in module) + frame_nosrc_re = re.compile(r"\s+#\d+\s+\S+\s+in\s+(\S+)") + + lines = text.splitlines() + i = 0 + while i < len(lines): + m = error_re.search(lines[i]) + if not m: + i += 1 + continue + + kind = m.group(1) + description = m.group(2).strip() + + # Collect stack frames following the ERROR line + frames: list[tuple[str, str | None, str | None]] = [] + i += 1 + while i < len(lines): + line = lines[i] + # Stop at blank lines, SUMMARY lines, or new ERROR lines + if not line.strip() or line.strip().startswith("SUMMARY:"): + break + if error_re.search(line): + break + + fm = frame_re.match(line) + if fm: + func = fm.group(1) + srcfile = fm.group(2) + lineno = fm.group(3) + # Filter out non-file entries like (module+0xoffset) + if srcfile and srcfile.startswith("("): + srcfile = None + lineno = None + frames.append((func, srcfile, lineno)) + else: + fm2 = frame_nosrc_re.match(line) + if fm2: + frames.append((fm2.group(1), None, None)) + i += 1 + + errors.append(cls(kind=kind, description=description, stack_frames=frames)) + + return errors diff --git a/test/pyxtest/conftest.py b/test/pyxtest/conftest.py new file mode 100644 index 000000000..ab2e1375a --- /dev/null +++ b/test/pyxtest/conftest.py @@ -0,0 +1,280 @@ +# SPDX-License-Identifier: MIT +# +# pytest configuration and fixtures for X server testing. + +import os +import shutil +import pytest +from pathlib import Path + +from xserver import XServerProcess +from xclient import RawX11Connection, X11ConnectionError, XlibConnection + + +def pytest_addoption(parser): + parser.addoption( + "--valgrind", + action="store_true", + default=False, + help="Run X server under valgrind memcheck", + ) + parser.addoption( + "--valgrind-suppressions", + default=None, + help="Path to valgrind suppressions file", + ) + parser.addoption( + "--server-type", + action="append", + default=[], + help="Server types to test (xvfb, xwayland, xorg). " + "Can be specified multiple times. Default: xvfb", + ) + parser.addoption( + "--server-path", default=None, help="Explicit path to the X server binary" + ) + + +def pytest_configure(config): + config.addinivalue_line( + "markers", "valgrind: mark test as requiring valgrind to detect the issue" + ) + config.addinivalue_line( + "markers", + "asan: mark test as requiring AddressSanitizer to detect the issue", + ) + config.addinivalue_line( + "markers", "xwayland_only: mark test as only applicable to Xwayland" + ) + config.addinivalue_line( + "markers", "xorg_only: mark test as only applicable to Xorg" + ) + config.addinivalue_line( + "markers", "swapped_client: mark test as requiring a byte-swapped client" + ) + + +def get_server_types(config) -> list[str]: + """Get the list of server types to test, default to xvfb.""" + return config.getoption("--server-type") or ["xvfb"] + + +def get_valgrind_suppressions(config) -> Path | None: + """Find the valgrind suppressions file.""" + explicit = config.getoption("--valgrind-suppressions") + if explicit: + return Path(explicit) + + env = os.environ.get("VALGRIND_SUPPRESSIONS") + if env: + f = Path(env) + if f.is_file(): + return f + + # Try next to this file (test/pyxtest/valgrind.suppressions) + default = Path(__file__).resolve().parent / "valgrind.suppressions" + if default.is_file(): + return default + + return None + + +def is_valgrind_available() -> bool: + """Check if valgrind is available on the system.""" + return shutil.which("valgrind") is not None + + +def is_asan_build() -> bool: + """Check if the X server binary was built with AddressSanitizer. + + This is determined by the ``XSERVER_ASAN`` environment variable + which is set by meson when ``-Db_sanitize=address`` is used. + It can also be set manually when running pytest directly. + """ + return os.environ.get("XSERVER_ASAN") == "1" + + +def pytest_collection_modifyitems(config, items): + """Skip tests based on markers and configuration.""" + server_types = get_server_types(config) + asan = is_asan_build() + + for item in items: + if item.get_closest_marker("xwayland_only") and "xwayland" not in server_types: + item.add_marker(pytest.mark.skip(reason="Test only applies to Xwayland")) + + if item.get_closest_marker("xorg_only") and "xorg" not in server_types: + item.add_marker(pytest.mark.skip(reason="Test only applies to Xorg")) + + if item.get_closest_marker("asan") and not asan: + item.add_marker( + pytest.mark.skip( + reason="Test requires ASAN build (XSERVER_ASAN=1 not set)" + ) + ) + + if item.get_closest_marker("valgrind") and asan: + item.add_marker( + pytest.mark.skip( + reason="Test requires valgrind, incompatible with ASAN build" + ) + ) + + +def pytest_generate_tests(metafunc): + """Parametrize tests that use the xserver fixture over all configured + server types so each test runs once per type.""" + if "xserver" in metafunc.fixturenames: + server_types = get_server_types(metafunc.config) + metafunc.parametrize("xserver", server_types, indirect=True) + + +def _start_server(request, server_type, log_file=None): + """Start an X server of the given type for a test. + + Shared implementation for the xvfb, xwayland, xorg, and generic + xserver fixtures. Valgrind is enabled if the test is marked with + ``@pytest.mark.valgrind`` or if ``--valgrind`` is passed on the + command line. ASAN is enabled automatically when ``XSERVER_ASAN=1`` + is set in the environment (typically by meson when the server is + built with ``-Db_sanitize=address``). + """ + use_valgrind = ( + request.config.getoption("--valgrind") + or request.node.get_closest_marker("valgrind") is not None + ) + if use_valgrind and not is_valgrind_available(): + pytest.skip("valgrind not available") + use_asan = is_asan_build() + server_path = request.config.getoption("--server-path") + suppressions = get_valgrind_suppressions(request.config) + + server = XServerProcess( + server_type=server_type, + valgrind=use_valgrind, + valgrind_suppressions=suppressions, + asan=use_asan, + server_path=server_path, + log_file=log_file, + ) + + try: + server.start(timeout=60 if use_valgrind else 15) + except (FileNotFoundError, RuntimeError) as e: + msg = f"Failed to start {server_type} server: {e}" + if server.log_file: + msg += f"\nLog file: {server.log_file}" + pytest.fail(msg) + + yield server + + # Check if the server was killed by ASAN before we stop it + asan_errors = [] + if use_asan and not server.is_alive: + asan_errors = server.get_asan_errors() + + valgrind_errors = server.stop() + + if use_asan and asan_errors: + msg = f"AddressSanitizer found {len(asan_errors)} error(s):\n\n" + msg += "\n\n".join(str(e) for e in asan_errors) + pytest.fail(msg) + + if use_valgrind and valgrind_errors: + serious = [ + e + for e in valgrind_errors + if e.kind + not in ( + "Leak_DefinitelyLost", + "Leak_PossiblyLost", + "Leak_StillReachable", + "Leak_IndirectlyLost", + "SyscallParam", + ) + ] + if serious: + msg = f"Valgrind found {len(serious)} memory error(s):\n\n" + msg += "\n\n".join(str(e) for e in serious) + pytest.fail(msg) + + +@pytest.fixture +def xserver(request, tmp_path): + """ + Start an X server for this test. + + Automatically parametrized via ``pytest_generate_tests`` so every + test that uses this fixture runs once per configured --server-type. + A fresh server per test, killed afterward. With --valgrind, + valgrind memory errors cause test failure during teardown. + + For a fixture that targets a specific server type use the xvfb, + xwayland, or xorg fixtures instead. + """ + server_type = request.param + + # Skip server-specific markers + if request.node.get_closest_marker("xwayland_only") and server_type != "xwayland": + pytest.skip("Test only applies to Xwayland") + if request.node.get_closest_marker("xorg_only") and server_type != "xorg": + pytest.skip("Test only applies to Xorg") + + kwargs = {} + if server_type == "xorg": + kwargs["log_file"] = tmp_path / f"{server_type}.log" + + yield from _start_server(request, server_type, **kwargs) + + +@pytest.fixture +def xvfb(request, tmp_path): + """Start an Xvfb server for this test.""" + if "xvfb" not in get_server_types(request.config): + pytest.skip("Xvfb not in --server-type list") + yield from _start_server(request, "xvfb") + + +@pytest.fixture +def xwayland(request, tmp_path): + """Start an Xwayland server for this test.""" + if "xwayland" not in get_server_types(request.config): + pytest.skip("Xwayland not in --server-type list") + yield from _start_server(request, "xwayland") + + +@pytest.fixture +def xorg(request, tmp_path): + """Start an Xorg server for this test.""" + if "xorg" not in get_server_types(request.config): + pytest.skip("Xorg not in --server-type list") + yield from _start_server(request, "xorg", log_file=tmp_path / "xorg.log") + + +@pytest.fixture +def xclient(xserver): + """Create a raw X11 connection to the test server.""" + conn = RawX11Connection(xserver.display_num) + yield conn + conn.close() + + +@pytest.fixture +def xclient_swapped(xserver): + """Create a big-endian (byte-swapped) X11 connection.""" + try: + conn = RawX11Connection(xserver.display_num, swapped=True) + except X11ConnectionError as e: + if "endian" in str(e).lower(): + pytest.skip("Server does not accept big-endian clients") + raise + yield conn + conn.close() + + +@pytest.fixture +def xlib_client(xserver): + """Create a python-xlib connection for higher-level X operations.""" + conn = XlibConnection(xserver.display_num) + yield conn + conn.close() diff --git a/test/pyxtest/ensure-meson-tests.sh b/test/pyxtest/ensure-meson-tests.sh new file mode 100755 index 000000000..0ce6b1c79 --- /dev/null +++ b/test/pyxtest/ensure-meson-tests.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# +# Helper script used by meson to ensure each test_foo.py file has +# a corresponding entry in the tests_pyxtest array + +SOURCEDIR="${1:-@SOURCEDIR@}" +shift + +TESTS="${*:-@TESTS@}" +FILES=$(find "$SOURCEDIR" -name "test_*.py" -printf "%f\n") + +DIFF=$(diff -u <(echo "$TESTS" | tr " " "\n" | sort) <(echo "$FILES" | sort)) +if [[ $? -ne 0 ]]; then + echo "ERROR: Missing test file in meson tests list:" >&2 + echo "$DIFF" >&2 + exit 1 +fi diff --git a/test/pyxtest/meson.build b/test/pyxtest/meson.build new file mode 100644 index 000000000..d7f7bfc3c --- /dev/null +++ b/test/pyxtest/meson.build @@ -0,0 +1,85 @@ +# pytest-based test suite for the X server +# +# Uses pytest to launch X servers which allows us to e.g. send crafted protocol +# requests that exercise specific security fixes. Supports Xvfb, Xwayland, +# and Xorg, with optional valgrind and AddressSanitizer (ASAN) integration +# for detecting use-after-free and out-of-bounds memory access. +# +# Run with: meson test --suite pyxtest +# Or directly: pytest test/pyxtest/ -v + +pymod = import('python') +pytest = find_program('pytest', 'pytest-3', required: false) + +if pytest.found() + pyxtest_env = environment() + pyxtest_env.set('XSERVER_BUILDDIR', meson.project_build_root()) + pyxtest_env.set('XSERVER_DIR', meson.project_source_root()) + pyxtest_env.set('PYTHONDONTWRITEBYTECODE', '1') + + if build_xvfb + pyxtest_env.set('XVFB_PATH', xvfb_server.full_path()) + endif + + if build_xwayland + pyxtest_env.set('XWAYLAND_PATH', xwayland_server.full_path()) + endif + + # Tell the test suite if the server was built with AddressSanitizer + if 'address' in get_option('b_sanitize') + pyxtest_env.set('XSERVER_ASAN', '1') + endif + + # We are *not* setting XORG_PATH in meson because we don't want + # to start lots of Xorg instances as part of a meson test run. + # + # if build_xorg + # pyxtest_env.set('XORG_PATH', xorg_server.full_path()) + # endif + + pytest_args = [ + '-v', + '--tb=short', + ] + + # pytest-xdist speeds up the test suite by running tests in parallel, + # but it's not required + if pymod.find_installation('python3', modules: ['xdist'], required: false).found() + pytest_args += ['-n', 'auto'] + endif + + # pytest-timeout means we can fail in pytest before hitting the meson limits + if pymod.find_installation('python3', modules: ['pytest_timeout'], required: false).found() + pytest_args += ['--timeout=120'] + endif + + # This needs to be kept in sync with the test_foo.py files in the tree + tests_pyxtest = [ + ] + + test_list_data = configuration_data() + test_list_data.set('TESTS', '\n'.join(tests_pyxtest)) + test_list_data.set('SOURCEDIR', meson.current_source_dir()) + test_list_check = configure_file( + input: 'ensure-meson-tests.sh', + output: 'ensure-meson-tests.sh', + configuration: test_list_data, + ) + test('ensure-meson-tests', + test_list_check, + suite:'pyxtest' + ) + + # As part of the normal meson test run we only run the Xvfb tests + if build_xvfb + foreach t: tests_pyxtest + test(f'pyxtest-@t@', + pytest, + args: pytest_args + [files(t)], + env: pyxtest_env, + timeout: 600, + suite: 'pyxtest', + ) + endforeach + endif +endif diff --git a/test/pyxtest/proto/__init__.py b/test/pyxtest/proto/__init__.py new file mode 100644 index 000000000..13b9edaa5 --- /dev/null +++ b/test/pyxtest/proto/__init__.py @@ -0,0 +1,2 @@ +# SPDX-License-Identifier: MIT +# Protocol request builders for X server testing. diff --git a/test/pyxtest/proto/bigrequests.py b/test/pyxtest/proto/bigrequests.py new file mode 100644 index 000000000..c14c5bbad --- /dev/null +++ b/test/pyxtest/proto/bigrequests.py @@ -0,0 +1,24 @@ +# SPDX-License-Identifier: MIT +# +# BIG-REQUESTS extension protocol request builders + +import struct +from dataclasses import dataclass + +# BIG-REQUESTS minor opcodes +BigRequestsEnable = 0 + + +@dataclass +class BigRequestsEnableRequest: + """BIG-REQUESTS Enable request.""" + + opcode: int + + def to_bytes(self, byte_order: str = "<") -> bytes: + return struct.pack( + f"{byte_order}BBH", + self.opcode, + BigRequestsEnable, # sub-opcode + 1, # request length + ) diff --git a/test/pyxtest/proto/x11.py b/test/pyxtest/proto/x11.py new file mode 100644 index 000000000..5358bf942 --- /dev/null +++ b/test/pyxtest/proto/x11.py @@ -0,0 +1,118 @@ +# SPDX-License-Identifier: MIT +# +# Core X11 protocol request builders + +import struct +from dataclasses import dataclass + +# Core protocol opcodes +CreateWindow = 1 +CreatePixmap = 53 +InternAtom = 16 +QueryExtension = 98 + + +def _pad(data: bytes) -> bytes: + """Pad data to a 4-byte boundary.""" + return data + b"\x00" * ((4 - len(data) % 4) % 4) + + +@dataclass +class CreateWindowRequest: + """X11 CreateWindow request.""" + + wid: int + parent: int + x: int + y: int + width: int + height: int + depth: int + border_width: int = 0 + window_class: int = 1 # InputOutput + visual: int = 0 + value_mask: int = 0x0800 # override-redirect + override_redirect: int = 0 + + def to_bytes(self, byte_order: str = "<") -> bytes: + return struct.pack( + f"{byte_order}BBH II hhHHHH II I", + CreateWindow, # opcode + self.depth, + 9, # request length in 4-byte units + self.wid, + self.parent, + self.x, + self.y, + self.width, + self.height, + self.border_width, + self.window_class, + self.visual, + self.value_mask, + self.override_redirect, + ) + + +@dataclass +class CreatePixmapRequest: + """X11 CreatePixmap request.""" + + pid: int + drawable: int + width: int + height: int + depth: int + + def to_bytes(self, byte_order: str = "<") -> bytes: + return struct.pack( + f"{byte_order}BBHIIHH", + CreatePixmap, # opcode + self.depth, + 4, # request length + self.pid, + self.drawable, + self.width, + self.height, + ) + + +@dataclass +class InternAtomRequest: + """X11 InternAtom request.""" + + name: str + only_if_exists: bool = False + + def to_bytes(self, byte_order: str = "<") -> bytes: + name_bytes = self.name.encode("ascii") + padded = _pad(name_bytes) + req_len = (8 + len(padded)) // 4 + return ( + struct.pack( + f"{byte_order}BBHHxx", + InternAtom, # opcode + 1 if self.only_if_exists else 0, + req_len, + len(name_bytes), + ) + + padded + ) + + +@dataclass +class QueryExtensionRequest: + """X11 QueryExtension request.""" + + name: str + + def to_bytes(self, byte_order: str = "<") -> bytes: + name_bytes = self.name.encode("ascii") + padded = _pad(name_bytes) + req_len = (8 + len(padded)) // 4 + return ( + struct.pack( + f"{byte_order}BBHHxx", QueryExtension, 0, req_len, len(name_bytes) + ) + + padded + ) diff --git a/test/pyxtest/proto/xkb.py b/test/pyxtest/proto/xkb.py new file mode 100644 index 000000000..22e1bf5a1 --- /dev/null +++ b/test/pyxtest/proto/xkb.py @@ -0,0 +1,473 @@ +# SPDX-License-Identifier: MIT +# +# XKB protocol request builders +# +# All fields are controllable via keyword arguments so tests can craft +# malformed requests with inconsistent lengths, out-of-range indices, etc. + +import struct +from dataclasses import dataclass + +# XKB minor opcodes +XkbUseExtension = 0 +XkbSelectEvents = 1 +XkbBell = 3 +XkbGetState = 4 +XkbLatchLockState = 5 +XkbGetControls = 6 +XkbSetControls = 7 +XkbGetMap = 8 +XkbSetMap = 9 +XkbGetCompatMap = 10 +XkbSetCompatMap = 11 +XkbGetIndicatorState = 12 +XkbGetIndicatorMap = 13 +XkbSetIndicatorMap = 14 +XkbGetNamedIndicator = 15 +XkbSetNamedIndicator = 16 +XkbGetNames = 17 +XkbSetNames = 18 +XkbGetGeometry = 19 +XkbSetGeometry = 20 +XkbPerClientFlags = 21 +XkbListComponents = 22 +XkbGetKbdByName = 23 +XkbGetDeviceInfo = 24 +XkbSetDeviceInfo = 25 +XkbSetDebuggingFlags = 101 + +# XKB constants +XkbUseCoreKbd = 0x0100 + +# SetMap present flags +XkbKeyTypesMask = 0x0001 +XkbKeySymsMask = 0x0002 +XkbModifierMapMask = 0x0004 +XkbExplicitComponentsMask = 0x0008 +XkbKeyActionsMask = 0x0010 +XkbKeyBehaviorsMask = 0x0020 +XkbVirtualModsMask = 0x0040 +XkbVirtualModMapMask = 0x0080 + +# SetMap flags +XkbSetMapResizeTypes = 1 +XkbSetMapRecomputeActions = 2 + +XkbNumRequiredTypes = 4 +XkbMaxLegalKeyCode = 255 +XkbNoShape = 0xFF + + +@dataclass +class UseExtensionRequest: + """XkbUseExtension request (minor opcode 0).""" + + opcode: int + major: int = 1 + minor: int = 0 + + def to_bytes(self, byte_order: str = "<") -> bytes: + return struct.pack( + f"{byte_order}BBHHH xx", + self.opcode, + XkbUseExtension, + 3, # 12 bytes = 3 words + self.major, + self.minor, + ) + + +# Keep as a module-level function for backward compatibility with +# xclient.py which calls xkb.use_extension() directly. +def use_extension( + opcode: int, *, major: int = 1, minor: int = 0, byte_order: str = "<" +) -> bytes: + """Build XkbUseExtension request (minor opcode 0).""" + return UseExtensionRequest(opcode=opcode, major=major, minor=minor).to_bytes( + byte_order + ) + + +@dataclass +class SetMapRequest: + """ + xkbSetMapReq (minor opcode 9). + + The 36-byte header is followed by variable-length payload data + whose format depends on the 'present' bitmask. For security testing, + payload is passed as raw bytes, allowing intentionally malformed data. + """ + + opcode: int + device_spec: int = XkbUseCoreKbd + present: int = 0 + flags: int = 0 + min_key_code: int = 8 + max_key_code: int = 255 + first_type: int = 0 + n_types: int = 0 + first_key_sym: int = 0 + n_key_syms: int = 0 + total_syms: int = 0 + first_key_act: int = 0 + n_key_acts: int = 0 + total_acts: int = 0 + first_key_behavior: int = 0 + n_key_behaviors: int = 0 + total_key_behaviors: int = 0 + first_key_explicit: int = 0 + n_key_explicit: int = 0 + total_key_explicit: int = 0 + first_mod_map_key: int = 0 + n_mod_map_keys: int = 0 + total_mod_map_keys: int = 0 + first_vmod_map_key: int = 0 + n_vmod_map_keys: int = 0 + total_vmod_map_keys: int = 0 + virtual_mods: int = 0 + payload: bytes = b"" + length_override: int | None = None + + def to_bytes(self, byte_order: str = "<") -> bytes: + total_bytes = 36 + len(self.payload) + pad_len = (4 - total_bytes % 4) % 4 + total_bytes += pad_len + + length = ( + self.length_override + if self.length_override is not None + else total_bytes // 4 + ) + + header = struct.pack( + f"{byte_order}BBH" # reqType, xkbReqType, length + f"HHH" # deviceSpec, present, flags + f"BBB" # minKeyCode, maxKeyCode, firstType + f"B" # nTypes + f"BB" # firstKeySym, nKeySyms + f"H" # totalSyms + f"BB" # firstKeyAct, nKeyActs + f"H" # totalActs + f"BB" # firstKeyBehavior, nKeyBehaviors + f"B" # totalKeyBehaviors + f"BB" # firstKeyExplicit, nKeyExplicit + f"B" # totalKeyExplicit + f"BB" # firstModMapKey, nModMapKeys + f"B" # totalModMapKeys + f"BB" # firstVModMapKey, nVModMapKeys + f"B" # totalVModMapKeys + f"H", # virtualMods + self.opcode, + XkbSetMap, + length, + self.device_spec, + self.present, + self.flags, + self.min_key_code, + self.max_key_code, + self.first_type, + self.n_types, + self.first_key_sym, + self.n_key_syms, + self.total_syms, + self.first_key_act, + self.n_key_acts, + self.total_acts, + self.first_key_behavior, + self.n_key_behaviors, + self.total_key_behaviors, + self.first_key_explicit, + self.n_key_explicit, + self.total_key_explicit, + self.first_mod_map_key, + self.n_mod_map_keys, + self.total_mod_map_keys, + self.first_vmod_map_key, + self.n_vmod_map_keys, + self.total_vmod_map_keys, + self.virtual_mods, + ) + return header + self.payload + b"\x00" * pad_len + + +@dataclass +class KeyTypeWire: + """ + xkbKeyTypeWireDesc (8 bytes) with optional entries/preserve data. + """ + + num_levels: int = 2 + has_preserve: bool = False + n_map_entries: int = 0 + + def to_bytes(self, byte_order: str = "<") -> bytes: + header = struct.pack( + f"{byte_order}BBH BBBB", + 0, + 0, + 0, # mask, realMods, virtualMods + self.num_levels, + self.n_map_entries, + 1 if self.has_preserve else 0, # preserve + 0, # pad + ) + # Map entries: 4 bytes each (level(1), realMods(1), virtualMods(2)) + entries = b"\x00" * (4 * self.n_map_entries) + # Preserve entries: 4 bytes each (realMods(1), pad(1), virtualMods(2)) + preserve = b"\x00" * (4 * self.n_map_entries) if self.has_preserve else b"" + return header + entries + preserve + + +@dataclass +class SetCompatMapRequest: + """xkbSetCompatMapReq (minor opcode 11). Header is 16 bytes.""" + + opcode: int + device_spec: int = XkbUseCoreKbd + recompute_actions: int = 0 + truncate_si: int = 0 + groups: int = 0 + first_si: int = 0 + n_si: int = 0 + payload: bytes = b"" + length_override: int | None = None + + def to_bytes(self, byte_order: str = "<") -> bytes: + total_bytes = 16 + len(self.payload) + pad_len = (4 - total_bytes % 4) % 4 + total_bytes += pad_len + + length = ( + self.length_override + if self.length_override is not None + else total_bytes // 4 + ) + + header = struct.pack( + f"{byte_order}BBH" # reqType, xkbReqType, length + f"H" # deviceSpec + f"xB" # pad, recomputeActions + f"B" # truncateSI + f"B" # groups + f"H" # firstSI + f"H" # nSI + f"xx", # pad + self.opcode, + XkbSetCompatMap, + length, + self.device_spec, + self.recompute_actions, + self.truncate_si, + self.groups, + self.first_si, + self.n_si, + ) + return header + self.payload + b"\x00" * pad_len + + +@dataclass +class SymInterpretWire: + """A single xkbSymInterpretWireDesc (16 bytes).""" + + def to_bytes(self, byte_order: str = "<") -> bytes: + return struct.pack( + f"{byte_order}I" # sym (KeySym) + f"BBBx" # mods, match, virtualMod, pad + f"8s", # action (8 bytes) + 0, + 0, + 0, + 0, + b"\x00" * 8, + ) + + +@dataclass +class SetGeometryRequest: + """xkbSetGeometryReq (minor opcode 20). Header is 28 bytes.""" + + opcode: int + device_spec: int = XkbUseCoreKbd + n_shapes: int = 0 + n_sections: int = 0 + name_atom: int = 0 + width_mm: int = 100 + height_mm: int = 100 + n_properties: int = 0 + n_colors: int = 0 + n_doodads: int = 0 + n_key_aliases: int = 0 + base_color_ndx: int = 0 + label_color_ndx: int = 1 + payload: bytes = b"" + length_override: int | None = None + + def to_bytes(self, byte_order: str = "<") -> bytes: + total_bytes = 28 + len(self.payload) + pad_len = (4 - total_bytes % 4) % 4 + total_bytes += pad_len + + length = ( + self.length_override + if self.length_override is not None + else total_bytes // 4 + ) + + header = struct.pack( + f"{byte_order}BBH" # reqType, xkbReqType, length + f"H" # deviceSpec + f"BB" # nShapes, nSections + f"I" # name (Atom) + f"HH" # widthMM, heightMM + f"HH" # nProperties, nColors + f"HH" # nDoodads, nKeyAliases + f"BB" # baseColorNdx, labelColorNdx + f"xx", # pad + self.opcode, + XkbSetGeometry, + length, + self.device_spec, + self.n_shapes, + self.n_sections, + self.name_atom, + self.width_mm, + self.height_mm, + self.n_properties, + self.n_colors, + self.n_doodads, + self.n_key_aliases, + self.base_color_ndx, + self.label_color_ndx, + ) + return header + self.payload + b"\x00" * pad_len + + +@dataclass +class CountedString: + """A counted string: CARD16 length + chars, padded to 4 bytes.""" + + value: str | bytes + + def to_bytes(self, byte_order: str = "<") -> bytes: + s = self.value + if isinstance(s, str): + s = s.encode("ascii") + length = len(s) + total = 2 + length # CARD16 header + string bytes + pad_len = (4 - total % 4) % 4 + return struct.pack(f"{byte_order}H", length) + s + b"\x00" * pad_len + + +# Keep as a module-level function for convenience since tests use it +# inline in payload construction. +def build_counted_string(s: str | bytes, byte_order: str = "<") -> bytes: + """Build a counted string: CARD16 length + chars, padded to 4 bytes.""" + return CountedString(value=s).to_bytes(byte_order) + + +@dataclass +class ShapeWire: + """ + xkbShapeWireDesc (4 bytes) + outline data. + Each outline: header(4 bytes) + nPoints * point(4 bytes). + """ + + n_outlines: int = 1 + primary_ndx: int = 0 + approx_ndx: int = 0 + + def to_bytes(self, byte_order: str = "<") -> bytes: + header = struct.pack( + f"{byte_order}BBBx", + self.n_outlines, + self.primary_ndx, + self.approx_ndx, + ) + outlines = b"" + for _ in range(self.n_outlines): + outline_hdr = struct.pack(f"{byte_order}BBxx", 1, 0) # 1 point + point = struct.pack(f"{byte_order}hh", 0, 0) + outlines += outline_hdr + point + return header + outlines + + +@dataclass +class SectionWire: + """xkbSectionWireDesc (20 bytes) + row data.""" + + n_rows: int = 1 + n_doodads: int = 0 + n_overlays: int = 0 + + def to_bytes(self, byte_order: str = "<") -> bytes: + header = struct.pack( + f"{byte_order}I hh HH h BBBBxx", + 0, # name (Atom) + 0, + 0, # top, left + 100, + 100, # width, height + 0, # angle + 0, # priority + self.n_rows, + self.n_doodads, + self.n_overlays, + ) + rows = b"" + for _ in range(self.n_rows): + rows += struct.pack(f"{byte_order}hhBBxx", 0, 0, 0, 0) + return header + rows + + +@dataclass +class OverlayWire: + """xkbOverlayWireDesc + overlay rows.""" + + n_rows: int = 1 + rows_under: list[int] | None = None + + def to_bytes(self, byte_order: str = "<") -> bytes: + header = struct.pack(f"{byte_order}IBxxx", 0, self.n_rows) + rows_under = ( + self.rows_under if self.rows_under is not None else list(range(self.n_rows)) + ) + rows = b"" + for row_under in rows_under: + rows += struct.pack(f"{byte_order}BBxx", row_under, 0) + return header + rows + + +@dataclass +class GetKbdByNameRequest: + """xkbGetKbdByNameReq (minor opcode 23).""" + + opcode: int + device_spec: int = XkbUseCoreKbd + need: int = 0 + want: int = 0 + load: int = 0 + payload: bytes = b"" + length_override: int | None = None + + def to_bytes(self, byte_order: str = "<") -> bytes: + total_bytes = 12 + len(self.payload) + pad_len = (4 - total_bytes % 4) % 4 + total_bytes += pad_len + + length = ( + self.length_override + if self.length_override is not None + else total_bytes // 4 + ) + + header = struct.pack( + f"{byte_order}BBH HHHBx", + self.opcode, + XkbGetKbdByName, + length, + self.device_spec, + self.need, + self.want, + self.load, + ) + return header + self.payload + b"\x00" * pad_len diff --git a/test/pyxtest/valgrind.py b/test/pyxtest/valgrind.py new file mode 100644 index 000000000..237ba79d6 --- /dev/null +++ b/test/pyxtest/valgrind.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: MIT +# +# Valgrind XML output parser for extracting memory errors and suppressions. + +from __future__ import annotations + +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from pathlib import Path + + +def _suppression_from_element(elem: ET.Element) -> str: + """Format a ```` XML element as a suppression file entry.""" + name = elem.findtext("sname", "") + kind = elem.findtext("skind", "") + auxiliary = elem.findtext("skaux") + + lines = ["{", f" {name}", f" {kind}"] + if auxiliary: + lines.append(f" {auxiliary}") + for sframe in elem.iter("sframe"): + fun = sframe.findtext("fun") + obj = sframe.findtext("obj") + if fun is not None: + lines.append(f" fun:{fun}") + elif obj is not None: + lines.append(f" obj:{obj}") + else: + lines.append(" ...") + lines.append("}") + return "\n".join(lines) + + +@dataclass +class ValgrindError: + """Represents a single valgrind error extracted from XML output.""" + + kind: str # e.g. "InvalidRead", "InvalidWrite" + what: str # human-readable description + stack_frames: list[tuple[str, str | None, str | None]] # (func, file, line) + suppression: str | None = None # ready-to-paste suppression file entry + + def __str__(self): + lines = [f"{self.kind}: {self.what}"] + for func, srcfile, line in self.stack_frames[:8]: + loc = f" ({srcfile}:{line})" if srcfile else "" + lines.append(f" at {func}{loc}") + if self.suppression: + lines.append("") + lines.append(self.suppression) + return "\n".join(lines) + + @classmethod + def from_xml(cls, xml_path: Path) -> list[ValgrindError]: + """Parse valgrind XML output and return a list of ValgrindError.""" + if not xml_path.is_file(): + return [] + + try: + tree = ET.parse(xml_path) + except ET.ParseError: + return [] + + errors = [] + root = tree.getroot() + + for error_elem in root.iter("error"): + kind_elem = error_elem.find("kind") + kind = kind_elem.text if kind_elem is not None else "Unknown" + assert kind is not None + + what_elem = error_elem.find("what") + if what_elem is None: + what_elem = error_elem.find("xwhat/text") + what = what_elem.text if what_elem is not None else "Unknown error" + assert what is not None + + frames = [] + stack_elem = error_elem.find("stack") + if stack_elem is not None: + for frame in stack_elem.iter("frame"): + fn = frame.findtext("fn", "???") + srcfile = frame.findtext("file", "") + line = frame.findtext("line", "") + frames.append((fn, srcfile, line)) + + suppression = None + supp_elem = error_elem.find("suppression") + if supp_elem is not None: + suppression = _suppression_from_element(supp_elem) + + errors.append(ValgrindError(kind, what, frames, suppression)) + + return errors diff --git a/test/pyxtest/valgrind.suppressions b/test/pyxtest/valgrind.suppressions new file mode 100644 index 000000000..ad62211a2 --- /dev/null +++ b/test/pyxtest/valgrind.suppressions @@ -0,0 +1,18 @@ +# Debian complains about invalid reads +# InvalidRead: Invalid read of size 8 +# at strncmp (strcmp-sse2.S:162) +# at is_dst (dl-load.c:216) +# at _dl_dst_count (dl-load.c:253) +# at expand_dynamic_string_token (dl-load.c:395) +# at fillin_rpath.isra.0 (dl-load.c:483) +# at decompose_rpath (dl-load.c:654) +# at _dl_map_object (dl-load.c:2111) +# at openaux (dl-deps.c:64) +{ + Glibc_LD_SO_Linker_Noise + Memcheck:Addr8 + fun:strncmp + ... + fun:_dl_map_object_deps + ... +} diff --git a/test/pyxtest/xclient.py b/test/pyxtest/xclient.py new file mode 100644 index 000000000..b25df75b6 --- /dev/null +++ b/test/pyxtest/xclient.py @@ -0,0 +1,517 @@ +# SPDX-License-Identifier: MIT +# +# X11 client connection utilities + +import select +import socket +import struct +import time +from dataclasses import dataclass +from enum import StrEnum + +from proto.bigrequests import BigRequestsEnableRequest +from proto.x11 import ( + CreatePixmapRequest, + CreateWindowRequest, + InternAtomRequest, + QueryExtensionRequest, +) +from proto import xkb + + +class X11ConnectionError(Exception): + """Raised when the X11 connection fails.""" + + pass + + +@dataclass +class XExtensionData: + """Cached result of a QueryExtension reply.""" + + opcode: int + first_event: int + first_error: int + + +class Extension(StrEnum): + """X11 extension wire names as used in QueryExtension requests.""" + + BIG_REQUESTS = "BIG-REQUESTS" + COMPOSITE = "Composite" + DAMAGE = "DAMAGE" + DBE = "DOUBLE-BUFFER" + DPMS = "DPMS" + DRI2 = "DRI2" + DRI3 = "DRI3" + GENERIC_EVENT = "Generic Event Extension" + GLX = "GLX" + MIT_SCREEN_SAVER = "MIT-SCREEN-SAVER" + MIT_SHM = "MIT-SHM" + PRESENT = "Present" + RANDR = "RANDR" + RECORD = "RECORD" + RENDER = "RENDER" + SECURITY = "SECURITY" + SHAPE = "SHAPE" + SYNC = "SYNC" + XC_MISC = "XC-MISC" + XF86BIGFONT = "XFree86-Bigfont" + XF86DGA = "XFree86-DGA" + XF86VIDMODE = "XFree86-VidModeExtension" + XFIXES = "XFIXES" + XI = "XInputExtension" + XRES = "X-Resource" + XINERAMA = "XINERAMA" + XKB = "XKEYBOARD" + XTEST = "XTEST" + XVIDEO = "XVideo" + XVIDEO_MC = "XVideo-MotionCompensation" + + +@dataclass +class X11Error: + """An X11 error reply from the server.""" + + response_type: int + error_code: int + sequence: int + resource_id: int + minor_code: int + major_code: int + + @classmethod + def from_data(cls, data: bytes, byte_order: str = "<") -> "X11Error": + if len(data) < 32: + data = data + b"\x00" * (32 - len(data)) + response_type, error_code, sequence, resource_id, minor_code, major_code = ( + struct.unpack_from(f"{byte_order}BBHIHB", data) + ) + return cls( + response_type, error_code, sequence, resource_id, minor_code, major_code + ) + + def __repr__(self): + return ( + f"" + ) + + +@dataclass +class X11Reply: + """An X11 reply from the server.""" + + data: bytes + response_type: int + sequence: int + length: int + + @classmethod + def from_data(cls, data: bytes, byte_order: str = "<") -> "X11Reply": + if len(data) >= 8: + response_type = data[0] + sequence = struct.unpack_from(f"{byte_order}H", data, 2)[0] + length = struct.unpack_from(f"{byte_order}I", data, 4)[0] + else: + response_type = sequence = length = 0 + return cls(data, response_type, sequence, length) + + def __repr__(self): + return ( + f"" + ) + + +class RawX11Connection: + """ + Minimal X11 connection for sending raw (possibly malformed) requests. + + Set swapped=True for testing byte-swap code paths (SProcXxx). + """ + + def __init__(self, display_num, swapped=False): + self.display_num = display_num + self.swapped = swapped + self._byte_order = ">" if swapped else "<" + self.sock = None + self.seq = 0 + self.root_window = 0 + self.root_visual = 0 + self.root_depth = 0 + self._resource_id_base = 0 + self._resource_id_mask = 0 + self._next_resource_id = 0 + self._extensions = {} + self._connect() + + def _connect(self): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + path = f"/tmp/.X11-unix/X{self.display_num}" + try: + self.sock.connect(path) + except (ConnectionRefusedError, FileNotFoundError) as e: + self.sock.close() + self.sock = None + raise X11ConnectionError(f"Cannot connect to {path}: {e}") + try: + self._handshake() + except Exception: + self.sock.close() + self.sock = None + raise + + def _handshake(self): + byte_order = 0x42 if self.swapped else 0x6C + bo = self._byte_order + setup = struct.pack(f"{bo}BxHHHHxx", byte_order, 11, 0, 0, 0) + assert self.sock + self.sock.sendall(setup) + + header = self._recv_exact(8) + status = header[0] + + if status == 0: + reason_len = header[1] + extra_len = struct.unpack_from(f"{bo}H", header, 6)[0] + extra = self._recv_exact(extra_len * 4) + reason = extra[:reason_len].decode("ascii", errors="replace") + raise X11ConnectionError(f"X server refused connection: {reason}") + + if status == 2: + raise X11ConnectionError("X server requires authentication") + if status != 1: + raise X11ConnectionError(f"Unexpected setup status: {status}") + + extra_len = struct.unpack_from(f"{bo}H", header, 6)[0] + data = self._recv_exact(extra_len * 4) + + (_, res_base, res_mask, _, vendor_len, _, num_screens, num_formats) = ( + struct.unpack_from(f"{bo}IIIIHH BB", data, 0) + ) + + self._resource_id_base = res_base + self._resource_id_mask = res_mask + self._next_resource_id = 1 + + offset = 32 + ((vendor_len + 3) & ~3) + num_formats * 8 + + if num_screens > 0 and offset + 40 <= len(data): + ( + self.root_window, + _, + _, + _, + _, + _, + _, + _, + _, + _, + _, + self.root_visual, + _, + _, + self.root_depth, + ) = struct.unpack_from(f"{bo}IIIIIHHHHHHI BBB", data, offset) + + # --- Core protocol helpers --- + + def alloc_id(self) -> int: + xid = self._resource_id_base | self._next_resource_id + self._next_resource_id += 1 + return xid + + def send_request(self, data: bytes) -> int: + self.seq += 1 + assert self.sock + self.sock.sendall(data) + return self.seq + + def recv_response(self, timeout: float = 5.0) -> X11Error | X11Reply | None: + ready = select.select([self.sock], [], [], timeout) + if not ready[0]: + return None + try: + header = self._recv_exact(32, timeout=timeout) + except (ConnectionResetError, BrokenPipeError, OSError, X11ConnectionError): + return None + if not header: + return None + + rtype = header[0] + bo = self._byte_order + if rtype == 0: + return X11Error.from_data(header, bo) + elif rtype == 1: + extra_len = struct.unpack_from(f"{bo}I", header, 4)[0] + if extra_len > 0: + try: + extra = self._recv_exact(extra_len * 4, timeout=timeout) + return X11Reply.from_data(header + extra, bo) + except ( + ConnectionResetError, + BrokenPipeError, + OSError, + X11ConnectionError, + ): + return X11Reply.from_data(header, bo) + return X11Reply.from_data(header, bo) + else: + return X11Reply.from_data(header, bo) + + def flush_responses(self, timeout: float = 0.5) -> list[X11Error | X11Reply]: + responses: list[X11Error | X11Reply] = [] + while True: + resp = self.recv_response(timeout=timeout) + if resp is None: + break + responses.append(resp) + return responses + + def is_connected(self) -> bool: + assert self.sock + try: + ready = select.select([self.sock], [], [], 0) + if ready[0]: + data = self.sock.recv(1, socket.MSG_PEEK) + return len(data) > 0 + return True + except (ConnectionResetError, BrokenPipeError, OSError): + return False + + def wait_for_disconnect(self, timeout: float = 5.0) -> bool: + assert self.sock + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + ready = select.select([self.sock], [], [], min(remaining, 0.5)) + if ready[0]: + try: + data = self.sock.recv(4096) + if not data: + return True + except (ConnectionResetError, BrokenPipeError, OSError): + return True + return False + + # --- Extension negotiation --- + + def query_extension(self, name: str) -> XExtensionData | None: + if name in self._extensions: + return self._extensions[name] + + req = QueryExtensionRequest(name=name) + self.send_request(req.to_bytes(self._byte_order)) + resp = self.recv_response(timeout=5.0) + if resp is None or isinstance(resp, X11Error): + return None + + if len(resp.data) >= 12: + present, major, first_event, first_error = struct.unpack_from( + "BBBB", resp.data, 8 + ) + if present: + result = XExtensionData(major, first_event, first_error) + self._extensions[name] = result + return result + return None + + def xkb_use_extension(self, major_version: int = 1, minor_version: int = 0) -> int: + ext = self.query_extension(Extension.XKB) + if not ext: + raise X11ConnectionError("XKB extension not available") + + req = xkb.use_extension( + ext.opcode, + major=major_version, + minor=minor_version, + byte_order=self._byte_order, + ) + self.send_request(req) + + resp = self.recv_response(timeout=5.0) + if isinstance(resp, X11Error): + raise X11ConnectionError(f"XkbUseExtension failed: error {resp.error_code}") + return ext.opcode + + def enable_big_requests(self) -> int: + ext = self.query_extension(Extension.BIG_REQUESTS) + if not ext: + raise X11ConnectionError("BIG-REQUESTS not available") + + req = BigRequestsEnableRequest(opcode=ext.opcode) + self.send_request(req.to_bytes(self._byte_order)) + + resp = self.recv_response(timeout=5.0) + if isinstance(resp, X11Error): + raise X11ConnectionError("BigRequestsEnable failed") + if resp and len(resp.data) >= 12: + return struct.unpack_from(f"{self._byte_order}I", resp.data, 8)[0] + return 0 + + # --- Resource creation helpers --- + + def create_window( + self, + width: int = 100, + height: int = 100, + depth: int | None = None, + parent: int | None = None, + x: int = 0, + y: int = 0, + ) -> int: + wid = self.alloc_id() + if parent is None: + parent = self.root_window + if depth is None: + depth = self.root_depth + + assert parent is not None + assert depth is not None + + req = CreateWindowRequest( + wid=wid, + parent=parent, + x=x, + y=y, + width=width, + height=height, + depth=depth, + ) + self.send_request(req.to_bytes(self._byte_order)) + self.flush_responses(timeout=0.2) + return wid + + def create_pixmap( + self, + width: int = 100, + height: int = 100, + depth: int | None = None, + drawable: int | None = None, + ) -> int: + pid = self.alloc_id() + if drawable is None: + drawable = self.root_window + if depth is None: + depth = self.root_depth + + assert drawable is not None + assert depth is not None + + req = CreatePixmapRequest( + pid=pid, + drawable=drawable, + width=width, + height=height, + depth=depth, + ) + self.send_request(req.to_bytes(self._byte_order)) + self.flush_responses(timeout=0.2) + return pid + + def intern_atom(self, name: str, only_if_exists: bool = False) -> int: + req = InternAtomRequest(name=name, only_if_exists=only_if_exists) + self.send_request(req.to_bytes(self._byte_order)) + + resp = self.recv_response(timeout=5.0) + if isinstance(resp, X11Error) or resp is None: + return 0 + if len(resp.data) >= 12: + return struct.unpack_from(f"{self._byte_order}I", resp.data, 8)[0] + return 0 + + def get_fd(self) -> int: + assert self.sock + return self.sock.fileno() + + def close(self) -> None: + if self.sock: + try: + self.sock.close() + except OSError: + pass + self.sock = None + + def _recv_exact(self, nbytes: int, timeout: float = 10.0) -> bytes: + assert self.sock + data = b"" + deadline = time.monotonic() + timeout + while len(data) < nbytes: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise X11ConnectionError( + f"Timeout reading ({len(data)}/{nbytes} bytes)" + ) + ready = select.select([self.sock], [], [], min(remaining, 1.0)) + if ready[0]: + chunk = self.sock.recv(nbytes - len(data)) + if not chunk: + raise X11ConnectionError( + f"Connection closed ({len(data)}/{nbytes} bytes)" + ) + data += chunk + return data + + def __enter__(self) -> "RawX11Connection": + return self + + def __exit__(self, *args) -> bool: + self.close() + return False + + def __del__(self) -> None: + self.close() + + +class XlibConnection: + """python-xlib based connection for higher-level X operations.""" + + def __init__(self, display_num): + from Xlib import display as xlib_display + + self.display_num = display_num + self.display = xlib_display.Display(f":{display_num}") + self.screen = self.display.screen() + self.root = self.screen.root + + def get_fd(self): + return self.display.fileno() + + def create_window(self, width=100, height=100, x=0, y=0): + from Xlib import X + + window = self.root.create_window( + x, + y, + width, + height, + 0, + self.screen.root_depth, + X.InputOutput, + X.CopyFromParent, + ) + self.display.sync() + return window + + def flush(self): + self.display.flush() + + def sync(self): + self.display.sync() + + def close(self): + try: + self.display.close() + except Exception: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + return False diff --git a/test/pyxtest/xserver.py b/test/pyxtest/xserver.py new file mode 100644 index 000000000..bc392e322 --- /dev/null +++ b/test/pyxtest/xserver.py @@ -0,0 +1,429 @@ +# SPDX-License-Identifier: MIT +# +# X server lifecycle manager +# +# Handles starting and stopping Xvfb, Xwayland, and Xorg servers, +# optionally under valgrind or with AddressSanitizer (ASAN) support, +# with automatic display number allocation via the -displayfd mechanism. + +from typing import Iterator + +import os +import select +import shutil +import subprocess +import tempfile +import time +import warnings +from pathlib import Path + +from asan import AsanError +from valgrind import ValgrindError + + +class XServerProcess: + """ + Manages an X server subprocess for testing. + + Supports Xvfb, Xwayland, and Xorg. Uses the -displayfd pipe mechanism + for automatic display number allocation. Optionally wraps the server + in valgrind for memory error detection, or detects AddressSanitizer + (ASAN) errors when running an ASAN-instrumented binary. + """ + + VALGRIND_ERROR_EXIT = 99 + + def __init__( + self, + server_type="xvfb", + valgrind=False, + valgrind_suppressions=None, + asan=False, + server_path=None, + extra_args=None, + log_file=None, + ): + self.server_type = server_type + self.asan = asan + if asan and valgrind: + warnings.warn( + "ASAN and valgrind are incompatible; disabling valgrind " + "in favour of ASAN.", + stacklevel=2, + ) + valgrind = False + self.valgrind = valgrind + if valgrind_suppressions is None: + default_supp = Path(__file__).resolve().parent / "valgrind.suppressions" + self.valgrind_suppressions = ( + default_supp if default_supp.is_file() else None + ) + else: + self.valgrind_suppressions = valgrind_suppressions + self.server_path = server_path or self._find_server() + self.extra_args = extra_args or [] + self.log_file = log_file + + self._process = None + self._display_num = None + self._valgrind_xml_file = None + self._asan_log_path = None + self._stderr_file = None + self._weston_process = None + + def _find_server(self): + """Discover the server binary from environment or build directory.""" + env_map = { + "xvfb": "XVFB_PATH", + "xwayland": "XWAYLAND_PATH", + "xorg": "XORG_PATH", + } + env_var = env_map.get(self.server_type) + if env_var and os.environ.get(env_var): + path = Path(os.environ[env_var]) + if path.is_file() and os.access(path, os.X_OK): + return path + + def find_meson_builddir(source_root: Path) -> Iterator[Path]: + for d in source_root.iterdir(): + if d.is_dir() and (d / "meson-private").exists(): + yield d + + # Try XSERVER_BUILDDIR env var or fall back to the first + # build directory relative to this file's location + # test/pyxtest/xserver.py -> ../../build/hw/... + builddir = os.environ.get("XSERVER_BUILDDIR") + if builddir is None: + try: + builddir = next( + find_meson_builddir(Path(__file__).resolve().parent.parent.parent) + ) + except StopIteration: + pass + + if builddir: + build_paths = { + "xvfb": Path(builddir, "hw", "vfb", "Xvfb"), + "xwayland": Path(builddir, "hw", "xwayland", "Xwayland"), + "xorg": Path(builddir, "hw", "xfree86", "Xorg"), + } + path = build_paths.get(self.server_type) + if path and path.is_file() and os.access(path, os.X_OK): + return path + + # Fall back to system PATH + binary_names = { + "xvfb": "Xvfb", + "xwayland": "Xwayland", + "xorg": "Xorg", + } + name = binary_names.get(self.server_type) + server_in_path = shutil.which(name) + if server_in_path: + warnings.warn( + f"Using system {self.server_type} server from PATH: {server_in_path}. " + f"Set {env_var} or XSERVER_BUILDDIR to use a specific build.", + stacklevel=2, + ) + return server_in_path + + raise FileNotFoundError( + f"Failed to find {self.server_type} server binary. " + f"Set {env_var} environment variable or build the server first." + ) + + @property + def display_num(self): + return self._display_num + + @property + def display(self): + if self._display_num is not None: + return f":{self._display_num}" + return None + + def start(self, timeout=10): + """Start the X server and wait for it to be ready.""" + read_fd, write_fd = os.pipe() + + cmd = self._build_command(write_fd) + + self._stderr_file = tempfile.NamedTemporaryFile( + prefix=f"xserver-{self.server_type}-stderr-", + suffix=".log", + delete=False, + mode="w", + ) + + if self.server_type == "xwayland": + self._start_wayland_compositor() + + env = os.environ.copy() + if self._weston_process: + env.setdefault("WAYLAND_DISPLAY", "wayland-security-test") + + if self.asan: + self._setup_asan_env(env) + + try: + self._process = subprocess.Popen( + cmd, + stderr=self._stderr_file, + pass_fds=(write_fd,), + env=env, + ) + except Exception: + os.close(write_fd) + os.close(read_fd) + raise + + os.close(write_fd) + + try: + display_bytes = b"" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if self._process.poll() is not None: + display_bytes = None + break + + ready, _, _ = select.select([read_fd], [], [], 1.0) + if ready: + chunk = os.read(read_fd, 64) + if not chunk: + break + display_bytes += chunk + if b"\n" in display_bytes: + break + + if not display_bytes: + if self._process.poll() is not None: + stderr_content = self._read_stderr() + raise RuntimeError( + f"X server exited with code {self._process.returncode} " + f"before sending display number (waited {timeout}s).\n" + f"Command: {' '.join(str(s) for s in cmd)}\n" + f"Stderr:\n{stderr_content}" + ) + self.stop() + raise RuntimeError( + f"Timed out waiting for X server display number (waited {timeout}s)" + ) + + self._display_num = int(display_bytes.strip()) + + finally: + os.close(read_fd) + + return self._display_num + + def _build_command(self, displayfd): + """Build the full command line including optional valgrind wrapper.""" + server_args = [self.server_path] + + if self.server_type == "xvfb": + server_args.extend( + [ + "-screen", + "scrn", + "1280x1024x24", + ] + ) + elif self.server_type == "xwayland": + server_args.extend(["-nokeymap"]) + elif self.server_type == "xorg": + # -logfile is only permitted if we're running as root + if self.log_file and os.geteuid() == 0: + server_args.extend(["-logfile", str(self.log_file)]) + + server_args.extend(["-noreset", "+byteswappedclients"]) + + # Auto-detect xkb directory if the compiled-in default doesn't exist + xkb_dir = os.environ.get("XKB_CONFIG_ROOT") + if not xkb_dir: + for candidate in ["/usr/share/X11/xkb", "/usr/local/share/X11/xkb"]: + if os.path.isdir(candidate): + xkb_dir = candidate + break + if xkb_dir: + server_args.extend(["-xkbdir", xkb_dir]) + + server_args.extend(["-displayfd", str(displayfd)]) + server_args.extend(self.extra_args) + + if not self.valgrind: + return server_args + + self._valgrind_xml_file = tempfile.NamedTemporaryFile( + prefix=f"xserver-{self.server_type}-valgrind-", suffix=".xml", delete=False + ) + self._valgrind_xml_file.close() + + valgrind_cmd = [ + "valgrind", + "--tool=memcheck", + "--leak-check=full", + "--track-origins=yes", + "--show-reachable=no", + "--gen-suppressions=all", + f"--error-exitcode={self.VALGRIND_ERROR_EXIT}", + "--xml=yes", + f"--xml-file={self._valgrind_xml_file.name}", + ] + + if self.valgrind_suppressions: + valgrind_cmd.append(f"--suppressions={self.valgrind_suppressions}") + + return valgrind_cmd + server_args + + def _setup_asan_env(self, env): + """Configure ASAN_OPTIONS for the server subprocess. + + Sets up a log file for ASAN output and configures ASAN to not + detect leaks (too noisy for these tests). Any existing + ASAN_OPTIONS from the environment are preserved. + """ + asan_log_file = tempfile.NamedTemporaryFile( + prefix=f"xserver-{self.server_type}-asan-", + suffix=".log", + delete=False, + ) + self._asan_log_path = Path(asan_log_file.name) + asan_log_file.close() + + asan_opts = { + "log_path": str(self._asan_log_path), + "detect_leaks": "0", + } + + # Merge with any existing ASAN_OPTIONS from the environment + existing = env.get("ASAN_OPTIONS", "") + for part in existing.split(":"): + part = part.strip() + if "=" in part: + key, val = part.split("=", 1) + # Don't override our log_path + if key not in asan_opts: + asan_opts[key] = val + + env["ASAN_OPTIONS"] = ":".join(f"{k}={v}" for k, v in asan_opts.items()) + + def _start_wayland_compositor(self): + """Start weston as a headless compositor for Xwayland testing.""" + if not shutil.which("weston"): + raise FileNotFoundError( + "weston is required for Xwayland testing but was not found" + ) + + self._weston_process = subprocess.Popen( + ["weston", "--no-config", "--backend=headless-backend.so"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(1) + if self._weston_process.poll() is not None: + raise RuntimeError( + f"weston exited with code {self._weston_process.returncode}" + ) + + @property + def is_alive(self) -> bool: + """Check if the server process is still running.""" + if self._process is None: + return False + return self._process.poll() is None + + @property + def crash_signal(self) -> int | None: + """If the server crashed, return the signal number. Otherwise None.""" + if self._process is None: + return None + ret = self._process.poll() + if ret is not None and ret < 0: + return -ret + return None + + def stop(self): + """Stop the server and return any valgrind/ASAN errors.""" + errors = [] + + if self._process and self._process.poll() is None: + self._process.terminate() + try: + self._process.wait(timeout=10) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait(timeout=5) + + if self._weston_process and self._weston_process.poll() is None: + self._weston_process.terminate() + try: + self._weston_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self._weston_process.kill() + self._weston_process.wait() + + if self.valgrind and self._valgrind_xml_file: + errors = ValgrindError.from_xml(Path(self._valgrind_xml_file.name)) + + if self._stderr_file: + self._stderr_file.close() + + return errors + + def _read_stderr(self): + """Read the captured stderr content.""" + if self._stderr_file: + self._stderr_file.flush() + try: + return Path(self._stderr_file.name).read_text() + except OSError: + pass + return "" + + def check_valgrind_errors(self): + """Parse and assert no valgrind errors occurred.""" + if self._valgrind_xml_file is None: + return [] + errors = ValgrindError.from_xml(Path(self._valgrind_xml_file.name)) + if errors: + msg = f"Valgrind found {len(errors)} error(s):\n\n" + msg += "\n\n".join(str(e) for e in errors) + raise AssertionError(msg) + return errors + + def get_asan_errors(self) -> list[AsanError]: + """Parse ASAN errors from log file and/or stderr.""" + errors: list[AsanError] = [] + + # Check the ASAN log file(s) first (ASAN appends . to log_path) + if self._asan_log_path: + errors.extend(AsanError.from_log(self._asan_log_path)) + + # Also check stderr in case ASAN wrote there + stderr_text = self._read_stderr() + if stderr_text and "AddressSanitizer" in stderr_text: + stderr_errors = AsanError.from_text(stderr_text) + # Avoid duplicates: only add stderr errors if log file had none + if not errors: + errors.extend(stderr_errors) + + return errors + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + return False + + def __repr__(self): + state = "running" if self.is_alive else "stopped" + display = self.display or "N/A" + flags = "" + if self.valgrind: + flags += " +valgrind" + if self.asan: + flags += " +asan" + return f""