diff --git a/src/core/devices/ovs/nm-ovsdb.c b/src/core/devices/ovs/nm-ovsdb.c index bac6235429..2d080b3313 100644 --- a/src/core/devices/ovs/nm-ovsdb.c +++ b/src/core/devices/ovs/nm-ovsdb.c @@ -12,6 +12,7 @@ #include "libnm-glib-aux/nm-jansson.h" #include "libnm-glib-aux/nm-str-buf.h" +#include "libnm-glib-aux/nm-io-utils.h" #include "nm-core-utils.h" #include "libnm-core-intern/nm-core-internal.h" #include "devices/nm-device.h" @@ -134,13 +135,16 @@ enum { static guint signals[LAST_SIGNAL] = {0}; typedef struct { - NMPlatform *platform; - GSocketConnection *conn; - GCancellable *conn_cancellable; - char buf[4096]; /* Input buffer */ - GString *input; /* JSON stream waiting for decoding. */ - GString *output; /* JSON stream to be sent. */ - guint64 call_id_counter; + NMPlatform *platform; + int conn_fd; + GSource *conn_fd_in_source; + GSource *conn_fd_out_source; + GCancellable *conn_cancellable; + + NMStrBuf input_buf; + NMStrBuf output_buf; + + guint64 call_id_counter; CList calls_lst_head; @@ -176,12 +180,13 @@ NM_DEFINE_SINGLETON_GETTER(NMOvsdb, nm_ovsdb_get, NM_TYPE_OVSDB); /*****************************************************************************/ -static void ovsdb_try_connect(NMOvsdb *self); -static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing); -static void ovsdb_read(NMOvsdb *self); -static void ovsdb_write(NMOvsdb *self); -static void ovsdb_next_command(NMOvsdb *self); -static void cleanup_check_ready(NMOvsdb *self); +static void ovsdb_try_connect(NMOvsdb *self); +static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing); +static void ovsdb_read(NMOvsdb *self); +static void ovsdb_write_try(NMOvsdb *self); +static gboolean ovsdb_write_cb(int fd, GIOCondition condition, gpointer user_data); +static void ovsdb_next_command(NMOvsdb *self); +static void cleanup_check_ready(NMOvsdb *self); /*****************************************************************************/ @@ -1448,7 +1453,7 @@ ovsdb_next_command(NMOvsdb *self) nm_auto_free char *cmd = NULL; nm_auto_decref_json json_t *msg = NULL; - if (!priv->conn) + if (priv->conn_fd < 0) return; if (c_list_is_empty(&priv->calls_lst_head)) @@ -1585,9 +1590,9 @@ ovsdb_next_command(NMOvsdb *self) cmd = json_dumps(msg, 0); _LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, cmd); - g_string_append(priv->output, cmd); + nm_str_buf_append(&priv->output_buf, cmd); - ovsdb_write(self); + ovsdb_write_try(self); } /** @@ -2189,16 +2194,12 @@ ovsdb_got_echo(NMOvsdb *self, json_int_t id, json_t *data) NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); nm_auto_decref_json json_t *msg = NULL; nm_auto_free char *reply = NULL; - gboolean output_was_empty; - - output_was_empty = priv->output->len == 0; msg = json_pack("{s:I, s:O}", "id", id, "result", data); reply = json_dumps(msg, 0); - g_string_append(priv->output, reply); + nm_str_buf_append(&priv->output_buf, reply); - if (output_was_empty) - ovsdb_write(self); + ovsdb_write_try(self); } /** @@ -2302,7 +2303,7 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg) /* Don't progress further commands in case the callback hit an error * and disconnected us. */ - if (!priv->conn) + if (priv->conn_fd < 0) return; /* Now we're free to serialize and send the next command, if any. */ @@ -2318,8 +2319,8 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg) /*****************************************************************************/ typedef struct { - gsize bufp; - GString *input; + gsize bufp; + NMStrBuf *input; } JsonReadMsgData; /* Lower level marshalling and demarshalling of the JSON-RPC traffic on the @@ -2339,13 +2340,13 @@ _json_read_msg_cb(void *buffer, size_t buflen, void *user_data) } /* Pass one more byte to the JSON decoder. */ - *(char *) buffer = data->input->str[data->bufp]; + *(char *) buffer = nm_str_buf_get_char(data->input, data->bufp); data->bufp++; return 1; } static json_t * -_json_read_msg(GString *input) +_json_read_msg(NMStrBuf *input) { JsonReadMsgData data = { .bufp = 0, @@ -2364,114 +2365,112 @@ _json_read_msg(GString *input) return NULL; nm_assert(data.bufp > 0); - g_string_erase(input, 0, data.bufp); + nm_str_buf_erase(input, 0, data.bufp, FALSE); return msg; } -/** - * ovsdb_read_cb: - * - * Read out the data available from the ovsdb socket and try to deserialize - * the JSON. If we see a complete object, pass it upwards to ovsdb_got_msg(). - */ -static void -ovsdb_read_cb(GObject *source_object, GAsyncResult *res, gpointer user_data) -{ - NMOvsdb *self = NM_OVSDB(user_data); - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GInputStream *stream = G_INPUT_STREAM(source_object); - GError *error = NULL; - gssize size; - - size = g_input_stream_read_finish(stream, res, &error); - if (size == -1) { - /* ovsdb-server was possibly restarted */ - _LOGW("short read from ovsdb: %s", error->message); - priv->num_failures++; - g_clear_error(&error); - ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); - return; - } - - g_string_append_len(priv->input, priv->buf, size); - while (TRUE) { - nm_auto_decref_json json_t *msg = NULL; - - msg = _json_read_msg(priv->input); - if (!msg) - break; - - ovsdb_got_msg(self, msg); - } - - if (!priv->conn) - return; - - if (size) - ovsdb_read(self); -} - static void ovsdb_read(NMOvsdb *self) { NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - - g_input_stream_read_async(g_io_stream_get_input_stream(G_IO_STREAM(priv->conn)), - priv->buf, - sizeof(priv->buf), - G_PRIORITY_DEFAULT, - NULL, - ovsdb_read_cb, - self); -} - -static void -ovsdb_write_cb(GObject *source_object, GAsyncResult *res, gpointer user_data) -{ - GOutputStream *stream = G_OUTPUT_STREAM(source_object); - NMOvsdb *self = NM_OVSDB(user_data); - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GError *error = NULL; gssize size; - size = g_output_stream_write_finish(stream, res, &error); - if (size == -1) { +again: + size = nm_utils_fd_read(priv->conn_fd, &priv->input_buf); + + if (size <= 0) { + if (size == -EAGAIN) + return; + /* ovsdb-server was possibly restarted */ - _LOGW("short write to ovsdb: %s", error->message); + _LOGW("short read from ovsdb: %s", nm_strerror_native(-size)); priv->num_failures++; - g_clear_error(&error); ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); return; } - if (!priv->conn) - return; + nm_assert(priv->input_buf.len > 0); - g_string_erase(priv->output, 0, size); + while (TRUE) { + nm_auto_decref_json json_t *msg = NULL; - ovsdb_write(self); + msg = _json_read_msg(&priv->input_buf); + if (!msg) + break; + + ovsdb_got_msg(self, msg); + + if (priv->input_buf.len == 0) + break; + } + + if (priv->input_buf.len > 0) { + /* We have an incomplete message in the message buffer. Don't wait for another round + * of "poll", instead try to read it again. */ + goto again; + } +} + +static gboolean +ovsdb_read_cb(int fd, GIOCondition condition, gpointer user_data) +{ + ovsdb_read(user_data); + return G_SOURCE_CONTINUE; } static void ovsdb_write(NMOvsdb *self) { NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GOutputStream *stream; + gssize n; - if (!priv->output->len) +again: + if (priv->output_buf.len == 0) { + nm_clear_g_source_inst(&priv->conn_fd_out_source); return; + } - stream = g_io_stream_get_output_stream(G_IO_STREAM(priv->conn)); - if (g_output_stream_has_pending(stream)) + n = write(priv->conn_fd, + nm_str_buf_get_str_at_unsafe(&priv->output_buf, 0), + priv->output_buf.len); + + if (n < 0) + n = -NM_ERRNO_NATIVE(errno); + + if (n == -EAGAIN) { + if (!priv->conn_fd_out_source) { + priv->conn_fd_out_source = + nm_g_unix_fd_add_source(priv->conn_fd, G_IO_OUT, ovsdb_write_cb, self); + } return; + } - g_output_stream_write_async(stream, - priv->output->str, - priv->output->len, - G_PRIORITY_DEFAULT, - NULL, - ovsdb_write_cb, - self); + if (n <= 0) { + /* ovsdb-server was possibly restarted */ + _LOGW("short write to ovsdb: %s", nm_strerror_native(-n)); + priv->num_failures++; + ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); + return; + } + + nm_str_buf_erase(&priv->output_buf, 0, n, FALSE); + goto again; +} + +static void +ovsdb_write_try(NMOvsdb *self) +{ + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + + if (priv->conn_fd >= 0 && !priv->conn_fd_out_source) + ovsdb_write(self); +} + +static gboolean +ovsdb_write_cb(int fd, GIOCondition condition, gpointer user_data) +{ + ovsdb_write(user_data); + return G_SOURCE_CONTINUE; } /*****************************************************************************/ @@ -2494,7 +2493,7 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing) nm_assert(!retry || !is_disposing); - if (!priv->conn && !priv->conn_cancellable) + if (priv->conn_fd < 0 && !priv->conn_cancellable) return; _LOGD("disconnecting from ovsdb, retry %d", retry); @@ -2518,9 +2517,11 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing) _call_complete(call, NULL, error); } - g_string_truncate(priv->input, 0); - g_string_truncate(priv->output, 0); - g_clear_object(&priv->conn); + nm_str_buf_reset(&priv->input_buf); + nm_str_buf_reset(&priv->output_buf); + nm_clear_fd(&priv->conn_fd); + nm_clear_g_source_inst(&priv->conn_fd_in_source); + nm_clear_g_source_inst(&priv->conn_fd_out_source); nm_clear_g_free(&priv->db_uuid); nm_clear_g_cancellable(&priv->conn_cancellable); @@ -2721,15 +2722,12 @@ _ovsdb_connect_complete_with_fd(NMOvsdb *self, int fd_take) gs_unref_object GSocket *socket = NULL; gs_free_error GError *error = NULL; - socket = g_socket_new_from_fd(nm_steal_fd(&fd_take), &error); - if (!socket) { - _LOGT("connect: failure to open socket for new FD: %s", error->message); - ovsdb_disconnect(self, FALSE, FALSE); - return; - } + nm_clear_g_cancellable(&priv->conn_cancellable); - priv->conn = g_socket_connection_factory_create_connection(socket); - g_clear_object(&priv->conn_cancellable); + nm_io_fcntl_setfl_update_nonblock(fd_take); + + priv->conn_fd = nm_steal_fd(&fd_take); + priv->conn_fd_in_source = nm_g_unix_fd_add_source(priv->conn_fd, G_IO_IN, ovsdb_read_cb, self); ovsdb_read(self); ovsdb_next_command(self); @@ -2803,7 +2801,7 @@ ovsdb_try_connect(NMOvsdb *self) { NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - if (priv->conn || priv->conn_cancellable) + if (priv->conn_fd >= 0 || priv->conn_cancellable) return; _LOGT("connect: start connecting socket %s on idle", NM_OVSDB_SOCKET); @@ -2983,11 +2981,15 @@ nm_ovsdb_init(NMOvsdb *self) { NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + priv->conn_fd = -1; + + priv->input_buf = NM_STR_BUF_INIT(0, FALSE); + priv->output_buf = NM_STR_BUF_INIT(0, FALSE); + c_list_init(&priv->calls_lst_head); priv->platform = g_object_ref(NM_PLATFORM_GET); - priv->input = g_string_new(NULL); - priv->output = g_string_new(NULL); + priv->bridges = g_hash_table_new_full(nm_pstr_hash, nm_pstr_equal, (GDestroyNotify) _free_bridge, NULL); priv->ports = @@ -3008,14 +3010,8 @@ dispose(GObject *object) nm_assert(c_list_is_empty(&priv->calls_lst_head)); - if (priv->input) { - g_string_free(priv->input, TRUE); - priv->input = NULL; - } - if (priv->output) { - g_string_free(priv->output, TRUE); - priv->output = NULL; - } + nm_str_buf_destroy(&priv->input_buf); + nm_str_buf_destroy(&priv->output_buf); g_clear_object(&priv->platform); nm_clear_pointer(&priv->bridges, g_hash_table_destroy);