libei/test/test_protocol.py

827 lines
26 KiB
Python
Raw Normal View History

#!/usr/bin/python3
#
# SPDX-License-Identifier: MIT
#
#
# EIS protocol test suite. This suite tests an EIS implementation, by default the
# eis-demo-server to see how whether it handles protocol messages correctly.
#
# To test another implementation:
# - set LIBEI_TEST_SOCKET to the path your EIS implementation is listening on
# - set LIBEI_TEST_SERVER to the executable of your EIS implementation,
# or the empty string to connect to a running process
#
# To run $LIBEI_TEST_SERVER in valgrind, set LIBEI_USE_VALGRIND to a boolean true.
#
# e.g.
# $ export LIBEI_TEST_SOCKET=/run/user/1000/eis-0
# $ export LIBEI_TEST_SERVER=""
# $ pytest3 -v --log-level=DEBUG -k 'some string'
#
# Will run that test against whatever is providing that socket.
from typing import Generator, Optional
from pathlib import Path
import attr
import itertools
import os
import pytest
import subprocess
import time
import shlex
import signal
import socket
import structlog
try:
from eiproto import (
hexlify,
Context,
Interface,
MessageHeader,
EiCallback,
EiConnection,
EiConnectionSetup,
EiDevice,
EiSeat,
)
except ImportError:
# This file needs to be processed by meson, so let's skip when this fails in the source dir
pytest.skip(allow_module_level=True)
logger = structlog.get_logger()
VALGRIND_EXITCODE = 3
def VERSION_V(v):
"""Noop function that helps with grepping for hardcoded version numbers"""
return v
@pytest.fixture
def socketpath(tmp_path) -> Path:
test_socket_override = os.environ.get("LIBEI_TEST_SOCKET")
if test_socket_override:
return Path(test_socket_override)
return Path(tmp_path) / "eis-0"
@pytest.fixture
def valgrind() -> list[str]:
"""
Return the list of arguments to run our eis_executable in valgrind
"""
if bool(os.environ.get("LIBEI_USE_VALGRIND", False)):
valgrind = [
"valgrind",
"--leak-check=full",
f"--error-exitcode={VALGRIND_EXITCODE}",
]
else:
valgrind = []
return valgrind
@pytest.fixture
def eis_executable(valgrind, socketpath) -> Optional[list[str]]:
"""
Returns a list of arguments of the EIS executable to run, to be passed
into Popen.
Returns None if we're expected to connect to an already running instance.
"""
program = os.environ.get("LIBEI_TEST_SERVER", None)
# if the variable is empty, use an existing running server
if program == "":
return None
# If it's not set at all, we use our eis-demo-server
if program is None:
program = f"@LIBEI_TEST_SERVER@ --socketpath={socketpath} --verbose" # set by meson to eis-demo-server
return valgrind + shlex.split(program)
@pytest.fixture
def eis(socketpath, eis_executable) -> Generator["Eis", None, None]:
if not eis_executable:
yield Eis.create_existing_implementation(socketpath)
else:
eis = Eis.create(socketpath, eis_executable)
yield eis
eis.terminate()
@attr.s
class Ei:
sock: socket.socket = attr.ib()
context: Context = attr.ib()
connection: Optional[EiConnection] = attr.ib(default=None)
seats: list[EiSeat] = attr.ib(init=False, default=attr.Factory(list))
object_ids: Generator[int, None, None] = attr.ib(
init=False, default=attr.Factory(lambda: itertools.count(3))
)
_data: bytes = attr.ib(init=False, default=attr.Factory(bytes)) # type: ignore
@property
def data(self) -> bytes:
return self._data
def send(self, msg: bytes) -> None:
logger.debug(f"sending {len(msg)} bytes", bytes=hexlify(msg))
self.sock.sendmsg([msg])
def find_objects_by_interface(self, interface: str) -> list[Interface]:
return [o for o in self.context.objects.values() if o.name == interface]
def callback_roundtrip(self) -> bool:
assert self.connection is not None
cb = EiCallback.create(next(self.object_ids), VERSION_V(1))
self.context.register(cb)
self.send(self.connection.Sync(cb.object_id))
return self.wait_for(
lambda: cb not in self.find_objects_by_interface("ei_callback")
)
@property
def connection_setup(self) -> EiConnectionSetup:
setup = self.context.objects[0]
assert isinstance(setup, EiConnectionSetup)
return setup
def init_default_sender_connection(self) -> None:
setup = self.connection_setup
self.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER))
self.send(setup.Name("test client"))
self.send(
setup.InterfaceVersion("ei_connection", VERSION_V(1))
) # this one is required
self.send(setup.InterfaceVersion("ei_callback", VERSION_V(1)))
self.send(setup.InterfaceVersion("ei_pingpong", VERSION_V(1)))
self.send(setup.InterfaceVersion("ei_seat", VERSION_V(1)))
self.send(setup.InterfaceVersion("ei_device", VERSION_V(1)))
self.send(setup.InterfaceVersion("ei_pointer", VERSION_V(1)))
self.send(setup.InterfaceVersion("ei_keyboard", VERSION_V(1)))
self.send(setup.InterfaceVersion("ei_touchscreen", VERSION_V(1)))
self.send(setup.Finish())
self.dispatch()
def wait_for_seat(self, timeout=5) -> bool:
def seat_is_done():
return self.seats and [
call for call in self.seats[0].calllog if call.name == "Done"
]
return self.wait_for(seat_is_done, timeout)
def wait_for_connection(self, timeout=5) -> bool:
return self.wait_for(lambda: self.connection is not None, timeout)
def wait_for(self, callable, timeout=5) -> bool:
expire = time.time() + timeout
while not callable():
self.dispatch()
if time.time() > expire:
return False
time.sleep(0.01)
return True
def recv(self) -> bytes:
try:
data = self.sock.recv(1024)
while data:
self._data += data
data = self.sock.recv(1024)
except BlockingIOError:
pass
return self.data
def dispatch(self, timeout=0.01) -> None:
if not self.data:
expire = time.time() + timeout
while not self.recv():
now = time.time()
if now >= expire:
break
time.sleep(min(0.01, expire - now))
if now >= expire:
break
while self.data:
logger.debug("data pending dispatch: ", bytes=hexlify(self.data[:64]))
header = MessageHeader.from_data(self.data)
logger.debug(f"dispatching message: ", header=header)
consumed = self.context.dispatch(self.data)
if consumed == 0:
break
self.pop(consumed)
def pop(self, count: int) -> None:
self._data = self._data[count:]
@classmethod
def create(cls, socketpath: Path):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
while not socketpath.exists():
time.sleep(0.01)
for _ in range(3):
try:
sock.connect_ex(os.fspath(socketpath))
break
except ConnectionRefusedError:
time.sleep(0.1)
else:
assert False, "Failed to connect to EIS"
ctx = Context.create()
ei = cls(sock=sock, context=ctx)
# callback for new objects
def register_cb(interface: Interface) -> None:
if isinstance(interface, EiConnection):
assert ei.connection is None
ei.connection = interface
# Automatic ping/pong handler
def ping(conn, id, version, new_objects={}):
pingpong = new_objects["ping"]
ei.send(pingpong.Done(0))
ei.connection.connect("Ping", ping)
elif isinstance(interface, EiSeat):
assert interface not in ei.seats
ei.seats.append(interface)
def unregister_cb(interface: Interface) -> None:
if interface == ei.connection:
assert ei.connection is not None
ei.connection = None
elif interface in ei.seats:
ei.seats.remove(interface)
ctx.connect("register", register_cb)
ctx.connect("unregister", unregister_cb)
return ei
@attr.s
class Eis:
process: Optional[subprocess.Popen] = attr.ib()
ei: Ei = attr.ib()
_stdout: Optional[str] = attr.ib(init=False, default=None)
_stderr: Optional[str] = attr.ib(init=False, default=None)
def terminate(self) -> None:
if self.process is None:
return
def kill_gently(process) -> Generator[None, None, None]:
process.send_signal(signal.SIGINT)
yield
process.terminate()
yield
process.kill()
stdout, stderr = None, None
for _ in kill_gently(self.process):
try:
stdout, stderr = self.process.communicate(timeout=1)
break
except subprocess.TimeoutExpired:
pass
if stdout:
for line in stdout.split("\n"):
logger.info(line)
if stderr:
for line in stderr.split("\n"):
logger.info(line)
self.process.wait()
rc = self.process.returncode
if rc not in [0, -signal.SIGTERM]:
if rc == VALGRIND_EXITCODE:
assert (
rc != VALGRIND_EXITCODE
), "valgrind reported errors, see valgrind error messages"
else:
assert (
rc == -signal.SIGTERM
), f"Process exited with {signal.Signals(-rc).name}"
self.process = None # allow this to be called multiple times
@classmethod
def create_existing_implementation(cls, socketpath) -> "Eis":
ei = Ei.create(socketpath)
return cls(process=None, ei=ei)
@classmethod
def create(cls, socketpath, executable) -> "Eis":
process = subprocess.Popen(
executable,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
text=True,
bufsize=1,
universal_newlines=True,
)
ei = Ei.create(socketpath)
return cls(process=process, ei=ei)
class TestEiProtocol:
@property
def using_demo_server(self) -> bool:
return "@LIBEI_TEST_SERVER@".endswith("eis-demo-server")
def test_server_sends_version_event_immediately(self, eis):
"""
The server is expected to send ei_connection_setup.interface_version immediately
on connect
"""
ei = eis.ei
ei.dispatch()
setup = ei.context.objects[0]
assert isinstance(setup, EiConnectionSetup)
ei.wait_for(lambda: bool(setup.calllog))
call = setup.calllog[0]
assert call.name == "InterfaceVersion"
assert call.args["name"] == "ei_connection_setup"
assert call.args["version"] == VERSION_V(1)
eis.terminate()
def test_server_sends_interface_version_events(self, eis):
"""
The server is expected to send ei_connection_setup.interface_version immediately
on connect
"""
ei = eis.ei
ei.dispatch()
setup = ei.connection_setup
ei.init_default_sender_connection()
ei.dispatch()
ei.wait_for_connection()
announced_interfaces = []
for call in setup.calllog:
if call.name == "InterfaceVersion":
announced_interfaces.append((call.args["name"], call.args["version"]))
assert ("ei_callback", 1) in announced_interfaces
eis.terminate()
def test_send_wrong_context_type(self, eis):
"""
Connect with an invalid context type, expect to be disconnected
"""
ei = eis.ei
ei.dispatch()
# Pick some random type (and make sure it's not a valid type in the current API)
invalid_type = 4
try:
EiConnectionSetup.EiContextType(invalid_type)
assert (
False
), f"{invalid_type} should not be a valid ContextType, this test needs an update"
except ValueError:
pass
ei.send(ei.connection_setup.ContextType(invalid_type))
try:
# The server either disconnects the socket because we sent garbage
# or immediately disconnects us after the .done request
ei.dispatch()
for interface in ["ei_connection", "ei_callback", "ei_pingpong"]:
ei.send(
ei.connection_setup.InterfaceVersion(interface, VERSION_V(1))
) # these are required
ei.send(ei.connection_setup.Finish())
ei.dispatch()
ei.wait_for_connection(timeout=1)
# ok, still not socket-disconnected, let's make sure
# we did immediately get a Disconnected message
if ei.connection:
assert ei.connection is not None
call = ei.connection.calllog[0]
assert call.name == "Disconnected"
assert call.args["reason"] == EiConnection.EiDisconnectReason.ERROR
assert call.args["explanation"] is not None
# Now let's trigger a BrokenPipeError
ei.send(bytes(16))
assert False, "The server should have disconnected us"
except (ConnectionResetError, BrokenPipeError):
pass
eis.terminate()
def test_connect_and_disconnect(self, eis):
"""
Connect to the server with a valid sequence, then disconnect
once we get the connection object
"""
ei = eis.ei
# drain any messages
ei.dispatch()
# Establish our connection
ei.init_default_sender_connection()
ei.wait_for_connection()
# This should've set our connection object
assert ei.connection is not None
connection = ei.connection
ei.send(connection.Disconnect())
try:
# Send disconnect twice, just to test that case, should be ignored by the
# server
ei.send(connection.Disconnect())
ei.dispatch()
time.sleep(0.1)
ei.dispatch()
except (ConnectionResetError, BrokenPipeError):
pass
ei.wait_for(
lambda: bool([c for c in connection.calllog if c.name == "Disconnected"])
)
for call in connection.calllog:
if call.name == "Disconnected":
assert call.args["explanation"] is None
assert (
call.args["reason"] == EiConnection.EiDisconnectReason.DISCONNECTED
)
break
else:
assert (
False
), f"Expected Disconnected event, got none in {connection.calllog}"
try:
ei.send(connection.Disconnect())
assert False, "Expected socket to be closed"
except BrokenPipeError:
pass
eis.terminate()
def test_connect_receive_seat(self, eis):
"""
Ensure we get a seat object after setting our connection
"""
ei = eis.ei
ei.dispatch()
setup = ei.connection_setup
# Establish our connection
ei.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER))
ei.send(setup.Name("test client"))
for interface in ["ei_connection", "ei_callback", "ei_pingpong"]:
ei.send(
setup.InterfaceVersion(interface, VERSION_V(1))
) # these are required
ei.send(setup.InterfaceVersion("ei_seat", VERSION_V(100))) # excessive version
ei.send(
setup.InterfaceVersion("ei_device", VERSION_V(100))
) # excessive version
ei.send(setup.Finish())
ei.dispatch()
ei.wait_for_seat()
assert ei.seats
for seat in ei.seats:
assert seat.version == 1 # we have 100, but the server only has 1
for call in seat.calllog:
if call.name == "Capabilities":
assert call.args["capabilities"] != 0
if self.using_demo_server:
caps = call.args["capabilities"]
assert (
caps
== EiDevice.EiCapabilities.POINTER
| EiDevice.EiCapabilities.POINTER_ABSOLUTE
| EiDevice.EiCapabilities.KEYBOARD
| EiDevice.EiCapabilities.TOUCHSCREEN
)
break
else:
assert (
False
), f"Expected ei_seat.capabilities, but got none in {seat.calllog}"
for call in seat.calllog:
if call.name == "Name":
assert call.args["name"] is not None
if self.using_demo_server:
assert call.args["name"] == "default"
break
else:
assert False, f"Expected ei_seat.name, but got none in {seat.calllog}"
for call in seat.calllog:
if call.name == "Done":
break
else:
assert False, f"Expected ei_seat.done, but got none in {seat.calllog}"
def test_connect_no_seat_without_ei_seat(self, eis):
"""
Ensure we do not get a seat object if we don't announce support for ei_seat
"""
ei = eis.ei
ei.dispatch()
setup = ei.connection_setup
# Establish our connection
ei.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER))
ei.send(setup.Name("test client"))
for interface in ["ei_connection", "ei_callback", "ei_pingpong"]:
ei.send(
setup.InterfaceVersion(interface, VERSION_V(1))
) # these are required
# Do not announce ei_seat support
ei.send(setup.Finish())
ei.dispatch()
assert not ei.seats
eis.terminate()
def test_seat_bind_no_caps(self, eis):
"""
Ensure nothing happens if we bind to a seat with capabilities outside what is supported
"""
ei = eis.ei
ei.dispatch()
ei.init_default_sender_connection()
ei.wait_for_seat()
seat = ei.seats[0]
ei.send(seat.Bind(0x00)) # binding to no caps is fine
ei.dispatch()
time.sleep(0.1)
ei.dispatch()
eis.terminate()
def test_seat_bind_invalid_caps_expect_disconnection(self, eis):
ei = eis.ei
ei.dispatch()
ei.init_default_sender_connection()
ei.wait_for_seat()
connection = ei.connection
assert connection is not None
seat = ei.seats[0]
ei.send(seat.Bind(0x40)) # binding to invalid caps should get us disconnected
try:
ei.dispatch()
time.sleep(0.1)
ei.dispatch()
for call in seat.calllog:
if call.name == "Destroyed":
break
else:
assert False, "Expected seat to get destroyed but didn't"
for call in connection.calllog:
if call.name == "Disconnected":
assert call.args["reason"] == EiConnection.EiDisconnectReason.VALUE
assert "Invalid capabilities" in call.args["explanation"]
break
else:
assert False, "Expected disconnection event"
except ConnectionResetError:
pass
eis.terminate()
@pytest.mark.parametrize("bind_first", (True, False))
def test_seat_release_expect_destroyed(self, eis, bind_first):
ei = eis.ei
ei.dispatch()
ei.init_default_sender_connection()
ei.wait_for_seat()
seat = ei.seats[0]
have_seat_destroyed = False
def destroyed_cb(_, serial):
nonlocal have_seat_destroyed
have_seat_destroyed = True
seat.connect("Destroyed", destroyed_cb)
if bind_first:
ei.send(seat.Bind(EiDevice.EiCapabilities.POINTER))
ei.send(seat.Release())
ei.dispatch()
ei.wait_for(lambda: have_seat_destroyed)
def test_connection_sync(self, eis):
"""
Test the ei_connection.sync() callback mechanism
"""
ei = eis.ei
ei.dispatch()
ei.init_default_sender_connection()
ei.wait_for_seat()
cb = EiCallback.create(next(ei.object_ids), VERSION_V(1))
ei.context.register(cb)
assert ei.connection is not None
ei.send(ei.connection.Sync(cb.object_id))
ei.dispatch()
assert cb.calllog[0].name == "Done"
assert cb.calllog[0].args["callback_data"] == 0 # hardcoded in libeis for now
def test_invalid_object(self, eis):
"""
Send a message for an invalid object and ensure we get the event back
"""
ei = eis.ei
ei.dispatch()
ei.init_default_sender_connection()
ei.wait_for_seat()
seat: EiSeat = ei.seats[0]
have_invalid_object_event = False
have_sync = False
def invalid_object_cb(_, last_serial, id):
nonlocal have_invalid_object_event
assert id == seat.object_id
have_invalid_object_event = True
ei.connection.connect("InvalidObject", invalid_object_cb)
release = seat.Release()
ei.send(release)
ei.dispatch()
cb = EiCallback.create(next(ei.object_ids), VERSION_V(1))
ei.context.register(cb)
def sync_cb(_, unused):
nonlocal have_sync
have_sync = True
cb.connect("Done", sync_cb)
# Send the invalid object request
ei.send(release)
ei.send(ei.connection.Sync(cb.object_id))
ei.wait_for(lambda: have_sync)
assert have_invalid_object_event, "Expected invalid_object event, got none"
def test_disconnect_before_setup_finish(self, eis):
ei = eis.ei
ei.dispatch()
ei.send(ei.connection_setup.ContextType(EiConnectionSetup.EiContextType.SENDER))
ei.sock.close()
time.sleep(0.5)
# Not much we can test here other than hoping the EIS implementation doesn't segfault
@pytest.mark.parametrize(
"missing_interface",
(
"ei_callback",
"ei_connection",
"ei_pingpong",
"ei_seat",
"ei_device",
"ei_pointer",
),
)
def test_connect_without_ei_interfaces(self, eis, missing_interface):
ei = eis.ei
ei.dispatch()
setup = ei.connection_setup
ei.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER))
ei.send(setup.Name("test client"))
for interface in [
"ei_connection",
"ei_callback",
"ei_pingpong",
"ei_seat",
"ei_device",
"ei_pointer",
"ei_keyboard",
"ei_touchscreen",
]:
if interface != missing_interface:
ei.send(setup.InterfaceVersion(interface, 1))
@attr.s
class Status:
connected: bool = attr.ib(default=False)
disconnected: bool = attr.ib(default=False)
seats: bool = attr.ib(default=False)
devices: bool = attr.ib(default=False)
status = Status()
try:
def on_device(seat, id, version, new_objects={}):
assert missing_interface not in ["ei_device"]
status.devices = True
def on_seat(connection, id, version, new_objects={}):
assert missing_interface not in ["ei_seat"]
seat = new_objects["seat"]
assert seat is not None
seat.connect("Device", on_device)
ei.send(seat.Bind(EiDevice.EiCapabilities.POINTER))
status.seats = True
def on_disconnected(connection, last_serial, reason, explanation):
assert missing_interface in ["ei_seat", "ei_device"]
status.disconnected = True
def on_connection(setup, serial, id, version, new_objects={}):
# these three must be present, otherwise we get disconnected
assert missing_interface not in [
"ei_connection",
"ei_callback",
"ei_pingpong",
]
status.connected = True
connection = new_objects["connection"]
assert connection is not None
connection.connect("Seat", on_seat)
connection.connect("Disconnected", on_disconnected)
setup.connect("Connection", on_connection)
ei.send(setup.Finish())
ei.dispatch()
if missing_interface in ["ei_connection", "ei_callback", "ei_pingpong"]:
# valgrind is slow, so let's wait for it to catch up
time.sleep(0.3)
ei.dispatch()
assert not status.connected
assert not status.disconnected # we never get the Disconnected event
ei.send(bytes(16))
assert False, "We should've been disconnected by now"
ei.wait_for(lambda: status.connected)
if missing_interface in ["ei_device", "ei_seat"]:
assert ei.wait_for(lambda: status.disconnected)
assert (
status.disconnected
), f"Expected to be disconnected for missing {missing_interface}"
else:
assert (
missing_interface == "ei_pointer"
) # otherwise we shouldn't get here
assert ei.callback_roundtrip(), "Callback roundtrip failed"
assert status.connected
assert not status.disconnected
assert ei.wait_for(lambda: status.seats)
assert ei.wait_for(lambda: status.devices)
assert status.devices
except BrokenPipeError:
assert missing_interface in ["ei_connection", "ei_callback", "ei_pingpong"]