mirror of
https://gitlab.freedesktop.org/xorg/xserver.git
synced 2026-06-07 12:18:52 +02:00
Add a regression test that reproduces the XKB num_levels stack overflow. The test sends an XkbSetMap request with XkbSetMapResizeTypes that includes a non-canonical key type with numLevels=255, exceeding XkbMaxShiftLevel (63). Without the fix, this type would be accepted and stored in the server's type table. A subsequent ChangeKeyboardMapping would trigger XkbUpdateKeyTypesFromCore -> XkbKeyTypesForCoreSymbols, where the oversized num_levels is used as groupsWidth, causing indices into the tsyms[252] stack buffer to reach up to 1019 and overflow. Assisted-by: Claude:claude-opus-4-6 Part-of: <https://gitlab.freedesktop.org/xorg/xserver/-/merge_requests/2228>
587 lines
17 KiB
Python
587 lines
17 KiB
Python
# SPDX-License-Identifier: MIT
|
|
#
|
|
# X11 client connection utilities
|
|
|
|
import select
|
|
import socket
|
|
import struct
|
|
import time
|
|
from dataclasses import dataclass
|
|
from enum import StrEnum
|
|
from typing import Protocol, runtime_checkable
|
|
|
|
from proto.bigrequests import BigRequestsEnableRequest
|
|
from proto.x11 import (
|
|
ChangeKeyboardMappingRequest,
|
|
CreatePixmapRequest,
|
|
CreateWindowRequest,
|
|
InternAtomRequest,
|
|
QueryExtensionRequest,
|
|
)
|
|
from proto import xkb
|
|
|
|
|
|
class X11ConnectionError(Exception):
|
|
"""Raised when the X11 connection fails."""
|
|
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class XExtensionData:
|
|
"""Cached result of a QueryExtension reply."""
|
|
|
|
opcode: int
|
|
first_event: int
|
|
first_error: int
|
|
|
|
|
|
@runtime_checkable
|
|
class X11Request(Protocol):
|
|
"""Protocol for X11 request objects that can be serialized to wire format.
|
|
|
|
All ``@dataclass`` request builders in ``proto/`` satisfy this protocol.
|
|
"""
|
|
|
|
def to_bytes(self, byte_order: str = "<") -> bytes: ...
|
|
|
|
|
|
class Extension(StrEnum):
|
|
"""X11 extension wire names as used in QueryExtension requests."""
|
|
|
|
BIG_REQUESTS = "BIG-REQUESTS"
|
|
COMPOSITE = "Composite"
|
|
DAMAGE = "DAMAGE"
|
|
DBE = "DOUBLE-BUFFER"
|
|
DPMS = "DPMS"
|
|
DRI2 = "DRI2"
|
|
DRI3 = "DRI3"
|
|
GENERIC_EVENT = "Generic Event Extension"
|
|
GLX = "GLX"
|
|
MIT_SCREEN_SAVER = "MIT-SCREEN-SAVER"
|
|
MIT_SHM = "MIT-SHM"
|
|
PRESENT = "Present"
|
|
RANDR = "RANDR"
|
|
RECORD = "RECORD"
|
|
RENDER = "RENDER"
|
|
SECURITY = "SECURITY"
|
|
SHAPE = "SHAPE"
|
|
SYNC = "SYNC"
|
|
XC_MISC = "XC-MISC"
|
|
XF86BIGFONT = "XFree86-Bigfont"
|
|
XF86DGA = "XFree86-DGA"
|
|
XF86VIDMODE = "XFree86-VidModeExtension"
|
|
XFIXES = "XFIXES"
|
|
XI = "XInputExtension"
|
|
XRES = "X-Resource"
|
|
XINERAMA = "XINERAMA"
|
|
XKB = "XKEYBOARD"
|
|
XTEST = "XTEST"
|
|
XVIDEO = "XVideo"
|
|
XVIDEO_MC = "XVideo-MotionCompensation"
|
|
|
|
|
|
# X11 core protocol error codes (from X.h)
|
|
BadRequest = 1
|
|
BadValue = 2
|
|
BadWindow = 3
|
|
BadPixmap = 4
|
|
BadAtom = 5
|
|
BadCursor = 6
|
|
BadFont = 7
|
|
BadMatch = 8
|
|
BadDrawable = 9
|
|
BadAccess = 10
|
|
BadAlloc = 11
|
|
BadColor = 12
|
|
BadGC = 13
|
|
BadIDChoice = 14
|
|
BadName = 15
|
|
BadLength = 16
|
|
BadImplementation = 17
|
|
|
|
|
|
@dataclass
|
|
class X11Error:
|
|
"""An X11 error reply from the server."""
|
|
|
|
response_type: int
|
|
error_code: int
|
|
sequence: int
|
|
resource_id: int
|
|
minor_code: int
|
|
major_code: int
|
|
|
|
@classmethod
|
|
def from_data(cls, data: bytes, byte_order: str = "<") -> "X11Error":
|
|
if len(data) < 32:
|
|
data = data + b"\x00" * (32 - len(data))
|
|
response_type, error_code, sequence, resource_id, minor_code, major_code = (
|
|
struct.unpack_from(f"{byte_order}BBHIHB", data)
|
|
)
|
|
return cls(
|
|
response_type, error_code, sequence, resource_id, minor_code, major_code
|
|
)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"<X11Error code={self.error_code} seq={self.sequence} "
|
|
f"major={self.major_code} minor={self.minor_code}>"
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class X11Reply:
|
|
"""An X11 reply from the server."""
|
|
|
|
data: bytes
|
|
response_type: int
|
|
sequence: int
|
|
length: int
|
|
|
|
@classmethod
|
|
def from_data(cls, data: bytes, byte_order: str = "<") -> "X11Reply":
|
|
if len(data) >= 8:
|
|
response_type = data[0]
|
|
sequence = struct.unpack_from(f"{byte_order}H", data, 2)[0]
|
|
length = struct.unpack_from(f"{byte_order}I", data, 4)[0]
|
|
else:
|
|
response_type = sequence = length = 0
|
|
return cls(data, response_type, sequence, length)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"<X11Reply seq={self.sequence} "
|
|
f"extra_len={self.length} total={len(self.data)}>"
|
|
)
|
|
|
|
|
|
class RawX11Connection:
|
|
"""
|
|
Minimal X11 connection for sending raw (possibly malformed) requests.
|
|
|
|
Set swapped=True for testing byte-swap code paths (SProcXxx).
|
|
"""
|
|
|
|
def __init__(self, display_num, swapped=False):
|
|
self.display_num = display_num
|
|
self.swapped = swapped
|
|
self._byte_order = ">" if swapped else "<"
|
|
self.sock = None
|
|
self.seq = 0
|
|
self.root_window = 0
|
|
self.root_visual = 0
|
|
self.root_depth = 0
|
|
self._resource_id_base = 0
|
|
self._resource_id_mask = 0
|
|
self._next_resource_id = 0
|
|
self._extensions = {}
|
|
self._connect()
|
|
|
|
def _connect(self):
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
path = f"/tmp/.X11-unix/X{self.display_num}"
|
|
try:
|
|
self.sock.connect(path)
|
|
except (ConnectionRefusedError, FileNotFoundError) as e:
|
|
self.sock.close()
|
|
self.sock = None
|
|
raise X11ConnectionError(f"Cannot connect to {path}: {e}")
|
|
try:
|
|
self._handshake()
|
|
except Exception:
|
|
self.sock.close()
|
|
self.sock = None
|
|
raise
|
|
|
|
def _handshake(self):
|
|
byte_order = 0x42 if self.swapped else 0x6C
|
|
bo = self._byte_order
|
|
setup = struct.pack(f"{bo}BxHHHHxx", byte_order, 11, 0, 0, 0)
|
|
assert self.sock
|
|
self.sock.sendall(setup)
|
|
|
|
header = self._recv_exact(8)
|
|
status = header[0]
|
|
|
|
if status == 0:
|
|
reason_len = header[1]
|
|
extra_len = struct.unpack_from(f"{bo}H", header, 6)[0]
|
|
extra = self._recv_exact(extra_len * 4)
|
|
reason = extra[:reason_len].decode("ascii", errors="replace")
|
|
raise X11ConnectionError(f"X server refused connection: {reason}")
|
|
|
|
if status == 2:
|
|
raise X11ConnectionError("X server requires authentication")
|
|
if status != 1:
|
|
raise X11ConnectionError(f"Unexpected setup status: {status}")
|
|
|
|
extra_len = struct.unpack_from(f"{bo}H", header, 6)[0]
|
|
data = self._recv_exact(extra_len * 4)
|
|
|
|
(_, res_base, res_mask, _, vendor_len, _, num_screens, num_formats) = (
|
|
struct.unpack_from(f"{bo}IIIIHH BB", data, 0)
|
|
)
|
|
|
|
self._resource_id_base = res_base
|
|
self._resource_id_mask = res_mask
|
|
self._next_resource_id = 1
|
|
|
|
offset = 32 + ((vendor_len + 3) & ~3) + num_formats * 8
|
|
|
|
if num_screens > 0 and offset + 40 <= len(data):
|
|
(
|
|
self.root_window,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
_,
|
|
self.root_visual,
|
|
_,
|
|
_,
|
|
self.root_depth,
|
|
) = struct.unpack_from(f"{bo}IIIIIHHHHHHI BBB", data, offset)
|
|
|
|
# --- Core protocol helpers ---
|
|
|
|
def alloc_id(self) -> int:
|
|
xid = self._resource_id_base | self._next_resource_id
|
|
self._next_resource_id += 1
|
|
return xid
|
|
|
|
def send_request(self, data: bytes | X11Request) -> int:
|
|
"""Send a raw or structured X11 request.
|
|
|
|
*data* may be a ``bytes`` object (sent as-is) or any object that
|
|
implements :class:`X11Request` (i.e. has a ``to_bytes(byte_order)``
|
|
method). In the latter case ``to_bytes`` is called automatically
|
|
with the connection's byte order.
|
|
"""
|
|
if isinstance(data, X11Request):
|
|
data = data.to_bytes(self._byte_order)
|
|
self.seq += 1
|
|
assert self.sock
|
|
self.sock.sendall(data)
|
|
return self.seq
|
|
|
|
def recv_response(self, timeout: float = 5.0) -> X11Error | X11Reply | None:
|
|
ready = select.select([self.sock], [], [], timeout)
|
|
if not ready[0]:
|
|
return None
|
|
try:
|
|
header = self._recv_exact(32, timeout=timeout)
|
|
except (ConnectionResetError, BrokenPipeError, OSError, X11ConnectionError):
|
|
return None
|
|
if not header:
|
|
return None
|
|
|
|
rtype = header[0]
|
|
bo = self._byte_order
|
|
if rtype == 0:
|
|
return X11Error.from_data(header, bo)
|
|
elif rtype == 1:
|
|
extra_len = struct.unpack_from(f"{bo}I", header, 4)[0]
|
|
if extra_len > 0:
|
|
try:
|
|
extra = self._recv_exact(extra_len * 4, timeout=timeout)
|
|
return X11Reply.from_data(header + extra, bo)
|
|
except (
|
|
ConnectionResetError,
|
|
BrokenPipeError,
|
|
OSError,
|
|
X11ConnectionError,
|
|
):
|
|
return X11Reply.from_data(header, bo)
|
|
return X11Reply.from_data(header, bo)
|
|
else:
|
|
return X11Reply.from_data(header, bo)
|
|
|
|
def flush_responses(self, timeout: float = 0.5) -> list[X11Error | X11Reply]:
|
|
responses: list[X11Error | X11Reply] = []
|
|
while True:
|
|
resp = self.recv_response(timeout=timeout)
|
|
if resp is None:
|
|
break
|
|
responses.append(resp)
|
|
return responses
|
|
|
|
def is_connected(self) -> bool:
|
|
assert self.sock
|
|
try:
|
|
ready = select.select([self.sock], [], [], 0)
|
|
if ready[0]:
|
|
data = self.sock.recv(1, socket.MSG_PEEK)
|
|
return len(data) > 0
|
|
return True
|
|
except (ConnectionResetError, BrokenPipeError, OSError):
|
|
return False
|
|
|
|
def wait_for_disconnect(self, timeout: float = 5.0) -> bool:
|
|
assert self.sock
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
break
|
|
ready = select.select([self.sock], [], [], min(remaining, 0.5))
|
|
if ready[0]:
|
|
try:
|
|
data = self.sock.recv(4096)
|
|
if not data:
|
|
return True
|
|
except (ConnectionResetError, BrokenPipeError, OSError):
|
|
return True
|
|
return False
|
|
|
|
# --- Extension negotiation ---
|
|
|
|
def query_extension(self, name: str) -> XExtensionData | None:
|
|
if name in self._extensions:
|
|
return self._extensions[name]
|
|
|
|
req = QueryExtensionRequest(name=name)
|
|
self.send_request(req)
|
|
resp = self.recv_response(timeout=5.0)
|
|
if resp is None or isinstance(resp, X11Error):
|
|
return None
|
|
|
|
if len(resp.data) >= 12:
|
|
present, major, first_event, first_error = struct.unpack_from(
|
|
"BBBB", resp.data, 8
|
|
)
|
|
if present:
|
|
result = XExtensionData(major, first_event, first_error)
|
|
self._extensions[name] = result
|
|
return result
|
|
return None
|
|
|
|
def xkb_use_extension(self, major_version: int = 1, minor_version: int = 0) -> int:
|
|
ext = self.query_extension(Extension.XKB)
|
|
if not ext:
|
|
raise X11ConnectionError("XKB extension not available")
|
|
|
|
req = xkb.UseExtensionRequest(
|
|
opcode=ext.opcode,
|
|
major=major_version,
|
|
minor=minor_version,
|
|
)
|
|
self.send_request(req)
|
|
|
|
resp = self.recv_response(timeout=5.0)
|
|
if isinstance(resp, X11Error):
|
|
raise X11ConnectionError(f"XkbUseExtension failed: error {resp.error_code}")
|
|
return ext.opcode
|
|
|
|
def enable_big_requests(self) -> int:
|
|
ext = self.query_extension(Extension.BIG_REQUESTS)
|
|
if not ext:
|
|
raise X11ConnectionError("BIG-REQUESTS not available")
|
|
|
|
req = BigRequestsEnableRequest(opcode=ext.opcode)
|
|
self.send_request(req)
|
|
|
|
resp = self.recv_response(timeout=5.0)
|
|
if isinstance(resp, X11Error):
|
|
raise X11ConnectionError("BigRequestsEnable failed")
|
|
if resp and len(resp.data) >= 12:
|
|
return struct.unpack_from(f"{self._byte_order}I", resp.data, 8)[0]
|
|
return 0
|
|
|
|
# --- Resource creation helpers ---
|
|
|
|
def create_window(
|
|
self,
|
|
width: int = 100,
|
|
height: int = 100,
|
|
depth: int | None = None,
|
|
parent: int | None = None,
|
|
x: int = 0,
|
|
y: int = 0,
|
|
) -> int:
|
|
wid = self.alloc_id()
|
|
if parent is None:
|
|
parent = self.root_window
|
|
if depth is None:
|
|
depth = self.root_depth
|
|
|
|
assert parent is not None
|
|
assert depth is not None
|
|
|
|
req = CreateWindowRequest(
|
|
wid=wid,
|
|
parent=parent,
|
|
x=x,
|
|
y=y,
|
|
width=width,
|
|
height=height,
|
|
depth=depth,
|
|
)
|
|
self.send_request(req)
|
|
self.flush_responses(timeout=0.2)
|
|
return wid
|
|
|
|
def create_pixmap(
|
|
self,
|
|
width: int = 100,
|
|
height: int = 100,
|
|
depth: int | None = None,
|
|
drawable: int | None = None,
|
|
) -> int:
|
|
pid = self.alloc_id()
|
|
if drawable is None:
|
|
drawable = self.root_window
|
|
if depth is None:
|
|
depth = self.root_depth
|
|
|
|
assert drawable is not None
|
|
assert depth is not None
|
|
|
|
req = CreatePixmapRequest(
|
|
pid=pid,
|
|
drawable=drawable,
|
|
width=width,
|
|
height=height,
|
|
depth=depth,
|
|
)
|
|
self.send_request(req)
|
|
self.flush_responses(timeout=0.2)
|
|
return pid
|
|
|
|
def intern_atom(self, name: str, only_if_exists: bool = False) -> int:
|
|
req = InternAtomRequest(name=name, only_if_exists=only_if_exists)
|
|
self.send_request(req)
|
|
|
|
resp = self.recv_response(timeout=5.0)
|
|
if isinstance(resp, X11Error) or resp is None:
|
|
return 0
|
|
if len(resp.data) >= 12:
|
|
return struct.unpack_from(f"{self._byte_order}I", resp.data, 8)[0]
|
|
return 0
|
|
|
|
def xkb_get_map(
|
|
self, opcode: int, full: int = 0, partial: int = 0, **kwargs
|
|
) -> xkb.GetMapReply | None:
|
|
"""Send XkbGetMap and return the parsed reply."""
|
|
req = xkb.GetMapRequest(opcode=opcode, full=full, partial=partial, **kwargs)
|
|
self.send_request(req.to_bytes(self._byte_order))
|
|
resp = self.recv_response(timeout=5.0)
|
|
if isinstance(resp, X11Error) or resp is None:
|
|
return None
|
|
# resp.data is the full reply: 32 standard header + extra
|
|
header_data = resp.data[:32]
|
|
extra_data = resp.data[32:]
|
|
return xkb.GetMapReply.from_bytes(header_data, extra_data)
|
|
|
|
def change_keyboard_mapping(
|
|
self,
|
|
first_keycode: int,
|
|
keysyms_per_keycode: int,
|
|
keycodes: int = 1,
|
|
keysyms: list[int] | None = None,
|
|
) -> None:
|
|
"""Send ChangeKeyboardMapping request."""
|
|
req = ChangeKeyboardMappingRequest(
|
|
first_keycode=first_keycode,
|
|
keysyms_per_keycode=keysyms_per_keycode,
|
|
keycodes=keycodes,
|
|
keysyms=keysyms,
|
|
)
|
|
self.send_request(req.to_bytes(self._byte_order))
|
|
|
|
def get_fd(self) -> int:
|
|
assert self.sock
|
|
return self.sock.fileno()
|
|
|
|
def close(self) -> None:
|
|
if self.sock:
|
|
try:
|
|
self.sock.close()
|
|
except OSError:
|
|
pass
|
|
self.sock = None
|
|
|
|
def _recv_exact(self, nbytes: int, timeout: float = 10.0) -> bytes:
|
|
assert self.sock
|
|
data = b""
|
|
deadline = time.monotonic() + timeout
|
|
while len(data) < nbytes:
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
raise X11ConnectionError(
|
|
f"Timeout reading ({len(data)}/{nbytes} bytes)"
|
|
)
|
|
ready = select.select([self.sock], [], [], min(remaining, 1.0))
|
|
if ready[0]:
|
|
chunk = self.sock.recv(nbytes - len(data))
|
|
if not chunk:
|
|
raise X11ConnectionError(
|
|
f"Connection closed ({len(data)}/{nbytes} bytes)"
|
|
)
|
|
data += chunk
|
|
return data
|
|
|
|
def __enter__(self) -> "RawX11Connection":
|
|
return self
|
|
|
|
def __exit__(self, *args) -> bool:
|
|
self.close()
|
|
return False
|
|
|
|
def __del__(self) -> None:
|
|
self.close()
|
|
|
|
|
|
class XlibConnection:
|
|
"""python-xlib based connection for higher-level X operations."""
|
|
|
|
def __init__(self, display_num):
|
|
from Xlib import display as xlib_display
|
|
|
|
self.display_num = display_num
|
|
self.display = xlib_display.Display(f":{display_num}")
|
|
self.screen = self.display.screen()
|
|
self.root = self.screen.root
|
|
|
|
def get_fd(self):
|
|
return self.display.fileno()
|
|
|
|
def create_window(self, width=100, height=100, x=0, y=0):
|
|
from Xlib import X
|
|
|
|
window = self.root.create_window(
|
|
x,
|
|
y,
|
|
width,
|
|
height,
|
|
0,
|
|
self.screen.root_depth,
|
|
X.InputOutput,
|
|
X.CopyFromParent,
|
|
)
|
|
self.display.sync()
|
|
return window
|
|
|
|
def flush(self):
|
|
self.display.flush()
|
|
|
|
def sync(self):
|
|
self.display.sync()
|
|
|
|
def close(self):
|
|
try:
|
|
self.display.close()
|
|
except Exception:
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self.close()
|
|
return False
|