eis: if a client is slow, queue up messages for future delivery

If a receiver client stops calling ei_dispatch for a while eventually
we fill up the send buffer, causing messages to be quietly dropped.
When the client resumes the message stream resumes with whatever we send
next but that can leave the client in an inconsistent state.

deskflow hit this in the server-side where our event sequence of pointer
motion+frames eventually filled up the buffer, causing
eis_device_stop_emulating() to be silently dropped. On the next
InputCapture sequence eis_device_start_emulating() was sent to an
already emulating client (as seen by the client).

This patch adds a secondary queue, if we fail to send a message with
EAGAIN queue it up and flush that queue whenever the next message is
sent. Meanwhile any newly added messages go straight into that queue.

The caveat here: a nonresponding client will eventually trigger OOM,
there is no upper limit on the messages yet

This is the libeis version of
commit 69e973e6b3 ("ei: queue unsent messages for later delivery if our buffer is full")

Closes: #79
Part-of: <https://gitlab.freedesktop.org/libinput/libei/-/merge_requests/331>
This commit is contained in:
Peter Hutterer 2025-05-02 14:59:58 +10:00
parent daf0b24665
commit daba46a2ae
3 changed files with 193 additions and 12 deletions

View file

@ -47,12 +47,25 @@
DEFINE_TRISTATE(started, finished, connected);
DEFINE_UNREF_CLEANUP_FUNC(brei_result);
struct eis_unsent {
struct list node;
struct iobuf *buf;
};
static void
eis_unsent_free(struct eis_unsent *unsent);
static void
client_drop_seats(struct eis_client *client);
static void
eis_client_destroy(struct eis_client *client)
{
struct eis_unsent *unsent;
list_for_each_safe(unsent, &client->unsent_queue, node) {
eis_unsent_free(unsent);
}
client_drop_seats(client);
eis_handshake_unref(client->setup);
eis_connection_unref(client->connection);
@ -151,6 +164,47 @@ eis_client_is_sender(struct eis_client *client)
return client->is_sender;
}
static void
eis_client_queue_unsent(struct eis_client *client,
struct source *source,
struct iobuf *buf)
{
if (list_empty(&client->unsent_queue)) {
source_enable_write(source, true);
}
struct eis_unsent *unsent = xalloc(sizeof *unsent);
unsent->buf = buf;
list_append(&client->unsent_queue, &unsent->node);
}
static void
eis_unsent_free(struct eis_unsent *unsent)
{
list_remove(&unsent->node);
iobuf_free(unsent->buf);
free(unsent);
}
static int
eis_client_unsent_flush(struct eis_client* client)
{
if (list_empty(&client->unsent_queue))
return 0;
struct source *source = client->source;
int fd = source_get_fd(source);
struct eis_unsent *unsent;
list_for_each_safe(unsent, &client->unsent_queue, node) {
int rc = iobuf_send(unsent->buf, fd);
if (rc < 0)
return rc;
eis_unsent_free(unsent);
}
source_enable_write(source, false);
return 0;
}
int
eis_client_send_message(struct eis_client *client, const struct brei_object *object,
uint32_t opcode, const char *signature, size_t nargs, ...)
@ -182,7 +236,19 @@ eis_client_send_message(struct eis_client *client, const struct brei_object *obj
_cleanup_iobuf_ struct iobuf *buf = brei_result_get_data(result);
assert(buf);
int fd = source_get_fd(client->source);
int rc = iobuf_send(buf, fd);
int rc = -EPIPE;
if (fd != -1) {
rc = eis_client_unsent_flush(client);
if (rc >= 0)
rc = iobuf_send(buf, fd);
if (rc == -EAGAIN) {
eis_client_queue_unsent(client, client->source, steal(&buf));
rc = 0;
} else if (rc < 0){
log_warn(eis, "failed to send message: %s", strerror(-rc));
source_remove(client->source);
}
}
return rc < 0 ? rc : 0;
}
@ -390,18 +456,26 @@ client_dispatch(struct source *source, void *userdata)
_unref_(eis_client) *client = eis_client_ref(userdata);
enum eis_client_state old_state = client->state;
_unref_(brei_result) *result = brei_dispatch(client->brei, source_get_fd(source),
lookup_object, client);
if (result) {
if (old_state != EIS_CLIENT_STATE_REQUESTED_DISCONNECT ||
brei_result_get_reason(result) != BREI_CONNECTION_DISCONNECT_REASON_TRANSPORT)
log_warn(eis_client_get_context(client), "Client error: %s",
brei_result_get_explanation(result));
/* Flush any pending writes, if we have them */
int rc = eis_client_unsent_flush(client);
if (rc < 0 && rc != -EAGAIN) {
log_warn(eis_client_get_context(client),
"Error flushing unsent queue: %s", strerror(-rc));
eis_client_disconnect(client);
} else {
_unref_(brei_result) *result = brei_dispatch(client->brei, source_get_fd(source),
lookup_object, client);
if (result) {
if (old_state != EIS_CLIENT_STATE_REQUESTED_DISCONNECT ||
brei_result_get_reason(result) != BREI_CONNECTION_DISCONNECT_REASON_TRANSPORT)
log_warn(eis_client_get_context(client), "Client error: %s",
brei_result_get_explanation(result));
brei_drain_fd(source_get_fd(source));
eis_client_disconnect_with_reason(client,
brei_result_get_reason(result),
brei_result_get_explanation(result));
brei_drain_fd(source_get_fd(source));
eis_client_disconnect_with_reason(client,
brei_result_get_reason(result),
brei_result_get_explanation(result));
}
}
static const char *client_states[] = {
@ -436,6 +510,7 @@ eis_client_new(struct eis *eis, int fd)
list_init(&client->seats);
list_init(&client->seats_pending);
list_init(&client->proto_objects);
list_init(&client->unsent_queue);
client->interface_versions = (struct eis_client_interface_versions){
.ei_connection = VERSION_V(1),

View file

@ -59,6 +59,8 @@ struct eis_client {
struct brei_context *brei;
struct eis_connection *connection;
struct list unsent_queue;
struct list proto_objects; /* struct brei_objects list */
object_id_t next_object_id;
object_id_t last_client_object_id;

View file

@ -398,6 +398,110 @@ MUNIT_TEST(eistest_multiple_emulating)
return MUNIT_OK;
}
MUNIT_TEST(eistest_socket_overflow)
{
_unref_(peck) *peck = peck_new_context(PECK_EI_RECEIVER);
peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ACCEPT_CLIENT);
peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_DEFAULT_SEAT);
peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_HANDLE_BIND_SEAT);
peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ADD_POINTER);
peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_RESUME_DEVICE);
peck_dispatch_until_stable(peck);
peck_dispatch_until_stable(peck);
with_server(peck) {
struct eis_device *device = peck_eis_get_default_pointer(peck);
eis_device_start_emulating(device, 1);
for (size_t i = 0; i < 500; i++) {
eis_device_pointer_motion(device, -10, -10);
eis_device_frame(device, 0 + i);
}
eis_device_stop_emulating(device);
}
peck_dispatch_until_stable(peck);
enum ei_event_type last_type = -1;
with_client(peck) {
_unref_(ei_event) *added =
peck_ei_next_event(ei, EI_EVENT_DEVICE_ADDED);
_unref_(ei_event) *resumed =
peck_ei_next_event(ei, EI_EVENT_DEVICE_RESUMED);
_unref_(ei_event) *start =
peck_ei_next_event(ei, EI_EVENT_DEVICE_START_EMULATING);
do {
struct ei_event *peek = ei_peek_event(ei);
if (!peek)
break;
last_type = ei_event_get_type(peek);
switch (last_type) {
case EI_EVENT_FRAME:
case EI_EVENT_POINTER_MOTION:
break;
default:
munit_assert(last_type == EI_EVENT_FRAME ||
last_type == EI_EVENT_POINTER_MOTION);
break;
}
ei_event_unref(peek);
ei_event_unref(ei_get_event(ei));
} while (true);
peck_assert_no_ei_events(ei);
}
/* We've hit the first batch, now let the server send
* the remaining events */
peck_dispatch_until_stable(peck);
bool complete = false;
with_client(peck) {
if (last_type == EI_EVENT_POINTER_MOTION) {
_unref_(ei_event) *frame = peck_ei_next_event(ei, EI_EVENT_FRAME);
} else if (last_type == EI_EVENT_FRAME) {
/* This fail if the buffer filled just
* before the STOP_EVENT event. Adjust the test
* if that ever happens */
_unref_(ei_event) *motion = peck_ei_next_event(ei, EI_EVENT_POINTER_MOTION);
}
do {
struct ei_event *peek = ei_peek_event(ei);
if (!peek) {
peck_dispatch_until_stable(peck);
break;
}
last_type = ei_event_get_type(peek);
switch (last_type) {
case EI_EVENT_FRAME:
case EI_EVENT_POINTER_MOTION:
break;
case EI_EVENT_DEVICE_STOP_EMULATING:
complete = true;
break;
default:
munit_error(ei_event_type_to_string(last_type));
break;
}
ei_event_unref(peek);
ei_event_unref(ei_get_event(ei));
} while (!complete);
}
with_client(peck) {
peck_assert_no_ei_events(ei);
}
return MUNIT_OK;
}
MUNIT_TEST(test_eis_ping)
{
_unref_(peck) *peck = peck_new();