dbus/python/dbus_bindings.pyx.in
2005-05-19 20:27:19 +00:00

1474 lines
47 KiB
Python

# -*- Mode: Python -*-
# jdahlin is the most coolest and awesomest person in the world
# and wrote all the good parts of this code. all the bad parts
# where python conditionals have a ( ) around them, thus violating
# PEP-8 were written by the lame wannabe python programmer seth
#FIXME: find memory leaks that I am sure exist
#include "dbus_h_wrapper.h"
# C allocator routines from <stdlib.h>, declared so Pyrex code in this module
# can call them directly.
cdef extern from "stdlib.h":
    cdef void *malloc(size_t size)
    cdef void free(void *ptr)
    cdef void *calloc(size_t nmemb, size_t size)
# GLib main-loop integration entry points from dbus-glib.  GMainContext is
# declared opaque; this file only ever passes NULL for it.
cdef extern from "dbus-glib.h":
    ctypedef struct GMainContext
    cdef void dbus_connection_setup_with_g_main (DBusConnection *connection,
                                                 GMainContext *context)
    cdef void dbus_server_setup_with_g_main (DBusServer *server,
                                             GMainContext *context)
    cdef void dbus_g_thread_init ()
# CPython C-API helpers.  Py_XINCREF/Py_XDECREF are used by PendingCall to
# keep the notify callback tuple referenced while libdbus holds its address.
cdef extern from "Python.h":
    void Py_XINCREF (object)
    void Py_XDECREF (object)
    object PyString_FromStringAndSize(char *, int)
# Pyrex-side declaration of the DBusError struct.  Only `message` is read by
# code in this file; the dummy/padding members are never touched here.
# NOTE(review): presumably mirrors the C layout in the dbus headers -- confirm
# against the installed libdbus before changing any member.
ctypedef struct DBusError:
    char *name
    char *message
    unsigned int dummy1
    unsigned int dummy2
    unsigned int dummy3
    unsigned int dummy4
    unsigned int dummy5
    void *padding1
# Pyrex-side declaration of the DBusMessageIter struct.  MessageIter stores
# one of these by value (real_iter) and copies it with `iter[0]`, so the
# declared size matters.
# NOTE(review): presumably mirrors the C layout in the dbus headers -- confirm
# against the installed libdbus before changing any member.
ctypedef struct DBusMessageIter:
    void *dummy1
    void *dummy2
    dbus_uint32_t dummy3
    int dummy4
    int dummy5
    int dummy6
    int dummy7
    int dummy8
    int dummy9
    int dummy10
    int dummy11
    int pad1
    int pad2
    void *pad3
# Callback table passed to dbus_connection_register_object_path() and
# dbus_connection_register_fallback().  This file fills in only
# unregister_function and message_function; the pad members are left unset.
ctypedef struct DBusObjectPathVTable:
    DBusObjectPathUnregisterFunction unregister_function
    DBusObjectPathMessageFunction message_function
    void (* dbus_internal_pad1) (void *)
    void (* dbus_internal_pad2) (void *)
    void (* dbus_internal_pad3) (void *)
    void (* dbus_internal_pad4) (void *)
# Module-level list to which Connection.add_filter(), register_object_path()
# and register_fallback() append their callback lists before handing the
# list's address to libdbus as a void *.  Presumably this exists to keep those
# lists alive for as long as libdbus may call back; nothing ever removes them.
_user_data_references = [ ]
class DBusException(Exception):
    """Raised by this module, e.g. with the message of a set DBusError."""
    pass
class ConnectionError(Exception):
    """Connection-related exception type.

    NOTE(review): not raised anywhere in the visible part of this file.
    """
    pass
class ObjectPath(str):
    """String subclass that tags a value as a D-Bus object path."""

    def __init__(self, value):
        # The text is already stored by str.__new__; initialise this
        # instance rather than the argument.
        str.__init__(self)
class ByteArray(str):
    """String subclass that tags a value as a D-Bus array of bytes."""

    def __init__(self, value):
        # Content comes from str.__new__; initialise this instance, not the
        # argument.
        str.__init__(self)
class Signature(str):
    """String subclass that tags a value as a D-Bus type signature."""

    def __init__(self, value):
        # Content comes from str.__new__; initialise this instance, not the
        # argument.
        str.__init__(self)
class Byte(int):
    """Integer subclass that tags a value as a D-Bus byte."""

    def __init__(self, value):
        # The number is stored by int.__new__; initialise this instance, not
        # the argument.
        int.__init__(self)
class Boolean(int):
    """Integer subclass that tags a value as a D-Bus boolean."""

    def __init__(self, value):
        # The number is stored by int.__new__; initialise this instance, not
        # the argument.
        int.__init__(self)
class Int16(int):
    """Integer subclass that tags a value as a D-Bus 16-bit signed integer."""

    def __init__(self, value):
        # The number is stored by int.__new__; initialise this instance, not
        # the argument.
        int.__init__(self)
class UInt16(int):
    """Integer subclass that tags a value as a D-Bus 16-bit unsigned integer.

    Raises TypeError when constructed from a negative value.
    """

    def __init__(self, value):
        if value < 0:
            raise TypeError('Unsigned integers must not have a negitive value')
        # The number is stored by int.__new__; initialise this instance, not
        # the argument.
        int.__init__(self)
class Int32(int):
    """Integer subclass that tags a value as a D-Bus 32-bit signed integer."""

    def __init__(self, value):
        # The number is stored by int.__new__; initialise this instance, not
        # the argument.
        int.__init__(self)
class UInt32(long):
    """Long subclass that tags a value as a D-Bus 32-bit unsigned integer.

    Raises TypeError when constructed from a negative value.
    """

    def __init__(self, value):
        if value < 0:
            raise TypeError('Unsigned integers must not have a negitive value')
        # The number is stored by long.__new__; initialise this instance, not
        # the argument.
        long.__init__(self)
class Int64(long):
    """Long subclass that tags a value as a D-Bus 64-bit signed integer."""

    def __init__(self, value):
        # The number is stored by long.__new__; initialise this instance, not
        # the argument.
        long.__init__(self)
class UInt64(long):
    """Long subclass that tags a value as a D-Bus 64-bit unsigned integer.

    Raises TypeError when constructed from a negative value.
    """

    def __init__(self, value):
        if value < 0:
            raise TypeError('Unsigned integers must not have a negitive value')
        # The number is stored by long.__new__; initialise this instance, not
        # the argument.
        long.__init__(self)
class Double(float):
    """Float subclass that tags a value as a D-Bus double."""

    def __init__(self, value):
        # The number is stored by float.__new__.  float inherits
        # object.__init__, which rejects extra arguments on newer Pythons,
        # so `value` is deliberately not forwarded.
        float.__init__(self)
class String(str):
    """String subclass that tags a value as a D-Bus string."""

    def __init__(self, value):
        # Content comes from str.__new__; initialise this instance, not the
        # argument.
        str.__init__(self)
class Array(list):
    """List subclass that tags a value as a D-Bus array."""

    def __init__(self, value):
        # list content is filled in by __init__, not __new__, so both self
        # and value must be passed.  Calling list.__init__(value) alone
        # re-initialised (emptied) the caller's list and left this Array
        # empty.
        list.__init__(self, value)
class Struct(tuple):
    """Tuple subclass that tags a value as a D-Bus struct."""

    def __init__(self, value):
        # The items are stored by tuple.__new__; initialise this instance,
        # not the argument.
        tuple.__init__(self)
class Dictionary(dict):
    """Dict subclass that tags a value as a D-Bus dictionary."""

    def __init__(self, value):
        # dict content is filled in by __init__, not __new__, so both self
        # and value must be passed.  Calling dict.__init__(value) alone left
        # this Dictionary empty.
        dict.__init__(self, value)
# Forward declarations: the extension types below refer to one another
# (e.g. the C callbacks use Connection and Message before they are defined).
cdef class Connection
cdef class Message
cdef class PendingCall
cdef class Watch
cdef class MessageIter
cdef void cunregister_function_handler (DBusConnection *connection,
                                        void *user_data):
    # C trampoline installed as DBusObjectPathVTable.unregister_function.
    # user_data is the Python list built by register_object_path() /
    # register_fallback(): [message_cb, unregister_cb].  The unregister
    # callback is invoked with a Connection wrapping `connection`.
    cdef Connection wrapper
    callbacks = <object>user_data
    assert type(callbacks) == list
    unregister_cb = callbacks[1]
    wrapper = Connection()
    # __cinit__ takes its own reference on the C connection.
    wrapper.__cinit__(None, connection)
    unregister_cb(wrapper)
cdef DBusHandlerResult cmessage_function_handler (DBusConnection *connection,
                                                  DBusMessage *msg,
                                                  void *user_data):
    # C trampoline used both as a connection filter and as
    # DBusObjectPathVTable.message_function.  user_data is a Python list
    # whose first item is the message callback; it is called as
    # callback(Connection, Message) and its return value is handed back to
    # libdbus, with None translated to DBUS_HANDLER_RESULT_HANDLED.
    cdef Connection wrapper
    cdef Message incoming
    callbacks = <object>user_data
    assert type(callbacks) == list
    message_cb = callbacks[0]
    # Wrap the C message without allocating a new one.
    incoming = Message(_create=0)
    incoming._set_msg(msg)
    wrapper = Connection()
    wrapper.__cinit__(None, connection)
    result = message_cb(wrapper, incoming)
    if result == None:
        result = DBUS_HANDLER_RESULT_HANDLED
    return result
cdef class Connection:
    # Python wrapper around a libdbus DBusConnection handle.  Most methods
    # forward directly to the dbus_connection_* function of the same name.

    # The wrapped C connection; set by __cinit__ or _set_conn.
    cdef DBusConnection *conn

    def __init__(self, address=None, Connection _conn=None):
        # Open a connection to `address`, or share the C handle of an
        # existing Connection `_conn`.  With neither argument nothing is
        # opened; the C callbacks rely on that and call __cinit__ themselves.
        # Raises DBusException if opening `address` fails.
        cdef DBusConnection *c_conn
        c_conn = NULL
        if _conn is not None:
            c_conn = _conn.conn
        if address is not None or _conn is not None:
            # __cinit__ takes the address as a Python object, so forward
            # `address` itself (previously a never-assigned char * local
            # was passed here).
            self.__cinit__(address, c_conn)

    # hack to be able to pass in a c pointer to the constructor
    # while still allowing python programs to create a Connection object
    cdef __cinit__(self, address, DBusConnection *_conn):
        # Adopt `_conn` (taking a new reference) when it is non-NULL,
        # otherwise open a connection to `address`.
        cdef DBusError error
        dbus_error_init(&error)
        if _conn != NULL:
            self.conn = _conn
            dbus_connection_ref(self.conn)
        else:
            self.conn = dbus_connection_open(address,
                                             &error)
            if dbus_error_is_set(&error):
                raise DBusException, error.message

    cdef _set_conn(self, DBusConnection *conn):
        # Replace the wrapped handle without touching reference counts.
        self.conn = conn

    cdef DBusConnection *_get_conn(self):
        return self.conn

    def get_unique_name(self):
        return bus_get_unique_name(self)

    def setup_with_g_main(self):
        # NULL selects whatever context dbus-glib uses by default.
        dbus_connection_setup_with_g_main(self.conn, NULL)

    def disconnect(self):
        dbus_connection_disconnect(self.conn)

    def get_is_connected(self):
        return dbus_connection_get_is_connected(self.conn)

    def get_is_authenticated(self):
        return dbus_connection_get_is_authenticated(self.conn)

    def flush(self):
        dbus_connection_flush(self.conn)

    def borrow_message(self):
        # Return the borrowed message as a Message, or None when libdbus
        # hands back NULL (same convention as pop_message).
        cdef DBusMessage *msg
        cdef Message m
        msg = dbus_connection_borrow_message(self.conn)
        if msg == NULL:
            return None
        m = Message(_create=0)
        m._set_msg(msg)
        return m

    def return_message(self, Message message):
        cdef DBusMessage *msg
        msg = message._get_msg()
        dbus_connection_return_message(self.conn, msg)

    def steal_borrowed_message(self, Message message):
        cdef DBusMessage *msg
        msg = message._get_msg()
        dbus_connection_steal_borrowed_message(self.conn,
                                               msg)

    def pop_message(self):
        # Return the next queued message as a Message, or None when the
        # queue is empty.
        cdef DBusMessage *msg
        cdef Message m
        msg = dbus_connection_pop_message(self.conn)
        if msg != NULL:
            m = Message(_create=0)
            m._set_msg(msg)
        else:
            m = None
        return m

    def get_dispatch_status(self):
        return dbus_connection_get_dispatch_status(self.conn)

    def dispatch(self):
        return dbus_connection_dispatch(self.conn)

    def send(self, Message message):
        # Queue `message` for sending and return libdbus' result.  The
        # serial output argument is not requested (NULL).
        cdef DBusMessage *msg
        msg = message._get_msg()
        retval = dbus_connection_send(self.conn,
                                      msg,
                                      NULL)
        return retval

    def send_with_reply_handlers(self, Message message, timeout_milliseconds, reply_handler, error_handler):
        # Send `message` and arrange for reply_handler / error_handler to be
        # called when the reply arrives.  Any exception raised while sending
        # is passed to error_handler instead of propagating.  Returns the
        # result flag from send_with_reply (False if sending raised).
        retval = False
        try:
            (retval, pending_call) = self.send_with_reply(message, timeout_milliseconds)
            if pending_call:
                pending_call.set_notify(reply_handler, error_handler)
        except Exception, e:
            error_handler(e)
        return retval

    def send_with_reply(self, Message message, timeout_milliseconds):
        # Send `message` and return (result_flag, PendingCall-or-None).
        # dbus_connection_send_with_reply() takes no DBusError argument, so
        # the error check that used to follow the call could never fire and
        # has been dropped.
        cdef dbus_bool_t retval
        cdef DBusPendingCall *cpending_call
        cdef DBusMessage *msg
        cdef PendingCall pending_call
        cpending_call = NULL
        msg = message._get_msg()
        retval = dbus_connection_send_with_reply(self.conn,
                                                 msg,
                                                 &cpending_call,
                                                 timeout_milliseconds)
        if cpending_call != NULL:
            pending_call = PendingCall()
            pending_call.__cinit__(cpending_call)
        else:
            pending_call = None
        return (retval, pending_call)

    def send_with_reply_and_block(self, Message message,
                                  timeout_milliseconds=0):
        # Send `message`, wait for the reply and return it as a Message.
        # Raises DBusException when libdbus sets an error, AssertionError
        # when no reply comes back without an error being set.
        cdef DBusMessage * retval
        cdef DBusError error
        cdef DBusMessage *msg
        cdef Message m
        dbus_error_init(&error)
        msg = message._get_msg()
        retval = dbus_connection_send_with_reply_and_block(
            self.conn,
            msg,
            timeout_milliseconds,
            &error)
        if dbus_error_is_set(&error):
            raise DBusException, error.message
        if retval == NULL:
            raise AssertionError
        m = Message(_create=0)
        m._set_msg(retval)
        return m

    # The three setters below are placeholders and do nothing.
    def set_watch_functions(self, add_function, remove_function, data):
        pass

    def set_timeout_functions(self, add_function, remove_function, data):
        pass

    def set_wakeup_main_function(self, wakeup_main_function, data):
        pass

    # FIXME: set_dispatch_status_function, get_unix_user, set_unix_user_function

    def add_filter(self, filter_function):
        # Install cmessage_function_handler as a filter.  The one-element
        # list is the user_data the handler indexes with [0]; it is recorded
        # in _user_data_references before its address is given to libdbus.
        user_data = [ filter_function ]
        global _user_data_references
        _user_data_references.append(user_data)
        return dbus_connection_add_filter(self.conn,
                                          cmessage_function_handler,
                                          <void*>user_data,
                                          NULL)

    #FIXME: remove_filter
    #       this is pretty tricky, we want to only remove the filter
    #       if we truly have no more calls to our message_function_handler...ugh

    # set_data / get_data are placeholders and do nothing.
    def set_data(self, slot, data):
        pass

    def get_data(self, slot):
        pass

    def set_max_message_size(self, size):
        dbus_connection_set_max_message_size(self.conn, size)

    def get_max_message_size(self):
        return dbus_connection_get_max_message_size(self.conn)

    def set_max_received_size(self, size):
        dbus_connection_set_max_received_size(self.conn, size)

    def get_max_received_size(self):
        return dbus_connection_get_max_received_size(self.conn)

    def get_outgoing_size(self):
        return dbus_connection_get_outgoing_size(self.conn)

    # preallocate_send, free_preallocated_send, send_preallocated

    def register_object_path(self, path, unregister_cb, message_cb):
        # Register handlers for exactly `path`.  user_data layout is
        # [message_cb, unregister_cb], matching the indices used by the two
        # C trampolines.  Only the two callback slots of the vtable are set.
        cdef DBusObjectPathVTable cvtable
        cvtable.unregister_function = cunregister_function_handler
        cvtable.message_function = cmessage_function_handler
        user_data = [message_cb, unregister_cb]
        global _user_data_references
        _user_data_references.append(user_data)
        return dbus_connection_register_object_path(self.conn, path, &cvtable,
                                                    <void*>user_data)

    def register_fallback(self, path, unregister_cb, message_cb):
        # Same as register_object_path() but through
        # dbus_connection_register_fallback().
        cdef DBusObjectPathVTable cvtable
        cvtable.unregister_function = cunregister_function_handler
        cvtable.message_function = cmessage_function_handler
        user_data = [message_cb, unregister_cb]
        global _user_data_references
        _user_data_references.append(user_data)
        return dbus_connection_register_fallback(self.conn, path, &cvtable,
                                                 <void*>user_data)

    #FIXME: unregister_object_path , see problems with remove_filter

    def list_registered (self, parent_path):
        # Return the child entries registered under `parent_path` as a list
        # of strings, or None when libdbus reports failure.
        cdef char **cchild_entries
        cdef dbus_bool_t retval
        retval = dbus_connection_list_registered(self.conn, parent_path, &cchild_entries)
        if not retval:
            #FIXME: raise out of memory exception?
            return None
        child_entries = []
        try:
            # The C array is NULL-terminated.
            i = 0
            while cchild_entries[i] != NULL:
                child_entries.append(cchild_entries[i])
                i = i + 1
        finally:
            # Release the C array even if building the list raised.
            dbus_free_string_array(cchild_entries)
        return child_entries
cdef void _pending_call_notification(DBusPendingCall *pending_call, void *user_data):
    # Notify callback installed by PendingCall.set_notify().  user_data is
    # the (reply_handler, error_handler) tuple.  A method return calls
    # reply_handler(*args); an error reply or any other message type calls
    # error_handler with a DBusException.
    cdef DBusMessage *dbus_message
    cdef Message message
    (reply_handler, error_handler) = <object>user_data
    dbus_message = dbus_pending_call_steal_reply(pending_call)
    try:
        message = Message(_create=0)
        message._set_msg(dbus_message)
        message_type = message.get_type()
        if message_type == MESSAGE_TYPE_METHOD_RETURN:
            args = message.get_args_list()
            reply_handler(*args)
        elif message_type == MESSAGE_TYPE_ERROR:
            args = message.get_args_list()
            if args:
                error_handler(DBusException(args[0]))
            else:
                # An error reply may carry no arguments; fall back to the
                # error name instead of failing on args[0].
                error_handler(DBusException(message.get_error_name()))
        else:
            error_handler(DBusException('Unexpected Message Type: ' + message.type_to_name(message_type)))
    finally:
        # Drop the stolen reply and the pending call even when a handler
        # raises, so the references are not leaked.
        dbus_message_unref(dbus_message)
        dbus_pending_call_unref(pending_call)
cdef void _pending_call_free_user_data(void *data):
    # Free function passed to dbus_pending_call_set_notify(): releases the
    # reference PendingCall.set_notify() took on the handler tuple with
    # Py_XINCREF.
    call_tuple = <object>data
    Py_XDECREF(call_tuple)
cdef class PendingCall:
    # Python wrapper around a libdbus DBusPendingCall handle.

    # The wrapped C pending call; set by __cinit__.
    cdef DBusPendingCall *pending_call

    def __init__(self, PendingCall _pending_call=None):
        # Optionally share the handle of an existing PendingCall.  With no
        # argument the handle is left unset until __cinit__ is called.
        if _pending_call is not None:
            self.__cinit__(_pending_call.pending_call)

    cdef void __cinit__(self, DBusPendingCall *_pending_call):
        # Adopt the C handle and take a reference on it.
        self.pending_call = _pending_call
        dbus_pending_call_ref(self.pending_call)

    cdef DBusPendingCall *_get_pending_call(self):
        return self.pending_call

    def cancel(self):
        dbus_pending_call_cancel(self.pending_call)

    def get_completed(self):
        return dbus_pending_call_get_completed(self.pending_call)

    def get_reply(self):
        # Steal the reply and return it as a Message, or None when libdbus
        # returns NULL (a Message wrapping NULL would crash on first use).
        cdef DBusMessage *reply
        cdef Message message
        reply = dbus_pending_call_steal_reply(self.pending_call)
        if reply == NULL:
            return None
        message = Message(_create=0)
        message._set_msg(reply)
        return message

    def block(self):
        dbus_pending_call_block(self.pending_call)

    def set_notify(self, reply_handler, error_handler):
        # Register _pending_call_notification for this call.  The handler
        # tuple gets an extra reference here because libdbus only stores its
        # address; _pending_call_free_user_data releases it.
        user_data = (reply_handler, error_handler)
        Py_XINCREF(user_data)
        dbus_pending_call_set_notify(self.pending_call, _pending_call_notification,
                                     <void *>user_data, _pending_call_free_user_data)
cdef class Watch:
    # Python wrapper around a libdbus DBusWatch handle.  Methods forward to
    # the dbus_watch_* function of the same name.

    # The wrapped C watch; only ever set through __cinit__.
    cdef DBusWatch* watch

    def __init__(self):
        # Nothing to do: the handle is supplied later via __cinit__.
        pass

    cdef __cinit__(self, DBusWatch *cwatch):
        # Store the C handle as-is (no reference counting is done here).
        self.watch = cwatch

    def get_fd(self):
        return dbus_watch_get_fd(self.watch)

    # FIXME: not picked up correctly by extract.py
    #def get_flags(self):
    #    return dbus_watch_get_flags(self.watch)

    def handle(self, flags):
        return dbus_watch_handle(self.watch, flags)

    def get_enabled(self):
        return dbus_watch_get_enabled(self.watch)
cdef class MessageIter:
    # Cursor over the arguments of a D-Bus message.  The get_* methods read
    # the value at the current position, the append_* methods add values,
    # and container types are handled by recursing with a child MessageIter.

    # Always points at real_iter (set in __init__).
    cdef DBusMessageIter *iter
    # By-value copy of the C iterator this object works on.
    cdef DBusMessageIter real_iter
    # Container nesting depth; more than 32 is rejected.
    cdef dbus_uint32_t level

    def __init__(self, level=0):
        # Raises TypeError when `level` exceeds the nesting limit of 32.
        self.iter = &self.real_iter
        self.level = level
        if self.level > 32:
            raise TypeError, 'Type recurion is too deep'

    cdef __cinit__(self, DBusMessageIter *iter):
        # Copy the caller's C iterator into our own storage.
        self.real_iter = iter[0]

    cdef DBusMessageIter *_get_iter(self):
        return self.iter

    def has_next(self):
        return dbus_message_iter_has_next(self.iter)

    def next(self):
        return dbus_message_iter_next(self.iter)

    def get(self, arg_type=None):
        # Return the value at the current position as a Python object.
        # `arg_type` defaults to the type reported by the iterator.  Raises
        # TypeError for an invalid, unknown or stray dict-entry type.
        if arg_type is None:
            arg_type = self.get_arg_type()
        if arg_type == TYPE_INVALID:
            raise TypeError, 'Invalid arg type in MessageIter'
        elif arg_type == TYPE_STRING:
            retval = self.get_string()
        elif arg_type == TYPE_INT16:
            retval = self.get_int16()
        elif arg_type == TYPE_UINT16:
            retval = self.get_uint16()
        elif arg_type == TYPE_INT32:
            retval = self.get_int32()
        elif arg_type == TYPE_UINT32:
            retval = self.get_uint32()
        elif arg_type == TYPE_INT64:
            retval = self.get_int64()
        elif arg_type == TYPE_UINT64:
            retval = self.get_uint64()
        elif arg_type == TYPE_DOUBLE:
            retval = self.get_double()
        elif arg_type == TYPE_BYTE:
            retval = self.get_byte()
        elif arg_type == TYPE_BOOLEAN:
            retval = self.get_boolean()
        elif arg_type == TYPE_SIGNATURE:
            retval = self.get_signature()
        elif arg_type == TYPE_ARRAY:
            # An array of dict entries is exposed as a Python dict.
            array_type = self.get_element_type()
            if array_type == TYPE_DICT_ENTRY:
                retval = self.get_dict()
            else:
                retval = self.get_array(array_type)
        elif arg_type == TYPE_OBJECT_PATH:
            retval = self.get_object_path()
        elif arg_type == TYPE_STRUCT:
            retval = self.get_struct()
        elif arg_type == TYPE_VARIANT:
            retval = self.get_variant()
        elif arg_type == TYPE_DICT_ENTRY:
            raise TypeError, 'Dictionary Entries can only appear as part of an array container'
        else:
            raise TypeError, 'Unknown arg type %d in MessageIter' % (arg_type)
        return retval

    def get_arg_type(self):
        return dbus_message_iter_get_arg_type(self.iter)

    def get_element_type(self):
        return dbus_message_iter_get_element_type(self.iter)

    def get_byte(self):
        # Read as unsigned so values above 127 do not come back negative.
        cdef unsigned char c_val
        dbus_message_iter_get_basic(self.iter, <unsigned char *>&c_val)
        return c_val

    def get_boolean(self):
        cdef dbus_bool_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_bool_t *>&c_val)
        return c_val

    def get_signature(self):
        signature_string = self.get_string()
        return Signature(signature_string)

    def get_int16(self):
        cdef dbus_int16_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_int16_t *>&c_val)
        return c_val

    def get_uint16(self):
        cdef dbus_uint16_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_uint16_t *>&c_val)
        return c_val

    def get_int32(self):
        cdef dbus_int32_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_int32_t *>&c_val)
        return c_val

    def get_uint32(self):
        cdef dbus_uint32_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_uint32_t *>&c_val)
        return c_val

    def get_int64(self):
        cdef dbus_int64_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_int64_t *>&c_val)
        return c_val

    def get_uint64(self):
        cdef dbus_uint64_t c_val
        dbus_message_iter_get_basic(self.iter, <dbus_uint64_t *>&c_val)
        return c_val

    def get_double(self):
        cdef double c_val
        dbus_message_iter_get_basic(self.iter, <double *>&c_val)
        return c_val

    def get_string(self):
        cdef char *c_str
        dbus_message_iter_get_basic(self.iter, <char **>&c_str)
        return c_str

    def get_object_path(self):
        object_path_string = self.get_string()
        return ObjectPath(object_path_string)

    def get_dict(self):
        # Read an array of dict entries into a Python dict.  Raises
        # TypeError if an element is not a dict entry or does not hold
        # exactly two values.
        cdef DBusMessageIter c_dict_iter
        cdef MessageIter dict_iter
        level = self.level + 1
        dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_dict_iter)
        dict_iter = MessageIter(level)
        dict_iter.__cinit__(&c_dict_iter)
        python_dict = {}
        cur_arg_type = dict_iter.get_arg_type()
        # Walk to the end of the array (as get_array does) so that a
        # non-dict-entry element is reported instead of silently ending the
        # loop; the old loop condition made the check below unreachable.
        while cur_arg_type != TYPE_INVALID:
            if cur_arg_type != TYPE_DICT_ENTRY:
                raise TypeError, "Dictionary elements must be of type TYPE_DICT_ENTRY '%s != %s'" % (TYPE_DICT_ENTRY, cur_arg_type)
            # A dict entry is read like a struct of (key, value).
            dict_entry = dict_iter.get_struct()
            if len(dict_entry) != 2:
                raise TypeError, "Dictionary entries must be structs of two elements.  This entry had %i elements.'" % (len(dict_entry))
            python_dict[dict_entry[0]] = dict_entry[1]
            dict_iter.next()
            cur_arg_type = dict_iter.get_arg_type()
        return python_dict

    def get_array(self, type):
        # Read an array whose elements all have D-Bus type `type` into a
        # Python list.  Raises TypeError on a mismatching element.
        cdef DBusMessageIter c_array_iter
        cdef MessageIter array_iter
        level = self.level + 1
        dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_array_iter)
        array_iter = MessageIter(level)
        array_iter.__cinit__(&c_array_iter)
        python_list = []
        cur_arg_type = array_iter.get_arg_type()
        while cur_arg_type != TYPE_INVALID:
            if cur_arg_type != type:
                raise TypeError, "Array elements must be of the same type '%s != %s'" % (type, cur_arg_type)
            value = array_iter.get(type)
            python_list.append(value)
            array_iter.next()
            cur_arg_type = array_iter.get_arg_type()
        return python_list

    def get_variant(self):
        # Return the single value wrapped by the variant.
        cdef DBusMessageIter c_var_iter
        cdef MessageIter var_iter
        level = self.level + 1
        dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_var_iter)
        var_iter = MessageIter(level)
        var_iter.__cinit__(&c_var_iter)
        return var_iter.get()

    def get_struct(self):
        # Read every member of the container at the current position and
        # return them as a tuple (also used for dict entries).
        cdef DBusMessageIter c_struct_iter
        cdef MessageIter struct_iter
        level = self.level + 1
        dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_struct_iter)
        struct_iter = MessageIter(level)
        struct_iter.__cinit__(&c_struct_iter)
        python_list = []
        while struct_iter.get_arg_type() != TYPE_INVALID:
            value = struct_iter.get()
            python_list.append(value)
            struct_iter.next()
        return tuple(python_list)

    def python_value_to_dbus_sig(self, value, level = 0):
        # Return the D-Bus signature string for `value`.  Exact builtin
        # types and the wrapper classes of this module are recognised.
        # Container signatures are derived from the first element, so an
        # empty dict or list raises IndexError.  Raises TypeError for
        # unknown types or nesting deeper than 32.
        if level > 32:
            raise TypeError, 'Type recurion is too deep'
        level = level + 1
        ptype = type(value)
        if ptype == bool or isinstance(value, Boolean):
            # The constant is TYPE_BOOLEAN; TYPE_BOOL is not defined.
            ret = chr(TYPE_BOOLEAN)
        elif ptype == int or isinstance(value, Int32):
            ret = chr(TYPE_INT32)
        elif ptype == long or isinstance(value, Int64):
            ret = chr(TYPE_INT64)
        elif ptype == str or isinstance(value, String):
            ret = chr(TYPE_STRING)
        elif ptype == float or isinstance(value, Double):
            ret = chr(TYPE_DOUBLE)
        elif ptype == dict or isinstance(value, Dictionary):
            dict_list = value.items()
            first_key, first_value = dict_list[0]
            ret = chr(TYPE_ARRAY) + chr(DICT_ENTRY_BEGIN)
            ret = ret + self.python_value_to_dbus_sig(first_key, level)
            ret = ret + self.python_value_to_dbus_sig(first_value, level)
            ret = ret + chr(DICT_ENTRY_END)
        elif ptype == tuple or isinstance(value, Struct):
            ret = chr(STRUCT_BEGIN)
            for item in value:
                ret = ret + self.python_value_to_dbus_sig(item, level)
            ret = ret + chr(STRUCT_END)
        elif ptype == list or isinstance(value, Array):
            ret = chr(TYPE_ARRAY)
            ret = ret + self.python_value_to_dbus_sig(value[0], level)
        elif isinstance(value, ObjectPath):
            ret = chr(TYPE_OBJECT_PATH)
        elif isinstance(value, ByteArray):
            ret = chr(TYPE_ARRAY) + chr(TYPE_BYTE)
        elif isinstance(value, Signature):
            ret = chr(TYPE_SIGNATURE)
        elif isinstance(value, Byte):
            ret = chr(TYPE_BYTE)
        elif isinstance(value, Int16):
            ret = chr(TYPE_INT16)
        elif isinstance(value, UInt16):
            ret = chr(TYPE_UINT16)
        elif isinstance(value, UInt32):
            ret = chr(TYPE_UINT32)
        elif isinstance(value, UInt64):
            ret = chr(TYPE_UINT64)
        else:
            raise TypeError, "Argument of unknown type '%s'" % (ptype)
        return ret

    #FIXME: handle all the different types?
    def append(self, value):
        # Append `value`, choosing the D-Bus type from its Python type in
        # the same way python_value_to_dbus_sig() does.  Returns the result
        # of the specific append_* method; raises TypeError for unknown
        # types.
        value_type = type(value)
        if value_type == bool or isinstance(value, Boolean):
            retval = self.append_boolean(value)
        elif value_type == int or isinstance(value, Int32):
            retval = self.append_int32(value)
        elif value_type == long or isinstance(value, Int64):
            retval = self.append_int64(value)
        elif value_type == str or isinstance(value, String):
            retval = self.append_string(value)
        elif value_type == float or isinstance(value, Double):
            retval = self.append_double(value)
        elif value_type == dict or isinstance(value, Dictionary):
            retval = self.append_dict(value)
        elif value_type == tuple or isinstance(value, Struct):
            retval = self.append_struct(value)
        elif value_type == list or isinstance(value, Array):
            retval = self.append_array(value)
        #elif value_type == None.__class__:
        #    retval = self.append_nil()
        elif isinstance(value, ObjectPath):
            retval = self.append_object_path(value)
        elif isinstance(value, ByteArray):
            retval = self.append_array(value)
        elif isinstance(value, Signature):
            retval = self.append_signature(value)
        elif isinstance(value, Byte):
            retval = self.append_byte(value)
        elif isinstance(value, Int16):
            retval = self.append_int16(value)
        elif isinstance(value, UInt16):
            retval = self.append_uint16(value)
        elif isinstance(value, UInt32):
            retval = self.append_uint32(value)
        elif isinstance(value, UInt64):
            retval = self.append_uint64(value)
        else:
            raise TypeError, "Argument of unknown type '%s'" % (value_type)
        return retval

    def append_boolean(self, value):
        cdef dbus_bool_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_BOOLEAN, <dbus_bool_t *>&c_value)

    def append_byte(self, value):
        # Accept either a one-character string or an integer (which covers
        # the Byte wrapper that append() dispatches here).  Anything else
        # raises TypeError.
        cdef unsigned char b
        if isinstance(value, str) and len(value) == 1:
            b = ord(value)
        elif isinstance(value, int):
            b = value
        else:
            raise TypeError
        return dbus_message_iter_append_basic(self.iter, TYPE_BYTE, <unsigned char *>&b)

    def append_int16(self, value):
        # Use a 16-bit variable: libdbus reads exactly two bytes for
        # TYPE_INT16, which is the wrong half of a 32-bit variable on
        # big-endian machines.
        cdef dbus_int16_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_INT16, <dbus_int16_t *>&c_value)

    def append_uint16(self, value):
        # 16-bit variable for the same reason as append_int16.
        cdef dbus_uint16_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_UINT16, <dbus_uint16_t *>&c_value)

    def append_int32(self, value):
        cdef dbus_int32_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_INT32, <dbus_int32_t *>&c_value)

    def append_uint32(self, value):
        cdef dbus_uint32_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_UINT32, <dbus_uint32_t *>&c_value)

    def append_int64(self, value):
        cdef dbus_int64_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_INT64, <dbus_int64_t *>&c_value)

    def append_uint64(self, value):
        cdef dbus_uint64_t c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_UINT64, <dbus_uint64_t *>&c_value)

    def append_double(self, value):
        cdef double c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_DOUBLE, <double *>&c_value)

    def append_string(self, value):
        cdef char *c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_STRING, <char **>&c_value)

    def append_object_path(self, value):
        cdef char *c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_OBJECT_PATH, <char **>&c_value)

    def append_signature(self, value):
        cdef char *c_value
        c_value = value
        return dbus_message_iter_append_basic(self.iter, TYPE_SIGNATURE, <char **>&c_value)

    def append_dict(self, python_dict):
        # Append `python_dict` as an array of dict entries.  The entry
        # signature is taken from the first item, so an empty dict raises
        # IndexError.  Returns True on success and False if a key or value
        # could not be appended, matching append_array().
        cdef DBusMessageIter c_dict_iter, c_dict_entry_iter
        cdef MessageIter dict_iter, dict_entry_iter
        level = self.level + 1
        dict_list = python_dict.items()
        key, value = dict_list[0]
        sig = chr(DICT_ENTRY_BEGIN)
        sig = sig + self.python_value_to_dbus_sig(key)
        sig = sig + self.python_value_to_dbus_sig(value)
        sig = sig + chr(DICT_ENTRY_END)
        dbus_message_iter_open_container(self.iter, TYPE_ARRAY, sig, <DBusMessageIter *>&c_dict_iter)
        dict_iter = MessageIter(level)
        dict_iter.__cinit__(&c_dict_iter)
        for key, value in dict_list:
            dbus_message_iter_open_container(dict_iter.iter, TYPE_DICT_ENTRY, sig, <DBusMessageIter *>&c_dict_entry_iter)
            dict_entry_iter = MessageIter(level)
            dict_entry_iter.__cinit__(&c_dict_entry_iter)
            if not dict_entry_iter.append(key) or not dict_entry_iter.append(value):
                # Close what was opened before reporting the failure.
                dbus_message_iter_close_container(dict_iter.iter, dict_entry_iter.iter)
                dbus_message_iter_close_container(self.iter, dict_iter.iter)
                return False
            dbus_message_iter_close_container(dict_iter.iter, dict_entry_iter.iter)
        dbus_message_iter_close_container(self.iter, dict_iter.iter)
        # Report success explicitly: callers such as append_array() treat a
        # falsy result as a failed append.
        return True

    def append_struct(self, python_struct):
        # Append the items of `python_struct` as a D-Bus struct.  Returns
        # True on success, False as soon as one member fails to append.
        cdef DBusMessageIter c_struct_iter
        cdef MessageIter struct_iter
        level = self.level + 1
        dbus_message_iter_open_container(self.iter, TYPE_STRUCT, NULL, <DBusMessageIter *>&c_struct_iter)
        struct_iter = MessageIter(level)
        struct_iter.__cinit__(&c_struct_iter)
        for item in python_struct:
            if not struct_iter.append(item):
                dbus_message_iter_close_container(self.iter, struct_iter.iter)
                return False
        dbus_message_iter_close_container(self.iter, struct_iter.iter)
        # Report success explicitly (see append_dict).
        return True

    def append_array(self, python_list):
        # Append `python_list` as a D-Bus array.  A ByteArray is written as
        # an array of bytes ('ay', as python_value_to_dbus_sig reports);
        # for any other sequence the element signature comes from the first
        # item, so an empty one raises IndexError.  Returns True on success,
        # False as soon as one element fails to append.
        cdef DBusMessageIter c_array_iter
        cdef MessageIter array_iter
        level = self.level + 1
        is_byte_array = isinstance(python_list, ByteArray)
        if is_byte_array:
            sig = chr(TYPE_BYTE)
        else:
            sig = self.python_value_to_dbus_sig(python_list[0])
        dbus_message_iter_open_container(self.iter, TYPE_ARRAY, sig, <DBusMessageIter *>&c_array_iter)
        array_iter = MessageIter(level)
        array_iter.__cinit__(&c_array_iter)
        for item in python_list:
            if is_byte_array:
                # Iterating a ByteArray yields one-character strings.
                appended = array_iter.append_byte(item)
            else:
                appended = array_iter.append(item)
            if not appended:
                dbus_message_iter_close_container(self.iter, array_iter.iter)
                return False
        dbus_message_iter_close_container(self.iter, array_iter.iter)
        return True

    def __str__(self):
        # Render the remaining arguments, one 'type:value' line each.
        # NOTE: this advances the iterator to its end as a side effect.
        cdef DBusMessageIter c_array_iter
        cdef MessageIter array_iter
        value_at_iter = True
        retval = ""
        while value_at_iter:
            arg_type = self.get_arg_type()
            if arg_type == TYPE_INVALID:
                break
            elif arg_type == TYPE_STRING:
                arg = 'string:%s\n' % (self.get_string())
            elif arg_type == TYPE_OBJECT_PATH:
                arg = 'object_path:%s\n' % (self.get_object_path())
            elif arg_type == TYPE_INT32:
                arg = 'int32:%d\n' % (self.get_int32())
            elif arg_type == TYPE_UINT32:
                arg = 'uint32:%u\n' % (self.get_uint32())
            elif arg_type == TYPE_INT64:
                arg = 'int64:%d\n' % (self.get_int64())
            elif arg_type == TYPE_UINT64:
                arg = 'uint64:%u\n' % (self.get_uint64())
            elif arg_type == TYPE_DOUBLE:
                arg = 'double:%f\n' % (self.get_double())
            elif arg_type == TYPE_BYTE:
                num = self.get_byte()
                arg = 'byte:%x(%s)\n' % (num, str(chr(num)))
            elif arg_type == TYPE_BOOLEAN:
                if self.get_boolean():
                    text = "true"
                else:
                    text = "false"
                arg = 'boolean:%s\n' % (text)
            elif arg_type == TYPE_ARRAY:
                dbus_message_iter_recurse(self.iter, <DBusMessageIter *>&c_array_iter)
                array_iter = MessageIter(self.level + 1)
                array_iter.__cinit__(&c_array_iter)
                # Test for a current element rather than has_next(), which
                # only reports elements after the current one and so made a
                # one-element array print as empty.
                if array_iter.get_arg_type() != TYPE_INVALID:
                    arg = 'array [' + str(array_iter) + ']'
                else:
                    arg = 'array []'
            else:
                arg = '(unknown arg type %d)\n' % arg_type
            retval = retval + arg
            value_at_iter = self.next()
        return retval
# Message kinds, numbered 0-4 in this order.
(MESSAGE_TYPE_INVALID, MESSAGE_TYPE_METHOD_CALL, MESSAGE_TYPE_METHOD_RETURN, MESSAGE_TYPE_ERROR, MESSAGE_TYPE_SIGNAL) = range(5)
# D-Bus type codes: 0 for "invalid", otherwise the character code of the
# letter/bracket used for that type in a signature string.
(TYPE_INVALID, TYPE_BYTE, TYPE_BOOLEAN, TYPE_INT16, TYPE_UINT16, TYPE_INT32, TYPE_UINT32, TYPE_INT64, TYPE_UINT64, TYPE_DOUBLE, TYPE_STRING, TYPE_OBJECT_PATH, TYPE_SIGNATURE, TYPE_ARRAY, TYPE_STRUCT, STRUCT_BEGIN, STRUCT_END, TYPE_VARIANT, TYPE_DICT_ENTRY, DICT_ENTRY_BEGIN, DICT_ENTRY_END) = (0, ord('y'), ord('b'), ord('n'), ord('q'), ord('i'), ord('u'), ord('x'), ord('t'), ord('d'), ord('s'), ord('o'), ord('g'), ord('a'), ord('r'), ord('('), ord(')'), ord('v'), ord('e'), ord('{'), ord('}'))
# Handler results, numbered 0-2 in this order.
(HANDLER_RESULT_HANDLED, HANDLER_RESULT_NOT_YET_HANDLED, HANDLER_RESULT_NEED_MEMORY) = range(3)
cdef class Message:
cdef DBusMessage *msg
def __init__(self, message_type=MESSAGE_TYPE_INVALID,
service=None, path=None, dbus_interface=None, method=None,
Message method_call=None,
name=None,
Message reply_to=None, error_name=None, error_message=None,
_create=1):
cdef char *cservice
cdef char *ciface
cdef DBusMessage *cmsg
ciface = NULL
if (dbus_interface != None):
ciface = dbus_interface
cservice = NULL
if (service != None):
cservice = service
if not _create:
return
if message_type == MESSAGE_TYPE_METHOD_CALL:
self.msg = dbus_message_new_method_call(cservice, path, ciface, method)
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
cmsg = method_call._get_msg()
self.msg = dbus_message_new_method_return(cmsg)
elif message_type == MESSAGE_TYPE_SIGNAL:
self.msg = dbus_message_new_signal(path, ciface, name)
elif message_type == MESSAGE_TYPE_ERROR:
cmsg = reply_to._get_msg()
self.msg = dbus_message_new_error(cmsg, error_name, error_message)
def type_to_name(self, type):
if type == MESSAGE_TYPE_SIGNAL:
return "signal"
elif type == MESSAGE_TYPE_METHOD_CALL:
return "method call"
elif type == MESSAGE_TYPE_METHOD_RETURN:
return "method return"
elif type == MESSAGE_TYPE_ERROR:
return "error"
else:
return "(unknown message type)"
def __str__(self):
message_type = self.get_type()
sender = self.get_sender()
if sender == None:
sender = "(no sender)"
if (message_type == MESSAGE_TYPE_METHOD_CALL) or (message_type == MESSAGE_TYPE_SIGNAL):
retval = '%s interface=%s; member=%s; sender=%s' % (self.type_to_name(message_type),
self.get_interface(),
self.get_member(),
sender)
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
retval = '%s sender=%s' % (self.type_to_name(message_type),
sender)
elif message_type == MESSAGE_TYPE_ERROR:
retval = '%s name=%s; sender=%s' % (self.type_to_name(message_type),
self.get_error_name(),
sender)
else:
retval = "Message of unknown type %d" % (message_type)
# FIXME: should really use self.convert_to_tuple() here
iter = self.get_iter()
retval = retval + "\n" + str(iter)
return retval
cdef _set_msg(self, DBusMessage *msg):
self.msg = msg
cdef DBusMessage *_get_msg(self):
return self.msg
def get_iter(self, append=False):
cdef DBusMessageIter iter
cdef MessageIter message_iter
cdef DBusMessage *msg
msg = self._get_msg()
if append:
dbus_message_iter_init_append(msg, &iter)
else:
dbus_message_iter_init(msg, &iter)
message_iter = MessageIter(0)
message_iter.__cinit__(&iter)
return message_iter
def get_args_list(self):
retval = [ ]
iter = self.get_iter()
try:
retval.append(iter.get())
except TypeError, e:
return [ ]
value_at_iter = iter.next()
while (value_at_iter):
retval.append(iter.get())
value_at_iter = iter.next()
return retval
# FIXME: implement dbus_message_copy?
def get_type(self):
return dbus_message_get_type(self.msg)
def set_path(self, object_path):
return dbus_message_set_path(self.msg, object_path)
def get_path(self):
return dbus_message_get_path(self.msg)
def set_interface(self, interface):
return dbus_message_set_interface(self.msg, interface)
def get_interface(self):
return dbus_message_get_interface(self.msg)
def set_member(self, member):
return dbus_message_set_member(self.msg, member)
def get_member(self):
return dbus_message_get_member(self.msg)
def set_error_name(self, name):
return dbus_message_set_error_name(self.msg, name)
def get_error_name(self):
return dbus_message_get_error_name(self.msg)
def set_destination(self, destination):
return dbus_message_set_destination(self.msg, destination)
def get_destination(self):
return dbus_message_get_destination(self.msg)
def set_sender(self, sender):
return dbus_message_set_sender(self.msg, sender)
def get_sender(self):
cdef char *sender
sender = dbus_message_get_sender(self.msg)
if (sender == NULL):
return None
else:
return sender
def set_no_reply(self, no_reply):
dbus_message_set_no_reply(self.msg, no_reply)
def get_no_reply(self):
return dbus_message_get_no_reply(self.msg)
def is_method_call(self, interface, method):
return dbus_message_is_method_call(self.msg, interface, method)
def is_signal(self, interface, signal_name):
    """Return the result of dbus_message_is_signal() for the given
    interface and signal names."""
    matches = dbus_message_is_signal(self.msg, interface, signal_name)
    return matches
def is_error(self, error_name):
    """Return the result of dbus_message_is_error() for `error_name`."""
    matches = dbus_message_is_error(self.msg, error_name)
    return matches
def has_destination(self, service):
    """Return the result of dbus_message_has_destination() for `service`."""
    matches = dbus_message_has_destination(self.msg, service)
    return matches
def has_sender(self, service):
    """Return the result of dbus_message_has_sender() for `service`."""
    matches = dbus_message_has_sender(self.msg, service)
    return matches
def get_serial(self):
    """Return the serial reported by dbus_message_get_serial()."""
    serial = dbus_message_get_serial(self.msg)
    return serial
def set_reply_serial(self, reply_serial):
    """Set the serial this message is a reply to.

    Returns whatever dbus_message_set_reply_serial() returns.
    """
    result = dbus_message_set_reply_serial(self.msg, reply_serial)
    return result
def get_reply_serial(self):
    """Return the serial reported by dbus_message_get_reply_serial()."""
    reply_serial = dbus_message_get_reply_serial(self.msg)
    return reply_serial
#FIXME: dbus_message_get_path_decomposed
# FIXME: all the different dbus_message_*args* methods
class Signal(Message):
    """Message constructed with type MESSAGE_TYPE_SIGNAL.

    The path, interface and signal name are forwarded to
    Message.__init__ as the `path`, `dbus_interface` and `name` keywords.
    """
    def __init__(self, spath, sinterface, sname):
        Message.__init__(self,
                         MESSAGE_TYPE_SIGNAL,
                         path=spath,
                         dbus_interface=sinterface,
                         name=sname)
class MethodCall(Message):
    """Message constructed with type MESSAGE_TYPE_METHOD_CALL.

    The path, interface and method name are forwarded to
    Message.__init__ as the `path`, `dbus_interface` and `method`
    keywords.
    """
    def __init__(self, mpath, minterface, mmethod):
        Message.__init__(self,
                         MESSAGE_TYPE_METHOD_CALL,
                         path=mpath,
                         dbus_interface=minterface,
                         method=mmethod)
class MethodReturn(Message):
    """Message constructed with type MESSAGE_TYPE_METHOD_RETURN.

    The originating call is forwarded to Message.__init__ as the
    `method_call` keyword.
    """
    def __init__(self, method_call):
        Message.__init__(self,
                         MESSAGE_TYPE_METHOD_RETURN,
                         method_call=method_call)
class Error(Message):
    """Message constructed with type MESSAGE_TYPE_ERROR.

    `reply_to`, `error_name` and `error_message` are forwarded to
    Message.__init__ under the same keyword names.
    """
    def __init__(self, reply_to, error_name, error_message):
        Message.__init__(self,
                         MESSAGE_TYPE_ERROR,
                         reply_to=reply_to,
                         error_name=error_name,
                         error_message=error_message)
cdef class Server:
    # Thin wrapper around a libdbus DBusServer obtained from
    # dbus_server_listen().
    cdef DBusServer *server

    def __init__(self, address):
        # Start listening on `address`.
        # Raises DBusException (carrying libdbus' error message) when
        # dbus_server_listen() reports an error.
        cdef DBusError error
        dbus_error_init(&error)
        self.server = dbus_server_listen(address,
                                         &error)
        if dbus_error_is_set(&error):
            # Copy the message into a Python string first, then release
            # the DBusError so its name/message strings are not leaked.
            errormsg = error.message
            dbus_error_free(&error)
            raise DBusException, errormsg

    def setup_with_g_main(self):
        """Hook the server up via dbus_server_setup_with_g_main(),
        passing NULL as the GMainContext."""
        dbus_server_setup_with_g_main(self.server, NULL)

    def disconnect(self):
        """Call dbus_server_disconnect() on the wrapped server."""
        dbus_server_disconnect(self.server)

    def get_is_connected(self):
        """Return the result of dbus_server_get_is_connected()."""
        return dbus_server_get_is_connected(self.server)

#    def set_new_connection_function(self, function, data):
#        dbus_server_set_new_connection_function(self.conn, function,
#                                                data, NULL)
#    def set_watch_functions(self, add_function, remove_function, data):
#        dbus_server_set_watch_functions(self.server,
#                                        add_function, remove_function,
#                                        data, NULL)
#    def set_timeout_functions(self, add_function, remove_function, data):
#        dbus_server_set_timeout_functions(self.server,
#                                          add_function, remove_function,
#                                          data, NULL)
#    def handle_watch(self, watch, condition):
#        dbus_server_handle_watch(self.conn, watch, condition)
# Python-visible aliases for the DBUS_BUS_* values.
# NOTE(review): presumably meant to be passed as the bus_type argument of
# bus_get() -- confirm against callers.
BUS_SESSION = DBUS_BUS_SESSION
BUS_SYSTEM = DBUS_BUS_SYSTEM
BUS_STARTER = DBUS_BUS_STARTER
def bus_get(bus_type):
    """Return a Connection wrapping the result of dbus_bus_get(bus_type).

    Raises DBusException, carrying libdbus' error message, when
    dbus_bus_get() reports an error.
    """
    cdef DBusError error
    cdef Connection conn
    cdef DBusConnection *connection

    dbus_error_init(&error)
    connection = dbus_bus_get(bus_type,
                              &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg

    # Build an empty Connection wrapper and attach the C connection.
    conn = Connection()
    conn.__cinit__(None, connection)
    return conn
def bus_get_unique_name(Connection connection):
    """Return the connection's unique bus name, or None if it has none.

    dbus_bus_get_unique_name() returns NULL when no unique name is
    available.  Letting Pyrex convert a NULL char* into a Python string
    would crash, so NULL is mapped to None.
    """
    cdef DBusConnection *conn
    cdef char *unique_name

    conn = connection._get_conn()
    unique_name = dbus_bus_get_unique_name(conn)
    if unique_name == NULL:
        return None
    return unique_name
def bus_get_unix_user(Connection connection, service_name):
    """Return the result of dbus_bus_get_unix_user() for `service_name`.

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.
    """
    cdef DBusError error
    cdef int retval
    cdef DBusConnection *conn

    dbus_error_init(&error)
    conn = connection._get_conn()
    retval = dbus_bus_get_unix_user(conn, service_name, &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg

    return retval
# These are #defines in the D-Bus headers, not enums, so they are not
# auto-generated and have to be spelled out by hand here.
# NOTE(review): presumably the possible values of the `results` element
# returned by bus_start_service_by_name() -- confirm the numeric values
# against the D-Bus headers in use.
DBUS_START_REPLY_SUCCESS = 0
DBUS_START_REPLY_ALREADY_RUNNING = 1
def bus_start_service_by_name(Connection connection, service_name, flags=0):
    """Ask the bus to start `service_name`.

    Returns a (retval, results) tuple: the return value of
    dbus_bus_start_service_by_name() and the value it stored in its
    result out-parameter.

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.  Previously the error was never inspected, so it
    was leaked and `results` came back uninitialised.
    """
    cdef DBusError error
    cdef dbus_bool_t retval
    cdef dbus_uint32_t results
    cdef DBusConnection *conn

    dbus_error_init(&error)
    # Give the out-parameter a defined value in case libdbus leaves it
    # untouched.
    results = 0
    conn = connection._get_conn()

    retval = dbus_bus_start_service_by_name(conn, service_name, flags,
                                            &results, &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg

    return (retval, results)
def bus_register(Connection connection):
    """Call dbus_bus_register() on the connection and return its result.

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.
    """
    cdef DBusError error
    cdef dbus_bool_t retval
    cdef DBusConnection *conn

    dbus_error_init(&error)
    conn = connection._get_conn()
    retval = dbus_bus_register(conn,
                               &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg

    return retval
# Hand-written flag bits.
# NOTE(review): presumably meant for the `flags` argument of
# bus_request_name() -- confirm the values against the D-Bus headers in use.
SERVICE_FLAG_PROHIBIT_REPLACEMENT = 0x1
SERVICE_FLAG_REPLACE_EXISTING = 0x2
def bus_request_name(Connection connection, service_name, flags=0):
    """Request `service_name` on the bus and return the integer result of
    dbus_bus_request_name().

    `flags` is passed straight through to libdbus.

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.
    """
    cdef DBusError error
    cdef int retval
    cdef DBusConnection *conn

    dbus_error_init(&error)
    conn = connection._get_conn()
    retval = dbus_bus_request_name(conn,
                                   service_name,
                                   flags,
                                   &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg

    return retval
def bus_name_has_owner(Connection connection, service_name):
    """Return the result of dbus_bus_name_has_owner() for `service_name`.

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.
    """
    cdef DBusError error
    cdef dbus_bool_t retval
    cdef DBusConnection *conn

    dbus_error_init(&error)
    conn = connection._get_conn()
    retval = dbus_bus_name_has_owner(conn,
                                     service_name,
                                     &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg

    return retval
def bus_add_match(Connection connection, rule):
    """Register the match rule `rule` via dbus_bus_add_match().

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.  Nothing is returned on success.
    """
    cdef DBusError error
    cdef DBusConnection *conn

    dbus_error_init(&error)
    conn = connection._get_conn()
    dbus_bus_add_match(conn, rule, &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg
def bus_remove_match(Connection connection, rule):
    """Remove the match rule `rule` via dbus_bus_remove_match().

    Raises DBusException, carrying libdbus' error message, when the call
    reports an error.  Nothing is returned on success.
    """
    cdef DBusError error
    cdef DBusConnection *conn

    dbus_error_init(&error)
    conn = connection._get_conn()
    dbus_bus_remove_match(conn, rule, &error)

    if dbus_error_is_set(&error):
        # Copy the message into a Python string first, then release the
        # DBusError so its name/message strings are not leaked.
        errormsg = error.message
        dbus_error_free(&error)
        raise DBusException, errormsg
def init_gthreads():
    """Call dbus_g_thread_init() from dbus-glib; returns nothing."""
    dbus_g_thread_init()