mirror of
https://gitlab.freedesktop.org/libinput/libei.git
synced 2026-05-03 18:28:02 +02:00
This protocol is wayland-like though it uses a slightly different message format. The XML file uses the same structure, except for the "fixed" type which is "float" here. The scanner uses a jinja template to generate source and header files for ei and eis which are now used instead of the protobuf-generated objects. Note that the scanner is a minimal working version, some features like enum value checks are not yet implemented. Unlike wayland we do not need to generate the libwayland-like library, we only need the wire protocol parser - some shortcuts can thus be taken. To keep the changes simple, the protocol currently is a flat protocol with only one interface and all messages copied over from the previous ei.proto file. In future commits, this will be moved to the respective interfaces instead.
405 lines
12 KiB
Python
Executable file
405 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
from typing import Optional, Union
|
|
from pathlib import Path
|
|
|
|
import argparse
|
|
import attr
|
|
import jinja2
|
|
import jinja2.environment
|
|
import os
|
|
import sys
|
|
import xml.sax
|
|
import xml.sax.handler
|
|
import xml.sax._exceptions
|
|
|
|
|
|
def proto_to_type(proto: str) -> Optional[str]:
|
|
"""
|
|
Conversion of protocol types to the signatures we use in the code
|
|
"""
|
|
|
|
return {
|
|
"uint": "u",
|
|
"int": "i",
|
|
"float": "f",
|
|
"fd": "h",
|
|
"new_id": "n",
|
|
"object": "o",
|
|
"string": "s",
|
|
}.get(proto)
|
|
|
|
|
|
@attr.s
|
|
class Target:
|
|
"""
|
|
Defines the target struct for the base "ei" interface.
|
|
|
|
In libei we have a `struct ei` but in libeis the equivalent
|
|
level is `struct eis_client`. This target type maps those two.
|
|
"""
|
|
|
|
name: str = attr.ib()
|
|
context: str = attr.ib()
|
|
|
|
@property
|
|
def ctype(self) -> str:
|
|
return f"struct {self.name} *"
|
|
|
|
@property
|
|
def as_param(self) -> str:
|
|
return f"struct {self.name}* {self.name}"
|
|
|
|
@property
|
|
def as_arg(self) -> str:
|
|
return self.as_param
|
|
|
|
@classmethod
|
|
def create(cls, name: str, context: str) -> "Target":
|
|
return cls(name=name, context=context)
|
|
|
|
|
|
@attr.s
|
|
class Argument:
|
|
"""
|
|
Argument to a request or a reply
|
|
"""
|
|
|
|
name: str = attr.ib()
|
|
signature: str = attr.ib(converter=proto_to_type)
|
|
summary: str = attr.ib()
|
|
|
|
@property
|
|
def as_arg(self) -> str:
|
|
return f"{self.ctype} {self.name}"
|
|
|
|
@property
|
|
def ctype(self) -> str:
|
|
return {
|
|
"u": "uint32_t",
|
|
"i": "int32_t",
|
|
"s": "const char *",
|
|
"h": "int",
|
|
"f": "float",
|
|
"o": "object_id_t",
|
|
"n": "new_id_t",
|
|
}[self.signature]
|
|
|
|
@property
|
|
def argtype(self) -> str:
|
|
return {
|
|
"u": "u32",
|
|
"i": "i32",
|
|
"s": "str",
|
|
"h": "fd",
|
|
"f": "f32",
|
|
"o": "obj",
|
|
"n": "obj",
|
|
}[self.signature]
|
|
|
|
@signature.validator # type: ignore
|
|
def _validate_signature(self, attribute, value):
|
|
types = "iufhnos"
|
|
assert value in types, f"Failed to parse signature {value}"
|
|
|
|
@classmethod
|
|
def create(cls, name: str, signature: str, summary: str = "") -> "Argument":
|
|
# FIXME: enum value checks
|
|
return cls(name, signature, summary)
|
|
|
|
|
|
@attr.s
|
|
class Message:
|
|
"""
|
|
Parent class for the wire message (Request or Event).
|
|
"""
|
|
|
|
name: str = attr.ib()
|
|
since: int = attr.ib()
|
|
opcode: int = attr.ib()
|
|
interface: "Interface" = attr.ib()
|
|
|
|
arguments: list[Argument] = attr.ib(init=False, factory=list)
|
|
|
|
def add_argument(self, arg: Argument) -> None:
|
|
self.arguments.append(arg)
|
|
|
|
@property
|
|
def num_arguments(self) -> int:
|
|
return len(self.arguments)
|
|
|
|
@property
|
|
def signature(self) -> str:
|
|
return "".join([a.signature for a in self.arguments])
|
|
|
|
|
|
@attr.s
|
|
class Request(Message):
|
|
@classmethod
|
|
def create(
|
|
cls, name: str, opcode: int, interface: "Interface", since: int = 1
|
|
) -> "Request":
|
|
return cls(name=name, opcode=opcode, since=since, interface=interface)
|
|
|
|
@property
|
|
def fqdn(self):
|
|
return f"{self.interface.name}_request_{self.name}"
|
|
|
|
|
|
@attr.s
|
|
class Event(Message):
|
|
@classmethod
|
|
def create(
|
|
cls, name: str, opcode: int, interface: "Interface", since: int = 1
|
|
) -> "Event":
|
|
return cls(name=name, opcode=opcode, since=since, interface=interface)
|
|
|
|
@property
|
|
def fqdn(self):
|
|
return f"{self.interface.name}_event_{self.name}"
|
|
|
|
|
|
@attr.s
|
|
class Entry:
|
|
"""
|
|
An enum entry
|
|
"""
|
|
|
|
name: str = attr.ib()
|
|
value: int = attr.ib()
|
|
summary: str = attr.ib()
|
|
|
|
@classmethod
|
|
def create(cls, name: str, value: int, summary: str = "") -> "Entry":
|
|
return cls(name=name, value=value, summary=summary)
|
|
|
|
|
|
@attr.s
|
|
class Enum:
|
|
name: str = attr.ib()
|
|
since: int = attr.ib()
|
|
interface: "Interface" = attr.ib()
|
|
|
|
entries: list[Entry] = attr.ib(init=False, factory=list)
|
|
|
|
@classmethod
|
|
def create(cls, name: str, interface: "Interface", since: int = 1) -> "Enum":
|
|
return cls(name=name, since=since, interface=interface)
|
|
|
|
def add_entry(self, entry: Entry) -> None:
|
|
self.entries.append(entry)
|
|
|
|
@property
|
|
def fqdn(self):
|
|
return f"{self.interface.name}_{self.name}"
|
|
|
|
|
|
@attr.s
|
|
class Interface:
|
|
name: str = attr.ib()
|
|
version: int = attr.ib()
|
|
|
|
requests: list[Request] = attr.ib(init=False, factory=list)
|
|
events: list[Event] = attr.ib(init=False, factory=list)
|
|
enums: list[Enum] = attr.ib(init=False, factory=list)
|
|
|
|
mode: str = attr.ib() # "ei" or "eis"
|
|
|
|
def add_request(self, request: Request) -> None:
|
|
self.requests.append(request)
|
|
|
|
def add_event(self, event: Event) -> None:
|
|
self.events.append(event)
|
|
|
|
def add_enum(self, enum: Enum) -> None:
|
|
self.enums.append(enum)
|
|
|
|
@property
|
|
def outgoing(self) -> list[Message]:
|
|
"""
|
|
Returns the list of messages outgoing from this implementation.
|
|
|
|
We use the same class for both ei and eis. To make the
|
|
template simpler, the class maps requests/events to
|
|
incoming/outgoing as correct relative to the implementation.
|
|
"""
|
|
if self.mode == "ei":
|
|
return self.requests # type: ignore
|
|
else:
|
|
return self.events # type: ignore
|
|
|
|
@property
|
|
def incoming(self) -> list[Message]:
|
|
"""
|
|
Returns the list of messages incoming to this implementation.
|
|
|
|
We use the same class for both ei and eis. To make the
|
|
template simpler, the class maps requests/events to
|
|
incoming/outgoing as correct relative to the implementation.
|
|
"""
|
|
if self.mode == "ei":
|
|
return self.events # type: ignore
|
|
else:
|
|
return self.requests # type: ignore
|
|
|
|
@classmethod
|
|
def create(cls, name: str, version: int, mode: str = "ei") -> "Interface":
|
|
assert mode in ["ei", "eis"]
|
|
return cls(name=name, version=version, mode=mode)
|
|
|
|
|
|
@attr.s
|
|
class Protocol(xml.sax.handler.ContentHandler):
|
|
component: str = attr.ib()
|
|
interfaces: list[Interface] = attr.ib(factory=list)
|
|
|
|
current_interface: Optional[Interface] = attr.ib(init=False, default=None)
|
|
current_message: Optional[Union[Message, Enum]] = attr.ib(init=False, default=None)
|
|
|
|
def startElement(self, element: str, attrs: dict):
|
|
if element == "interface":
|
|
assert self.current_interface is None
|
|
name = attrs["name"]
|
|
if name.startswith("ei"):
|
|
name = f"{self.component}{name[2:]}"
|
|
version = attrs["version"]
|
|
intf = Interface.create(name=name, version=version, mode=self.component)
|
|
self.current_interface = intf
|
|
self.interfaces.append(intf)
|
|
elif element == "request":
|
|
assert self.current_interface is not None
|
|
assert self.current_message is None
|
|
name = attrs["name"]
|
|
since = attrs.get("since", 1)
|
|
opcode = len(self.current_interface.requests)
|
|
request = Request.create(
|
|
name=name, since=since, opcode=opcode, interface=self.current_interface
|
|
)
|
|
self.current_interface.add_request(request)
|
|
self.current_message = request
|
|
elif element == "event":
|
|
assert self.current_interface is not None
|
|
assert self.current_message is None
|
|
|
|
name = attrs["name"]
|
|
since = attrs.get("since", 1)
|
|
opcode = len(self.current_interface.events)
|
|
event = Event.create(
|
|
name=name, since=since, opcode=opcode, interface=self.current_interface
|
|
)
|
|
self.current_interface.add_event(event)
|
|
self.current_message = event
|
|
elif element == "enum":
|
|
assert self.current_interface is not None
|
|
assert self.current_message is None
|
|
name = attrs["name"]
|
|
since = attrs.get("since", 1)
|
|
enum = Enum.create(name=name, since=since, interface=self.current_interface)
|
|
self.current_interface.add_enum(enum)
|
|
self.current_message = enum
|
|
elif element == "arg":
|
|
assert self.current_interface is not None
|
|
assert isinstance(self.current_message, Message)
|
|
name = attrs["name"]
|
|
sig = attrs["type"]
|
|
summary = attrs.get("summary", "")
|
|
arg = Argument.create(name=name, signature=sig, summary=summary)
|
|
self.current_message.add_argument(arg)
|
|
elif element == "entry":
|
|
assert self.current_interface is not None
|
|
assert isinstance(self.current_message, Enum)
|
|
name = attrs["name"]
|
|
value = attrs["value"]
|
|
summary = attrs.get("summary", "")
|
|
entry = Entry.create(name=name, value=value, summary=summary)
|
|
self.current_message.add_entry(entry)
|
|
|
|
def endElement(self, name):
|
|
if name == "interface":
|
|
assert self.current_interface is not None
|
|
self.current_interface = None
|
|
elif name == "request":
|
|
assert isinstance(self.current_message, Request)
|
|
self.current_message = None
|
|
elif name == "event":
|
|
assert isinstance(self.current_message, Event)
|
|
self.current_message = None
|
|
elif name == "enum":
|
|
assert isinstance(self.current_message, Enum)
|
|
self.current_message = None
|
|
|
|
def characters(self, content):
|
|
pass
|
|
|
|
@classmethod
|
|
def create(cls, component: str) -> "Protocol":
|
|
h = cls(component=component)
|
|
return h
|
|
|
|
|
|
def parse(protofile: Path, component: str) -> Protocol:
|
|
proto = Protocol.create(component=component)
|
|
xml.sax.parse(os.fspath(protofile), proto)
|
|
return proto
|
|
|
|
|
|
def generate_source(
|
|
proto: Protocol, headerfile: Optional[str], template: Path, component: str
|
|
) -> jinja2.environment.TemplateStream:
|
|
assert component in ["ei", "eis"]
|
|
target = {
|
|
"ei": Target.create("ei", context="context"),
|
|
"eis": Target.create("eis_client", context="client"),
|
|
}[component]
|
|
|
|
data = {}
|
|
data["target"] = target
|
|
data["interfaces"] = proto.interfaces
|
|
if headerfile:
|
|
data["headerfile"] = headerfile
|
|
|
|
env = jinja2.Environment(
|
|
loader=jinja2.FileSystemLoader(os.fspath(template.parent)),
|
|
trim_blocks=True,
|
|
lstrip_blocks=True,
|
|
)
|
|
jtemplate = env.get_template(template.name)
|
|
return jtemplate.stream(data)
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--component", type=str, choices=["ei", "eis"], default="ei")
|
|
parser.add_argument(
|
|
"--output", type=str, default="-", help="Output file to write to"
|
|
)
|
|
parser.add_argument("protocol", type=Path, help="The protocol XML file")
|
|
parser.add_argument("template", type=Path, help="The template file")
|
|
|
|
ns = parser.parse_args()
|
|
assert ns.template.exists()
|
|
assert ns.protocol.exists()
|
|
|
|
try:
|
|
proto = parse(
|
|
protofile=ns.protocol,
|
|
component=ns.component,
|
|
)
|
|
except xml.sax._exceptions.SAXParseException as e:
|
|
print(f"Parser error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
headerfile = f"{Path(ns.output).stem}.h" if ns.output != "-" else None
|
|
|
|
stream = generate_source(
|
|
proto=proto, headerfile=headerfile, template=ns.template, component=ns.component
|
|
)
|
|
|
|
file = sys.stdout if ns.output == "-" else open(ns.output, "w")
|
|
stream.dump(file)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|