libei/proto/scanner.py
Peter Hutterer d0e6c251b6 protocol: rename the ei core interface to ei_connection
In the protocol this is a simple rename but in the implementation we can
now separate the protocol object out from the ei/ei-client context
itself by having the ei_connection objects.
2023-03-03 11:20:42 +10:00

413 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
#
# SPDX-License-Identifier: MIT
from typing import Optional, Union
from pathlib import Path
import argparse
import attr
import jinja2
import jinja2.environment
import os
import sys
import xml.sax
import xml.sax.handler
import xml.sax._exceptions
def proto_to_type(proto: str) -> Optional[str]:
    """
    Translate a protocol type name into the one-letter signature code
    used in the generated code.

    Returns None if ``proto`` is not one of the known type names.
    """
    signatures = {
        "uint": "u",
        "int": "i",
        "float": "f",
        "fd": "h",
        "new_id": "n",
        "object": "o",
        "string": "s",
    }
    try:
        return signatures[proto]
    except KeyError:
        # Unknown type names are reported to the caller as None
        return None
@attr.s
class Target:
    """
    The struct that the base "ei" interface is attached to.

    libei has a `struct ei`, while the equivalent level in libeis is
    `struct eis_client`. This type represents either of the two.
    """

    # Struct name without the "struct" keyword, e.g. "ei" or "eis_client"
    name: str = attr.ib()
    # Caller-supplied identifier; stored only, not interpreted by this class
    context: str = attr.ib()

    @classmethod
    def create(cls, name: str, context: str) -> "Target":
        """Build a Target from its struct name and context identifier."""
        return cls(name=name, context=context)

    @property
    def ctype(self) -> str:
        """The C pointer type of the target struct."""
        return "struct {} *".format(self.name)

    @property
    def as_param(self) -> str:
        """A C parameter declaration, using the struct name as variable name."""
        struct_name = self.name
        return "struct {0}* {0}".format(struct_name)

    @property
    def as_arg(self) -> str:
        """Same string as as_param."""
        return self.as_param
@attr.s
class Argument:
    """
    Argument to a request or an event.

    The ``signature`` given to the constructor is a protocol type name
    (e.g. "uint"). It is run through proto_to_type() and stored as the
    matching one-letter signature code.
    """

    name: str = attr.ib()
    signature: str = attr.ib(converter=proto_to_type)
    summary: str = attr.ib()

    @property
    def as_arg(self) -> str:
        """C parameter declaration: the C type followed by the argument name."""
        return f"{self.ctype} {self.name}"

    @property
    def ctype(self) -> str:
        """The C type spelling for this argument's signature code."""
        return {
            "u": "uint32_t",
            "i": "int32_t",
            "s": "const char *",
            "h": "int",
            "f": "float",
            "o": "object_id_t",
            "n": "new_id_t",
        }[self.signature]

    @property
    def argtype(self) -> str:
        """
        A short type tag for this argument's signature code. Both "o" and
        "n" map to "obj".
        """
        return {
            "u": "u32",
            "i": "i32",
            "s": "str",
            "h": "fd",
            "f": "f32",
            "o": "obj",
            "n": "obj",
        }[self.signature]

    @signature.validator  # type: ignore
    def _validate_signature(self, attribute, value):
        """
        Reject anything that is not exactly one known signature letter.

        The converter has already run at this point, so an unknown protocol
        type arrives here as None. That case must be checked explicitly
        because `None in "..."` raises an unrelated TypeError. A real
        exception is raised instead of an assert so the check also holds
        under `python -O`.

        Raises ValueError on an invalid signature.
        """
        types = "iufhnos"
        if not isinstance(value, str) or len(value) != 1 or value not in types:
            raise ValueError(f"Failed to parse signature {value}")

    @classmethod
    def create(cls, name: str, signature: str, summary: str = "") -> "Argument":
        """
        Build an Argument from a protocol type name such as "uint".

        Raises ValueError if ``signature`` is not a known protocol type.
        """
        # FIXME: enum value checks
        # Check before constructing so the error can name the original type;
        # after conversion only None would be left to report.
        if proto_to_type(signature) is None:
            raise ValueError(
                f"Argument {name!r} has unknown protocol type {signature!r}"
            )
        return cls(name, signature, summary)
@attr.s
class Message:
    """
    Common base for the two kinds of wire message, Request and Event.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    opcode: int = attr.ib()
    interface: "Interface" = attr.ib()
    # Not an __init__ parameter; starts empty and is filled by add_argument()
    arguments: list[Argument] = attr.ib(init=False, default=attr.Factory(list))

    def add_argument(self, arg: Argument) -> None:
        """Append ``arg`` to this message's argument list."""
        self.arguments.append(arg)

    @property
    def num_arguments(self) -> int:
        """How many arguments have been added so far."""
        count = len(self.arguments)
        return count

    @property
    def signature(self) -> str:
        """The arguments' signature codes concatenated in argument order."""
        codes = (argument.signature for argument in self.arguments)
        return "".join(codes)
@attr.s
class Request(Message):
    """
    A Message of the request kind.
    """

    @classmethod
    def create(
        cls, name: str, opcode: int, interface: "Interface", since: int = 1
    ) -> "Request":
        """Build a Request; ``since`` defaults to 1."""
        fields = dict(name=name, opcode=opcode, since=since, interface=interface)
        return cls(**fields)

    @property
    def fqdn(self):
        """The name in the form `<interface name>_request_<request name>`."""
        return "{}_request_{}".format(self.interface.name, self.name)
@attr.s
class Event(Message):
    """
    A Message of the event kind.
    """

    @classmethod
    def create(
        cls, name: str, opcode: int, interface: "Interface", since: int = 1
    ) -> "Event":
        """Build an Event; ``since`` defaults to 1."""
        fields = dict(name=name, opcode=opcode, since=since, interface=interface)
        return cls(**fields)

    @property
    def fqdn(self):
        """The name in the form `<interface name>_event_<event name>`."""
        return "{}_event_{}".format(self.interface.name, self.name)
@attr.s
class Entry:
    """
    One named value inside an Enum.
    """

    name: str = attr.ib()
    value: int = attr.ib()
    summary: str = attr.ib()

    @classmethod
    def create(cls, name: str, value: int, summary: str = "") -> "Entry":
        """Build an Entry; ``summary`` defaults to the empty string."""
        entry = cls(name=name, value=value, summary=summary)
        return entry
@attr.s
class Enum:
    """
    An enum declared inside an interface, holding a list of Entry objects.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    interface: "Interface" = attr.ib()
    # Not an __init__ parameter; starts empty and is filled by add_entry()
    entries: list[Entry] = attr.ib(init=False, default=attr.Factory(list))

    @classmethod
    def create(cls, name: str, interface: "Interface", since: int = 1) -> "Enum":
        """Build an Enum with no entries; ``since`` defaults to 1."""
        return cls(name=name, since=since, interface=interface)

    def add_entry(self, entry: Entry) -> None:
        """Append ``entry`` to this enum's entry list."""
        self.entries.append(entry)

    @property
    def fqdn(self):
        """The name in the form `<interface name>_<enum name>`."""
        return "{}_{}".format(self.interface.name, self.name)
@attr.s
class Interface:
    """
    A protocol interface together with its requests, events and enums.
    """

    name: str = attr.ib()
    version: int = attr.ib()
    # The three lists are not __init__ parameters; they start out empty and
    # are filled through the add_*() methods.
    requests: list[Request] = attr.ib(init=False, default=attr.Factory(list))
    events: list[Event] = attr.ib(init=False, default=attr.Factory(list))
    enums: list[Enum] = attr.ib(init=False, default=attr.Factory(list))
    mode: str = attr.ib()  # "ei" or "eis"

    @classmethod
    def create(cls, name: str, version: int, mode: str = "ei") -> "Interface":
        """Build an empty Interface; ``mode`` must be "ei" or "eis"."""
        assert mode in ("ei", "eis")
        return cls(name=name, version=version, mode=mode)

    def add_request(self, request: Request) -> None:
        """Append ``request`` to the request list."""
        self.requests.append(request)

    def add_event(self, event: Event) -> None:
        """Append ``event`` to the event list."""
        self.events.append(event)

    def add_enum(self, enum: Enum) -> None:
        """Append ``enum`` to the enum list."""
        self.enums.append(enum)

    @property
    def outgoing(self) -> list[Message]:
        """
        The messages this implementation sends.

        One class serves both ei and eis. To keep the templates simple,
        requests/events are mapped to outgoing/incoming relative to the
        implementation: requests in "ei" mode, events otherwise.
        """
        return self.requests if self.mode == "ei" else self.events  # type: ignore

    @property
    def incoming(self) -> list[Message]:
        """
        The messages this implementation receives.

        One class serves both ei and eis. To keep the templates simple,
        requests/events are mapped to outgoing/incoming relative to the
        implementation: events in "ei" mode, requests otherwise.
        """
        return self.events if self.mode == "ei" else self.requests  # type: ignore

    @property
    def ctype(self) -> str:
        """The C pointer type of the struct named after this interface."""
        return "struct {} *".format(self.name)

    @property
    def as_arg(self) -> str:
        """C parameter declaration: ctype followed by the interface name."""
        return "{} {}".format(self.ctype, self.name)
@attr.s
class Protocol(xml.sax.handler.ContentHandler):
    """
    SAX content handler that collects the interfaces of a protocol file.

    Handles the elements "interface", "request", "event", "enum", "arg"
    and "entry"; any other element is ignored. The "version", "since" and
    "value" attributes are stored exactly as the parser supplies them; no
    conversion is applied here.
    """

    component: str = attr.ib()
    interfaces: list[Interface] = attr.ib(default=attr.Factory(list))
    # Parser state: the interface and the request/event/enum currently open
    current_interface: Optional[Interface] = attr.ib(init=False, default=None)
    current_message: Optional[Union[Message, Enum]] = attr.ib(init=False, default=None)

    @classmethod
    def create(cls, component: str) -> "Protocol":
        """Build a handler with an empty interface list for ``component``."""
        return cls(component=component)

    def startElement(self, element: str, attrs: dict):
        """Dispatch an opening tag to its handler; unknown tags are ignored."""
        handlers = {
            "interface": self._start_interface,
            "request": self._start_request,
            "event": self._start_event,
            "enum": self._start_enum,
            "arg": self._start_arg,
            "entry": self._start_entry,
        }
        handler = handlers.get(element)
        if handler is not None:
            handler(attrs)

    def _start_interface(self, attrs) -> None:
        """Open a new interface; interfaces must not be nested."""
        assert self.current_interface is None
        ifname = attrs["name"]
        if ifname.startswith("ei"):
            # Replace the leading "ei" with the component name
            ifname = "{}{}".format(self.component, ifname[2:])
        interface = Interface.create(
            name=ifname, version=attrs["version"], mode=self.component
        )
        self.interfaces.append(interface)
        self.current_interface = interface

    def _start_request(self, attrs) -> None:
        """Open a request inside the current interface."""
        interface = self.current_interface
        assert interface is not None
        assert self.current_message is None
        request = Request.create(
            name=attrs["name"],
            since=attrs.get("since", 1),
            # The opcode is the number of requests already on the interface
            opcode=len(interface.requests),
            interface=interface,
        )
        interface.add_request(request)
        self.current_message = request

    def _start_event(self, attrs) -> None:
        """Open an event inside the current interface."""
        interface = self.current_interface
        assert interface is not None
        assert self.current_message is None
        event = Event.create(
            name=attrs["name"],
            since=attrs.get("since", 1),
            # The opcode is the number of events already on the interface
            opcode=len(interface.events),
            interface=interface,
        )
        interface.add_event(event)
        self.current_message = event

    def _start_enum(self, attrs) -> None:
        """Open an enum inside the current interface."""
        interface = self.current_interface
        assert interface is not None
        assert self.current_message is None
        enum = Enum.create(
            name=attrs["name"], since=attrs.get("since", 1), interface=interface
        )
        interface.add_enum(enum)
        self.current_message = enum

    def _start_arg(self, attrs) -> None:
        """Add an argument to the request or event that is currently open."""
        assert self.current_interface is not None
        message = self.current_message
        assert isinstance(message, Message)
        argument = Argument.create(
            name=attrs["name"],
            signature=attrs["type"],
            summary=attrs.get("summary", ""),
        )
        message.add_argument(argument)

    def _start_entry(self, attrs) -> None:
        """Add an entry to the enum that is currently open."""
        assert self.current_interface is not None
        enum = self.current_message
        assert isinstance(enum, Enum)
        entry = Entry.create(
            name=attrs["name"],
            value=attrs["value"],
            summary=attrs.get("summary", ""),
        )
        enum.add_entry(entry)

    def endElement(self, name):
        """Close the interface, request, event or enum named by ``name``."""
        if name == "interface":
            assert self.current_interface is not None
            self.current_interface = None
            return

        # A closing tag must match the kind of the currently open object
        expected = {"request": Request, "event": Event, "enum": Enum}.get(name)
        if expected is not None:
            assert isinstance(self.current_message, expected)
            self.current_message = None

    def characters(self, content):
        """Ignore character data."""
def parse(protofile: Path, component: str) -> Protocol:
    """
    Run the SAX parser over ``protofile`` with a fresh Protocol handler for
    ``component`` and return that handler.
    """
    handler = Protocol.create(component=component)
    source = os.fspath(protofile)
    xml.sax.parse(source, handler)
    return handler
def generate_source(
    proto: Protocol, headerfile: Optional[str], template: Path, component: str
) -> jinja2.environment.TemplateStream:
    """
    Render ``template`` with the parsed protocol and return the stream.

    The template receives "target" (the Target for ``component``),
    "interfaces" (from ``proto``) and, only when ``headerfile`` is truthy,
    "headerfile". The template is loaded from its own parent directory.
    """
    assert component in ["ei", "eis"]

    # (struct name, context) for each component
    target_specs = {
        "ei": ("ei", "context"),
        "eis": ("eis_client", "client"),
    }
    struct_name, context = target_specs[component]

    data = {
        "target": Target.create(struct_name, context=context),
        "interfaces": proto.interfaces,
    }
    if headerfile:
        data["headerfile"] = headerfile

    loader = jinja2.FileSystemLoader(os.fspath(template.parent))
    env = jinja2.Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
    return env.get_template(template.name).stream(data)
def main() -> None:
    """
    Command-line entry point.

    Parses the protocol XML file, renders the template with it and writes
    the result to --output, or to stdout when --output is "-" (the default).
    Exits with status 1 if an input file is missing or the XML fails to parse.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--component", type=str, choices=["ei", "eis"], default="ei")
    parser.add_argument(
        "--output", type=str, default="-", help="Output file to write to"
    )
    parser.add_argument("protocol", type=Path, help="The protocol XML file")
    parser.add_argument("template", type=Path, help="The template file")

    ns = parser.parse_args()

    # Explicit checks instead of assert: asserts are stripped under
    # `python -O`, and a plain message is more useful than a traceback.
    for path in (ns.template, ns.protocol):
        if not path.exists():
            print(f"File not found: {path}", file=sys.stderr)
            sys.exit(1)

    try:
        proto = parse(
            protofile=ns.protocol,
            component=ns.component,
        )
    except xml.sax.SAXParseException as e:
        print(f"Parser error: {e}", file=sys.stderr)
        sys.exit(1)

    # The header name is derived from the output file name; there is none
    # when writing to stdout.
    headerfile = f"{Path(ns.output).stem}.h" if ns.output != "-" else None
    stream = generate_source(
        proto=proto, headerfile=headerfile, template=ns.template, component=ns.component
    )

    if ns.output == "-":
        stream.dump(sys.stdout)
    else:
        # The context manager closes (and flushes) the file even if
        # rendering raises part-way through.
        with open(ns.output, "w") as outfile:
            stream.dump(outfile)


if __name__ == "__main__":
    main()