diff --git a/src/libei-private.h b/src/libei-private.h index 18bd50e..8586292 100644 --- a/src/libei-private.h +++ b/src/libei-private.h @@ -79,6 +79,11 @@ struct ei_interface_versions { uint32_t ei_touchscreen; }; +struct ei_unsent { + struct list node; + struct iobuf *buf; +}; + struct ei { struct object object; @@ -94,6 +99,8 @@ struct ei { struct brei_context *brei; struct sink *sink; struct source *source; + struct list unsent_queue; + struct ei_backend_interface backend_interface; void *backend; enum ei_state state; @@ -143,6 +150,9 @@ int ei_send_message(struct ei *ei, const struct brei_object *object, uint32_t opcode, const char *signature, size_t nargs, ...); +int +ei_unsent_flush(struct ei* ei); + void ei_connected(struct ei *ei); diff --git a/src/libei.c b/src/libei.c index 2193529..4f41f24 100644 --- a/src/libei.c +++ b/src/libei.c @@ -50,6 +50,9 @@ _Static_assert(sizeof(enum ei_keymap_type) == sizeof(int), "Invalid enum size"); _Static_assert(sizeof(enum ei_event_type) == sizeof(int), "Invalid enum size"); _Static_assert(sizeof(enum ei_log_priority) == sizeof(int), "Invalid enum size"); +static void +ei_unsent_free(struct ei_unsent *unsent); + static void ei_destroy(struct ei *ei) { @@ -59,6 +62,11 @@ ei_destroy(struct ei *ei) while ((e = ei_get_event(ei)) != NULL) ei_event_unref(e); + struct ei_unsent *unsent; + list_for_each_safe(unsent, &ei->unsent_queue, node) { + ei_unsent_free(unsent); + } + if (ei->backend_interface.destroy) ei->backend_interface.destroy(ei, ei->backend); ei->backend = NULL; @@ -100,6 +108,7 @@ ei_create_context(bool is_sender, void *user_data) list_init(&ei->event_queue); list_init(&ei->seats); list_init(&ei->proto_objects); + list_init(&ei->unsent_queue); ei->interface_versions = (struct ei_interface_versions){ .ei_connection = VERSION_V(1), @@ -729,12 +738,19 @@ connection_dispatch(struct source *source, void *userdata) struct ei *ei = userdata; enum ei_state old_state = ei->state; - _unref_(brei_result) *result = brei_dispatch(ei->brei, source_get_fd(source), - lookup_object, ei); - if (result) { - log_warn(ei, "Connection error: %s", brei_result_get_explanation(result)); - brei_drain_fd(source_get_fd(source)); + /* Flush any pending writes, if we have them */ + int rc = ei_unsent_flush(ei); + if (rc < 0 && rc != -EAGAIN) { + log_warn(ei, "Error flushing unsent queue: %s", strerror(-rc)); ei_disconnect(ei); + } else { + _unref_(brei_result) *result = brei_dispatch(ei->brei, source_get_fd(source), + lookup_object, ei); + if (result) { + log_warn(ei, "Connection error: %s", brei_result_get_explanation(result)); + brei_drain_fd(source_get_fd(source)); + ei_disconnect(ei); + } } static const char *states[] = { @@ -752,6 +768,45 @@ connection_dispatch(struct source *source, void *userdata) states[ei->state]); } +static void +ei_queue_unsent(struct ei *ei, struct source *source, struct iobuf *buf) +{ + if (list_empty(&ei->unsent_queue)) { + source_enable_write(source, true); + } + + struct ei_unsent *unsent = xalloc(sizeof *unsent); + unsent->buf = buf; + list_append(&ei->unsent_queue, &unsent->node); +} + +static void +ei_unsent_free(struct ei_unsent *unsent) +{ + list_remove(&unsent->node); + iobuf_free(unsent->buf); + free(unsent); +} + +int +ei_unsent_flush(struct ei* ei) +{ + if (list_empty(&ei->unsent_queue)) + return 0; + + struct source *source = ei->source; + int fd = source_get_fd(source); + struct ei_unsent *unsent; + list_for_each_safe(unsent, &ei->unsent_queue, node) { + int rc = iobuf_send(unsent->buf, fd); + if (rc < 0) + return rc; + ei_unsent_free(unsent); + } + source_enable_write(source, false); + return 0; +} + int ei_send_message(struct ei *ei, const struct brei_object *object, uint32_t opcode, const char *signature, size_t nargs, ...) @@ -781,7 +836,16 @@ ei_send_message(struct ei *ei, const struct brei_object *object, _cleanup_iobuf_ struct iobuf *buf = brei_result_get_data(result); assert(buf); int fd = source_get_fd(ei->source); - int rc = iobuf_send(buf, fd); + + int rc = ei_unsent_flush(ei); + if (rc >= 0) + rc = iobuf_send(buf, fd); + if (rc == -EAGAIN) { + ei_queue_unsent(ei, ei->source, steal(&buf)); + rc = 0; + } else if (rc < 0){ + log_warn(ei, "failed to send message: %s", strerror(-rc)); + } return rc < 0 ? rc : 0; } diff --git a/test/test-ei.c b/test/test-ei.c index cc89eda..3cf852f 100644 --- a/test/test-ei.c +++ b/test/test-ei.c @@ -545,3 +545,137 @@ MUNIT_TEST(test_xdotool_rel_motion) return MUNIT_OK; } + +MUNIT_TEST(test_ei_exceed_write_buffer) +{ + _unref_(peck) *peck = peck_new(); + + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_NONE); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ACCEPT_ALL); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ADD_POINTER); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTODEVICES); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTOSTART); + peck_dispatch_until_stable(peck); + + uint64_t toffset = peck_ei_now(peck); + unsigned int count = 10000; /* Large enough to require several flushes */ + struct eis_event *events[count]; + + with_server(peck) { + peck_drain_eis(eis); + } + + with_client(peck) { + struct ei_device *device = peck_ei_get_default_pointer(peck); + for (unsigned int i = 0; i < count/2; i++) { + ei_device_pointer_motion(device, -1, 10); + ei_device_frame(device, toffset + i); + } + } + + peck_dispatch_eis(peck); + + unsigned int before_buffer = 0; + with_server(peck) { + struct eis_event *next; + while ((next = eis_get_event(eis))) { + events[before_buffer++] = next; + munit_assert_uint(before_buffer, <=, count); + } + } + + unsigned int nevents = before_buffer; + if (before_buffer < count) { + /* We sent >socket buffersize events, so we don't expect to receive all of those */ + /* Calling dispatch (on both) should flush some more */ + peck_dispatch_until_stable(peck); + + with_server(peck) { + struct eis_event *next; + while ((next = eis_get_event(eis))) { + events[nevents++] = next; + + munit_assert_uint(nevents, <=, count); + + if (nevents % 50 == 0) + peck_dispatch_ei(peck); + + _unref_(eis_event) *next = eis_peek_event(eis); + if (!next) + peck_dispatch_eis(peck); + } + } + }; + + munit_assert_uint(nevents, ==, count); + + for (unsigned int i = 0; i < count; i += 2) { + _unref_(eis_event) *motion = events[i]; + _unref_(eis_event) *frame = events[i+1]; + + uint64_t time = eis_event_get_time(frame); + + munit_assert_string_equal(peck_eis_event_name(motion), + peck_eis_event_type_name(EIS_EVENT_POINTER_MOTION)); + munit_assert_string_equal(peck_eis_event_name(frame), + peck_eis_event_type_name(EIS_EVENT_FRAME)); + munit_assert_int64(time, ==, toffset + i/2); + } + + /* Our events are as expected but we never got EAGAIN on + * the buffer, so let's count this test as skipped */ + if (before_buffer == count) + return MUNIT_SKIP; + + return MUNIT_OK; +} + +MUNIT_TEST(test_ei_exceed_write_buffer_cleanup) +{ + _unref_(peck) *peck = peck_new(); + struct ei *ei = peck_get_ei(peck); + + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_NONE); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ACCEPT_ALL); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ADD_POINTER); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTODEVICES); + peck_enable_ei_behavior(peck, PECK_EI_BEHAVIOR_AUTOSTART); + peck_dispatch_until_stable(peck); + + uint64_t toffset = peck_ei_now(peck); + unsigned int count = 10000; /* Large enough to require several flushes */ + + with_server(peck) { + peck_drain_eis(eis); + } + + with_client(peck) { + struct ei_device *device = peck_ei_get_default_pointer(peck); + for (unsigned int i = 0; i < count/2; i++) { + ei_device_pointer_motion(device, -1, 10); + ei_device_frame(device, toffset + i); + } + } + + peck_dispatch_eis(peck); + + unsigned int before_buffer = 0; + with_server(peck) { + struct eis_event *next; + while ((next = eis_get_event(eis))) { + munit_assert_uint(before_buffer, <=, count); + eis_event_unref(next); + } + } + + /* Our events are as expected but we never got EAGAIN on + * the buffer, so let's count this test as skipped */ + if (before_buffer == count) + return MUNIT_SKIP; + + /* Make sure cleanup is handled properly */ + peck_drop_ei(peck); + ei_unref(ei); + + return MUNIT_OK; +}