mirror of
https://gitlab.freedesktop.org/libinput/libei.git
synced 2025-12-20 06:50:06 +01:00
341 lines
9.5 KiB
Python
341 lines
9.5 KiB
Python
# SPDX-License-Identifier: MIT
|
|
#
|
|
# This file is formatted with Python Black
|
|
|
|
from templates import Request, Response, Session, MockParams
|
|
from typing import Dict
|
|
from itertools import count
|
|
|
|
import dbus
|
|
import dbus.service
|
|
import enum
|
|
import logging
|
|
|
|
# Logger for this template, namespaced under "templates."
logger = logging.getLogger(f"templates.{__name__}")

# D-Bus name, object path and interface of the mocked service.
BUS_NAME = "org.freedesktop.portal.Desktop"
MAIN_OBJ = "/org/freedesktop/portal/desktop"
MAIN_IFACE = "org.freedesktop.portal.RemoteDesktop"

# NOTE(review): presumably tells the mock loader not to use the system bus;
# confirm against the loader that consumes this flag.
SYSTEM_BUS = False

# Module-wide counter; nothing in the visible part of this file consumes it.
_restore_tokens = count()
|
|
|
|
|
|
class RDSession(Session):
    """Session subclass that also tracks how far the session has progressed.

    The state begins at ``State.CREATED`` and is moved forward one step at a
    time by :meth:`advance_state`.
    """

    class State(enum.IntEnum):
        """Ordered session states; ``enum.auto()`` gives consecutive values."""

        CREATED = enum.auto()
        SELECTED = enum.auto()
        STARTED = enum.auto()
        CONNECTED = enum.auto()

    @property
    def state(self):
        """Return the current state, creating it as ``CREATED`` on first use.

        The backing attribute is set lazily here instead of in ``__init__``.
        """
        try:
            return self._session_state
        except AttributeError:
            self._session_state = RDSession.State.CREATED
            return self._session_state

    def advance_state(self):
        """Step to the next state; does nothing once ``CONNECTED`` is reached."""
        if self.state != RDSession.State.CONNECTED:
            # Arithmetic on an IntEnum member yields a plain int, so convert
            # back to keep ``state`` a State member after advancing.
            self._session_state = RDSession.State(self._session_state + 1)
|
|
|
|
|
|
def load(mock, parameters):
    """Configure the mock for the RemoteDesktop interface.

    Stores the settings on the ``MockParams`` object for ``MAIN_IFACE`` and
    publishes the ``version`` and ``AvailableDeviceTypes`` properties.

    Args:
        mock: the mock object the template is loaded onto.
        parameters: mapping of overrides; the keys read are ``version``
            (default 2), ``response`` (default 0), ``devices`` (default
            0b111) and ``device-types`` (default: the ``devices`` value).
    """
    logger.debug(f"loading {MAIN_IFACE} template")

    cfg = MockParams.get(mock, MAIN_IFACE)
    # NOTE(review): unit of the delay is not visible here - presumably
    # milliseconds; confirm against Request.respond().
    cfg.delay = 500
    cfg.version = parameters.get("version", 2)
    cfg.response = parameters.get("response", 0)
    cfg.devices = parameters.get("devices", 0b111)
    cfg.sessions: Dict[str, RDSession] = {}

    # "device-types" only affects the advertised property, not cfg.devices.
    advertised_types = parameters.get("device-types", cfg.devices)
    properties = dbus.Dictionary(
        {
            "version": dbus.UInt32(cfg.version),
            "AvailableDeviceTypes": dbus.UInt32(advertised_types),
        }
    )
    mock.AddProperties(MAIN_IFACE, properties)
|
|
|
|
|
|
@dbus.service.method(
    MAIN_IFACE,
    sender_keyword="sender",
    in_signature="a{sv}",
    out_signature="o",
)
def CreateSession(self, options, sender):
    """Create a new RDSession, register it and schedule the response.

    The response carries the configured response code and the new session's
    handle under ``session_handle``.

    Returns:
        The handle of the Request created for this call. If anything raises,
        the exception is logged and the function returns None.
    """
    try:
        logger.debug(f"CreateSession: {options}")
        cfg = MockParams.get(self, MAIN_IFACE)
        req = Request(bus_name=self.bus_name, sender=sender, options=options)

        created = RDSession(bus_name=self.bus_name, sender=sender, options=options)
        cfg.sessions[created.handle] = created

        results = {"session_handle": created.handle}
        req.respond(Response(cfg.response, results), delay=cfg.delay)

        return req.handle
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(
    MAIN_IFACE,
    sender_keyword="sender",
    in_signature="oa{sv}",
    out_signature="o",
)
def SelectDevices(self, session_handle, options, sender):
    """Handle SelectDevices for a session that is in the CREATED state.

    The session is advanced to its next state only when the configured
    response code is 0; the response itself has an empty results dict.

    Returns:
        The handle of the Request created for this call.

    Raises:
        dbus.exceptions.DBusException: with the AccessDenied error name when
            the session handle is unknown or the session is not CREATED.
            Other exceptions are logged and swallowed (returning None).
    """
    try:
        logger.debug(f"SelectDevices: {session_handle} {options}")
        cfg = MockParams.get(self, MAIN_IFACE)
        req = Request(bus_name=self.bus_name, sender=sender, options=options)

        try:
            session = cfg.sessions[session_handle]
            if session.state != RDSession.State.CREATED:
                raise dbus.exceptions.DBusException(
                    f"Session in state {session.state}, expected CREATED",
                    name="org.freedesktop.DBus.Error.AccessDenied",
                )

            code = cfg.response
            if code == 0:
                session.advance_state()
        except KeyError:
            raise dbus.exceptions.DBusException(
                "Invalid session", name="org.freedesktop.DBus.Error.AccessDenied"
            )

        req.respond(Response(code, {}), delay=cfg.delay)

        return req.handle
    except Exception as exc:
        logger.critical(exc)
        # Only D-Bus errors are passed on to the caller.
        if isinstance(exc, dbus.exceptions.DBusException):
            raise
|
|
|
|
|
|
@dbus.service.method(
    MAIN_IFACE,
    sender_keyword="sender",
    in_signature="osa{sv}",
    out_signature="o",
)
def Start(self, session_handle, parent_window, options, sender):
    """Handle Start for a session that is in the SELECTED state.

    The response results contain the configured ``devices`` value as a
    UInt32. The session is advanced only when the configured response code
    is 0. ``parent_window`` is accepted but not used.

    Returns:
        The handle of the Request created for this call.

    Raises:
        dbus.exceptions.DBusException: with the AccessDenied error name when
            the session handle is unknown or the session is not SELECTED.
            Other exceptions are logged and swallowed (returning None).
    """
    try:
        logger.debug(f"Start: {session_handle} {options}")
        cfg = MockParams.get(self, MAIN_IFACE)
        req = Request(bus_name=self.bus_name, sender=sender, options=options)

        payload = {"devices": dbus.UInt32(cfg.devices)}

        try:
            session = cfg.sessions[session_handle]
            if session.state != RDSession.State.SELECTED:
                raise dbus.exceptions.DBusException(
                    f"Session in state {session.state}, expected SELECTED",
                    name="org.freedesktop.DBus.Error.AccessDenied",
                )

            code = cfg.response
            if code == 0:
                session.advance_state()
        except KeyError:
            raise dbus.exceptions.DBusException(
                "Invalid session", name="org.freedesktop.DBus.Error.AccessDenied"
            )

        req.respond(Response(code, payload), delay=cfg.delay)

        return req.handle
    except Exception as exc:
        logger.critical(exc)
        # Only D-Bus errors are passed on to the caller.
        if isinstance(exc, dbus.exceptions.DBusException):
            raise
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}dd", out_signature="")
def NotifyPointerMotion(self, session_handle, options, dx, dy):
    """Log the arguments of a NotifyPointerMotion call; nothing else is done."""
    try:
        message = f"NotifyPointerMotion: {session_handle} {options} {dx} {dy}"
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}udd", out_signature="")
def NotifyPointerMotionAbsolute(self, session_handle, options, stream, x, y):
    """Log the arguments of a NotifyPointerMotionAbsolute call; nothing else."""
    try:
        message = (
            f"NotifyPointerMotionAbsolute: {session_handle} {options} {stream} {x} {y}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}iu", out_signature="")
def NotifyPointerButton(self, session_handle, options, button, state):
    """Log the arguments of a NotifyPointerButton call; nothing else is done."""
    try:
        message = (
            f"NotifyPointerButton: {session_handle} {options} {button} {state}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(
    MAIN_IFACE,
    in_signature="oa{sv}dd",
    out_signature="",
)
def NotifyPointerAxis(self, session_handle, options, dx, dy):
    """Log the arguments of a NotifyPointerAxis call; nothing else is done.

    Any exception raised while logging is itself logged at critical level
    and not propagated.
    """
    try:
        # Log both axis values (the second placeholder used to repeat dx).
        logger.debug(f"NotifyPointerAxis: {session_handle} {options} {dx} {dy}")
    except Exception as e:
        logger.critical(e)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}ui", out_signature="")
def NotifyPointerAxisDiscrete(self, session_handle, options, axis, steps):
    """Log the arguments of a NotifyPointerAxisDiscrete call; nothing else."""
    try:
        message = (
            f"NotifyPointerAxisDiscrete: {session_handle} {options} {axis} {steps}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}iu", out_signature="")
def NotifyKeyboardKeycode(self, session_handle, options, keycode, state):
    """Log the arguments of a NotifyKeyboardKeycode call; nothing else is done."""
    try:
        message = (
            f"NotifyKeyboardKeycode: {session_handle} {options} {keycode} {state}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}iu", out_signature="")
def NotifyKeyboardKeysym(self, session_handle, options, keysym, state):
    """Log the arguments of a NotifyKeyboardKeysym call; nothing else is done."""
    try:
        message = (
            f"NotifyKeyboardKeysym: {session_handle} {options} {keysym} {state}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}uudd", out_signature="")
def NotifyTouchDown(self, session_handle, options, stream, slot, x, y):
    """Log the arguments of a NotifyTouchDown call; nothing else is done."""
    try:
        message = (
            f"NotifyTouchDown: {session_handle} {options} {stream} {slot} {x} {y}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(MAIN_IFACE, in_signature="oa{sv}uudd", out_signature="")
def NotifyTouchMotion(self, session_handle, options, stream, slot, x, y):
    """Log the arguments of a NotifyTouchMotion call; nothing else is done."""
    try:
        message = (
            f"NotifyTouchMotion: {session_handle} {options} {stream} {slot} {x} {y}"
        )
        logger.debug(message)
    except Exception as exc:
        logger.critical(exc)
|
|
|
|
|
|
@dbus.service.method(
    MAIN_IFACE,
    in_signature="oa{sv}u",
    out_signature="",
)
def NotifyTouchUp(self, session_handle, options, slot):
    """Log the arguments of a NotifyTouchUp call; nothing else is done.

    Any exception raised while logging is itself logged at critical level
    and not propagated.
    """
    try:
        # Use this method's own name so the log is not confused with
        # NotifyTouchMotion entries.
        logger.debug(f"NotifyTouchUp: {session_handle} {options} {slot}")
    except Exception as e:
        logger.critical(e)
|
|
|
|
|
|
@dbus.service.method(
    MAIN_IFACE,
    in_signature="oa{sv}",
    out_signature="h",
)
def ConnectToEIS(self, session_handle, options):
    """Return one end of a fresh socket pair for a session in STARTED state.

    The other end has the bytes ``VANILLA`` written to it so tests can
    recognize the socket. ``options`` is only logged.

    Returns:
        A ``dbus.types.UnixFd`` wrapping the second socket of the pair.

    Raises:
        dbus.exceptions.DBusException: with the AccessDenied error name when
            the session handle is unknown or the session is not STARTED.
            Other exceptions are logged and swallowed (returning None).
    """
    try:
        logger.debug(f"ConnectToEIS: {session_handle} {options}")

        params = MockParams.get(self, MAIN_IFACE)
        try:
            session = params.sessions[session_handle]
            if session.state != RDSession.State.STARTED:
                logger.error(f"Session in state {session.state}, expected STARTED")
                raise dbus.exceptions.DBusException(
                    "Session must be started before ConnectToEIS",
                    # Same standard error name as the other handlers use
                    # (was misspelled "AccesDenied").
                    name="org.freedesktop.DBus.Error.AccessDenied",
                )
        except KeyError:
            raise dbus.exceptions.DBusException(
                "Invalid session", name="org.freedesktop.DBus.Error.AccessDenied"
            )

        import socket

        sockets = socket.socketpair()
        # Write some random data down so it'll break anything that actually
        # expects the socket to be a real EIS socket, plus it makes it
        # easy to check if we connected to the right EIS socket in our tests.
        sockets[0].send(b"VANILLA")
        fd = sockets[1]
        logger.debug(f"ConnectToEIS with fd {fd.fileno()}")
        return dbus.types.UnixFd(fd)
    except Exception as e:
        logger.critical(e)
        # Only D-Bus errors are passed on to the caller.
        if isinstance(e, dbus.exceptions.DBusException):
            raise e
|