diff --git a/src/libeis-client.c b/src/libeis-client.c index 84dce6a..1620728 100644 --- a/src/libeis-client.c +++ b/src/libeis-client.c @@ -47,12 +47,25 @@ DEFINE_TRISTATE(started, finished, connected); DEFINE_UNREF_CLEANUP_FUNC(brei_result); +struct eis_unsent { + struct list node; + struct iobuf *buf; +}; + +static void +eis_unsent_free(struct eis_unsent *unsent); + static void client_drop_seats(struct eis_client *client); static void eis_client_destroy(struct eis_client *client) { + struct eis_unsent *unsent; + list_for_each_safe(unsent, &client->unsent_queue, node) { + eis_unsent_free(unsent); + } + client_drop_seats(client); eis_handshake_unref(client->setup); eis_connection_unref(client->connection); @@ -151,6 +164,47 @@ eis_client_is_sender(struct eis_client *client) return client->is_sender; } +static void +eis_client_queue_unsent(struct eis_client *client, + struct source *source, + struct iobuf *buf) +{ + if (list_empty(&client->unsent_queue)) { + source_enable_write(source, true); + } + + struct eis_unsent *unsent = xalloc(sizeof *unsent); + unsent->buf = buf; + list_append(&client->unsent_queue, &unsent->node); +} + +static void +eis_unsent_free(struct eis_unsent *unsent) +{ + list_remove(&unsent->node); + iobuf_free(unsent->buf); + free(unsent); +} + +static int +eis_client_unsent_flush(struct eis_client* client) +{ + if (list_empty(&client->unsent_queue)) + return 0; + + struct source *source = client->source; + int fd = source_get_fd(source); + struct eis_unsent *unsent; + list_for_each_safe(unsent, &client->unsent_queue, node) { + int rc = iobuf_send(unsent->buf, fd); + if (rc < 0) + return rc; + eis_unsent_free(unsent); + } + source_enable_write(source, false); + return 0; +} + int eis_client_send_message(struct eis_client *client, const struct brei_object *object, uint32_t opcode, const char *signature, size_t nargs, ...) @@ -182,7 +236,19 @@ eis_client_send_message(struct eis_client *client, const struct brei_object *obj _cleanup_iobuf_ struct iobuf *buf = brei_result_get_data(result); assert(buf); int fd = source_get_fd(client->source); - int rc = iobuf_send(buf, fd); + int rc = -EPIPE; + if (fd != -1) { + rc = eis_client_unsent_flush(client); + if (rc >= 0) + rc = iobuf_send(buf, fd); + if (rc == -EAGAIN) { + eis_client_queue_unsent(client, client->source, steal(&buf)); + rc = 0; + } else if (rc < 0){ + log_warn(eis, "failed to send message: %s", strerror(-rc)); + source_remove(client->source); + } + } return rc < 0 ? rc : 0; } @@ -390,18 +456,26 @@ client_dispatch(struct source *source, void *userdata) _unref_(eis_client) *client = eis_client_ref(userdata); enum eis_client_state old_state = client->state; - _unref_(brei_result) *result = brei_dispatch(client->brei, source_get_fd(source), - lookup_object, client); - if (result) { - if (old_state != EIS_CLIENT_STATE_REQUESTED_DISCONNECT || - brei_result_get_reason(result) != BREI_CONNECTION_DISCONNECT_REASON_TRANSPORT) - log_warn(eis_client_get_context(client), "Client error: %s", - brei_result_get_explanation(result)); + /* Flush any pending writes, if we have them */ + int rc = eis_client_unsent_flush(client); + if (rc < 0 && rc != -EAGAIN) { + log_warn(eis_client_get_context(client), + "Error flushing unsent queue: %s", strerror(-rc)); + eis_client_disconnect(client); + } else { + _unref_(brei_result) *result = brei_dispatch(client->brei, source_get_fd(source), + lookup_object, client); + if (result) { + if (old_state != EIS_CLIENT_STATE_REQUESTED_DISCONNECT || + brei_result_get_reason(result) != BREI_CONNECTION_DISCONNECT_REASON_TRANSPORT) + log_warn(eis_client_get_context(client), "Client error: %s", + brei_result_get_explanation(result)); - brei_drain_fd(source_get_fd(source)); - eis_client_disconnect_with_reason(client, - brei_result_get_reason(result), - brei_result_get_explanation(result)); + brei_drain_fd(source_get_fd(source)); + eis_client_disconnect_with_reason(client, + brei_result_get_reason(result), + brei_result_get_explanation(result)); + } } static const char *client_states[] = { @@ -436,6 +510,7 @@ eis_client_new(struct eis *eis, int fd) list_init(&client->seats); list_init(&client->seats_pending); list_init(&client->proto_objects); + list_init(&client->unsent_queue); client->interface_versions = (struct eis_client_interface_versions){ .ei_connection = VERSION_V(1), diff --git a/src/libeis-client.h b/src/libeis-client.h index ec92da3..14b4833 100644 --- a/src/libeis-client.h +++ b/src/libeis-client.h @@ -59,6 +59,8 @@ struct eis_client { struct brei_context *brei; struct eis_connection *connection; + struct list unsent_queue; + struct list proto_objects; /* struct brei_objects list */ object_id_t next_object_id; object_id_t last_client_object_id; diff --git a/test/test-eis.c b/test/test-eis.c index dff081f..0dd32b9 100644 --- a/test/test-eis.c +++ b/test/test-eis.c @@ -398,6 +398,110 @@ MUNIT_TEST(eistest_multiple_emulating) return MUNIT_OK; } +MUNIT_TEST(eistest_socket_overflow) +{ + _unref_(peck) *peck = peck_new_context(PECK_EI_RECEIVER); + + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ACCEPT_CLIENT); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_DEFAULT_SEAT); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_HANDLE_BIND_SEAT); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_ADD_POINTER); + peck_enable_eis_behavior(peck, PECK_EIS_BEHAVIOR_RESUME_DEVICE); + peck_dispatch_until_stable(peck); + + peck_dispatch_until_stable(peck); + + with_server(peck) { + struct eis_device *device = peck_eis_get_default_pointer(peck); + eis_device_start_emulating(device, 1); + + for (size_t i = 0; i < 500; i++) { + eis_device_pointer_motion(device, -10, -10); + eis_device_frame(device, 0 + i); + } + eis_device_stop_emulating(device); + } + + peck_dispatch_until_stable(peck); + + enum ei_event_type last_type = -1; + + with_client(peck) { + _unref_(ei_event) *added = + peck_ei_next_event(ei, EI_EVENT_DEVICE_ADDED); + _unref_(ei_event) *resumed = + peck_ei_next_event(ei, EI_EVENT_DEVICE_RESUMED); + _unref_(ei_event) *start = + peck_ei_next_event(ei, EI_EVENT_DEVICE_START_EMULATING); + + do { + struct ei_event *peek = ei_peek_event(ei); + if (!peek) + break; + + last_type = ei_event_get_type(peek); + switch (last_type) { + case EI_EVENT_FRAME: + case EI_EVENT_POINTER_MOTION: + break; + default: + munit_assert(last_type == EI_EVENT_FRAME || + last_type == EI_EVENT_POINTER_MOTION); + break; + } + ei_event_unref(peek); + ei_event_unref(ei_get_event(ei)); + } while (true); + + peck_assert_no_ei_events(ei); + } + + /* We've hit the first batch, now let the server send + * the remaining events */ + peck_dispatch_until_stable(peck); + + bool complete = false; + + with_client(peck) { + if (last_type == EI_EVENT_POINTER_MOTION) { + _unref_(ei_event) *frame = peck_ei_next_event(ei, EI_EVENT_FRAME); + } else if (last_type == EI_EVENT_FRAME) { + /* This fail if the buffer filled just + * before the STOP_EVENT event. Adjust the test + * if that ever happens */ + _unref_(ei_event) *motion = peck_ei_next_event(ei, EI_EVENT_POINTER_MOTION); + } + do { + struct ei_event *peek = ei_peek_event(ei); + if (!peek) { + peck_dispatch_until_stable(peck); + break; + } + + last_type = ei_event_get_type(peek); + switch (last_type) { + case EI_EVENT_FRAME: + case EI_EVENT_POINTER_MOTION: + break; + case EI_EVENT_DEVICE_STOP_EMULATING: + complete = true; + break; + default: + munit_error(ei_event_type_to_string(last_type)); + break; + } + ei_event_unref(peek); + ei_event_unref(ei_get_event(ei)); + } while (!complete); + } + + with_client(peck) { + peck_assert_no_ei_events(ei); + } + + return MUNIT_OK; +} + MUNIT_TEST(test_eis_ping) { _unref_(peck) *peck = peck_new();