xserver/test/pyxtest/proto/sync.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

275 lines
6.1 KiB
Python
Raw Permalink Normal View History

# SPDX-License-Identifier: MIT
#
# SYNC extension protocol request builders.
import struct
from dataclasses import dataclass
# SYNC minor opcodes
SyncInitialize = 0
SyncCreateCounter = 2
SyncSetCounter = 3
SyncChangeCounter = 4
SyncDestroyCounter = 6
SyncAwait = 7
SyncCreateAlarm = 8
SyncQueryAlarm = 10
SyncCreateFence = 14
SyncTriggerFence = 15
SyncResetFence = 16
SyncDestroyFence = 17
SyncAwaitFence = 19
# SYNC alarm value mask bits
SyncCACounter = 1 << 0
SyncCAValueType = 1 << 1
SyncCAValue = 1 << 2
SyncCATestType = 1 << 3
SyncCADelta = 1 << 4
SyncCAEvents = 1 << 5
# Value types
SyncAbsolute = 0
SyncRelative = 1
# Test types
SyncPositiveTransition = 0
SyncNegativeTransition = 1
SyncPositiveComparison = 2
SyncNegativeComparison = 3
@dataclass
class InitializeRequest:
"""SyncInitialize request."""
opcode: int
major: int = 3
minor: int = 1
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBHBB xx",
self.opcode,
SyncInitialize,
2, # 8 bytes = 2 words
self.major,
self.minor,
)
@dataclass
class CreateAlarmRequest:
"""SyncCreateAlarm request.
value_mask selects which attributes are present in the value list.
"""
opcode: int
alarm_id: int
value_mask: int = 0
values: bytes = b""
def to_bytes(self, byte_order: str = "<") -> bytes:
total = 12 + len(self.values)
pad_len = (4 - total % 4) % 4
length = (total + pad_len) // 4
header = struct.pack(
f"{byte_order}BBH I I",
self.opcode,
SyncCreateAlarm,
length,
self.alarm_id,
self.value_mask,
)
return header + self.values + b"\x00" * pad_len
@dataclass
class QueryAlarmRequest:
"""SyncQueryAlarm request."""
opcode: int
alarm_id: int
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH I",
self.opcode,
SyncQueryAlarm,
2,
self.alarm_id,
)
@dataclass
class CreateCounterRequest:
"""SyncCreateCounter request (16 bytes)."""
opcode: int
counter_id: int
initial_value_hi: int = 0
initial_value_lo: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH I iI",
self.opcode,
SyncCreateCounter,
4, # length = 4 words
self.counter_id,
self.initial_value_hi,
self.initial_value_lo,
)
@dataclass
class SetCounterRequest:
"""SyncSetCounter request (16 bytes)."""
opcode: int
counter_id: int
value_hi: int = 0
value_lo: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH I iI",
self.opcode,
SyncSetCounter,
4, # length = 4 words
self.counter_id,
self.value_hi,
self.value_lo,
)
@dataclass
class DestroyCounterRequest:
"""SyncDestroyCounter request (8 bytes)."""
opcode: int
counter_id: int
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH I",
self.opcode,
SyncDestroyCounter,
2, # length = 2 words
self.counter_id,
)
@dataclass
class AwaitRequest:
"""SyncAwait request with variable number of wait conditions.
Each wait condition is 28 bytes:
counter(4) + value_type(4) + wait_value_hi(4) + wait_value_lo(4) +
test_type(4) + event_threshold_hi(4) + event_threshold_lo(4)
"""
opcode: int
conditions: list[tuple[int, int, int, int, int, int, int]]
"""List of (counter, value_type, wait_hi, wait_lo, test_type, thresh_hi, thresh_lo)"""
def to_bytes(self, byte_order: str = "<") -> bytes:
n = len(self.conditions)
total_bytes = 4 + n * 28
length = total_bytes // 4
header = struct.pack(
f"{byte_order}BBH",
self.opcode,
SyncAwait,
length,
)
payload = b""
for (
counter,
vtype,
wait_hi,
wait_lo,
ttype,
thresh_hi,
thresh_lo,
) in self.conditions:
payload += struct.pack(
f"{byte_order}I I iI I iI",
counter,
vtype,
wait_hi,
wait_lo,
ttype,
thresh_hi,
thresh_lo,
)
return header + payload
@dataclass
class CreateFenceRequest:
"""SyncCreateFence request (16 bytes)."""
opcode: int
drawable: int
fence_id: int
initially_triggered: int = 0
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH I I B xxx",
self.opcode,
SyncCreateFence,
4, # length = 4 words
self.drawable,
self.fence_id,
self.initially_triggered,
)
@dataclass
class DestroyFenceRequest:
"""SyncDestroyFence request (8 bytes)."""
opcode: int
fence_id: int
def to_bytes(self, byte_order: str = "<") -> bytes:
return struct.pack(
f"{byte_order}BBH I",
self.opcode,
SyncDestroyFence,
2, # length = 2 words
self.fence_id,
)
@dataclass
class AwaitFenceRequest:
"""SyncAwaitFence request with variable number of fence IDs."""
opcode: int
fence_ids: list[int]
def to_bytes(self, byte_order: str = "<") -> bytes:
n = len(self.fence_ids)
length = 1 + n # 1 word header + n words of fence IDs
header = struct.pack(
f"{byte_order}BBH",
self.opcode,
SyncAwaitFence,
length,
)
payload = b""
for fid in self.fence_ids:
payload += struct.pack(f"{byte_order}I", fid)
return header + payload