#!/usr/bin/env python3 # # SPDX-License-Identifier: MIT from typing import Optional, Union, Tuple from pathlib import Path import argparse import attr import jinja2 import jinja2.environment import os import sys import xml.sax import xml.sax.handler import xml.sax._exceptions def snake2camel(s: str) -> str: return s.replace("_", " ").title().replace(" ", "") def proto_to_type(proto: str) -> Optional[str]: """ Conversion of protocol types to the signatures we use in the code """ return { "uint": "u", "int": "i", "float": "f", "fd": "h", "new_id": "n", "object": "o", "string": "s", }.get(proto) @attr.s class Target: """ Defines the target struct for the base "ei" interface. In libei we have a `struct ei` but in libeis the equivalent level is `struct eis_client`. This target type maps those two. """ name: str = attr.ib() context: str = attr.ib() @property def ctype(self) -> str: return f"struct {self.name} *" @property def as_param(self) -> str: return f"struct {self.name}* {self.name}" @property def as_arg(self) -> str: return self.as_param @classmethod def create(cls, name: str, context: str) -> "Target": return cls(name=name, context=context) @attr.s class Argument: """ Argument to a request or a reply """ name: str = attr.ib() signature: str = attr.ib(converter=proto_to_type) summary: str = attr.ib() enum: Optional["Enum"] = attr.ib() interface: Optional["Interface"] = attr.ib() @interface.validator # type: ignore def _validate_interface(self, attribute, value): if value is not None and self.signature not in ["n", "o"]: raise ValueError("Interface may only be set for object types") @property def as_arg(self) -> str: return f"{self.ctype} {self.name}" @property def ctype(self) -> str: return { "u": "uint32_t", "i": "int32_t", "s": "const char *", "h": "int", "f": "float", "o": "object_id_t", "n": "new_id_t", }[self.signature] @property def argtype(self) -> str: return { "u": "u32", "i": "i32", "s": "str", "h": "fd", "f": "f32", "o": "obj", "n": "obj", }[self.signature] @signature.validator # type: ignore def _validate_signature(self, attribute, value): types = "iufhnos" assert value in types, f"Failed to parse signature {value}" @classmethod def create( cls, name: str, signature: str, summary: str = "", enum: Optional["Enum"] = None, interface: Optional["Interface"] = None, ) -> "Argument": return cls(name, signature, summary, enum, interface=interface) @attr.s class Message: """ Parent class for the wire message (Request or Event). """ name: str = attr.ib() since: int = attr.ib() opcode: int = attr.ib() interface: "Interface" = attr.ib() arguments: list[Argument] = attr.ib(init=False, factory=list) def add_argument(self, arg: Argument) -> None: self.arguments.append(arg) @property def num_arguments(self) -> int: return len(self.arguments) @property def signature(self) -> str: return "".join([a.signature for a in self.arguments]) @property def camel_name(self) -> str: return snake2camel(self.name) @attr.s class Request(Message): @classmethod def create( cls, name: str, opcode: int, interface: "Interface", since: int = 1 ) -> "Request": return cls(name=name, opcode=opcode, since=since, interface=interface) @property def fqdn(self): return f"{self.interface.name}_request_{self.name}" @attr.s class Event(Message): @classmethod def create( cls, name: str, opcode: int, interface: "Interface", since: int = 1 ) -> "Event": return cls(name=name, opcode=opcode, since=since, interface=interface) @property def fqdn(self): return f"{self.interface.name}_event_{self.name}" @attr.s class Entry: """ An enum entry """ name: str = attr.ib() value: int = attr.ib() summary: str = attr.ib() @classmethod def create(cls, name: str, value: int, summary: str = "") -> "Entry": return cls(name=name, value=value, summary=summary) @attr.s class Enum: name: str = attr.ib() since: int = attr.ib() interface: "Interface" = attr.ib() is_bitmask: bool = attr.ib(default=False) entries: list[Entry] = attr.ib(init=False, factory=list) @classmethod def create( cls, name: str, interface: "Interface", since: int = 1, is_bitmask: bool = False ) -> "Enum": return cls(name=name, since=since, interface=interface, is_bitmask=is_bitmask) def add_entry(self, entry: Entry) -> None: for e in self.entries: if e.value == entry.value: raise ValueError(f"Duplicate enum value {entry.value}") if self.is_bitmask: if e.value < 0: raise ValueError("Bitmasks must not be less than zero") if e.value.bit_count() > 1: raise ValueError("Bitmasks must have exactly one bit set") self.entries.append(entry) @property def fqdn(self): return f"{self.interface.name}_{self.name}" @property def camel_name(self) -> str: return snake2camel(self.name) @attr.s class Interface: name: str = attr.ib() version: int = attr.ib() requests: list[Request] = attr.ib(init=False, factory=list) events: list[Event] = attr.ib(init=False, factory=list) enums: list[Enum] = attr.ib(init=False, factory=list) mode: str = attr.ib() # "ei" or "eis" def add_request(self, request: Request) -> None: self.requests.append(request) def add_event(self, event: Event) -> None: self.events.append(event) def add_enum(self, enum: Enum) -> None: self.enums.append(enum) @property def outgoing(self) -> list[Message]: """ Returns the list of messages outgoing from this implementation. We use the same class for both ei and eis. To make the template simpler, the class maps requests/events to incoming/outgoing as correct relative to the implementation. """ if self.mode == "ei": return self.requests # type: ignore else: return self.events # type: ignore @property def incoming(self) -> list[Message]: """ Returns the list of messages incoming to this implementation. We use the same class for both ei and eis. To make the template simpler, the class maps requests/events to incoming/outgoing as correct relative to the implementation. """ if self.mode == "ei": return self.events # type: ignore else: return self.requests # type: ignore @property def ctype(self) -> str: return f"struct {self.name} *" @property def as_arg(self) -> str: return f"{self.ctype} {self.name}" @property def camel_name(self) -> str: return snake2camel(self.name) @classmethod def create(cls, name: str, version: int, mode: str = "ei") -> "Interface": assert mode in ["ei", "eis"] return cls(name=name, version=version, mode=mode) @attr.s class XmlError(Exception): line: int = attr.ib() column: int = attr.ib() message: str = attr.ib() def __str__(self) -> str: return f"line {self.line}:{self.column}: {self.message}" @attr.s class Protocol(xml.sax.handler.ContentHandler): component: str = attr.ib() interfaces: list[Interface] = attr.ib(factory=list) current_interface: Optional[Interface] = attr.ib(init=False, default=None) current_message: Optional[Union[Message, Enum]] = attr.ib(init=False, default=None) run: int = attr.ib(init=False, default=0) def xmlerror(self, msg) -> None: line = self._locator.getLineNumber() # type: ignore col = self._locator.getColumnNumber() # type: ignore raise XmlError(msg, line, col) @property def location(self) -> Tuple[int, int]: line = self._locator.getLineNumber() # type: ignore col = self._locator.getColumnNumber() # type: ignore return line, col def interface_by_name(self, name) -> Interface: try: return [iface for iface in self.interfaces if iface.name == name].pop() except IndexError: raise XmlError(*self.location, f"Unable to find interface {name}") def startDocument(self): self.run += 1 def startElement(self, element: str, attrs: dict): if element == "interface": if self.current_interface is not None: raise XmlError( *self.location, f"Invalid element '{element}' inside interface '{self.current_interface.name}'", ) try: name = attrs["name"] version = attrs["version"] except KeyError as e: raise XmlError( *self.location, f"Missing attribute {e} in element '{element}'" ) if name.startswith("ei"): name = f"{self.component}{name[2:]}" # We only create the interface on the first run, in subsequent runs we # re-use them so we can cross reference correctly if self.run > 1: intf = self.interface_by_name(name) else: intf = Interface.create(name=name, version=version, mode=self.component) self.interfaces.append(intf) self.current_interface = intf # first run only parses interfaces if self.run <= 1: return if element == "request": if self.current_interface is None: raise XmlError( *self.location, f"Invalid element '{element}' outside an ", ) try: name = attrs["name"] since = attrs["since"] except KeyError as e: raise XmlError( *self.location, f"Missing attribute {e} in element '{element}'" ) opcode = len(self.current_interface.requests) request = Request.create( name=name, since=since, opcode=opcode, interface=self.current_interface ) self.current_interface.add_request(request) self.current_message = request elif element == "event": if self.current_interface is None: raise XmlError( *self.location, f"Invalid element '{element}' outside an ", ) if self.current_message is not None: raise XmlError( *self.location, f"Invalid element '{element}' inside '{self.current_message.name}'", ) try: name = attrs["name"] since = attrs["since"] except KeyError as e: raise XmlError( *self.location, f"Missing attribute {e} in element '{element}'" ) opcode = len(self.current_interface.events) event = Event.create( name=name, since=since, opcode=opcode, interface=self.current_interface ) self.current_interface.add_event(event) self.current_message = event elif element == "enum": if self.current_interface is None: raise XmlError( *self.location, f"Invalid element '{element}' outside an ", ) if self.current_message is not None: raise XmlError( *self.location, f"Invalid element '{element}' inside '{self.current_message.name}'", ) try: name = attrs["name"] since = attrs["since"] except KeyError as e: raise XmlError( *self.location, f"Missing attribute {e} in element '{element}'" ) enum_type = attrs.get("type", None) if enum_type is not None and enum_type not in ["bitmask"]: raise XmlError( *self.location, f"Invalid enum type {enum_type} in element '{element}'", ) is_bitmask = enum_type == "bitmask" enum = Enum.create( name=name, since=since, interface=self.current_interface, is_bitmask=is_bitmask, ) self.current_interface.add_enum(enum) self.current_message = enum elif element == "arg": if self.current_interface is None: raise XmlError( *self.location, f"Invalid element '{element}' outside an ", ) if not isinstance(self.current_message, Message): raise XmlError( *self.location, f"Invalid element '{element}' must be inside or ", ) name = attrs["name"] sig = attrs["type"] summary = attrs.get("summary", "") interface_name = attrs.get("interface", None) if interface_name is not None: if interface_name.startswith("ei"): interface_name = f"{self.component}{interface_name[2:]}" interface = self.interface_by_name(interface_name) else: interface = None enum = attrs.get("enum", None) if enum is not None and enum not in [ e.name for e in self.current_interface.enums ]: raise XmlError( *self.location, f"Failed to find enum '{self.current_interface.name}.{enum}'", ) arg = Argument.create( name=name, signature=sig, summary=summary, enum=enum, interface=interface, ) self.current_message.add_argument(arg) elif element == "entry": if self.current_interface is None: raise XmlError( *self.location, f"Invalid element '{element}' outside an ", ) if not isinstance(self.current_message, Enum): raise XmlError( *self.location, f"Invalid element '{element}' must be inside " ) name = attrs["name"] value = int(attrs["value"]) summary = attrs.get("summary", "") entry = Entry.create(name=name, value=value, summary=summary) try: self.current_message.add_entry(entry) except ValueError as e: raise XmlError(*self.location, str(e)) def endElement(self, name): if name == "interface": assert self.current_interface is not None self.current_interface = None # first run only parses interfaces if self.run <= 1: return if name == "request": assert isinstance(self.current_message, Request) self.current_message = None elif name == "event": assert isinstance(self.current_message, Event) self.current_message = None elif name == "enum": assert isinstance(self.current_message, Enum) self.current_message = None def characters(self, content): pass @classmethod def create(cls, component: str) -> "Protocol": h = cls(component=component) return h def parse(protofile: Path, component: str) -> Protocol: proto = Protocol.create(component=component) xml.sax.parse(os.fspath(protofile), proto) # We parse two times, once to fetch all the interfaces, then to parse the details xml.sax.parse(os.fspath(protofile), proto) return proto def generate_source( proto: Protocol, headerfile: Optional[str], template: Path, component: str ) -> jinja2.environment.TemplateStream: assert component in ["ei", "eis"] target = { "ei": Target.create("ei", context="context"), "eis": Target.create("eis_client", context="client"), }[component] data = {} data["target"] = target data["interfaces"] = proto.interfaces if headerfile: data["headerfile"] = headerfile env = jinja2.Environment( loader=jinja2.FileSystemLoader(os.fspath(template.parent)), trim_blocks=True, lstrip_blocks=True, ) jtemplate = env.get_template(template.name) return jtemplate.stream(data) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--component", type=str, choices=["ei", "eis"], default="ei") parser.add_argument( "--output", type=str, default="-", help="Output file to write to" ) parser.add_argument("protocol", type=Path, help="The protocol XML file") parser.add_argument("template", type=Path, help="The template file") ns = parser.parse_args() assert ns.template.exists() assert ns.protocol.exists() try: proto = parse( protofile=ns.protocol, component=ns.component, ) except xml.sax._exceptions.SAXParseException as e: print(f"Parser error: {e}", file=sys.stderr) sys.exit(1) except XmlError as e: print(f"Protocol XML error: {e}", file=sys.stderr) sys.exit(1) headerfile = f"{Path(ns.output).stem}.h" if ns.output != "-" else None stream = generate_source( proto=proto, headerfile=headerfile, template=ns.template, component=ns.component ) file = sys.stdout if ns.output == "-" else open(ns.output, "w") stream.dump(file) if __name__ == "__main__": main()