mirror of
https://gitlab.freedesktop.org/dbus/dbus.git
synced 2025-12-28 21:20:10 +01:00
* glib/dbus-gmain.c: Make the previous commit compile. * python/_dbus.py, python/matchrules.py: Patch from Ole Andre Ravnaas <ole.andre.ravnaas@collabora.co.uk> to allow you to specify sender_keyword="foo", path_keyword="bar" when adding a signal listener, so that you can bind to signals generically but still do something useful in your callback. * python/dbus_bindings.pyx: Demarshal the byte type as unsigned chars so that they're not cast to chars and made negative. Thanks to Jakub Stachowski for reporting this and testing the fix.
232 lines
7.5 KiB
Python
232 lines
7.5 KiB
Python
from exceptions import DBusException
|
|
|
|
class SignalMatchNode:
    """One level of the signal match tree.

    A node has at most one wildcard child, selected by a falsy key, and any
    number of children keyed by a concrete value.  ``rules`` holds the leaves
    that were attached to this node through ``add(..., leaf=...)``.
    """

    def __init__(self):
        self.wildcard = None    # child selected by a falsy (None / "") key
        self.finite = {}        # concrete key -> child node
        self.rules = []         # leaves attached directly to this node

    def add(self, key, leaf=None):
        """Return the child selected by ``key``, creating it when missing.

        A falsy ``key`` selects the wildcard child; any other key selects the
        entry of ``finite`` with that key.  When ``leaf`` is given it is
        appended to the returned child's ``rules``.
        """
        if key:
            node = self.finite.get(key)
            if node is None:
                node = SignalMatchNode()
                self.finite[key] = node
        else:
            if self.wildcard is None:
                self.wildcard = SignalMatchNode()
            node = self.wildcard

        # Only real leaves are recorded: appending the default None on every
        # call would make the list grow without bound.
        if leaf is not None:
            node.rules.append(leaf)
        return node

    def get_matches(self, key):
        """Return every child that applies to ``key``.

        The wildcard child (if any) comes first, followed by the child keyed
        by ``key`` (if any).
        """
        result = []
        if self.wildcard is not None:
            result.append(self.wildcard)
        if key in self.finite:
            result.append(self.finite[key])
        return result

    def get_match(self, key):
        """Return the single child selected by ``key``, or None.

        A truthy key is looked up in ``finite`` only; a falsy key returns the
        wildcard child (which may itself be None).
        """
        if key:
            return self.finite.get(key)
        return self.wildcard

    def has_children(self):
        """Return True when the node has a wildcard or any keyed child."""
        # len() cannot be applied to a key iterator, so the dictionary itself
        # is tested for emptiness.
        return self.wildcard is not None or bool(self.finite)

    def remove_child(self, child, key=None):
        """Detach ``child`` from this node.

        If ``child`` is the wildcard child it is cleared; otherwise the entry
        stored under ``key`` (if present) is deleted.
        """
        if self.wildcard is child:
            self.wildcard = None
        elif key in self.finite:
            del self.finite[key]
|
|
|
|
class SignalMatchTree:
    """Ordered tree of SignalMatchRules used to speed up searches.

    From the root down, the levels are keyed by sender, interface, signal
    name and object path.  At each level the wildcard branch holds rules
    that left that field empty; all other branches are keyed by a concrete
    value.
    """

    def __init__(self):
        self._tree = SignalMatchNode()

    def add(self, rule):
        """Store ``rule`` as a leaf under its sender/interface/signal/path."""
        sender_node = self._tree.add(rule.sender)
        interface_node = sender_node.add(rule.dbus_interface)
        signal_node = interface_node.add(rule.signal_name)
        signal_node.add(rule.path, leaf=rule)

    def exec_matches(self, match_rule, message):
        """Execute every stored rule that applies to ``message``.

        ``match_rule`` supplies the sender, interface, signal name and path
        used to walk the tree; at each level both the wildcard branch and the
        exactly-matching branch are followed.  Each rule found is executed
        when its argument filter accepts the message arguments.
        """
        # Extract the arguments once and hand them to every rule.
        args = message.get_args_list()

        for sender_node in self._tree.get_matches(match_rule.sender):
            for interface_node in sender_node.get_matches(
                    match_rule.dbus_interface):
                for signal_node in interface_node.get_matches(
                        match_rule.signal_name):
                    for path_node in signal_node.get_matches(match_rule.path):
                        for rule in path_node.rules:
                            if rule.match_args_from_list(args):
                                rule.execute(message, args)

    def _is_childless(self, node):
        """Return True when ``node`` has neither wildcard nor keyed child."""
        return node.wildcard is None and not node.finite

    def remove(self, rule):
        """Remove every stored rule that ``is_match``es ``rule``.

        Branches left without rules or children are pruned afterwards.

        Raises DBusException when the tree has no branch for the rule's
        sender/interface/signal/path combination.  Any other error (for
        example one raised while comparing rules) propagates unchanged
        instead of being reported as an unknown rule.
        """
        keys = (rule.sender, rule.dbus_interface, rule.signal_name, rule.path)

        # Walk down one level per key, remembering (parent, child, key) so
        # the branch can be pruned bottom-up afterwards.
        chain = []
        node = self._tree
        for key in keys:
            child = node.get_match(key)
            if child is None:
                raise DBusException ("Trying to remove unkown rule: %s"%str(rule))
            chain.append((node, child, key))
            node = child
        path_node = node

        # Filter in place so other references to the list see the removal.
        path_node.rules[:] = [stored for stored in path_node.rules
                              if not stored.is_match(rule)]

        # Clean up the tree: drop the emptied leaf, then every ancestor that
        # is left without children, stopping at the first one still in use.
        if not path_node.rules:
            for parent, child, key in reversed(chain):
                parent.remove_child(child, key=key)
                if not self._is_childless(parent):
                    break
|
|
|
|
class SignalMatchRule:
    """A D-Bus match rule used to filter signals.

    When a signal matches the rule it is passed on to every function in
    ``handler_functions``.
    """

    def __init__(self, signal_name, dbus_interface, sender, path):
        self.handler_functions = []

        self.signal_name = signal_name
        self.dbus_interface = dbus_interface
        self.sender = sender
        self.path = path
        self.args = None    # optional mapping: argument index -> value

        # Names of the keyword arguments through which handlers receive the
        # message sender and object path.  Callers may assign them after
        # construction; defaulting to None keeps execute() usable when they
        # never do.
        self.sender_keyword = None
        self.path_keyword = None

    def add_args_match(self, args):
        """Restrict the rule to signals whose arguments match ``args``.

        ``args`` maps an argument index to the value required there.
        """
        self.args = args

    def execute(self, message, args=None):
        """Call every handler with the message arguments.

        ``args`` may carry the already-extracted argument list; when it is
        None the list is read from ``message``.  Handlers additionally get
        the sender and/or path as keyword arguments when ``sender_keyword``
        / ``path_keyword`` are set, and the message itself as
        ``dbus_message`` when the handler has a true ``_dbus_pass_message``
        attribute.
        """
        shared_keywords = {}
        if self.sender_keyword is not None:
            shared_keywords[self.sender_keyword] = message.get_sender()
        if self.path_keyword is not None:
            shared_keywords[self.path_keyword] = message.get_path()

        # Only extract when nothing was passed in; an empty list is a valid,
        # already-extracted argument list.
        if args is None:
            args = message.get_args_list()

        for handler in self.handler_functions:
            # Each handler gets its own dict so that "dbus_message" does not
            # leak to later handlers that did not ask for it.
            keywords = dict(shared_keywords)
            if getattr(handler, "_dbus_pass_message", False):
                keywords["dbus_message"] = message
            handler(*args, **keywords)

    def add_handler(self, handler):
        """Register ``handler`` to be called when the rule is executed."""
        self.handler_functions.append(handler)

    def match_args_from_list(self, args_list):
        """Return True when ``args_list`` satisfies the argument filter.

        Only the indexes listed in ``self.args`` are compared; a rule without
        an argument filter accepts everything.
        """
        if not self.args:
            return True

        last_index = len(args_list) - 1
        for index, value in self.args.items():
            if index > last_index:
                return False
            if not (args_list[index] == value):
                return False
        return True

    def match_args_from_rule(self, rule):
        """Return True when ``rule`` has exactly the same argument filter."""
        if self.args == rule.args:
            return True
        if self.args is None or rule.args is None:
            return False
        if len(self.args) != len(rule.args):
            return False

        for key, value in self.args.items():
            if rule.args.get(key) != value:
                return False
        return True

    def is_match(self, rule):
        """Return True when ``rule`` describes this rule.

        All four fields and the argument filter must be equal.  A ``rule``
        without handlers then matches regardless of this rule's handlers;
        otherwise both rules must hold the same handlers, in any order.
        """
        same_fields = (self.signal_name == rule.signal_name and
                       self.dbus_interface == rule.dbus_interface and
                       self.sender == rule.sender and
                       self.path == rule.path)
        if not (same_fields and self.match_args_from_rule(rule)):
            return False

        if rule.handler_functions == []:
            return True

        # Order-insensitive comparison that relies on == only, because
        # callables cannot be sorted reliably.
        if len(self.handler_functions) != len(rule.handler_functions):
            return False
        unmatched = list(rule.handler_functions)
        for handler in self.handler_functions:
            try:
                unmatched.remove(handler)
            except ValueError:
                return False
        return True

    def __repr__(self):
        """Return the rule as a match string that can be used with
        dbus_bindings, e.g. ``type='signal',interface='...',member='...'``.
        """
        parts = ["type='signal'"]
        if self.dbus_interface:
            parts.append("interface='%s'" % (self.dbus_interface,))
        if self.sender:
            parts.append("sender='%s'" % (self.sender,))
        if self.path:
            parts.append("path='%s'" % (self.path,))
        if self.signal_name:
            parts.append("member='%s'" % (self.signal_name,))
        if self.args:
            # Sorted by index so the string is stable for equal filters.
            for index, value in sorted(self.args.items()):
                parts.append("arg%i='%s'" % (index, value))
        return ",".join(parts)
|