ovsdb: use the FD directly instead of GSocketConnection/GOutputStream

GSocketConnection/GOutputStream/GInputStream seems rather unnecessary.
Maybe they make sense when you want to write portable code (for
Windows). Otherwise, watching a file descriptor and reading/writing it
directly is simpler (and also more efficient).

For example, we passed no GCancellable to g_input_stream_read_async().
What does that mean w.r.t. destroying the NMOvsdb instance? I suspect
it's wrong, but it's hard to say, because there are so many layers of
code.

Note that we anyway keep state in NMOvsdb, namely the data we want to
send (output_buf) and the data we partially received (input_buf). All we
need, are poll notifications when the file descriptor is ready. To
those, we hook up the read/write callbacks. Also before was the code
async, and there were callbacks when when read/write was done. That does
not simplify the code in any way.

- we no longer use separate NMOvsdbPrivate.buf and NMOvsdbPrivate.input
  buffers. There is just a NMOvsdbPrivate.input_buf that can we can fill
  directly.
This commit is contained in:
Thomas Haller 2023-03-30 20:09:35 +02:00
parent f862d4bbce
commit 7e12d437fe
No known key found for this signature in database
GPG key ID: 29C2366E4DFC5728

View file

@ -12,6 +12,7 @@
#include "libnm-glib-aux/nm-jansson.h"
#include "libnm-glib-aux/nm-str-buf.h"
#include "libnm-glib-aux/nm-io-utils.h"
#include "nm-core-utils.h"
#include "libnm-core-intern/nm-core-internal.h"
#include "devices/nm-device.h"
@ -134,13 +135,16 @@ enum {
static guint signals[LAST_SIGNAL] = {0};
typedef struct {
NMPlatform *platform;
GSocketConnection *conn;
GCancellable *conn_cancellable;
char buf[4096]; /* Input buffer */
GString *input; /* JSON stream waiting for decoding. */
GString *output; /* JSON stream to be sent. */
guint64 call_id_counter;
NMPlatform *platform;
int conn_fd;
GSource *conn_fd_in_source;
GSource *conn_fd_out_source;
GCancellable *conn_cancellable;
NMStrBuf input_buf;
NMStrBuf output_buf;
guint64 call_id_counter;
CList calls_lst_head;
@ -176,12 +180,13 @@ NM_DEFINE_SINGLETON_GETTER(NMOvsdb, nm_ovsdb_get, NM_TYPE_OVSDB);
/*****************************************************************************/
static void ovsdb_try_connect(NMOvsdb *self);
static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing);
static void ovsdb_read(NMOvsdb *self);
static void ovsdb_write(NMOvsdb *self);
static void ovsdb_next_command(NMOvsdb *self);
static void cleanup_check_ready(NMOvsdb *self);
static void ovsdb_try_connect(NMOvsdb *self);
static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing);
static void ovsdb_read(NMOvsdb *self);
static void ovsdb_write_try(NMOvsdb *self);
static gboolean ovsdb_write_cb(int fd, GIOCondition condition, gpointer user_data);
static void ovsdb_next_command(NMOvsdb *self);
static void cleanup_check_ready(NMOvsdb *self);
/*****************************************************************************/
@ -1448,7 +1453,7 @@ ovsdb_next_command(NMOvsdb *self)
nm_auto_free char *cmd = NULL;
nm_auto_decref_json json_t *msg = NULL;
if (!priv->conn)
if (priv->conn_fd < 0)
return;
if (c_list_is_empty(&priv->calls_lst_head))
@ -1585,9 +1590,9 @@ ovsdb_next_command(NMOvsdb *self)
cmd = json_dumps(msg, 0);
_LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, cmd);
g_string_append(priv->output, cmd);
nm_str_buf_append(&priv->output_buf, cmd);
ovsdb_write(self);
ovsdb_write_try(self);
}
/**
@ -2189,16 +2194,12 @@ ovsdb_got_echo(NMOvsdb *self, json_int_t id, json_t *data)
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
nm_auto_decref_json json_t *msg = NULL;
nm_auto_free char *reply = NULL;
gboolean output_was_empty;
output_was_empty = priv->output->len == 0;
msg = json_pack("{s:I, s:O}", "id", id, "result", data);
reply = json_dumps(msg, 0);
g_string_append(priv->output, reply);
nm_str_buf_append(&priv->output_buf, reply);
if (output_was_empty)
ovsdb_write(self);
ovsdb_write_try(self);
}
/**
@ -2302,7 +2303,7 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg)
/* Don't progress further commands in case the callback hit an error
* and disconnected us. */
if (!priv->conn)
if (priv->conn_fd < 0)
return;
/* Now we're free to serialize and send the next command, if any. */
@ -2318,8 +2319,8 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg)
/*****************************************************************************/
typedef struct {
gsize bufp;
GString *input;
gsize bufp;
NMStrBuf *input;
} JsonReadMsgData;
/* Lower level marshalling and demarshalling of the JSON-RPC traffic on the
@ -2339,13 +2340,13 @@ _json_read_msg_cb(void *buffer, size_t buflen, void *user_data)
}
/* Pass one more byte to the JSON decoder. */
*(char *) buffer = data->input->str[data->bufp];
*(char *) buffer = nm_str_buf_get_char(data->input, data->bufp);
data->bufp++;
return 1;
}
static json_t *
_json_read_msg(GString *input)
_json_read_msg(NMStrBuf *input)
{
JsonReadMsgData data = {
.bufp = 0,
@ -2364,114 +2365,112 @@ _json_read_msg(GString *input)
return NULL;
nm_assert(data.bufp > 0);
g_string_erase(input, 0, data.bufp);
nm_str_buf_erase(input, 0, data.bufp, FALSE);
return msg;
}
/**
* ovsdb_read_cb:
*
* Read out the data available from the ovsdb socket and try to deserialize
* the JSON. If we see a complete object, pass it upwards to ovsdb_got_msg().
*/
static void
ovsdb_read_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
{
NMOvsdb *self = NM_OVSDB(user_data);
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
GInputStream *stream = G_INPUT_STREAM(source_object);
GError *error = NULL;
gssize size;
size = g_input_stream_read_finish(stream, res, &error);
if (size == -1) {
/* ovsdb-server was possibly restarted */
_LOGW("short read from ovsdb: %s", error->message);
priv->num_failures++;
g_clear_error(&error);
ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
return;
}
g_string_append_len(priv->input, priv->buf, size);
while (TRUE) {
nm_auto_decref_json json_t *msg = NULL;
msg = _json_read_msg(priv->input);
if (!msg)
break;
ovsdb_got_msg(self, msg);
}
if (!priv->conn)
return;
if (size)
ovsdb_read(self);
}
static void
ovsdb_read(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
g_input_stream_read_async(g_io_stream_get_input_stream(G_IO_STREAM(priv->conn)),
priv->buf,
sizeof(priv->buf),
G_PRIORITY_DEFAULT,
NULL,
ovsdb_read_cb,
self);
}
static void
ovsdb_write_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
{
GOutputStream *stream = G_OUTPUT_STREAM(source_object);
NMOvsdb *self = NM_OVSDB(user_data);
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
GError *error = NULL;
gssize size;
size = g_output_stream_write_finish(stream, res, &error);
if (size == -1) {
again:
size = nm_utils_fd_read(priv->conn_fd, &priv->input_buf);
if (size <= 0) {
if (size == -EAGAIN)
return;
/* ovsdb-server was possibly restarted */
_LOGW("short write to ovsdb: %s", error->message);
_LOGW("short read from ovsdb: %s", nm_strerror_native(-size));
priv->num_failures++;
g_clear_error(&error);
ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
return;
}
if (!priv->conn)
return;
nm_assert(priv->input_buf.len > 0);
g_string_erase(priv->output, 0, size);
while (TRUE) {
nm_auto_decref_json json_t *msg = NULL;
ovsdb_write(self);
msg = _json_read_msg(&priv->input_buf);
if (!msg)
break;
ovsdb_got_msg(self, msg);
if (priv->input_buf.len == 0)
break;
}
if (priv->input_buf.len > 0) {
/* We have an incomplete message in the message buffer. Don't wait for another round
* of "poll", instead try to read it again. */
goto again;
}
}
static gboolean
ovsdb_read_cb(int fd, GIOCondition condition, gpointer user_data)
{
ovsdb_read(user_data);
return G_SOURCE_CONTINUE;
}
static void
ovsdb_write(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
GOutputStream *stream;
gssize n;
if (!priv->output->len)
again:
if (priv->output_buf.len == 0) {
nm_clear_g_source_inst(&priv->conn_fd_out_source);
return;
}
stream = g_io_stream_get_output_stream(G_IO_STREAM(priv->conn));
if (g_output_stream_has_pending(stream))
n = write(priv->conn_fd,
nm_str_buf_get_str_at_unsafe(&priv->output_buf, 0),
priv->output_buf.len);
if (n < 0)
n = -NM_ERRNO_NATIVE(errno);
if (n == -EAGAIN) {
if (!priv->conn_fd_out_source) {
priv->conn_fd_out_source =
nm_g_unix_fd_add_source(priv->conn_fd, G_IO_OUT, ovsdb_write_cb, self);
}
return;
}
g_output_stream_write_async(stream,
priv->output->str,
priv->output->len,
G_PRIORITY_DEFAULT,
NULL,
ovsdb_write_cb,
self);
if (n <= 0) {
/* ovsdb-server was possibly restarted */
_LOGW("short write to ovsdb: %s", nm_strerror_native(-n));
priv->num_failures++;
ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
return;
}
nm_str_buf_erase(&priv->output_buf, 0, n, FALSE);
goto again;
}
static void
ovsdb_write_try(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
if (priv->conn_fd >= 0 && !priv->conn_fd_out_source)
ovsdb_write(self);
}
static gboolean
ovsdb_write_cb(int fd, GIOCondition condition, gpointer user_data)
{
ovsdb_write(user_data);
return G_SOURCE_CONTINUE;
}
/*****************************************************************************/
@ -2494,7 +2493,7 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing)
nm_assert(!retry || !is_disposing);
if (!priv->conn && !priv->conn_cancellable)
if (priv->conn_fd < 0 && !priv->conn_cancellable)
return;
_LOGD("disconnecting from ovsdb, retry %d", retry);
@ -2518,9 +2517,11 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing)
_call_complete(call, NULL, error);
}
g_string_truncate(priv->input, 0);
g_string_truncate(priv->output, 0);
g_clear_object(&priv->conn);
nm_str_buf_reset(&priv->input_buf);
nm_str_buf_reset(&priv->output_buf);
nm_clear_fd(&priv->conn_fd);
nm_clear_g_source_inst(&priv->conn_fd_in_source);
nm_clear_g_source_inst(&priv->conn_fd_out_source);
nm_clear_g_free(&priv->db_uuid);
nm_clear_g_cancellable(&priv->conn_cancellable);
@ -2721,15 +2722,12 @@ _ovsdb_connect_complete_with_fd(NMOvsdb *self, int fd_take)
gs_unref_object GSocket *socket = NULL;
gs_free_error GError *error = NULL;
socket = g_socket_new_from_fd(nm_steal_fd(&fd_take), &error);
if (!socket) {
_LOGT("connect: failure to open socket for new FD: %s", error->message);
ovsdb_disconnect(self, FALSE, FALSE);
return;
}
nm_clear_g_cancellable(&priv->conn_cancellable);
priv->conn = g_socket_connection_factory_create_connection(socket);
g_clear_object(&priv->conn_cancellable);
nm_io_fcntl_setfl_update_nonblock(fd_take);
priv->conn_fd = nm_steal_fd(&fd_take);
priv->conn_fd_in_source = nm_g_unix_fd_add_source(priv->conn_fd, G_IO_IN, ovsdb_read_cb, self);
ovsdb_read(self);
ovsdb_next_command(self);
@ -2803,7 +2801,7 @@ ovsdb_try_connect(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
if (priv->conn || priv->conn_cancellable)
if (priv->conn_fd >= 0 || priv->conn_cancellable)
return;
_LOGT("connect: start connecting socket %s on idle", NM_OVSDB_SOCKET);
@ -2983,11 +2981,15 @@ nm_ovsdb_init(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
priv->conn_fd = -1;
priv->input_buf = NM_STR_BUF_INIT(0, FALSE);
priv->output_buf = NM_STR_BUF_INIT(0, FALSE);
c_list_init(&priv->calls_lst_head);
priv->platform = g_object_ref(NM_PLATFORM_GET);
priv->input = g_string_new(NULL);
priv->output = g_string_new(NULL);
priv->bridges =
g_hash_table_new_full(nm_pstr_hash, nm_pstr_equal, (GDestroyNotify) _free_bridge, NULL);
priv->ports =
@ -3008,14 +3010,8 @@ dispose(GObject *object)
nm_assert(c_list_is_empty(&priv->calls_lst_head));
if (priv->input) {
g_string_free(priv->input, TRUE);
priv->input = NULL;
}
if (priv->output) {
g_string_free(priv->output, TRUE);
priv->output = NULL;
}
nm_str_buf_destroy(&priv->input_buf);
nm_str_buf_destroy(&priv->output_buf);
g_clear_object(&priv->platform);
nm_clear_pointer(&priv->bridges, g_hash_table_destroy);