diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f088ba2..60ee0f1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -32,14 +32,14 @@ variables: # See the documentation here: # # https://wayland.freedesktop.org/libinput/doc/latest/building_libinput.html # ############################################################################### - FEDORA_PACKAGES: 'git diffutils gcc gcc-c++ pkgconf-pkg-config meson systemd-devel libxkbcommon-devel libxml2 doxygen python3-attrs python3-pytest python3-dbusmock python3-jinja2 ' + FEDORA_PACKAGES: 'git diffutils gcc gcc-c++ pkgconf-pkg-config meson systemd-devel libxkbcommon-devel libxml2 doxygen python3-attrs python3-pytest python3-dbusmock python3-jinja2 python3-pip ' ############################ end of package lists ############################# # these tags should be updated each time the list of packages is updated # changing these will force rebuilding the associated image # Note: these tags have no meaning and are not tied to a particular # libinput version - FEDORA_TAG: '2023-02-23.0' + FEDORA_TAG: '2023-02-23.1' FDO_UPSTREAM_REPO: libinput/libei @@ -166,6 +166,7 @@ fedora:37@container-prep: FDO_DISTRIBUTION_VERSION: '37' FDO_DISTRIBUTION_PACKAGES: $FEDORA_PACKAGES FDO_DISTRIBUTION_TAG: $FEDORA_TAG + FDO_DISTRIBUTION_EXEC: 'pip install structlog' diff --git a/.gitlab-ci/ci.template b/.gitlab-ci/ci.template index 618f95b..ac2bd68 100644 --- a/.gitlab-ci/ci.template +++ b/.gitlab-ci/ci.template @@ -178,9 +178,7 @@ python-ruff: FDO_DISTRIBUTION_VERSION: '{{version}}' FDO_DISTRIBUTION_PACKAGES: ${{distro.name.upper()}}_PACKAGES FDO_DISTRIBUTION_TAG: ${{distro.name.upper()}}_TAG - {% if version == 'ubuntu'%} - FDO_DISTRIBUTION_EXEC: $UBUNTU_EXEC - {% endif %} + FDO_DISTRIBUTION_EXEC: 'pip install structlog' {% endfor %} {% endfor %} diff --git a/.gitlab-ci/config.yml b/.gitlab-ci/config.yml index 573fafa..28dc14d 100644 --- a/.gitlab-ci/config.yml +++ b/.gitlab-ci/config.yml @@ -3,7 +3,7 @@ # # We're happy to rebuild all containers when one changes. -.default_tag: &default_tag '2023-02-23.0' +.default_tag: &default_tag '2023-02-23.1' distributions: - name: fedora @@ -26,6 +26,7 @@ distributions: - python3-pytest - python3-dbusmock - python3-jinja2 + - python3-pip pages: distro: fedora diff --git a/meson.build b/meson.build index f348f2f..b09c4da 100644 --- a/meson.build +++ b/meson.build @@ -195,7 +195,7 @@ if dep_libevdev.found() endif -executable('eis-demo-server', +eis_demo_server = executable('eis-demo-server', src_eis_demo_server, dependencies: [ dep_libutil, diff --git a/test/eiproto.py.tmpl b/test/eiproto.py.tmpl new file mode 100644 index 0000000..84468a7 --- /dev/null +++ b/test/eiproto.py.tmpl @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 +# +# GENERATED FILE, DO NOT EDIT +# +# SPDX-License-Identifier: MIT +# + +{#- this is a jinja template, warning above is for the generated file + + Non-obvious variables set by the scanner that are used in this template: + - target: because eis is actually eis_client in the code, the target points to + either "ei" or "eis_client". The various attributes on target resolve + accordingly. + - request.fqdn/event.fqdn - the full name of a request/event with the + interface name prefixed, "ei_foo_request_bar" or "ei_foo_event_bar" + - incoming/outgoing: points to the list of requests or events, depending + which one is the outgoing one from the perspective of the file we're + generating (ei or eis) +#} + +from typing import Any, Callable, Generator, Tuple +from enum import IntEnum + +import attr +import binascii +import itertools +import logging +import struct +import structlog +import time + +# type aliases +s = str +i = int +u = int +o = int +n = int +f = float +h = int # FIXME this should be a file-like object + +logger = structlog.get_logger() + + +def hexlify(data): + return binascii.hexlify(data, sep=" ", bytes_per_sep=1) + + +@attr.s +class MethodCall: + name: str = attr.ib() + args: dict[str, Any] = attr.ib() + objects: dict[str, "Interface"] = attr.ib(default=attr.Factory(dict)) + timestamp: float = attr.ib(default=attr.Factory(time.time)) + + +@attr.s +class MessageHeader: + object_id: int = attr.ib(repr=lambda id: f"{id:#x}") + msglen: int = attr.ib() + opcode: int = attr.ib() + + @classmethod + def size(cls) -> int: + return 8 + + @classmethod + def from_data(cls, data: bytes) -> "MessageHeader": + object_id, opcode = struct.unpack("II", data[:8]) + msglen = opcode >> 16 + opcode = opcode & 0xFFFF + return cls(object_id, msglen, opcode) + + @property + def as_tuple(self) -> Tuple[int, int]: + return self.object_id, self.msglen << 16 | self.opcode + + +@attr.s +class Context: + objects: dict[str, "Interface"] = attr.ib(default=attr.Factory(dict)) + _callbacks: dict[str, dict[int, Callable]] = attr.ib(init=False) + _ids: Generator = attr.ib(init=False, default=attr.Factory(itertools.count)) + + @_callbacks.default # type: ignore + def _default_callbacks(self) -> dict[str, dict[int, Callable]]: + return { "register": {}, "unregister": {}} + + def register(self, object: "Interface") -> None: + assert object.object_id not in self.objects + logger.debug(f"registering object", interface=object.name, object_id=f"{object.object_id:#x}") + self.objects[object.object_id] = object + for cb in self._callbacks["register"].values(): + cb(object) + + def unregister(self, object: "Interface") -> None: + assert object.object_id in self.objects + logger.debug(f"unregistering object", interface=object.name, object=object, object_id=f"{object.object_id:#x}") + del self.objects[object.object_id] + for cb in self._callbacks["unregister"].values(): + cb(object) + + def connect(self, signal: str, callback: Callable) -> int: + cbs = self._callbacks[signal] + id = next(self._ids) + cbs[id] = callback + return id + + def disconnect(self, signal: str, id: int) -> None: + del self._callbacks[signal][id] + + def dispatch(self, data: bytes) -> None: + if len(data) < MessageHeader.size(): + return + + header = MessageHeader.from_data(data) + object_id, opcode, msglen = header.object_id, header.opcode, header.msglen + logger.debug(f"incoming packet ({msglen} bytes)", object_id=f"{object_id:x}", opcode=opcode, bytes=hexlify(data[:msglen])) + + try: + dispatcher = self.objects[object_id] + except KeyError: + logger.error("Message from unknown object", object_id=f"{object_id:x}") + return msglen + + try: + logger.debug(f"incoming packet: dispatching", func=f"{dispatcher.name}.{dispatcher.incoming[opcode]}()", object=dispatcher) + except KeyError: + logger.error("Invalid opcode for object", object_id=f"{object_id:x}", opcode=opcode) + return msglen + consumed = dispatcher.dispatch(data, context=self) + return consumed + + @classmethod + def create(cls) -> "Context": + o = cls() + o.register(EiConnectionSetup.create(object_id=0, version=1)) + return o + + +@attr.s +class Interface: + object_id: int = attr.ib(repr=lambda id: f"{id:#x}") + version: int = attr.ib() + callbacks: dict[str, Callable] = attr.ib(init=False, default=attr.Factory(dict), repr=False) + calllog: list[MethodCall] = attr.ib(init=False, default=attr.Factory(list), repr=False) + name: str = attr.ib(default="") + incoming: dict[int, str] = attr.ib(default=attr.Factory(list), repr=False) + outgoing: dict[int, str] = attr.ib(default=attr.Factory(list), repr=False) + + def format(self, *args, opcode: int, signature: str) -> bytes: + encoding = ["II"] + arguments = [] + for sig, arg in zip(signature, args): + if sig in ["u", "n", "o"]: + encoding.append("I") + elif sig in ["i"]: + encoding.append("i") + elif sig in ["f"]: + encoding.append("f") + elif sig in ["s"]: + encoding.append("I") + arguments.append(len(arg) + 1) + slen = ((len(arg) + 3) // 4) * 4 + encoding.append(f"{slen}s") + arg = arg.encode("utf8") + elif sig in ["h"]: + raise NotImplementedError("fd passing is not yet supported here") + + arguments.append(arg) + + format = "".join(encoding) + length = struct.calcsize(format) + header = MessageHeader(self.object_id, length, opcode) + # logger.debug(f"Packing {encoding}: {arguments}") + return struct.pack(format, *header.as_tuple, *arguments) + + def unpack(self, data, signature: str, names: list[str]) -> Tuple[int, dict[str, Any]]: + encoding = ["II"] # the header + for sig in signature: + if sig in ["u", "n", "o"]: + encoding.append("I") + elif sig in ["i"]: + encoding.append("i") + elif sig in ["f"]: + encoding.append("f") + elif sig in ["s"]: + length_so_far = struct.calcsize("".join(encoding)) + slen, = struct.unpack("I", data[length_so_far:length_so_far + 4]) + slen = ((slen + 3) // 4) * 4 + encoding.append(f"I{slen}s") + elif sig in ["h"]: + raise NotImplementedError("fd passing is not yet supported here") + + format = "".join(encoding) + msglen = struct.calcsize(format) + values = list(struct.unpack(format, data[:msglen])) + # logger.debug(f"unpacked {format} to {values}") + + results = [] + results.append(values.pop(0)) # id + results.append(values.pop(0)) # length | opcode + + # we had to insert the string length into the format, filter the + # value for that out again. + for sig in signature: + if sig in ["s"]: + values.pop(0) + s = values.pop(0) + if not s: + s = None # zero-length string is None + else: + s = s.decode("utf8")[:-1] # strip trailing zero + results.append(s) + else: + results.append(values.pop(0)) + + # First two values are object_id and len|opcode + return (msglen, { name: value for name, value in zip(names, results[2:]) }) + + def connect(self, event: str, callback: Callable): + self.callbacks[event] = callback + + +{% for interface in interfaces %} +@attr.s +class {{interface.camel_name}}(Interface): + {% for enum in interface.enums %} + class {{component.capitalize()}}{{enum.camel_name}}(IntEnum): + {% for entry in enum.entries %} + {{entry.name.upper()}} = {{entry.value}} + {% endfor %} + {% endfor %} + + + {% for outgoing in interface.outgoing %} + def {{outgoing.camel_name}}(self{%- for arg in outgoing.arguments %}, {{arg.name}}: {{arg.signature}}{% endfor -%}) -> bytes: + return self.format({%- for arg in outgoing.arguments %}{{arg.name}}, {% endfor -%}opcode={{outgoing.opcode}}, signature="{{outgoing.signature}}") + + {% endfor %} + {% for incoming in interface.incoming %} + def on{{incoming.camel_name}}(self, context: Context{%- for arg in incoming.arguments %}, {{arg.name}}: {{arg.signature}}{% endfor -%}): + new_objects = { + {% for arg in incoming.arguments %} + {% if arg.signature == "n" %} + {# Note: this only works while the version argument is always called version #} + "{{arg.name}}": {{arg.interface.camel_name}}.create({{arg.name}}, version), + {% endif %} + {% endfor %} + } + + for o in new_objects.values(): + context.register(o) + + cb = self.callbacks.get("{{incoming.camel_name}}", None) + if cb is not None: + if new_objects: + cb(self{%- for arg in incoming.arguments %}, {{arg.name}}{% endfor -%}, new_objects=new_objects) + else: + cb(self{%- for arg in incoming.arguments %}, {{arg.name}}{% endfor -%}) + + m = MethodCall(name="{{incoming.camel_name}}", args={ + {% for arg in incoming.arguments %} + "{{arg.name}}": {{arg.name}}, + {% endfor %} + }, objects=new_objects) + self.calllog.append(m) + + {% if incoming.is_destructor %} + context.unregister(self) + {% endif %} + + {% endfor %} + def dispatch(self, data: bytes, context: Context) -> int: + header = MessageHeader.from_data(data) + object_id, opcode = header.object_id, header.opcode + if False: + pass + {% for incoming in interface.incoming %} + elif opcode == {{incoming.opcode}}: + consumed, args = self.unpack(data, signature="{{incoming.signature}}", names=[ + {% for arg in incoming.arguments %} + "{{arg.name}}", + {% endfor %} + ]) + logger.debug("dispatching", object=self, func="{{incoming.camel_name}}", args=args) + self.on{{incoming.camel_name}}(context, **args) + {% endfor %} + else: + raise NotImplementedError(f"Invalid opcode {opcode}") + + return consumed + + @classmethod + def create(cls, object_id: int, version: int): + incoming = { + {% for incoming in interface.incoming %} + {{incoming.opcode}}: "{{incoming.name}}", + {% endfor %} + } + outgoing = { + {% for outgoing in interface.outgoing %} + {{outgoing.opcode}}: "{{outgoing.name}}", + {% endfor %} + } + return cls(object_id=object_id, version=version, name="{{interface.name}}", incoming=incoming, outgoing=outgoing) + + +{% endfor %} + diff --git a/test/meson.build b/test/meson.build index 84567ca..d9c56d9 100644 --- a/test/meson.build +++ b/test/meson.build @@ -37,6 +37,14 @@ test('unit-tests-eis', c_args: ['-D_enable_tests_'], dependencies: [dep_unittest, dep_libutil])) +pymod = import('python') +required_python_modules = ['pytest', 'attr', 'structlog'] +if build_oeffis + required_python_modules += ['dbusmock'] +endif +pymod.find_installation('python3', modules: required_python_modules) +pytest = find_program('pytest-3', 'pytest') + if build_oeffis test('unit-tests-oeffis', executable('unit-tests-oeffis', @@ -48,10 +56,7 @@ if build_oeffis env = environment() env.set('LD_LIBRARY_PATH', meson.project_build_root()) - pymod = import('python') - pymod.find_installation('python3', modules: ['pytest', 'attr', 'dbusmock']) - pytest = find_program('pytest-3', 'pytest') - test('pytest', pytest, + test('oeffis-pytest', pytest, args: ['--verbose', '--log-level=DEBUG'], suite: 'python', workdir: meson.current_source_dir(), @@ -112,3 +117,32 @@ if add_languages('cpp', required: false) include_directories: [inc_src], install: false) endif + +eiproto_python_template = files('eiproto.py.tmpl') +eiproto_python = custom_target('eiproto.py', + input: protocol_xml, + output: 'eiproto.py', + command: [scanner, '--component=ei', '--output=@OUTPUT@', '@INPUT@', eiproto_python_template], + build_by_default: true) + +protocol_test_config = configuration_data() +protocol_test_config.set('LIBEI_TEST_SERVER', eis_demo_server.full_path()) +configure_file(input: 'test_protocol.py', + output: '@PLAINNAME@', + configuration: protocol_test_config) + +test('protocol-test', pytest, + args: ['--verbose', '--log-level=DEBUG'], + suite: 'python', + workdir: meson.project_build_root(), +) +if valgrind.found() + env = environment() + env.set('LIBEI_USE_VALGRIND', '1') + test('protocol-test-valgrind', pytest, + args: ['--verbose', '--log-level=DEBUG'], + suite: 'python', + workdir: meson.project_build_root(), + env: env + ) +endif diff --git a/test/test_protocol.py b/test/test_protocol.py new file mode 100644 index 0000000..c21ae16 --- /dev/null +++ b/test/test_protocol.py @@ -0,0 +1,826 @@ +#!/usr/bin/python3 +# +# SPDX-License-Identifier: MIT +# +# +# EIS protocol test suite. This suite tests an EIS implementation, by default the +# eis-demo-server to see how whether it handles protocol messages correctly. +# +# To test another implementation: +# - set LIBEI_TEST_SOCKET to the path your EIS implementation is listening on +# - set LIBEI_TEST_SERVER to the executable of your EIS implementation, +# or the empty string to connect to a running process +# +# To run $LIBEI_TEST_SERVER in valgrind, set LIBEI_USE_VALGRIND to a boolean true. +# +# e.g. +# $ export LIBEI_TEST_SOCKET=/run/user/1000/eis-0 +# $ export LIBEI_TEST_SERVER="" +# $ pytest3 -v --log-level=DEBUG -k 'some string' +# +# Will run that test against whatever is providing that socket. + +from typing import Generator, Optional +from pathlib import Path + +import attr +import itertools +import os +import pytest +import struct +import subprocess +import time +import shlex +import signal +import socket +import structlog + +try: + from eiproto import ( + hexlify, + Context, + Interface, + MessageHeader, + EiCallback, + EiConnection, + EiConnectionSetup, + EiSeat, + ) +except ImportError: + # This file needs to be processed by meson, so let's skip when this fails in the source dir + pytest.skip(allow_module_level=True) + + +logger = structlog.get_logger() + +VALGRIND_EXITCODE = 3 + + +def VERSION_V(v): + """Noop function that helps with grepping for hardcoded version numbers""" + return v + + +@pytest.fixture +def socketpath(tmp_path) -> Path: + test_socket_override = os.environ.get("LIBEI_TEST_SOCKET") + if test_socket_override: + return Path(test_socket_override) + return Path(tmp_path) / "eis-0" + + +@pytest.fixture +def valgrind() -> list[str]: + """ + Return the list of arguments to run our eis_executable in valgrind + """ + if bool(os.environ.get("LIBEI_USE_VALGRIND", False)): + valgrind = [ + "valgrind", + "--leak-check=full", + f"--error-exitcode={VALGRIND_EXITCODE}", + ] + else: + valgrind = [] + + return valgrind + + +@pytest.fixture +def eis_executable(valgrind, socketpath) -> Optional[list[str]]: + """ + Returns a list of arguments of the EIS executable to run, to be passed + into Popen. + + Returns None if we're expected to connect to an already running instance. + """ + program = os.environ.get("LIBEI_TEST_SERVER", None) + + # if the variable is empty, use an existing running server + if program == "": + return None + + # If it's not set at all, we use our eis-demo-server + if program is None: + program = f"@LIBEI_TEST_SERVER@ --socketpath={socketpath} --verbose" # set by meson to eis-demo-server + + return valgrind + shlex.split(program) + + +@pytest.fixture +def eis(socketpath, eis_executable) -> Generator["Eis", None, None]: + if not eis_executable: + yield Eis.create_existing_implementation(socketpath) + else: + eis = Eis.create(socketpath, eis_executable) + yield eis + eis.terminate() + + +@attr.s +class Ei: + sock: socket.socket = attr.ib() + context: Context = attr.ib() + connection: Optional[EiConnection] = attr.ib(default=None) + seats: list[EiSeat] = attr.ib(init=False, default=attr.Factory(list)) + object_ids: Generator[int, None, None] = attr.ib( + init=False, default=attr.Factory(lambda: itertools.count(3)) + ) + _data: bytes = attr.ib(init=False, default=attr.Factory(bytes)) # type: ignore + + @property + def data(self) -> bytes: + return self._data + + def send(self, msg: bytes) -> None: + logger.debug(f"sending {len(msg)} bytes", bytes=hexlify(msg)) + self.sock.sendmsg([msg]) + + def find_objects_by_interface(self, interface: str) -> list[Interface]: + return [o for o in self.context.objects.values() if o.name == interface] + + def callback_roundtrip(self) -> bool: + assert self.connection is not None + + cb = EiCallback.create(next(self.object_ids), VERSION_V(1)) + self.context.register(cb) + self.send(self.connection.Sync(cb.object_id)) + + return self.wait_for( + lambda: cb not in self.find_objects_by_interface("ei_callback") + ) + + @property + def connection_setup(self) -> EiConnectionSetup: + setup = self.context.objects[0] + assert isinstance(setup, EiConnectionSetup) + return setup + + def init_default_sender_connection(self) -> None: + setup = self.connection_setup + self.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER)) + self.send(setup.Name("test client")) + self.send( + setup.InterfaceVersion("ei_connection", VERSION_V(1)) + ) # this one is required + self.send(setup.InterfaceVersion("ei_callback", VERSION_V(1))) + self.send(setup.InterfaceVersion("ei_pingpong", VERSION_V(1))) + self.send(setup.InterfaceVersion("ei_seat", VERSION_V(1))) + self.send(setup.InterfaceVersion("ei_device", VERSION_V(1))) + self.send(setup.InterfaceVersion("ei_pointer", VERSION_V(1))) + self.send(setup.InterfaceVersion("ei_keyboard", VERSION_V(1))) + self.send(setup.InterfaceVersion("ei_touchscreen", VERSION_V(1))) + self.send(setup.Done()) + self.dispatch() + + def wait_for_seat(self, timeout=5) -> bool: + def seat_is_done(): + return self.seats and [ + call for call in self.seats[0].calllog if call.name == "Done" + ] + + return self.wait_for(seat_is_done, timeout) + + def wait_for_connection(self, timeout=5) -> bool: + return self.wait_for(lambda: self.connection is not None, timeout) + + def wait_for(self, callable, timeout=5) -> bool: + expire = time.time() + timeout + while not callable(): + self.dispatch() + if time.time() > expire: + return False + time.sleep(0.01) + + return True + + def recv(self) -> bytes: + try: + data = self.sock.recv(1024) + while data: + self._data += data + data = self.sock.recv(1024) + except BlockingIOError: + pass + return self.data + + def dispatch(self, timeout=0.01) -> None: + if not self.data: + expire = time.time() + timeout + while not self.recv(): + now = time.time() + if now >= expire: + break + time.sleep(min(0.01, expire - now)) + if now >= expire: + break + + while self.data: + logger.debug("data pending dispatch: ", bytes=hexlify(self.data[:64])) + header = MessageHeader.from_data(self.data) + logger.debug(f"dispatching message: ", header=header) + consumed = self.context.dispatch(self.data) + if consumed == 0: + break + self.pop(consumed) + + def pop(self, count: int) -> None: + self._data = self._data[count:] + + @classmethod + def create(cls, socketpath: Path): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) + while not socketpath.exists(): + time.sleep(0.01) + + for _ in range(3): + try: + sock.connect_ex(os.fspath(socketpath)) + break + except ConnectionRefusedError: + time.sleep(0.1) + else: + assert False, "Failed to connect to EIS" + + ctx = Context.create() + ei = cls(sock=sock, context=ctx) + + # callback for new objects + def register_cb(interface: Interface) -> None: + if isinstance(interface, EiConnection): + assert ei.connection is None + ei.connection = interface + + # Automatic ping/pong handler + def ping(conn, id, version, new_objects={}): + pingpong = new_objects["ping"] + ei.send(pingpong.Done(0)) + + ei.connection.connect("Ping", ping) + + elif isinstance(interface, EiSeat): + assert interface not in ei.seats + ei.seats.append(interface) + + def unregister_cb(interface: Interface) -> None: + if interface == ei.connection: + assert ei.connection is not None + ei.connection = None + elif interface in ei.seats: + ei.seats.remove(interface) + + ctx.connect("register", register_cb) + ctx.connect("unregister", unregister_cb) + + return ei + + +@attr.s +class Eis: + process: Optional[subprocess.Popen] = attr.ib() + ei: Ei = attr.ib() + _stdout: Optional[str] = attr.ib(init=False, default=None) + _stderr: Optional[str] = attr.ib(init=False, default=None) + + def terminate(self) -> None: + if self.process is None: + return + + def kill_gently(process) -> Generator[None, None, None]: + process.send_signal(signal.SIGINT) + yield + process.terminate() + yield + process.kill() + + stdout, stderr = None, None + for _ in kill_gently(self.process): + try: + stdout, stderr = self.process.communicate(timeout=1) + break + except subprocess.TimeoutExpired: + pass + + if stdout: + for line in stdout.split("\n"): + logger.info(line) + if stderr: + for line in stderr.split("\n"): + logger.info(line) + self.process.wait() + rc = self.process.returncode + if rc not in [0, -signal.SIGTERM]: + if rc == VALGRIND_EXITCODE: + assert ( + rc != VALGRIND_EXITCODE + ), "valgrind reported errors, see valgrind error messages" + else: + assert ( + rc == -signal.SIGTERM + ), f"Process exited with {signal.Signals(-rc).name}" + self.process = None # allow this to be called multiple times + + @classmethod + def create_existing_implementation(cls, socketpath) -> "Eis": + ei = Ei.create(socketpath) + return cls(process=None, ei=ei) + + @classmethod + def create(cls, socketpath, executable) -> "Eis": + process = subprocess.Popen( + executable, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + text=True, + bufsize=1, + universal_newlines=True, + ) + ei = Ei.create(socketpath) + return cls(process=process, ei=ei) + + +class TestEiProtocol: + @property + def using_demo_server(self) -> bool: + return "@LIBEI_TEST_SERVER@".endswith("eis-demo-server") + + def test_server_sends_version_event_immediately(self, eis): + """ + The server is expected to send ei_connection_setup.interface_version immediately + on connect + """ + ei = eis.ei + ei.dispatch() + + setup = ei.context.objects[0] + assert isinstance(setup, EiConnectionSetup) + + ei.wait_for(lambda: bool(setup.calllog)) + + call = setup.calllog[0] + assert call.name == "InterfaceVersion" + assert call.args["name"] == "ei_connection_setup" + assert call.args["version"] == VERSION_V(1) + + eis.terminate() + + def test_server_sends_interface_version_events(self, eis): + """ + The server is expected to send ei_connection_setup.interface_version immediately + on connect + """ + ei = eis.ei + ei.dispatch() + + setup = ei.connection_setup + + ei.init_default_sender_connection() + ei.dispatch() + ei.wait_for_connection() + + announced_interfaces = [] + + for call in setup.calllog: + if call.name == "InterfaceVersion": + announced_interfaces.append((call.args["name"], call.args["version"])) + + assert ("ei_callback", 1) in announced_interfaces + + eis.terminate() + + def test_send_wrong_context_type(self, eis): + """ + Connect with an invalid context type, expect to be disconnected + """ + ei = eis.ei + ei.dispatch() + + # Pick some random type (and make sure it's not a valid type in the current API) + invalid_type = 4 + try: + EiConnectionSetup.EiContextType(invalid_type) + assert ( + False + ), f"{invalid_type} should not be a valid ContextType, this test needs an update" + except ValueError: + pass + + ei.send(ei.connection_setup.ContextType(invalid_type)) + + try: + # The server either disconnects the socket because we sent garbage + # or immediately disconnects us after the .done request + ei.dispatch() + + for interface in ["ei_connection", "ei_callback", "ei_pingpong"]: + ei.send( + ei.connection_setup.InterfaceVersion(interface, VERSION_V(1)) + ) # these are required + ei.send(ei.connection_setup.Done()) + ei.dispatch() + + ei.wait_for_connection(timeout=1) + + # ok, still not socket-disconnected, let's make sure + # we did immediately get a Disconnected message + if ei.connection: + assert ei.connection is not None + call = ei.connection.calllog[0] + assert call.name == "Disconnected" + assert call.args["reason"] == EiConnection.EiDisconnectReason.ERROR + assert call.args["explanation"] is not None + + # Now let's trigger a BrokenPipeError + ei.send(bytes(16)) + + assert False, "The server should have disconnected us" + + except (ConnectionResetError, BrokenPipeError): + pass + + eis.terminate() + + def test_connect_and_disconnect(self, eis): + """ + Connect to the server with a valid sequence, then disconnect + once we get the connection object + """ + ei = eis.ei + + # drain any messages + ei.dispatch() + + # Establish our connection + ei.init_default_sender_connection() + ei.wait_for_connection() + + # This should've set our connection object + assert ei.connection is not None + connection = ei.connection + ei.send(connection.Disconnect()) + try: + # Send disconnect twice, just to test that case, should be ignored by the + # server + ei.send(connection.Disconnect()) + + ei.dispatch() + time.sleep(0.1) + ei.dispatch() + except (ConnectionResetError, BrokenPipeError): + pass + + ei.wait_for( + lambda: bool([c for c in connection.calllog if c.name == "Disconnected"]) + ) + + for call in connection.calllog: + if call.name == "Disconnected": + assert call.args["explanation"] is None + assert ( + call.args["reason"] == EiConnection.EiDisconnectReason.DISCONNECTED + ) + break + else: + assert ( + False + ), f"Expected Disconnected event, got none in {connection.calllog}" + + try: + ei.send(connection.Disconnect()) + assert False, "Expected socket to be closed" + except BrokenPipeError: + pass + + eis.terminate() + + def test_connect_receive_seat(self, eis): + """ + Ensure we get a seat object after setting our connection + """ + ei = eis.ei + ei.dispatch() + setup = ei.connection_setup + + # Establish our connection + ei.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER)) + ei.send(setup.Name("test client")) + for interface in ["ei_connection", "ei_callback", "ei_pingpong"]: + ei.send( + setup.InterfaceVersion(interface, VERSION_V(1)) + ) # these are required + ei.send(setup.InterfaceVersion("ei_seat", VERSION_V(100))) # excessive version + ei.send( + setup.InterfaceVersion("ei_device", VERSION_V(100)) + ) # excessive version + ei.send(setup.Done()) + ei.dispatch() + + ei.wait_for_seat() + + assert ei.seats + for seat in ei.seats: + assert seat.version == 1 # we have 100, but the server only has 1 + for call in seat.calllog: + if call.name == "Capabilities": + assert call.args["capabilities"] != 0 + if self.using_demo_server: + caps = call.args["capabilities"] + assert ( + caps + == EiSeat.EiCapabilities.POINTER + | EiSeat.EiCapabilities.POINTER_ABSOLUTE + | EiSeat.EiCapabilities.KEYBOARD + | EiSeat.EiCapabilities.TOUCHSCREEN + ) + break + else: + assert ( + False + ), f"Expected ei_seat.capabilities, but got none in {seat.calllog}" + + for call in seat.calllog: + if call.name == "Name": + assert call.args["name"] is not None + if self.using_demo_server: + assert call.args["name"] == "default" + break + else: + assert False, f"Expected ei_seat.name, but got none in {seat.calllog}" + + for call in seat.calllog: + if call.name == "Done": + break + else: + assert False, f"Expected ei_seat.done, but got none in {seat.calllog}" + + def test_connect_no_seat_without_ei_seat(self, eis): + """ + Ensure we do not get a seat object if we don't announce support for ei_seat + """ + ei = eis.ei + ei.dispatch() + setup = ei.connection_setup + + # Establish our connection + ei.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER)) + ei.send(setup.Name("test client")) + for interface in ["ei_connection", "ei_callback", "ei_pingpong"]: + ei.send( + setup.InterfaceVersion(interface, VERSION_V(1)) + ) # these are required + # Do not announce ei_seat support + ei.send(setup.Done()) + ei.dispatch() + + assert not ei.seats + eis.terminate() + + def test_seat_bind_no_caps(self, eis): + """ + Ensure nothing happens if we bind to a seat with capabilities outside what is supported + """ + ei = eis.ei + ei.dispatch() + ei.init_default_sender_connection() + ei.wait_for_seat() + + seat = ei.seats[0] + ei.send(seat.Bind(0x00)) # binding to no caps is fine + ei.dispatch() + time.sleep(0.1) + ei.dispatch() + + eis.terminate() + + def test_seat_bind_invalid_caps_expect_disconnection(self, eis): + ei = eis.ei + ei.dispatch() + ei.init_default_sender_connection() + ei.wait_for_seat() + + connection = ei.connection + assert connection is not None + seat = ei.seats[0] + ei.send(seat.Bind(0x40)) # binding to invalid caps should get us disconnected + try: + ei.dispatch() + time.sleep(0.1) + ei.dispatch() + + for call in seat.calllog: + if call.name == "Destroyed": + break + else: + assert False, "Expected seat to get destroyed but didn't" + + for call in connection.calllog: + if call.name == "Disconnected": + assert call.args["reason"] == EiConnection.EiDisconnectReason.VALUE + assert "Invalid capabilities" in call.args["explanation"] + break + else: + assert False, "Expected disconnection event" + except ConnectionResetError: + pass + + eis.terminate() + + @pytest.mark.parametrize("bind_first", (True, False)) + def test_seat_release_expect_destroyed(self, eis, bind_first): + ei = eis.ei + ei.dispatch() + ei.init_default_sender_connection() + ei.wait_for_seat() + + seat = ei.seats[0] + + have_seat_destroyed = False + + def destroyed_cb(_, serial): + nonlocal have_seat_destroyed + have_seat_destroyed = True + + seat.connect("Destroyed", destroyed_cb) + if bind_first: + ei.send(seat.Bind(EiSeat.EiCapabilities.POINTER)) + ei.send(seat.Release()) + + ei.dispatch() + ei.wait_for(lambda: have_seat_destroyed) + + def test_connection_sync(self, eis): + """ + Test the ei_connection.sync() callback mechanism + """ + + ei = eis.ei + ei.dispatch() + ei.init_default_sender_connection() + ei.wait_for_seat() + + cb = EiCallback.create(next(ei.object_ids), VERSION_V(1)) + ei.context.register(cb) + assert ei.connection is not None + ei.send(ei.connection.Sync(cb.object_id)) + ei.dispatch() + + assert cb.calllog[0].name == "Done" + assert cb.calllog[0].args["callback_data"] == 0 # hardcoded in libeis for now + + def test_invalid_object(self, eis): + """ + Send a message for an invalid object and ensure we get the event back + """ + + ei = eis.ei + ei.dispatch() + ei.init_default_sender_connection() + ei.wait_for_seat() + + seat: EiSeat = ei.seats[0] + + have_invalid_object_event = False + have_sync = False + + def invalid_object_cb(_, last_serial, id): + nonlocal have_invalid_object_event + assert id == seat.object_id + have_invalid_object_event = True + + ei.connection.connect("InvalidObject", invalid_object_cb) + + release = seat.Release() + + ei.send(release) + ei.dispatch() + cb = EiCallback.create(next(ei.object_ids), VERSION_V(1)) + ei.context.register(cb) + + def sync_cb(_, unused): + nonlocal have_sync + have_sync = True + + cb.connect("Done", sync_cb) + + # Send the invalid object request + ei.send(release) + + ei.send(ei.connection.Sync(cb.object_id)) + ei.wait_for(lambda: have_sync) + + assert have_invalid_object_event, "Expected invalid_object event, got none" + + def test_disconnect_before_setup_done(self, eis): + ei = eis.ei + ei.dispatch() + ei.send(ei.connection_setup.ContextType(EiConnectionSetup.EiContextType.SENDER)) + ei.sock.close() + time.sleep(0.5) + # Not much we can test here other than hoping the EIS implementation doesn't segfault + + @pytest.mark.parametrize( + "missing_interface", + ( + "ei_callback", + "ei_connection", + "ei_pingpong", + "ei_seat", + "ei_device", + "ei_pointer", + ), + ) + def test_connect_without_ei_interfaces(self, eis, missing_interface): + ei = eis.ei + ei.dispatch() + + setup = ei.connection_setup + ei.send(setup.ContextType(EiConnectionSetup.EiContextType.SENDER)) + ei.send(setup.Name("test client")) + + for interface in [ + "ei_connection", + "ei_callback", + "ei_pingpong", + "ei_seat", + "ei_device", + "ei_pointer", + "ei_keyboard", + "ei_touchscreen", + ]: + if interface != missing_interface: + ei.send(setup.InterfaceVersion(interface, 1)) + + @attr.s + class Status: + connected: bool = attr.ib(default=False) + disconnected: bool = attr.ib(default=False) + seats: bool = attr.ib(default=False) + devices: bool = attr.ib(default=False) + + status = Status() + + try: + + def on_device(seat, id, version, new_objects={}): + assert missing_interface not in ["ei_device"] + status.devices = True + + def on_seat(connection, id, version, new_objects={}): + assert missing_interface not in ["ei_seat"] + seat = new_objects["seat"] + assert seat is not None + seat.connect("Device", on_device) + ei.send(seat.Bind(EiSeat.EiCapabilities.POINTER)) + status.seats = True + + def on_disconnected(connection, last_serial, reason, explanation): + assert missing_interface in ["ei_seat", "ei_device"] + status.disconnected = True + + def on_connection(setup, serial, id, version, new_objects={}): + # these three must be present, otherwise we get disconnected + assert missing_interface not in [ + "ei_connection", + "ei_callback", + "ei_pingpong", + ] + status.connected = True + connection = new_objects["connection"] + assert connection is not None + connection.connect("Seat", on_seat) + connection.connect("Disconnected", on_disconnected) + + setup.connect("Connection", on_connection) + + ei.send(setup.Done()) + ei.dispatch() + + if missing_interface in ["ei_connection", "ei_callback", "ei_pingpong"]: + # valgrind is slow, so let's wait for it to catch up + time.sleep(0.3) + ei.dispatch() + assert not status.connected + assert not status.disconnected # we never get the Disconnected event + ei.send(bytes(16)) + assert False, "We should've been disconnected by now" + + ei.wait_for(lambda: status.connected) + if missing_interface in ["ei_device", "ei_seat"]: + assert ei.wait_for(lambda: status.disconnected) + assert ( + status.disconnected + ), f"Expected to be disconnected for missing {missing_interface}" + else: + assert ( + missing_interface == "ei_pointer" + ) # otherwise we shouldn't get here + assert ei.callback_roundtrip(), "Callback roundtrip failed" + assert status.connected + assert not status.disconnected + assert ei.wait_for(lambda: status.seats) + assert ei.wait_for(lambda: status.devices) + assert status.devices + + except BrokenPipeError: + assert missing_interface in ["ei_connection", "ei_callback", "ei_pingpong"]