libei/proto/scanner.py
Peter Hutterer a0b2cfffa9 scanner: switch to a two-run parsing of the XML file
On the first run we extract the interfaces only, on the second run all
the rest. This allows us to pass the interface to the Argument where
appropriate.
2023-03-03 11:21:26 +10:00

593 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
#
# SPDX-License-Identifier: MIT
from typing import Optional, Union, Tuple
from pathlib import Path
import argparse
import attr
import jinja2
import jinja2.environment
import os
import sys
import xml.sax
import xml.sax.handler
import xml.sax._exceptions
def proto_to_type(proto: str) -> Optional[str]:
    """
    Translate a protocol XML type name into the one-letter signature
    used throughout the generated code.

    Returns None if the type name is not one of the known protocol types.
    """
    signatures = dict(
        uint="u",
        int="i",
        float="f",
        fd="h",
        new_id="n",
        object="o",
        string="s",
    )
    return signatures.get(proto)
@attr.s
class Target:
    """
    The target struct for the base "ei" interface.

    libei has a `struct ei`, the equivalent level in libeis is
    `struct eis_client`. This type lets the templates handle both
    the same way.
    """

    name: str = attr.ib()
    context: str = attr.ib()

    @classmethod
    def create(cls, name: str, context: str) -> "Target":
        """Alternate constructor taking the struct name and the context name."""
        return cls(name=name, context=context)

    @property
    def ctype(self) -> str:
        """The C pointer type of the target struct."""
        return "struct {} *".format(self.name)

    @property
    def as_param(self) -> str:
        """The target as a C parameter declaration named after the struct."""
        return "struct {0}* {0}".format(self.name)

    @property
    def as_arg(self) -> str:
        """Same text as `as_param`."""
        return self.as_param
@attr.s
class Argument:
    """
    Argument to a request or a reply

    The `signature` passed to the constructor is the protocol XML type name
    (e.g. "uint"); it is converted by proto_to_type() and stored as the
    single-character signature (e.g. "u").
    """

    name: str = attr.ib()
    signature: str = attr.ib(converter=proto_to_type)
    summary: str = attr.ib()
    enum: Optional["Enum"] = attr.ib()
    interface: Optional["Interface"] = attr.ib()

    @signature.validator  # type: ignore
    def _validate_signature(self, attribute, value):
        # The converter yields None for unknown protocol types. Check for
        # membership in the set of single characters rather than doing a
        # substring test on a string, which would accept "" and raise
        # TypeError for None. Raise explicitly instead of asserting so the
        # check is not stripped when running with -O.
        if value not in ("i", "u", "f", "h", "n", "o", "s"):
            raise ValueError(f"Failed to parse signature {value}")

    @interface.validator  # type: ignore
    def _validate_interface(self, attribute, value):
        # Only object-like arguments can refer to an interface
        if value is not None and self.signature not in ["n", "o"]:
            raise ValueError("Interface may only be set for object types")

    @property
    def as_arg(self) -> str:
        """The argument as a C declaration: its C type followed by its name."""
        return f"{self.ctype} {self.name}"

    @property
    def ctype(self) -> str:
        """The C type used for this argument's signature."""
        return {
            "u": "uint32_t",
            "i": "int32_t",
            "s": "const char *",
            "h": "int",
            "f": "float",
            "o": "object_id_t",
            "n": "new_id_t",
        }[self.signature]

    @property
    def argtype(self) -> str:
        """
        Short type token for this argument's signature. Both "o" and "n"
        map to "obj".
        """
        return {
            "u": "u32",
            "i": "i32",
            "s": "str",
            "h": "fd",
            "f": "f32",
            "o": "obj",
            "n": "obj",
        }[self.signature]

    @classmethod
    def create(
        cls,
        name: str,
        signature: str,
        summary: str = "",
        enum: Optional["Enum"] = None,
        interface: Optional["Interface"] = None,
    ) -> "Argument":
        """
        Create an Argument from the protocol XML type name in `signature`.

        Raises ValueError if the type name is unknown or if an interface is
        given for an argument that is not an object type.
        """
        # Check here as well as in the validator: the validator only sees
        # the converted value, here we can still name the offending type.
        if proto_to_type(signature) is None:
            raise ValueError(f"Unknown argument type '{signature}'")
        return cls(name, signature, summary, enum, interface=interface)
@attr.s
class Message:
    """
    Base class shared by the two kinds of wire message, Request and Event.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    opcode: int = attr.ib()
    interface: "Interface" = attr.ib()
    # not part of the constructor, filled in through add_argument()
    arguments: list[Argument] = attr.ib(init=False, factory=list)

    @property
    def signature(self) -> str:
        """The signatures of all arguments, concatenated in order."""
        return "".join(argument.signature for argument in self.arguments)

    @property
    def num_arguments(self) -> int:
        """How many arguments this message has."""
        count = len(self.arguments)
        return count

    def add_argument(self, arg: Argument) -> None:
        """Append `arg` to this message's argument list."""
        self.arguments.append(arg)
@attr.s
class Request(Message):
    """
    A wire message of the request kind.
    """

    @property
    def fqdn(self):
        """Interface name and request name, joined by `_request_`."""
        return "{}_request_{}".format(self.interface.name, self.name)

    @classmethod
    def create(
        cls, name: str, opcode: int, interface: "Interface", since: int = 1
    ) -> "Request":
        """Alternate constructor; `since` defaults to 1."""
        fields = dict(name=name, since=since, opcode=opcode, interface=interface)
        return cls(**fields)
@attr.s
class Event(Message):
    """
    A wire message of the event kind.
    """

    @property
    def fqdn(self):
        """Interface name and event name, joined by `_event_`."""
        return "{}_event_{}".format(self.interface.name, self.name)

    @classmethod
    def create(
        cls, name: str, opcode: int, interface: "Interface", since: int = 1
    ) -> "Event":
        """Alternate constructor; `since` defaults to 1."""
        fields = dict(name=name, since=since, opcode=opcode, interface=interface)
        return cls(**fields)
@attr.s
class Entry:
    """
    A single named value inside an enum
    """

    name: str = attr.ib()
    value: int = attr.ib()
    summary: str = attr.ib()

    @classmethod
    def create(cls, name: str, value: int, summary: str = "") -> "Entry":
        """Alternate constructor; the summary defaults to the empty string."""
        fields = {"name": name, "value": value, "summary": summary}
        return cls(**fields)
@attr.s
class Enum:
    """
    An enum belonging to an interface, optionally flagged as a bitmask.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    interface: "Interface" = attr.ib()
    is_bitmask: bool = attr.ib(default=False)
    # not part of the constructor, filled in through add_entry()
    entries: list[Entry] = attr.ib(init=False, factory=list)

    @classmethod
    def create(
        cls, name: str, interface: "Interface", since: int = 1, is_bitmask: bool = False
    ) -> "Enum":
        """Alternate constructor; `since` defaults to 1, `is_bitmask` to False."""
        return cls(name=name, since=since, interface=interface, is_bitmask=is_bitmask)

    def add_entry(self, entry: Entry) -> None:
        """
        Append `entry` to this enum.

        Raises ValueError if an existing entry already has the same value
        or, for bitmask enums, if the new value is negative or has more
        than one bit set.
        """
        if any(e.value == entry.value for e in self.entries):
            raise ValueError(f"Duplicate enum value {entry.value}")

        # Validate the entry being added, once, independent of how many
        # entries already exist. Checking inside the loop over existing
        # entries would skip the check for the first entry altogether.
        if self.is_bitmask:
            if entry.value < 0:
                raise ValueError("Bitmasks must not be less than zero")
            # x & (x - 1) clears the lowest set bit, so anything left over
            # means more than one bit is set. A value of zero is accepted.
            if entry.value & (entry.value - 1):
                raise ValueError("Bitmasks must have exactly one bit set")

        self.entries.append(entry)

    @property
    def fqdn(self):
        """Interface name and enum name, joined by an underscore."""
        return f"{self.interface.name}_{self.name}"
@attr.s
class Interface:
    """
    One protocol interface with its requests, events and enums.

    The same class serves both ei and eis. `mode` records which of the
    two this instance is generated for and decides which of
    requests/events count as incoming and which as outgoing.
    """

    name: str = attr.ib()
    version: int = attr.ib()
    requests: list[Request] = attr.ib(init=False, factory=list)
    events: list[Event] = attr.ib(init=False, factory=list)
    enums: list[Enum] = attr.ib(init=False, factory=list)
    mode: str = attr.ib()  # "ei" or "eis"

    @classmethod
    def create(cls, name: str, version: int, mode: str = "ei") -> "Interface":
        """Alternate constructor; `mode` must be "ei" or "eis"."""
        assert mode in ("ei", "eis")
        return cls(name=name, version=version, mode=mode)

    def add_enum(self, enum: Enum) -> None:
        """Append `enum` to the list of enums."""
        self.enums.append(enum)

    def add_event(self, event: Event) -> None:
        """Append `event` to the list of events."""
        self.events.append(event)

    def add_request(self, request: Request) -> None:
        """Append `request` to the list of requests."""
        self.requests.append(request)

    @property
    def incoming(self) -> list[Message]:
        """
        The messages this implementation receives: the events in "ei"
        mode, the requests otherwise. Templates can use this without
        caring which side they are generated for.
        """
        return self.events if self.mode == "ei" else self.requests  # type: ignore

    @property
    def outgoing(self) -> list[Message]:
        """
        The messages this implementation sends: the requests in "ei"
        mode, the events otherwise. Templates can use this without
        caring which side they are generated for.
        """
        return self.requests if self.mode == "ei" else self.events  # type: ignore

    @property
    def ctype(self) -> str:
        """The C pointer type of the struct named after this interface."""
        return "struct {} *".format(self.name)

    @property
    def as_arg(self) -> str:
        """The interface as a C declaration: its C type followed by its name."""
        return "{} {}".format(self.ctype, self.name)
@attr.s
class XmlError(Exception):
    """
    Error found in the protocol XML, together with the line and column
    it was found at.
    """

    line: int = attr.ib()
    column: int = attr.ib()
    message: str = attr.ib()

    def __str__(self) -> str:
        position = "line {}:{}".format(self.line, self.column)
        return "{}: {}".format(position, self.message)
@attr.s
class Protocol(xml.sax.handler.ContentHandler):
    """
    SAX content handler that turns the protocol XML into Interface objects.

    The same handler instance is meant to be fed the document twice: the
    first run only collects the interfaces, every later run fills in the
    requests, events, enums, arguments and entries. This way an <arg> can
    refer to an interface that is defined further down in the file.

    Problems in the XML are reported as XmlError with the parser position.
    """

    component: str = attr.ib()
    interfaces: list[Interface] = attr.ib(factory=list)
    # the <interface> we are currently inside of, if any
    current_interface: Optional[Interface] = attr.ib(init=False, default=None)
    # the <request>, <event> or <enum> we are currently inside of, if any
    current_message: Optional[Union[Message, Enum]] = attr.ib(init=False, default=None)
    # number of documents started so far, see startDocument()
    run: int = attr.ib(init=False, default=0)

    def xmlerror(self, msg) -> None:
        """Raise an XmlError for `msg` at the current parser position."""
        # XmlError takes (line, column, message), in that order
        raise XmlError(*self.location, msg)

    @property
    def location(self) -> Tuple[int, int]:
        """The (line, column) the parser is currently at."""
        line = self._locator.getLineNumber()  # type: ignore
        col = self._locator.getColumnNumber()  # type: ignore
        return line, col

    def interface_by_name(self, name) -> Interface:
        """
        Return the known interface called `name`, the last one if there
        are several. Raises XmlError if there is none.
        """
        matches = [iface for iface in self.interfaces if iface.name == name]
        if not matches:
            raise self._error(f"Unable to find interface {name}")
        return matches[-1]

    def _error(self, message: str) -> XmlError:
        """Build (but do not raise) an XmlError at the current position."""
        return XmlError(*self.location, message)

    def _required(self, element: str, attrs, *keys: str) -> list:
        """Return the values of the attributes `keys`, all of which must exist."""
        try:
            return [attrs[key] for key in keys]
        except KeyError as e:
            raise self._error(f"Missing attribute {e} in element '{element}'") from e

    def _integer(self, element: str, key: str, value: str) -> int:
        """Convert the attribute value to an int or raise an XmlError."""
        try:
            return int(value)
        except ValueError as e:
            raise self._error(
                f"Invalid value '{value}' for attribute '{key}' in element '{element}'"
            ) from e

    def _require_interface(self, element: str) -> Interface:
        """Return the current interface, `element` is invalid outside of one."""
        if self.current_interface is None:
            raise self._error(f"Invalid element '{element}' outside an <interface>")
        return self.current_interface

    def _require_toplevel(self, element: str) -> None:
        """Requests, events and enums must not be nested inside each other."""
        if self.current_message is not None:
            raise self._error(
                f"Invalid element '{element}' inside '{self.current_message.name}'"
            )

    def _component_name(self, name: str) -> str:
        """Replace a leading "ei" in an interface name with our component."""
        if name.startswith("ei"):
            return f"{self.component}{name[2:]}"
        return name

    def startDocument(self):
        self.run += 1

    def startElement(self, element: str, attrs: dict):
        if element == "interface":
            self._start_interface(element, attrs)
            return

        # first run only parses interfaces
        if self.run <= 1:
            return

        if element in ("request", "event"):
            self._start_message(element, attrs)
        elif element == "enum":
            self._start_enum(element, attrs)
        elif element == "arg":
            self._start_arg(element, attrs)
        elif element == "entry":
            self._start_entry(element, attrs)

    def _start_interface(self, element: str, attrs) -> None:
        """Handle <interface>: create it on the first run, look it up later."""
        if self.current_interface is not None:
            raise self._error(
                f"Invalid element '{element}' inside interface '{self.current_interface.name}'"
            )

        name, version = self._required(element, attrs, "name", "version")
        version = self._integer(element, "version", version)
        name = self._component_name(name)

        # We only create the interface on the first run, in subsequent runs we
        # re-use them so we can cross reference correctly
        if self.run > 1:
            intf = self.interface_by_name(name)
        else:
            intf = Interface.create(name=name, version=version, mode=self.component)
            self.interfaces.append(intf)
        self.current_interface = intf

    def _start_message(self, element: str, attrs) -> None:
        """Handle <request> and <event>, which only differ in their class."""
        interface = self._require_interface(element)
        self._require_toplevel(element)

        name, since = self._required(element, attrs, "name", "since")
        since = self._integer(element, "since", since)

        # The opcode is the position of the message within its kind
        message: Message
        if element == "request":
            message = Request.create(
                name=name,
                since=since,
                opcode=len(interface.requests),
                interface=interface,
            )
            interface.add_request(message)
        else:
            message = Event.create(
                name=name,
                since=since,
                opcode=len(interface.events),
                interface=interface,
            )
            interface.add_event(message)
        self.current_message = message

    def _start_enum(self, element: str, attrs) -> None:
        """Handle <enum>; a type of "bitmask" is the only one recognized."""
        interface = self._require_interface(element)
        self._require_toplevel(element)

        name, since = self._required(element, attrs, "name", "since")
        since = self._integer(element, "since", since)

        enum_type = attrs.get("type", None)
        if enum_type is not None and enum_type not in ["bitmask"]:
            raise self._error(f"Invalid enum type {enum_type} in element '{element}'")

        enum = Enum.create(
            name=name,
            since=since,
            interface=interface,
            is_bitmask=enum_type == "bitmask",
        )
        interface.add_enum(enum)
        self.current_message = enum

    def _start_arg(self, element: str, attrs) -> None:
        """Handle <arg>, adding an Argument to the current request or event."""
        interface = self._require_interface(element)
        if not isinstance(self.current_message, Message):
            raise self._error(
                f"Invalid element '{element}' must be inside <request> or <event>"
            )

        name, sig = self._required(element, attrs, "name", "type")
        # Catch unknown types here where we still know the position
        if proto_to_type(sig) is None:
            raise self._error(f"Invalid type '{sig}' in element '{element}'")
        summary = attrs.get("summary", "")

        interface_name = attrs.get("interface", None)
        if interface_name is not None:
            target = self.interface_by_name(self._component_name(interface_name))
        else:
            target = None

        # The enum is looked up in the current interface only and handed
        # to the Argument by name
        enum = attrs.get("enum", None)
        if enum is not None and enum not in [e.name for e in interface.enums]:
            raise self._error(f"Failed to find enum '{interface.name}.{enum}'")

        try:
            arg = Argument.create(
                name=name,
                signature=sig,
                summary=summary,
                enum=enum,
                interface=target,
            )
        except ValueError as e:
            # e.g. an interface given for an argument that is not an object
            raise self._error(str(e)) from e
        self.current_message.add_argument(arg)

    def _start_entry(self, element: str, attrs) -> None:
        """Handle <entry>, adding an Entry to the current enum."""
        self._require_interface(element)
        if not isinstance(self.current_message, Enum):
            raise self._error(f"Invalid element '{element}' must be inside <enum>")

        name, value = self._required(element, attrs, "name", "value")
        value = self._integer(element, "value", value)
        summary = attrs.get("summary", "")

        entry = Entry.create(name=name, value=value, summary=summary)
        try:
            self.current_message.add_entry(entry)
        except ValueError as e:
            raise self._error(str(e)) from e

    def endElement(self, name):
        if name == "interface":
            assert self.current_interface is not None
            self.current_interface = None

        # first run only parses interfaces
        if self.run <= 1:
            return

        if name == "request":
            assert isinstance(self.current_message, Request)
            self.current_message = None
        elif name == "event":
            assert isinstance(self.current_message, Event)
            self.current_message = None
        elif name == "enum":
            assert isinstance(self.current_message, Enum)
            self.current_message = None

    def characters(self, content):
        # text content is of no interest to us
        pass

    @classmethod
    def create(cls, component: str) -> "Protocol":
        """Create a handler for the given component."""
        h = cls(component=component)
        return h
def parse(protofile: Path, component: str) -> Protocol:
    """
    Parse the protocol XML file for the given component and return the
    Protocol handler holding the result.
    """
    handler = Protocol.create(component=component)
    filename = os.fspath(protofile)
    # We parse two times, once to fetch all the interfaces, then to parse the details
    for _ in range(2):
        xml.sax.parse(filename, handler)
    return handler
def generate_source(
    proto: Protocol, headerfile: Optional[str], template: Path, component: str
) -> jinja2.environment.TemplateStream:
    """
    Render `template` with the interfaces of `proto` and return the
    resulting template stream.

    The template gets "target" and "interfaces" and, only if `headerfile`
    is set, "headerfile". The template is loaded from its own directory.
    """
    assert component in ["ei", "eis"]

    if component == "ei":
        target = Target.create("ei", context="context")
    else:
        target = Target.create("eis_client", context="client")

    data = {
        "target": target,
        "interfaces": proto.interfaces,
    }
    if headerfile:
        data["headerfile"] = headerfile

    template_dir = os.fspath(template.parent)
    env = jinja2.Environment(
        loader=jinja2.FileSystemLoader(template_dir),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    return env.get_template(template.name).stream(data)
def main() -> None:
    """
    Command line entry point: parse the protocol XML, render the template
    and write the result to --output, or to stdout if that is "-".

    Exits with status 1 if an input file does not exist or the protocol
    XML cannot be parsed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--component", type=str, choices=["ei", "eis"], default="ei")
    parser.add_argument(
        "--output", type=str, default="-", help="Output file to write to"
    )
    parser.add_argument("protocol", type=Path, help="The protocol XML file")
    parser.add_argument("template", type=Path, help="The template file")

    ns = parser.parse_args()

    # Explicit checks rather than asserts, those vanish when run with -O
    for path in (ns.template, ns.protocol):
        if not path.exists():
            print(f"File not found: {path}", file=sys.stderr)
            sys.exit(1)

    try:
        proto = parse(
            protofile=ns.protocol,
            component=ns.component,
        )
    except xml.sax.SAXParseException as e:
        print(f"Parser error: {e}", file=sys.stderr)
        sys.exit(1)
    except XmlError as e:
        print(f"Protocol XML error: {e}", file=sys.stderr)
        sys.exit(1)

    # The header is expected to be named like the output file
    headerfile = f"{Path(ns.output).stem}.h" if ns.output != "-" else None
    stream = generate_source(
        proto=proto, headerfile=headerfile, template=ns.template, component=ns.component
    )

    if ns.output == "-":
        stream.dump(sys.stdout)
    else:
        # make sure the output file is flushed and closed when we are done
        with open(ns.output, "w") as fd:
            stream.dump(fd)


if __name__ == "__main__":
    main()