Process multiple messages in one dispatch

We may get more than one message per dispatch so let's make sure we iterate
through all the available data.

Signed-off-by: Peter Hutterer <peter.hutterer@who-t.net>
This commit is contained in:
Peter Hutterer 2020-07-31 14:14:21 +10:00
parent 68fe48c02f
commit 9f4827135d
2 changed files with 68 additions and 42 deletions

View file

@ -404,10 +404,10 @@ ei_get_event(struct ei *ei)
}
static struct message *
connection_parse_message(const char *data_in, size_t len)
connection_parse_message(const char *data_in, size_t *len)
{
_cleanup_(message_freep) struct message *msg = xalloc(sizeof(*msg));
ServerMessage *proto = server_message__unpack(NULL, len,
ServerMessage *proto = server_message__unpack(NULL, *len,
(const unsigned char*)data_in);
if (!proto)
return NULL;
@ -451,6 +451,8 @@ connection_parse_message(const char *data_in, size_t len)
return NULL;
}
*len = server_message__get_packed_size(proto);
server_message__free_unpacked(proto, NULL);
return steal(&msg);
}
@ -526,9 +528,9 @@ connection_connected_handle_msg(struct ei *ei, struct message *msg)
}
static void
connection_dispatch(struct source *source, void *data)
connection_dispatch(struct source *source, void *userdata)
{
struct ei *ei = data;
struct ei *ei = userdata;
enum ei_state old_state = ei->state;
_cleanup_(message_freep) struct message *msg = NULL;
@ -543,24 +545,35 @@ connection_dispatch(struct source *source, void *data)
goto error;
}
msg = connection_parse_message(iobuf_data(buf), iobuf_len(buf));
if (!msg) {
rc = -EBADMSG;
goto error;
}
size_t idx = 0;
while (true) {
const char *data = iobuf_data(buf) + idx;
size_t len = iobuf_len(buf) - idx;
switch (ei->state) {
case EI_STATE_NEW:
rc = connection_new_handle_msg(ei, msg);
if (len == 0)
break;
case EI_STATE_CONNECTING:
rc = connection_connecting_handle_msg(ei, msg);
break;
case EI_STATE_CONNECTED:
rc = connection_connected_handle_msg(ei, msg);
break;
case EI_STATE_DISCONNECTED:
abort();
msg = connection_parse_message(data, &len);
if (!msg) {
rc = -EBADMSG;
goto error;
}
idx += len;
switch (ei->state) {
case EI_STATE_NEW:
rc = connection_new_handle_msg(ei, msg);
break;
case EI_STATE_CONNECTING:
rc = connection_connecting_handle_msg(ei, msg);
break;
case EI_STATE_CONNECTED:
rc = connection_connected_handle_msg(ei, msg);
break;
case EI_STATE_DISCONNECTED:
abort();
}
}
error:

View file

@ -347,10 +347,10 @@ client_connected_handle_msg(struct eis_client *client,
}
static struct message *
client_parse_message(const char *data_in, size_t len)
client_parse_message(const char *data_in, size_t *len)
{
_cleanup_(message_freep) struct message *msg = xalloc(sizeof(*msg));
ClientMessage *proto = client_message__unpack(NULL, len,
ClientMessage *proto = client_message__unpack(NULL, *len,
(const unsigned char*)data_in);
if (!proto)
@ -406,15 +406,17 @@ client_parse_message(const char *data_in, size_t len)
return NULL;
}
*len = client_message__get_packed_size(proto);
client_message__free_unpacked(proto, NULL);
return steal(&msg);
}
static void
client_dispatch(struct source *source, void *data)
client_dispatch(struct source *source, void *userdata)
{
struct eis_client *client = data;
struct eis_client *client = userdata;
enum eis_client_state old_state = client->state;
_cleanup_(message_freep) struct message *msg = NULL;
@ -429,26 +431,37 @@ client_dispatch(struct source *source, void *data)
goto error;
}
msg = client_parse_message(iobuf_data(buf), iobuf_len(buf));
if (!msg) {
rc = -EBADMSG;
goto error;
}
size_t idx = 0;
while (true) {
const char *data = iobuf_data(buf) + idx;
size_t len = iobuf_len(buf) - idx;
switch (client->state) {
case EIS_CLIENT_STATE_HELLO:
rc = client_hello_handle_msg(client, msg);
if (len == 0)
break;
case EIS_CLIENT_STATE_CONNECTING:
/* Client is waiting for us, shouldn't send anything
* but disconnect */
rc = client_connecting_handle_msg(client, msg);
break;
case EIS_CLIENT_STATE_CONNECTED:
rc = client_connected_handle_msg(client, msg);
break;
case EIS_CLIENT_STATE_DISCONNECTED:
abort();
msg = client_parse_message(data, &len);
if (!msg) {
rc = -EBADMSG;
goto error;
}
idx += len;
switch (client->state) {
case EIS_CLIENT_STATE_HELLO:
rc = client_hello_handle_msg(client, msg);
break;
case EIS_CLIENT_STATE_CONNECTING:
/* Client is waiting for us, shouldn't send anything
* but disconnect */
rc = client_connecting_handle_msg(client, msg);
break;
case EIS_CLIENT_STATE_CONNECTED:
rc = client_connected_handle_msg(client, msg);
break;
case EIS_CLIENT_STATE_DISCONNECTED:
abort();
}
}
error: