libei/proto/ei-scanner
Peter Hutterer 579f0d07d2 Add a brei-proto.h generated header file
This will be needed to have access to the various disconnect reasons in
brei-shared.c.
2023-03-03 11:21:26 +10:00

680 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
#
# SPDX-License-Identifier: MIT
from typing import Optional, Union, Tuple
from pathlib import Path
import argparse
import attr
import jinja2
import jinja2.environment
import os
import sys
import xml.sax
import xml.sax.handler
import xml.sax._exceptions
def snake2camel(s: str) -> str:
    """
    Convert a snake_case identifier to CamelCase, e.g. foo_bar to FooBar.
    """
    # Underscores become word separators so that title() capitalizes each
    # word; splitting on the separator and re-joining then drops the spaces.
    titled = s.replace("_", " ").title()
    return "".join(titled.split(" "))
def proto_to_type(proto: str) -> Optional[str]:
    """
    Map a protocol XML type name to the single-character signature used in
    the code. Returns None for an unknown type name.
    """
    signatures = {
        "int": "i",
        "uint": "u",
        "float": "f",
        "string": "s",
        "fd": "h",
        "object": "o",
        "new_id": "n",
    }
    if proto in signatures:
        return signatures[proto]
    return None
@attr.s
class Argument:
    """
    One argument of a request or an event.

    The signature is given as protocol type name and converted to the
    single-character signature by proto_to_type().
    """

    name: str = attr.ib()
    signature: str = attr.ib(converter=proto_to_type)
    summary: str = attr.ib()
    enum: Optional["Enum"] = attr.ib()
    interface: Optional["Interface"] = attr.ib()

    @signature.validator  # type: ignore
    def _validate_signature(self, attribute, value):
        # The converter yields None for unknown protocol types
        is_known = value is not None and value in "iufhnos"
        assert is_known, f"Failed to parse signature {value}"

    @interface.validator  # type: ignore
    def _validate_interface(self, attribute, value):
        if value is None:
            return
        # Only new_id ("n") and object ("o") arguments can refer to an interface
        if self.signature not in ("n", "o"):
            raise ValueError("Interface may only be set for object types")

    @property
    def ctype(self) -> str:
        """
        The C type matching this argument's signature.
        """
        c_types = {
            "i": "int32_t",
            "u": "uint32_t",
            "f": "float",
            "s": "const char *",
            "h": "int",
            "n": "new_id_t",
            "o": "object_id_t",
        }
        return c_types[self.signature]

    @property
    def as_arg(self) -> str:
        """
        The argument as "<ctype> <name>", as used in a C parameter list.
        """
        ctype = self.ctype
        return f"{ctype} {self.name}"

    @classmethod
    def create(
        cls,
        name: str,
        signature: str,
        summary: str = "",
        enum: Optional["Enum"] = None,
        interface: Optional["Interface"] = None,
    ) -> "Argument":
        """
        Construct an Argument; summary, enum and interface are optional.
        """
        fields = dict(
            name=name,
            signature=signature,
            summary=summary,
            enum=enum,
            interface=interface,
        )
        return cls(**fields)
@attr.s
class Message:
    """
    Base class for a wire message, i.e. a Request or an Event.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    opcode: int = attr.ib()
    interface: "Interface" = attr.ib()
    is_destructor: bool = attr.ib(default=False)

    # Filled in via add_argument(), never passed to the constructor
    arguments: list[Argument] = attr.ib(init=False, factory=list)

    def add_argument(self, arg: Argument) -> None:
        """
        Append arg to this message. Raises ValueError if the message already
        has an argument of the same name.
        """
        if any(known.name == arg.name for known in self.arguments):
            raise ValueError(f"Duplicate argument name '{arg.name}'")
        self.arguments.append(arg)

    @property
    def num_arguments(self) -> int:
        """
        The number of arguments added so far.
        """
        return len(self.arguments)

    @property
    def signature(self) -> str:
        """
        The signatures of all arguments, concatenated in argument order.
        """
        return "".join(argument.signature for argument in self.arguments)

    @property
    def camel_name(self) -> str:
        """
        The message name converted to CamelCase.
        """
        return snake2camel(self.name)
@attr.s
class Request(Message):
    """
    A request message of an interface.
    """

    @classmethod
    def create(
        cls,
        name: str,
        opcode: int,
        interface: "Interface",
        since: int = 1,
        is_destructor: bool = False,
    ) -> "Request":
        """
        Construct a Request; since defaults to 1 and is_destructor to False.
        """
        fields = dict(
            name=name,
            opcode=opcode,
            since=since,
            interface=interface,
            is_destructor=is_destructor,
        )
        return cls(**fields)

    @property
    def fqdn(self):
        """
        The request name qualified as "<interface>_request_<name>".
        """
        interface_name = self.interface.name
        return f"{interface_name}_request_{self.name}"
@attr.s
class Event(Message):
    """
    An event message of an interface.
    """

    @classmethod
    def create(
        cls,
        name: str,
        opcode: int,
        interface: "Interface",
        since: int = 1,
        is_destructor: bool = False,
    ) -> "Event":
        """
        Construct an Event; since defaults to 1 and is_destructor to False.
        """
        fields = dict(
            name=name,
            opcode=opcode,
            since=since,
            interface=interface,
            is_destructor=is_destructor,
        )
        return cls(**fields)

    @property
    def fqdn(self):
        """
        The event name qualified as "<interface>_event_<name>".
        """
        interface_name = self.interface.name
        return f"{interface_name}_event_{self.name}"
@attr.s
class Entry:
    """
    A single named value of an enum.
    """

    name: str = attr.ib()
    value: int = attr.ib()
    summary: str = attr.ib()
    since: int = attr.ib()

    @classmethod
    def create(
        cls,
        name: str,
        value: int,
        summary: str = "",
        since: int = 1,
    ) -> "Entry":
        """
        Construct an Entry; summary defaults to "" and since to 1.
        """
        fields = dict(name=name, value=value, summary=summary, since=since)
        return cls(**fields)
@attr.s
class Enum:
    """
    An enum declared inside an interface, optionally flagged as bitmask.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    interface: "Interface" = attr.ib()
    is_bitmask: bool = attr.ib(default=False)

    # Filled in via add_entry(), never passed to the constructor
    entries: list[Entry] = attr.ib(init=False, factory=list)

    @classmethod
    def create(
        cls, name: str, interface: "Interface", since: int = 1, is_bitmask: bool = False
    ) -> "Enum":
        """
        Construct an Enum; since defaults to 1 and is_bitmask to False.
        """
        return cls(name=name, since=since, interface=interface, is_bitmask=is_bitmask)

    def add_entry(self, entry: Entry) -> None:
        """
        Append entry to this enum.

        Raises ValueError if an existing entry has the same name or value or,
        for bitmask enums, if the entry's value is negative or has more than
        one bit set.
        """
        for existing in self.entries:
            if existing.name == entry.name:
                raise ValueError(f"Duplicate enum name '{entry.name}'")
            if existing.value == entry.value:
                raise ValueError(f"Duplicate enum value '{entry.value}'")

        if self.is_bitmask:
            # Validate the entry being added, not the ones already stored,
            # otherwise the last entry of a bitmask is never checked.
            if entry.value < 0:
                raise ValueError("Bitmasks must not be less than zero")
            # bin().count() rather than int.bit_count() which needs Python 3.10.
            # A value of zero (no bit set) passes this check.
            if bin(entry.value).count("1") > 1:
                raise ValueError("Bitmasks must have exactly one bit set")

        self.entries.append(entry)

    @property
    def fqdn(self):
        """
        The enum name qualified as "<interface>_<name>".
        """
        return f"{self.interface.name}_{self.name}"

    @property
    def camel_name(self) -> str:
        """
        The enum name converted to CamelCase.
        """
        return snake2camel(self.name)
@attr.s
class Interface:
    """
    A protocol interface with its requests, events and enums.
    """

    name: str = attr.ib()
    version: int = attr.ib()

    # Filled in via the add_*() methods, never passed to the constructor
    requests: list[Request] = attr.ib(init=False, factory=list)
    events: list[Event] = attr.ib(init=False, factory=list)
    enums: list[Enum] = attr.ib(init=False, factory=list)

    mode: str = attr.ib()  # "ei" or "eis"

    def add_request(self, request: Request) -> None:
        """
        Append request, raising ValueError if the name is already taken.
        """
        if any(known.name == request.name for known in self.requests):
            raise ValueError(f"Duplicate request name '{request.name}'")
        self.requests.append(request)

    def add_event(self, event: Event) -> None:
        """
        Append event, raising ValueError if the name is already taken.
        """
        if any(known.name == event.name for known in self.events):
            raise ValueError(f"Duplicate event name '{event.name}'")
        self.events.append(event)

    def add_enum(self, enum: Enum) -> None:
        """
        Append enum, raising ValueError if the name is already taken.
        """
        if any(known.name == enum.name for known in self.enums):
            raise ValueError(f"Duplicate enum name '{enum.name}'")
        self.enums.append(enum)

    @property
    def outgoing(self) -> list[Message]:
        """
        The messages sent by this implementation: the requests in "ei" mode,
        the events in any other mode.

        The same class serves both ei and eis, mapping requests/events to
        outgoing/incoming here keeps the templates simple.
        """
        return self.requests if self.mode == "ei" else self.events  # type: ignore

    @property
    def incoming(self) -> list[Message]:
        """
        The messages received by this implementation: the events in "ei"
        mode, the requests in any other mode.

        The same class serves both ei and eis, mapping requests/events to
        outgoing/incoming here keeps the templates simple.
        """
        return self.events if self.mode == "ei" else self.requests  # type: ignore

    @property
    def ctype(self) -> str:
        """
        The C pointer type for this interface, "struct <name> *".
        """
        return f"struct {self.name} *"

    @property
    def as_arg(self) -> str:
        """
        The interface as "<ctype> <name>", as used in a C parameter list.
        """
        ctype = self.ctype
        return f"{ctype} {self.name}"

    @property
    def camel_name(self) -> str:
        """
        The interface name converted to CamelCase.
        """
        return snake2camel(self.name)

    @classmethod
    def create(cls, name: str, version: int, mode: str = "ei") -> "Interface":
        """
        Construct an Interface for the given mode ("ei", "eis" or "brei").
        """
        assert mode in ["ei", "eis", "brei"]
        fields = dict(name=name, version=version, mode=mode)
        return cls(**fields)
# auto_exc=True makes attrs treat this as an exception class: instances
# compare and hash by identity and BaseException.__init__ receives the
# arguments, so e.args is populated.
@attr.s(auto_exc=True)
class XmlError(Exception):
    """
    An error in the protocol XML at the given line and column.
    """

    line: int = attr.ib()
    column: int = attr.ib()
    message: str = attr.ib()

    def __str__(self) -> str:
        return f"line {self.line}:{self.column}: {self.message}"
@attr.s
class Protocol(xml.sax.handler.ContentHandler):
    """
    SAX content handler that builds the list of interfaces from the protocol
    XML.

    The same handler is meant to parse the document more than once: the
    first document only creates the interfaces, subsequent documents re-use
    those and fill in requests, events, enums, arguments and entries. This
    allows arguments to reference any interface in the file.

    All errors in the XML are raised as XmlError.
    """

    component: str = attr.ib()
    interfaces: list[Interface] = attr.ib(factory=list)

    # Parser state: the <interface> and the <request>/<event>/<enum> the
    # parser is currently inside of, if any
    current_interface: Optional[Interface] = attr.ib(init=False, default=None)
    current_message: Optional[Union[Message, Enum]] = attr.ib(init=False, default=None)

    # Number of documents started so far, see startDocument()
    run: int = attr.ib(init=False, default=0)

    def xmlerror(self, msg) -> None:
        """
        Raise an XmlError with the given message at the current location.
        """
        line = self._locator.getLineNumber()  # type: ignore
        col = self._locator.getColumnNumber()  # type: ignore
        # XmlError's fields are (line, column, message), in that order
        raise XmlError(line, col, msg)

    @property
    def location(self) -> Tuple[int, int]:
        """
        The (line, column) the parser is currently at.
        """
        line = self._locator.getLineNumber()  # type: ignore
        col = self._locator.getColumnNumber()  # type: ignore
        return line, col

    def interface_by_name(self, name) -> Interface:
        """
        Return the interface with the given name, raise XmlError if there is
        none.
        """
        for iface in self.interfaces:
            if iface.name == name:
                return iface
        raise XmlError(*self.location, f"Unable to find interface {name}")

    def _error(self, message: str) -> XmlError:
        """Return (not raise) an XmlError at the current location."""
        return XmlError(*self.location, message)

    def _attribute(self, attrs, key: str, element: str) -> str:
        """Return the mandatory attribute key or raise XmlError."""
        try:
            return attrs[key]
        except KeyError as e:
            raise self._error(f"Missing attribute {e} in element '{element}'") from e

    def _integer(self, value, key: str, element: str) -> int:
        """Convert an attribute value to int or raise XmlError."""
        try:
            return int(value)
        except ValueError as e:
            raise self._error(
                f"Invalid integer '{value}' for attribute '{key}' in element '{element}'"
            ) from e

    def _require_interface(self, element: str) -> Interface:
        """Return the current interface or raise XmlError outside of one."""
        if self.current_interface is None:
            raise self._error(f"Invalid element '{element}' outside an <interface>")
        return self.current_interface

    def _require_no_message(self, element: str) -> None:
        """Raise XmlError if element is nested in a request, event or enum."""
        if self.current_message is not None:
            raise self._error(
                f"Invalid element '{element}' inside '{self.current_message.name}'"
            )

    def _component_name(self, name: str) -> str:
        """Replace a leading "ei" in an interface name with the component."""
        if name.startswith("ei"):
            return f"{self.component}{name[2:]}"
        return name

    def _start_interface(self, element: str, attrs) -> None:
        """Handle <interface>: create it on the first run, look it up later."""
        if self.current_interface is not None:
            raise self._error(
                f"Invalid element '{element}' inside interface '{self.current_interface.name}'"
            )

        name = self._attribute(attrs, "name", element)
        version = self._integer(
            self._attribute(attrs, "version", element), "version", element
        )
        name = self._component_name(name)

        # We only create the interface on the first run, in subsequent runs we
        # re-use them so we can cross reference correctly
        if self.run > 1:
            intf = self.interface_by_name(name)
        else:
            if any(iface.name == name for iface in self.interfaces):
                raise self._error(f"Duplicate interface name '{name}'")
            intf = Interface.create(name=name, version=version, mode=self.component)
            self.interfaces.append(intf)
        self.current_interface = intf

    def _start_message(self, element: str, attrs) -> None:
        """Handle <request> and <event>: add the message to the interface."""
        interface = self._require_interface(element)
        self._require_no_message(element)

        name = self._attribute(attrs, "name", element)
        since = self._integer(self._attribute(attrs, "since", element), "since", element)
        is_destructor = attrs.get("type", "") == "destructor"

        # The opcode is the position of the message within its interface
        message: Message
        if element == "request":
            message = Request.create(
                name=name,
                since=since,
                opcode=len(interface.requests),
                interface=interface,
                is_destructor=is_destructor,
            )
            add = interface.add_request
        else:
            message = Event.create(
                name=name,
                since=since,
                opcode=len(interface.events),
                interface=interface,
                is_destructor=is_destructor,
            )
            add = interface.add_event  # type: ignore

        try:
            add(message)  # type: ignore
        except ValueError as e:
            raise self._error(str(e)) from e
        self.current_message = message

    def _start_enum(self, element: str, attrs) -> None:
        """Handle <enum>: add the enum to the interface."""
        interface = self._require_interface(element)
        self._require_no_message(element)

        name = self._attribute(attrs, "name", element)
        since = self._integer(self._attribute(attrs, "since", element), "since", element)

        enum_type = attrs.get("type", None)
        if enum_type is not None and enum_type not in ["bitmask"]:
            raise self._error(f"Invalid enum type {enum_type} in element '{element}'")

        enum = Enum.create(
            name=name,
            since=since,
            interface=interface,
            is_bitmask=enum_type == "bitmask",
        )
        try:
            interface.add_enum(enum)
        except ValueError as e:
            raise self._error(str(e)) from e
        self.current_message = enum

    def _start_arg(self, element: str, attrs) -> None:
        """Handle <arg>: add the argument to the current request or event."""
        current_interface = self._require_interface(element)
        message = self.current_message
        if not isinstance(message, Message):
            raise self._error(
                f"Invalid element '{element}' must be inside <request> or <event>"
            )

        name = self._attribute(attrs, "name", element)
        sig = self._attribute(attrs, "type", element)
        if proto_to_type(sig) is None:
            raise self._error(
                f"Invalid type '{sig}' for '{current_interface.name}.{message.name}::{name}'"
            )

        summary = attrs.get("summary", "")

        interface_name = attrs.get("interface", None)
        if interface_name is not None:
            interface = self.interface_by_name(self._component_name(interface_name))
        else:
            interface = None

        # The enum is passed on by name, it must exist in the current interface
        enum = attrs.get("enum", None)
        if enum is not None and enum not in [e.name for e in current_interface.enums]:
            raise self._error(f"Failed to find enum '{current_interface.name}.{enum}'")

        try:
            arg = Argument.create(
                name=name,
                signature=sig,
                summary=summary,
                enum=enum,
                interface=interface,
            )
            message.add_argument(arg)
        except ValueError as e:
            raise self._error(str(e)) from e

    def _start_entry(self, element: str, attrs) -> None:
        """Handle <entry>: add the entry to the current enum."""
        self._require_interface(element)
        enum = self.current_message
        if not isinstance(enum, Enum):
            raise self._error(f"Invalid element '{element}' must be inside <enum>")

        name = self._attribute(attrs, "name", element)
        value = self._integer(self._attribute(attrs, "value", element), "value", element)
        summary = attrs.get("summary", "")
        since = self._integer(attrs.get("since", 1), "since", element)

        entry = Entry.create(name=name, value=value, summary=summary, since=since)
        try:
            enum.add_entry(entry)
        except ValueError as e:
            raise self._error(str(e)) from e

    def startDocument(self):
        self.run += 1

    def startElement(self, element: str, attrs: dict):
        """
        Dispatch on the element name. Elements other than the ones handled
        here are ignored.
        """
        if element == "interface":
            self._start_interface(element, attrs)

        # first run only parses interfaces
        if self.run <= 1:
            return

        if element in ("request", "event"):
            self._start_message(element, attrs)
        elif element == "enum":
            self._start_enum(element, attrs)
        elif element == "arg":
            self._start_arg(element, attrs)
        elif element == "entry":
            self._start_entry(element, attrs)

    def endElement(self, name):
        """
        Reset the parser state when an interface, request, event or enum is
        closed.
        """
        if name == "interface":
            assert self.current_interface is not None
            self.current_interface = None

        # first run only parses interfaces
        if self.run <= 1:
            return

        if name == "request":
            assert isinstance(self.current_message, Request)
            self.current_message = None
        elif name == "event":
            assert isinstance(self.current_message, Event)
            self.current_message = None
        elif name == "enum":
            assert isinstance(self.current_message, Enum)
            self.current_message = None

    def characters(self, content):
        pass

    @classmethod
    def create(cls, component: str) -> "Protocol":
        """
        Construct a Protocol handler for the given component.
        """
        h = cls(component=component)
        return h
def parse(protofile: Path, component: str) -> Protocol:
    """
    Parse the protocol XML file for the given component and return the
    Protocol handler holding the result.
    """
    handler = Protocol.create(component=component)
    # We parse two times, once to fetch all the interfaces, then to parse the details
    for _ in range(2):
        xml.sax.parse(os.fspath(protofile), handler)
    return handler
def generate_source(
    proto: Protocol, headerfile: Optional[str], template: Path, component: str
) -> jinja2.environment.TemplateStream:
    """
    Render the template with the protocol's interfaces and return the
    resulting stream.

    The template sees "component", "interfaces" and, if headerfile is set,
    "headerfile". Templates are loaded from the directory of the template
    file.
    """
    assert component in ["ei", "eis", "brei"]

    data = {
        "component": component,
        "interfaces": proto.interfaces,
    }
    if headerfile:
        data["headerfile"] = headerfile

    template_dir = os.fspath(template.parent)
    env = jinja2.Environment(
        loader=jinja2.FileSystemLoader(template_dir),
        trim_blocks=True,
        lstrip_blocks=True,
    )

    def struct_pointer(name):
        # jinja filter "ctype": converts foo into "struct foo *"
        return f"struct {name} *"

    def struct_argument(name):
        # jinja filter "as_arg": converts foo into "struct foo * foo"
        return f"struct {name} * {name}"

    env.filters.update(
        {
            "ctype": struct_pointer,
            "as_arg": struct_argument,
            "camel": snake2camel,
        }
    )

    return env.get_template(template.name).stream(data)
def main() -> None:
    """
    Command line entry point: parse the protocol XML, render the template
    and write the result to the output file or stdout.

    Exits with status 1 on errors in the protocol XML. A protocol or
    template file that does not exist is reported as a usage error by
    argparse.
    """
    import textwrap

    parser = argparse.ArgumentParser(
        description=textwrap.dedent(
            """
            ei-scanner is a tool to parse the EI protocol description XML and
            pass the data to a Jinja2 template. That template can then be
            used to generate protocol bindings for the desired language.
            typical usages:
            ei-scanner --component=ei protocol.xml my-template.tpl
            ei-scanner --component=eis --output=bindings.rs protocol.xml bindings.rs.tpl
            Elements in the XML file are provided as variables with attributes
            generally matching the XML file. For example, each interface has requests,
            events and enums, and each of those has a name.
            ei-scanner additionally provides the following values to the Jinja2 templates:
            - interface.incoming and interface.outgoing: maps to the requests/events of
            the interface, depending on the component.
            - argument.signature: a single-character signature type mapping
            from the protocol XML type:
            uint -> "u"
            int -> "i"
            float -> "f"
            fd -> "h"
            new_id -> "n"
            object -> "o"
            string -> "s"
            ei-scanner adds the following Jinja2 filters for convenience:
            {{foo|ctype}} ... resolves to "struct foo *"
            {{foo|as_arg}} ... resolves to "struct foo *foo"
            {{foo_bar|camel}} ... resolves to "FooBar"
            """
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--component", type=str, choices=["ei", "eis", "brei"], default="ei"
    )
    parser.add_argument(
        "--output", type=str, default="-", help="Output file to write to"
    )
    parser.add_argument("protocol", type=Path, help="The protocol XML file")
    parser.add_argument(
        "template", type=Path, help="The Jinja2 compatible template file"
    )

    ns = parser.parse_args()
    # Explicit checks rather than assert, asserts are dropped with python -O
    if not ns.template.exists():
        parser.error(f"template file '{ns.template}' does not exist")
    if not ns.protocol.exists():
        parser.error(f"protocol file '{ns.protocol}' does not exist")

    try:
        proto = parse(
            protofile=ns.protocol,
            component=ns.component,
        )
    except xml.sax.SAXParseException as e:
        print(f"Parser error: {e}", file=sys.stderr)
        sys.exit(1)
    except XmlError as e:
        print(f"Protocol XML error: {e}", file=sys.stderr)
        sys.exit(1)

    # The header file name is derived from the output file name, there is
    # none when writing to stdout
    headerfile = f"{Path(ns.output).stem}.h" if ns.output != "-" else None
    stream = generate_source(
        proto=proto, headerfile=headerfile, template=ns.template, component=ns.component
    )

    if ns.output == "-":
        stream.dump(sys.stdout)
    else:
        # Make sure the output file is flushed and closed
        with open(ns.output, "w") as fd:
            stream.dump(fd)


if __name__ == "__main__":
    main()