mirror of
https://gitlab.freedesktop.org/dbus/dbus.git
synced 2025-12-24 22:50:08 +01:00
* python/dbus.py: * python/dbus_bindings.pyx.in: * python/tests/test-client.py: Add dbus.ByteArray and dbus_bindings.ByteArray types so that byte streams can be passed back. Give jdahlin the heaps of credit that are so rightfully his.
1138 lines
36 KiB
Python
1138 lines
36 KiB
Python
# -*- Mode: Python -*-
|
|
|
|
# jdahlin is the most coolest and awesomest person in the world
|
|
# and wrote all the good parts of this code. all the bad parts
|
|
# where python conditionals have a ( ) around them, thus violating
|
|
# PEP-8 were written by the lame wannabe python programmer seth
|
|
|
|
#include "dbus_h_wrapper.h"
|
|
|
|
cdef extern from "stdlib.h":
|
|
cdef void *malloc(size_t size)
|
|
cdef void free(void *ptr)
|
|
cdef void *calloc(size_t nmemb, size_t size)
|
|
|
|
cdef extern from "dbus-glib.h":
|
|
ctypedef struct GMainContext
|
|
cdef void dbus_connection_setup_with_g_main (DBusConnection *connection,
|
|
GMainContext *context)
|
|
cdef void dbus_server_setup_with_g_main (DBusServer *server,
|
|
GMainContext *context)
|
|
|
|
cdef extern from "Python.h":
|
|
void Py_XINCREF (object)
|
|
void Py_XDECREF (object)
|
|
object PyString_FromStringAndSize(char *, int)
|
|
|
|
ctypedef struct DBusError:
|
|
char *name
|
|
char *message
|
|
unsigned int dummy1
|
|
unsigned int dummy2
|
|
unsigned int dummy3
|
|
unsigned int dummy4
|
|
unsigned int dummy5
|
|
void *padding1
|
|
|
|
ctypedef struct DBusMessageIter:
|
|
void *dummy1
|
|
void *dummy2
|
|
dbus_uint32_t dummy3
|
|
int dummy4
|
|
int dummy5
|
|
int dummy6
|
|
int dummy7
|
|
int dummy8
|
|
int dummy9
|
|
int dummy10
|
|
int dummy11
|
|
int pad1
|
|
int pad2
|
|
void *pad3
|
|
|
|
ctypedef struct DBusObjectPathVTable:
|
|
DBusObjectPathUnregisterFunction unregister_function
|
|
DBusObjectPathMessageFunction message_function
|
|
void (* dbus_internal_pad1) (void *)
|
|
void (* dbus_internal_pad2) (void *)
|
|
void (* dbus_internal_pad3) (void *)
|
|
void (* dbus_internal_pad4) (void *)
|
|
|
|
|
|
_user_data_references = [ ]
|
|
|
|
class DBusException(Exception):
|
|
pass
|
|
|
|
class ConnectionError(Exception):
|
|
pass
|
|
|
|
class ObjectPath(str):
|
|
def __init__(self, value):
|
|
str.__init__(value)
|
|
|
|
class ByteArray(str):
|
|
def __init__(self, value):
|
|
str.__init__(value)
|
|
|
|
|
|
#forward delcerations
|
|
cdef class Connection
|
|
cdef class Message
|
|
cdef class PendingCall
|
|
cdef class Watch
|
|
cdef class MessageIter
|
|
|
|
cdef void cunregister_function_handler (DBusConnection *connection,
|
|
void *user_data):
|
|
cdef Connection conn
|
|
tup = <object>user_data
|
|
assert (type(tup) == list)
|
|
function = tup[1]
|
|
conn = Connection()
|
|
conn.__cinit__(None, connection)
|
|
|
|
args = [conn]
|
|
function(*args)
|
|
|
|
cdef DBusHandlerResult cmessage_function_handler (DBusConnection *connection,
|
|
DBusMessage *msg,
|
|
void *user_data):
|
|
cdef Connection conn
|
|
cdef Message message
|
|
|
|
tup = <object>user_data
|
|
assert (type(tup) == list)
|
|
function = tup[0]
|
|
message = Message(_create=0)
|
|
message._set_msg(msg)
|
|
|
|
conn = Connection()
|
|
conn.__cinit__(None, connection)
|
|
|
|
args = [conn,
|
|
message]
|
|
retval = function(*args)
|
|
if (retval == None):
|
|
retval = DBUS_HANDLER_RESULT_HANDLED
|
|
|
|
return retval
|
|
|
|
cdef class Connection:
|
|
cdef DBusConnection *conn
|
|
|
|
def __init__(self, address=None, Connection _conn=None):
|
|
cdef DBusConnection *c_conn
|
|
cdef char *c_address
|
|
c_conn=NULL
|
|
if (_conn != None):
|
|
c_conn = _conn.conn
|
|
|
|
if (address != None or _conn != None):
|
|
self.__cinit__(c_address, c_conn)
|
|
|
|
# hack to be able to pass in a c pointer to the constructor
|
|
# while still alowing python programs to create a Connection object
|
|
cdef __cinit__(self, address, DBusConnection *_conn):
|
|
cdef DBusError error
|
|
dbus_error_init(&error)
|
|
if _conn != NULL:
|
|
self.conn = _conn
|
|
dbus_connection_ref(self.conn)
|
|
else:
|
|
self.conn = dbus_connection_open(address,
|
|
&error)
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
|
|
cdef _set_conn(self, DBusConnection *conn):
|
|
self.conn = conn
|
|
|
|
cdef DBusConnection *_get_conn(self):
|
|
return self.conn
|
|
|
|
def get_base_service(self):
|
|
return bus_get_base_service(self)
|
|
|
|
def setup_with_g_main(self):
|
|
dbus_connection_setup_with_g_main(self.conn, NULL)
|
|
|
|
def disconnect(self):
|
|
dbus_connection_disconnect(self.conn)
|
|
|
|
def get_is_connected(self):
|
|
return dbus_connection_get_is_connected(self.conn)
|
|
|
|
def get_is_authenticated(self):
|
|
return dbus_connection_get_is_authenticated(self.conn)
|
|
|
|
def flush(self):
|
|
dbus_connection_flush(self.conn)
|
|
|
|
def borrow_message(self):
|
|
cdef Message m
|
|
m = Message(_create=0)
|
|
m._set_msg(dbus_connection_borrow_message(self.conn))
|
|
return m
|
|
|
|
def return_message(self, Message message):
|
|
cdef DBusMessage *msg
|
|
msg = message._get_msg()
|
|
dbus_connection_return_message(self.conn, msg)
|
|
|
|
def steal_borrowed_message(self, Message message):
|
|
cdef DBusMessage *msg
|
|
msg = message._get_msg()
|
|
dbus_connection_steal_borrowed_message(self.conn,
|
|
msg)
|
|
|
|
def pop_message(self):
|
|
cdef DBusMessage *msg
|
|
cdef Message m
|
|
|
|
msg = dbus_connection_pop_message(self.conn)
|
|
if msg != NULL:
|
|
m = Message(_create=0)
|
|
m._set_msg(msg)
|
|
else:
|
|
m = None
|
|
return m
|
|
|
|
def get_dispatch_status(self):
|
|
return dbus_connection_get_dispatch_status(self.conn)
|
|
|
|
def dispatch(self):
|
|
return dbus_connection_dispatch(self.conn)
|
|
|
|
def send(self, Message message):
|
|
#cdef dbus_uint32_t client_serial
|
|
#if type(message) != Message:
|
|
# raise TypeError
|
|
cdef DBusMessage *msg
|
|
msg = message._get_msg()
|
|
retval = dbus_connection_send(self.conn,
|
|
msg,
|
|
NULL)
|
|
return retval
|
|
|
|
def send_with_reply(self, Message message, timeout_milliseconds):
|
|
cdef dbus_bool_t retval
|
|
cdef DBusPendingCall *cpending_call
|
|
cdef DBusError error
|
|
cdef DBusMessage *msg
|
|
cdef PendingCall pending_call
|
|
|
|
dbus_error_init(&error)
|
|
|
|
cpending_call = NULL
|
|
|
|
msg = message._get_msg()
|
|
|
|
retval = dbus_connection_send_with_reply(self.conn,
|
|
msg,
|
|
&cpending_call,
|
|
timeout_milliseconds)
|
|
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
if (cpending_call != NULL):
|
|
pending_call = PendingCall()
|
|
pending_call.__cinit__(cpending_call)
|
|
else:
|
|
pending_call = None
|
|
|
|
return (retval, pending_call)
|
|
|
|
def send_with_reply_and_block(self, Message message,
|
|
timeout_milliseconds=0):
|
|
cdef DBusMessage * retval
|
|
cdef DBusError error
|
|
cdef DBusMessage *msg
|
|
cdef Message m
|
|
|
|
dbus_error_init(&error)
|
|
|
|
msg = message._get_msg()
|
|
|
|
retval = dbus_connection_send_with_reply_and_block(
|
|
self.conn,
|
|
msg,
|
|
timeout_milliseconds,
|
|
&error)
|
|
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
if retval == NULL:
|
|
raise AssertionError
|
|
|
|
m = Message(_create=0)
|
|
m._set_msg(retval)
|
|
return m
|
|
|
|
def set_watch_functions(self, add_function, remove_function, data):
|
|
pass
|
|
|
|
def set_timeout_functions(self, add_function, remove_function, data):
|
|
pass
|
|
|
|
def set_wakeup_main_function(self, wakeup_main_function, data):
|
|
pass
|
|
|
|
# FIXME: set_dispatch_status_function, get_unix_user, set_unix_user_function
|
|
|
|
def add_filter(self, filter_function):
|
|
user_data = [ filter_function ]
|
|
global _user_data_references
|
|
_user_data_references.append(user_data)
|
|
|
|
return dbus_connection_add_filter(self.conn,
|
|
cmessage_function_handler,
|
|
<void*>user_data,
|
|
NULL)
|
|
|
|
|
|
#FIXME: remove_filter
|
|
# this is pretty tricky, we want to only remove the filter
|
|
# if we truly have no more calls to our message_function_handler...ugh
|
|
|
|
def set_data(self, slot, data):
|
|
pass
|
|
|
|
def get_data(self, slot):
|
|
pass
|
|
|
|
def set_max_message_size(self, size):
|
|
dbus_connection_set_max_message_size(self.conn, size)
|
|
|
|
def get_max_message_size(self):
|
|
return dbus_connection_get_max_message_size(self.conn)
|
|
|
|
def set_max_received_size(self, size):
|
|
dbus_connection_set_max_received_size(self.conn, size)
|
|
|
|
def get_max_received_size(self):
|
|
return dbus_connection_get_max_received_size(self.conn)
|
|
|
|
def get_outgoing_size(self):
|
|
return dbus_connection_get_outgoing_size(self.conn)
|
|
|
|
# preallocate_send, free_preallocated_send, send_preallocated
|
|
|
|
def register_object_path(self, path, unregister_cb, message_cb):
|
|
cdef DBusObjectPathVTable cvtable
|
|
|
|
cvtable.unregister_function = cunregister_function_handler
|
|
cvtable.message_function = cmessage_function_handler
|
|
|
|
user_data = [message_cb, unregister_cb]
|
|
global _user_data_references
|
|
_user_data_references.append(user_data)
|
|
|
|
return dbus_connection_register_object_path(self.conn, path, &cvtable,
|
|
<void*>user_data)
|
|
|
|
def register_fallback(self, path, unregister_cb, message_cb):
|
|
cdef DBusObjectPathVTable cvtable
|
|
|
|
cvtable.unregister_function = cunregister_function_handler
|
|
cvtable.message_function = cmessage_function_handler
|
|
|
|
user_data = [message_cb, unregister_cb]
|
|
global _user_data_references
|
|
_user_data_references.append(user_data)
|
|
|
|
return dbus_connection_register_fallback(self.conn, path, &cvtable,
|
|
<void*>user_data)
|
|
|
|
#FIXME: unregister_object_path , see problems with remove_filter
|
|
|
|
def list_registered (self, parent_path):
|
|
cdef char **cchild_entries
|
|
cdef dbus_bool_t retval
|
|
|
|
retval = dbus_connection_list_registered(self.conn, parent_path, &cchild_entries)
|
|
|
|
if (not retval):
|
|
#FIXME: raise out of memory exception?
|
|
return None
|
|
|
|
i = 0
|
|
child_entries = []
|
|
|
|
while (cchild_entries[i] != NULL):
|
|
child_entries.append(cchild_entries[i])
|
|
i = i + 1
|
|
|
|
dbus_free_string_array(cchild_entries)
|
|
|
|
return child_entries
|
|
|
|
|
|
cdef class PendingCall:
|
|
cdef DBusPendingCall *pending_call
|
|
|
|
def __init__(self, PendingCall _pending_call=None):
|
|
if (_pending_call != None):
|
|
self.__cinit__(_pending_call.pending_call)
|
|
|
|
cdef void __cinit__(self, DBusPendingCall *_pending_call):
|
|
self.pending_call = _pending_call
|
|
dbus_pending_call_ref(self.pending_call)
|
|
|
|
cdef DBusPendingCall *_get_pending_call(self):
|
|
return self.pending_call
|
|
|
|
def cancel(self):
|
|
dbus_pending_call_cancel(self.pending_call)
|
|
|
|
def get_completed(self):
|
|
return dbus_pending_call_get_completed(self.pending_call)
|
|
|
|
def get_reply(self):
|
|
cdef Message message
|
|
message = Message(_create=0)
|
|
message._set_msg(dbus_pending_call_get_reply(self.pending_call))
|
|
return message
|
|
|
|
def block(self):
|
|
dbus_pending_call_block(self.pending_call)
|
|
|
|
cdef class Watch:
|
|
cdef DBusWatch* watch
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
cdef __cinit__(self, DBusWatch *cwatch):
|
|
self.watch = cwatch
|
|
|
|
def get_fd(self):
|
|
return dbus_watch_get_fd(self.watch)
|
|
|
|
# FIXME: not picked up correctly by extract.py
|
|
#def get_flags(self):
|
|
# return dbus_watch_get_flags(self.watch)
|
|
|
|
def handle(self, flags):
|
|
return dbus_watch_handle(self.watch, flags)
|
|
|
|
def get_enabled(self):
|
|
return dbus_watch_get_enabled(self.watch)
|
|
|
|
cdef class MessageIter:
|
|
cdef DBusMessageIter *iter
|
|
cdef DBusMessageIter real_iter
|
|
|
|
def __init__(self):
|
|
self.iter = &self.real_iter
|
|
|
|
cdef __cinit__(self, DBusMessageIter *iter):
|
|
self.real_iter = iter[0]
|
|
|
|
cdef DBusMessageIter *_get_iter(self):
|
|
return self.iter
|
|
|
|
def has_next(self):
|
|
return dbus_message_iter_has_next(self.iter)
|
|
|
|
def next(self):
|
|
return dbus_message_iter_next(self.iter)
|
|
|
|
def get(self):
|
|
arg_type = self.get_arg_type()
|
|
|
|
if arg_type == TYPE_INVALID:
|
|
raise TypeError, 'Invalid arg type in MessageIter'
|
|
elif arg_type == TYPE_NIL:
|
|
retval = None
|
|
elif arg_type == TYPE_STRING:
|
|
retval = self.get_string()
|
|
elif arg_type == TYPE_INT32:
|
|
retval = self.get_int32()
|
|
elif arg_type == TYPE_UINT32:
|
|
retval = self.get_uint32()
|
|
elif arg_type == TYPE_DOUBLE:
|
|
retval = self.get_double()
|
|
elif arg_type == TYPE_BYTE:
|
|
retval = self.get_byte()
|
|
elif arg_type == TYPE_BOOLEAN:
|
|
retval = self.get_boolean()
|
|
elif arg_type == TYPE_ARRAY:
|
|
array_type = self.get_array_type()
|
|
|
|
if array_type == TYPE_STRING:
|
|
retval = self.get_string_array()
|
|
elif array_type == TYPE_OBJECT_PATH:
|
|
retval = self.get_object_path_array()
|
|
elif array_type == TYPE_BYTE:
|
|
retval = self.get_byte_array()
|
|
elif array_type == TYPE_INT32:
|
|
retval = self.get_int32_array()
|
|
elif array_type == TYPE_UINT32:
|
|
retval = self.get_uint32_array()
|
|
elif array_type == TYPE_DOUBLE:
|
|
retval = self.get_double_array()
|
|
else:
|
|
raise TypeError, "Unknown array type %d in MessageIter" % (array_type)
|
|
elif arg_type == TYPE_DICT:
|
|
retval = self.get_dict()
|
|
elif arg_type == TYPE_OBJECT_PATH:
|
|
retval = self.get_object_path()
|
|
else:
|
|
raise TypeError, 'Unknown arg type %d in MessageIter' % (arg_type)
|
|
|
|
return retval
|
|
|
|
def get_dict(self):
|
|
cdef DBusMessageIter c_dict_iter
|
|
cdef MessageIter dict_iter
|
|
|
|
dbus_message_iter_init_dict_iterator(self.iter, &c_dict_iter)
|
|
|
|
dict_iter = MessageIter()
|
|
dict_iter.__cinit__(&c_dict_iter)
|
|
|
|
dict = {}
|
|
|
|
end_of_dict = False
|
|
|
|
while True:
|
|
key = dict_iter.get_dict_key()
|
|
value = dict_iter.get()
|
|
dict[key] = value
|
|
if not dict_iter.has_next():
|
|
break
|
|
dict_iter.next()
|
|
|
|
return dict
|
|
|
|
def get_arg_type(self):
|
|
return dbus_message_iter_get_arg_type(self.iter)
|
|
|
|
def get_array_type(self):
|
|
return dbus_message_iter_get_array_type(self.iter)
|
|
|
|
# FIXME: implement get_byte
|
|
#def get_byte(self):
|
|
# return dbus_message_iter_get_byte(self.iter)
|
|
|
|
def get_boolean(self):
|
|
return dbus_message_iter_get_boolean(self.iter)
|
|
|
|
def get_int32(self):
|
|
return dbus_message_iter_get_int32(self.iter)
|
|
|
|
def get_uint32(self):
|
|
return dbus_message_iter_get_uint32(self.iter)
|
|
|
|
def get_double(self):
|
|
return dbus_message_iter_get_double(self.iter)
|
|
|
|
def get_string(self):
|
|
return dbus_message_iter_get_string(self.iter)
|
|
|
|
def get_object_path(self):
|
|
object_path_string = dbus_message_iter_get_object_path(self.iter)
|
|
return ObjectPath(object_path_string)
|
|
|
|
def get_dict_key(self):
|
|
return dbus_message_iter_get_dict_key(self.iter)
|
|
|
|
# FIXME: implement dbus_message_iter_init_array_iterator
|
|
|
|
def get_byte_array(self):
|
|
cdef int len
|
|
cdef unsigned char *bytearray
|
|
cdef int i
|
|
dbus_message_iter_get_byte_array(self.iter, &bytearray, <int*>&len)
|
|
python_string = PyString_FromStringAndSize(<char *>bytearray, len)
|
|
return python_string
|
|
|
|
# FIXME: implement dbus_message_iter_get_boolean_array
|
|
|
|
def get_int32_array(self):
|
|
cdef int len
|
|
cdef dbus_int32_t *retval
|
|
cdef int i
|
|
dbus_message_iter_get_int32_array(self.iter, &retval, <int*>&len)
|
|
python_list = []
|
|
for i from 0 <= i < len:
|
|
python_list.append(retval[i])
|
|
return python_list
|
|
|
|
def get_uint32_array(self):
|
|
cdef int len
|
|
cdef dbus_uint32_t *retval
|
|
cdef int i
|
|
dbus_message_iter_get_uint32_array(self.iter, &retval, <int*>&len)
|
|
python_list = []
|
|
for i from 0 <= i < len:
|
|
python_list.append(retval[i])
|
|
return python_list
|
|
|
|
def get_double_array(self):
|
|
cdef int len
|
|
cdef double *retval
|
|
cdef int i
|
|
dbus_message_iter_get_double_array(self.iter, &retval, <int*>&len)
|
|
python_list = []
|
|
for i from 0 <= i < len:
|
|
python_list.append(retval[i])
|
|
return python_list
|
|
|
|
|
|
def get_string_array(self):
|
|
cdef int len
|
|
cdef char **retval
|
|
cdef int i
|
|
dbus_message_iter_get_string_array(self.iter, &retval, <int*>&len)
|
|
list = []
|
|
for i from 0 <= i < len:
|
|
list.append(retval[i])
|
|
return list
|
|
|
|
def get_object_path_array(self):
|
|
cdef int len
|
|
cdef char **retval
|
|
cdef int i
|
|
dbus_message_iter_get_object_path_array(self.iter, &retval, <int*>&len)
|
|
list = []
|
|
for i from 0 <= i < len:
|
|
list.append(ObjectPath(retval[i]))
|
|
return list
|
|
|
|
# dbus_message_append_iter_init included in class Message
|
|
|
|
#FIXME: handle all the different types?
|
|
def append(self, value):
|
|
value_type = type(value)
|
|
|
|
if value_type == bool:
|
|
retval = self.append_boolean(value)
|
|
elif value_type == int:
|
|
retval = self.append_int32(value)
|
|
elif value_type == str:
|
|
retval = self.append_string(value)
|
|
elif value_type == float:
|
|
retval = self.append_double(value)
|
|
elif value_type == dict:
|
|
retval = self.append_dict(value)
|
|
elif value_type == list:
|
|
if len(value) == 0:
|
|
# Empty lists are currently not supported, returning None instead
|
|
retval = self.append(None)
|
|
else:
|
|
list_type = type(value[0])
|
|
if list_type == str:
|
|
self.append_string_array(value)
|
|
elif list_type == int:
|
|
self.append_int32_array(value)
|
|
elif list_type == float:
|
|
self.append_double_array(value)
|
|
elif isinstance(value[0], ObjectPath):
|
|
self.append_object_path_array(value)
|
|
else:
|
|
raise TypeError, "List of unknown type '%s'" % (list_type)
|
|
elif value_type == None.__class__:
|
|
retval = self.append_nil()
|
|
elif isinstance(value, ObjectPath):
|
|
retval = self.append_object_path(value)
|
|
elif isinstance(value, ByteArray):
|
|
retval = self.append_byte_array(value)
|
|
else:
|
|
raise TypeError, "Argument of unknown type '%s'" % (value_type)
|
|
|
|
return retval
|
|
|
|
def append_nil(self):
|
|
return dbus_message_iter_append_nil(self.iter)
|
|
|
|
def append_boolean(self, value):
|
|
return dbus_message_iter_append_boolean(self.iter, value)
|
|
|
|
def append_byte(self, value):
|
|
if type(value) != str or len(value) != 1:
|
|
raise TypeError
|
|
return dbus_message_iter_append_byte(self.iter, ord(value))
|
|
|
|
def append_int32(self, value):
|
|
return dbus_message_iter_append_int32(self.iter, value)
|
|
|
|
def append_uint32(self, value):
|
|
return dbus_message_iter_append_uint32(self.iter, value)
|
|
|
|
def append_double(self, value):
|
|
return dbus_message_iter_append_double(self.iter, value)
|
|
|
|
def append_string(self, value):
|
|
return dbus_message_iter_append_string(self.iter, value)
|
|
|
|
def append_dict_key(self, value):
|
|
return dbus_message_iter_append_dict_key(self.iter, value)
|
|
|
|
def append_object_path(self, value):
|
|
return dbus_message_iter_append_object_path(self.iter, value)
|
|
|
|
# FIXME: append_array, append_boolean_array, append_uint32_array
|
|
|
|
def append_dict(self, python_dict):
|
|
cdef DBusMessageIter c_dict_iter
|
|
cdef MessageIter dict_iter
|
|
|
|
dbus_message_iter_append_dict(self.iter, &c_dict_iter)
|
|
|
|
dict_iter = MessageIter()
|
|
dict_iter.__cinit__(&c_dict_iter)
|
|
|
|
for key, value in python_dict.iteritems():
|
|
if type(key) != str:
|
|
raise TypeError, "DBus dict keys must be strings"
|
|
dict_iter.append_dict_key(key)
|
|
dict_iter.append(value)
|
|
|
|
def append_byte_array(self, python_list):
|
|
cdef unsigned char * value
|
|
cdef int length
|
|
cdef int i
|
|
length = len(python_list)
|
|
value = <unsigned char*>malloc(length * sizeof(unsigned char))
|
|
for i from 0 <= i < length:
|
|
item = python_list[i]
|
|
if type(item) != str or len(item) != 1:
|
|
raise TypeError
|
|
value[i] = ord(item)
|
|
return dbus_message_iter_append_byte_array(self.iter, value, length)
|
|
|
|
def append_int32_array(self, python_list):
|
|
cdef dbus_int32_t *value
|
|
cdef int length
|
|
cdef int i
|
|
length = len(python_list)
|
|
value = <dbus_int32_t*>malloc(length * sizeof(dbus_int32_t))
|
|
for i from 0 <= i < length:
|
|
item = python_list[i]
|
|
if type(item) != int:
|
|
raise TypeError
|
|
value[i] = item
|
|
return dbus_message_iter_append_int32_array(self.iter, value, length)
|
|
|
|
def append_double_array(self, python_list):
|
|
cdef double *value
|
|
cdef int length
|
|
cdef int i
|
|
length = len(python_list)
|
|
value = <double*>malloc(length * sizeof(double))
|
|
for i from 0 <= i < length:
|
|
item = python_list[i]
|
|
if type(item) != float:
|
|
raise TypeError
|
|
value[i] = item
|
|
return dbus_message_iter_append_double_array(self.iter, value, length)
|
|
|
|
def append_object_path_array(self, list):
|
|
cdef char **value
|
|
cdef int length
|
|
cdef int i
|
|
length = len(list)
|
|
value = <char**>malloc(length * sizeof(char *))
|
|
for i from 0 <= i < length:
|
|
item = list[i]
|
|
if not isinstance(item, ObjectPath):
|
|
raise TypeError
|
|
value[i] = item
|
|
|
|
return dbus_message_iter_append_object_path_array(self.iter, value, length)
|
|
|
|
def append_string_array(self, python_list):
|
|
cdef char **value
|
|
cdef int length
|
|
cdef dbus_bool_t return_code
|
|
cdef int i
|
|
length = len(python_list)
|
|
value = <char**>malloc(length * sizeof(char *))
|
|
for i from 0 <= i < length:
|
|
item = python_list[i]
|
|
if type(item) != str:
|
|
raise TypeError
|
|
value[i] = item
|
|
return dbus_message_iter_append_string_array(self.iter, value, length)
|
|
|
|
(MESSAGE_TYPE_INVALID, MESSAGE_TYPE_METHOD_CALL, MESSAGE_TYPE_METHOD_RETURN, MESSAGE_TYPE_ERROR, MESSAGE_TYPE_SIGNAL) = range(5)
|
|
(TYPE_INVALID, TYPE_NIL, TYPE_BYTE, TYPE_BOOLEAN, TYPE_INT32, TYPE_UINT32, TYPE_INT64, TYPE_UINT64, TYPE_DOUBLE, TYPE_STRING, TYPE_CUSTOM, TYPE_ARRAY, TYPE_DICT, TYPE_OBJECT_PATH) = (0, ord('v'), ord('y'), ord('b'), ord('i'), ord('u'), ord('x'), ord('t'), ord('d'), ord('s'), ord('c'), ord('a'), ord('m'), ord('o'))
|
|
(HANDLER_RESULT_HANDLED, HANDLER_RESULT_NOT_YET_HANDLED, HANDLER_RESULT_NEED_MEMORY) = range(3)
|
|
|
|
cdef class Message:
|
|
cdef DBusMessage *msg
|
|
|
|
def __init__(self, message_type=MESSAGE_TYPE_INVALID,
|
|
service=None, path=None, interface=None, method=None,
|
|
Message method_call=None,
|
|
name=None,
|
|
Message reply_to=None, error_name=None, error_message=None,
|
|
_create=1):
|
|
cdef char *cservice
|
|
cdef DBusMessage *cmsg
|
|
|
|
if (service == None):
|
|
cservice = NULL
|
|
else:
|
|
cservice = service
|
|
|
|
if not _create:
|
|
return
|
|
|
|
if message_type == MESSAGE_TYPE_METHOD_CALL:
|
|
self.msg = dbus_message_new_method_call(cservice, path, interface, method)
|
|
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
|
|
cmsg = method_call._get_msg()
|
|
self.msg = dbus_message_new_method_return(cmsg)
|
|
elif message_type == MESSAGE_TYPE_SIGNAL:
|
|
self.msg = dbus_message_new_signal(path, interface, name)
|
|
elif message_type == MESSAGE_TYPE_ERROR:
|
|
cmsg = reply_to._get_msg()
|
|
self.msg = dbus_message_new_error(cmsg, error_name, error_message)
|
|
|
|
def type_to_name(self, type):
|
|
if type == MESSAGE_TYPE_SIGNAL:
|
|
return "signal"
|
|
elif type == MESSAGE_TYPE_METHOD_CALL:
|
|
return "method call"
|
|
elif type == MESSAGE_TYPE_METHOD_RETURN:
|
|
return "method return"
|
|
elif type == MESSAGE_TYPE_ERROR:
|
|
return "error"
|
|
else:
|
|
return "(unknown message type)"
|
|
|
|
def __str__(self):
|
|
message_type = self.get_type()
|
|
sender = self.get_sender()
|
|
|
|
if sender == None:
|
|
sender = "(no sender)"
|
|
|
|
if (message_type == MESSAGE_TYPE_METHOD_CALL) or (message_type == MESSAGE_TYPE_SIGNAL):
|
|
retval = '%s interface=%s; member=%s; sender=%s' % (self.type_to_name(message_type),
|
|
self.get_interface(),
|
|
self.get_member(),
|
|
sender)
|
|
elif message_type == MESSAGE_TYPE_METHOD_RETURN:
|
|
retval = '%s sender=%s' % (self.type_to_name(message_type),
|
|
sender)
|
|
elif message_type == MESSAGE_TYPE_ERROR:
|
|
retval = '%s name=%s; sender=%s' % (self.type_to_name(message_type),
|
|
self.get_error_name(),
|
|
sender)
|
|
else:
|
|
retval = "Message of unknown type %d" % (message_type)
|
|
|
|
|
|
# FIXME: should really use self.convert_to_tuple() here
|
|
|
|
iter = self.get_iter()
|
|
value_at_iter = True
|
|
|
|
while (value_at_iter):
|
|
type = iter.get_arg_type()
|
|
|
|
if type == TYPE_INVALID:
|
|
break
|
|
elif type == TYPE_NIL:
|
|
arg = 'nil:None\n'
|
|
elif type == TYPE_STRING:
|
|
str = iter.get_string()
|
|
arg = 'string:%s\n' % (str)
|
|
elif type == TYPE_OBJECT_PATH:
|
|
path = iter.get_object_path()
|
|
arg = 'object_path:%s\n' % (path)
|
|
elif type == TYPE_INT32:
|
|
num = iter.get_int32()
|
|
arg = 'int32:%d\n' % (num)
|
|
elif type == TYPE_UINT32:
|
|
num = iter.get_uint32()
|
|
arg = 'uint32:%u\n' % (num)
|
|
elif type == TYPE_DOUBLE:
|
|
num = iter.get_double()
|
|
arg = 'double:%f\n' % (num)
|
|
elif type == TYPE_BYTE:
|
|
num = iter.get_byte()
|
|
arg = 'byte:%d\n' % (num)
|
|
elif type == TYPE_BOOLEAN:
|
|
bool = iter.get_boolean()
|
|
if (bool):
|
|
str = "true"
|
|
else:
|
|
str = "false"
|
|
arg = 'boolean:%s\n' % (str)
|
|
else:
|
|
arg = '(unknown arg type %d)\n' % type
|
|
|
|
retval = retval + arg
|
|
value_at_iter = iter.next()
|
|
|
|
return retval
|
|
|
|
cdef _set_msg(self, DBusMessage *msg):
|
|
self.msg = msg
|
|
|
|
cdef DBusMessage *_get_msg(self):
|
|
return self.msg
|
|
|
|
def get_iter(self):
|
|
cdef DBusMessageIter iter
|
|
cdef MessageIter message_iter
|
|
cdef DBusMessage *msg
|
|
|
|
msg = self._get_msg()
|
|
dbus_message_iter_init(msg, &iter)
|
|
|
|
message_iter = MessageIter()
|
|
message_iter.__cinit__(&iter)
|
|
|
|
return message_iter
|
|
|
|
def get_args_list(self):
|
|
retval = [ ]
|
|
|
|
iter = self.get_iter()
|
|
try:
|
|
retval.append(iter.get())
|
|
except TypeError, e:
|
|
return [ ]
|
|
|
|
value_at_iter = iter.next()
|
|
while (value_at_iter):
|
|
retval.append(iter.get())
|
|
value_at_iter = iter.next()
|
|
|
|
return retval
|
|
|
|
# FIXME: implement dbus_message_copy?
|
|
|
|
def get_type(self):
|
|
return dbus_message_get_type(self.msg)
|
|
|
|
def set_path(self, object_path):
|
|
return dbus_message_set_path(self.msg, object_path)
|
|
|
|
def get_path(self):
|
|
return dbus_message_get_path(self.msg)
|
|
|
|
def set_interface(self, interface):
|
|
return dbus_message_set_interface(self.msg, interface)
|
|
|
|
def get_interface(self):
|
|
return dbus_message_get_interface(self.msg)
|
|
|
|
def set_member(self, member):
|
|
return dbus_message_set_member(self.msg, member)
|
|
|
|
def get_member(self):
|
|
return dbus_message_get_member(self.msg)
|
|
|
|
def set_error_name(self, name):
|
|
return dbus_message_set_error_name(self.msg, name)
|
|
|
|
def get_error_name(self):
|
|
return dbus_message_get_error_name(self.msg)
|
|
|
|
def set_destination(self, destination):
|
|
return dbus_message_set_destination(self.msg, destination)
|
|
|
|
def get_destination(self):
|
|
return dbus_message_get_destination(self.msg)
|
|
|
|
def set_sender(self, sender):
|
|
return dbus_message_set_sender(self.msg, sender)
|
|
|
|
def get_sender(self):
|
|
cdef char *sender
|
|
sender = dbus_message_get_sender(self.msg)
|
|
if (sender == NULL):
|
|
return None
|
|
else:
|
|
return sender
|
|
|
|
def set_no_reply(self, no_reply):
|
|
dbus_message_set_no_reply(self.msg, no_reply)
|
|
|
|
def get_no_reply(self):
|
|
return dbus_message_get_no_reply(self.msg)
|
|
|
|
def is_method_call(self, interface, method):
|
|
return dbus_message_is_method_call(self.msg, interface, method)
|
|
|
|
def is_signal(self, interface, signal_name):
|
|
return dbus_message_is_signal(self.msg, interface, signal_name)
|
|
|
|
def is_error(self, error_name):
|
|
return dbus_message_is_error(self.msg, error_name)
|
|
|
|
def has_destination(self, service):
|
|
return dbus_message_has_destination(self.msg, service)
|
|
|
|
def has_sender(self, service):
|
|
return dbus_message_has_sender(self.msg, service)
|
|
|
|
def get_serial(self):
|
|
return dbus_message_get_serial(self.msg)
|
|
|
|
def set_reply_serial(self, reply_serial):
|
|
return dbus_message_set_reply_serial(self.msg, reply_serial)
|
|
|
|
def get_reply_serial(self):
|
|
return dbus_message_get_reply_serial(self.msg)
|
|
|
|
#FIXME: dbus_message_get_path_decomposed
|
|
|
|
# FIXME: all the different dbus_message_*args* methods
|
|
|
|
class Signal(Message):
|
|
def __init__(self, spath, sinterface, sname):
|
|
Message.__init__(self, MESSAGE_TYPE_SIGNAL, path=spath, interface=sinterface, name=sname)
|
|
|
|
class MethodCall(Message):
|
|
def __init__(self, mpath, minterface, mmethod):
|
|
Message.__init__(self, MESSAGE_TYPE_METHOD_CALL, path=mpath, interface=minterface, method=mmethod)
|
|
|
|
class MethodReturn(Message):
|
|
def __init__(self, method_call):
|
|
Message.__init__(self, MESSAGE_TYPE_METHOD_RETURN, method_call=method_call)
|
|
|
|
class Error(Message):
|
|
def __init__(self, reply_to, error_name, error_message):
|
|
Message.__init__(self, MESSAGE_TYPE_ERROR, reply_to=reply_to, error_name=error_name, error_message=error_message)
|
|
|
|
cdef class Server:
|
|
cdef DBusServer *server
|
|
def __init__(self, address):
|
|
cdef DBusError error
|
|
dbus_error_init(&error)
|
|
self.server = dbus_server_listen(address,
|
|
&error)
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
def setup_with_g_main (self):
|
|
dbus_server_setup_with_g_main(self.server, NULL)
|
|
|
|
def disconnect(self):
|
|
dbus_server_disconnect(self.server)
|
|
|
|
def get_is_connected(self):
|
|
return dbus_server_get_is_connected(self.server)
|
|
|
|
# def set_new_connection_function(self, function, data):
|
|
# dbus_server_set_new_connection_function(self.conn, function,
|
|
# data, NULL)
|
|
|
|
# def set_watch_functions(self, add_function, remove_function, data):
|
|
# dbus_server_set_watch_functions(self.server,
|
|
# add_function, remove_function,
|
|
# data, NULL)
|
|
|
|
# def set_timeout_functions(self, add_function, remove_function, data):
|
|
# dbus_server_set_timeout_functions(self.server,
|
|
# add_function, remove_function,
|
|
# data, NULL)
|
|
|
|
# def handle_watch(self, watch, condition):
|
|
# dbus_server_handle_watch(self.conn, watch, condition)
|
|
|
|
BUS_SESSION = DBUS_BUS_SESSION
|
|
BUS_SYSTEM = DBUS_BUS_SYSTEM
|
|
BUS_ACTIVATION = DBUS_BUS_ACTIVATION
|
|
|
|
def bus_get (bus_type):
|
|
cdef DBusError error
|
|
cdef Connection conn
|
|
dbus_error_init(&error)
|
|
cdef DBusConnection *connection
|
|
|
|
connection = dbus_bus_get(bus_type,
|
|
&error)
|
|
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
conn = Connection()
|
|
conn.__cinit__(None, connection)
|
|
return conn
|
|
|
|
def bus_get_base_service(Connection connection):
|
|
cdef DBusConnection *conn
|
|
conn = connection._get_conn()
|
|
return dbus_bus_get_base_service(conn)
|
|
|
|
def bus_register(Connection connection):
|
|
cdef DBusError error
|
|
dbus_error_init(&error)
|
|
cdef dbus_bool_t retval
|
|
cdef DBusConnection *conn
|
|
|
|
conn = connection._get_conn()
|
|
retval = dbus_bus_register(conn,
|
|
&error)
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
return retval
|
|
|
|
SERVICE_FLAG_PROHIBIT_REPLACEMENT = 0x1
|
|
SERVICE_FLAG_REPLACE_EXISTING = 0x2
|
|
|
|
def bus_acquire_service(Connection connection, service_name, flags=0):
|
|
cdef DBusError error
|
|
dbus_error_init(&error)
|
|
cdef int retval
|
|
cdef DBusConnection *conn
|
|
|
|
conn = connection._get_conn()
|
|
retval = dbus_bus_acquire_service(conn,
|
|
service_name,
|
|
flags,
|
|
&error)
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
return retval
|
|
|
|
def bus_service_exists(Connection connection, service_name):
|
|
cdef DBusError error
|
|
dbus_error_init(&error)
|
|
cdef dbus_bool_t retval
|
|
cdef DBusConnection *conn
|
|
|
|
conn = connection._get_conn()
|
|
retval = dbus_bus_service_exists(conn,
|
|
service_name,
|
|
&error)
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
return retval
|
|
|
|
def bus_add_match(Connection connection, rule):
|
|
cdef DBusError error
|
|
cdef DBusConnection *conn
|
|
|
|
dbus_error_init(&error)
|
|
|
|
conn = connection._get_conn()
|
|
dbus_bus_add_match (conn, rule, &error)
|
|
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|
|
|
|
def bus_remove_match(Connection connection, rule):
|
|
cdef DBusError error
|
|
cdef DBusConnection *conn
|
|
|
|
dbus_error_init(&error)
|
|
|
|
conn = connection._get_conn()
|
|
dbus_bus_remove_match (conn, rule, &error)
|
|
|
|
if dbus_error_is_set(&error):
|
|
raise DBusException, error.message
|