test: add pytest-based test suite

This test suite is primarily aimed at reproducing the various CVE issues
we've had over the years that require custom crafted protocol requests.
It may also be useful for other testing.

Wrapped in python because pytest is a powerful test suite runner and
writing custom buffers is easy.

The architecture is so that we fork off an X server (one or more of
Xvfb, Xwayland, Xorg) and then run our test clients against that to
check whether we get the right reply, or crash the server, or whether
valgrind complains about something (valgrind is started automatically
for tests that are marked as such).

Tests can be run manually via pytest or via meson test.

Assisted-by: Claude:claude-claude-opus-4-6
Part-of: <https://gitlab.freedesktop.org/xorg/xserver/-/merge_requests/2187>
This commit is contained in:
Peter Hutterer 2026-04-18 09:44:09 +10:00 committed by Marge Bot
parent 2e4a5aecef
commit bea8d65fc8
16 changed files with 2384 additions and 1 deletions

View file

@ -21,7 +21,7 @@ variables:
REPO_URL_XORGPROTO: 'https://gitlab.freedesktop.org/xorg/proto/xorgproto.git'
XORG_DEBIAN_VERSION: 'bookworm-slim'
XORG_DEBIAN_EXEC: 'env FDO_CI_CONCURRENT=${FDO_CI_CONCURRENT} bash .gitlab-ci/debian-install.sh'
XORG_DEBIAN_TAG: '2026-03-16-image-update'
XORG_DEBIAN_TAG: '2026-04-28-pytest'
XORG_FREEBSD_VERSION: '14.2'
XORG_FREEBSD_EXEC: ''
XORG_FREEBSD_TAG: '2025-02-18-vm-image'
@ -210,6 +210,21 @@ xwayland-nolibdecor:
BUILD_XVFB: false
MESON_EXTRA_ARGS: -Dlibdecor=false ${MESON_DDX_BUILD_ARGS}
meson-asan:
extends: .common-build-and-test
script:
- .gitlab-ci/meson-build.sh --run-test
variables:
MESON_EXTRA_ARGS: >
-Db_sanitize=address
-Db_lundef=false
-Dxwayland=false
-Dxorg=false
-Dxephyr=false
-Dxnest=false
-Dxvfb=true
MESON_TEST_ARGS: --suite pyxtest
mingw-cross-build:
extends: .common-build-and-test
script:
@ -341,6 +356,18 @@ check-whitespace:
script:
- .gitlab-ci/whitespace-check.py $(git ls-files hw/xwayland)
ruff-format-pyxtest:
extends: .fdo.ci-fairy
stage: build-and-test
script:
- uvx ruff format --check --diff test/pyxtest
ruff-check-pyxtest:
extends: .fdo.ci-fairy
stage: build-and-test
script:
- uvx ruff check test/pyxtest
#
# Workflow rules needed due to:
# https://gitlab.freedesktop.org/freedesktop/freedesktop/-/issues/438

View file

@ -112,6 +112,7 @@ apt-get install -y \
pkg-config \
python3-attr \
python3-jinja2 \
python3-pytest \
python3-mako \
python3-numpy \
python3-six \

View file

@ -166,6 +166,7 @@ subdir('bigreq')
subdir('damage')
subdir('sync')
subdir('bugs')
subdir('pyxtest')
if build_xorg
# Tests that require at least some DDX functions in order to fully link

180
test/pyxtest/README.md Normal file
View file

@ -0,0 +1,180 @@
# pyxtest - pytest-based X server test suite
This is a pytest-based test suite that launches X servers and sends crafted
protocol requests to verify that security vulnerabilities and other bugs
are properly handled.
It can be run against Xvfb, Xwayland, or Xorg but the latter is potentially
flaky and requires some setup outside the test suite. The test suite
uses both AddressSanitizer (ASAN) and valgrind for detecting
memory errors such as out-of-bounds reads/writes and use-after-free.
## Running tests
### Via meson
The test suite (via Xvfb) is integrated into the meson tests and can be
run with normal meson commands.
```sh
# run the python test suite
meson test --suite pyxtest
# run a set of tests
meson test pyxtest-test_randr.py
```
Consult the meson documentation for further details.
### Directly with pytest
For running against a custom path, point the test suite at the server binary to
test using environment variables or CLI options:
```sh
# Using environment variable
XVFB_PATH=build/hw/vfb/Xvfb pytest test/pyxtest/ -v
# Using --server-path
pytest test/pyxtest/ -v --server-path=build/hw/vfb/Xvfb
# Using the system Xvfb (fallback if no path is set)
pytest test/pyxtest/ -v
```
The normal pytest options work as expected (`-k` for test selection, etc.)
### Running with AddressSanitizer (ASAN)
ASAN is a compile-time instrumentation that detects memory errors such as
heap buffer overflows and use-after-free. To use ASAN, build the server with
sanitizer support:
```sh
meson setup build-asan -Db_sanitize=address -Db_lundef=false
meson compile -C build-asan
```
Then run the tests against the ASAN-built binary:
```sh
XSERVER_ASAN=1 XVFB_PATH=build-asan/hw/vfb/Xvfb pytest test/pyxtest/ -v
```
When using meson test, `XSERVER_ASAN` is set automatically if the build
was configured with `-Db_sanitize=address`.
Tests marked with `@pytest.mark.asan` are skipped unless `XSERVER_ASAN=1`
is set. When ASAN detects an error, the server process is killed and the
ASAN error report is included in the test failure message.
**Note:** ASAN and valgrind are mutually incompatible. When `XSERVER_ASAN=1`
is set, valgrind wrapping is automatically disabled even if `--valgrind` is
passed.
### Running with valgrind
The `--valgrind` flag runs **all** servers under valgrind:
```sh
pytest test/pyxtest/ -v --valgrind
```
Tests marked with `@pytest.mark.valgrind` automatically run their server
under valgrind even without the `--valgrind` flag. This is useful for
bugs that are only detectable via valgrind (e.g. use of uninitialised
values).
### Testing multiple server types
By default only `Xvfb` is tested. Use `--server-type` to test additional
servers. Tests using the `xserver` fixture are automatically run once per
server type:
```sh
pytest test/pyxtest/ -v --server-type=xvfb --server-type=xwayland
```
## CLI options
| Option | Description |
|--------------------------------|-------------------------------------------|
| `--valgrind` | Run all X servers under valgrind memcheck |
| `--valgrind-suppressions=PATH` | Path to a valgrind suppressions file |
| `--server-type=TYPE` | Server type to test (`xvfb`, `xwayland`, `xorg`). Repeatable. Default: `xvfb` |
| `--server-path=PATH` | Explicit path to the X server binary |
## Environment variables
The server binary is located by checking, in order:
1. `--server-path` CLI option
2. `XVFB_PATH` / `XWAYLAND_PATH` / `XORG_PATH` environment variable
3. `XSERVER_BUILDDIR` environment variable (looks for `hw/vfb/Xvfb` etc.)
4. `build/` directory relative to the source root
5. System `PATH` (prints a warning)
`VALGRIND_SUPPRESSIONS` can point to a suppressions file.
`XSERVER_ASAN` set to `1` indicates the server binary was built with
AddressSanitizer. This is set automatically by meson when
`-Db_sanitize=address` is used. It can also be set manually.
## Test markers
| Marker | Effect |
|-------------------------------|-------------------------------------------------------------|
| `@pytest.mark.asan` | Test requires ASAN (`XSERVER_ASAN=1` must be set |
| `@pytest.mark.valgrind` | Test requires valgrind (skipped if `XSERVER_ASAN` is set) |
| `@pytest.mark.xwayland_only` | Test is skipped unless `--server-type=xwayland` |
| `@pytest.mark.xorg_only` | Test is skipped unless `--server-type=xorg` |
| `@pytest.mark.swapped_client` | Test uses a byte-swapped (big-endian) client connection |
## Writing a new test
1. Create or edit a `test_*.py` file.
2. Use the `xserver` and `xclient` or `xclient_swapped` fixtures to get a
running server and connection:
```python
def test_something(self, xserver, xclient):
# xclient is a RawX11Connection to xserver
...
assert xserver.is_alive, "Server crashed"
```
3. If the test needs an extension, create a fixture that handles
negotiation:
```python
@pytest.fixture
def render_client(xclient):
ext = xclient.query_extension(Extension.RENDER)
if not ext:
pytest.skip("RENDER not available")
# ... send version negotiation ...
return xclient
```
4. Build protocol requests using dataclasses from `proto/`:
```python
from proto import xi
req = xi.XIChangeHierarchyRequest(
opcode=opcode,
num_changes=1,
changes_data=change_data,
)
xclient.send_request(req.to_bytes())
```
5. If a new extension module is needed, create `proto/myext.py` with
constants and `@dataclass` request builders following the existing
pattern.
6. If the bug is only detectable via a memory sanitizer (OOB reads,
use-after-free), mark the test with `@pytest.mark.asan`. Use
`@pytest.mark.valgrind` only for bugs that specifically require
valgrind (e.g. use of uninitialised values that ASAN does not
detect).

117
test/pyxtest/asan.py Normal file
View file

@ -0,0 +1,117 @@
# SPDX-License-Identifier: MIT
#
# AddressSanitizer (ASAN) output parser for extracting memory errors.
from __future__ import annotations
import re
from dataclasses import dataclass
from pathlib import Path
@dataclass
class AsanError:
"""Represents a single ASAN error extracted from log/stderr output."""
kind: str # e.g. "heap-buffer-overflow", "heap-use-after-free"
description: str # The full ERROR line
stack_frames: list[tuple[str, str | None, str | None]] # (func, file, line)
def __str__(self):
lines = [f"{self.kind}: {self.description}"]
for func, srcfile, line in self.stack_frames[:8]:
loc = f" ({srcfile}:{line})" if srcfile else ""
lines.append(f" at {func}{loc}")
return "\n".join(lines)
@classmethod
def from_log(cls, log_path: Path) -> list[AsanError]:
"""Parse ASAN output from a log file.
ASAN may append a PID suffix to the log_path, so we glob for
matching files (e.g. log_path.1234).
"""
errors: list[AsanError] = []
# ASAN appends .<pid> to the log_path
candidates = list(log_path.parent.glob(f"{log_path.name}.*"))
if log_path.is_file():
candidates.append(log_path)
for path in candidates:
try:
text = path.read_text(errors="replace")
except OSError:
continue
errors.extend(cls.from_text(text))
return errors
@classmethod
def from_text(cls, text: str) -> list[AsanError]:
"""Parse ASAN errors from text output (stderr or log file).
Recognises the standard ASAN report format::
==PID==ERROR: AddressSanitizer: heap-buffer-overflow on ...
READ of size 4 at 0x... thread T0
#0 0xaddr in func file.c:123
#1 0xaddr in func2 file2.c:456
SUMMARY: AddressSanitizer: heap-buffer-overflow ...
"""
errors: list[AsanError] = []
# Pattern for the ERROR line
error_re = re.compile(r"==\d+==ERROR: AddressSanitizer: ([\w-]+)(.*)")
# Pattern for stack frames:
# #0 0x... in function_name file.c:123:45
# #0 0x... in function_name (module+0xoffset)
# #0 0x... (module+0xoffset)
frame_re = re.compile(
r"\s+#\d+\s+\S+\s+in\s+(\S+)\s+(\S+?)(?::(\d+))?(?::\d+)?\s*$"
)
# Frame without source info (just address in module)
frame_nosrc_re = re.compile(r"\s+#\d+\s+\S+\s+in\s+(\S+)")
lines = text.splitlines()
i = 0
while i < len(lines):
m = error_re.search(lines[i])
if not m:
i += 1
continue
kind = m.group(1)
description = m.group(2).strip()
# Collect stack frames following the ERROR line
frames: list[tuple[str, str | None, str | None]] = []
i += 1
while i < len(lines):
line = lines[i]
# Stop at blank lines, SUMMARY lines, or new ERROR lines
if not line.strip() or line.strip().startswith("SUMMARY:"):
break
if error_re.search(line):
break
fm = frame_re.match(line)
if fm:
func = fm.group(1)
srcfile = fm.group(2)
lineno = fm.group(3)
# Filter out non-file entries like (module+0xoffset)
if srcfile and srcfile.startswith("("):
srcfile = None
lineno = None
frames.append((func, srcfile, lineno))
else:
fm2 = frame_nosrc_re.match(line)
if fm2:
frames.append((fm2.group(1), None, None))
i += 1
errors.append(cls(kind=kind, description=description, stack_frames=frames))
return errors

280
test/pyxtest/conftest.py Normal file
View file

@ -0,0 +1,280 @@
# SPDX-License-Identifier: MIT
#
# pytest configuration and fixtures for X server testing.
import os
import shutil
import pytest
from pathlib import Path
from xserver import XServerProcess
from xclient import RawX11Connection, X11ConnectionError, XlibConnection
def pytest_addoption(parser):
parser.addoption(
"--valgrind",
action="store_true",
default=False,
help="Run X server under valgrind memcheck",
)
parser.addoption(
"--valgrind-suppressions",
default=None,
help="Path to valgrind suppressions file",
)
parser.addoption(
"--server-type",
action="append",
default=[],
help="Server types to test (xvfb, xwayland, xorg). "
"Can be specified multiple times. Default: xvfb",
)
parser.addoption(
"--server-path", default=None, help="Explicit path to the X server binary"
)
def pytest_configure(config):
config.addinivalue_line(
"markers", "valgrind: mark test as requiring valgrind to detect the issue"
)
config.addinivalue_line(
"markers",
"asan: mark test as requiring AddressSanitizer to detect the issue",
)
config.addinivalue_line(
"markers", "xwayland_only: mark test as only applicable to Xwayland"
)
config.addinivalue_line(
"markers", "xorg_only: mark test as only applicable to Xorg"
)
config.addinivalue_line(
"markers", "swapped_client: mark test as requiring a byte-swapped client"
)
def get_server_types(config) -> list[str]:
"""Get the list of server types to test, default to xvfb."""
return config.getoption("--server-type") or ["xvfb"]
def get_valgrind_suppressions(config) -> Path | None:
"""Find the valgrind suppressions file."""
explicit = config.getoption("--valgrind-suppressions")
if explicit:
return Path(explicit)
env = os.environ.get("VALGRIND_SUPPRESSIONS")
if env:
f = Path(env)
if f.is_file():
return f
# Try next to this file (test/pyxtest/valgrind.suppressions)
default = Path(__file__).resolve().parent / "valgrind.suppressions"
if default.is_file():
return default
return None
def is_valgrind_available() -> bool:
"""Check if valgrind is available on the system."""
return shutil.which("valgrind") is not None
def is_asan_build() -> bool:
"""Check if the X server binary was built with AddressSanitizer.
This is determined by the ``XSERVER_ASAN`` environment variable
which is set by meson when ``-Db_sanitize=address`` is used.
It can also be set manually when running pytest directly.
"""
return os.environ.get("XSERVER_ASAN") == "1"
def pytest_collection_modifyitems(config, items):
"""Skip tests based on markers and configuration."""
server_types = get_server_types(config)
asan = is_asan_build()
for item in items:
if item.get_closest_marker("xwayland_only") and "xwayland" not in server_types:
item.add_marker(pytest.mark.skip(reason="Test only applies to Xwayland"))
if item.get_closest_marker("xorg_only") and "xorg" not in server_types:
item.add_marker(pytest.mark.skip(reason="Test only applies to Xorg"))
if item.get_closest_marker("asan") and not asan:
item.add_marker(
pytest.mark.skip(
reason="Test requires ASAN build (XSERVER_ASAN=1 not set)"
)
)
if item.get_closest_marker("valgrind") and asan:
item.add_marker(
pytest.mark.skip(
reason="Test requires valgrind, incompatible with ASAN build"
)
)
def pytest_generate_tests(metafunc):
"""Parametrize tests that use the xserver fixture over all configured
server types so each test runs once per type."""
if "xserver" in metafunc.fixturenames:
server_types = get_server_types(metafunc.config)
metafunc.parametrize("xserver", server_types, indirect=True)
def _start_server(request, server_type, log_file=None):
"""Start an X server of the given type for a test.
Shared implementation for the xvfb, xwayland, xorg, and generic
xserver fixtures. Valgrind is enabled if the test is marked with
``@pytest.mark.valgrind`` or if ``--valgrind`` is passed on the
command line. ASAN is enabled automatically when ``XSERVER_ASAN=1``
is set in the environment (typically by meson when the server is
built with ``-Db_sanitize=address``).
"""
use_valgrind = (
request.config.getoption("--valgrind")
or request.node.get_closest_marker("valgrind") is not None
)
if use_valgrind and not is_valgrind_available():
pytest.skip("valgrind not available")
use_asan = is_asan_build()
server_path = request.config.getoption("--server-path")
suppressions = get_valgrind_suppressions(request.config)
server = XServerProcess(
server_type=server_type,
valgrind=use_valgrind,
valgrind_suppressions=suppressions,
asan=use_asan,
server_path=server_path,
log_file=log_file,
)
try:
server.start(timeout=60 if use_valgrind else 15)
except (FileNotFoundError, RuntimeError) as e:
msg = f"Failed to start {server_type} server: {e}"
if server.log_file:
msg += f"\nLog file: {server.log_file}"
pytest.fail(msg)
yield server
# Check if the server was killed by ASAN before we stop it
asan_errors = []
if use_asan and not server.is_alive:
asan_errors = server.get_asan_errors()
valgrind_errors = server.stop()
if use_asan and asan_errors:
msg = f"AddressSanitizer found {len(asan_errors)} error(s):\n\n"
msg += "\n\n".join(str(e) for e in asan_errors)
pytest.fail(msg)
if use_valgrind and valgrind_errors:
serious = [
e
for e in valgrind_errors
if e.kind
not in (
"Leak_DefinitelyLost",
"Leak_PossiblyLost",
"Leak_StillReachable",
"Leak_IndirectlyLost",
"SyscallParam",
)
]
if serious:
msg = f"Valgrind found {len(serious)} memory error(s):\n\n"
msg += "\n\n".join(str(e) for e in serious)
pytest.fail(msg)
@pytest.fixture
def xserver(request, tmp_path):
"""
Start an X server for this test.
Automatically parametrized via ``pytest_generate_tests`` so every
test that uses this fixture runs once per configured --server-type.
A fresh server per test, killed afterward. With --valgrind,
valgrind memory errors cause test failure during teardown.
For a fixture that targets a specific server type use the xvfb,
xwayland, or xorg fixtures instead.
"""
server_type = request.param
# Skip server-specific markers
if request.node.get_closest_marker("xwayland_only") and server_type != "xwayland":
pytest.skip("Test only applies to Xwayland")
if request.node.get_closest_marker("xorg_only") and server_type != "xorg":
pytest.skip("Test only applies to Xorg")
kwargs = {}
if server_type == "xorg":
kwargs["log_file"] = tmp_path / f"{server_type}.log"
yield from _start_server(request, server_type, **kwargs)
@pytest.fixture
def xvfb(request, tmp_path):
"""Start an Xvfb server for this test."""
if "xvfb" not in get_server_types(request.config):
pytest.skip("Xvfb not in --server-type list")
yield from _start_server(request, "xvfb")
@pytest.fixture
def xwayland(request, tmp_path):
"""Start an Xwayland server for this test."""
if "xwayland" not in get_server_types(request.config):
pytest.skip("Xwayland not in --server-type list")
yield from _start_server(request, "xwayland")
@pytest.fixture
def xorg(request, tmp_path):
"""Start an Xorg server for this test."""
if "xorg" not in get_server_types(request.config):
pytest.skip("Xorg not in --server-type list")
yield from _start_server(request, "xorg", log_file=tmp_path / "xorg.log")
@pytest.fixture
def xclient(xserver):
"""Create a raw X11 connection to the test server."""
conn = RawX11Connection(xserver.display_num)
yield conn
conn.close()
@pytest.fixture
def xclient_swapped(xserver):
"""Create a big-endian (byte-swapped) X11 connection."""
try:
conn = RawX11Connection(xserver.display_num, swapped=True)
except X11ConnectionError as e:
if "endian" in str(e).lower():
pytest.skip("Server does not accept big-endian clients")
raise
yield conn
conn.close()
@pytest.fixture
def xlib_client(xserver):
"""Create a python-xlib connection for higher-level X operations."""
conn = XlibConnection(xserver.display_num)
yield conn
conn.close()

View file

@ -0,0 +1,17 @@
#!/bin/bash
#
# Helper script used by meson to ensure each test_foo.py file has
# a corresponding entry in the tests_pyxtest array
SOURCEDIR="${1:-@SOURCEDIR@}"
shift
TESTS="${*:-@TESTS@}"
FILES=$(find "$SOURCEDIR" -name "test_*.py" -printf "%f\n")
DIFF=$(diff -u <(echo "$TESTS" | tr " " "\n" | sort) <(echo "$FILES" | sort))
if [[ $? -ne 0 ]]; then
echo "ERROR: Missing test file in meson tests list:" >&2
echo "$DIFF" >&2
exit 1
fi

85
test/pyxtest/meson.build Normal file
View file

@ -0,0 +1,85 @@
# pytest-based test suite for the X server
#
# Uses pytest to launch X servers which allows us to e.g. send crafted protocol
# requests that exercise specific security fixes. Supports Xvfb, Xwayland,
# and Xorg, with optional valgrind and AddressSanitizer (ASAN) integration
# for detecting use-after-free and out-of-bounds memory access.
#
# Run with: meson test --suite pyxtest
# Or directly: pytest test/pyxtest/ -v
pymod = import('python')
pytest = find_program('pytest', 'pytest-3', required: false)
if pytest.found()
pyxtest_env = environment()
pyxtest_env.set('XSERVER_BUILDDIR', meson.project_build_root())
pyxtest_env.set('XSERVER_DIR', meson.project_source_root())
pyxtest_env.set('PYTHONDONTWRITEBYTECODE', '1')
if build_xvfb
pyxtest_env.set('XVFB_PATH', xvfb_server.full_path())
endif
if build_xwayland
pyxtest_env.set('XWAYLAND_PATH', xwayland_server.full_path())
endif
# Tell the test suite if the server was built with AddressSanitizer
if 'address' in get_option('b_sanitize')
pyxtest_env.set('XSERVER_ASAN', '1')
endif
# We are *not* setting XORG_PATH in meson because we don't want
# to start lots of Xorg instances as part of a meson test run.
#
# if build_xorg
# pyxtest_env.set('XORG_PATH', xorg_server.full_path())
# endif
pytest_args = [
'-v',
'--tb=short',
]
# pytest-xdist speeds up the test suite by running tests in parallel,
# but it's not required
if pymod.find_installation('python3', modules: ['xdist'], required: false).found()
pytest_args += ['-n', 'auto']
endif
# pytest-timeout means we can fail in pytest before hitting the meson limits
if pymod.find_installation('python3', modules: ['pytest_timeout'], required: false).found()
pytest_args += ['--timeout=120']
endif
# This needs to be kept in sync with the test_foo.py files in the tree
tests_pyxtest = [
]
test_list_data = configuration_data()
test_list_data.set('TESTS', '\n'.join(tests_pyxtest))
test_list_data.set('SOURCEDIR', meson.current_source_dir())
test_list_check = configure_file(
input: 'ensure-meson-tests.sh',
output: 'ensure-meson-tests.sh',
configuration: test_list_data,
)
test('ensure-meson-tests',
test_list_check,
suite:'pyxtest'
)
# As part of the normal meson test run we only run the Xvfb tests
if build_xvfb
foreach t: tests_pyxtest
test(f'pyxtest-@t@',
pytest,
args: pytest_args + [files(t)],
env: pyxtest_env,
timeout: 600,
suite: 'pyxtest',
)
endforeach
endif
endif

View file

@ -0,0 +1,2 @@
# SPDX-License-Identifier: MIT
# Protocol request builders for X server testing.

View file

@ -0,0 +1,24 @@
# SPDX-License-Identifier: MIT
#
# BIG-REQUESTS extension protocol request builders
import struct
from dataclasses import dataclass
# BIG-REQUESTS minor opcodes
BigRequestsEnable = 0
@dataclass
class BigRequestsEnableRequest:
"""BIG-REQUESTS Enable request."""
opcode: int
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH",
self.opcode,
BigRequestsEnable, # sub-opcode
1, # request length
)

118
test/pyxtest/proto/x11.py Normal file
View file

@ -0,0 +1,118 @@
# SPDX-License-Identifier: MIT
#
# Core X11 protocol request builders
import struct
from dataclasses import dataclass
# Core protocol opcodes
CreateWindow = 1
CreatePixmap = 53
InternAtom = 16
QueryExtension = 98
def _pad(data: bytes) -> bytes:
"""Pad data to a 4-byte boundary."""
return data + b"\x00" * ((4 - len(data) % 4) % 4)
@dataclass
class CreateWindowRequest:
"""X11 CreateWindow request."""
wid: int
parent: int
x: int
y: int
width: int
height: int
depth: int
border_width: int = 0
window_class: int = 1 # InputOutput
visual: int = 0
value_mask: int = 0x0800 # override-redirect
override_redirect: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH II hhHHHH II I",
CreateWindow, # opcode
self.depth,
9, # request length in 4-byte units
self.wid,
self.parent,
self.x,
self.y,
self.width,
self.height,
self.border_width,
self.window_class,
self.visual,
self.value_mask,
self.override_redirect,
)
@dataclass
class CreatePixmapRequest:
"""X11 CreatePixmap request."""
pid: int
drawable: int
width: int
height: int
depth: int
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBHIIHH",
CreatePixmap, # opcode
self.depth,
4, # request length
self.pid,
self.drawable,
self.width,
self.height,
)
@dataclass
class InternAtomRequest:
"""X11 InternAtom request."""
name: str
only_if_exists: bool = False
def to_bytes(self, byte_order: str = "<") -> bytes:
name_bytes = self.name.encode("ascii")
padded = _pad(name_bytes)
req_len = (8 + len(padded)) // 4
return (
struct.pack(
f"{byte_order}BBHHxx",
InternAtom, # opcode
1 if self.only_if_exists else 0,
req_len,
len(name_bytes),
)
+ padded
)
@dataclass
class QueryExtensionRequest:
"""X11 QueryExtension request."""
name: str
def to_bytes(self, byte_order: str = "<") -> bytes:
name_bytes = self.name.encode("ascii")
padded = _pad(name_bytes)
req_len = (8 + len(padded)) // 4
return (
struct.pack(
f"{byte_order}BBHHxx", QueryExtension, 0, req_len, len(name_bytes)
)
+ padded
)

473
test/pyxtest/proto/xkb.py Normal file
View file

@ -0,0 +1,473 @@
# SPDX-License-Identifier: MIT
#
# XKB protocol request builders
#
# All fields are controllable via keyword arguments so tests can craft
# malformed requests with inconsistent lengths, out-of-range indices, etc.
import struct
from dataclasses import dataclass
# XKB minor opcodes
XkbUseExtension = 0
XkbSelectEvents = 1
XkbBell = 3
XkbGetState = 4
XkbLatchLockState = 5
XkbGetControls = 6
XkbSetControls = 7
XkbGetMap = 8
XkbSetMap = 9
XkbGetCompatMap = 10
XkbSetCompatMap = 11
XkbGetIndicatorState = 12
XkbGetIndicatorMap = 13
XkbSetIndicatorMap = 14
XkbGetNamedIndicator = 15
XkbSetNamedIndicator = 16
XkbGetNames = 17
XkbSetNames = 18
XkbGetGeometry = 19
XkbSetGeometry = 20
XkbPerClientFlags = 21
XkbListComponents = 22
XkbGetKbdByName = 23
XkbGetDeviceInfo = 24
XkbSetDeviceInfo = 25
XkbSetDebuggingFlags = 101
# XKB constants
XkbUseCoreKbd = 0x0100
# SetMap present flags
XkbKeyTypesMask = 0x0001
XkbKeySymsMask = 0x0002
XkbModifierMapMask = 0x0004
XkbExplicitComponentsMask = 0x0008
XkbKeyActionsMask = 0x0010
XkbKeyBehaviorsMask = 0x0020
XkbVirtualModsMask = 0x0040
XkbVirtualModMapMask = 0x0080
# SetMap flags
XkbSetMapResizeTypes = 1
XkbSetMapRecomputeActions = 2
XkbNumRequiredTypes = 4
XkbMaxLegalKeyCode = 255
XkbNoShape = 0xFF
@dataclass
class UseExtensionRequest:
"""XkbUseExtension request (minor opcode 0)."""
opcode: int
major: int = 1
minor: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBHHH xx",
self.opcode,
XkbUseExtension,
3, # 12 bytes = 3 words
self.major,
self.minor,
)
# Keep as a module-level function for backward compatibility with
# xclient.py which calls xkb.use_extension() directly.
def use_extension(
opcode: int, *, major: int = 1, minor: int = 0, byte_order: str = "<"
) -> bytes:
"""Build XkbUseExtension request (minor opcode 0)."""
return UseExtensionRequest(opcode=opcode, major=major, minor=minor).to_bytes(
byte_order
)
@dataclass
class SetMapRequest:
"""
xkbSetMapReq (minor opcode 9).
The 36-byte header is followed by variable-length payload data
whose format depends on the 'present' bitmask. For security testing,
payload is passed as raw bytes, allowing intentionally malformed data.
"""
opcode: int
device_spec: int = XkbUseCoreKbd
present: int = 0
flags: int = 0
min_key_code: int = 8
max_key_code: int = 255
first_type: int = 0
n_types: int = 0
first_key_sym: int = 0
n_key_syms: int = 0
total_syms: int = 0
first_key_act: int = 0
n_key_acts: int = 0
total_acts: int = 0
first_key_behavior: int = 0
n_key_behaviors: int = 0
total_key_behaviors: int = 0
first_key_explicit: int = 0
n_key_explicit: int = 0
total_key_explicit: int = 0
first_mod_map_key: int = 0
n_mod_map_keys: int = 0
total_mod_map_keys: int = 0
first_vmod_map_key: int = 0
n_vmod_map_keys: int = 0
total_vmod_map_keys: int = 0
virtual_mods: int = 0
payload: bytes = b""
length_override: int | None = None
def to_bytes(self, byte_order: str = "<") -> bytes:
total_bytes = 36 + len(self.payload)
pad_len = (4 - total_bytes % 4) % 4
total_bytes += pad_len
length = (
self.length_override
if self.length_override is not None
else total_bytes // 4
)
header = struct.pack(
f"{byte_order}BBH" # reqType, xkbReqType, length
f"HHH" # deviceSpec, present, flags
f"BBB" # minKeyCode, maxKeyCode, firstType
f"B" # nTypes
f"BB" # firstKeySym, nKeySyms
f"H" # totalSyms
f"BB" # firstKeyAct, nKeyActs
f"H" # totalActs
f"BB" # firstKeyBehavior, nKeyBehaviors
f"B" # totalKeyBehaviors
f"BB" # firstKeyExplicit, nKeyExplicit
f"B" # totalKeyExplicit
f"BB" # firstModMapKey, nModMapKeys
f"B" # totalModMapKeys
f"BB" # firstVModMapKey, nVModMapKeys
f"B" # totalVModMapKeys
f"H", # virtualMods
self.opcode,
XkbSetMap,
length,
self.device_spec,
self.present,
self.flags,
self.min_key_code,
self.max_key_code,
self.first_type,
self.n_types,
self.first_key_sym,
self.n_key_syms,
self.total_syms,
self.first_key_act,
self.n_key_acts,
self.total_acts,
self.first_key_behavior,
self.n_key_behaviors,
self.total_key_behaviors,
self.first_key_explicit,
self.n_key_explicit,
self.total_key_explicit,
self.first_mod_map_key,
self.n_mod_map_keys,
self.total_mod_map_keys,
self.first_vmod_map_key,
self.n_vmod_map_keys,
self.total_vmod_map_keys,
self.virtual_mods,
)
return header + self.payload + b"\x00" * pad_len
@dataclass
class KeyTypeWire:
"""
xkbKeyTypeWireDesc (8 bytes) with optional entries/preserve data.
"""
num_levels: int = 2
has_preserve: bool = False
n_map_entries: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
header = struct.pack(
f"{byte_order}BBH BBBB",
0,
0,
0, # mask, realMods, virtualMods
self.num_levels,
self.n_map_entries,
1 if self.has_preserve else 0, # preserve
0, # pad
)
# Map entries: 4 bytes each (level(1), realMods(1), virtualMods(2))
entries = b"\x00" * (4 * self.n_map_entries)
# Preserve entries: 4 bytes each (realMods(1), pad(1), virtualMods(2))
preserve = b"\x00" * (4 * self.n_map_entries) if self.has_preserve else b""
return header + entries + preserve
@dataclass
class SetCompatMapRequest:
"""xkbSetCompatMapReq (minor opcode 11). Header is 16 bytes."""
opcode: int
device_spec: int = XkbUseCoreKbd
recompute_actions: int = 0
truncate_si: int = 0
groups: int = 0
first_si: int = 0
n_si: int = 0
payload: bytes = b""
length_override: int | None = None
def to_bytes(self, byte_order: str = "<") -> bytes:
total_bytes = 16 + len(self.payload)
pad_len = (4 - total_bytes % 4) % 4
total_bytes += pad_len
length = (
self.length_override
if self.length_override is not None
else total_bytes // 4
)
header = struct.pack(
f"{byte_order}BBH" # reqType, xkbReqType, length
f"H" # deviceSpec
f"xB" # pad, recomputeActions
f"B" # truncateSI
f"B" # groups
f"H" # firstSI
f"H" # nSI
f"xx", # pad
self.opcode,
XkbSetCompatMap,
length,
self.device_spec,
self.recompute_actions,
self.truncate_si,
self.groups,
self.first_si,
self.n_si,
)
return header + self.payload + b"\x00" * pad_len
@dataclass
class SymInterpretWire:
"""A single xkbSymInterpretWireDesc (16 bytes)."""
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}I" # sym (KeySym)
f"BBBx" # mods, match, virtualMod, pad
f"8s", # action (8 bytes)
0,
0,
0,
0,
b"\x00" * 8,
)
@dataclass
class SetGeometryRequest:
"""xkbSetGeometryReq (minor opcode 20). Header is 28 bytes."""
opcode: int
device_spec: int = XkbUseCoreKbd
n_shapes: int = 0
n_sections: int = 0
name_atom: int = 0
width_mm: int = 100
height_mm: int = 100
n_properties: int = 0
n_colors: int = 0
n_doodads: int = 0
n_key_aliases: int = 0
base_color_ndx: int = 0
label_color_ndx: int = 1
payload: bytes = b""
length_override: int | None = None
def to_bytes(self, byte_order: str = "<") -> bytes:
total_bytes = 28 + len(self.payload)
pad_len = (4 - total_bytes % 4) % 4
total_bytes += pad_len
length = (
self.length_override
if self.length_override is not None
else total_bytes // 4
)
header = struct.pack(
f"{byte_order}BBH" # reqType, xkbReqType, length
f"H" # deviceSpec
f"BB" # nShapes, nSections
f"I" # name (Atom)
f"HH" # widthMM, heightMM
f"HH" # nProperties, nColors
f"HH" # nDoodads, nKeyAliases
f"BB" # baseColorNdx, labelColorNdx
f"xx", # pad
self.opcode,
XkbSetGeometry,
length,
self.device_spec,
self.n_shapes,
self.n_sections,
self.name_atom,
self.width_mm,
self.height_mm,
self.n_properties,
self.n_colors,
self.n_doodads,
self.n_key_aliases,
self.base_color_ndx,
self.label_color_ndx,
)
return header + self.payload + b"\x00" * pad_len
@dataclass
class CountedString:
"""A counted string: CARD16 length + chars, padded to 4 bytes."""
value: str | bytes
def to_bytes(self, byte_order: str = "<") -> bytes:
s = self.value
if isinstance(s, str):
s = s.encode("ascii")
length = len(s)
total = 2 + length # CARD16 header + string bytes
pad_len = (4 - total % 4) % 4
return struct.pack(f"{byte_order}H", length) + s + b"\x00" * pad_len
# Keep as a module-level function for convenience since tests use it
# inline in payload construction.
def build_counted_string(s: str | bytes, byte_order: str = "<") -> bytes:
"""Build a counted string: CARD16 length + chars, padded to 4 bytes."""
return CountedString(value=s).to_bytes(byte_order)
@dataclass
class ShapeWire:
"""
xkbShapeWireDesc (4 bytes) + outline data.
Each outline: header(4 bytes) + nPoints * point(4 bytes).
"""
n_outlines: int = 1
primary_ndx: int = 0
approx_ndx: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
header = struct.pack(
f"{byte_order}BBBx",
self.n_outlines,
self.primary_ndx,
self.approx_ndx,
)
outlines = b""
for _ in range(self.n_outlines):
outline_hdr = struct.pack(f"{byte_order}BBxx", 1, 0) # 1 point
point = struct.pack(f"{byte_order}hh", 0, 0)
outlines += outline_hdr + point
return header + outlines
@dataclass
class SectionWire:
"""xkbSectionWireDesc (20 bytes) + row data."""
n_rows: int = 1
n_doodads: int = 0
n_overlays: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
header = struct.pack(
f"{byte_order}I hh HH h BBBBxx",
0, # name (Atom)
0,
0, # top, left
100,
100, # width, height
0, # angle
0, # priority
self.n_rows,
self.n_doodads,
self.n_overlays,
)
rows = b""
for _ in range(self.n_rows):
rows += struct.pack(f"{byte_order}hhBBxx", 0, 0, 0, 0)
return header + rows
@dataclass
class OverlayWire:
"""xkbOverlayWireDesc + overlay rows."""
n_rows: int = 1
rows_under: list[int] | None = None
def to_bytes(self, byte_order: str = "<") -> bytes:
header = struct.pack(f"{byte_order}IBxxx", 0, self.n_rows)
rows_under = (
self.rows_under if self.rows_under is not None else list(range(self.n_rows))
)
rows = b""
for row_under in rows_under:
rows += struct.pack(f"{byte_order}BBxx", row_under, 0)
return header + rows
@dataclass
class GetKbdByNameRequest:
"""xkbGetKbdByNameReq (minor opcode 23)."""
opcode: int
device_spec: int = XkbUseCoreKbd
need: int = 0
want: int = 0
load: int = 0
payload: bytes = b""
length_override: int | None = None
def to_bytes(self, byte_order: str = "<") -> bytes:
total_bytes = 12 + len(self.payload)
pad_len = (4 - total_bytes % 4) % 4
total_bytes += pad_len
length = (
self.length_override
if self.length_override is not None
else total_bytes // 4
)
header = struct.pack(
f"{byte_order}BBH HHHBx",
self.opcode,
XkbGetKbdByName,
length,
self.device_spec,
self.need,
self.want,
self.load,
)
return header + self.payload + b"\x00" * pad_len

94
test/pyxtest/valgrind.py Normal file
View file

@ -0,0 +1,94 @@
# SPDX-License-Identifier: MIT
#
# Valgrind XML output parser for extracting memory errors and suppressions.
from __future__ import annotations
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
def _suppression_from_element(elem: ET.Element) -> str:
"""Format a ``<suppression>`` XML element as a suppression file entry."""
name = elem.findtext("sname", "")
kind = elem.findtext("skind", "")
auxiliary = elem.findtext("skaux")
lines = ["{", f" {name}", f" {kind}"]
if auxiliary:
lines.append(f" {auxiliary}")
for sframe in elem.iter("sframe"):
fun = sframe.findtext("fun")
obj = sframe.findtext("obj")
if fun is not None:
lines.append(f" fun:{fun}")
elif obj is not None:
lines.append(f" obj:{obj}")
else:
lines.append(" ...")
lines.append("}")
return "\n".join(lines)
@dataclass
class ValgrindError:
"""Represents a single valgrind error extracted from XML output."""
kind: str # e.g. "InvalidRead", "InvalidWrite"
what: str # human-readable description
stack_frames: list[tuple[str, str | None, str | None]] # (func, file, line)
suppression: str | None = None # ready-to-paste suppression file entry
def __str__(self):
lines = [f"{self.kind}: {self.what}"]
for func, srcfile, line in self.stack_frames[:8]:
loc = f" ({srcfile}:{line})" if srcfile else ""
lines.append(f" at {func}{loc}")
if self.suppression:
lines.append("")
lines.append(self.suppression)
return "\n".join(lines)
@classmethod
def from_xml(cls, xml_path: Path) -> list[ValgrindError]:
"""Parse valgrind XML output and return a list of ValgrindError."""
if not xml_path.is_file():
return []
try:
tree = ET.parse(xml_path)
except ET.ParseError:
return []
errors = []
root = tree.getroot()
for error_elem in root.iter("error"):
kind_elem = error_elem.find("kind")
kind = kind_elem.text if kind_elem is not None else "Unknown"
assert kind is not None
what_elem = error_elem.find("what")
if what_elem is None:
what_elem = error_elem.find("xwhat/text")
what = what_elem.text if what_elem is not None else "Unknown error"
assert what is not None
frames = []
stack_elem = error_elem.find("stack")
if stack_elem is not None:
for frame in stack_elem.iter("frame"):
fn = frame.findtext("fn", "???")
srcfile = frame.findtext("file", "")
line = frame.findtext("line", "")
frames.append((fn, srcfile, line))
suppression = None
supp_elem = error_elem.find("suppression")
if supp_elem is not None:
suppression = _suppression_from_element(supp_elem)
errors.append(ValgrindError(kind, what, frames, suppression))
return errors

View file

@ -0,0 +1,18 @@
# Debian complains about invalid reads
# InvalidRead: Invalid read of size 8
# at strncmp (strcmp-sse2.S:162)
# at is_dst (dl-load.c:216)
# at _dl_dst_count (dl-load.c:253)
# at expand_dynamic_string_token (dl-load.c:395)
# at fillin_rpath.isra.0 (dl-load.c:483)
# at decompose_rpath (dl-load.c:654)
# at _dl_map_object (dl-load.c:2111)
# at openaux (dl-deps.c:64)
{
Glibc_LD_SO_Linker_Noise
Memcheck:Addr8
fun:strncmp
...
fun:_dl_map_object_deps
...
}

517
test/pyxtest/xclient.py Normal file
View file

@ -0,0 +1,517 @@
# SPDX-License-Identifier: MIT
#
# X11 client connection utilities
import select
import socket
import struct
import time
from dataclasses import dataclass
from enum import StrEnum
from proto.bigrequests import BigRequestsEnableRequest
from proto.x11 import (
CreatePixmapRequest,
CreateWindowRequest,
InternAtomRequest,
QueryExtensionRequest,
)
from proto import xkb
class X11ConnectionError(Exception):
"""Raised when the X11 connection fails."""
pass
@dataclass
class XExtensionData:
"""Cached result of a QueryExtension reply."""
opcode: int
first_event: int
first_error: int
class Extension(StrEnum):
"""X11 extension wire names as used in QueryExtension requests."""
BIG_REQUESTS = "BIG-REQUESTS"
COMPOSITE = "Composite"
DAMAGE = "DAMAGE"
DBE = "DOUBLE-BUFFER"
DPMS = "DPMS"
DRI2 = "DRI2"
DRI3 = "DRI3"
GENERIC_EVENT = "Generic Event Extension"
GLX = "GLX"
MIT_SCREEN_SAVER = "MIT-SCREEN-SAVER"
MIT_SHM = "MIT-SHM"
PRESENT = "Present"
RANDR = "RANDR"
RECORD = "RECORD"
RENDER = "RENDER"
SECURITY = "SECURITY"
SHAPE = "SHAPE"
SYNC = "SYNC"
XC_MISC = "XC-MISC"
XF86BIGFONT = "XFree86-Bigfont"
XF86DGA = "XFree86-DGA"
XF86VIDMODE = "XFree86-VidModeExtension"
XFIXES = "XFIXES"
XI = "XInputExtension"
XRES = "X-Resource"
XINERAMA = "XINERAMA"
XKB = "XKEYBOARD"
XTEST = "XTEST"
XVIDEO = "XVideo"
XVIDEO_MC = "XVideo-MotionCompensation"
@dataclass
class X11Error:
"""An X11 error reply from the server."""
response_type: int
error_code: int
sequence: int
resource_id: int
minor_code: int
major_code: int
@classmethod
def from_data(cls, data: bytes, byte_order: str = "<") -> "X11Error":
if len(data) < 32:
data = data + b"\x00" * (32 - len(data))
response_type, error_code, sequence, resource_id, minor_code, major_code = (
struct.unpack_from(f"{byte_order}BBHIHB", data)
)
return cls(
response_type, error_code, sequence, resource_id, minor_code, major_code
)
def __repr__(self):
return (
f"<X11Error code={self.error_code} seq={self.sequence} "
f"major={self.major_code} minor={self.minor_code}>"
)
@dataclass
class X11Reply:
"""An X11 reply from the server."""
data: bytes
response_type: int
sequence: int
length: int
@classmethod
def from_data(cls, data: bytes, byte_order: str = "<") -> "X11Reply":
if len(data) >= 8:
response_type = data[0]
sequence = struct.unpack_from(f"{byte_order}H", data, 2)[0]
length = struct.unpack_from(f"{byte_order}I", data, 4)[0]
else:
response_type = sequence = length = 0
return cls(data, response_type, sequence, length)
def __repr__(self):
return (
f"<X11Reply seq={self.sequence} "
f"extra_len={self.length} total={len(self.data)}>"
)
class RawX11Connection:
"""
Minimal X11 connection for sending raw (possibly malformed) requests.
Set swapped=True for testing byte-swap code paths (SProcXxx).
"""
def __init__(self, display_num, swapped=False):
self.display_num = display_num
self.swapped = swapped
self._byte_order = ">" if swapped else "<"
self.sock = None
self.seq = 0
self.root_window = 0
self.root_visual = 0
self.root_depth = 0
self._resource_id_base = 0
self._resource_id_mask = 0
self._next_resource_id = 0
self._extensions = {}
self._connect()
def _connect(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
path = f"/tmp/.X11-unix/X{self.display_num}"
try:
self.sock.connect(path)
except (ConnectionRefusedError, FileNotFoundError) as e:
self.sock.close()
self.sock = None
raise X11ConnectionError(f"Cannot connect to {path}: {e}")
try:
self._handshake()
except Exception:
self.sock.close()
self.sock = None
raise
def _handshake(self):
byte_order = 0x42 if self.swapped else 0x6C
bo = self._byte_order
setup = struct.pack(f"{bo}BxHHHHxx", byte_order, 11, 0, 0, 0)
assert self.sock
self.sock.sendall(setup)
header = self._recv_exact(8)
status = header[0]
if status == 0:
reason_len = header[1]
extra_len = struct.unpack_from(f"{bo}H", header, 6)[0]
extra = self._recv_exact(extra_len * 4)
reason = extra[:reason_len].decode("ascii", errors="replace")
raise X11ConnectionError(f"X server refused connection: {reason}")
if status == 2:
raise X11ConnectionError("X server requires authentication")
if status != 1:
raise X11ConnectionError(f"Unexpected setup status: {status}")
extra_len = struct.unpack_from(f"{bo}H", header, 6)[0]
data = self._recv_exact(extra_len * 4)
(_, res_base, res_mask, _, vendor_len, _, num_screens, num_formats) = (
struct.unpack_from(f"{bo}IIIIHH BB", data, 0)
)
self._resource_id_base = res_base
self._resource_id_mask = res_mask
self._next_resource_id = 1
offset = 32 + ((vendor_len + 3) & ~3) + num_formats * 8
if num_screens > 0 and offset + 40 <= len(data):
(
self.root_window,
_,
_,
_,
_,
_,
_,
_,
_,
_,
_,
self.root_visual,
_,
_,
self.root_depth,
) = struct.unpack_from(f"{bo}IIIIIHHHHHHI BBB", data, offset)
# --- Core protocol helpers ---
def alloc_id(self) -> int:
xid = self._resource_id_base | self._next_resource_id
self._next_resource_id += 1
return xid
def send_request(self, data: bytes) -> int:
self.seq += 1
assert self.sock
self.sock.sendall(data)
return self.seq
def recv_response(self, timeout: float = 5.0) -> X11Error | X11Reply | None:
ready = select.select([self.sock], [], [], timeout)
if not ready[0]:
return None
try:
header = self._recv_exact(32, timeout=timeout)
except (ConnectionResetError, BrokenPipeError, OSError, X11ConnectionError):
return None
if not header:
return None
rtype = header[0]
bo = self._byte_order
if rtype == 0:
return X11Error.from_data(header, bo)
elif rtype == 1:
extra_len = struct.unpack_from(f"{bo}I", header, 4)[0]
if extra_len > 0:
try:
extra = self._recv_exact(extra_len * 4, timeout=timeout)
return X11Reply.from_data(header + extra, bo)
except (
ConnectionResetError,
BrokenPipeError,
OSError,
X11ConnectionError,
):
return X11Reply.from_data(header, bo)
return X11Reply.from_data(header, bo)
else:
return X11Reply.from_data(header, bo)
def flush_responses(self, timeout: float = 0.5) -> list[X11Error | X11Reply]:
responses: list[X11Error | X11Reply] = []
while True:
resp = self.recv_response(timeout=timeout)
if resp is None:
break
responses.append(resp)
return responses
def is_connected(self) -> bool:
assert self.sock
try:
ready = select.select([self.sock], [], [], 0)
if ready[0]:
data = self.sock.recv(1, socket.MSG_PEEK)
return len(data) > 0
return True
except (ConnectionResetError, BrokenPipeError, OSError):
return False
def wait_for_disconnect(self, timeout: float = 5.0) -> bool:
assert self.sock
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
ready = select.select([self.sock], [], [], min(remaining, 0.5))
if ready[0]:
try:
data = self.sock.recv(4096)
if not data:
return True
except (ConnectionResetError, BrokenPipeError, OSError):
return True
return False
# --- Extension negotiation ---
def query_extension(self, name: str) -> XExtensionData | None:
if name in self._extensions:
return self._extensions[name]
req = QueryExtensionRequest(name=name)
self.send_request(req.to_bytes(self._byte_order))
resp = self.recv_response(timeout=5.0)
if resp is None or isinstance(resp, X11Error):
return None
if len(resp.data) >= 12:
present, major, first_event, first_error = struct.unpack_from(
"BBBB", resp.data, 8
)
if present:
result = XExtensionData(major, first_event, first_error)
self._extensions[name] = result
return result
return None
def xkb_use_extension(self, major_version: int = 1, minor_version: int = 0) -> int:
ext = self.query_extension(Extension.XKB)
if not ext:
raise X11ConnectionError("XKB extension not available")
req = xkb.use_extension(
ext.opcode,
major=major_version,
minor=minor_version,
byte_order=self._byte_order,
)
self.send_request(req)
resp = self.recv_response(timeout=5.0)
if isinstance(resp, X11Error):
raise X11ConnectionError(f"XkbUseExtension failed: error {resp.error_code}")
return ext.opcode
def enable_big_requests(self) -> int:
ext = self.query_extension(Extension.BIG_REQUESTS)
if not ext:
raise X11ConnectionError("BIG-REQUESTS not available")
req = BigRequestsEnableRequest(opcode=ext.opcode)
self.send_request(req.to_bytes(self._byte_order))
resp = self.recv_response(timeout=5.0)
if isinstance(resp, X11Error):
raise X11ConnectionError("BigRequestsEnable failed")
if resp and len(resp.data) >= 12:
return struct.unpack_from(f"{self._byte_order}I", resp.data, 8)[0]
return 0
# --- Resource creation helpers ---
def create_window(
self,
width: int = 100,
height: int = 100,
depth: int | None = None,
parent: int | None = None,
x: int = 0,
y: int = 0,
) -> int:
wid = self.alloc_id()
if parent is None:
parent = self.root_window
if depth is None:
depth = self.root_depth
assert parent is not None
assert depth is not None
req = CreateWindowRequest(
wid=wid,
parent=parent,
x=x,
y=y,
width=width,
height=height,
depth=depth,
)
self.send_request(req.to_bytes(self._byte_order))
self.flush_responses(timeout=0.2)
return wid
def create_pixmap(
self,
width: int = 100,
height: int = 100,
depth: int | None = None,
drawable: int | None = None,
) -> int:
pid = self.alloc_id()
if drawable is None:
drawable = self.root_window
if depth is None:
depth = self.root_depth
assert drawable is not None
assert depth is not None
req = CreatePixmapRequest(
pid=pid,
drawable=drawable,
width=width,
height=height,
depth=depth,
)
self.send_request(req.to_bytes(self._byte_order))
self.flush_responses(timeout=0.2)
return pid
def intern_atom(self, name: str, only_if_exists: bool = False) -> int:
req = InternAtomRequest(name=name, only_if_exists=only_if_exists)
self.send_request(req.to_bytes(self._byte_order))
resp = self.recv_response(timeout=5.0)
if isinstance(resp, X11Error) or resp is None:
return 0
if len(resp.data) >= 12:
return struct.unpack_from(f"{self._byte_order}I", resp.data, 8)[0]
return 0
def get_fd(self) -> int:
assert self.sock
return self.sock.fileno()
def close(self) -> None:
if self.sock:
try:
self.sock.close()
except OSError:
pass
self.sock = None
def _recv_exact(self, nbytes: int, timeout: float = 10.0) -> bytes:
assert self.sock
data = b""
deadline = time.monotonic() + timeout
while len(data) < nbytes:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise X11ConnectionError(
f"Timeout reading ({len(data)}/{nbytes} bytes)"
)
ready = select.select([self.sock], [], [], min(remaining, 1.0))
if ready[0]:
chunk = self.sock.recv(nbytes - len(data))
if not chunk:
raise X11ConnectionError(
f"Connection closed ({len(data)}/{nbytes} bytes)"
)
data += chunk
return data
def __enter__(self) -> "RawX11Connection":
return self
def __exit__(self, *args) -> bool:
self.close()
return False
def __del__(self) -> None:
self.close()
class XlibConnection:
"""python-xlib based connection for higher-level X operations."""
def __init__(self, display_num):
from Xlib import display as xlib_display
self.display_num = display_num
self.display = xlib_display.Display(f":{display_num}")
self.screen = self.display.screen()
self.root = self.screen.root
def get_fd(self):
return self.display.fileno()
def create_window(self, width=100, height=100, x=0, y=0):
from Xlib import X
window = self.root.create_window(
x,
y,
width,
height,
0,
self.screen.root_depth,
X.InputOutput,
X.CopyFromParent,
)
self.display.sync()
return window
def flush(self):
self.display.flush()
def sync(self):
self.display.sync()
def close(self):
try:
self.display.close()
except Exception:
pass
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
return False

429
test/pyxtest/xserver.py Normal file
View file

@ -0,0 +1,429 @@
# SPDX-License-Identifier: MIT
#
# X server lifecycle manager
#
# Handles starting and stopping Xvfb, Xwayland, and Xorg servers,
# optionally under valgrind or with AddressSanitizer (ASAN) support,
# with automatic display number allocation via the -displayfd mechanism.
from typing import Iterator
import os
import select
import shutil
import subprocess
import tempfile
import time
import warnings
from pathlib import Path
from asan import AsanError
from valgrind import ValgrindError
class XServerProcess:
"""
Manages an X server subprocess for testing.
Supports Xvfb, Xwayland, and Xorg. Uses the -displayfd pipe mechanism
for automatic display number allocation. Optionally wraps the server
in valgrind for memory error detection, or detects AddressSanitizer
(ASAN) errors when running an ASAN-instrumented binary.
"""
VALGRIND_ERROR_EXIT = 99
def __init__(
self,
server_type="xvfb",
valgrind=False,
valgrind_suppressions=None,
asan=False,
server_path=None,
extra_args=None,
log_file=None,
):
self.server_type = server_type
self.asan = asan
if asan and valgrind:
warnings.warn(
"ASAN and valgrind are incompatible; disabling valgrind "
"in favour of ASAN.",
stacklevel=2,
)
valgrind = False
self.valgrind = valgrind
if valgrind_suppressions is None:
default_supp = Path(__file__).resolve().parent / "valgrind.suppressions"
self.valgrind_suppressions = (
default_supp if default_supp.is_file() else None
)
else:
self.valgrind_suppressions = valgrind_suppressions
self.server_path = server_path or self._find_server()
self.extra_args = extra_args or []
self.log_file = log_file
self._process = None
self._display_num = None
self._valgrind_xml_file = None
self._asan_log_path = None
self._stderr_file = None
self._weston_process = None
def _find_server(self):
"""Discover the server binary from environment or build directory."""
env_map = {
"xvfb": "XVFB_PATH",
"xwayland": "XWAYLAND_PATH",
"xorg": "XORG_PATH",
}
env_var = env_map.get(self.server_type)
if env_var and os.environ.get(env_var):
path = Path(os.environ[env_var])
if path.is_file() and os.access(path, os.X_OK):
return path
def find_meson_builddir(source_root: Path) -> Iterator[Path]:
for d in source_root.iterdir():
if d.is_dir() and (d / "meson-private").exists():
yield d
# Try XSERVER_BUILDDIR env var or fall back to the first
# build directory relative to this file's location
# test/pyxtest/xserver.py -> ../../build/hw/...
builddir = os.environ.get("XSERVER_BUILDDIR")
if builddir is None:
try:
builddir = next(
find_meson_builddir(Path(__file__).resolve().parent.parent.parent)
)
except StopIteration:
pass
if builddir:
build_paths = {
"xvfb": Path(builddir, "hw", "vfb", "Xvfb"),
"xwayland": Path(builddir, "hw", "xwayland", "Xwayland"),
"xorg": Path(builddir, "hw", "xfree86", "Xorg"),
}
path = build_paths.get(self.server_type)
if path and path.is_file() and os.access(path, os.X_OK):
return path
# Fall back to system PATH
binary_names = {
"xvfb": "Xvfb",
"xwayland": "Xwayland",
"xorg": "Xorg",
}
name = binary_names.get(self.server_type)
server_in_path = shutil.which(name)
if server_in_path:
warnings.warn(
f"Using system {self.server_type} server from PATH: {server_in_path}. "
f"Set {env_var} or XSERVER_BUILDDIR to use a specific build.",
stacklevel=2,
)
return server_in_path
raise FileNotFoundError(
f"Failed to find {self.server_type} server binary. "
f"Set {env_var} environment variable or build the server first."
)
@property
def display_num(self):
return self._display_num
@property
def display(self):
if self._display_num is not None:
return f":{self._display_num}"
return None
def start(self, timeout=10):
"""Start the X server and wait for it to be ready."""
read_fd, write_fd = os.pipe()
cmd = self._build_command(write_fd)
self._stderr_file = tempfile.NamedTemporaryFile(
prefix=f"xserver-{self.server_type}-stderr-",
suffix=".log",
delete=False,
mode="w",
)
if self.server_type == "xwayland":
self._start_wayland_compositor()
env = os.environ.copy()
if self._weston_process:
env.setdefault("WAYLAND_DISPLAY", "wayland-security-test")
if self.asan:
self._setup_asan_env(env)
try:
self._process = subprocess.Popen(
cmd,
stderr=self._stderr_file,
pass_fds=(write_fd,),
env=env,
)
except Exception:
os.close(write_fd)
os.close(read_fd)
raise
os.close(write_fd)
try:
display_bytes = b""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if self._process.poll() is not None:
display_bytes = None
break
ready, _, _ = select.select([read_fd], [], [], 1.0)
if ready:
chunk = os.read(read_fd, 64)
if not chunk:
break
display_bytes += chunk
if b"\n" in display_bytes:
break
if not display_bytes:
if self._process.poll() is not None:
stderr_content = self._read_stderr()
raise RuntimeError(
f"X server exited with code {self._process.returncode} "
f"before sending display number (waited {timeout}s).\n"
f"Command: {' '.join(str(s) for s in cmd)}\n"
f"Stderr:\n{stderr_content}"
)
self.stop()
raise RuntimeError(
f"Timed out waiting for X server display number (waited {timeout}s)"
)
self._display_num = int(display_bytes.strip())
finally:
os.close(read_fd)
return self._display_num
def _build_command(self, displayfd):
"""Build the full command line including optional valgrind wrapper."""
server_args = [self.server_path]
if self.server_type == "xvfb":
server_args.extend(
[
"-screen",
"scrn",
"1280x1024x24",
]
)
elif self.server_type == "xwayland":
server_args.extend(["-nokeymap"])
elif self.server_type == "xorg":
# -logfile is only permitted if we're running as root
if self.log_file and os.geteuid() == 0:
server_args.extend(["-logfile", str(self.log_file)])
server_args.extend(["-noreset", "+byteswappedclients"])
# Auto-detect xkb directory if the compiled-in default doesn't exist
xkb_dir = os.environ.get("XKB_CONFIG_ROOT")
if not xkb_dir:
for candidate in ["/usr/share/X11/xkb", "/usr/local/share/X11/xkb"]:
if os.path.isdir(candidate):
xkb_dir = candidate
break
if xkb_dir:
server_args.extend(["-xkbdir", xkb_dir])
server_args.extend(["-displayfd", str(displayfd)])
server_args.extend(self.extra_args)
if not self.valgrind:
return server_args
self._valgrind_xml_file = tempfile.NamedTemporaryFile(
prefix=f"xserver-{self.server_type}-valgrind-", suffix=".xml", delete=False
)
self._valgrind_xml_file.close()
valgrind_cmd = [
"valgrind",
"--tool=memcheck",
"--leak-check=full",
"--track-origins=yes",
"--show-reachable=no",
"--gen-suppressions=all",
f"--error-exitcode={self.VALGRIND_ERROR_EXIT}",
"--xml=yes",
f"--xml-file={self._valgrind_xml_file.name}",
]
if self.valgrind_suppressions:
valgrind_cmd.append(f"--suppressions={self.valgrind_suppressions}")
return valgrind_cmd + server_args
def _setup_asan_env(self, env):
"""Configure ASAN_OPTIONS for the server subprocess.
Sets up a log file for ASAN output and configures ASAN to not
detect leaks (too noisy for these tests). Any existing
ASAN_OPTIONS from the environment are preserved.
"""
asan_log_file = tempfile.NamedTemporaryFile(
prefix=f"xserver-{self.server_type}-asan-",
suffix=".log",
delete=False,
)
self._asan_log_path = Path(asan_log_file.name)
asan_log_file.close()
asan_opts = {
"log_path": str(self._asan_log_path),
"detect_leaks": "0",
}
# Merge with any existing ASAN_OPTIONS from the environment
existing = env.get("ASAN_OPTIONS", "")
for part in existing.split(":"):
part = part.strip()
if "=" in part:
key, val = part.split("=", 1)
# Don't override our log_path
if key not in asan_opts:
asan_opts[key] = val
env["ASAN_OPTIONS"] = ":".join(f"{k}={v}" for k, v in asan_opts.items())
def _start_wayland_compositor(self):
"""Start weston as a headless compositor for Xwayland testing."""
if not shutil.which("weston"):
raise FileNotFoundError(
"weston is required for Xwayland testing but was not found"
)
self._weston_process = subprocess.Popen(
["weston", "--no-config", "--backend=headless-backend.so"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
time.sleep(1)
if self._weston_process.poll() is not None:
raise RuntimeError(
f"weston exited with code {self._weston_process.returncode}"
)
@property
def is_alive(self) -> bool:
"""Check if the server process is still running."""
if self._process is None:
return False
return self._process.poll() is None
@property
def crash_signal(self) -> int | None:
"""If the server crashed, return the signal number. Otherwise None."""
if self._process is None:
return None
ret = self._process.poll()
if ret is not None and ret < 0:
return -ret
return None
def stop(self):
"""Stop the server and return any valgrind/ASAN errors."""
errors = []
if self._process and self._process.poll() is None:
self._process.terminate()
try:
self._process.wait(timeout=10)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait(timeout=5)
if self._weston_process and self._weston_process.poll() is None:
self._weston_process.terminate()
try:
self._weston_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._weston_process.kill()
self._weston_process.wait()
if self.valgrind and self._valgrind_xml_file:
errors = ValgrindError.from_xml(Path(self._valgrind_xml_file.name))
if self._stderr_file:
self._stderr_file.close()
return errors
def _read_stderr(self):
"""Read the captured stderr content."""
if self._stderr_file:
self._stderr_file.flush()
try:
return Path(self._stderr_file.name).read_text()
except OSError:
pass
return ""
def check_valgrind_errors(self):
"""Parse and assert no valgrind errors occurred."""
if self._valgrind_xml_file is None:
return []
errors = ValgrindError.from_xml(Path(self._valgrind_xml_file.name))
if errors:
msg = f"Valgrind found {len(errors)} error(s):\n\n"
msg += "\n\n".join(str(e) for e in errors)
raise AssertionError(msg)
return errors
def get_asan_errors(self) -> list[AsanError]:
"""Parse ASAN errors from log file and/or stderr."""
errors: list[AsanError] = []
# Check the ASAN log file(s) first (ASAN appends .<pid> to log_path)
if self._asan_log_path:
errors.extend(AsanError.from_log(self._asan_log_path))
# Also check stderr in case ASAN wrote there
stderr_text = self._read_stderr()
if stderr_text and "AddressSanitizer" in stderr_text:
stderr_errors = AsanError.from_text(stderr_text)
# Avoid duplicates: only add stderr errors if log file had none
if not errors:
errors.extend(stderr_errors)
return errors
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
return False
def __repr__(self):
state = "running" if self.is_alive else "stopped"
display = self.display or "N/A"
flags = ""
if self.valgrind:
flags += " +valgrind"
if self.asan:
flags += " +asan"
return f"<XServerProcess {self.server_type} {display} [{state}]{flags}>"