#!/usr/bin/env python3
#
# SPDX-License-Identifier: MIT

"""
EI protocol parser

This parser is intended to be generically useful for language bindings
other than libei/libeis. If it isn't, please file a bug.

When used as ei-scanner, it converts a Jinja2 template with the
scanned protocol. Otherwise, use the `parse()` function to parse
the protocol and return its structure as a set of Python classes.

Opcodes for events and requests are assigned in order as they
appear in the XML file.
"""

from typing import Any, Optional, Union, Tuple
from pathlib import Path
from textwrap import dedent

import argparse
import attr
import jinja2
import jinja2.environment
import os
import sys
import xml.sax
import xml.sax.handler


# Mapping of allowed protocol types to the single-character signature strings
# used in the various code pieces.
PROTOCOL_TYPES = {
    "uint": "u",
    "int": "i",
    "float": "f",
    "fd": "h",
    "new_id": "n",
    "object": "o",
    "string": "s",
}


def snake2camel(s: str) -> str:
    """
    Convert snake_case to CamelCase (well, strictly speaking PascalCase).
    """
    return s.replace("_", " ").title().replace(" ", "")


@attr.s
class Description:
    """
    The content of a <description> element: the `summary` attribute and
    the (dedented) element text.
    """

    summary: str = attr.ib(default="")
    text: str = attr.ib(default="")


@attr.s
class Argument:
    """
    Argument to a request or a reply
    """

    name: str = attr.ib()
    protocol_type: str = attr.ib()
    summary: str = attr.ib()
    # NOTE(review): annotated as an Enum, but ProtocolParser passes the enum's
    # *name* (the raw "enum" XML attribute) here. Left unchanged because
    # templates may rely on the string value.
    enum: Optional["Enum"] = attr.ib()
    interface: Optional["Interface"] = attr.ib()

    @property
    def signature(self) -> str:
        """
        The single-character signature for this argument
        """
        return PROTOCOL_TYPES[self.protocol_type]

    @interface.validator  # type: ignore
    def _validate_interface(self, attribute, value):
        # Only new_id ("n") and object ("o") arguments can refer to an interface
        if value is not None and self.signature not in ["n", "o"]:
            raise ValueError("Interface may only be set for object types")

    @property
    def as_c_arg(self) -> str:
        """
        This argument as a C parameter declaration: "<c_type> <name>"
        """
        return f"{self.c_type} {self.name}"

    @property
    def c_type(self) -> str:
        """
        The C type used for this argument's protocol type
        """
        return {
            "uint": "uint32_t",
            "int": "int32_t",
            "string": "const char *",
            "fd": "int",
            "float": "float",
            "object": "object_id_t",
            "new_id": "new_id_t",
        }[self.protocol_type]

    @protocol_type.validator  # type: ignore
    def _validate_protocol_type(self, attribute, value):
        assert (
            value is not None and value in PROTOCOL_TYPES
        ), f"Failed to parse protocol_type {value}"

    @classmethod
    def create(
        cls,
        name: str,
        protocol_type: str,
        summary: str = "",
        enum: Optional["Enum"] = None,
        interface: Optional["Interface"] = None,
    ) -> "Argument":
        """
        Create a new Argument.

        Raises ValueError if `interface` is set for a protocol type that is
        neither "object" nor "new_id".
        """
        return cls(
            name=name,
            protocol_type=protocol_type,
            summary=summary,
            enum=enum,
            interface=interface,
        )


@attr.s
class Message:
    """
    Parent class for a wire message (Request or Event).
    """

    name: str = attr.ib()
    since: int = attr.ib()
    opcode: int = attr.ib()
    interface: "Interface" = attr.ib()
    description: Optional[Description] = attr.ib(default=None)
    is_destructor: bool = attr.ib(default=False)

    arguments: list[Argument] = attr.ib(init=False, factory=list)

    def add_argument(self, arg: Argument) -> None:
        """
        Append `arg` to this message.

        Raises ValueError if an argument with the same name already exists.
        """
        if arg.name in [a.name for a in self.arguments]:
            raise ValueError(f"Duplicate argument name '{arg.name}'")
        self.arguments.append(arg)

    @property
    def num_arguments(self) -> int:
        """
        The number of arguments of this message
        """
        return len(self.arguments)

    @property
    def signature(self) -> str:
        """
        The concatenated single-character signatures of all arguments
        """
        return "".join([a.signature for a in self.arguments])

    @property
    def camel_name(self) -> str:
        """
        The message name converted with snake2camel()
        """
        return snake2camel(self.name)


@attr.s
class Request(Message):
    @classmethod
    def create(
        cls,
        name: str,
        opcode: int,
        interface: "Interface",
        since: int = 1,
        is_destructor: bool = False,
    ) -> "Request":
        """
        Create a new Request on the given interface
        """
        return cls(
            name=name,
            opcode=opcode,
            since=since,
            interface=interface,
            is_destructor=is_destructor,
        )

    @property
    def fqdn(self) -> str:
        """
        The full name of this Request as <interface name>_request_<request name>
        """
        return f"{self.interface.name}_request_{self.name}"


@attr.s
class Event(Message):
    @classmethod
    def create(
        cls,
        name: str,
        opcode: int,
        interface: "Interface",
        since: int = 1,
        is_destructor: bool = False,
    ) -> "Event":
        """
        Create a new Event on the given interface
        """
        return cls(
            name=name,
            opcode=opcode,
            since=since,
            interface=interface,
            is_destructor=is_destructor,
        )

    @property
    def fqdn(self) -> str:
        """
        The full name of this Event as <interface name>_event_<event name>
        """
        return f"{self.interface.name}_event_{self.name}"


@attr.s
class Entry:
    """
    An enum entry
    """

    name: str = attr.ib()
    value: int = attr.ib()
    enum: "Enum" = attr.ib()
    summary: str = attr.ib()
    since: int = attr.ib()

    @classmethod
    def create(
        cls, name: str, value: int, enum: "Enum", summary: str = "", since: int = 1
    ) -> "Entry":
        """
        Create a new Entry belonging to `enum`
        """
        return cls(name=name, value=value, enum=enum, summary=summary, since=since)

    @property
    def fqdn(self) -> str:
        """
        The full name of this Entry as <interface name>_<enum name>_<entry name>
        """
        return f"{self.enum.fqdn}_{self.name}"


@attr.s
class Enum:
    """
    An enum declared inside an interface, optionally a bitfield.
    """

    name: str = attr.ib()
    since: int = attr.ib()
    interface: "Interface" = attr.ib()
    is_bitfield: bool = attr.ib(default=False)
    description: Optional[Description] = attr.ib(default=None)

    entries: list[Entry] = attr.ib(init=False, factory=list)

    @classmethod
    def create(
        cls,
        name: str,
        interface: "Interface",
        since: int = 1,
        is_bitfield: bool = False,
    ) -> "Enum":
        """
        Create a new (empty) Enum on the given interface
        """
        return cls(name=name, since=since, interface=interface, is_bitfield=is_bitfield)

    def add_entry(self, entry: Entry) -> None:
        """
        Append `entry` to this enum.

        Raises ValueError if the entry's name or value duplicates an existing
        entry or, for bitfield enums, if the value is negative or has more
        than one bit set.
        """
        for e in self.entries:
            if e.name == entry.name:
                raise ValueError(f"Duplicate enum name '{entry.name}'")
            if e.value == entry.value:
                raise ValueError(f"Duplicate enum value '{entry.value}'")

        # Validate the entry being added (not the ones already stored) so that
        # the first and the last entry of a bitfield are checked too.
        if self.is_bitfield:
            if entry.value < 0:
                raise ValueError("Bitmasks must not be less than zero")
            # A value of zero (no bit set) is still accepted, as before.
            # bin().count() is used because int.bit_count() needs Python 3.10.
            if bin(entry.value).count("1") > 1:
                raise ValueError("Bitmasks must have exactly one bit set")

        self.entries.append(entry)

    @property
    def fqdn(self):
        """
        The full name of this Enum as <interface name>_<enum name>
        """
        return f"{self.interface.name}_{self.name}"

    @property
    def camel_name(self) -> str:
        """
        The enum name converted with snake2camel()
        """
        return snake2camel(self.name)


@attr.s
class Interface:
    """
    A protocol interface with its requests, events and enums.
    """

    name: str = attr.ib()
    version: int = attr.ib()
    requests: list[Request] = attr.ib(init=False, factory=list)
    events: list[Event] = attr.ib(init=False, factory=list)
    enums: list[Enum] = attr.ib(init=False, factory=list)

    mode: str = attr.ib()  # "ei" or "eis"
    description: Optional[Description] = attr.ib(default=None)

    def add_request(self, request: Request) -> None:
        """
        Append `request`; raises ValueError on a duplicate request name.
        """
        if request.name in [r.name for r in self.requests]:
            raise ValueError(f"Duplicate request name '{request.name}'")
        self.requests.append(request)

    def add_event(self, event: Event) -> None:
        """
        Append `event`; raises ValueError on a duplicate event name.
        """
        if event.name in [r.name for r in self.events]:
            raise ValueError(f"Duplicate event name '{event.name}'")
        self.events.append(event)

    def add_enum(self, enum: Enum) -> None:
        """
        Append `enum`; raises ValueError on a duplicate enum name.
        """
        if enum.name in [r.name for r in self.enums]:
            raise ValueError(f"Duplicate enum name '{enum.name}'")
        self.enums.append(enum)

    @property
    def outgoing(self) -> list[Message]:
        """
        Returns the list of messages outgoing from this implementation.

        We use the same class for both ei and eis. To make the
        template simpler, the class maps requests/events to
        incoming/outgoing as correct relative to the implementation.
        """
        if self.mode == "ei":
            return self.requests  # type: ignore
        elif self.mode == "eis":
            return self.events  # type: ignore
        else:
            raise NotImplementedError(
                f"Interface.outgoing is not supported for mode {self.mode}"
            )

    @property
    def incoming(self) -> list[Message]:
        """
        Returns the list of messages incoming to this implementation.

        We use the same class for both ei and eis. To make the
        template simpler, the class maps requests/events to
        incoming/outgoing as correct relative to the implementation.
        """
        if self.mode == "ei":
            return self.events  # type: ignore
        elif self.mode == "eis":
            return self.requests  # type: ignore
        else:
            raise NotImplementedError(
                f"Interface.incoming is not supported for mode {self.mode}"
            )

    @property
    def c_type(self) -> str:
        """
        The C pointer type for this interface: "struct <name> *"
        """
        return f"struct {self.name} *"

    @property
    def as_c_arg(self) -> str:
        """
        This interface as a C parameter declaration: "struct <name> * <name>"
        """
        return f"{self.c_type} {self.name}"

    @property
    def camel_name(self) -> str:
        """
        The interface name converted with snake2camel()
        """
        return snake2camel(self.name)

    @classmethod
    def create(cls, name: str, version: int, mode: str = "ei") -> "Interface":
        """
        Create a new (empty) Interface. `mode` must be "ei", "eis" or "brei".
        """
        assert mode in ["ei", "eis", "brei"]
        return cls(name=name, version=version, mode=mode)


@attr.s
class XmlError(Exception):
    """
    A semantic error in the protocol XML, with the location it was found at.
    """

    line: int = attr.ib()
    column: int = attr.ib()
    message: str = attr.ib()

    def __str__(self) -> str:
        return f"line {self.line}:{self.column}: {self.message}"

    @classmethod
    def create(cls, message: str, location: Tuple[int, int] = (0, 0)) -> "XmlError":
        """
        Create an XmlError for a (line, column) location tuple
        """
        return cls(line=location[0], column=location[1], message=message)


@attr.s
class Copyright:
    """
    Accumulator for the text of the <copyright> element.
    """

    text: str = attr.ib(default="")
    # Set once the closing </copyright> tag was seen
    is_complete: bool = attr.ib(init=False, default=False)


@attr.s
class Protocol:
    """
    The parsed protocol: the copyright text (if any) and all interfaces.
    """

    copyright: Optional[str] = attr.ib(default=None)
    interfaces: list[Interface] = attr.ib(factory=list)


@attr.s
class ProtocolParser(xml.sax.handler.ContentHandler):
    """
    SAX content handler for the protocol XML.

    The same handler instance must be run over the document twice: the first
    run only collects the interfaces, the second run parses the details so
    that arguments can reference interfaces declared later in the file.

    Semantic errors are reported as XmlError.
    """

    component: str = attr.ib()
    interfaces: list[Interface] = attr.ib(factory=list)

    current_interface: Optional[Interface] = attr.ib(init=False, default=None)
    current_message: Optional[Union[Message, Enum]] = attr.ib(init=False, default=None)
    current_description: Optional[Description] = attr.ib(init=False, default=None)
    copyright: Optional[Copyright] = attr.ib(init=False, default=None)

    # Incremented in startDocument(), i.e. 1 during the first parsing run
    _run_counter: int = attr.ib(init=False, default=0, repr=False)

    @property
    def location(self) -> Tuple[int, int]:
        """
        The current (line, column) as reported by the SAX locator
        """
        line = self._locator.getLineNumber()  # type: ignore
        col = self._locator.getColumnNumber()  # type: ignore
        return line, col

    def interface_by_name(self, name) -> Interface:
        """
        Return the known interface with the given name.

        Raises XmlError if there is no such interface.
        """
        matches = [iface for iface in self.interfaces if iface.name == name]
        if not matches:
            raise XmlError.create(f"Unable to find interface {name}", self.location)
        return matches[-1]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _error(self, message: str) -> XmlError:
        """Build an XmlError for the current parser location."""
        return XmlError.create(message, self.location)

    def _component_name(self, name: str) -> str:
        """Replace a leading "ei" in an interface name with the component name."""
        if name.startswith("ei"):
            return f"{self.component}{name[2:]}"
        return name

    def _required_attr(self, attrs, key: str, element: str) -> str:
        """Return attribute `key`, raising XmlError if it is missing."""
        try:
            return attrs[key]
        except KeyError:
            raise self._error(
                f"Missing attribute '{key}' in element '{element}'"
            ) from None

    def _int_attr(
        self, attrs, key: str, element: str, default: Optional[int] = None
    ) -> int:
        """
        Return attribute `key` converted to int.

        A missing attribute yields `default`, or an XmlError if there is no
        default. A non-integer value is an XmlError.
        """
        raw = attrs.get(key, None)
        if raw is None:
            if default is None:
                raise self._error(f"Missing attribute '{key}' in element '{element}'")
            return default
        try:
            return int(raw)
        except ValueError:
            raise self._error(
                f"Invalid integer '{raw}' for attribute '{key}' in element '{element}'"
            ) from None

    def _require_interface(self, element: str) -> Interface:
        """Return the current interface or raise XmlError if outside of one."""
        if self.current_interface is None:
            raise self._error(f"Invalid element '{element}' outside an <interface>")
        return self.current_interface

    def _require_no_message(self, element: str) -> None:
        """Raise XmlError if we are already inside a request, event or enum."""
        if self.current_message is not None:
            raise self._error(
                f"Invalid element '{element}' inside '{self.current_message.name}'"
            )

    def _start_interface(self, element: str, attrs) -> None:
        """Handle <interface>: create it on the first run, look it up afterwards."""
        if self.current_interface is not None:
            raise self._error(
                f"Invalid element '{element}' inside interface '{self.current_interface.name}'"
            )

        name = self._required_attr(attrs, "name", element)
        version = self._int_attr(attrs, "version", element)
        name = self._component_name(name)

        # We only create the interface on the first run, in subsequent runs we
        # re-use them so we can cross reference correctly
        if self._run_counter > 1:
            intf = self.interface_by_name(name)
        else:
            if any(iface.name == name for iface in self.interfaces):
                raise self._error(f"Duplicate interface name '{name}'")
            intf = Interface.create(name=name, version=version, mode=self.component)
            self.interfaces.append(intf)
        self.current_interface = intf

    def _start_request(self, element: str, attrs) -> None:
        """Handle <request>: add a new Request to the current interface."""
        interface = self._require_interface(element)
        self._require_no_message(element)

        name = self._required_attr(attrs, "name", element)
        since = self._int_attr(attrs, "since", element)
        is_destructor = attrs.get("type", "") == "destructor"

        # Opcodes are assigned in order of appearance
        request = Request.create(
            name=name,
            since=since,
            opcode=len(interface.requests),
            interface=interface,
            is_destructor=is_destructor,
        )
        try:
            interface.add_request(request)
        except ValueError as e:
            raise self._error(str(e)) from None
        self.current_message = request

    def _start_event(self, element: str, attrs) -> None:
        """Handle <event>: add a new Event to the current interface."""
        interface = self._require_interface(element)
        self._require_no_message(element)

        name = self._required_attr(attrs, "name", element)
        since = self._int_attr(attrs, "since", element)
        is_destructor = attrs.get("type", "") == "destructor"

        # Opcodes are assigned in order of appearance
        event = Event.create(
            name=name,
            since=since,
            opcode=len(interface.events),
            interface=interface,
            is_destructor=is_destructor,
        )
        try:
            interface.add_event(event)
        except ValueError as e:
            raise self._error(str(e)) from None
        self.current_message = event

    def _start_enum(self, element: str, attrs) -> None:
        """Handle <enum>: add a new Enum to the current interface."""
        interface = self._require_interface(element)
        self._require_no_message(element)

        name = self._required_attr(attrs, "name", element)
        since = self._int_attr(attrs, "since", element)

        bitfield = attrs.get("bitfield", "false")
        if bitfield not in ("true", "false"):
            raise self._error(
                f"Invalid value '{bitfield}' for boolean bitfield attribute in '{element}'"
            )

        enum = Enum.create(
            name=name,
            since=since,
            interface=interface,
            is_bitfield=bitfield == "true",
        )
        try:
            interface.add_enum(enum)
        except ValueError as e:
            raise self._error(str(e)) from None
        self.current_message = enum

    def _start_arg(self, element: str, attrs) -> None:
        """Handle <arg>: add a new Argument to the current request or event."""
        current_interface = self._require_interface(element)
        message = self.current_message
        if not isinstance(message, Message):
            raise self._error(
                f"Invalid element '{element}' must be inside <request> or <event>"
            )

        name = self._required_attr(attrs, "name", element)
        proto_type = self._required_attr(attrs, "type", element)
        if proto_type not in PROTOCOL_TYPES:
            raise self._error(
                f"Invalid type '{proto_type}' for '{current_interface.name}.{message.name}::{name}'"
            )

        summary = attrs.get("summary", "")

        interface_name = attrs.get("interface", None)
        if interface_name is not None:
            interface = self.interface_by_name(self._component_name(interface_name))
        else:
            interface = None

        # The enum must already be declared in the current interface. The
        # Argument stores the enum's name, not the Enum object.
        enum = attrs.get("enum", None)
        if enum is not None and enum not in [e.name for e in current_interface.enums]:
            raise self._error(f"Failed to find enum '{current_interface.name}.{enum}'")

        try:
            arg = Argument.create(
                name=name,
                protocol_type=proto_type,
                summary=summary,
                enum=enum,
                interface=interface,
            )
            message.add_argument(arg)
        except ValueError as e:
            # Interface set on a non-object type, or duplicate argument name
            raise self._error(str(e)) from None

    def _start_entry(self, element: str, attrs) -> None:
        """Handle <entry>: add a new Entry to the current enum."""
        self._require_interface(element)
        enum = self.current_message
        if not isinstance(enum, Enum):
            raise self._error(f"Invalid element '{element}' must be inside <enum>")

        name = self._required_attr(attrs, "name", element)
        value = self._int_attr(attrs, "value", element)
        summary = attrs.get("summary", "")
        since = self._int_attr(attrs, "since", element, default=1)

        entry = Entry.create(
            name=name,
            value=value,
            enum=enum,
            summary=summary,
            since=since,
        )
        try:
            enum.add_entry(entry)
        except ValueError as e:
            raise self._error(str(e)) from None

    # ------------------------------------------------------------------
    # xml.sax.handler.ContentHandler interface
    # ------------------------------------------------------------------

    def startDocument(self):
        self._run_counter += 1

    def startElement(self, element: str, attrs: dict):
        if element == "interface":
            self._start_interface(element, attrs)

        # first run only parses interfaces
        if self._run_counter <= 1:
            return

        if element == "request":
            self._start_request(element, attrs)
        elif element == "event":
            self._start_event(element, attrs)
        elif element == "enum":
            self._start_enum(element, attrs)
        elif element == "arg":
            self._start_arg(element, attrs)
        elif element == "entry":
            self._start_entry(element, attrs)
        elif element == "description":
            summary = attrs.get("summary", "")
            self.current_description = Description(summary=summary)
        elif element == "copyright":
            if self.copyright is not None:
                raise self._error("Multiple <copyright> tags in file")
            self.copyright = Copyright()

    def endElement(self, name):
        if name == "interface":
            assert self.current_interface is not None
            self.current_interface = None

        # first run only parses interfaces
        if self._run_counter <= 1:
            return

        if name == "request":
            assert isinstance(self.current_message, Request)
            self.current_message = None
        elif name == "event":
            assert isinstance(self.current_message, Event)
            self.current_message = None
        elif name == "enum":
            assert isinstance(self.current_message, Enum)
            self.current_message = None
        elif name == "description":
            assert self.current_description is not None
            self.current_description.text = dedent(self.current_description.text)
            # A description belongs to the innermost request/event/enum, or
            # to the interface if we are not inside any of those
            if self.current_message is not None:
                self.current_message.description = self.current_description
            elif self.current_interface is not None:
                self.current_interface.description = self.current_description
            else:
                raise self._error(f"Invalid element '{name}' outside an <interface>")
            self.current_description = None
        elif name == "copyright":
            assert self.copyright is not None
            self.copyright.text = dedent(self.copyright.text)
            self.copyright.is_complete = True

    def characters(self, content):
        if self.current_description is not None:
            self.current_description.text += content
        elif self.copyright is not None and not self.copyright.is_complete:
            self.copyright.text += content

    @classmethod
    def create(cls, component: str) -> "ProtocolParser":
        """
        Create a new parser for the given component ("ei", "eis" or "brei")
        """
        h = cls(component=component)
        return h


def parse(protofile: Path, component: str) -> Protocol:
    """
    Parse the protocol XML file and return it as a Protocol.

    Raises XmlError for semantic errors in the protocol and
    xml.sax.SAXParseException for malformed XML.
    """
    proto = ProtocolParser.create(component=component)
    xml.sax.parse(os.fspath(protofile), proto)
    # We parse two times, once to fetch all the interfaces, then to parse the details
    xml.sax.parse(os.fspath(protofile), proto)

    copyright = proto.copyright.text if proto.copyright else None

    return Protocol(copyright=copyright, interfaces=proto.interfaces)


def generate_source(
    proto: Protocol, headerfile: Optional[str], template: Path, component: str
) -> jinja2.environment.TemplateStream:
    """
    Render the Jinja2 template file with the given protocol.

    The template gets the variables "component" and "interfaces" and, if
    `headerfile` is set, "headerfile". The filters "c_type", "as_c_arg" and
    "camel" are available in addition to the Jinja2 defaults.

    Returns the template stream, it is up to the caller to dump it somewhere.
    """
    assert component in ["ei", "eis", "brei"]

    data: dict[str, Any] = {}
    data["component"] = component
    data["interfaces"] = proto.interfaces

    if headerfile:
        data["headerfile"] = headerfile

    # The loader is rooted at the template's directory so that the template
    # can include/extend its siblings
    env = jinja2.Environment(
        loader=jinja2.FileSystemLoader(os.fspath(template.parent)),
        trim_blocks=True,
        lstrip_blocks=True,
    )

    # jinja filter to convert foo into "struct foo *"
    def filter_c_type(name):
        return f"struct {name} *"

    # jinja filter to convert foo into "struct foo *foo"
    def filter_as_c_arg(name):
        return f"struct {name} *{name}"

    env.filters["c_type"] = filter_c_type
    env.filters["as_c_arg"] = filter_as_c_arg
    env.filters["camel"] = snake2camel
    jtemplate = env.get_template(template.name)
    return jtemplate.stream(data)


def main() -> None:
    """
    Command-line entry point: parse the protocol XML and write the rendered
    template to the output file (or stdout).
    """
    parser = argparse.ArgumentParser(
        description=dedent(
            """
            ei-scanner is a tool to parse the EI protocol description XML and
            pass the data to a Jinja2 template. That template can then be
            used to generate protocol bindings for the desired language.

            typical usages:
                ei-scanner --component=ei protocol.xml my-template.tpl
                ei-scanner --component=eis --output=bindings.rs protocol.xml bindings.rs.tpl

            Elements in the XML file are provided as variables with attributes
            generally matching the XML file. For example, each interface has requests,
            events and enums, and each of those has a name.

            ei-scanner additionally provides the following values to the Jinja2 templates:
            - interface.incoming and interface.outgoing: maps to the requests/events of
              the interface, depending on the component.
            - argument.signature: a single-character signature type mapping
              from the protocol XML type:
                  uint -> "u"
                  int -> "i"
                  float -> "f"
                  fd -> "h"
                  new_id -> "n"
                  object -> "o"
                  string -> "s"

            ei-scanner adds the following Jinja2 filters for convenience:
                {{foo|c_type}} ... resolves to "struct foo *"
                {{foo|as_c_arg}} ... resolves to "struct foo *foo"
                {{foo_bar|camel}} ... resolves to "FooBar"
            """
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--component", type=str, choices=["ei", "eis", "brei"], default="ei"
    )
    parser.add_argument(
        "--output", type=str, default="-", help="Output file to write to"
    )
    parser.add_argument("protocol", type=Path, help="The protocol XML file")
    parser.add_argument(
        "template", type=Path, help="The Jinja2 compatible template file"
    )

    ns = parser.parse_args()

    # A missing input file is a usage error, not an internal invariant:
    # report it properly instead of relying on assert (stripped under -O)
    if not ns.template.exists():
        parser.error(f"Template file '{ns.template}' does not exist")
    if not ns.protocol.exists():
        parser.error(f"Protocol file '{ns.protocol}' does not exist")

    try:
        proto = parse(
            protofile=ns.protocol,
            component=ns.component,
        )
    except xml.sax.SAXParseException as e:
        print(f"Parser error: {e}", file=sys.stderr)
        sys.exit(1)
    except XmlError as e:
        print(f"Protocol XML error: {e}", file=sys.stderr)
        sys.exit(1)

    headerfile = f"{Path(ns.output).stem}.h" if ns.output != "-" else None
    stream = generate_source(
        proto=proto, headerfile=headerfile, template=ns.template, component=ns.component
    )

    if ns.output == "-":
        stream.dump(sys.stdout)
    else:
        # Make sure the output file is flushed and closed
        with open(ns.output, "w") as fd:
            stream.dump(fd)


if __name__ == "__main__":
    main()