From 98463b689b9bffc912f719aa20ea28c8df0fe412 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 21 Feb 2019 12:14:25 +0100 Subject: [PATCH] node: improve pending results Make a special function for pending results to make it clear that it is different from normal results. Don't pass result code to result function, it is not useful because since the callback is called, all must be fine. The spa_pending is removed from the list right before the callback and can thus be freed in the callback. Pass the spa_pending in the pending callback so that extra data can be added that way. Reuse spa_pending objects in link and nodes instead of allocating. We always only have one pending operation and we can cancel any pending previous operation by removing the pending. --- spa/include/spa/node/node.h | 6 +- spa/include/spa/node/utils.h | 15 ++-- spa/include/spa/utils/result.h | 17 ++-- spa/plugins/alsa/alsa-sink.c | 4 +- spa/plugins/alsa/alsa-source.c | 4 +- spa/plugins/alsa/alsa-utils.c | 2 +- spa/plugins/audioconvert/audioconvert.c | 4 +- spa/plugins/audioconvert/channelmix.c | 4 +- spa/plugins/audioconvert/fmtconvert.c | 2 +- spa/plugins/audioconvert/merger.c | 4 +- spa/plugins/audioconvert/resample.c | 2 +- spa/plugins/audioconvert/splitter.c | 4 +- spa/plugins/audiomixer/audiomixer.c | 2 +- spa/plugins/audiotestsrc/audiotestsrc.c | 4 +- spa/plugins/bluez5/a2dp-sink.c | 4 +- spa/plugins/ffmpeg/ffmpeg-dec.c | 2 +- spa/plugins/ffmpeg/ffmpeg-enc.c | 2 +- spa/plugins/test/fakesink.c | 4 +- spa/plugins/test/fakesrc.c | 4 +- spa/plugins/v4l2/v4l2-source.c | 8 +- spa/plugins/v4l2/v4l2-utils.c | 4 +- spa/plugins/videotestsrc/videotestsrc.c | 4 +- spa/plugins/volume/volume.c | 4 +- src/examples/export-sink.c | 2 +- src/examples/export-source.c | 2 +- src/examples/local-v4l2.c | 2 +- src/modules/module-audio-dsp/floatmix.c | 2 +- src/modules/module-client-node/client-node.c | 10 ++- .../module-client-node/client-stream.c | 4 +- src/modules/spa/spa-node.c | 6 +- src/pipewire/link.c | 79 ++++++++----------- src/pipewire/node.c | 63 +++++---------- src/pipewire/stream.c | 2 +- 33 files changed, 126 insertions(+), 156 deletions(-) diff --git a/spa/include/spa/node/node.h b/spa/include/spa/node/node.h index 3c0ef3b32..136772d96 100644 --- a/spa/include/spa/node/node.h +++ b/spa/include/spa/node/node.h @@ -212,15 +212,15 @@ struct spa_node { * The hook is automatically removed after the hook is called. * * \param node a spa_node - * \param res an async return value to wait for + * \param seq an async return value to wait for * \param pending a spa_pending structure * \param func a result callback * \param data data passed to \a func * \return 0 on success * -EINVAL when node is NULL */ - int (*wait) (struct spa_node *node, int res, struct spa_pending *pending, - spa_result_func_t func, void *data); + int (*wait) (struct spa_node *node, int seq, struct spa_pending *pending, + spa_pending_func_t func, void *data); /** * Enumerate the parameters of a node. diff --git a/spa/include/spa/node/utils.h b/spa/include/spa/node/utils.h index 66291efb9..b0adbcdf7 100644 --- a/spa/include/spa/node/utils.h +++ b/spa/include/spa/node/utils.h @@ -37,20 +37,17 @@ struct spa_result_node_enum_params_data { }; static inline int spa_result_func_node_enum_params(void *data, - int seq, int res, const void *result) + uint32_t count, const void *result) { struct spa_result_node_enum_params_data *d = (struct spa_result_node_enum_params_data *)data; const struct spa_result_node_enum_params *r = (const struct spa_result_node_enum_params *)result; - - if (res == 1) { - uint32_t offset = d->builder->state.offset; - spa_pod_builder_raw_padded(d->builder, r->param, SPA_POD_SIZE(r->param)); - d->data.next = r->next; - d->data.param = SPA_MEMBER(d->builder->data, offset, struct spa_pod); - } - return res; + uint32_t offset = d->builder->state.offset; + spa_pod_builder_raw_padded(d->builder, r->param, SPA_POD_SIZE(r->param)); + d->data.next = r->next; + d->data.param = SPA_MEMBER(d->builder->data, offset, struct spa_pod); + return 1; } static inline int spa_node_enum_params_sync(struct spa_node *node, diff --git a/spa/include/spa/utils/result.h b/spa/include/spa/utils/result.h index 56ec6efd2..e32c22931 100644 --- a/spa/include/spa/utils/result.h +++ b/spa/include/spa/utils/result.h @@ -43,15 +43,18 @@ extern "C" { #define SPA_RESULT_ASYNC_SEQ(res) ((res) & SPA_ASYNC_SEQ_MASK) #define SPA_RESULT_RETURN_ASYNC(seq) (SPA_ASYNC_BIT | SPA_RESULT_ASYNC_SEQ(seq)) -typedef int (*spa_result_func_t) (void *data, int seq, int res, const void *result); +typedef int (*spa_result_func_t) (void *data, uint32_t count, const void *result); + +struct spa_pending; + +typedef int (*spa_pending_func_t) (struct spa_pending *pending, const void *result); struct spa_pending { - struct spa_list link; - int seq; - int res; - spa_result_func_t func; - void *data; - void (*removed) (struct spa_pending *pending); + struct spa_list link; /**< link used internally */ + int seq; /**< sequence number of pending result */ + int res; /**< result code of operation */ + spa_pending_func_t func; /**< callback function */ + void *data; /**< extra user data */ }; #ifdef __cplusplus diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 01f8589a0..daeb85d30 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -159,7 +159,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -437,7 +437,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 518a5b055..ffc09303c 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -155,7 +155,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -434,7 +434,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index e463f8b12..219448cd9 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -359,7 +359,7 @@ spa_alsa_enum_format(struct state *state, uint32_t start, uint32_t num, if ((res = spa_pod_filter(&b, &result.param, fmt, filter)) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) goto exit; if (++count != num) diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index 57dcbb21a..16731e094 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -443,7 +443,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -715,7 +715,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/channelmix.c b/spa/plugins/audioconvert/channelmix.c index 075ebcffb..0584e9f53 100644 --- a/spa/plugins/audioconvert/channelmix.c +++ b/spa/plugins/audioconvert/channelmix.c @@ -518,7 +518,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -806,7 +806,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/fmtconvert.c b/spa/plugins/audioconvert/fmtconvert.c index cb1c3682d..638614d65 100644 --- a/spa/plugins/audioconvert/fmtconvert.c +++ b/spa/plugins/audioconvert/fmtconvert.c @@ -489,7 +489,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/merger.c b/spa/plugins/audioconvert/merger.c index 0b694d681..abd403712 100644 --- a/spa/plugins/audioconvert/merger.c +++ b/spa/plugins/audioconvert/merger.c @@ -208,7 +208,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -513,7 +513,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c index 507003a96..c88545660 100644 --- a/spa/plugins/audioconvert/resample.c +++ b/spa/plugins/audioconvert/resample.c @@ -450,7 +450,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/splitter.c b/spa/plugins/audioconvert/splitter.c index de617f835..e8e478da0 100644 --- a/spa/plugins/audioconvert/splitter.c +++ b/spa/plugins/audioconvert/splitter.c @@ -206,7 +206,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -510,7 +510,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index b77e44dfb..64d36408e 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -438,7 +438,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index fdecbbbf2..6c9672bbb 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -238,7 +238,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -665,7 +665,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/bluez5/a2dp-sink.c b/spa/plugins/bluez5/a2dp-sink.c index f0c6a1fd6..cb9b9bc8c 100644 --- a/spa/plugins/bluez5/a2dp-sink.c +++ b/spa/plugins/bluez5/a2dp-sink.c @@ -232,7 +232,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -1050,7 +1050,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index dad40594c..c36d08365 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -261,7 +261,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index a1f058274..90fafa76e 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -255,7 +255,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/test/fakesink.c b/spa/plugins/test/fakesink.c index e021bc2a2..381206cdb 100644 --- a/spa/plugins/test/fakesink.c +++ b/spa/plugins/test/fakesink.c @@ -147,7 +147,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -491,7 +491,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/test/fakesrc.c b/spa/plugins/test/fakesrc.c index 30783ed57..af126fd19 100644 --- a/spa/plugins/test/fakesrc.c +++ b/spa/plugins/test/fakesrc.c @@ -157,7 +157,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -508,7 +508,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index a789321b8..79bfe4e76 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -146,8 +146,8 @@ struct impl { #include "v4l2-utils.c" -static int impl_node_wait(struct spa_node *node, int res, struct spa_pending *pending, - spa_result_func_t func, void *data) +static int impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending, + spa_pending_func_t func, void *data) { return -ENOTSUP; } @@ -247,7 +247,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -574,7 +574,7 @@ static int impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 557563f59..6c6f22999 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -812,7 +812,7 @@ spa_v4l2_enum_format(struct impl *this, result.param = spa_pod_builder_pop(&b, &f[0]); result.next++; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) goto exit; if (++count != num) @@ -1142,7 +1142,7 @@ spa_v4l2_enum_controls(struct impl *this, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) goto exit; if (++count != num) diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index 92adcc812..a939c2ddc 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -207,7 +207,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -591,7 +591,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 688dfcde5..84ae5c866 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -183,7 +183,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -434,7 +434,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index 51d3e3a54..53b06c249 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -291,7 +291,7 @@ static int impl_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/examples/export-source.c b/src/examples/export-source.c index 3370aefe0..08465d4a0 100644 --- a/src/examples/export-source.c +++ b/src/examples/export-source.c @@ -262,7 +262,7 @@ static int impl_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index 5452f7b8e..538c13a69 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -185,7 +185,7 @@ static int impl_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/modules/module-audio-dsp/floatmix.c b/src/modules/module-audio-dsp/floatmix.c index fc35cf4eb..b57c5d9ec 100644 --- a/src/modules/module-audio-dsp/floatmix.c +++ b/src/modules/module-audio-dsp/floatmix.c @@ -429,7 +429,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 6efd68240..7748c1a9c 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -389,7 +389,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) != 0) continue; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -529,7 +529,7 @@ impl_node_sync(struct spa_node *node) static int impl_node_wait(struct spa_node *node, int res, struct spa_pending *pending, - spa_result_func_t func, void *data) + spa_pending_func_t func, void *data) { struct node *this; int seq; @@ -705,7 +705,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) continue; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -1256,8 +1256,10 @@ static void client_node_resource_done(void *data, uint32_t seq) spa_list_for_each_safe(p, t, &this->pending_list, link) { if (p->seq == (int) seq) { pw_log_debug("client-node %p: do callback %d", this, p->res); - p->func(p->data, p->res, 0, NULL); spa_list_remove(&p->link); + p->seq = p->res; + p->res = 0; + p->func(p, NULL); count++; } } diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index b51e7d7b6..5075ff020 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -170,7 +170,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -350,7 +350,7 @@ impl_node_sync(struct spa_node *node) static int impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending, - spa_result_func_t func, void *data) + spa_pending_func_t func, void *data) { struct node *this; struct impl *impl; diff --git a/src/modules/spa/spa-node.c b/src/modules/spa/spa-node.c index b25a86c6b..9540f3df9 100644 --- a/src/modules/spa/spa-node.c +++ b/src/modules/spa/spa-node.c @@ -99,11 +99,11 @@ static int complete_init(struct impl *impl) return 0; } -static int on_init_done(void *data, int seq, int res, const void *result) +static int on_init_done(struct spa_pending *pending, const void *result) { - struct impl *impl = data; + struct impl *impl = pending->data; struct pw_node *this = impl->this; - pw_log_debug("spa-node %p: init complete event %d %d", this, seq, res); + pw_log_debug("spa-node %p: init complete event %d %d", this, pending->seq, pending->res); return complete_init(impl); } diff --git a/src/pipewire/link.c b/src/pipewire/link.c index d4d3ed89b..2ee7269f3 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -69,6 +69,9 @@ struct impl { struct spa_io_buffers io; struct pw_node *inode, *onode; + + struct spa_pending input_pending; + struct spa_pending output_pending; }; struct resource_data { @@ -150,24 +153,28 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state, } } -static int complete_output(void *data, int seq, int res, const void *result) +static int complete_output(struct spa_pending *pending, const void *result) { - struct pw_link *this = data; + struct pw_link *this = pending->data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_node *node = this->output->node; - seq = SPA_RESULT_ASYNC_SEQ(seq); + uint32_t seq = SPA_RESULT_ASYNC_SEQ(pending->seq); + int res = pending->res; pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); + pending->res = 0; pw_work_queue_complete(impl->work, node, seq, res); return 0; } -static int complete_input(void *data, int seq, int res, const void *result) +static int complete_input(struct spa_pending *pending, const void *result) { - struct pw_link *this = data; + struct pw_link *this = pending->data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_node *node = this->input->node; - seq = SPA_RESULT_ASYNC_SEQ(seq); + uint32_t seq = SPA_RESULT_ASYNC_SEQ(pending->seq); + int res = pending->res; pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); + pending->res = 0; pw_work_queue_complete(impl->work, node, seq, res); return 0; } @@ -204,16 +211,13 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) } } -static void remove_pending(struct spa_pending *pending) +static struct spa_pending *prepare_pending(struct impl *impl, struct spa_pending *pending) { - free(pending); -} - -static struct spa_pending *make_pending(struct impl *impl) -{ - struct spa_pending *pending; - pending = calloc(1, sizeof(struct spa_pending)); - pending->removed = remove_pending; + if (pending->res != 0) { + pw_log_warn("link %p: remove pending %d", impl, pending->res); + spa_list_remove(&pending->link); + pending->res = 0; + } return pending; } @@ -336,10 +340,9 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st } if (SPA_RESULT_IS_ASYNC(res)) { spa_node_wait(output->node->node, res, - make_pending(impl), + prepare_pending(impl, &impl->output_pending), complete_output, this); - pw_work_queue_add(impl->work, output->node, res, complete_ready, &this->rt.out_mix); } @@ -355,7 +358,7 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st } if (SPA_RESULT_IS_ASYNC(res2)) { spa_node_wait(input->node->node, res2, - make_pending(impl), + prepare_pending(impl, &impl->input_pending), complete_input, this); @@ -724,7 +727,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } if (SPA_RESULT_IS_ASYNC(res)) { - //spa_node_sync(output->node->node, res); + spa_node_wait(output->node->node, res, + prepare_pending(impl, &impl->output_pending), + complete_output, + this); pw_work_queue_add(impl->work, output->node, res, complete_paused, &this->rt.out_mix); } @@ -744,7 +750,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } if (SPA_RESULT_IS_ASYNC(res)) { - //spa_node_sync(input->node->node, res); + spa_node_wait(input->node->node, res, + prepare_pending(impl, &impl->input_pending), + complete_input, + this); pw_work_queue_add(impl->work, input->node, res, complete_paused, &this->rt.in_mix); } @@ -768,7 +777,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s } if (SPA_RESULT_IS_ASYNC(res)) { spa_node_wait(output->node->node, res, - make_pending(impl), + prepare_pending(impl, &impl->output_pending), complete_output, this); pw_work_queue_add(impl->work, output->node, res, complete_paused, @@ -791,7 +800,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s } if (SPA_RESULT_IS_ASYNC(res)) { spa_node_wait(input->node->node, res, - make_pending(impl), + prepare_pending(impl, &impl->input_pending), complete_input, this); pw_work_queue_add(impl->work, input->node, res, complete_paused, @@ -920,28 +929,6 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id) this, -EBUSY, (pw_work_func_t) check_states, this); } -#if 0 -static void -input_node_async_complete(void *data, uint32_t seq, int res) -{ - struct impl *impl = data; - struct pw_node *node = impl->this.input->node; - - pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); - pw_work_queue_complete(impl->work, node, seq, res); -} - -static void -output_node_async_complete(void *data, uint32_t seq, int res) -{ - struct impl *impl = data; - struct pw_node *node = impl->this.output->node; - - pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); - pw_work_queue_complete(impl->work, node, seq, res); -} -#endif - static void clear_port_buffers(struct pw_link *link, struct pw_port *port) { int res; @@ -972,6 +959,8 @@ static void input_remove(struct pw_link *this, struct pw_port *port) spa_hook_remove(&impl->input_port_listener); spa_hook_remove(&impl->input_node_listener); + prepare_pending(impl, &impl->input_pending); + spa_list_remove(&this->input_link); pw_port_events_link_removed(this->input, this); @@ -991,6 +980,8 @@ static void output_remove(struct pw_link *this, struct pw_port *port) spa_hook_remove(&impl->output_port_listener); spa_hook_remove(&impl->output_node_listener); + prepare_pending(impl, &impl->output_pending); + spa_list_remove(&this->output_link); pw_port_events_link_removed(this->output, this); diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 1361bac7a..76f0535f8 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -50,7 +50,6 @@ struct impl { struct pw_node this; struct pw_work_queue *work; - bool pause_on_idle; struct spa_graph driver_graph; struct spa_graph_state driver_state; @@ -62,7 +61,9 @@ struct impl { uint32_t next_position; int last_error; - struct spa_pending init_pending; + + struct spa_pending pending_state; + int pause_on_idle:1; }; struct resource_data { @@ -837,33 +838,13 @@ static int node_port_info(void *data, enum spa_direction direction, uint32_t por return 0; } -struct node_pending { - struct spa_pending pending; - struct impl *impl; - enum pw_node_state state; -}; - -static void remove_pending(struct spa_pending *pending) +static int node_complete(struct spa_pending *pending, const void *result) { - free(pending); -} - -static struct node_pending *make_pending(struct impl *impl, enum pw_node_state state) -{ - struct node_pending *pending; - pending = calloc(1, sizeof(struct node_pending)); - pending->pending.removed = remove_pending; - pending->impl = impl; - pending->state = state; - return pending; -} - -static int node_complete(void *data, int seq, int res, const void *result) -{ - struct node_pending *pending = data; - struct impl *impl = pending->impl; - seq = SPA_RESULT_ASYNC_SEQ(seq); - pw_log_debug("node %p: done event %d %u", impl, res, pending->pending.seq); + struct impl *impl = pending->data; + uint32_t seq = SPA_RESULT_ASYNC_SEQ(pending->seq); + int res = pending->res; + pending->res = 0; + pw_log_debug("node %p: done event %d %u", impl, res, pending->seq); impl->last_error = res; pw_work_queue_complete(impl->work, &impl->this, seq, res); return 0; @@ -933,29 +914,17 @@ static const struct spa_node_callbacks node_callbacks = { .reuse_buffer = node_reuse_buffer, }; -static int init_result(void *data, int seq, int res, const void *result) -{ - pw_log_debug("node %p: set_callbacks finished", data); - return 0; -} - SPA_EXPORT int pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node) { int res; - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); node->node = spa_node; pw_log_debug("node %p: implementation %p", node, spa_node); spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node); res = spa_node_set_callbacks(node->node, &node_callbacks, node); - if (SPA_RESULT_IS_ASYNC(res)) { - pw_log_debug("node %p: init is async %d", node, res); - spa_node_wait(node->node, res, &impl->init_pending, init_result, node); - } - if (spa_node_set_io(node->node, SPA_IO_Position, &node->rt.activation->position, @@ -1025,6 +994,11 @@ void pw_node_destroy(struct pw_node *node) if (node->registered) spa_list_remove(&node->link); + if (impl->pending_state.res != 0) { + pw_log_debug("remove pending state %d", impl->pending_state.res); + spa_list_remove(&impl->pending_state.link); + impl->pending_state.res = 0; + } spa_node_set_callbacks(node->node, NULL, NULL); pw_log_debug("node %p: unlink ports", node); @@ -1270,10 +1244,13 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state) return res; if (SPA_RESULT_IS_ASYNC(res)) { - struct node_pending *pending = make_pending(impl, state); + if (impl->pending_state.res != 0) { + pw_log_warn("remove pending state %d", impl->pending_state.res); + spa_list_remove(&impl->pending_state.link); + } - spa_node_wait(node->node, res, &pending->pending, - node_complete, pending); + spa_node_wait(node->node, res, &impl->pending_state, + node_complete, impl); } pw_work_queue_add(impl->work, diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 8ba002788..52877a3ec 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -430,7 +430,7 @@ static int impl_port_enum_params(struct spa_node *node, } } - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num)