libei/test/test_oeffis.py

393 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
#
# SPDX-License-Identifier: MIT
#
# This file is formatted with Python Black
#
# Introduction
# ============
#
# This is a Python-based test suite making use of DBusMock to test the
# liboeffis.so C library.
#
# The main components are:
# - LibOeffis: the Python class wrapping liboeffis.so via ctypes.
# This is a manually maintained mapping, any API additions/changes must
# updated here.
# - Oeffis: a pythonic wrapper around LibOeffis so the tests look like Python
# - TestOeffis: the test class for all tests that must talk to DBus
#
# DBusMock integration
# ====================
#
# DBusMock works in that a **separate process** is started that provides a
# DBus session bus that our tests connect to. Templates for dbusmock provide
# the behavior of that bus. Note that the mocked bus cannot be controlled
# from our test code, it's a separate process. Unless you set up special
# DBus signals/methods to talk to it, which we don't.
#
# Any test that requires DBus looks like this:
#
# ```python
# class TestOeffis():
# ....
# def test_foo(self):
# params = {}
# self.setup_daemon(params)
# # now you can talk to the RemoteDesktop portal
# ```
# See the RemoteDesktop template for parameters that can be passed in.
#
# DBusMock templates
# ------------------
#
# See the templates/ directory for the templates used by DBusMock. Templates
# are named after the portal. Available parameters are in the `load()` function.
#
from ctypes import c_char_p, c_int, c_uint32, c_void_p
from typing import Dict, List, Tuple, Type, Optional, TextIO
from gi.repository import GLib # type: ignore
from dbus.mainloop.glib import DBusGMainLoop
import attr
import ctypes
import dbus
import dbusmock
import fcntl
import os
import pytest
import socket
import subprocess
DBusGMainLoop(set_as_default=True)
PREFIX = "oeffis_"
@attr.s
class _Api:
name: str = attr.ib()
args: Tuple[Type[ctypes._SimpleCData], ...] = attr.ib()
return_type: Optional[Type[ctypes._SimpleCData]] = attr.ib()
@property
def basename(self) -> str:
return self.name[len(PREFIX) :]
@attr.s
class _Enum:
name: str = attr.ib()
value: int = attr.ib()
@property
def basename(self) -> str:
return self.name[len(PREFIX) :]
class LibOeffis(object):
"""
liboeffis.so wrapper. This is a singleton ctypes wrapper into liboeffis.so with
minimal processing. Example:
>>> lib = LibOeffis.instance()
>>> ctx = lib.oeffis_new(None)
>>> lib.oeffis_unref(ctx)
>>> print(lib.OEFFIS_EVENT_CLOSED)
In most cases you probably want to use the ``Oeffis`` class instead.
"""
_lib = None
@staticmethod
def _cdll():
return ctypes.CDLL("liboeffis.so", use_errno=True)
@classmethod
def _load(cls):
cls._lib = cls._cdll()
for api in cls._api_prototypes:
func = getattr(cls._lib, api.name)
func.argtypes = api.args
func.restype = api.return_type
setattr(cls, api.name, func)
for e in cls._enums:
setattr(cls, e.name, e.value)
_api_prototypes: List[_Api] = [
_Api(name="oeffis_new", args=(c_void_p,), return_type=c_void_p),
_Api(name="oeffis_ref", args=(c_void_p,), return_type=c_void_p),
_Api(name="oeffis_unref", args=(c_void_p,), return_type=c_void_p),
_Api(name="oeffis_set_user_data", args=(c_void_p, c_void_p), return_type=None),
_Api(name="oeffis_get_user_data", args=(c_void_p,), return_type=c_void_p),
_Api(name="oeffis_get_fd", args=(c_void_p,), return_type=c_int),
_Api(name="oeffis_get_eis_fd", args=(c_void_p,), return_type=c_int),
_Api(name="oeffis_create_session", args=(c_void_p, c_uint32), return_type=None),
_Api(
name="oeffis_create_session_on_bus",
args=(c_void_p, c_char_p, c_uint32),
return_type=None,
),
_Api(name="oeffis_dispatch", args=(c_void_p,), return_type=None),
_Api(name="oeffis_get_event", args=(c_void_p,), return_type=c_int),
_Api(name="oeffis_get_error_message", args=(c_void_p,), return_type=c_char_p),
]
_enums: List[_Enum] = [
_Enum(name="OEFFIS_DEVICE_ALL_DEVICES", value=0),
_Enum(name="OEFFIS_DEVICE_KEYBOARD", value=1),
_Enum(name="OEFFIS_DEVICE_POINTER", value=2),
_Enum(name="OEFFIS_DEVICE_TOUCHSCREEN", value=4),
_Enum(name="OEFFIS_EVENT_NONE", value=0),
_Enum(name="OEFFIS_EVENT_CONNECTED_TO_EIS", value=1),
_Enum(name="OEFFIS_EVENT_CLOSED", value=2),
_Enum(name="OEFFIS_EVENT_DISCONNECTED", value=3),
]
@classmethod
def instance(cls):
if cls._lib is None:
cls._load()
return cls
class Oeffis:
"""
Convenience wrapper to make using liboeffis a bit more pythonic.
>>> o = Oeffis()
>>> fd = o.fd
>>> o.create_session(o.DEVICE_POINTER)
"""
def __init__(self, userdata=None):
l = LibOeffis.instance()
self.ctx = l.oeffis_new(userdata) # type: ignore
def wrapper(func):
return lambda *args, **kwargs: func(self.ctx, *args, **kwargs)
for api in l._api_prototypes:
# skip some APIs that are not be exposed because they don't make sense
# to have in python.
if api.name not in (
"oeffis_ref",
"oeffis_unref",
"oeffis_get_user_data",
"oeffis_set_user_data",
):
func = getattr(l, api.name)
setattr(self, api.basename, wrapper(func))
for e in l._enums:
val = getattr(l, e.name)
setattr(self, e.basename, val)
@property
def fd(self) -> TextIO:
"""
Return the fd we need to monitor for oeffis_dispatch()
"""
return os.fdopen(self.get_fd(), "rb") # type: ignore
@property
def eis_fd(self) -> Optional[socket.socket]:
"""Return the socket connecting us to the EIS implementation or None if we're not ready/disconnected"""
fd = self.get_eis_fd() # type: ignore
if fd != -1:
return socket.socket(fileno=fd)
else:
return None
@property
def error_message(self) -> Optional[str]:
return self.get_error_message() # type: ignore
def __del__(self):
LibOeffis.instance().oeffis_unref(self.ctx) # type: ignore
@pytest.fixture()
def liboeffis():
return LibOeffis.instance()
def test_ref_unref(liboeffis):
o = liboeffis.oeffis_new(None)
assert o is not None
o2 = liboeffis.oeffis_ref(o)
assert o2 == o
assert liboeffis.oeffis_unref(o) is None
assert liboeffis.oeffis_unref(o2) is None
assert liboeffis.oeffis_unref(None) is None
def test_set_user_data(liboeffis):
o = liboeffis.oeffis_new(None)
assert liboeffis.oeffis_get_user_data(o) is None
liboeffis.oeffis_unref(o)
data = ctypes.pointer(ctypes.c_int(52))
o = liboeffis.oeffis_new(data)
assert o is not None
def test_ctx():
oeffis = Oeffis()
assert oeffis.error_message is None
fd = oeffis.fd
assert fd is not None
eisfd = oeffis.eis_fd
assert eisfd is None
def test_error_out():
# Bus doesn't exist
oeffis = Oeffis()
oeffis.create_session_on_bus(b"org.freedesktop.OeffisTest", oeffis.DEVICE_POINTER)
oeffis.dispatch()
e = oeffis.get_event()
assert e == oeffis.EVENT_DISCONNECTED
assert oeffis.error_message is not None
# Uncomment this to have dbus-monitor listen on the normal session address
# rather than the test DBus. This can be useful for cases where *something*
# messes up and tests run against the wrong bus.
#
# session_dbus_address = os.environ["DBUS_SESSION_BUS_ADDRESS"]
def start_dbus_monitor() -> "subprocess.Process":
import subprocess
env = os.environ.copy()
try:
env["DBUS_SESSION_BUS_ADDRESS"] = session_dbus_address
except NameError:
# See comment above
pass
argv = ["dbus-monitor", "--session"]
mon = subprocess.Popen(argv, env=env)
def stop_dbus_monitor():
mon.terminate()
mon.wait()
GLib.timeout_add(2000, stop_dbus_monitor)
return mon
class TestOeffis(dbusmock.DBusTestCase):
"""
Test class that sets up a mocked DBus session bus to be used by liboeffis.so.
"""
@classmethod
def setUpClass(cls):
cls.PORTAL_NAME = "RemoteDesktop"
cls.INTERFACE_NAME = f"org.freedesktop.portal.{cls.PORTAL_NAME}"
def setUp(self):
self.p_mock = None
self._mainloop = None
self.dbus_monitor = None
def setup_daemon(self, params=None, extra_templates: List[Tuple[str, Dict]] = []):
"""
Start a DBusMock daemon in a separate process.
If extra_templates is specified, it is a list of tuples with the
portal name as first value and the param dict to be passed to that
template as second value, e.g. ("ScreenCast", {...}).
"""
self.start_session_bus()
self.p_mock, self.obj_portal = self.spawn_server_template(
template=f"templates/{self.PORTAL_NAME.lower()}.py",
parameters=params or {},
stdout=subprocess.PIPE,
)
flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self.mock_interface = dbus.Interface(self.obj_portal, dbusmock.MOCK_IFACE)
self.properties_interface = dbus.Interface(
self.obj_portal, dbus.PROPERTIES_IFACE
)
self.portal_interface = dbus.Interface(self.obj_portal, self.INTERFACE_NAME)
for t, tparams in extra_templates:
template = f"templates/{t.lower()}.py"
self.obj_portal.AddTemplate(
template,
dbus.Dictionary(tparams, signature="sv"),
dbus_interface=dbusmock.MOCK_IFACE,
)
self.dbus_monitor = start_dbus_monitor()
self._mainloop = None
def tearDown(self):
if self.p_mock:
if self.p_mock.stdout:
out = (self.p_mock.stdout.read() or b"").decode("utf-8")
if out:
print(out)
self.p_mock.stdout.close()
self.p_mock.terminate()
self.p_mock.wait()
if self.dbus_monitor:
self.dbus_monitor.terminate()
self.dbus_monitor.wait()
@property
def mainloop(self):
"""
The mainloop for this test. This mainloop automatically quits after a
fixed timeout, but only on the first run. That's usually enough for
tests, if you need to call mainloop.run() repeatedly ensure that a
timeout handler is set to ensure quick test case failure in case of
error.
"""
if self._mainloop is None:
def quit():
self._mainloop.quit() # type: ignore
self._mainloop = None
self._mainloop = GLib.MainLoop()
GLib.timeout_add(2000, quit)
return self._mainloop
def test_create_session(self):
self.setup_daemon()
oeffis = Oeffis()
oeffis.create_session(oeffis.DEVICE_POINTER | oeffis.DEVICE_KEYBOARD) # type: ignore
oeffis.dispatch() # type: ignore
def _dispatch(source, condition):
oeffis.dispatch() # type: ignore
return True
GLib.io_add_watch(oeffis.fd, 0, GLib.IO_IN, _dispatch)
self.mainloop.run()
e = oeffis.get_event() # type: ignore
assert e == oeffis.EVENT_CONNECTED_TO_EIS, oeffis.error_message # type: ignore
eisfd = oeffis.eis_fd
assert eisfd is not None
assert eisfd.recv(64) == b"VANILLA" # that's what the template sends