mirror of
https://gitlab.freedesktop.org/xorg/xserver.git
synced 2026-06-07 02:58:22 +02:00
pyxtest: add tests for the byteswapping patches
Not a full list since not all can easily be tested but hey, better than nothing. See https://gitlab.freedesktop.org/xorg/xserver/-/merge_requests/2181 Assisted-by: Claude:claude-claude-opus-4-6 Part-of: <https://gitlab.freedesktop.org/xorg/xserver/-/merge_requests/2187>
This commit is contained in:
parent
7f7bb53cf9
commit
acbc46e708
18 changed files with 1813 additions and 25 deletions
|
|
@ -55,11 +55,17 @@ if pytest.found()
|
|||
|
||||
# This needs to be kept in sync with the test_foo.py files in the tree
|
||||
tests_pyxtest = [
|
||||
'test_present.py',
|
||||
'test_randr.py',
|
||||
'test_record.py',
|
||||
'test_render.py',
|
||||
'test_screensaver.py',
|
||||
'test_shm.py',
|
||||
'test_vidmode.py',
|
||||
'test_xinerama.py',
|
||||
'test_xi.py',
|
||||
'test_xkb.py',
|
||||
'test_xres.py',
|
||||
]
|
||||
|
||||
test_list_data = configuration_data()
|
||||
|
|
|
|||
157
test/pyxtest/proto/present.py
Normal file
157
test/pyxtest/proto/present.py
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Present extension protocol request builders for byteswap testing.
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
# Present minor opcodes
|
||||
PresentQueryVersion = 0
|
||||
PresentPixmap = 1
|
||||
PresentNotifyMSC = 2
|
||||
PresentSelectInput = 3
|
||||
PresentQueryCapabilities = 4
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryVersionRequest:
|
||||
"""PresentQueryVersion request."""
|
||||
|
||||
opcode: int
|
||||
major: int = 1
|
||||
minor: int = 2
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHII",
|
||||
self.opcode,
|
||||
PresentQueryVersion,
|
||||
3,
|
||||
self.major,
|
||||
self.minor,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SelectInputRequest:
|
||||
"""PresentSelectInput request."""
|
||||
|
||||
opcode: int
|
||||
eid: int
|
||||
window: int
|
||||
event_mask: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH III",
|
||||
self.opcode,
|
||||
PresentSelectInput,
|
||||
4,
|
||||
self.eid,
|
||||
self.window,
|
||||
self.event_mask,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryCapabilitiesRequest:
|
||||
"""PresentQueryCapabilities request."""
|
||||
|
||||
opcode: int
|
||||
target: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH I",
|
||||
self.opcode,
|
||||
PresentQueryCapabilities,
|
||||
2,
|
||||
self.target,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PixmapRequest:
|
||||
"""PresentPixmap request.
|
||||
|
||||
This is a large request with many fields. All fields are included
|
||||
to test that byte-swapping handles them all correctly.
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
pixmap: int
|
||||
serial: int = 0
|
||||
valid: int = 0 # region or None
|
||||
update: int = 0 # region or None
|
||||
x_off: int = 0
|
||||
y_off: int = 0
|
||||
target_crtc: int = 0
|
||||
wait_fence: int = 0
|
||||
idle_fence: int = 0
|
||||
options: int = 0
|
||||
target_msc: int = 0
|
||||
divisor: int = 0
|
||||
remainder: int = 0
|
||||
# notifies follow but we omit them for testing
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH" # header: opcode, sub-opcode, length
|
||||
f"III" # window, pixmap, serial
|
||||
f"II" # valid, update
|
||||
f"hh" # x_off, y_off
|
||||
f"III" # target_crtc, wait_fence, idle_fence
|
||||
f"I" # options
|
||||
f"xxxx" # pad
|
||||
f"Q" # target_msc (CARD64)
|
||||
f"Q" # divisor (CARD64)
|
||||
f"Q", # remainder (CARD64)
|
||||
self.opcode,
|
||||
PresentPixmap,
|
||||
18, # 72 bytes = 18 words
|
||||
self.window,
|
||||
self.pixmap,
|
||||
self.serial,
|
||||
self.valid,
|
||||
self.update,
|
||||
self.x_off,
|
||||
self.y_off,
|
||||
self.target_crtc,
|
||||
self.wait_fence,
|
||||
self.idle_fence,
|
||||
self.options,
|
||||
self.target_msc,
|
||||
self.divisor,
|
||||
self.remainder,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class NotifyMSCRequest:
|
||||
"""PresentNotifyMSC request."""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
serial: int = 0
|
||||
target_msc: int = 0
|
||||
divisor: int = 0
|
||||
remainder: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH"
|
||||
f"II" # window, serial
|
||||
f"xxxx" # pad
|
||||
f"Q" # target_msc
|
||||
f"Q" # divisor
|
||||
f"Q", # remainder
|
||||
self.opcode,
|
||||
PresentNotifyMSC,
|
||||
10, # 40 bytes = 10 words
|
||||
self.window,
|
||||
self.serial,
|
||||
self.target_msc,
|
||||
self.divisor,
|
||||
self.remainder,
|
||||
)
|
||||
|
|
@ -7,10 +7,17 @@ from dataclasses import dataclass
|
|||
|
||||
# RandR minor opcodes
|
||||
RRQueryVersion = 0
|
||||
RRSetScreenConfig = 2
|
||||
RRSelectInput = 4
|
||||
RRGetScreenResources = 8
|
||||
RRGetOutputInfo = 9
|
||||
RRCreateMode = 16
|
||||
RRChangeOutputProperty = 13
|
||||
RRGetOutputProperty = 15
|
||||
RRGetScreenResourcesCurrent = 25
|
||||
RRGetProviderInfo = 33
|
||||
RRCreateLease = 45
|
||||
RRFreeLease = 46
|
||||
|
||||
RR_MAJOR = 1
|
||||
RR_MINOR = 6
|
||||
|
|
@ -132,3 +139,127 @@ class GetOutputPropertyRequest:
|
|||
1 if self.pending else 0,
|
||||
0, # pad
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SetScreenConfigRequest:
|
||||
"""RRSetScreenConfig request (RandR 1.1 format with rate)."""
|
||||
|
||||
opcode: int
|
||||
drawable: int
|
||||
timestamp: int = 0
|
||||
config_timestamp: int = 0
|
||||
size_id: int = 0
|
||||
rotation: int = 1 # RR_Rotate_0
|
||||
rate: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH I II HH H xx",
|
||||
self.opcode,
|
||||
RRSetScreenConfig,
|
||||
6, # 24 bytes = 6 words
|
||||
self.drawable,
|
||||
self.timestamp,
|
||||
self.config_timestamp,
|
||||
self.size_id,
|
||||
self.rotation,
|
||||
self.rate,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetScreenResourcesRequest:
|
||||
"""RRGetScreenResources request."""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHI",
|
||||
self.opcode,
|
||||
RRGetScreenResources,
|
||||
2,
|
||||
self.window,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CreateLeaseRequest:
|
||||
"""RRCreateLease request.
|
||||
|
||||
Header is 16 bytes followed by nCrtcs CARD32 crtc IDs and
|
||||
nOutputs CARD32 output IDs.
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
lid: int
|
||||
crtcs: list[int] | None = None
|
||||
outputs: list[int] | None = None
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
crtc_list = self.crtcs or []
|
||||
output_list = self.outputs or []
|
||||
|
||||
crtc_data = b""
|
||||
for c in crtc_list:
|
||||
crtc_data += struct.pack(f"{byte_order}I", c)
|
||||
output_data = b""
|
||||
for o in output_list:
|
||||
output_data += struct.pack(f"{byte_order}I", o)
|
||||
|
||||
total = 16 + len(crtc_data) + len(output_data)
|
||||
pad_len = (4 - total % 4) % 4
|
||||
length = (total + pad_len) // 4
|
||||
|
||||
header = struct.pack(
|
||||
f"{byte_order}BBH II HH",
|
||||
self.opcode,
|
||||
RRCreateLease,
|
||||
length,
|
||||
self.window,
|
||||
self.lid,
|
||||
len(crtc_list),
|
||||
len(output_list),
|
||||
)
|
||||
return header + crtc_data + output_data + b"\x00" * pad_len
|
||||
|
||||
|
||||
@dataclass
|
||||
class SelectInputRequest:
|
||||
"""RRSelectInput request."""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
enable: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH I H xx",
|
||||
self.opcode,
|
||||
RRSelectInput,
|
||||
3,
|
||||
self.window,
|
||||
self.enable,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetProviderInfoRequest:
|
||||
"""RRGetProviderInfo request."""
|
||||
|
||||
opcode: int
|
||||
provider: int
|
||||
config_timestamp: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH II",
|
||||
self.opcode,
|
||||
RRGetProviderInfo,
|
||||
3,
|
||||
self.provider,
|
||||
self.config_timestamp,
|
||||
)
|
||||
|
|
|
|||
193
test/pyxtest/proto/render.py
Normal file
193
test/pyxtest/proto/render.py
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Render extension protocol request builders
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# Render minor opcodes
|
||||
RenderQueryVersion = 0
|
||||
RenderQueryPictFormats = 1
|
||||
RenderCreatePicture = 4
|
||||
RenderCreateGlyphSet = 17
|
||||
RenderCompositeGlyphs8 = 23
|
||||
RenderCompositeGlyphs16 = 24
|
||||
RenderCompositeGlyphs32 = 25
|
||||
RenderSetPictureFilter = 30
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryVersionRequest:
|
||||
"""RenderQueryVersion request."""
|
||||
|
||||
opcode: int
|
||||
major: int = 0
|
||||
minor: int = 11
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHII",
|
||||
self.opcode,
|
||||
RenderQueryVersion,
|
||||
3,
|
||||
self.major,
|
||||
self.minor,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryPictFormatsRequest:
|
||||
"""RenderQueryPictFormats request."""
|
||||
|
||||
opcode: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH",
|
||||
self.opcode,
|
||||
RenderQueryPictFormats,
|
||||
1,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CreatePictureRequest:
|
||||
"""RenderCreatePicture request."""
|
||||
|
||||
opcode: int
|
||||
picture_id: int
|
||||
drawable: int
|
||||
format_id: int
|
||||
value_mask: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHIIII",
|
||||
self.opcode,
|
||||
RenderCreatePicture,
|
||||
5,
|
||||
self.picture_id,
|
||||
self.drawable,
|
||||
self.format_id,
|
||||
self.value_mask,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CreateGlyphSetRequest:
|
||||
"""RenderCreateGlyphSet request."""
|
||||
|
||||
opcode: int
|
||||
glyph_set_id: int
|
||||
format_id: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHII",
|
||||
self.opcode,
|
||||
RenderCreateGlyphSet,
|
||||
3,
|
||||
self.glyph_set_id,
|
||||
self.format_id,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class _CompositeGlyphsRequestBase:
|
||||
"""Base class for RenderCompositeGlyphs{8,16,32} requests.
|
||||
|
||||
glyph_elts is raw xGlyphElt + glyph ID data.
|
||||
Subclasses set _minor_opcode for the glyph size (8/16/32 bit IDs).
|
||||
"""
|
||||
|
||||
_minor_opcode: int = field(default=0, init=False, repr=False)
|
||||
|
||||
opcode: int = 0
|
||||
src_picture: int = 0
|
||||
dst_picture: int = 0
|
||||
glyph_set: int = 0
|
||||
mask_format: int = 0
|
||||
op: int = 3 # PictOpOver
|
||||
src_x: int = 0
|
||||
src_y: int = 0
|
||||
glyph_elts: bytes = b""
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
total = 28 + len(self.glyph_elts)
|
||||
pad_len = (4 - total % 4) % 4
|
||||
length = (total + pad_len) // 4
|
||||
|
||||
header = struct.pack(
|
||||
f"{byte_order}BBH BxH IIII hh",
|
||||
self.opcode,
|
||||
self._minor_opcode,
|
||||
length,
|
||||
self.op,
|
||||
0, # pad2
|
||||
self.src_picture,
|
||||
self.dst_picture,
|
||||
self.mask_format,
|
||||
self.glyph_set,
|
||||
self.src_x,
|
||||
self.src_y,
|
||||
)
|
||||
return header + self.glyph_elts + b"\x00" * pad_len
|
||||
|
||||
|
||||
@dataclass
|
||||
class CompositeGlyphs8Request(_CompositeGlyphsRequestBase):
|
||||
"""RenderCompositeGlyphs8 request (minor opcode 23)."""
|
||||
|
||||
_minor_opcode: int = field(default=RenderCompositeGlyphs8, init=False, repr=False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CompositeGlyphs16Request(_CompositeGlyphsRequestBase):
|
||||
"""RenderCompositeGlyphs16 request (minor opcode 24)."""
|
||||
|
||||
_minor_opcode: int = field(default=RenderCompositeGlyphs16, init=False, repr=False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CompositeGlyphs32Request(_CompositeGlyphsRequestBase):
|
||||
"""RenderCompositeGlyphs32 request (minor opcode 25)."""
|
||||
|
||||
_minor_opcode: int = field(default=RenderCompositeGlyphs32, init=False, repr=False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SetPictureFilterRequest:
|
||||
"""RenderSetPictureFilter request.
|
||||
|
||||
The filter name is a variable-length string followed by padding,
|
||||
then xFixed (CARD32) filter parameter values.
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
picture: int
|
||||
filter_name: str = "nearest"
|
||||
params: list[int] | None = None # xFixed values (CARD32)
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
name_bytes = self.filter_name.encode("ascii")
|
||||
name_len = len(name_bytes)
|
||||
name_padded = name_bytes + b"\x00" * ((4 - name_len % 4) % 4)
|
||||
|
||||
param_data = b""
|
||||
if self.params:
|
||||
for p in self.params:
|
||||
param_data += struct.pack(f"{byte_order}I", p)
|
||||
|
||||
total = 12 + len(name_padded) + len(param_data)
|
||||
pad_len = (4 - total % 4) % 4
|
||||
length = (total + pad_len) // 4
|
||||
|
||||
header = struct.pack(
|
||||
f"{byte_order}BBH I H xx",
|
||||
self.opcode,
|
||||
RenderSetPictureFilter,
|
||||
length,
|
||||
self.picture,
|
||||
name_len,
|
||||
)
|
||||
return header + name_padded + param_data + b"\x00" * pad_len
|
||||
56
test/pyxtest/proto/shm.py
Normal file
56
test/pyxtest/proto/shm.py
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# MIT-SHM extension protocol request builders for byteswap testing.
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
# SHM minor opcodes
|
||||
ShmQueryVersion = 0
|
||||
ShmAttach = 1
|
||||
ShmDetach = 2
|
||||
ShmPutImage = 3
|
||||
ShmGetImage = 4
|
||||
ShmCreatePixmap = 5
|
||||
ShmAttachFd = 6
|
||||
ShmCreateSegment = 7
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryVersionRequest:
|
||||
"""ShmQueryVersion request."""
|
||||
|
||||
opcode: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH",
|
||||
self.opcode,
|
||||
ShmQueryVersion,
|
||||
1,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CreateSegmentRequest:
|
||||
"""ShmCreateSegment request.
|
||||
|
||||
Creates a shared memory segment owned by the server.
|
||||
Returns a file descriptor and whether the segment is read-only.
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
shmseg: int
|
||||
size: int
|
||||
read_only: bool = False
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH II Bxxx",
|
||||
self.opcode,
|
||||
ShmCreateSegment,
|
||||
4, # 16 bytes = 4 words
|
||||
self.shmseg,
|
||||
self.size,
|
||||
1 if self.read_only else 0,
|
||||
)
|
||||
198
test/pyxtest/proto/vidmode.py
Normal file
198
test/pyxtest/proto/vidmode.py
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# XFree86-VidModeExtension protocol request builders for byteswap testing.
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
# VidMode minor opcodes
|
||||
VidModeQueryVersion = 0
|
||||
VidModeGetModeLine = 1
|
||||
VidModeModModeLine = 2
|
||||
VidModeSwitchMode = 3
|
||||
VidModeGetMonitor = 4
|
||||
VidModeLockModeSwitch = 5
|
||||
VidModeGetAllModeLines = 6
|
||||
VidModeAddModeLine = 7
|
||||
VidModeDeleteModeLine = 8
|
||||
VidModeValidateModeLine = 9
|
||||
VidModeSwitchToMode = 10
|
||||
VidModeGetViewPort = 11
|
||||
VidModeSetViewPort = 12
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryVersionRequest:
|
||||
"""VidModeQueryVersion request."""
|
||||
|
||||
opcode: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH",
|
||||
self.opcode,
|
||||
VidModeQueryVersion,
|
||||
1,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetModeLineRequest:
|
||||
"""VidModeGetModeLine request."""
|
||||
|
||||
opcode: int
|
||||
screen: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH H xx",
|
||||
self.opcode,
|
||||
VidModeGetModeLine,
|
||||
2,
|
||||
self.screen,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SwitchModeRequest:
|
||||
"""VidModeSwitchMode request."""
|
||||
|
||||
opcode: int
|
||||
screen: int = 0
|
||||
zoom: int = 1
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH HH",
|
||||
self.opcode,
|
||||
VidModeSwitchMode,
|
||||
2,
|
||||
self.screen,
|
||||
self.zoom,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SwitchToModeRequest:
|
||||
"""VidModeSwitchToMode request (v2 format, 52 bytes).
|
||||
|
||||
xXF86VidModeSwitchToModeReq:
|
||||
reqType(1) + xf86vidmodeReqType(1) + length(2) +
|
||||
screen(4) + dotclock(4) +
|
||||
hdisplay(2) + hsyncstart(2) + hsyncend(2) + htotal(2) + hskew(2) +
|
||||
vdisplay(2) + vsyncstart(2) + vsyncend(2) + vtotal(2) + pad1(2) +
|
||||
flags(4) + reserved1(4) + reserved2(4) + reserved3(4) + privsize(4)
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
screen: int = 0
|
||||
dotclock: int = 0
|
||||
hdisplay: int = 0
|
||||
hsyncstart: int = 0
|
||||
hsyncend: int = 0
|
||||
htotal: int = 0
|
||||
hskew: int = 0
|
||||
vdisplay: int = 0
|
||||
vsyncstart: int = 0
|
||||
vsyncend: int = 0
|
||||
vtotal: int = 0
|
||||
flags: int = 0
|
||||
privsize: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH" # header (4)
|
||||
f"I" # screen (4)
|
||||
f"I" # dotclock (4)
|
||||
f"HHHHH" # hdisplay, hsyncstart, hsyncend, htotal, hskew (10)
|
||||
f"HHHH" # vdisplay, vsyncstart, vsyncend, vtotal (8)
|
||||
f"xx" # pad1 (2)
|
||||
f"I" # flags (4)
|
||||
f"I" # reserved1 (4)
|
||||
f"I" # reserved2 (4)
|
||||
f"I" # reserved3 (4)
|
||||
f"I", # privsize (4)
|
||||
self.opcode,
|
||||
VidModeSwitchToMode,
|
||||
13, # 52 bytes = 13 words
|
||||
self.screen,
|
||||
self.dotclock,
|
||||
self.hdisplay,
|
||||
self.hsyncstart,
|
||||
self.hsyncend,
|
||||
self.htotal,
|
||||
self.hskew,
|
||||
self.vdisplay,
|
||||
self.vsyncstart,
|
||||
self.vsyncend,
|
||||
self.vtotal,
|
||||
self.flags,
|
||||
0, # reserved1
|
||||
0, # reserved2
|
||||
0, # reserved3
|
||||
self.privsize,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetAllModeLinesRequest:
|
||||
"""VidModeGetAllModeLines request."""
|
||||
|
||||
opcode: int
|
||||
screen: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH H xx",
|
||||
self.opcode,
|
||||
VidModeGetAllModeLines,
|
||||
2,
|
||||
self.screen,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidateModeLineRequest:
|
||||
"""VidModeValidateModeLine request (v2 format, 52 bytes).
|
||||
|
||||
Same layout as SwitchToModeRequest.
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
screen: int = 0
|
||||
dotclock: int = 0
|
||||
hdisplay: int = 0
|
||||
hsyncstart: int = 0
|
||||
hsyncend: int = 0
|
||||
htotal: int = 0
|
||||
hskew: int = 0
|
||||
vdisplay: int = 0
|
||||
vsyncstart: int = 0
|
||||
vsyncend: int = 0
|
||||
vtotal: int = 0
|
||||
flags: int = 0
|
||||
privsize: int = 0
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHIIHHHHHHHHHxxIIIII",
|
||||
self.opcode,
|
||||
VidModeValidateModeLine,
|
||||
13, # 52 bytes = 13 words
|
||||
self.screen,
|
||||
self.dotclock,
|
||||
self.hdisplay,
|
||||
self.hsyncstart,
|
||||
self.hsyncend,
|
||||
self.htotal,
|
||||
self.hskew,
|
||||
self.vdisplay,
|
||||
self.vsyncstart,
|
||||
self.vsyncend,
|
||||
self.vtotal,
|
||||
self.flags,
|
||||
0, # reserved1
|
||||
0, # reserved2
|
||||
0, # reserved3
|
||||
self.privsize,
|
||||
)
|
||||
|
|
@ -5,6 +5,9 @@
|
|||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
# XI (v1) minor opcodes
|
||||
XChangeDeviceControl = 35
|
||||
|
||||
# XI2 minor opcodes
|
||||
XIQueryVersion = 47
|
||||
XIPassiveGrabDevice = 54
|
||||
|
|
@ -43,6 +46,13 @@ PropModeAppend = 2
|
|||
VirtualCorePointer = 2
|
||||
VirtualCoreKeyboard = 3
|
||||
|
||||
# Device control types (for XChangeDeviceControl)
|
||||
DEVICE_RESOLUTION = 1
|
||||
DEVICE_ABS_CALIB = 2
|
||||
DEVICE_ABS_AREA = 3
|
||||
DEVICE_CORE = 4
|
||||
DEVICE_ENABLE = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class XIQueryVersionRequest:
|
||||
|
|
@ -225,3 +235,64 @@ class XIGetPropertyRequest:
|
|||
self.offset,
|
||||
self.length,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class XChangeDeviceControlRequest:
|
||||
"""XChangeDeviceControl request (XI v1, minor opcode 35).
|
||||
|
||||
The request header is 8 bytes (xChangeDeviceControlReq), followed by
|
||||
a device control structure that depends on the control type.
|
||||
For DEVICE_RESOLUTION, the control is xDeviceResolutionCtl (12 bytes)
|
||||
followed by num_valuators CARD32 values.
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
control: int
|
||||
deviceid: int
|
||||
control_data: bytes = b""
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
total = 8 + len(self.control_data)
|
||||
pad_len = (4 - total % 4) % 4
|
||||
length = (total + pad_len) // 4
|
||||
|
||||
header = struct.pack(
|
||||
f"{byte_order}BBH HBx",
|
||||
self.opcode,
|
||||
XChangeDeviceControl,
|
||||
length,
|
||||
self.control,
|
||||
self.deviceid,
|
||||
)
|
||||
return header + self.control_data + b"\x00" * pad_len
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeviceResolutionCtl:
|
||||
"""xDeviceResolutionCtl structure (8 bytes + valuator values).
|
||||
|
||||
control(2) + length(2) + first_valuator(1) + num_valuators(1) + pad(2)
|
||||
followed by num_valuators CARD32 resolution values.
|
||||
"""
|
||||
|
||||
first_valuator: int = 0
|
||||
num_valuators: int = 0
|
||||
resolutions: list[int] | None = None
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
vals = self.resolutions if self.resolutions is not None else []
|
||||
val_data = b""
|
||||
for v in vals:
|
||||
val_data += struct.pack(f"{byte_order}I", v)
|
||||
|
||||
ctl_length = (8 + len(val_data)) // 4
|
||||
|
||||
header = struct.pack(
|
||||
f"{byte_order}HH BB xx",
|
||||
DEVICE_RESOLUTION,
|
||||
ctl_length,
|
||||
self.first_valuator,
|
||||
self.num_valuators,
|
||||
)
|
||||
return header + val_data
|
||||
|
|
|
|||
102
test/pyxtest/proto/xinerama.py
Normal file
102
test/pyxtest/proto/xinerama.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Xinerama (PanoramiX / pseudoramiX) extension protocol request builders
|
||||
# for byteswap testing.
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
# Xinerama minor opcodes
|
||||
PanoramiXQueryVersion = 0
|
||||
PanoramiXGetState = 1
|
||||
PanoramiXGetScreenCount = 2
|
||||
PanoramiXGetScreenSize = 3
|
||||
XineramaIsActive = 4
|
||||
XineramaQueryScreens = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryVersionRequest:
|
||||
"""PanoramiXQueryVersion request."""
|
||||
|
||||
opcode: int
|
||||
major: int = 1
|
||||
minor: int = 1
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHBB xx",
|
||||
self.opcode,
|
||||
PanoramiXQueryVersion,
|
||||
2,
|
||||
self.major,
|
||||
self.minor,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetStateRequest:
|
||||
"""PanoramiXGetState request."""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH I",
|
||||
self.opcode,
|
||||
PanoramiXGetState,
|
||||
2,
|
||||
self.window,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetScreenCountRequest:
|
||||
"""PanoramiXGetScreenCount request."""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH I",
|
||||
self.opcode,
|
||||
PanoramiXGetScreenCount,
|
||||
2,
|
||||
self.window,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetScreenSizeRequest:
|
||||
"""PanoramiXGetScreenSize request."""
|
||||
|
||||
opcode: int
|
||||
window: int
|
||||
screen: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH II",
|
||||
self.opcode,
|
||||
PanoramiXGetScreenSize,
|
||||
3,
|
||||
self.window,
|
||||
self.screen,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IsActiveRequest:
|
||||
"""XineramaIsActive request."""
|
||||
|
||||
opcode: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH",
|
||||
self.opcode,
|
||||
XineramaIsActive,
|
||||
1,
|
||||
)
|
||||
85
test/pyxtest/proto/xres.py
Normal file
85
test/pyxtest/proto/xres.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# X-Resource extension protocol request builders for byteswap testing.
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# XRes minor opcodes
|
||||
XResQueryVersion = 0
|
||||
XResQueryClients = 1
|
||||
XResQueryClientResources = 2
|
||||
XResQueryClientPixmapBytes = 3
|
||||
XResQueryClientIds = 4
|
||||
XResQueryResourceBytes = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryVersionRequest:
|
||||
"""XResQueryVersion request."""
|
||||
|
||||
opcode: int
|
||||
major: int = 1
|
||||
minor: int = 2
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBHBB xx",
|
||||
self.opcode,
|
||||
XResQueryVersion,
|
||||
2,
|
||||
self.major,
|
||||
self.minor,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryClientsRequest:
|
||||
"""XResQueryClients request."""
|
||||
|
||||
opcode: int
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
return struct.pack(
|
||||
f"{byte_order}BBH",
|
||||
self.opcode,
|
||||
XResQueryClients,
|
||||
1,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryClientIdsRequest:
|
||||
"""XResQueryClientIds request.
|
||||
|
||||
Followed by numSpecs xXResClientIdSpec entries (8 bytes each:
|
||||
client CARD32 + mask CARD32).
|
||||
"""
|
||||
|
||||
opcode: int
|
||||
specs: list[tuple[int, int]] = field(default_factory=list) # (client, mask) pairs
|
||||
num_specs_override: int | None = None
|
||||
|
||||
def to_bytes(self, byte_order: str = "<") -> bytes:
|
||||
num_specs = (
|
||||
self.num_specs_override
|
||||
if self.num_specs_override is not None
|
||||
else len(self.specs)
|
||||
)
|
||||
|
||||
spec_data = b""
|
||||
for client, mask in self.specs:
|
||||
spec_data += struct.pack(f"{byte_order}II", client, mask)
|
||||
|
||||
total = 8 + len(spec_data)
|
||||
pad_len = (4 - total % 4) % 4
|
||||
length = (total + pad_len) // 4
|
||||
|
||||
header = struct.pack(
|
||||
f"{byte_order}BBH I",
|
||||
self.opcode,
|
||||
XResQueryClientIds,
|
||||
length,
|
||||
num_specs,
|
||||
)
|
||||
return header + spec_data + b"\x00" * pad_len
|
||||
57
test/pyxtest/test_present.py
Normal file
57
test/pyxtest/test_present.py
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Tests for Present extension.
|
||||
|
||||
import pytest
|
||||
|
||||
from proto import present
|
||||
from xclient import Extension, X11Error
|
||||
|
||||
|
||||
class TestPresentSelectInput:
|
||||
@pytest.mark.swapped_client
|
||||
def test_present_select_input_eid_swapped(self, xserver, xclient_swapped):
|
||||
"""
|
||||
sproc_present_select_input was missing swapl(&stuff->eid).
|
||||
Without the swap, the eid fails LEGAL_NEW_RESOURCE because
|
||||
the client-bits portion of the garbled XID doesn't match
|
||||
clientAsMask → BadIDChoice error.
|
||||
|
||||
Fixed in commit a5ac3c871219 ("present: add missing byte
|
||||
swapping for various fields").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.PRESENT)
|
||||
if not ext:
|
||||
pytest.skip("Present extension not available")
|
||||
|
||||
req = present.QueryVersionRequest(opcode=ext.opcode)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
conn.recv_response(timeout=5.0)
|
||||
|
||||
win = conn.create_window()
|
||||
eid = conn.alloc_id()
|
||||
|
||||
# Use a non-zero event_mask so the server reaches
|
||||
# LEGAL_NEW_RESOURCE(eid). With event_mask=0 the server
|
||||
# returns Success immediately without validating the eid.
|
||||
PresentConfigureNotifyMask = 1
|
||||
req = present.SelectInputRequest(
|
||||
opcode=ext.opcode,
|
||||
eid=eid,
|
||||
window=win,
|
||||
event_mask=PresentConfigureNotifyMask,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
responses = conn.flush_responses(timeout=1.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
|
||||
# With the fix: no error (void request succeeds silently).
|
||||
# Without the fix: BadIDChoice error.
|
||||
errors = [r for r in responses if isinstance(r, X11Error)]
|
||||
assert len(errors) == 0, (
|
||||
f"PresentSelectInput returned error(s): {errors} - "
|
||||
"eid not swapped → BadIDChoice"
|
||||
)
|
||||
|
|
@ -45,6 +45,20 @@ def randr_xclient(xclient):
|
|||
return xclient, ext.opcode, output_id
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def randr_xclient_swapped(xclient_swapped):
|
||||
"""Provide a byte-swapped xclient with RandR initialized."""
|
||||
ext = xclient_swapped.query_extension(Extension.RANDR)
|
||||
if not ext:
|
||||
pytest.skip("RANDR extension not available")
|
||||
|
||||
req = randr.QueryVersionRequest(opcode=ext.opcode)
|
||||
xclient_swapped.send_request(req.to_bytes(">"))
|
||||
xclient_swapped.recv_response(timeout=5.0)
|
||||
|
||||
return xclient_swapped, ext.opcode
|
||||
|
||||
|
||||
class TestRandROutputProperty:
|
||||
"""Tests for RRChangeOutputProperty vulnerabilities."""
|
||||
|
||||
|
|
@ -169,3 +183,128 @@ class TestRandROutputProperty:
|
|||
f"Expected BadLength (16), got error code {resp.error_code} - "
|
||||
f"integer truncation not caught by length check"
|
||||
)
|
||||
|
||||
|
||||
class TestRandRSetScreenConfig:
|
||||
@pytest.mark.swapped_client
|
||||
def test_set_screen_config_config_timestamp_swapped(
|
||||
self, xserver, randr_xclient_swapped
|
||||
):
|
||||
"""
|
||||
SProcRRSetScreenConfig was missing swapl(&stuff->configTimestamp).
|
||||
|
||||
First query the server's configTimestamp via GetScreenResources,
|
||||
then send it back in SetScreenConfig. Without the fix, the
|
||||
unswapped configTimestamp won't match → the reply has
|
||||
status=RRSetConfigInvalidConfigTime (1) instead of
|
||||
RRSetConfigSuccess (0).
|
||||
|
||||
Fixed in commit ac45f9b29e3a ("randr: add missing byte swapping
|
||||
for various fields").
|
||||
"""
|
||||
conn, opcode = randr_xclient_swapped
|
||||
|
||||
# Get screen resources to obtain the configTimestamp
|
||||
req = randr.GetScreenResourcesCurrentRequest(
|
||||
opcode=opcode,
|
||||
window=conn.root_window,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
assert len(resp.data) >= 32, "Reply too short"
|
||||
|
||||
# xRRGetScreenResourcesReply:
|
||||
# [8] timestamp(4)
|
||||
# [12] configTimestamp(4)
|
||||
config_ts = struct.unpack_from(">I", resp.data, 12)[0]
|
||||
|
||||
req = randr.SetScreenConfigRequest(
|
||||
opcode=opcode,
|
||||
drawable=conn.root_window,
|
||||
timestamp=0, # CurrentTime
|
||||
config_timestamp=config_ts,
|
||||
size_id=0,
|
||||
rotation=1,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
|
||||
# xRRSetScreenConfigReply:
|
||||
# [1] status
|
||||
# RRSetConfigSuccess = 0
|
||||
# RRSetConfigInvalidConfigTime = 1
|
||||
# RRSetConfigInvalidTime = 2
|
||||
# RRSetConfigFailed = 3
|
||||
#
|
||||
# Without the fix, the unswapped configTimestamp fails
|
||||
# the equality check → status 1 (RRSetConfigInvalidConfigTime).
|
||||
status = resp.data[1]
|
||||
assert status != 1, (
|
||||
"SetScreenConfig status = 1 "
|
||||
"(RRSetConfigInvalidConfigTime) - configTimestamp "
|
||||
"was not byte-swapped correctly."
|
||||
)
|
||||
|
||||
|
||||
class TestRandRCreateLease:
|
||||
@pytest.mark.swapped_client
|
||||
def test_create_lease_lid_swapped(self, xserver, randr_xclient_swapped):
|
||||
"""
|
||||
SProcRRCreateLease was missing swapl(&stuff->lid).
|
||||
Without the swap, the garbled lid fails LEGAL_NEW_RESOURCE
|
||||
→ BadIDChoice error (error code 14).
|
||||
|
||||
With the fix, the request should succeed far enough to reach
|
||||
the crtc/output validation (possibly returning BadValue for
|
||||
the empty lists, but NOT BadIDChoice).
|
||||
|
||||
Fixed in commit ac45f9b29e3a ("randr: add missing byte swapping
|
||||
for various fields").
|
||||
"""
|
||||
conn, opcode = randr_xclient_swapped
|
||||
|
||||
lid = conn.alloc_id()
|
||||
|
||||
# Get screen resources to find a crtc and output
|
||||
req = randr.GetScreenResourcesCurrentRequest(
|
||||
opcode=opcode,
|
||||
window=conn.root_window,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
|
||||
n_crtcs = struct.unpack_from(">H", resp.data, 16)[0]
|
||||
n_outputs = struct.unpack_from(">H", resp.data, 18)[0]
|
||||
|
||||
crtcs = []
|
||||
outputs = []
|
||||
if n_crtcs > 0:
|
||||
crtcs = [struct.unpack_from(">I", resp.data, 32)[0]]
|
||||
if n_outputs > 0:
|
||||
offset = 32 + n_crtcs * 4
|
||||
outputs = [struct.unpack_from(">I", resp.data, offset)[0]]
|
||||
|
||||
req = randr.CreateLeaseRequest(
|
||||
opcode=opcode,
|
||||
window=conn.root_window,
|
||||
lid=lid,
|
||||
crtcs=crtcs,
|
||||
outputs=outputs,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
|
||||
# Without the fix: BadIDChoice (error code 14).
|
||||
# With the fix: either success or some other error.
|
||||
if isinstance(resp, X11Error):
|
||||
assert resp.error_code != 14, (
|
||||
"CreateLease returned BadIDChoice (error 14) - lid not byte-swapped"
|
||||
)
|
||||
|
|
|
|||
113
test/pyxtest/test_render.py
Normal file
113
test/pyxtest/test_render.py
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Security tests for Render extension vulnerabilities.
|
||||
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from proto import render
|
||||
from xclient import Extension, X11Error, X11Reply
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def render_xclient_swapped(xclient_swapped):
|
||||
"""Provide a byte-swapped xclient with Render initialized."""
|
||||
ext = xclient_swapped.query_extension(Extension.RENDER)
|
||||
if not ext:
|
||||
pytest.skip("RENDER extension not available")
|
||||
|
||||
req = render.QueryVersionRequest(opcode=ext.opcode)
|
||||
xclient_swapped.send_request(req.to_bytes(">"))
|
||||
xclient_swapped.recv_response(timeout=5.0)
|
||||
|
||||
return xclient_swapped, ext.opcode
|
||||
|
||||
|
||||
class TestRenderSetPictureFilter:
|
||||
@pytest.mark.swapped_client
|
||||
def test_set_picture_filter_convolution_params_swapped(
|
||||
self, xserver, render_xclient_swapped
|
||||
):
|
||||
"""
|
||||
SProcRenderSetPictureFilter was missing SwapLongs() for the
|
||||
xFixed filter parameter values.
|
||||
|
||||
Set a 3x1 convolution filter with params [3.0, 1.0, 1.0, 1.0, 1.0]
|
||||
(in xFixed: [0x00030000, 0x00010000, ...]). Without the swap,
|
||||
the server sees garbled width/height and rejects with BadMatch.
|
||||
With the swap, it succeeds.
|
||||
|
||||
Fixed in commit c98273d0bc00 ("render: add missing byte-swap of
|
||||
filter params in SProcRenderSetPictureFilter").
|
||||
"""
|
||||
conn, opcode = render_xclient_swapped
|
||||
|
||||
# Get a PictFormat that matches the root depth.
|
||||
# xRenderQueryPictFormatsReply header (32 bytes):
|
||||
# [8] numFormats(4)
|
||||
# followed by numFormats xPictFormInfo entries (28 bytes each):
|
||||
# [0] id(4) [4] type(1) [5] depth(1) ...
|
||||
req = render.QueryPictFormatsRequest(opcode=opcode)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert isinstance(resp, X11Reply), "QueryPictFormats failed"
|
||||
num_formats = struct.unpack_from(">I", resp.data, 8)[0]
|
||||
|
||||
format_id = 0
|
||||
for i in range(num_formats):
|
||||
off = 32 + i * 28
|
||||
if off + 6 > len(resp.data):
|
||||
break
|
||||
fid = struct.unpack_from(">I", resp.data, off)[0]
|
||||
fdepth = resp.data[off + 5]
|
||||
if fdepth == conn.root_depth:
|
||||
format_id = fid
|
||||
break
|
||||
|
||||
if format_id == 0:
|
||||
pytest.skip("No PictFormat matching root depth")
|
||||
|
||||
pix = conn.create_pixmap(width=10, height=10)
|
||||
pic = conn.alloc_id()
|
||||
|
||||
req = render.CreatePictureRequest(
|
||||
opcode=opcode,
|
||||
picture_id=pic,
|
||||
drawable=pix,
|
||||
format_id=format_id,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
errors = conn.flush_responses(timeout=0.5)
|
||||
create_errors = [r for r in errors if isinstance(r, X11Error)]
|
||||
assert len(create_errors) == 0, f"CreatePicture failed: {create_errors}"
|
||||
|
||||
# Set convolution filter: 3x1 kernel with all weights = 1.0
|
||||
# xFixed 3.0 = 0x00030000, xFixed 1.0 = 0x00010000
|
||||
# params: [width=3, height=1, k0=1.0, k1=1.0, k2=1.0]
|
||||
req = render.SetPictureFilterRequest(
|
||||
opcode=opcode,
|
||||
picture=pic,
|
||||
filter_name="convolution",
|
||||
params=[
|
||||
0x00030000,
|
||||
0x00010000,
|
||||
0x00010000,
|
||||
0x00010000,
|
||||
0x00010000,
|
||||
],
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
responses = conn.flush_responses(timeout=1.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
|
||||
# With the fix: no error (filter set successfully).
|
||||
# Without the fix: BadMatch because
|
||||
# convolutionFilterValidateParams rejects the garbled params.
|
||||
errors = [r for r in responses if isinstance(r, X11Error)]
|
||||
assert len(errors) == 0, (
|
||||
f"SetPictureFilter returned error(s): {errors} - "
|
||||
"filter params not byte-swapped → BadMatch"
|
||||
)
|
||||
|
|
@ -7,7 +7,7 @@ import time
|
|||
import pytest
|
||||
|
||||
from proto import screensaver
|
||||
from xclient import Extension, RawX11Connection, X11ConnectionError
|
||||
from xclient import Extension
|
||||
|
||||
|
||||
class TestScreenSaverSuspend:
|
||||
|
|
@ -15,7 +15,7 @@ class TestScreenSaverSuspend:
|
|||
|
||||
@pytest.mark.swapped_client
|
||||
@pytest.mark.asan
|
||||
def test_suspend_swap_before_size_check(self, xserver):
|
||||
def test_suspend_swap_before_size_check(self, xserver, xclient_swapped):
|
||||
"""
|
||||
CVE-2021-4010 / ZDI-CAN-14951: SProcScreenSaverSuspend() did
|
||||
swapl() on stuff->suspend before REQUEST_SIZE_MATCH, so a
|
||||
|
|
@ -26,29 +26,21 @@ class TestScreenSaverSuspend:
|
|||
Fixed in commit 6c4c53010772 ("Xext: Fix out of bounds access
|
||||
in SProcScreenSaverSuspend()").
|
||||
"""
|
||||
try:
|
||||
conn = RawX11Connection(xserver.display_num, swapped=True)
|
||||
except X11ConnectionError as e:
|
||||
if "endian" in str(e).lower():
|
||||
pytest.skip("Server does not accept big-endian clients")
|
||||
raise
|
||||
conn = xclient_swapped
|
||||
|
||||
try:
|
||||
ext = conn.query_extension(Extension.MIT_SCREEN_SAVER)
|
||||
if not ext:
|
||||
pytest.skip("MIT-SCREEN-SAVER extension not available")
|
||||
ext = conn.query_extension(Extension.MIT_SCREEN_SAVER)
|
||||
if not ext:
|
||||
pytest.skip("MIT-SCREEN-SAVER extension not available")
|
||||
|
||||
# Send a valid ScreenSaverSuspend (the fix ensures proper
|
||||
# validation order: size check before swap).
|
||||
req = screensaver.SuspendRequest(
|
||||
opcode=ext.opcode,
|
||||
suspend=1,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
time.sleep(0.5)
|
||||
# Send a valid ScreenSaverSuspend (the fix ensures proper
|
||||
# validation order: size check before swap).
|
||||
req = screensaver.SuspendRequest(
|
||||
opcode=ext.opcode,
|
||||
suspend=1,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
time.sleep(0.5)
|
||||
|
||||
assert xserver.is_alive, (
|
||||
"Server crashed - SProcScreenSaverSuspend (CVE-2021-4010)"
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
assert xserver.is_alive, (
|
||||
"Server crashed - SProcScreenSaverSuspend (CVE-2021-4010)"
|
||||
)
|
||||
|
|
|
|||
82
test/pyxtest/test_shm.py
Normal file
82
test/pyxtest/test_shm.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Tests for MIT-SHM extension.
|
||||
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from proto import shm
|
||||
from xclient import Extension, X11Reply
|
||||
|
||||
|
||||
class TestShmCreateSegment:
|
||||
@pytest.mark.swapped_client
|
||||
def test_create_segment_reply_sequence_swapped(self, xserver, xclient_swapped):
|
||||
"""
|
||||
ProcShmCreateSegment was missing swaps(&rep.sequenceNumber) and
|
||||
swapl(&rep.length) for byte-swapped clients.
|
||||
|
||||
Without the fix the sequence number in the reply is
|
||||
byte-reversed. We send the request and verify that the
|
||||
reply's sequence number matches the expected value.
|
||||
|
||||
ShmCreateSegment requires SHM fd-passing (SHM 1.2+) and a
|
||||
local connection (Unix socket). The reply also carries an fd
|
||||
via SCM_RIGHTS which our simple recv() silently drops — that's
|
||||
fine, we only need to check the header fields.
|
||||
|
||||
Fixed in commit c49c150dcfb7 ("Xext/shm: add missing reply
|
||||
byte-swap in ProcShmCreateSegment").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.MIT_SHM)
|
||||
if not ext:
|
||||
pytest.skip("MIT-SHM extension not available")
|
||||
|
||||
# Query SHM version
|
||||
req = shm.QueryVersionRequest(opcode=ext.opcode)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
if not isinstance(resp, X11Reply):
|
||||
pytest.skip("SHM QueryVersion failed")
|
||||
|
||||
# Check for SHM 1.2+ (fd-passing / CreateSegment support)
|
||||
# Reply: type(1) + sharedPixmaps(1) + seq(2) + length(4) +
|
||||
# majorVersion(2) + minorVersion(2) + ...
|
||||
if len(resp.data) < 12:
|
||||
pytest.skip("SHM QueryVersion reply too short")
|
||||
|
||||
major = struct.unpack_from(">H", resp.data, 8)[0]
|
||||
minor = struct.unpack_from(">H", resp.data, 10)[0]
|
||||
if major < 1 or (major == 1 and minor < 2):
|
||||
pytest.skip(f"SHM {major}.{minor} does not support CreateSegment")
|
||||
|
||||
shmseg_id = conn.alloc_id()
|
||||
seq = conn.send_request(
|
||||
shm.CreateSegmentRequest(
|
||||
opcode=ext.opcode,
|
||||
shmseg=shmseg_id,
|
||||
size=4096,
|
||||
read_only=False,
|
||||
).to_bytes(">")
|
||||
)
|
||||
|
||||
# Read the 32-byte reply header. The fd arrives as
|
||||
# ancillary data which plain recv() drops, but the data
|
||||
# bytes still arrive on the stream.
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
|
||||
# Verify the sequence number was swapped correctly.
|
||||
reply_seq = struct.unpack_from(">H", resp.data, 2)[0]
|
||||
assert reply_seq == (seq & 0xFFFF), (
|
||||
f"Reply sequence = {reply_seq:#06x}, "
|
||||
f"expected {seq & 0xFFFF:#06x}. "
|
||||
f"sequenceNumber not byte-swapped in "
|
||||
f"ProcShmCreateSegment reply."
|
||||
)
|
||||
106
test/pyxtest/test_vidmode.py
Normal file
106
test/pyxtest/test_vidmode.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Tests for XFree86-VidModeExtension.
|
||||
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from proto import vidmode
|
||||
from xclient import Extension, X11Error, X11Reply
|
||||
|
||||
|
||||
class TestVidModeSwitchToMode:
|
||||
@pytest.mark.swapped_client
|
||||
@pytest.mark.xorg_only
|
||||
def test_switch_to_mode_fields_swapped(self, xserver, xclient_swapped):
|
||||
"""
|
||||
SProcVidModeSwitchToMode previously only swapped stuff->screen.
|
||||
All mode-line fields (dotclock, hdisplay, …) were left in
|
||||
wire byte order, so ProcVidModeSwitchToMode could never match
|
||||
them against any existing mode → BadValue.
|
||||
|
||||
Strategy: query the current mode with GetModeLine, then send
|
||||
the same timings back via SwitchToMode. With the fix the
|
||||
values match → success. Without the fix the garbled timings
|
||||
don't match anything → BadValue error.
|
||||
|
||||
Fixed in commit 751e631e1c99 ("Xext/vidmode: fix
|
||||
SProcVidModeSwitchToMode swapping only screen field").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.XF86VIDMODE)
|
||||
if not ext:
|
||||
pytest.skip("XF86-VidModeExtension not available")
|
||||
|
||||
req = vidmode.QueryVersionRequest(opcode=ext.opcode)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
if not isinstance(resp, X11Reply):
|
||||
pytest.skip("VidMode QueryVersion failed")
|
||||
|
||||
# Get the current mode line so we can echo it back.
|
||||
# xXF86VidModeGetModeLineReply (v2, 52 bytes):
|
||||
# [8] dotclock(4)
|
||||
# [12] hdisplay(2) hsyncstart(2)
|
||||
# [16] hsyncend(2) htotal(2)
|
||||
# [20] hskew(4)
|
||||
# [24] vdisplay(2) vsyncstart(2)
|
||||
# [28] vsyncend(2) vtotal(2)
|
||||
# [32] flags(4)
|
||||
# [36] privsize(4)
|
||||
req = vidmode.GetModeLineRequest(
|
||||
opcode=ext.opcode,
|
||||
screen=0,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
if isinstance(resp, X11Error):
|
||||
pytest.skip("GetModeLine not supported (VidMode not initialised?)")
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
assert len(resp.data) >= 40, f"Reply too short: {len(resp.data)}"
|
||||
|
||||
dotclock = struct.unpack_from(">I", resp.data, 8)[0]
|
||||
hdisplay = struct.unpack_from(">H", resp.data, 12)[0]
|
||||
hsyncstart = struct.unpack_from(">H", resp.data, 14)[0]
|
||||
hsyncend = struct.unpack_from(">H", resp.data, 16)[0]
|
||||
htotal = struct.unpack_from(">H", resp.data, 18)[0]
|
||||
hskew = struct.unpack_from(">I", resp.data, 20)[0]
|
||||
vdisplay = struct.unpack_from(">H", resp.data, 24)[0]
|
||||
vsyncstart = struct.unpack_from(">H", resp.data, 26)[0]
|
||||
vsyncend = struct.unpack_from(">H", resp.data, 28)[0]
|
||||
vtotal = struct.unpack_from(">H", resp.data, 30)[0]
|
||||
flags = struct.unpack_from(">I", resp.data, 32)[0]
|
||||
|
||||
# Switch to the same mode — should succeed.
|
||||
req = vidmode.SwitchToModeRequest(
|
||||
opcode=ext.opcode,
|
||||
screen=0,
|
||||
dotclock=dotclock,
|
||||
hdisplay=hdisplay,
|
||||
hsyncstart=hsyncstart,
|
||||
hsyncend=hsyncend,
|
||||
htotal=htotal,
|
||||
hskew=hskew,
|
||||
vdisplay=vdisplay,
|
||||
vsyncstart=vsyncstart,
|
||||
vsyncend=vsyncend,
|
||||
vtotal=vtotal,
|
||||
flags=flags,
|
||||
privsize=0,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=2.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
# With the fix: no response (void request, success).
|
||||
# Without the fix: BadValue error because the garbled
|
||||
# timing fields don't match any mode.
|
||||
if resp is not None:
|
||||
assert not isinstance(resp, X11Error), (
|
||||
f"SwitchToMode returned error {resp} - "
|
||||
f"mode-line fields not byte-swapped in "
|
||||
f"SProcVidModeSwitchToMode"
|
||||
)
|
||||
|
|
@ -236,3 +236,64 @@ class TestXIChangeProperty:
|
|||
assert values == (1, 2, 10, 20, 30), (
|
||||
f"Expected (1, 2, 10, 20, 30), got {values}"
|
||||
)
|
||||
|
||||
|
||||
class TestXIChangeDeviceControl:
|
||||
@pytest.mark.swapped_client
|
||||
@pytest.mark.xorg_only
|
||||
def test_change_device_control_resolution_values_swapped(
|
||||
self, xserver, xclient_swapped
|
||||
):
|
||||
"""
|
||||
SProcXChangeDeviceControl did not byte-swap the resolution
|
||||
values array for DEVICE_RESOLUTION.
|
||||
|
||||
Send a ChangeDeviceControl/DEVICE_RESOLUTION with a resolution
|
||||
value that is valid in native byte order but out-of-range when
|
||||
byte-reversed (e.g. 1000 = 0x000003E8 → 0xE8030000 reversed).
|
||||
|
||||
Without the fix: the garbled value exceeds max_resolution →
|
||||
BadValue error.
|
||||
With the fix: the correct value is in range → success reply
|
||||
(or BadMatch if the device doesn't support it, but not BadValue).
|
||||
|
||||
Fixed in commit e24bd73e9d6f ("Xi: add missing byte-swap of
|
||||
resolution values in SProcXChangeDeviceControl").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.XI)
|
||||
if not ext:
|
||||
pytest.skip("XInput extension not available")
|
||||
|
||||
req = xi.XIQueryVersionRequest(opcode=ext.opcode)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
conn.recv_response(timeout=5.0)
|
||||
|
||||
ctl = xi.DeviceResolutionCtl(
|
||||
first_valuator=0,
|
||||
num_valuators=1,
|
||||
resolutions=[1000],
|
||||
)
|
||||
ctl_bytes = ctl.to_bytes(">")
|
||||
|
||||
req = xi.XChangeDeviceControlRequest(
|
||||
opcode=ext.opcode,
|
||||
control=xi.DEVICE_RESOLUTION,
|
||||
deviceid=xi.VirtualCorePointer,
|
||||
control_data=ctl_bytes,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=2.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
|
||||
# Without the fix: BadValue (error code 2) because the
|
||||
# byte-reversed resolution 0xE8030000 exceeds max_resolution.
|
||||
# With the fix: either a reply (success) or BadMatch (device
|
||||
# doesn't support resolution control), but NOT BadValue.
|
||||
if isinstance(resp, X11Error):
|
||||
assert resp.error_code != 2, (
|
||||
"ChangeDeviceControl returned BadValue (error 2) - "
|
||||
"resolution values not byte-swapped"
|
||||
)
|
||||
|
|
|
|||
100
test/pyxtest/test_xinerama.py
Normal file
100
test/pyxtest/test_xinerama.py
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Tests for XINERAMA (PanoramiX / pseudoramiX) extension.
|
||||
|
||||
import pytest
|
||||
|
||||
from proto import xinerama
|
||||
from xclient import Extension, X11Reply
|
||||
|
||||
|
||||
class TestPseudoramiXGetState:
|
||||
@pytest.mark.swapped_client
|
||||
def test_xinerama_get_state_window_swapped(self, xserver, xclient_swapped):
|
||||
"""
|
||||
SProcPseudoramiXGetState was missing swapl(&stuff->window).
|
||||
Without it, dixLookupWindow fails → BadWindow error.
|
||||
|
||||
Fixed in commit 6c51a0f9053c ("pseudoramiX: add missing byte
|
||||
swapping in various fields").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.XINERAMA)
|
||||
if not ext:
|
||||
pytest.skip("XINERAMA extension not available")
|
||||
|
||||
req = xinerama.GetStateRequest(
|
||||
opcode=ext.opcode,
|
||||
window=conn.root_window,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
assert isinstance(resp, X11Reply), (
|
||||
f"Expected reply from GetState, got {resp} - "
|
||||
"window field not swapped → BadWindow"
|
||||
)
|
||||
|
||||
|
||||
class TestPseudoramiXGetScreenCount:
|
||||
@pytest.mark.swapped_client
|
||||
def test_xinerama_get_screen_count_window_swapped(self, xserver, xclient_swapped):
|
||||
"""
|
||||
SProcPseudoramiXGetScreenCount was missing swapl(&stuff->window).
|
||||
Without it, dixLookupWindow fails → BadWindow error.
|
||||
|
||||
Fixed in commit 6c51a0f9053c ("pseudoramiX: add missing byte
|
||||
swapping in various fields").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.XINERAMA)
|
||||
if not ext:
|
||||
pytest.skip("XINERAMA extension not available")
|
||||
|
||||
req = xinerama.GetScreenCountRequest(
|
||||
opcode=ext.opcode,
|
||||
window=conn.root_window,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
assert isinstance(resp, X11Reply), (
|
||||
f"Expected reply from GetScreenCount, got {resp} - "
|
||||
"window field not swapped → BadWindow"
|
||||
)
|
||||
|
||||
|
||||
class TestPseudoramiXGetScreenSize:
|
||||
@pytest.mark.swapped_client
|
||||
def test_xinerama_get_screen_size_fields_swapped(self, xserver, xclient_swapped):
|
||||
"""
|
||||
SProcPseudoramiXGetScreenSize was missing swapl(&stuff->window)
|
||||
and swapl(&stuff->screen). Without window swap,
|
||||
dixLookupWindow fails → BadWindow error.
|
||||
|
||||
Fixed in commit 6c51a0f9053c ("pseudoramiX: add missing byte
|
||||
swapping in various fields").
|
||||
"""
|
||||
conn = xclient_swapped
|
||||
|
||||
ext = conn.query_extension(Extension.XINERAMA)
|
||||
if not ext:
|
||||
pytest.skip("XINERAMA extension not available")
|
||||
|
||||
req = xinerama.GetScreenSizeRequest(
|
||||
opcode=ext.opcode,
|
||||
window=conn.root_window,
|
||||
screen=0,
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
assert isinstance(resp, X11Reply), (
|
||||
f"Expected reply from GetScreenSize, got {resp} - "
|
||||
"window/screen fields not swapped"
|
||||
)
|
||||
139
test/pyxtest/test_xres.py
Normal file
139
test/pyxtest/test_xres.py
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
# SPDX-License-Identifier: MIT
|
||||
#
|
||||
# Tests for X-Resource extension.
|
||||
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from proto import xres
|
||||
from xclient import Extension, X11Reply
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def xres_xclient_swapped(xclient_swapped):
|
||||
"""Provide a byte-swapped xclient with X-Resource initialized."""
|
||||
ext = xclient_swapped.query_extension(Extension.XRES)
|
||||
if not ext:
|
||||
pytest.skip("X-Resource extension not available")
|
||||
|
||||
req = xres.QueryVersionRequest(opcode=ext.opcode)
|
||||
xclient_swapped.send_request(req.to_bytes(">"))
|
||||
resp = xclient_swapped.recv_response(timeout=5.0)
|
||||
if not isinstance(resp, X11Reply):
|
||||
pytest.skip("XRes QueryVersion failed")
|
||||
|
||||
return xclient_swapped, ext.opcode
|
||||
|
||||
|
||||
class TestXResQueryClientIds:
|
||||
@pytest.mark.swapped_client
|
||||
def test_query_client_ids_spec_entries_swapped(self, xserver, xres_xclient_swapped):
|
||||
"""
|
||||
SProcXResQueryClientIds was missing byte-swaps for the
|
||||
xXResClientIdSpec entries. Without the swaps, the mask field
|
||||
(e.g. XResClientXIDMask=1) is seen as 0x01000000, which
|
||||
doesn't match any known mask → the reply contains zero results.
|
||||
|
||||
With the fix, the reply should contain at least one result.
|
||||
|
||||
Fixed in commit f7b574931544 ("Xext/xres: add missing byte-swap
|
||||
of spec entries in SProcXResQueryClientIds").
|
||||
"""
|
||||
conn, opcode = xres_xclient_swapped
|
||||
|
||||
# X_XResClientXIDMask = 1.
|
||||
# client=0 means "all clients" (the wildcard).
|
||||
req = xres.QueryClientIdsRequest(
|
||||
opcode=opcode,
|
||||
specs=[(0, 1)], # mask=1 (XResClientXIDMask)
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert xserver.is_alive, "Server crashed"
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
|
||||
# xXResQueryClientIdsReply:
|
||||
# [8] numIds(4)
|
||||
num_ids = struct.unpack_from(">I", resp.data, 8)[0]
|
||||
assert num_ids > 0, (
|
||||
"QueryClientIds returned numIds=0, expected >0. "
|
||||
"The spec entry mask was not byte-swapped, so the "
|
||||
"server didn't recognize XResClientXIDMask."
|
||||
)
|
||||
|
||||
@pytest.mark.swapped_client
|
||||
def test_construct_client_id_value_swap_check(
|
||||
self, xserver, xres_xclient_swapped, xclient
|
||||
):
|
||||
"""
|
||||
ConstructClientIdValue used client->swapped instead of
|
||||
sendClient->swapped. When a big-endian client queries
|
||||
a native client's ID, the spec.client field in the reply
|
||||
is not swapped → garbled value.
|
||||
|
||||
We create an additional native (little-endian) connection.
|
||||
The big-endian connection queries the native client's XID.
|
||||
With the fix, the returned spec.client should match the
|
||||
native client's resource base. Without the fix, it's
|
||||
byte-swapped garbage.
|
||||
|
||||
Fixed in commit d2d4fb35e798 ("Xext/xres: fix wrong swap
|
||||
check").
|
||||
"""
|
||||
conn, opcode = xres_xclient_swapped
|
||||
conn_native = xclient
|
||||
|
||||
# Get the list of clients
|
||||
req = xres.QueryClientsRequest(opcode=opcode)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
|
||||
num_clients = struct.unpack_from(">I", resp.data, 8)[0]
|
||||
assert num_clients >= 2, f"Expected at least 2 clients, got {num_clients}"
|
||||
|
||||
# Collect all client XIDs from the reply.
|
||||
# Each xXResClient is 8 bytes: resource_base(4) + resource_mask(4)
|
||||
client_xids = []
|
||||
for i in range(num_clients):
|
||||
offset = 32 + i * 8
|
||||
if offset + 4 <= len(resp.data):
|
||||
xid = struct.unpack_from(">I", resp.data, offset)[0]
|
||||
client_xids.append(xid)
|
||||
|
||||
# Find the native client's resource base
|
||||
native_base = conn_native._resource_id_base
|
||||
target_xid = None
|
||||
for xid in client_xids:
|
||||
if (xid & 0xFFE00000) == (native_base & 0xFFE00000):
|
||||
target_xid = xid
|
||||
break
|
||||
|
||||
if target_xid is None:
|
||||
pytest.skip("Could not find native client in XRes client list")
|
||||
|
||||
# Query the native client's XID using XResClientXIDMask
|
||||
req = xres.QueryClientIdsRequest(
|
||||
opcode=opcode,
|
||||
specs=[(target_xid, 1)], # XResClientXIDMask
|
||||
)
|
||||
conn.send_request(req.to_bytes(">"))
|
||||
resp = conn.recv_response(timeout=5.0)
|
||||
|
||||
assert isinstance(resp, X11Reply), f"Expected reply, got {resp}"
|
||||
|
||||
num_ids = struct.unpack_from(">I", resp.data, 8)[0]
|
||||
assert num_ids > 0, f"Expected >0 IDs, got {num_ids}"
|
||||
|
||||
# xXResClientIdValue:
|
||||
# spec.client(4) + spec.mask(4) + length(4) + value(4*length)
|
||||
spec_client = struct.unpack_from(">I", resp.data, 32)[0]
|
||||
|
||||
assert spec_client == target_xid, (
|
||||
f"spec.client = {spec_client:#010x}, "
|
||||
f"expected {target_xid:#010x}. "
|
||||
f"The swap check used client->swapped instead of "
|
||||
f"sendClient->swapped."
|
||||
)
|
||||
Loading…
Add table
Reference in a new issue