dbus/python/dbus_bindings.pyx.in
John (J5) Palmieri 216fa619f3 * python/dbus_bindings.pyx.in: Updated to handle new D-BUS type system
- BUS_ACTIVATION -> BUS_STARTER
	- DBUS_BUS_ACTIVATION -> DBUS_BUS_STARTER
	- class MessageIter (__init__): Added recursion checking
	so we throw a nice error instead of just disconnecting from the
	bus.
	(get): Added arg_type parameter for recursion.
	Removed the nil type
	Added signiture type placeholder (not implemented)
	Added struct type placeholder (not implemented)
	Added varient type placeholder (not implemented)
	Commented out dict type for now
	(get_element_type): renamed from get_array_type
	(get_*): changed to use the dbus_message_iter_get_basic API
	(get_*_array): removed in favor of recursive get_array method
	(get_array): new recursive method which calls get to marshal
        the elements of the array
	(value_to_dbus_sig): New method returns the corrasponding
	dbus signiture to a python value
	(append): Comment out dict handling for now
	Handle lists with the new recursive API
	Comment out None handling for now
	(append_nil): removed
	(append_*): changed to use dbus_message_iter_append_basic API
	(append_*_array): removed in favor of recursive append_array
	method
	(__str__): Make it easier to print out recursive iterators
	for debugging
	- class Message (__str__): moved type inspection to the
	MessageIter class' __str__ method
	(get_iter): Added an append parameter wich defaults to False
	If True use the new API's to create an append iterator

* python/dbus.py: Update to use new bindings API
	- TYPE_ACTIVATION -> TYPE_STARTER
	- class Bus (_get_match_rule): GetServiceOwner -> GetNameOwner
	- class ActivationBus -> class StarterBus
	- class RemoteObject (__call__): get an append iterator
	- (_dispatch_dbus_method_call): get an append iterator
	- class Object (emit_signal): get an append iterator

* python/examples/: Fixed up the examples to work with the new API
2005-01-28 19:09:55 +00:00

1167 lines
36 KiB
Python

# -*- Mode: Python -*-
# jdahlin is the most coolest and awesomest person in the world
# and wrote all the good parts of this code. all the bad parts
# where python conditionals have a ( ) around them, thus violating
# PEP-8 were written by the lame wannabe python programmer seth
#include "dbus_h_wrapper.h"
cdef extern from "stdlib.h":
cdef void *malloc(size_t size)
cdef void free(void *ptr)
cdef void *calloc(size_t nmemb, size_t size)
cdef extern from "dbus-glib.h":
ctypedef struct GMainContext
cdef void dbus_connection_setup_with_g_main (DBusConnection *connection,
GMainContext *context)
cdef void dbus_server_setup_with_g_main (DBusServer *server,
GMainContext *context)
cdef void dbus_g_thread_init ()
cdef extern from "Python.h":
void Py_XINCREF (object)
void Py_XDECREF (object)
object PyString_FromStringAndSize(char *, int)
ctypedef struct DBusError:
char *name
char *message
unsigned int dummy1
unsigned int dummy2
unsigned int dummy3
unsigned int dummy4
unsigned int dummy5
void *padding1
ctypedef struct DBusMessageIter:
void *dummy1
void *dummy2
dbus_uint32_t dummy3
int dummy4
int dummy5
int dummy6
int dummy7
int dummy8
int dummy9
int dummy10
int dummy11
int pad1
int pad2
void *pad3
ctypedef struct DBusObjectPathVTable:
DBusObjectPathUnregisterFunction unregister_function
DBusObjectPathMessageFunction message_function
void (* dbus_internal_pad1) (void *)
void (* dbus_internal_pad2) (void *)
void (* dbus_internal_pad3) (void *)
void (* dbus_internal_pad4) (void *)
_user_data_references = [ ]
class DBusException(Exception):
pass
class ConnectionError(Exception):
pass
class ObjectPath(str):
def __init__(self, value):
str.__init__(value)
class ByteArray(str):
def __init__(self, value):
str.__init__(value)
#forward delcerations
cdef class Connection
cdef class Message
cdef class PendingCall
cdef class Watch
cdef class MessageIter
cdef void cunregister_function_handler (DBusConnection *connection,
void *user_data):
cdef Connection conn
tup = <object>user_data
assert (type(tup) == list)
function = tup[1]
conn = Connection()
conn.__cinit__(None, connection)
args = [conn]
function(*args)
cdef DBusHandlerResult cmessage_function_handler (DBusConnection *connection,
DBusMessage *msg,
void *user_data):
cdef Connection conn
cdef Message message
tup = <object>user_data
assert (type(tup) == list)
function = tup[0]
message = Message(_create=0)
message._set_msg(msg)
conn = Connection()
conn.__cinit__(None, connection)
args = [conn,
message]
retval = function(*args)
if (retval == None):
retval = DBUS_HANDLER_RESULT_HANDLED
return retval
cdef class Connection:
cdef DBusConnection *conn
def __init__(self, address=None, Connection _conn=None):
cdef DBusConnection *c_conn
cdef char *c_address
c_conn=NULL
if (_conn != None):
c_conn = _conn.conn
if (address != None or _conn != None):
self.__cinit__(c_address, c_conn)
# hack to be able to pass in a c pointer to the constructor
# while still alowing python programs to create a Connection object
cdef __cinit__(self, address, DBusConnection *_conn):
cdef DBusError error
dbus_error_init(&error)
if _conn != NULL:
self.conn = _conn
dbus_connection_ref(self.conn)
else:
self.conn = dbus_connection_open(address,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
cdef _set_conn(self, DBusConnection *conn):
self.conn = conn
cdef DBusConnection *_get_conn(self):
return self.conn
def get_unique_name(self):
return bus_get_unique_name(self)
def setup_with_g_main(self):
dbus_connection_setup_with_g_main(self.conn, NULL)
def disconnect(self):
dbus_connection_disconnect(self.conn)
def get_is_connected(self):
return dbus_connection_get_is_connected(self.conn)
def get_is_authenticated(self):
return dbus_connection_get_is_authenticated(self.conn)
def flush(self):
dbus_connection_flush(self.conn)
def borrow_message(self):
cdef Message m
m = Message(_create=0)
m._set_msg(dbus_connection_borrow_message(self.conn))
return m
def return_message(self, Message message):
cdef DBusMessage *msg
msg = message._get_msg()
dbus_connection_return_message(self.conn, msg)
def steal_borrowed_message(self, Message message):
cdef DBusMessage *msg
msg = message._get_msg()
dbus_connection_steal_borrowed_message(self.conn,
msg)
def pop_message(self):
cdef DBusMessage *msg
cdef Message m
msg = dbus_connection_pop_message(self.conn)
if msg != NULL:
m = Message(_create=0)
m._set_msg(msg)
else:
m = None
return m
def get_dispatch_status(self):
return dbus_connection_get_dispatch_status(self.conn)
def dispatch(self):
return dbus_connection_dispatch(self.conn)
def send(self, Message message):
#cdef dbus_uint32_t client_serial
#if type(message) != Message:
# raise TypeError
cdef DBusMessage *msg
msg = message._get_msg()
retval = dbus_connection_send(self.conn,
msg,
NULL)
return retval
def send_with_reply(self, Message message, timeout_milliseconds):
cdef dbus_bool_t retval
cdef DBusPendingCall *cpending_call
cdef DBusError error
cdef DBusMessage *msg
cdef PendingCall pending_call
dbus_error_init(&error)
cpending_call = NULL
msg = message._get_msg()
retval = dbus_connection_send_with_reply(self.conn,
msg,
&cpending_call,
timeout_milliseconds)
if dbus_error_is_set(&error):
raise DBusException, error.message
if (cpending_call != NULL):
pending_call = PendingCall()
pending_call.__cinit__(cpending_call)
else:
pending_call = None
return (retval, pending_call)
def send_with_reply_and_block(self, Message message,
timeout_milliseconds=0):
cdef DBusMessage * retval
cdef DBusError error
cdef DBusMessage *msg
cdef Message m
dbus_error_init(&error)
msg = message._get_msg()
retval = dbus_connection_send_with_reply_and_block(
self.conn,
msg,
timeout_milliseconds,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
if retval == NULL:
raise AssertionError
m = Message(_create=0)
m._set_msg(retval)
return m
def set_watch_functions(self, add_function, remove_function, data):
pass
def set_timeout_functions(self, add_function, remove_function, data):
pass
def set_wakeup_main_function(self, wakeup_main_function, data):
pass
# FIXME: set_dispatch_status_function, get_unix_user, set_unix_user_function
def add_filter(self, filter_function):
user_data = [ filter_function ]
global _user_data_references
_user_data_references.append(user_data)
return dbus_connection_add_filter(self.conn,
cmessage_function_handler,
<void*>user_data,
NULL)
#FIXME: remove_filter
# this is pretty tricky, we want to only remove the filter
# if we truly have no more calls to our message_function_handler...ugh
def set_data(self, slot, data):
pass
def get_data(self, slot):
pass
def set_max_message_size(self, size):
dbus_connection_set_max_message_size(self.conn, size)
def get_max_message_size(self):
return dbus_connection_get_max_message_size(self.conn)
def set_max_received_size(self, size):
dbus_connection_set_max_received_size(self.conn, size)
def get_max_received_size(self):
return dbus_connection_get_max_received_size(self.conn)
def get_outgoing_size(self):
return dbus_connection_get_outgoing_size(self.conn)
# preallocate_send, free_preallocated_send, send_preallocated
def register_object_path(self, path, unregister_cb, message_cb):
cdef DBusObjectPathVTable cvtable
cvtable.unregister_function = cunregister_function_handler
cvtable.message_function = cmessage_function_handler
user_data = [message_cb, unregister_cb]
global _user_data_references
_user_data_references.append(user_data)
return dbus_connection_register_object_path(self.conn, path, &cvtable,
<void*>user_data)
def register_fallback(self, path, unregister_cb, message_cb):
cdef DBusObjectPathVTable cvtable
cvtable.unregister_function = cunregister_function_handler
cvtable.message_function = cmessage_function_handler
user_data = [message_cb, unregister_cb]
global _user_data_references
_user_data_references.append(user_data)
return dbus_connection_register_fallback(self.conn, path, &cvtable,
<void*>user_data)
#FIXME: unregister_object_path , see problems with remove_filter
def list_registered (self, parent_path):
cdef char **cchild_entries
cdef dbus_bool_t retval
retval = dbus_connection_list_registered(self.conn, parent_path, &cchild_entries)
if (not retval):
#FIXME: raise out of memory exception?
return None
i = 0
child_entries = []
while (cchild_entries[i] != NULL):
child_entries.append(cchild_entries[i])
i = i + 1
dbus_free_string_array(cchild_entries)
return child_entries
cdef class PendingCall:
cdef DBusPendingCall *pending_call
def __init__(self, PendingCall _pending_call=None):
if (_pending_call != None):
self.__cinit__(_pending_call.pending_call)
cdef void __cinit__(self, DBusPendingCall *_pending_call):
self.pending_call = _pending_call
dbus_pending_call_ref(self.pending_call)
cdef DBusPendingCall *_get_pending_call(self):
return self.pending_call
def cancel(self):
dbus_pending_call_cancel(self.pending_call)
def get_completed(self):
return dbus_pending_call_get_completed(self.pending_call)
def get_reply(self):
cdef Message message
message = Message(_create=0)
message._set_msg(dbus_pending_call_get_reply(self.pending_call))
return message
def block(self):
dbus_pending_call_block(self.pending_call)
cdef class Watch:
cdef DBusWatch* watch
def __init__(self):
pass
cdef __cinit__(self, DBusWatch *cwatch):
self.watch = cwatch
def get_fd(self):
return dbus_watch_get_fd(self.watch)
# FIXME: not picked up correctly by extract.py
#def get_flags(self):
# return dbus_watch_get_flags(self.watch)
def handle(self, flags):
return dbus_watch_handle(self.watch, flags)
def get_enabled(self):
return dbus_watch_get_enabled(self.watch)
cdef class MessageIter:
cdef DBusMessageIter *iter
cdef DBusMessageIter real_iter
cdef dbus_uint32_t level
def __init__(self, level=0):
self.iter = &self.real_iter
self.level = level
#don't allow us to recurse forever
#FIXME: what is a sane limit?
if(self.level > 100):
raise TypeError, 'Type recurion is too deep'
cdef __cinit__(self, DBusMessageIter *iter):
self.real_iter = iter[0]
cdef DBusMessageIter *_get_iter(self):
return self.iter
def has_next(self):
return dbus_message_iter_has_next(self.iter)
def next(self):
return dbus_message_iter_next(self.iter)
def get(self, arg_type=None):
if(arg_type == None):
arg_type = self.get_arg_type()
if arg_type == TYPE_INVALID:
raise TypeError, 'Invalid arg type in MessageIter'
elif arg_type == TYPE_STRING:
retval = self.get_string()
elif arg_type == TYPE_INT32:
retval = self.get_int32()
elif arg_type == TYPE_UINT32:
retval = self.get_uint32()
elif arg_type == TYPE_INT64:
retval = self.get_int64()
elif arg_type == TYPE_UINT64:
retval = self.get_uint64()
elif arg_type == TYPE_DOUBLE:
retval = self.get_double()
elif arg_type == TYPE_BYTE:
retval = self.get_byte()
elif arg_type == TYPE_BOOLEAN:
retval = self.get_boolean()
elif arg_type == TYPE_SIGNATURE:
raise TypeError, 'Signitures not implemented yet!'
elif arg_type == TYPE_ARRAY:
array_type = self.get_element_type()
retval = self.get_array(array_type)
#elif arg_type == TYPE_DICT:
# retval = self.get_dict()
# TODO: Implement DICT when new type system implements them
elif arg_type == TYPE_OBJECT_PATH:
retval = self.get_object_path()
elif arg_type == TYPE_STRUCT:
raise TypeError, 'Structs not implemented yet!'
#TODO: implement structs
elif arg_type == TYPE_VARIANT:
raise TypeError, 'Varients not implemented yet!'
#TODO: implement variants
else:
raise TypeError, 'Unknown arg type %d in MessageIter' % (arg_type)
return retval
# TODO: Implement get_dict when DBUS supports dicts again
# def get_dict(self):
# cdef DBusMessageIter c_dict_iter
# cdef MessageIter dict_iter
#
# dbus_message_iter_recurse(self.iter, &c_dict_iter)
#
# dict_iter = MessageIter()
# dict_iter.__cinit__(&c_dict_iter)
#
# dict = {}
#
# end_of_dict = False
#
# while True:
# key = dict_iter.get_dict_key()
# value = dict_iter.get()
# dict[key] = value
# if not dict_iter.has_next():
# break
# dict_iter.next()
#
# return dict
def get_arg_type(self):
return dbus_message_iter_get_arg_type(self.iter)
def get_element_type(self):
return dbus_message_iter_get_element_type(self.iter)
def get_byte(self):
cdef char c_val
dbus_message_iter_get_basic(self.iter, <char *>&c_val)
return c_val
def get_boolean(self):
cdef dbus_bool_t c_val
dbus_message_iter_get_basic(self.iter, <dbus_bool_t *>&c_val)
return c_val
def get_int32(self):
cdef dbus_int32_t c_val
dbus_message_iter_get_basic(self.iter, <dbus_int32_t *>&c_val)
return c_val
def get_uint32(self):
cdef dbus_uint32_t c_val
dbus_message_iter_get_basic(self.iter, <dbus_uint32_t *>&c_val)
return c_val
def get_int64(self):
cdef dbus_int64_t c_val
dbus_message_iter_get_basic(self.iter, <dbus_int64_t *>&c_val)
return c_val
def get_uint64(self):
cdef dbus_uint64_t c_val
dbus_message_iter_get_basic(self.iter, <dbus_uint64_t *>&c_val)
return c_val
def get_double(self):
cdef double c_val
dbus_message_iter_get_basic(self.iter, <double *>&c_val)
return c_val
def get_string(self):
cdef char *c_str
dbus_message_iter_get_basic(self.iter, <char **>&c_str)
return c_str
def get_object_path(self):
object_path_string = self.get_string()
return ObjectPath(object_path_string)
# TODO: Implement dict when DBUS supports it again
# def get_dict_key(self):
# return dbus_message_iter_get_dict_key(self.iter)
def get_array(self, type):
cdef DBusMessageIter c_array_iter
cdef MessageIter array_iter
level = self.level + 1
dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_array_iter)
array_iter = MessageIter(level)
array_iter.__cinit__(&c_array_iter)
python_list = []
while True:
value = array_iter.get(type)
python_list.append(value)
if not array_iter.has_next():
break
array_iter.next()
return python_list
#FIXME: handle all the different types?
def python_value_to_dbus_sig(self, value):
ptype = type(value)
ret = ""
if ptype == bool:
ret = TYPE_BOOL
elif ptype == int:
ret = TYPE_INT32
elif ptype == long:
ret = TYPE_INT64
elif ptype == str:
ret = TYPE_STRING
elif ptype == float:
ret = TYPE_FLOAT
# elif ptype == dict:
# TODO: Implement dict when DBUS supports it again
elif ptype == list:
ret = TYPE_ARRAY
elif isinstance(value, ObjectPath):
ret = TYPE_PATH
else:
raise TypeError, "Argument of unknown type '%s'" % (ptype)
return str(chr(ret))
#FIXME: handle all the different types?
def append(self, value):
value_type = type(value)
if value_type == bool:
retval = self.append_boolean(value)
elif value_type == int:
retval = self.append_int32(value)
elif value_type == long:
retval = self.append_int64(value)
elif value_type == str:
retval = self.append_string(value)
elif value_type == float:
retval = self.append_double(value)
# elif value_type == dict:
# retval = self.append_dict(value)
# TODO: Implement dict when DBUS supports it again
elif value_type == list:
retval = self.append_array(value)
#elif value_type == None.__class__:
# retval = self.append_nil()
elif isinstance(value, ObjectPath):
retval = self.append_object_path(value)
elif isinstance(value, ByteArray):
retval = self.append_array(value)
else:
raise TypeError, "Argument of unknown type '%s'" % (value_type)
return retval
def append_boolean(self, value):
cdef dbus_bool_t c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_BOOLEAN, <dbus_bool_t *>&c_value)
def append_byte(self, value):
cdef char b
if type(value) != str or len(value) != 1:
raise TypeError
b = ord(value)
return dbus_message_iter_append_basic(self.iter, TYPE_BYTE, <char *>&b)
def append_int32(self, value):
cdef dbus_int32_t c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_INT32, <dbus_int32_t *>&c_value)
def append_uint32(self, value):
cdef dbus_uint32_t c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_UINT32, <dbus_uint32_t *>&c_value)
def append_int64(self, value):
cdef dbus_int64_t c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_INT64, <dbus_int64_t *>&c_value)
def append_uint64(self, value):
cdef dbus_uint64_t c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_UINT64, <dbus_uint64_t *>&c_value)
def append_double(self, value):
cdef double c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_DOUBLE, <double *>&c_value)
def append_string(self, value):
cdef char *c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_STRING, <char **>&c_value)
# TODO: Implement dict when DBUS supports it again
# def append_dict_key(self, value):
# return dbus_message_iter_append_dict_key(self.iter, value)
def append_object_path(self, value):
cdef char *c_value
c_value = value
return dbus_message_iter_append_basic(self.iter, TYPE_PATH, <char **>&c_value)
# FIXME: append_array, append_boolean_array, append_uint32_array,
# append_uint64_array
#TODO: Implement dict when DBUS supports it again
# def append_dict(self, python_dict):
# cdef DBusMessageIter c_dict_iter
# cdef MessageIter dict_iter
#
# dbus_message_iter_append_dict(self.iter, &c_dict_iter)
#
# dict_iter = MessageIter()
# dict_iter.__cinit__(&c_dict_iter)
#
# for key, value in python_dict.iteritems():
# if type(key) != str:
# raise TypeError, "DBus dict keys must be strings"
# dict_iter.append_dict_key(key)
# dict_iter.append(value)
def append_array(self, python_list):
cdef DBusMessageIter c_array_iter
cdef MessageIter array_iter
level = self.level + 1
sig = self.python_value_to_dbus_sig(python_list[0])
dbus_message_iter_open_container(self.iter, TYPE_ARRAY, sig, <DBusMessageIter *>&c_array_iter)
array_iter = MessageIter(level)
array_iter.__cinit__(&c_array_iter)
length = len(python_list)
for item in python_list:
if not array_iter.append(item):
dbus_message_iter_close_container(self.iter, array_iter.iter)
return False
dbus_message_iter_close_container(self.iter, array_iter.iter)
return True
def __str__(self):
cdef DBusMessageIter c_array_iter
cdef MessageIter array_iter
value_at_iter = True
retval = ""
while (value_at_iter):
type = self.get_arg_type()
if type == TYPE_INVALID:
break
elif type == TYPE_STRING:
str = iter.get_string()
arg = 'string:%s\n' % (str)
elif type == TYPE_OBJECT_PATH:
path = iter.get_object_path()
arg = 'object_path:%s\n' % (path)
elif type == TYPE_INT32:
num = iter.get_int32()
arg = 'int32:%d\n' % (num)
elif type == TYPE_UINT32:
num = iter.get_uint32()
arg = 'uint32:%u\n' % (num)
elif type == TYPE_INT64:
num = iter.get_int64()
arg = 'int64:%d\n' % (num)
elif type == TYPE_UINT64:
num = iter.get_uint64()
arg = 'uint64:%u\n' % (num)
elif type == TYPE_DOUBLE:
num = iter.get_double()
arg = 'double:%f\n' % (num)
elif type == TYPE_BYTE:
num = iter.get_byte()
arg = 'byte:%x(%s)\n' % (num, str(chr(num)))
elif type == TYPE_BOOLEAN:
bool = iter.get_boolean()
if (bool):
str = "true"
else:
str = "false"
arg = 'boolean:%s\n' % (str)
elif type == TYPE_ARRAY:
dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_array_iter)
array_iter = MessageIter(self.level + 1)
array_iter.__cinit__(&c_array_iter)
if array_iter.has_next():
arg = 'array [' + str(array_iter) + ']'
else:
arg = 'array []'
else:
arg = '(unknown arg type %d)\n' % type
retval = retval + arg
value_at_iter = self.next()
return retval
(MESSAGE_TYPE_INVALID, MESSAGE_TYPE_METHOD_CALL, MESSAGE_TYPE_METHOD_RETURN, MESSAGE_TYPE_ERROR, MESSAGE_TYPE_SIGNAL) = range(5)
(TYPE_INVALID, TYPE_BYTE, TYPE_BOOLEAN, TYPE_INT32, TYPE_UINT32, TYPE_INT64, TYPE_UINT64, TYPE_DOUBLE, TYPE_STRING, TYPE_OBJECT_PATH, TYPE_SIGNATURE, TYPE_ARRAY, TYPE_STRUCT, TYPE_STRUCT_START, TYPE_STRUCT_END, TYPE_VARIENT) = (0, ord('y'), ord('b'), ord('i'), ord('u'), ord('x'), ord('t'), ord('d'), ord('s'), ord('o'), ord('g'), ord('a'), ord('r'), ord('('), ord(')'), ord('v'))
(HANDLER_RESULT_HANDLED, HANDLER_RESULT_NOT_YET_HANDLED, HANDLER_RESULT_NEED_MEMORY) = range(3)
cdef class Message:
cdef DBusMessage *msg
def __init__(self, message_type=MESSAGE_TYPE_INVALID,
service=None, path=None, interface=None, method=None,
Message method_call=None,
name=None,
Message reply_to=None, error_name=None, error_message=None,
_create=1):
cdef char *cservice
cdef DBusMessage *cmsg
if (service == None):
cservice = NULL
else:
cservice = service
if not _create:
return
if message_type == MESSAGE_TYPE_METHOD_CALL:
self.msg = dbus_message_new_method_call(cservice, path, interface, method)
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
cmsg = method_call._get_msg()
self.msg = dbus_message_new_method_return(cmsg)
elif message_type == MESSAGE_TYPE_SIGNAL:
self.msg = dbus_message_new_signal(path, interface, name)
elif message_type == MESSAGE_TYPE_ERROR:
cmsg = reply_to._get_msg()
self.msg = dbus_message_new_error(cmsg, error_name, error_message)
def type_to_name(self, type):
if type == MESSAGE_TYPE_SIGNAL:
return "signal"
elif type == MESSAGE_TYPE_METHOD_CALL:
return "method call"
elif type == MESSAGE_TYPE_METHOD_RETURN:
return "method return"
elif type == MESSAGE_TYPE_ERROR:
return "error"
else:
return "(unknown message type)"
def __str__(self):
message_type = self.get_type()
sender = self.get_sender()
if sender == None:
sender = "(no sender)"
if (message_type == MESSAGE_TYPE_METHOD_CALL) or (message_type == MESSAGE_TYPE_SIGNAL):
retval = '%s interface=%s; member=%s; sender=%s' % (self.type_to_name(message_type),
self.get_interface(),
self.get_member(),
sender)
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
retval = '%s sender=%s' % (self.type_to_name(message_type),
sender)
elif message_type == MESSAGE_TYPE_ERROR:
retval = '%s name=%s; sender=%s' % (self.type_to_name(message_type),
self.get_error_name(),
sender)
else:
retval = "Message of unknown type %d" % (message_type)
# FIXME: should really use self.convert_to_tuple() here
iter = self.get_iter()
retval = retval + "\n" + str(iter)
return retval
cdef _set_msg(self, DBusMessage *msg):
self.msg = msg
cdef DBusMessage *_get_msg(self):
return self.msg
def get_iter(self, append=False):
cdef DBusMessageIter iter
cdef MessageIter message_iter
cdef DBusMessage *msg
msg = self._get_msg()
if append:
dbus_message_iter_init_append(msg, &iter)
else:
dbus_message_iter_init(msg, &iter)
message_iter = MessageIter(0)
message_iter.__cinit__(&iter)
return message_iter
def get_args_list(self):
retval = [ ]
iter = self.get_iter()
try:
retval.append(iter.get())
except TypeError, e:
return [ ]
value_at_iter = iter.next()
while (value_at_iter):
retval.append(iter.get())
value_at_iter = iter.next()
return retval
# FIXME: implement dbus_message_copy?
def get_type(self):
return dbus_message_get_type(self.msg)
def set_path(self, object_path):
return dbus_message_set_path(self.msg, object_path)
def get_path(self):
return dbus_message_get_path(self.msg)
def set_interface(self, interface):
return dbus_message_set_interface(self.msg, interface)
def get_interface(self):
return dbus_message_get_interface(self.msg)
def set_member(self, member):
return dbus_message_set_member(self.msg, member)
def get_member(self):
return dbus_message_get_member(self.msg)
def set_error_name(self, name):
return dbus_message_set_error_name(self.msg, name)
def get_error_name(self):
return dbus_message_get_error_name(self.msg)
def set_destination(self, destination):
return dbus_message_set_destination(self.msg, destination)
def get_destination(self):
return dbus_message_get_destination(self.msg)
def set_sender(self, sender):
return dbus_message_set_sender(self.msg, sender)
def get_sender(self):
cdef char *sender
sender = dbus_message_get_sender(self.msg)
if (sender == NULL):
return None
else:
return sender
def set_no_reply(self, no_reply):
dbus_message_set_no_reply(self.msg, no_reply)
def get_no_reply(self):
return dbus_message_get_no_reply(self.msg)
def is_method_call(self, interface, method):
return dbus_message_is_method_call(self.msg, interface, method)
def is_signal(self, interface, signal_name):
return dbus_message_is_signal(self.msg, interface, signal_name)
def is_error(self, error_name):
return dbus_message_is_error(self.msg, error_name)
def has_destination(self, service):
return dbus_message_has_destination(self.msg, service)
def has_sender(self, service):
return dbus_message_has_sender(self.msg, service)
def get_serial(self):
return dbus_message_get_serial(self.msg)
def set_reply_serial(self, reply_serial):
return dbus_message_set_reply_serial(self.msg, reply_serial)
def get_reply_serial(self):
return dbus_message_get_reply_serial(self.msg)
#FIXME: dbus_message_get_path_decomposed
# FIXME: all the different dbus_message_*args* methods
class Signal(Message):
def __init__(self, spath, sinterface, sname):
Message.__init__(self, MESSAGE_TYPE_SIGNAL, path=spath, interface=sinterface, name=sname)
class MethodCall(Message):
def __init__(self, mpath, minterface, mmethod):
Message.__init__(self, MESSAGE_TYPE_METHOD_CALL, path=mpath, interface=minterface, method=mmethod)
class MethodReturn(Message):
def __init__(self, method_call):
Message.__init__(self, MESSAGE_TYPE_METHOD_RETURN, method_call=method_call)
class Error(Message):
def __init__(self, reply_to, error_name, error_message):
Message.__init__(self, MESSAGE_TYPE_ERROR, reply_to=reply_to, error_name=error_name, error_message=error_message)
cdef class Server:
cdef DBusServer *server
def __init__(self, address):
cdef DBusError error
dbus_error_init(&error)
self.server = dbus_server_listen(address,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
def setup_with_g_main (self):
dbus_server_setup_with_g_main(self.server, NULL)
def disconnect(self):
dbus_server_disconnect(self.server)
def get_is_connected(self):
return dbus_server_get_is_connected(self.server)
# def set_new_connection_function(self, function, data):
# dbus_server_set_new_connection_function(self.conn, function,
# data, NULL)
# def set_watch_functions(self, add_function, remove_function, data):
# dbus_server_set_watch_functions(self.server,
# add_function, remove_function,
# data, NULL)
# def set_timeout_functions(self, add_function, remove_function, data):
# dbus_server_set_timeout_functions(self.server,
# add_function, remove_function,
# data, NULL)
# def handle_watch(self, watch, condition):
# dbus_server_handle_watch(self.conn, watch, condition)
BUS_SESSION = DBUS_BUS_SESSION
BUS_SYSTEM = DBUS_BUS_SYSTEM
BUS_STARTER = DBUS_BUS_STARTER
def bus_get (bus_type):
cdef DBusError error
cdef Connection conn
dbus_error_init(&error)
cdef DBusConnection *connection
connection = dbus_bus_get(bus_type,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
conn = Connection()
conn.__cinit__(None, connection)
return conn
def bus_get_unique_name(Connection connection):
cdef DBusConnection *conn
conn = connection._get_conn()
return dbus_bus_get_unique_name(conn)
def bus_get_unix_user(Connection connection, service_name):
cdef DBusError error
dbus_error_init(&error)
cdef int retval
cdef DBusConnection *conn
conn = connection._get_conn()
retval = dbus_bus_get_unix_user(conn, service_name, &error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
#These are defines, not enums so they aren't auto generated
DBUS_START_REPLY_SUCCESS = 0
DBUS_START_REPLY_ALREADY_RUNNING = 1
def bus_start_service_by_name(Connection connection, service_name, flags=0):
cdef DBusError error
dbus_error_init(&error)
cdef dbus_bool_t retval
cdef dbus_uint32_t results
cdef DBusConnection *conn
conn = connection._get_conn()
retval = dbus_bus_start_service_by_name(conn, service_name, flags, &results, &error)
return (retval, results)
def bus_register(Connection connection):
cdef DBusError error
dbus_error_init(&error)
cdef dbus_bool_t retval
cdef DBusConnection *conn
conn = connection._get_conn()
retval = dbus_bus_register(conn,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
SERVICE_FLAG_PROHIBIT_REPLACEMENT = 0x1
SERVICE_FLAG_REPLACE_EXISTING = 0x2
def bus_request_name(Connection connection, service_name, flags=0):
cdef DBusError error
dbus_error_init(&error)
cdef int retval
cdef DBusConnection *conn
conn = connection._get_conn()
retval = dbus_bus_request_name(conn,
service_name,
flags,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
def bus_name_has_owner(Connection connection, service_name):
cdef DBusError error
dbus_error_init(&error)
cdef dbus_bool_t retval
cdef DBusConnection *conn
conn = connection._get_conn()
retval = dbus_bus_name_has_owner(conn,
service_name,
&error)
if dbus_error_is_set(&error):
raise DBusException, error.message
return retval
def bus_add_match(Connection connection, rule):
cdef DBusError error
cdef DBusConnection *conn
dbus_error_init(&error)
conn = connection._get_conn()
dbus_bus_add_match (conn, rule, &error)
if dbus_error_is_set(&error):
raise DBusException, error.message
def bus_remove_match(Connection connection, rule):
cdef DBusError error
cdef DBusConnection *conn
dbus_error_init(&error)
conn = connection._get_conn()
dbus_bus_remove_match (conn, rule, &error)
if dbus_error_is_set(&error):
raise DBusException, error.message
def init_gthreads ():
dbus_g_thread_init ()