2003-02-16 Alexander Larsson <alexl@redhat.com>

* dbus/dbus-connection.c:
	Implement sent_message_with_reply. (with_reply_and block is still
	busted).
	Made dispatch_message not lose message if OOM.

	* dbus/dbus-errors.h:
	Add NoReply error (for reply timeouts).
This commit is contained in:
Alexander Larsson 2003-02-16 15:18:35 +00:00
parent 8925ff6d98
commit 0f323e6cad
3 changed files with 294 additions and 11 deletions

View file

@ -1,3 +1,13 @@
2003-02-16 Alexander Larsson <alexl@redhat.com>
* dbus/dbus-connection.c:
Implement sent_message_with_reply. (with_reply_and block is still
busted).
Made dispatch_message not lose message if OOM.
* dbus/dbus-errors.h:
Add NoReply error (for reply timeouts).
2003-02-16 Alexander Larsson <alexl@redhat.com>
* dbus/dbus-hash.c (_dbus_hash_table_unref):

View file

@ -110,13 +110,33 @@ struct DBusConnection
DBusDataSlot *data_slots; /**< Data slots */
int n_slots; /**< Slots allocated so far. */
DBusHashTable *pending_replies; /**< Hash of message serials and their message handlers. */
DBusCounter *connection_counter; /**< Counter that we decrement when finalized */
int client_serial; /**< Client serial. Increments each time a message is sent */
DBusList *disconnect_message_link;
};
typedef struct
{
DBusConnection *connection;
DBusMessageHandler *handler;
DBusTimeout *timeout;
int serial;
DBusList *timeout_link; /* Preallocated timeout response */
dbus_bool_t timeout_added;
dbus_bool_t connection_added;
} ReplyHandlerData;
static void reply_handler_data_free (ReplyHandlerData *data);
static void _dbus_connection_free_data_slots_nolock (DBusConnection *connection);
static void _dbus_connection_remove_timeout_locked (DBusConnection *connection,
DBusTimeout *timeout);
/**
* Adds a message to the incoming message queue, returning #FALSE
@ -130,11 +150,29 @@ dbus_bool_t
_dbus_connection_queue_received_message (DBusConnection *connection,
DBusMessage *message)
{
ReplyHandlerData *reply_handler_data;
dbus_int32_t reply_serial;
_dbus_assert (_dbus_transport_get_is_authenticated (connection->transport));
if (!_dbus_list_append (&connection->incoming_messages,
message))
return FALSE;
/* If this is a reply we're waiting on, remove timeout for it */
reply_serial = _dbus_message_get_reply_serial (message);
if (reply_serial != -1)
{
reply_handler_data = _dbus_hash_table_lookup_int (connection->pending_replies,
reply_serial);
if (reply_handler_data != NULL)
{
if (reply_handler_data->timeout_added)
_dbus_connection_remove_timeout_locked (connection,
reply_handler_data->timeout);
reply_handler_data->timeout_added = FALSE;
}
}
dbus_message_ref (message);
connection->n_incoming += 1;
@ -296,6 +334,16 @@ _dbus_connection_remove_timeout (DBusConnection *connection,
timeout);
}
static void
_dbus_connection_remove_timeout_locked (DBusConnection *connection,
DBusTimeout *timeout)
{
dbus_mutex_lock (connection->mutex);
_dbus_connection_remove_timeout (connection, timeout);
dbus_mutex_unlock (connection->mutex);
}
/**
* Tells the connection that the transport has been disconnected.
* Results in posting a disconnect message on the incoming message
@ -422,17 +470,18 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
DBusConnection *connection;
DBusWatchList *watch_list;
DBusTimeoutList *timeout_list;
DBusHashTable *handler_table;
DBusHashTable *handler_table, *pending_replies;
DBusMutex *mutex;
DBusCondVar *message_returned_cond;
DBusCondVar *dispatch_cond;
DBusCondVar *io_path_cond;
DBusList *disconnect_link;
DBusMessage *disconnect_message;
watch_list = NULL;
connection = NULL;
handler_table = NULL;
pending_replies = NULL;
timeout_list = NULL;
mutex = NULL;
message_returned_cond = NULL;
@ -454,6 +503,12 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
dbus_free, NULL);
if (handler_table == NULL)
goto error;
pending_replies =
_dbus_hash_table_new (DBUS_HASH_INT,
NULL, (DBusFreeFunction)reply_handler_data_free);
if (pending_replies == NULL)
goto error;
connection = dbus_new0 (DBusConnection, 1);
if (connection == NULL)
@ -495,6 +550,7 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
connection->watches = watch_list;
connection->timeouts = timeout_list;
connection->handler_table = handler_table;
connection->pending_replies = pending_replies;
connection->filter_list = NULL;
connection->data_slots = NULL;
@ -532,6 +588,9 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
if (handler_table)
_dbus_hash_table_unref (handler_table);
if (pending_replies)
_dbus_hash_table_unref (pending_replies);
if (watch_list)
_dbus_watch_list_free (watch_list);
@ -738,9 +797,12 @@ _dbus_connection_last_unref (DBusConnection *connection)
link = next;
}
_dbus_hash_table_unref (connection->handler_table);
connection->handler_table = NULL;
_dbus_hash_table_unref (connection->pending_replies);
connection->pending_replies = NULL;
_dbus_list_clear (&connection->filter_list);
@ -919,6 +981,54 @@ dbus_connection_send_message (DBusConnection *connection,
return TRUE;
}
static void
reply_handler_timeout (void *data)
{
DBusConnection *connection;
ReplyHandlerData *reply_handler_data = data;
connection = reply_handler_data->connection;
dbus_mutex_lock (connection->mutex);
if (reply_handler_data->timeout_link)
{
_dbus_connection_queue_synthesized_message_link (connection,
reply_handler_data->timeout_link);
reply_handler_data->timeout_link = NULL;
}
_dbus_connection_remove_timeout (connection,
reply_handler_data->timeout);
reply_handler_data->timeout_added = FALSE;
dbus_mutex_unlock (connection->mutex);
}
static void
reply_handler_data_free (ReplyHandlerData *data)
{
if (!data)
return;
if (data->timeout_added)
_dbus_connection_remove_timeout_locked (data->connection,
data->timeout);
if (data->connection_added)
_dbus_message_handler_remove_connection (data->handler,
data->connection);
if (data->timeout_link)
{
dbus_message_unref ((DBusMessage *)data->timeout_link->data);
_dbus_list_free_link (data->timeout_link);
}
dbus_message_handler_unref (data->handler);
dbus_free (data);
}
/**
* Queues a message to send, as with dbus_connection_send_message(),
* but also sets up a DBusMessageHandler to receive a reply to the
@ -934,8 +1044,8 @@ dbus_connection_send_message (DBusConnection *connection,
* message as a reply, after a reply has been seen the handler is
* removed. If a filter filters out the reply before the handler sees
* it, the handler is not removed but the timeout will immediately
* fire again. If a filter was dumb and kept removing the timeout
* reply then we'd get in an infinite loop.
* fire. If a filter was dumb and removed the timeout reply then
* the reply is lost (this will give a runtime warning).
*
* If #NULL is passed for the reply_handler, the timeout reply will
* still be generated and placed into the message queue, but no
@ -969,8 +1079,120 @@ dbus_connection_send_message_with_reply (DBusConnection *connection,
int timeout_milliseconds,
DBusResultCode *result)
{
/* FIXME */
return dbus_connection_send_message (connection, message, NULL, result);
DBusTimeout *timeout;
ReplyHandlerData *data;
DBusMessage *reply;
DBusList *reply_link;
dbus_int32_t serial = -1;
if (timeout_milliseconds == -1)
timeout_milliseconds = DEFAULT_TIMEOUT_VALUE;
data = dbus_new0 (ReplyHandlerData, 1);
if (!data)
{
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
return FALSE;
}
timeout = _dbus_timeout_new (timeout_milliseconds, reply_handler_timeout,
data, NULL);
if (!timeout)
{
reply_handler_data_free (data);
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
return FALSE;
}
dbus_mutex_lock (connection->mutex);
/* Add timeout */
if (!_dbus_connection_add_timeout (connection, timeout))
{
reply_handler_data_free (data);
_dbus_timeout_unref (timeout);
dbus_mutex_unlock (connection->mutex);
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
return FALSE;
}
/* The connection now owns the reference to the timeout. */
_dbus_timeout_unref (timeout);
data->timeout_added = TRUE;
data->timeout = timeout;
data->connection = connection;
if (!_dbus_message_handler_add_connection (reply_handler, connection))
{
dbus_mutex_unlock (connection->mutex);
reply_handler_data_free (data);
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
return FALSE;
}
data->connection_added = TRUE;
/* Assign a serial to the message */
if (_dbus_message_get_client_serial (message) == -1)
{
serial = _dbus_connection_get_next_client_serial (connection);
_dbus_message_set_client_serial (message, serial);
}
data->handler = reply_handler;
data->serial = serial;
dbus_message_handler_ref (reply_handler);
reply = dbus_message_new_error_reply (message, DBUS_ERROR_NO_REPLY,
"No reply within specified time");
if (!reply)
{
dbus_mutex_unlock (connection->mutex);
reply_handler_data_free (data);
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
return FALSE;
}
reply_link = _dbus_list_alloc_link (reply);
if (!reply)
{
dbus_mutex_unlock (connection->mutex);
dbus_message_unref (reply);
reply_handler_data_free (data);
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
return FALSE;
}
data->timeout_link = reply_link;
/* Insert the serial in the pending replies hash. */
if (!_dbus_hash_table_insert_int (connection->pending_replies, serial, data))
{
dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
dbus_mutex_unlock (connection->mutex);
reply_handler_data_free (data);
return FALSE;
}
dbus_mutex_unlock (connection->mutex);
if (!dbus_connection_send_message (connection, message, NULL, result))
{
/* This will free the handler data too */
_dbus_hash_table_remove_int (connection->pending_replies, serial);
return FALSE;
}
dbus_set_result (result, DBUS_RESULT_SUCCESS);
return TRUE;
}
/**
@ -1256,6 +1478,15 @@ _dbus_connection_release_dispatch (DBusConnection *connection)
dbus_condvar_wake_one (connection->dispatch_cond);
}
static void
_dbus_connection_failed_pop (DBusConnection *connection,
DBusList *message_link)
{
_dbus_list_prepend_link (&connection->incoming_messages,
message_link);
connection->n_incoming += 1;
}
/**
* Pops the first-received message from the current incoming message
* queue, runs any handlers for it, then unrefs the message.
@ -1266,11 +1497,19 @@ _dbus_connection_release_dispatch (DBusConnection *connection)
dbus_bool_t
dbus_connection_dispatch_message (DBusConnection *connection)
{
DBusMessageHandler *handler;
DBusMessage *message;
DBusList *link, *filter_list_copy;
DBusList *link, *filter_list_copy, *message_link;
DBusHandlerResult result;
ReplyHandlerData *reply_handler_data;
const char *name;
dbus_int32_t reply_serial;
/* Preallocate link so we can put the message back on failure */
message_link = _dbus_list_alloc_link (NULL);
if (message_link)
return FALSE;
dbus_mutex_lock (connection->mutex);
/* We need to ref the connection since the callback could potentially
@ -1292,13 +1531,20 @@ dbus_connection_dispatch_message (DBusConnection *connection)
dbus_connection_unref (connection);
return FALSE;
}
message_link->data = message;
result = DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS;
reply_serial = _dbus_message_get_reply_serial (message);
reply_handler_data = _dbus_hash_table_lookup_int (connection->pending_replies,
reply_serial);
if (!_dbus_list_copy (&connection->filter_list, &filter_list_copy))
{
_dbus_connection_release_dispatch (connection);
dbus_mutex_unlock (connection->mutex);
_dbus_connection_failed_pop (connection, message_link);
dbus_connection_unref (connection);
return FALSE;
}
@ -1332,15 +1578,40 @@ dbus_connection_dispatch_message (DBusConnection *connection)
_dbus_list_clear (&filter_list_copy);
dbus_mutex_lock (connection->mutex);
/* Did a reply we were waiting on get filtered? */
if (reply_handler_data && result == DBUS_HANDLER_RESULT_REMOVE_MESSAGE)
{
/* Queue the timeout immediately! */
if (reply_handler_data->timeout_link)
{
_dbus_connection_queue_synthesized_message_link (connection,
reply_handler_data->timeout_link);
reply_handler_data->timeout_link = NULL;
}
else
{
/* We already queued the timeout? Then it was filtered! */
_dbus_warn ("The timeout for the reply to %d was filtered\n", reply_serial);
}
}
if (result == DBUS_HANDLER_RESULT_REMOVE_MESSAGE)
goto out;
if (reply_handler_data)
{
dbus_mutex_unlock (connection->mutex);
result = _dbus_message_handler_handle_message (reply_handler_data->handler,
connection, message);
reply_handler_data_free (reply_handler_data);
dbus_mutex_lock (connection->mutex);
goto out;
}
name = dbus_message_get_name (message);
if (name != NULL)
{
DBusMessageHandler *handler;
handler = _dbus_hash_table_lookup_string (connection->handler_table,
name);
if (handler != NULL)
@ -1359,6 +1630,7 @@ dbus_connection_dispatch_message (DBusConnection *connection)
out:
_dbus_connection_release_dispatch (connection);
dbus_mutex_unlock (connection->mutex);
_dbus_list_free_link (message_link);
dbus_connection_unref (connection);
dbus_message_unref (message);

View file

@ -54,6 +54,7 @@ struct DBusError
#define DBUS_ERROR_SPAWN_FAILED "org.freedesktop.DBus.Error.Spawn.Failed"
#define DBUS_ERROR_NO_MEMORY "org.freedesktop.DBus.Error.NoMemory"
#define DBUS_ERROR_SERVICE_DOES_NOT_EXIST "org.freedesktop.DBus.Error.ServiceDoesNotExist"
#define DBUS_ERROR_NO_REPLY "org.freedesktop.DBus.Error.NoReply"
typedef enum
{