glib-aux,cloud-setup: merge branch 'th/cloud-setup-utils-poll'

https://gitlab.freedesktop.org/NetworkManager/NetworkManager/-/merge_requests/1566
This commit is contained in:
Thomas Haller 2023-03-13 17:13:19 +01:00
commit 44a3ccccf4
No known key found for this signature in database
GPG key ID: 29C2366E4DFC5728
5 changed files with 307 additions and 287 deletions

View file

@ -21,6 +21,7 @@
#include "c-list/src/c-list.h"
#include "nm-errno.h"
#include "nm-str-buf.h"
#include "nm-time-utils.h"
G_STATIC_ASSERT(sizeof(NMEtherAddr) == 6);
G_STATIC_ASSERT(_nm_alignof(NMEtherAddr) == 1);
@ -6789,3 +6790,263 @@ nm_hostname_is_valid(const char *s, gboolean trailing_dot)
return TRUE;
}
/*****************************************************************************/
typedef struct {
GTask *task;
GSource *source_timeout;
GSource *source_next_poll;
GMainContext *context;
GCancellable *internal_cancellable;
NMUtilsPollProbeStartFcn probe_start_fcn;
NMUtilsPollProbeFinishFcn probe_finish_fcn;
gpointer probe_user_data;
gulong cancellable_id;
gint64 last_poll_start_ms;
int sleep_timeout_ms;
int ratelimit_timeout_ms;
bool completed : 1;
} PollTaskData;
static void
_poll_task_data_free(gpointer data)
{
PollTaskData *poll_task_data = data;
nm_assert(G_IS_TASK(poll_task_data->task));
nm_assert(!poll_task_data->source_next_poll);
nm_assert(!poll_task_data->source_timeout);
nm_assert(poll_task_data->cancellable_id == 0);
g_main_context_unref(poll_task_data->context);
nm_g_slice_free(poll_task_data);
}
static void
_poll_return(PollTaskData *poll_task_data, GError *error_take)
{
nm_clear_g_source_inst(&poll_task_data->source_next_poll);
nm_clear_g_source_inst(&poll_task_data->source_timeout);
nm_clear_g_cancellable_disconnect(g_task_get_cancellable(poll_task_data->task),
&poll_task_data->cancellable_id);
nm_clear_g_cancellable(&poll_task_data->internal_cancellable);
if (error_take)
g_task_return_error(poll_task_data->task, g_steal_pointer(&error_take));
else
g_task_return_boolean(poll_task_data->task, TRUE);
g_object_unref(poll_task_data->task);
}
static gboolean _poll_start_cb(gpointer user_data);
static void
_poll_done_cb(GObject *source, GAsyncResult *result, gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
_nm_unused gs_unref_object GTask *task =
poll_task_data->task; /* balance ref from _poll_start_cb() */
gs_free_error GError *error = NULL;
gint64 now_ms;
gint64 wait_ms;
gboolean is_finished;
is_finished =
poll_task_data->probe_finish_fcn(source, result, poll_task_data->probe_user_data, &error);
if (nm_utils_error_is_cancelled(error)) {
/* we already handle this differently. Nothing to do. */
return;
}
if (error || is_finished) {
_poll_return(poll_task_data, g_steal_pointer(&error));
return;
}
now_ms = nm_utils_get_monotonic_timestamp_msec();
if (poll_task_data->ratelimit_timeout_ms > 0)
wait_ms =
(poll_task_data->last_poll_start_ms + poll_task_data->ratelimit_timeout_ms) - now_ms;
else
wait_ms = 0;
if (poll_task_data->sleep_timeout_ms > 0)
wait_ms = MAX(wait_ms, poll_task_data->sleep_timeout_ms);
poll_task_data->source_next_poll =
nm_g_source_attach(nm_g_timeout_source_new(MAX(1, wait_ms),
G_PRIORITY_DEFAULT,
_poll_start_cb,
poll_task_data,
NULL),
poll_task_data->context);
}
static gboolean
_poll_start_cb(gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
nm_clear_g_source_inst(&poll_task_data->source_next_poll);
poll_task_data->last_poll_start_ms = nm_utils_get_monotonic_timestamp_msec();
g_object_ref(poll_task_data->task); /* balanced by _poll_done_cb() */
poll_task_data->probe_start_fcn(poll_task_data->internal_cancellable,
poll_task_data->probe_user_data,
_poll_done_cb,
poll_task_data);
return G_SOURCE_CONTINUE;
}
static gboolean
_poll_timeout_cb(gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
_poll_return(poll_task_data, nm_utils_error_new(NM_UTILS_ERROR_UNKNOWN, "timeout expired"));
return G_SOURCE_CONTINUE;
}
static void
_poll_cancelled_cb(GObject *object, gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
GError *error = NULL;
nm_clear_g_signal_handler(g_task_get_cancellable(poll_task_data->task),
&poll_task_data->cancellable_id);
nm_utils_error_set_cancelled(&error, FALSE, NULL);
_poll_return(poll_task_data, error);
}
/**
* nm_utils_poll:
* @poll_timeout_ms: if >= 0, then this is the overall timeout for how long we poll.
* When this timeout expires, the request completes with failure (and error set).
* @ratelimit_timeout_ms: if > 0, we ratelimit the starts from one prope_start_fcn
* call to the next. We will wait at least this time between two consecutive polls.
* @sleep_timeout_ms: if > 0, then we wait after a probe finished this timeout
* before the next. Together with @ratelimit_timeout_ms this determines how
* frequently we probe. We will wait at least this time between the end of the
* previous poll and the next one.
* @probe_register_object_fcn: (allow-none): called by nm_utils_poll()
* synchronously, with the new, internal GTask instance. The purpose of this
* callback is a bit obscure, you may want to pass NULL here. It's used by some
* caller to register a weak pointer on the internal GTask instance to track
* the lifetime of the operation.
* @probe_start_fcn: used to start a (asynchronous) probe. A probe must be
* completed by calling the provided callback. While a probe is in progress, we
* will not start another. The function is called the first time on an idle
* handler, afterwards it gets called again on each timeout for polling.
* @probe_finish_fcn: will be called from the callback of @probe_start_fcn. If the
* function returns %TRUE (polling done) or an error, polling stops. Otherwise,
* another poll will be started.
* @probe_user_data: user_data for the probe functions.
* @cancellable: cancellable for polling.
* @callback: when polling completes.
* @user_data: for @callback.
*
* This uses the current g_main_context_get_thread_default() for scheduling
* actions.
*/
void
nm_utils_poll(int poll_timeout_ms,
int ratelimit_timeout_ms,
int sleep_timeout_ms,
NMUtilsPollProbeRegisterObjectFcn probe_register_object_fcn,
NMUtilsPollProbeStartFcn probe_start_fcn,
NMUtilsPollProbeFinishFcn probe_finish_fcn,
gpointer probe_user_data,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
PollTaskData *poll_task_data;
poll_task_data = g_slice_new(PollTaskData);
*poll_task_data = (PollTaskData){
.task = nm_g_task_new(NULL, cancellable, nm_utils_poll, callback, user_data),
.probe_start_fcn = probe_start_fcn,
.probe_finish_fcn = probe_finish_fcn,
.probe_user_data = probe_user_data,
.completed = FALSE,
.context = g_main_context_ref_thread_default(),
.sleep_timeout_ms = sleep_timeout_ms,
.ratelimit_timeout_ms = ratelimit_timeout_ms,
.internal_cancellable = g_cancellable_new(),
};
g_task_set_task_data(poll_task_data->task, poll_task_data, _poll_task_data_free);
if (probe_register_object_fcn)
probe_register_object_fcn((GObject *) poll_task_data->task, probe_user_data);
if (poll_timeout_ms >= 0) {
poll_task_data->source_timeout =
nm_g_source_attach(nm_g_timeout_source_new(poll_timeout_ms,
G_PRIORITY_DEFAULT,
_poll_timeout_cb,
poll_task_data,
NULL),
poll_task_data->context);
}
poll_task_data->source_next_poll = nm_g_source_attach(
nm_g_idle_source_new(G_PRIORITY_DEFAULT_IDLE, _poll_start_cb, poll_task_data, NULL),
poll_task_data->context);
if (cancellable) {
gulong signal_id;
signal_id = g_cancellable_connect(cancellable,
G_CALLBACK(_poll_cancelled_cb),
poll_task_data,
NULL);
if (signal_id == 0) {
/* the request is already cancelled. Return. */
return;
}
poll_task_data->cancellable_id = signal_id;
}
}
/**
* nm_utils_poll_finish:
* @result: the GAsyncResult from the GAsyncReadyCallback callback.
* @probe_user_data: the user data provided to nm_utils_poll().
* @error: the failure code.
*
* Returns: %TRUE if the polling completed with success. In that case,
* the error won't be set.
* If the request was cancelled, this is indicated by @error and
* %FALSE will be returned.
* If the probe returned a failure, this returns %FALSE and the error
* provided by @probe_finish_fcn.
* If the request times out, this returns %FALSE with error set.
* Error is always set if (and only if) the function returns %FALSE.
*/
gboolean
nm_utils_poll_finish(GAsyncResult *result, gpointer *probe_user_data, GError **error)
{
GTask *task;
PollTaskData *poll_task_data;
g_return_val_if_fail(nm_g_task_is_valid(result, NULL, nm_utils_poll), FALSE);
g_return_val_if_fail(!error || !*error, FALSE);
task = G_TASK(result);
if (probe_user_data) {
poll_task_data = g_task_get_task_data(task);
NM_SET_OUT(probe_user_data, poll_task_data->probe_user_data);
}
return g_task_propagate_boolean(task, error);
}

View file

@ -3239,4 +3239,31 @@ nm_path_startswith(const char *path, const char *prefix)
gboolean nm_hostname_is_valid(const char *s, gboolean trailing_dot);
/*****************************************************************************/
typedef void (*NMUtilsPollProbeRegisterObjectFcn)(GObject *object, gpointer user_data);
typedef void (*NMUtilsPollProbeStartFcn)(GCancellable *cancellable,
gpointer probe_user_data,
GAsyncReadyCallback callback,
gpointer user_data);
typedef gboolean (*NMUtilsPollProbeFinishFcn)(GObject *source,
GAsyncResult *result,
gpointer probe_user_data,
GError **error);
void nm_utils_poll(int poll_timeout_ms,
int ratelimit_timeout_ms,
int sleep_timeout_ms,
NMUtilsPollProbeRegisterObjectFcn probe_register_object_fcn,
NMUtilsPollProbeStartFcn probe_start_fcn,
NMUtilsPollProbeFinishFcn probe_finish_fcn,
gpointer probe_user_data,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean nm_utils_poll_finish(GAsyncResult *result, gpointer *probe_user_data, GError **error);
#endif /* __NM_SHARED_UTILS_H__ */

View file

@ -166,258 +166,6 @@ nmcs_wait_for_objects_iterate_until_done(GMainContext *context, int timeout_msec
/*****************************************************************************/
typedef struct {
GTask *task;
GSource *source_timeout;
GSource *source_next_poll;
GMainContext *context;
GCancellable *internal_cancellable;
NMCSUtilsPollProbeStartFcn probe_start_fcn;
NMCSUtilsPollProbeFinishFcn probe_finish_fcn;
gpointer probe_user_data;
gulong cancellable_id;
gint64 last_poll_start_ms;
int sleep_timeout_ms;
int ratelimit_timeout_ms;
bool completed : 1;
} PollTaskData;
static void
_poll_task_data_free(gpointer data)
{
PollTaskData *poll_task_data = data;
nm_assert(G_IS_TASK(poll_task_data->task));
nm_assert(!poll_task_data->source_next_poll);
nm_assert(!poll_task_data->source_timeout);
nm_assert(poll_task_data->cancellable_id == 0);
g_main_context_unref(poll_task_data->context);
nm_g_slice_free(poll_task_data);
}
static void
_poll_return(PollTaskData *poll_task_data, GError *error_take)
{
nm_clear_g_source_inst(&poll_task_data->source_next_poll);
nm_clear_g_source_inst(&poll_task_data->source_timeout);
nm_clear_g_cancellable_disconnect(g_task_get_cancellable(poll_task_data->task),
&poll_task_data->cancellable_id);
nm_clear_g_cancellable(&poll_task_data->internal_cancellable);
if (error_take)
g_task_return_error(poll_task_data->task, g_steal_pointer(&error_take));
else
g_task_return_boolean(poll_task_data->task, TRUE);
g_object_unref(poll_task_data->task);
}
static gboolean _poll_start_cb(gpointer user_data);
static void
_poll_done_cb(GObject *source, GAsyncResult *result, gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
_nm_unused gs_unref_object GTask *task =
poll_task_data->task; /* balance ref from _poll_start_cb() */
gs_free_error GError *error = NULL;
gint64 now_ms;
gint64 wait_ms;
gboolean is_finished;
is_finished =
poll_task_data->probe_finish_fcn(source, result, poll_task_data->probe_user_data, &error);
if (nm_utils_error_is_cancelled(error)) {
/* we already handle this differently. Nothing to do. */
return;
}
if (error || is_finished) {
_poll_return(poll_task_data, g_steal_pointer(&error));
return;
}
now_ms = nm_utils_get_monotonic_timestamp_msec();
if (poll_task_data->ratelimit_timeout_ms > 0)
wait_ms =
(poll_task_data->last_poll_start_ms + poll_task_data->ratelimit_timeout_ms) - now_ms;
else
wait_ms = 0;
if (poll_task_data->sleep_timeout_ms > 0)
wait_ms = MAX(wait_ms, poll_task_data->sleep_timeout_ms);
poll_task_data->source_next_poll =
nm_g_source_attach(nm_g_timeout_source_new(MAX(1, wait_ms),
G_PRIORITY_DEFAULT,
_poll_start_cb,
poll_task_data,
NULL),
poll_task_data->context);
}
static gboolean
_poll_start_cb(gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
nm_clear_g_source_inst(&poll_task_data->source_next_poll);
poll_task_data->last_poll_start_ms = nm_utils_get_monotonic_timestamp_msec();
g_object_ref(poll_task_data->task); /* balanced by _poll_done_cb() */
poll_task_data->probe_start_fcn(poll_task_data->internal_cancellable,
poll_task_data->probe_user_data,
_poll_done_cb,
poll_task_data);
return G_SOURCE_CONTINUE;
}
static gboolean
_poll_timeout_cb(gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
_poll_return(poll_task_data, nm_utils_error_new(NM_UTILS_ERROR_UNKNOWN, "timeout expired"));
return G_SOURCE_CONTINUE;
}
static void
_poll_cancelled_cb(GObject *object, gpointer user_data)
{
PollTaskData *poll_task_data = user_data;
GError *error = NULL;
nm_clear_g_signal_handler(g_task_get_cancellable(poll_task_data->task),
&poll_task_data->cancellable_id);
nm_utils_error_set_cancelled(&error, FALSE, NULL);
_poll_return(poll_task_data, error);
}
/**
* nmcs_utils_poll:
* @poll_timeout_ms: if >= 0, then this is the overall timeout for how long we poll.
* When this timeout expires, the request completes with failure (and error set).
* @ratelimit_timeout_ms: if > 0, we ratelimit the starts from one prope_start_fcn
* call to the next.
* @sleep_timeout_ms: if > 0, then we wait after a probe finished this timeout
* before the next. Together with @ratelimit_timeout_ms this determines how
* frequently we probe.
* @probe_start_fcn: used to start a (asynchronous) probe. A probe must be completed
* by calling the provided callback. While a probe is in progress, we will not
* start another. This function is already invoked the first time synchronously,
* during nmcs_utils_poll().
* @probe_finish_fcn: will be called from the callback of @probe_start_fcn. If the
* function returns %TRUE (polling done) or an error, polling stops. Otherwise,
* another poll will be started.
* @probe_user_data: user_data for the probe functions.
* @cancellable: cancellable for polling.
* @callback: when polling completes.
* @user_data: for @callback.
*
* This uses the current g_main_context_get_thread_default() for scheduling
* actions.
*/
void
nmcs_utils_poll(int poll_timeout_ms,
int ratelimit_timeout_ms,
int sleep_timeout_ms,
NMCSUtilsPollProbeStartFcn probe_start_fcn,
NMCSUtilsPollProbeFinishFcn probe_finish_fcn,
gpointer probe_user_data,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
PollTaskData *poll_task_data;
poll_task_data = g_slice_new(PollTaskData);
*poll_task_data = (PollTaskData){
.task = nm_g_task_new(NULL, cancellable, nmcs_utils_poll, callback, user_data),
.probe_start_fcn = probe_start_fcn,
.probe_finish_fcn = probe_finish_fcn,
.probe_user_data = probe_user_data,
.completed = FALSE,
.context = g_main_context_ref_thread_default(),
.sleep_timeout_ms = sleep_timeout_ms,
.ratelimit_timeout_ms = ratelimit_timeout_ms,
.internal_cancellable = g_cancellable_new(),
};
nmcs_wait_for_objects_register(poll_task_data->task);
g_task_set_task_data(poll_task_data->task, poll_task_data, _poll_task_data_free);
if (poll_timeout_ms >= 0) {
poll_task_data->source_timeout =
nm_g_source_attach(nm_g_timeout_source_new(poll_timeout_ms,
G_PRIORITY_DEFAULT,
_poll_timeout_cb,
poll_task_data,
NULL),
poll_task_data->context);
}
poll_task_data->source_next_poll = nm_g_source_attach(
nm_g_idle_source_new(G_PRIORITY_DEFAULT_IDLE, _poll_start_cb, poll_task_data, NULL),
poll_task_data->context);
if (cancellable) {
gulong signal_id;
signal_id = g_cancellable_connect(cancellable,
G_CALLBACK(_poll_cancelled_cb),
poll_task_data,
NULL);
if (signal_id == 0) {
/* the request is already cancelled. Return. */
return;
}
poll_task_data->cancellable_id = signal_id;
}
}
/**
* nmcs_utils_poll_finish:
* @result: the GAsyncResult from the GAsyncReadyCallback callback.
* @probe_user_data: the user data provided to nmcs_utils_poll().
* @error: the failure code.
*
* Returns: %TRUE if the polling completed with success. In that case,
* the error won't be set.
* If the request was cancelled, this is indicated by @error and
* %FALSE will be returned.
* If the probe returned a failure, this returns %FALSE and the error
* provided by @probe_finish_fcn.
* If the request times out, this returns %FALSE with error set.
* Error is always set if (and only if) the function returns %FALSE.
*/
gboolean
nmcs_utils_poll_finish(GAsyncResult *result, gpointer *probe_user_data, GError **error)
{
GTask *task;
PollTaskData *poll_task_data;
g_return_val_if_fail(nm_g_task_is_valid(result, NULL, nmcs_utils_poll), FALSE);
g_return_val_if_fail(!error || !*error, FALSE);
task = G_TASK(result);
if (probe_user_data) {
poll_task_data = g_task_get_task_data(task);
NM_SET_OUT(probe_user_data, poll_task_data->probe_user_data);
}
return g_task_propagate_boolean(task, error);
}
/*****************************************************************************/
char *
nmcs_utils_hwaddr_normalize(const char *hwaddr, gssize len)
{

View file

@ -40,30 +40,6 @@ gboolean nmcs_wait_for_objects_iterate_until_done(GMainContext *context, int tim
/*****************************************************************************/
typedef void (*NMCSUtilsPollProbeStartFcn)(GCancellable *cancellable,
gpointer probe_user_data,
GAsyncReadyCallback callback,
gpointer user_data);
typedef gboolean (*NMCSUtilsPollProbeFinishFcn)(GObject *source,
GAsyncResult *result,
gpointer probe_user_data,
GError **error);
void nmcs_utils_poll(int poll_timeout_ms,
int ratelimit_timeout_ms,
int sleep_timeout_ms,
NMCSUtilsPollProbeStartFcn probe_start_fcn,
NMCSUtilsPollProbeFinishFcn probe_finish_fcn,
gpointer probe_user_data,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean nmcs_utils_poll_finish(GAsyncResult *result, gpointer *probe_user_data, GError **error);
/*****************************************************************************/
char *nmcs_utils_hwaddr_normalize(const char *hwaddr, gssize len);
static inline char *

View file

@ -429,6 +429,12 @@ _poll_req_data_free(gpointer data)
nm_g_slice_free(poll_req_data);
}
static void
_poll_reg_probe_register_object_fcn(GObject *object, gpointer user_data)
{
nmcs_wait_for_objects_register(object);
}
static void
_poll_req_probe_start_fcn(GCancellable *cancellable,
gpointer probe_user_data,
@ -508,13 +514,14 @@ _poll_req_probe_finish_fcn(GObject *source,
static void
_poll_req_done_cb(GObject *source, GAsyncResult *result, gpointer user_data)
{
PollReqData *poll_req_data = user_data;
PollReqData *poll_req_data = NULL;
gs_free_error GError *error = NULL;
gboolean success;
success = nmcs_utils_poll_finish(result, NULL, &error);
success = nm_utils_poll_finish(result, (gpointer *) &poll_req_data, &error);
nm_assert((!!success) == (!error));
nm_assert(poll_req_data);
if (error)
g_task_return_error(poll_req_data->task, g_steal_pointer(&error));
@ -574,15 +581,16 @@ nm_http_client_poll_req(NMHttpClient *self,
context =
nm_g_main_context_push_thread_default_if_necessary(nm_http_client_get_main_context(self));
nmcs_utils_poll(poll_timeout_ms,
ratelimit_timeout_ms,
0,
_poll_req_probe_start_fcn,
_poll_req_probe_finish_fcn,
poll_req_data,
cancellable,
_poll_req_done_cb,
poll_req_data);
nm_utils_poll(poll_timeout_ms,
ratelimit_timeout_ms,
0,
_poll_reg_probe_register_object_fcn,
_poll_req_probe_start_fcn,
_poll_req_probe_finish_fcn,
poll_req_data,
cancellable,
_poll_req_done_cb,
NULL);
}
gboolean