forgot to add files...

This commit is contained in:
Havoc Pennington 2003-09-30 02:40:49 +00:00
parent dfd1292d52
commit 42019c9625
5 changed files with 1452 additions and 0 deletions

28
python/Makefile.am Normal file
View file

@ -0,0 +1,28 @@
INCLUDES=-I$(top_builddir) -I$(top_builddir)/glib $(DBUS_CLIENT_CFLAGS) $(DBUS_GLIB_CFLAGS) $(DBUS_GLIB_TOOL_CFLAGS) $(PYTHON_INCLUDES)
dbusdir = $(pythondir)
dbus_PYTHON = dbus.py
dbusbindingsdir = $(pythondir)
dbusbindings_LTLIBRARIES = dbus_bindings.la
dbus_bindings_la_LDFLAGS = -module -avoid-version -fPIC -export-symbols-regex initdbus_bindings
dbus_bindings_la_LIBADD = $(top_builddir)/dbus/libdbus-1.la $(top_builddir)/glib/libdbus-glib-1.la
dbus_bindings_la_SOURCES = dbus_bindings.c
EXTRA_DIST = \
dbus_bindings.pyx.in \
extract.py \
setup.py \
test.py
CLEANFILES = \
dbus_bindings.pyx \
dbus_bindings.c
dbus_bindings.pyx: dbus_bindings.pyx.in extract.py
-$(PYTHON) extract.py dbus_bindings.pyx.in -I$(top_builddir) > dbus_bindings.pyx
dbus_bindings.c: dbus_bindings.pyx
-pyrexc dbus_bindings.pyx

267
python/dbus.py Normal file
View file

@ -0,0 +1,267 @@
"""Module for high-level communication over the FreeDesktop.org Bus (DBus)
DBus allows you to share and access remote objects between processes
running on the desktop, and also to access system services (such as
the print spool).
To use DBus, first get a Bus object, which provides a connection to one
of a few standard dbus-daemon instances that might be running. From the
Bus you can get a RemoteService. A service is provided by an application or
process connected to the Bus, and represents a set of objects. Once you
have a RemoteService you can get a RemoteObject that implements a specific interface
(an interface is just a standard group of member functions). Then you can call
those member functions directly.
You can think of a complete method call as looking something like:
Bus:SESSION -> Service:org.gnome.Evolution -> Object:/org/gnome/Evolution/Inbox -> Interface: org.gnome.Evolution.MailFolder -> Method: Forward('message1', 'seth@gnome.org')
This communicates over the SESSION Bus to the org.gnome.Evolution process to call the
Forward method of the /org/gnome/Evolution/Inbox object (which provides the
org.gnome.Evolution.MailFolder interface) with two string arguments.
For example, the dbus-daemon itself provides a service and some objects:
# Get a connection to the desktop-wide SESSION bus
bus = dbus.Bus(dbus.Bus.TYPE_SESSION)
# Get the service provided by the dbus-daemon named org.freedesktop.DBus
dbus_service = bus.get_service('org.freedesktop.DBus')
# Get a reference to the desktop bus' standard object, denoted
# by the path /org/freedesktop/DBus. The object /org/freedesktop/DBus
# implements the 'org.freedesktop.DBus' interface
dbus_object = dbus_service.get_object('/org/freedesktop/DBus',
'org.freedesktop.DBus')
# One of the member functions in the org.freedesktop.DBus interface
# is ListServices(), which provides a list of all the other services
# registered on this bus. Call it, and print the list.
print(dbus_object.ListServices())
"""
import dbus_bindings
class Bus:
"""A connection to a DBus daemon.
One of three possible standard buses, the SESSION, SYSTEM,
or ACTIVATION bus
"""
TYPE_SESSION = dbus_bindings.BUS_SESSION
TYPE_SYSTEM = dbus_bindings.BUS_SYSTEM
TYPE_ACTIVATION = dbus_bindings.BUS_ACTIVATION
def __init__(self, bus_type=TYPE_SESSION, glib_mainloop=True):
self._connection = dbus_bindings.bus_get(bus_type)
self._connection.add_filter(self._signal_func)
self._match_rule_to_receivers = { }
if (glib_mainloop):
self._connection.setup_with_g_main()
def get_service(self, service_name="org.freedesktop.Broadcast"):
"""Get one of the RemoteServices connected to this Bus. service_name
is just a string of the form 'com.widgetcorp.MyService'
"""
return RemoteService(self._connection, service_name)
def add_signal_receiver(self, receiver, interface=None, service=None, path=None):
match_rule = self._get_match_rule(interface, service, path)
if (not self._match_rule_to_receivers.has_key(match_rule)):
self._match_rule_to_receivers[match_rule] = [ ]
self._match_rule_to_receivers[match_rule].append(receiver)
dbus_bindings.bus_add_match(self._connection, match_rule)
def get_connection(self):
"""Get the dbus_bindings.Connection object associated with this Bus"""
return self._connection
def _get_match_rule(self, interface, service, path):
## if (interface):
## match_rule = match_rule + ",interface='%s'" % (interface)
## if (service):
## match_rule = match_rule + ",service='%s'" % (service)
## if (path):
## match_rule = match_rule + ",path='%s'" % (path)
# FIXME: use the service here too!!!
return "type='signal',interface='%s',path='%s'" % (interface, path)
def _signal_func(self, connection, message):
if (message.get_type() != dbus_bindings.MESSAGE_TYPE_SIGNAL):
return
interface = message.get_interface()
service = message.get_sender()
path = message.get_path()
member = message.get_member()
match_rule = self._get_match_rule(interface, service, path)
if (self._match_rule_to_receivers.has_key(match_rule)):
receivers = self._match_rule_to_receivers[match_rule]
args = [interface, member, service, path]
for receiver in receivers:
receiver(*args)
class RemoteObject:
"""A remote Object.
A RemoteObject is provided by a RemoteService on a particular Bus. RemoteObjects
have member functions, and can be called like normal Python objects.
"""
def __init__(self, connection, service_name, object_path, interface):
self._connection = connection
self._service_name = service_name
self._object_path = object_path
self._interface = interface
def __getattr__(self, member):
if member == '__call__':
return object.__call__
else:
return RemoteMethod(self._connection, self._service_name,
self._object_path, self._interface, member)
class RemoteMethod:
"""A remote Method.
Typically a member of a RemoteObject. Calls to the
method produce messages that travel over the Bus and are routed
to a specific Service.
"""
def __init__(self, connection, service_name, object_path, interface, method_name):
self._connection = connection
self._service_name = service_name
self._object_path = object_path
self._interface = interface
self._method_name = method_name
def __call__(self, *args):
message = dbus_bindings.MethodCall(self._object_path, self._interface, self._method_name)
message.set_destination(self._service_name)
# Add the arguments to the function
iter = message.get_iter()
for arg in args:
iter.append(arg)
reply_message = self._connection.send_with_reply_and_block(message, 5000)
args_tuple = reply_message.get_args_list()
if len(args_tuple) == 0:
return
elif len(args_tuple) == 1:
return args_tuple[0]
else:
return args_tuple
class Service:
"""A base class for exporting your own Services across the Bus
Just inherit from Service, providing the name of your service
(e.g. org.designfu.SampleService).
"""
def __init__(self, service_name, bus=None):
self._service_name = service_name
if bus == None:
# Get the default bus
self._bus = Bus()
else:
self._bus = bus
dbus_bindings.bus_acquire_service(self._bus.get_connection(), service_name)
def get_bus(self):
"""Get the Bus this Service is on"""
return self._bus
def get_service_name(self):
"""Get the name of this service"""
return self._service_name
class Object:
"""A base class for exporting your own Objects across the Bus.
Just inherit from Object and provide a list of methods to share
across the Bus. These will appear as member functions of your
ServiceObject.
"""
def __init__(self, object_path, methods_to_share, service):
self._object_path = object_path
self._service = service
self._bus = service.get_bus()
self._connection = self._bus.get_connection()
self._method_name_to_method = self._build_method_dictionary(methods_to_share)
self._connection.register_object_path(object_path, self._unregister_cb, self._message_cb)
def broadcast_signal(self, interface, signal_name):
message = dbus_bindings.Signal(self._object_path, interface, signal_name)
#FIXME: need to set_sender, but it always disconnects when we do this
#message.set_sender(self._service.get_service_name())
self._connection.send(message)
def _unregister_cb(self, connection):
print ("Unregister")
def _message_cb(self, connection, message):
target_method_name = message.get_member()
target_method = self._method_name_to_method[target_method_name]
args = message.get_args_list()
try:
retval = target_method(*args)
except Exception, e:
if e.__module__ == '__main__':
# FIXME: is it right to use .__name__ here?
error_name = e.__class__.__name__
else:
error_name = e.__module__ + '.' + str(e.__class__.__name__)
error_contents = str(e)
reply = dbus_bindings.Error(message, error_name, error_contents)
else:
reply = dbus_bindings.MethodReturn(message)
if retval != None:
iter = reply.get_iter()
iter.append(retval)
self._connection.send(reply)
def _build_method_dictionary(self, methods):
method_dict = {}
for method in methods:
if method_dict.has_key(method.__name__):
print ('WARNING: registering DBus Object methods, already have a method named %s' % (method.__name__))
method_dict[method.__name__] = method
return method_dict
class RemoteService:
"""A remote service providing objects.
A service is typically a process or application that provides
remote objects, but can also be the broadcast service that
receives signals from all applications on the Bus.
"""
def __init__(self, connection, service_name):
self._connection = connection
self._service_name = service_name
def get_object(self, object_path, interface):
"""Get an object provided by this Service that implements a
particular interface. object_path is a string of the form
'/com/widgetcorp/MyService/MyObject1'. interface looks a lot
like a service_name (they're often the same) and is of the form,
'com.widgetcorp.MyInterface', and mostly just defines the
set of member functions that will be present in the object.
"""
return RemoteObject(self._connection, self._service_name, object_path, interface)

917
python/dbus_bindings.pyx.in Normal file
View file

@ -0,0 +1,917 @@
# -*- Mode: Python -*-
#include "dbus_h_wrapper.h"
cdef extern from "stdlib.h":
cdef void *malloc(size_t size)
cdef void free(void *ptr)
cdef extern from "dbus-glib.h":
ctypedef struct GMainContext
cdef void dbus_connection_setup_with_g_main (DBusConnection *connection,
GMainContext *context)
cdef void dbus_server_setup_with_g_main (DBusServer *server,
GMainContext *context)
cdef extern from "Python.h":
void Py_XINCREF (object)
void Py_XDECREF (object)
ctypedef struct DBusError:
char *name
char *message
unsigned int dummy1
unsigned int dummy2
unsigned int dummy3
unsigned int dummy4
unsigned int dummy5
void *padding1
ctypedef struct DBusMessageIter:
void *dummy1
void *dummy2
dbus_uint32_t dummy3
int dummy4
int dummy5
int dummy6
int dummy7
int dummy8
int dummy9
int dummy10
int dummy11
int pad1
int pad2
void *pad3
ctypedef struct DBusObjectPathVTable:
DBusObjectPathUnregisterFunction unregister_function
DBusObjectPathMessageFunction message_function
void (* dbus_internal_pad1) (void *)
void (* dbus_internal_pad2) (void *)
void (* dbus_internal_pad3) (void *)
void (* dbus_internal_pad4) (void *)
_user_data_references = [ ]
class DBusException(Exception):
pass
class ConnectionError(Exception):
pass
cdef void cunregister_function_handler (DBusConnection *connection,
void *user_data):
tup = <object>user_data
assert (type(tup) == list)
function = tup[1]
args = [Connection(_conn=<object>connection)]
function(*args)
cdef DBusHandlerResult cmessage_function_handler (DBusConnection *connection,
DBusMessage *msg,
void *user_data):
tup = <object>user_data
assert (type(tup) == list)
function = tup[0]
message = Message(_create=0)
message._set_msg(<object>msg)
args = [Connection(_conn=<object>connection),
message]
retval = function(*args)
if (retval == None):
retval = DBUS_HANDLER_RESULT_HANDLED
return retval
cdef class Connection:
cdef DBusConnection *conn
# FIXME: this is a major major hack. We use this because casting values to
# python objects and returning seemed to be corrupting them. This is a "global variable" :-(
cdef char **_parsed_path
def __init__(self, address=None, _conn=None):
cdef DBusError error
dbus_error_init(&error)
if <DBusConnection*>_conn != NULL:
self.conn = <DBusConnection*>_conn
dbus_connection_ref(self.conn)
else:
self.conn = dbus_connection_open(address,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
dbus_connection_ref(self.conn)
def _set_conn(self, conn):
self.conn = <DBusConnection*>conn
def _get_conn(self):
return <object>self.conn
#FIXME: this is totally busted, don't use a class shared member like parsed_path
def _build_parsed_path(self, path_element_list):
cdef char **cpatharray
size = len(path_element_list)
cpatharray = <char **>malloc(sizeof(char*) * (size + 1))
for i in range(size):
path_element = path_element_list[i]
cpatharray[i] = path_element
cpatharray[size] = NULL
self._parsed_path = cpatharray
def get_base_service(self):
return bus_get_base_service(self)
def setup_with_g_main(self):
dbus_connection_setup_with_g_main(self.conn, NULL)
def disconnect(self):
dbus_connection_disconnect(self.conn)
def get_is_connected(self):
return dbus_connection_get_is_connected(self.conn)
def get_is_authenticated(self):
return dbus_connection_get_is_authenticated(self.conn)
def flush(self):
dbus_connection_flush(self.conn)
def borrow_message(self):
m = Message(_create=0)
m._set_msg(<object>dbus_connection_borrow_message(self.conn))
return m
def return_message(self, message):
msg = message._get_msg()
dbus_connection_return_message(self.conn, <DBusMessage*>msg)
def steal_borrowed_message(self, message):
msg = message._get_msg()
dbus_connection_steal_borrowed_message(self.conn,
<DBusMessage*>msg)
def pop_message(self):
cdef DBusMessage *msg
msg = dbus_connection_pop_message(self.conn)
if msg != NULL:
m = Message(_create=0)
m._set_msg(<object>msg)
else:
m = None
return m
def get_dispatch_status(self):
return dbus_connection_get_dispatch_status(self.conn)
def dispatch(self):
return dbus_connection_dispatch(self.conn)
def send(self, message):
#cdef dbus_uint32_t client_serial
#if type(message) != Message:
# raise TypeError
msg = message._get_msg()
retval = dbus_connection_send(self.conn,
<DBusMessage*>msg,
NULL)
return retval
def send_with_reply(self, message, timeout_milliseconds):
cdef dbus_bool_t retval
cdef DBusPendingCall *cpending_call
cdef DBusError error
dbus_error_init(&error)
cpending_call = NULL
msg = message._get_msg()
retval = dbus_connection_send_with_reply(self.conn,
<DBusMessage*>msg,
&cpending_call,
timeout_milliseconds)
if dbus_error_is_set(&error):
raise DBusException, error.message
if (cpending_call != NULL):
pending_call = PendingCall(<object>cpending_call)
else:
pending_call = None
return (retval, pending_call)
def send_with_reply_and_block(self, message,
timeout_milliseconds=0):
cdef DBusMessage * retval
cdef DBusError error
dbus_error_init(&error)
msg = message._get_msg()
retval = dbus_connection_send_with_reply_and_block(
<DBusConnection*>self.conn,
<DBusMessage*>msg,
<int>timeout_milliseconds,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
if retval == NULL:
raise AssertionError
m = Message(_create=0)
m._set_msg(<object>retval)
return m
def set_watch_functions(self, add_function, remove_function, data):
pass
def set_timeout_functions(self, add_function, remove_function, data):
pass
def set_wakeup_main_function(self, wakeup_main_function, data):
pass
# FIXME: set_dispatch_status_function, get_unix_user, set_unix_user_function
def add_filter(self, filter_function):
user_data = [ filter_function ]
global _user_data_references
_user_data_references.append(user_data)
return dbus_connection_add_filter(self.conn,
cmessage_function_handler,
<void*>user_data,
NULL)
#FIXME: remove_filter
# this is pretty tricky, we want to only remove the filter
# if we truly have no more calls to our message_function_handler...ugh
def set_data(self, slot, data):
pass
def get_data(self, slot):
pass
def set_max_message_size(self, size):
dbus_connection_set_max_message_size(self.conn, size)
def get_max_message_size(self):
return dbus_connection_get_max_message_size(self.conn)
def set_max_received_size(self, size):
dbus_connection_set_max_received_size(self.conn, size)
def get_max_received_size(self):
return dbus_connection_get_max_received_size(self.conn)
def get_outgoing_size(self):
return dbus_connection_get_outgoing_size(self.conn)
# preallocate_send, free_preallocated_send, send_preallocated
def register_object_path(self, path, unregister_cb, message_cb):
cdef DBusObjectPathVTable cvtable
cvtable.unregister_function = cunregister_function_handler
cvtable.message_function = cmessage_function_handler
user_data = [message_cb, unregister_cb]
global _user_data_references
_user_data_references.append(user_data)
path_element_list = path[1:].split('/')
self._build_parsed_path(path_element_list)
return dbus_connection_register_object_path(self.conn, self._parsed_path, &cvtable,
<void*>user_data)
def register_fallback(self, path, unregister_cb, message_cb):
cdef DBusObjectPathVTable cvtable
cvtable.unregister_function = cunregister_function_handler
cvtable.message_function = cmessage_function_handler
user_data = [message_cb, unregister_cb]
global _user_data_references
_user_data_references.append(user_data)
path_element_list = path[1:].split('/')
self._build_parsed_path(path_element_list)
return dbus_connection_register_fallback(self.conn, self._parsed_path, &cvtable,
<void*>user_data)
#FIXME: unregister_object_path , see problems with remove_filter
def list_registered (self, parent_path):
cdef char **cchild_entries
cdef dbus_bool_t retval
path_element_list = parent_path[1:].split('/')
self._build_parsed_path(path_element_list)
retval = dbus_connection_list_registered(self.conn, self._parsed_path, &cchild_entries)
if (not retval):
#FIXME: raise out of memory exception?
return None
i = 0
child_entries = []
while (cchild_entries[i] != NULL):
child_entries.append(cchild_entries[i])
i = i + 1
dbus_free_string_array(cchild_entries)
return child_entries
cdef class PendingCall:
cdef DBusPendingCall *pending_call
def __init__(self, _pending_call):
self.pending_call = <DBusPendingCall*>_pending_call
dbus_pending_call_ref(self.pending_call)
def _get_pending_call(self):
return <object>self.pending_call
def cancel(self):
dbus_pending_call_cancel(self.pending_call)
def get_completed(self):
return dbus_pending_call_get_completed(self.pending_call)
def get_reply(self):
message = Message(_create=0)
message._set_msg(<object>dbus_pending_call_get_reply(self.pending_call))
return message
def block(self):
dbus_pending_call_block(self.pending_call)
cdef class Watch:
cdef DBusWatch* watch
def __init__(self, cwatch):
self.watch = <DBusWatch*>cwatch
def get_fd(self):
return dbus_watch_get_fd(self.watch)
# FIXME: not picked up correctly by extract.py
#def get_flags(self):
# return dbus_watch_get_flags(self.watch)
def handle(self, flags):
return dbus_watch_handle(self.watch, flags)
def get_enabled(self):
return dbus_watch_get_enabled(self.watch)
cdef class MessageIter:
cdef DBusMessageIter *iter
cdef DBusMessageIter real_iter
def __init__(self, message):
self.iter = &self.real_iter
msg = message._get_msg()
dbus_message_iter_init(<DBusMessage*>msg, self.iter)
def get_iter(self):
return <object>self.iter
def has_next(self):
return dbus_message_iter_has_next(self.iter)
def next(self):
return dbus_message_iter_next(self.iter)
def get(self):
arg_type = self.get_arg_type()
if arg_type == TYPE_INVALID:
raise TypeError, 'Invalid arg type in MessageIter'
elif arg_type == TYPE_STRING:
retval = self.get_string()
elif arg_type == TYPE_INT32:
retval = self.get_int32()
elif arg_type == TYPE_UINT32:
retval = self.get_uint32()
elif arg_type == TYPE_DOUBLE:
retval = self.get_double()
elif arg_type == TYPE_BYTE:
retval = self.get_byte()
elif arg_type == TYPE_BOOLEAN:
retval = self.get_boolean()
elif arg_type == TYPE_ARRAY:
array_type = self.get_array_type()
if array_type == TYPE_STRING:
retval = self.get_string_array()
elif array_type == TYPE_BOOLEAN:
retval = self.get_boolean_array()
else:
raise TypeError, "Unknown array type %d in MessageIter" % (array_type)
else:
raise TypeError, 'Unknown arg type %d in MessageIter' % (argtype)
return retval
def get_arg_type(self):
return dbus_message_iter_get_arg_type(self.iter)
def get_array_type(self):
return dbus_message_iter_get_array_type(self.iter)
#FIXME: implement get_byte
#def get_byte(self):
# return dbus_message_iter_get_byte(self.iter)
def get_boolean(self):
return dbus_message_iter_get_boolean(self.iter)
def get_int32(self):
return dbus_message_iter_get_int32(self.iter)
def get_uint32(self):
return dbus_message_iter_get_uint32(self.iter)
def get_double(self):
return dbus_message_iter_get_double(self.iter)
def get_string(self):
return dbus_message_iter_get_string(self.iter)
def get_dict_key(self):
return dbus_message_iter_get_dict_key(self.iter)
# FIXME: implement dbus_message_iter_get_named
# dbus_message_iter_init_array_iterator
def get_byte_array(self):
cdef int len
cdef unsigned char *retval
dbus_message_iter_get_byte_array(self.iter, &retval, <int*>&len)
list = []
for i from 0 <= i < len:
list.append(chr(retval[i]))
return list
# FIXME: implement dbus_message_iter_get_boolean_array
# dbus_message_iter_get_int32_array
# dbus_message_iter_get_uint32_array
# dbus_message_iter_get_double_array
def get_string_array(self):
cdef int len
cdef char **retval
dbus_message_iter_get_string_array(self.iter, &retval, <int*>&len)
list = []
for i from 0 <= i < len:
list.append(retval[i])
return list
# dbus_message_append_iter_init included in class Message
#FIXME: handle all the different types?
def append(self, value):
value_type = type(value)
if value_type == bool:
retval = self.append_boolean(value)
elif value_type == int:
retval = self.append_int32(value)
elif value_type == float:
retval = self.append_double(value)
elif value_type == str:
retval = self.append_string(value)
elif value_type == list:
if (len(list) == 1):
raise TypeError, "Empty list"
list_type = type(list[0])
if list_type == str:
self.append_string_array(list)
else:
raise TypeError, "List of unknown type '%s'" % (list_type)
else:
raise TypeError, "Argument of unknown type '%s'" % (value_type)
return retval
def append_nil(self):
return dbus_message_iter_append_nil(self.iter)
def append_boolean(self, value):
return dbus_message_iter_append_boolean(self.iter, value)
def append_byte(self, value):
return dbus_message_iter_append_byte(self.iter, value)
def append_int32(self, value):
return dbus_message_iter_append_int32(self.iter, value)
def append_uint32(self, value):
return dbus_message_iter_append_uint32(self.iter, value)
def append_double(self, value):
return dbus_message_iter_append_double(self.iter, value)
def append_string(self, value):
return dbus_message_iter_append_string(self.iter, value)
# FIXME: dbus_message_iter_append_named
def append_dict_key(self, value):
return dbus_message_iter_append_dict_key(self.iter, value)
# FIXME: append_array, append_dict_array, append_boolean_array, append_int32_array, append_uint32_array, append_double_array
def append_byte_array(self, list):
cdef unsigned char * value
cdef int length
length = len(list)
value = <unsigned char*>malloc(length)
for i from 0 <= i < length:
item = list[i]
if type(item) != str or len(item) != 1:
raise TypeError
value[i] = ord(item)
return dbus_message_iter_append_byte_array(self.iter, value, length)
def append_string_array(self, list):
cdef char **value
cdef int length
length = len(list)
value = <char**>malloc(length)
for i from 0 <= i < length:
item = list[i]
if type(item) != str:
raise TypeError
value[i] = item
return dbus_message_iter_append_string_array(self.iter, value, length)
(MESSAGE_TYPE_INVALID, MESSAGE_TYPE_METHOD_CALL, MESSAGE_TYPE_METHOD_RETURN, MESSAGE_TYPE_ERROR, MESSAGE_TYPE_SIGNAL) = range(5)
(TYPE_INVALID, TYPE_NIL, TYPE_BYTE, TYPE_BOOLEAN, TYPE_INT32, TYPE_UINT32, TYPE_INT64, TYPE_UINT64, TYPE_DOUBLE, TYPE_STRING, TYPE_NAMED, TYPE_ARRAY, TYPE_DICT, TYPE_OBJECT_PATH) = range(14)
cdef class Message:
cdef DBusMessage *msg
def __init__(self, message_type=MESSAGE_TYPE_INVALID,
service=None, path=None, interface=None, method=None,
method_call=None,
name=None,
reply_to=None, error_name=None, error_message=None,
_create=1):
cdef char *cservice
if (service == None):
cservice = NULL
else:
cservice = service
if not _create:
return
if message_type == MESSAGE_TYPE_METHOD_CALL:
self.msg = dbus_message_new_method_call(cservice, path, interface, method)
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
cmsg = method_call._get_msg()
self.msg = dbus_message_new_method_return(<DBusMessage*>cmsg)
elif message_type == MESSAGE_TYPE_SIGNAL:
self.msg = dbus_message_new_signal(path, interface, name)
elif message_type == MESSAGE_TYPE_ERROR:
cmsg = reply_to._get_msg()
self.msg = dbus_message_new_error(<DBusMessage*>cmsg, error_name, error_message)
def type_to_name(self, type):
if type == MESSAGE_TYPE_SIGNAL:
return "signal"
elif type == MESSAGE_TYPE_METHOD_CALL:
return "method call"
elif type == MESSAGE_TYPE_METHOD_RETURN:
return "method return"
elif type == MESSAGE_TYPE_ERROR:
return "error"
else:
return "(unknown message type)"
def __str__(self):
message_type = self.get_type()
sender = self.get_sender()
if sender == None:
sender = "(no sender)"
if (message_type == MESSAGE_TYPE_METHOD_CALL) or (message_type == MESSAGE_TYPE_SIGNAL):
retval = '%s interface=%s; member=%s; sender=%s' % (self.type_to_name(message_type),
self.get_interface(),
self.get_member(),
sender)
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
retval = '%s sender=%s' % (self.type_to_name(message_type),
sender)
elif message_type == MESSAGE_TYPE_ERROR:
retval = '%s name=%s; sender=%s' % (self.type_to_name(message_type),
self.get_error_name(),
sender)
else:
retval = "Message of unknown type %d" % (message_type)
# FIXME: should really use self.convert_to_tuple() here
iter = self.get_iter()
value_at_iter = True
while (value_at_iter):
type = iter.get_arg_type()
if type == TYPE_INVALID:
break
elif type == TYPE_STRING:
str = iter.get_string()
arg = 'string:%s\n' % (str)
elif type == TYPE_INT32:
num = iter.get_int32()
arg = 'int32:%d\n' % (num)
elif type == TYPE_UINT32:
num = iter.get_uint32()
arg = 'uint32:%u\n' % (num)
elif type == TYPE_DOUBLE:
num = iter.get_double()
arg = 'double:%f\n' % (num)
elif type == TYPE_BYTE:
num = iter.get_byte()
arg = 'byte:%d\n' % (num)
elif type == TYPE_BOOLEAN:
bool = iter.get_boolean()
if (bool):
str = "true"
else:
str = "false"
arg = 'boolean:%s\n' % (str)
else:
arg = '(unknown arg type %d)\n' % type
retval = retval + arg
value_at_iter = iter.next()
return retval
def _set_msg(self, msg):
self.msg = <DBusMessage*>msg
def _get_msg(self):
return <object>self.msg
def get_iter(self):
return MessageIter(self)
def get_args_list(self):
retval = [ ]
iter = self.get_iter()
try:
retval.append(iter.get())
except TypeError, e:
return [ ]
value_at_iter = iter.next()
while (value_at_iter):
retval.append(iter.get())
value_at_iter = iter.next()
return retval
# FIXME: implement dbus_message_copy?
def get_type(self):
return dbus_message_get_type(self.msg)
def set_path(self, object_path):
return dbus_message_set_path(self.msg, object_path)
def get_path(self):
return dbus_message_get_path(self.msg)
def set_interface(self, interface):
return dbus_message_set_interface(self.msg, interface)
def get_interface(self):
return dbus_message_get_interface(self.msg)
def set_member(self, member):
return dbus_message_set_member(self.msg, member)
def get_member(self):
return dbus_message_get_member(self.msg)
def set_error_name(self, name):
return dbus_message_set_error_name(self.msg, name)
def get_error_name(self):
return dbus_message_get_error_name(self.msg)
def set_destination(self, destination):
return dbus_message_set_destination(self.msg, destination)
def get_destination(self):
return dbus_message_get_destination(self.msg)
def set_sender(self, sender):
return dbus_message_set_sender(self.msg, sender)
def get_sender(self):
cdef char *sender
sender = dbus_message_get_sender(self.msg)
if (sender == NULL):
return None
else:
return sender
def set_no_reply(self, no_reply):
dbus_message_set_no_reply(self.msg, no_reply)
def get_no_reply(self):
return dbus_message_get_no_reply(self.msg)
def is_method_call(self, interface, method):
return dbus_message_is_method_call(self.msg, interface, method)
def is_signal(self, interface, signal_name):
return dbus_message_is_signal(self.msg, interface, signal_name)
def is_error(self, error_name):
return dbus_message_is_error(self.msg, error_name)
def has_destination(self, service):
return dbus_message_has_destination(self.msg, service)
def has_sender(self, service):
return dbus_message_has_sender(self.msg, service)
def get_serial(self):
return dbus_message_get_serial(self.msg)
def set_reply_serial(self, reply_serial):
return dbus_message_set_reply_serial(self.msg, reply_serial)
def get_reply_serial(self):
return dbus_message_get_reply_serial(self.msg)
#FIXME: dbus_message_get_path_decomposed
# FIXME: all the different dbus_message_*args* methods
class Signal(Message):
def __init__(self, spath, sinterface, sname):
Message.__init__(self, MESSAGE_TYPE_SIGNAL, path=spath, interface=sinterface, name=sname)
class MethodCall(Message):
def __init__(self, mpath, minterface, mmethod):
Message.__init__(self, MESSAGE_TYPE_METHOD_CALL, path=mpath, interface=minterface, method=mmethod)
class MethodReturn(Message):
def __init__(self, method_call):
Message.__init__(self, MESSAGE_TYPE_METHOD_RETURN, method_call=method_call)
class Error(Message):
def __init__(self, reply_to, error_name, error_message):
Message.__init__(self, MESSAGE_TYPE_ERROR, reply_to=reply_to, error_name=error_name, error_message=error_message)
cdef class Server:
cdef DBusServer *server
def __init__(self, address):
cdef DBusError error
dbus_error_init(&error)
self.server = dbus_server_listen(address,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
def setup_with_g_main (self):
dbus_server_setup_with_g_main(self.server, NULL)
def disconnect(self):
dbus_server_disconnect(self.server)
def get_is_connected(self):
return dbus_server_get_is_connected(self.server)
# def set_new_connection_function(self, function, data):
# dbus_server_set_new_connection_function(self.conn, function,
# data, NULL)
# def set_watch_functions(self, add_function, remove_function, data):
# dbus_server_set_watch_functions(self.server,
# add_function, remove_function,
# data, NULL)
# def set_timeout_functions(self, add_function, remove_function, data):
# dbus_server_set_timeout_functions(self.server,
# add_function, remove_function,
# data, NULL)
# def handle_watch(self, watch, condition):
# dbus_server_handle_watch(self.conn, watch, condition)
BUS_SESSION = DBUS_BUS_SESSION
BUS_SYSTEM = DBUS_BUS_SYSTEM
BUS_ACTIVATION = DBUS_BUS_ACTIVATION
def bus_get (bus_type):
cdef DBusError error
dbus_error_init(&error)
cdef DBusConnection *connection
connection = dbus_bus_get(bus_type,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return Connection(_conn=<object>connection)
def bus_get_base_service(connection):
conn = connection._get_conn()
return dbus_bus_get_base_service(<DBusConnection*>conn)
def bus_register(connection):
cdef DBusError error
dbus_error_init(&error)
cdef dbus_bool_t retval
conn = connection._get_conn()
retval = dbus_bus_register(<DBusConnection*>conn,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
SERVICE_FLAG_PROHIBIT_REPLACEMENT = 0x1
SERVICE_FLAG_REPLACE_EXISTING = 0x2
def bus_acquire_service(connection, service_name, flags=0):
cdef DBusError error
dbus_error_init(&error)
cdef int retval
conn = connection._get_conn()
retval = dbus_bus_acquire_service(<DBusConnection*>conn,
service_name,
flags,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
def bus_service_exists(connection, service_name):
cdef DBusError error
dbus_error_init(&error)
cdef dbus_bool_t retval
conn = connection._get_conn()
retval = dbus_bus_service_exists(<DBusConnection*>conn,
service_name,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
def bus_add_match(connection, rule):
cdef DBusError error
dbus_error_init(&error)
conn = connection._get_conn()
dbus_bus_add_match (<DBusConnection*>conn, rule, &error)
if dbus_error_is_set(&error):
raise DBusException, error.message
def bus_remove_match(connection, rule):
cdef DBusError error
dbus_error_init(&error)
conn = connection._get_conn()
dbus_bus_remove_match (<DBusConnection*>conn, rule, &error)
if dbus_error_is_set(&error):
raise DBusException, error.message

3
python/dbus_h_wrapper.h Normal file
View file

@ -0,0 +1,3 @@
#define DBUS_API_SUBJECT_TO_CHANGE 1
#include <dbus/dbus.h>

237
python/extract.py Normal file
View file

@ -0,0 +1,237 @@
import commands
import glob
import re
import os
import string
import sys
def clean_func(buf):
buf = strip_comments(buf)
pat = re.compile(r"""\\\n""", re.MULTILINE)
buf = pat.sub('',buf)
pat = re.compile(r"""^[#].*?$""", re.MULTILINE)
buf = pat.sub('',buf)
pat = re.compile(r"""^(typedef|struct|enum)(\s|.|\n)*?;\s*""", re.MULTILINE)
buf = pat.sub('',buf)
pat = re.compile(r"""\s+""", re.MULTILINE)
buf = pat.sub(' ',buf)
pat = re.compile(r""";\s*""", re.MULTILINE)
buf = pat.sub('\n',buf)
buf = buf.lstrip()
#pat=re.compile(r'\s+([*|&]+)\s*(\w+)')
pat = re.compile(r' \s+ ([*|&]+) \s* (\w+)',re.VERBOSE)
buf = pat.sub(r'\1 \2', buf)
pat = re.compile(r'\s+ (\w+) \[ \s* \]',re.VERBOSE)
buf = pat.sub(r'[] \1', buf)
# buf = string.replace(buf, 'G_CONST_RETURN ', 'const-')
buf = string.replace(buf, 'const ', '')
return buf
def strip_comments(buf):
parts = []
lastpos = 0
while 1:
pos = string.find(buf, '/*', lastpos)
if pos >= 0:
parts.append(buf[lastpos:pos])
pos = string.find(buf, '*/', pos)
if pos >= 0:
lastpos = pos + 2
else:
break
else:
parts.append(buf[lastpos:])
break
return string.join(parts, '')
def find_enums(buf):
enums = []
buf = strip_comments(buf)
buf = re.sub('\n', ' ', buf)
enum_pat = re.compile(r'enum\s*{([^}]*)}\s*([A-Z][A-Za-z]*)(\s|;)')
splitter = re.compile(r'\s*,\s', re.MULTILINE)
pos = 0
while pos < len(buf):
m = enum_pat.search(buf, pos)
if not m: break
name = m.group(2)
vals = m.group(1)
isflags = string.find(vals, '<<') >= 0
entries = []
for val in splitter.split(vals):
if not string.strip(val): continue
entries.append(string.split(val)[0])
enums.append((name, isflags, entries))
pos = m.end()
return enums
#typedef unsigned int dbus_bool_t;
#typedef struct {
#
# }
#typedef struct FooStruct FooStruct;
# typedef void (* DBusAddWatchFunction) (DBusWatch *watch,
# void *data);
def find_typedefs(buf):
typedefs = []
buf = re.sub('\n', ' ', strip_comments(buf))
typedef_pat = re.compile(
r"""typedef\s*(?P<type>\w*)
\s*
([(]\s*\*\s*(?P<callback>[\w* ]*)[)]|{([^}]*)}|)
\s*
(?P<args1>[(](?P<args2>[\s\w*,_]*)[)]|[\w ]*)""",
re.MULTILINE | re.VERBOSE)
pat = re.compile(r"""\s+""", re.MULTILINE)
pos = 0
while pos < len(buf):
m = typedef_pat.search(buf, pos)
if not m:
break
if m.group('type') == 'enum':
pos = m.end()
continue
if m.group('args2') != None:
args = pat.sub(' ', m.group('args2'))
current = '%s (* %s) (%s)' % (m.group('type'),
m.group('callback'),
args)
else:
current = '%s %s' % (m.group('type'), m.group('args1'))
typedefs.append(current)
pos = m.end()
return typedefs
proto_pat = re.compile(r"""
(?P<ret>(-|\w|\&|\*)+\s*) # return type
\s+ # skip whitespace
(?P<func>\w+)\s*[(] # match the function name until the opening (
(?P<args>.*?)[)] # group the function arguments
""", re.IGNORECASE|re.VERBOSE)
arg_split_pat = re.compile("\s*,\s*")
def find_functions(buf):
functions = []
buf = clean_func(buf)
buf = string.split(buf,'\n')
for p in buf:
if len(p) == 0:
continue
m = proto_pat.match(p)
if m == None:
continue
func = m.group('func')
ret = m.group('ret')
args = m.group('args')
args = arg_split_pat.split(args)
# for i in range(len(args)):
# spaces = string.count(args[i], ' ')
# if spaces > 1:
# args[i] = string.replace(args[i], ' ', '-', spaces - 1)
functions.append((func, ret, args))
return functions
class Writer:
def __init__(self, filename, enums, typedefs, functions):
if not (enums or typedefs or functions):
return
print 'cdef extern from "%s":' % filename
self.output_enums(enums)
self.output_typedefs(typedefs)
self.output_functions(functions)
print ' pass'
print
def output_enums(self, enums):
for enum in enums:
print ' ctypedef enum %s:' % enum[0]
if enum[1] == 0:
for item in enum[2]:
print ' %s' % item
else:
i = 0
for item in enum[2]:
print ' %s' % item
# print ' %s = 1 << %d' % (item, i)
i += 1
print
def output_typedefs(self, typedefs):
for typedef in typedefs:
if typedef.find('va_list') != -1:
continue
parts = typedef.split()
if parts[0] == 'struct':
if parts[-2] == parts[-1]:
parts = parts[:-1]
print ' ctypedef %s' % ' '.join(parts)
else:
print ' ctypedef %s' % typedef
def output_functions(self, functions):
for func, ret, args in functions:
if func[0] == '_':
continue
str = ', '.join(args)
if str.find('...') != -1:
continue
if str.find('va_list') != -1:
continue
if str.strip() == 'void':
continue
print ' %-20s %s (%s)' % (ret, func, str)
def do_buffer(name, buffer):
functions = find_functions(buffer)
typedefs = find_typedefs(buffer)
enums = find_enums(buffer)
Writer(name, enums, typedefs, functions)
def do_header(filename, name=None):
if name == None:
name = filename
buffer = ""
for line in open(filename).readlines():
if line[0] == '#':
continue
buffer += line
print '# -- %s -- ' % filename
do_buffer(name, buffer)
filename = sys.argv[1]
if filename.endswith('.h'):
do_header(filename)
raise SystemExit
cppflags = ""
for flag in sys.argv[2:]:
cppflags = cppflags + " " + flag
fd = open(filename)
for line in fd.readlines():
if line.startswith('#include'):
filename = line.split(' ')[1][1:-2]
command = "echo '%s'|cpp %s" % (line, cppflags)
sys.stderr.write('running %s' % (command))
output = commands.getoutput(command)
do_buffer(filename, output)
else:
print line[:-1]