From 69e973e6b30b7f6b21a0765e643ade7ef749ea53 Mon Sep 17 00:00:00 2001 From: Peter Hutterer Date: Tue, 10 Oct 2023 11:09:48 +1000 Subject: [PATCH] ei: queue unsent messages for later delivery if our buffer is full If our write buffer is full, enqueue the events instead and try to flush them out later when we can write again. Fixes #46 --- src/libei-private.h | 10 ++++ src/libei.c | 76 +++++++++++++++++++++++-- test/test-ei.c | 134 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 6 deletions(-) diff --git a/src/libei-private.h b/src/libei-private.h index 18bd50e..8586292 100644 --- a/src/libei-private.h +++ b/src/libei-private.h @@ -79,6 +79,11 @@ struct ei_interface_versions { uint32_t ei_touchscreen; }; +struct ei_unsent { + struct list node; + struct iobuf *buf; +}; + struct ei { struct object object; @@ -94,6 +99,8 @@ struct ei { struct brei_context *brei; struct sink *sink; struct source *source; + struct list unsent_queue; + struct ei_backend_interface backend_interface; void *backend; enum ei_state state; @@ -143,6 +150,9 @@ int ei_send_message(struct ei *ei, const struct brei_object *object, uint32_t opcode, const char *signature, size_t nargs, ...); +int +ei_unsent_flush(struct ei* ei); + void ei_connected(struct ei *ei); diff --git a/src/libei.c b/src/libei.c index 2193529..4f41f24 100644 --- a/src/libei.c +++ b/src/libei.c @@ -50,6 +50,9 @@ _Static_assert(sizeof(enum ei_keymap_type) == sizeof(int), "Invalid enum size"); _Static_assert(sizeof(enum ei_event_type) == sizeof(int), "Invalid enum size"); _Static_assert(sizeof(enum ei_log_priority) == sizeof(int), "Invalid enum size"); +static void +ei_unsent_free(struct ei_unsent *unsent); + static void ei_destroy(struct ei *ei) { @@ -59,6 +62,11 @@ ei_destroy(struct ei *ei) while ((e = ei_get_event(ei)) != NULL) ei_event_unref(e); + struct ei_unsent *unsent; + list_for_each_safe(unsent, &ei->unsent_queue, node) { + ei_unsent_free(unsent); + } + if (ei->backend_interface.destroy) ei->backend_interface.destroy(ei, ei->backend); ei->backend = NULL; @@ -100,6 +108,7 @@ ei_create_context(bool is_sender, void *user_data) list_init(&ei->event_queue); list_init(&ei->seats); list_init(&ei->proto_objects); + list_init(&ei->unsent_queue); ei->interface_versions = (struct ei_interface_versions){ .ei_connection = VERSION_V(1), @@ -729,12 +738,19 @@ connection_dispatch(struct source *source, void *userdata) struct ei *ei = userdata; enum ei_state old_state = ei->state; - _unref_(brei_result) *result = brei_dispatch(ei->brei, source_get_fd(source), - lookup_object, ei); - if (result) { - log_warn(ei, "Connection error: %s", brei_result_get_explanation(result)); - brei_drain_fd(source_get_fd(source)); + /* Flush any pending writes, if we have them */ + int rc = ei_unsent_flush(ei); + if (rc < 0 && rc != -EAGAIN) { + log_warn(ei, "Error flushing unsent queue: %s", strerror(-rc)); ei_disconnect(ei); + } else { + _unref_(brei_result) *result = brei_dispatch(ei->brei, source_get_fd(source), + lookup_object, ei); + if (result) { + log_warn(ei, "Connection error: %s", brei_result_get_explanation(result)); + brei_drain_fd(source_get_fd(source)); + ei_disconnect(ei); + } } static const char *states[] = { @@ -752,6 +768,45 @@ connection_dispatch(struct source *source, void *userdata) states[ei->state]); } +static void +ei_queue_unsent(struct ei *ei, struct source *source, struct iobuf *buf) +{ + if (list_empty(&ei->unsent_queue)) { + source_enable_write(source, true); + } + + struct ei_unsent *unsent = xalloc(sizeof *unsent); + unsent->buf = buf; + list_append(&ei->unsent_queue, &unsent->node); +} + +static void +ei_unsent_free(struct ei_unsent *unsent) +{ + list_remove(&unsent->node); + iobuf_free(unsent->buf); + free(unsent); +} + +int +ei_unsent_flush(struct ei* ei) +{ + if (list_empty(&ei->unsent_queue)) + return 0; + + struct source *source = ei->source; + int fd = source_get_fd(source); + struct ei_unsent *unsent; + list_for_each_safe(unsent, &ei->unsent_queue, node) { + int rc = iobuf_send(unsent->buf, fd); + if (rc < 0) + return rc; + ei_unsent_free(unsent); + } + source_enable_write(source, false); + return 0; +} + int ei_send_message(struct ei *ei, const struct brei_object *object, uint32_t opcode, const char *signature, size_t nargs, ...) @@ -781,7 +836,16 @@ ei_send_message(struct ei *ei, const struct brei_object *object, _cleanup_iobuf_ struct iobuf *buf = brei_result_get_data(result); assert(buf); int fd = source_get_fd(ei->source); - int rc = iobuf_send(buf, fd); + + int rc = ei_unsent_flush(ei); + if (rc >= 0) + rc = iobuf_send(buf, fd); + if (rc == -EAGAIN) { + ei_queue_unsent(ei, ei->source, steal(&buf)); + rc = 0; + } else if (rc < 0){ + log_warn(ei, "failed to send message: %s", strerror(-rc)); + } return rc < 0 ? rc : 0; } diff --git a/test/test-ei.c b/test/test-ei.c index cc89eda..3cf852f 100644 --- a/test/test-ei.c +++ b/test/test-ei.c @@ -545,3 +545,137 @@ MUNIT_TEST(test_xdotool_rel_motion) return MUNIT_OK; } + +MUNIT_TEST(test_ei_exceed_write_buffer) +{ + _unref_(peck) *peck = peck_new(); + + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_NONE); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ACCEPT_ALL); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ADD_POINTER); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTODEVICES); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTOSTART); + peck_dispatch_until_stable(peck); + + uint64_t toffset = peck_ei_now(peck); + unsigned int count = 10000; /* Large enough to require several flushes */ + struct eis_event *events[count]; + + with_server(peck) { + peck_drain_eis(eis); + } + + with_client(peck) { + struct ei_device *device = peck_ei_get_default_pointer(peck); + for (unsigned int i = 0; i < count/2; i++) { + ei_device_pointer_motion(device, -1, 10); + ei_device_frame(device, toffset + i); + } + } + + peck_dispatch_eis(peck); + + unsigned int before_buffer = 0; + with_server(peck) { + struct eis_event *next; + while ((next = eis_get_event(eis))) { + events[before_buffer++] = next; + munit_assert_uint(before_buffer, <=, count); + } + } + + unsigned int nevents = before_buffer; + if (before_buffer < count) { + /* We sent >socket buffersize events, so we don't expect to receive all of those */ + /* Calling dispatch (on both) should flush some more */ + peck_dispatch_until_stable(peck); + + with_server(peck) { + struct eis_event *next; + while ((next = eis_get_event(eis))) { + events[nevents++] = next; + + munit_assert_uint(nevents, <=, count); + + if (nevents % 50 == 0) + peck_dispatch_ei(peck); + + _unref_(eis_event) *next = eis_peek_event(eis); + if (!next) + peck_dispatch_eis(peck); + } + } + }; + + munit_assert_uint(nevents, ==, count); + + for (unsigned int i = 0; i < count; i += 2) { + _unref_(eis_event) *motion = events[i]; + _unref_(eis_event) *frame = events[i+1]; + + uint64_t time = eis_event_get_time(frame); + + munit_assert_string_equal(peck_eis_event_name(motion), + peck_eis_event_type_name(EIS_EVENT_POINTER_MOTION)); + munit_assert_string_equal(peck_eis_event_name(frame), + peck_eis_event_type_name(EIS_EVENT_FRAME)); + munit_assert_int64(time, ==, toffset + i/2); + } + + /* Our events are as expected but we never got EAGAIN on + * the buffer, so let's count this test as skipped */ + if (before_buffer == count) + return MUNIT_SKIP; + + return MUNIT_OK; +} + +MUNIT_TEST(test_ei_exceed_write_buffer_cleanup) +{ + _unref_(peck) *peck = peck_new(); + struct ei *ei = peck_get_ei(peck); + + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_NONE); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ACCEPT_ALL); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ADD_POINTER); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTODEVICES); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTOSTART); + peck_dispatch_until_stable(peck); + + uint64_t toffset = peck_ei_now(peck); + unsigned int count = 10000; /* Large enough to require several flushes */ + + with_server(peck) { + peck_drain_eis(eis); + } + + with_client(peck) { + struct ei_device *device = peck_ei_get_default_pointer(peck); + for (unsigned int i = 0; i < count/2; i++) { + ei_device_pointer_motion(device, -1, 10); + ei_device_frame(device, toffset + i); + } + } + + peck_dispatch_eis(peck); + + unsigned int before_buffer = 0; + with_server(peck) { + struct eis_event *next; + while ((next = eis_get_event(eis))) { + munit_assert_uint(before_buffer, <=, count); + eis_event_unref(next); + } + } + + /* Our events are as expected but we never got EAGAIN on + * the buffer, so let's count this test as skipped */ + if (before_buffer == count) + return MUNIT_SKIP; + + /* Make sure cleanup is handled properly */ + peck_drop_ei(peck); + ei_unref(ei); + + return MUNIT_OK; +}