diff --git a/spa/include/spa/node/node.h b/spa/include/spa/node/node.h index 3c0ef3b32..136772d96 100644 --- a/spa/include/spa/node/node.h +++ b/spa/include/spa/node/node.h @@ -212,15 +212,15 @@ struct spa_node { * The hook is automatically removed after the hook is called. * * \param node a spa_node - * \param res an async return value to wait for + * \param seq an async return value to wait for * \param pending a spa_pending structure * \param func a result callback * \param data data passed to \a func * \return 0 on success * -EINVAL when node is NULL */ - int (*wait) (struct spa_node *node, int res, struct spa_pending *pending, - spa_result_func_t func, void *data); + int (*wait) (struct spa_node *node, int seq, struct spa_pending *pending, + spa_pending_func_t func, void *data); /** * Enumerate the parameters of a node. diff --git a/spa/include/spa/node/utils.h b/spa/include/spa/node/utils.h index 66291efb9..b0adbcdf7 100644 --- a/spa/include/spa/node/utils.h +++ b/spa/include/spa/node/utils.h @@ -37,20 +37,17 @@ struct spa_result_node_enum_params_data { }; static inline int spa_result_func_node_enum_params(void *data, - int seq, int res, const void *result) + uint32_t count, const void *result) { struct spa_result_node_enum_params_data *d = (struct spa_result_node_enum_params_data *)data; const struct spa_result_node_enum_params *r = (const struct spa_result_node_enum_params *)result; - - if (res == 1) { - uint32_t offset = d->builder->state.offset; - spa_pod_builder_raw_padded(d->builder, r->param, SPA_POD_SIZE(r->param)); - d->data.next = r->next; - d->data.param = SPA_MEMBER(d->builder->data, offset, struct spa_pod); - } - return res; + uint32_t offset = d->builder->state.offset; + spa_pod_builder_raw_padded(d->builder, r->param, SPA_POD_SIZE(r->param)); + d->data.next = r->next; + d->data.param = SPA_MEMBER(d->builder->data, offset, struct spa_pod); + return 1; } static inline int spa_node_enum_params_sync(struct spa_node *node, diff --git a/spa/include/spa/utils/result.h b/spa/include/spa/utils/result.h index 56ec6efd2..e32c22931 100644 --- a/spa/include/spa/utils/result.h +++ b/spa/include/spa/utils/result.h @@ -43,15 +43,18 @@ extern "C" { #define SPA_RESULT_ASYNC_SEQ(res) ((res) & SPA_ASYNC_SEQ_MASK) #define SPA_RESULT_RETURN_ASYNC(seq) (SPA_ASYNC_BIT | SPA_RESULT_ASYNC_SEQ(seq)) -typedef int (*spa_result_func_t) (void *data, int seq, int res, const void *result); +typedef int (*spa_result_func_t) (void *data, uint32_t count, const void *result); + +struct spa_pending; + +typedef int (*spa_pending_func_t) (struct spa_pending *pending, const void *result); struct spa_pending { - struct spa_list link; - int seq; - int res; - spa_result_func_t func; - void *data; - void (*removed) (struct spa_pending *pending); + struct spa_list link; /**< link used internally */ + int seq; /**< sequence number of pending result */ + int res; /**< result code of operation */ + spa_pending_func_t func; /**< callback function */ + void *data; /**< extra user data */ }; #ifdef __cplusplus diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 01f8589a0..daeb85d30 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -159,7 +159,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -437,7 +437,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 518a5b055..ffc09303c 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -155,7 +155,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -434,7 +434,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index e463f8b12..219448cd9 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -359,7 +359,7 @@ spa_alsa_enum_format(struct state *state, uint32_t start, uint32_t num, if ((res = spa_pod_filter(&b, &result.param, fmt, filter)) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) goto exit; if (++count != num) diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index 57dcbb21a..16731e094 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -443,7 +443,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -715,7 +715,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/channelmix.c b/spa/plugins/audioconvert/channelmix.c index 075ebcffb..0584e9f53 100644 --- a/spa/plugins/audioconvert/channelmix.c +++ b/spa/plugins/audioconvert/channelmix.c @@ -518,7 +518,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -806,7 +806,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/fmtconvert.c b/spa/plugins/audioconvert/fmtconvert.c index cb1c3682d..638614d65 100644 --- a/spa/plugins/audioconvert/fmtconvert.c +++ b/spa/plugins/audioconvert/fmtconvert.c @@ -489,7 +489,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/merger.c b/spa/plugins/audioconvert/merger.c index 0b694d681..abd403712 100644 --- a/spa/plugins/audioconvert/merger.c +++ b/spa/plugins/audioconvert/merger.c @@ -208,7 +208,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -513,7 +513,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c index 507003a96..c88545660 100644 --- a/spa/plugins/audioconvert/resample.c +++ b/spa/plugins/audioconvert/resample.c @@ -450,7 +450,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audioconvert/splitter.c b/spa/plugins/audioconvert/splitter.c index de617f835..e8e478da0 100644 --- a/spa/plugins/audioconvert/splitter.c +++ b/spa/plugins/audioconvert/splitter.c @@ -206,7 +206,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -510,7 +510,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index b77e44dfb..64d36408e 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -438,7 +438,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index fdecbbbf2..6c9672bbb 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -238,7 +238,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -665,7 +665,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/bluez5/a2dp-sink.c b/spa/plugins/bluez5/a2dp-sink.c index f0c6a1fd6..cb9b9bc8c 100644 --- a/spa/plugins/bluez5/a2dp-sink.c +++ b/spa/plugins/bluez5/a2dp-sink.c @@ -232,7 +232,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -1050,7 +1050,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index dad40594c..c36d08365 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -261,7 +261,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index a1f058274..90fafa76e 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -255,7 +255,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/test/fakesink.c b/spa/plugins/test/fakesink.c index e021bc2a2..381206cdb 100644 --- a/spa/plugins/test/fakesink.c +++ b/spa/plugins/test/fakesink.c @@ -147,7 +147,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -491,7 +491,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/test/fakesrc.c b/spa/plugins/test/fakesrc.c index 30783ed57..af126fd19 100644 --- a/spa/plugins/test/fakesrc.c +++ b/spa/plugins/test/fakesrc.c @@ -157,7 +157,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -508,7 +508,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index a789321b8..79bfe4e76 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -146,8 +146,8 @@ struct impl { #include "v4l2-utils.c" -static int impl_node_wait(struct spa_node *node, int res, struct spa_pending *pending, - spa_result_func_t func, void *data) +static int impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending, + spa_pending_func_t func, void *data) { return -ENOTSUP; } @@ -247,7 +247,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -574,7 +574,7 @@ static int impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 557563f59..6c6f22999 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -812,7 +812,7 @@ spa_v4l2_enum_format(struct impl *this, result.param = spa_pod_builder_pop(&b, &f[0]); result.next++; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) goto exit; if (++count != num) @@ -1142,7 +1142,7 @@ spa_v4l2_enum_controls(struct impl *this, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) goto exit; if (++count != num) diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index 92adcc812..a939c2ddc 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -207,7 +207,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -591,7 +591,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 688dfcde5..84ae5c866 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -183,7 +183,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -434,7 +434,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index 51d3e3a54..53b06c249 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -291,7 +291,7 @@ static int impl_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/examples/export-source.c b/src/examples/export-source.c index 3370aefe0..08465d4a0 100644 --- a/src/examples/export-source.c +++ b/src/examples/export-source.c @@ -262,7 +262,7 @@ static int impl_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index 5452f7b8e..538c13a69 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -185,7 +185,7 @@ static int impl_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/modules/module-audio-dsp/floatmix.c b/src/modules/module-audio-dsp/floatmix.c index fc35cf4eb..b57c5d9ec 100644 --- a/src/modules/module-audio-dsp/floatmix.c +++ b/src/modules/module-audio-dsp/floatmix.c @@ -429,7 +429,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 6efd68240..7748c1a9c 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -389,7 +389,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) != 0) continue; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -529,7 +529,7 @@ impl_node_sync(struct spa_node *node) static int impl_node_wait(struct spa_node *node, int res, struct spa_pending *pending, - spa_result_func_t func, void *data) + spa_pending_func_t func, void *data) { struct node *this; int seq; @@ -705,7 +705,7 @@ impl_node_port_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) continue; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -1256,8 +1256,10 @@ static void client_node_resource_done(void *data, uint32_t seq) spa_list_for_each_safe(p, t, &this->pending_list, link) { if (p->seq == (int) seq) { pw_log_debug("client-node %p: do callback %d", this, p->res); - p->func(p->data, p->res, 0, NULL); spa_list_remove(&p->link); + p->seq = p->res; + p->res = 0; + p->func(p, NULL); count++; } } diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index b51e7d7b6..5075ff020 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -170,7 +170,7 @@ static int impl_node_enum_params(struct spa_node *node, if (spa_pod_filter(&b, &result.param, param, filter) < 0) goto next; - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num) @@ -350,7 +350,7 @@ impl_node_sync(struct spa_node *node) static int impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending, - spa_result_func_t func, void *data) + spa_pending_func_t func, void *data) { struct node *this; struct impl *impl; diff --git a/src/modules/spa/spa-node.c b/src/modules/spa/spa-node.c index b25a86c6b..9540f3df9 100644 --- a/src/modules/spa/spa-node.c +++ b/src/modules/spa/spa-node.c @@ -99,11 +99,11 @@ static int complete_init(struct impl *impl) return 0; } -static int on_init_done(void *data, int seq, int res, const void *result) +static int on_init_done(struct spa_pending *pending, const void *result) { - struct impl *impl = data; + struct impl *impl = pending->data; struct pw_node *this = impl->this; - pw_log_debug("spa-node %p: init complete event %d %d", this, seq, res); + pw_log_debug("spa-node %p: init complete event %d %d", this, pending->seq, pending->res); return complete_init(impl); } diff --git a/src/pipewire/link.c b/src/pipewire/link.c index d4d3ed89b..2ee7269f3 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -69,6 +69,9 @@ struct impl { struct spa_io_buffers io; struct pw_node *inode, *onode; + + struct spa_pending input_pending; + struct spa_pending output_pending; }; struct resource_data { @@ -150,24 +153,28 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state, } } -static int complete_output(void *data, int seq, int res, const void *result) +static int complete_output(struct spa_pending *pending, const void *result) { - struct pw_link *this = data; + struct pw_link *this = pending->data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_node *node = this->output->node; - seq = SPA_RESULT_ASYNC_SEQ(seq); + uint32_t seq = SPA_RESULT_ASYNC_SEQ(pending->seq); + int res = pending->res; pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); + pending->res = 0; pw_work_queue_complete(impl->work, node, seq, res); return 0; } -static int complete_input(void *data, int seq, int res, const void *result) +static int complete_input(struct spa_pending *pending, const void *result) { - struct pw_link *this = data; + struct pw_link *this = pending->data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_node *node = this->input->node; - seq = SPA_RESULT_ASYNC_SEQ(seq); + uint32_t seq = SPA_RESULT_ASYNC_SEQ(pending->seq); + int res = pending->res; pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); + pending->res = 0; pw_work_queue_complete(impl->work, node, seq, res); return 0; } @@ -204,16 +211,13 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) } } -static void remove_pending(struct spa_pending *pending) +static struct spa_pending *prepare_pending(struct impl *impl, struct spa_pending *pending) { - free(pending); -} - -static struct spa_pending *make_pending(struct impl *impl) -{ - struct spa_pending *pending; - pending = calloc(1, sizeof(struct spa_pending)); - pending->removed = remove_pending; + if (pending->res != 0) { + pw_log_warn("link %p: remove pending %d", impl, pending->res); + spa_list_remove(&pending->link); + pending->res = 0; + } return pending; } @@ -336,10 +340,9 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st } if (SPA_RESULT_IS_ASYNC(res)) { spa_node_wait(output->node->node, res, - make_pending(impl), + prepare_pending(impl, &impl->output_pending), complete_output, this); - pw_work_queue_add(impl->work, output->node, res, complete_ready, &this->rt.out_mix); } @@ -355,7 +358,7 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st } if (SPA_RESULT_IS_ASYNC(res2)) { spa_node_wait(input->node->node, res2, - make_pending(impl), + prepare_pending(impl, &impl->input_pending), complete_input, this); @@ -724,7 +727,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } if (SPA_RESULT_IS_ASYNC(res)) { - //spa_node_sync(output->node->node, res); + spa_node_wait(output->node->node, res, + prepare_pending(impl, &impl->output_pending), + complete_output, + this); pw_work_queue_add(impl->work, output->node, res, complete_paused, &this->rt.out_mix); } @@ -744,7 +750,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } if (SPA_RESULT_IS_ASYNC(res)) { - //spa_node_sync(input->node->node, res); + spa_node_wait(input->node->node, res, + prepare_pending(impl, &impl->input_pending), + complete_input, + this); pw_work_queue_add(impl->work, input->node, res, complete_paused, &this->rt.in_mix); } @@ -768,7 +777,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s } if (SPA_RESULT_IS_ASYNC(res)) { spa_node_wait(output->node->node, res, - make_pending(impl), + prepare_pending(impl, &impl->output_pending), complete_output, this); pw_work_queue_add(impl->work, output->node, res, complete_paused, @@ -791,7 +800,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s } if (SPA_RESULT_IS_ASYNC(res)) { spa_node_wait(input->node->node, res, - make_pending(impl), + prepare_pending(impl, &impl->input_pending), complete_input, this); pw_work_queue_add(impl->work, input->node, res, complete_paused, @@ -920,28 +929,6 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id) this, -EBUSY, (pw_work_func_t) check_states, this); } -#if 0 -static void -input_node_async_complete(void *data, uint32_t seq, int res) -{ - struct impl *impl = data; - struct pw_node *node = impl->this.input->node; - - pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); - pw_work_queue_complete(impl->work, node, seq, res); -} - -static void -output_node_async_complete(void *data, uint32_t seq, int res) -{ - struct impl *impl = data; - struct pw_node *node = impl->this.output->node; - - pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res); - pw_work_queue_complete(impl->work, node, seq, res); -} -#endif - static void clear_port_buffers(struct pw_link *link, struct pw_port *port) { int res; @@ -972,6 +959,8 @@ static void input_remove(struct pw_link *this, struct pw_port *port) spa_hook_remove(&impl->input_port_listener); spa_hook_remove(&impl->input_node_listener); + prepare_pending(impl, &impl->input_pending); + spa_list_remove(&this->input_link); pw_port_events_link_removed(this->input, this); @@ -991,6 +980,8 @@ static void output_remove(struct pw_link *this, struct pw_port *port) spa_hook_remove(&impl->output_port_listener); spa_hook_remove(&impl->output_node_listener); + prepare_pending(impl, &impl->output_pending); + spa_list_remove(&this->output_link); pw_port_events_link_removed(this->output, this); diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 1361bac7a..76f0535f8 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -50,7 +50,6 @@ struct impl { struct pw_node this; struct pw_work_queue *work; - bool pause_on_idle; struct spa_graph driver_graph; struct spa_graph_state driver_state; @@ -62,7 +61,9 @@ struct impl { uint32_t next_position; int last_error; - struct spa_pending init_pending; + + struct spa_pending pending_state; + int pause_on_idle:1; }; struct resource_data { @@ -837,33 +838,13 @@ static int node_port_info(void *data, enum spa_direction direction, uint32_t por return 0; } -struct node_pending { - struct spa_pending pending; - struct impl *impl; - enum pw_node_state state; -}; - -static void remove_pending(struct spa_pending *pending) +static int node_complete(struct spa_pending *pending, const void *result) { - free(pending); -} - -static struct node_pending *make_pending(struct impl *impl, enum pw_node_state state) -{ - struct node_pending *pending; - pending = calloc(1, sizeof(struct node_pending)); - pending->pending.removed = remove_pending; - pending->impl = impl; - pending->state = state; - return pending; -} - -static int node_complete(void *data, int seq, int res, const void *result) -{ - struct node_pending *pending = data; - struct impl *impl = pending->impl; - seq = SPA_RESULT_ASYNC_SEQ(seq); - pw_log_debug("node %p: done event %d %u", impl, res, pending->pending.seq); + struct impl *impl = pending->data; + uint32_t seq = SPA_RESULT_ASYNC_SEQ(pending->seq); + int res = pending->res; + pending->res = 0; + pw_log_debug("node %p: done event %d %u", impl, res, pending->seq); impl->last_error = res; pw_work_queue_complete(impl->work, &impl->this, seq, res); return 0; @@ -933,29 +914,17 @@ static const struct spa_node_callbacks node_callbacks = { .reuse_buffer = node_reuse_buffer, }; -static int init_result(void *data, int seq, int res, const void *result) -{ - pw_log_debug("node %p: set_callbacks finished", data); - return 0; -} - SPA_EXPORT int pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node) { int res; - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); node->node = spa_node; pw_log_debug("node %p: implementation %p", node, spa_node); spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node); res = spa_node_set_callbacks(node->node, &node_callbacks, node); - if (SPA_RESULT_IS_ASYNC(res)) { - pw_log_debug("node %p: init is async %d", node, res); - spa_node_wait(node->node, res, &impl->init_pending, init_result, node); - } - if (spa_node_set_io(node->node, SPA_IO_Position, &node->rt.activation->position, @@ -1025,6 +994,11 @@ void pw_node_destroy(struct pw_node *node) if (node->registered) spa_list_remove(&node->link); + if (impl->pending_state.res != 0) { + pw_log_debug("remove pending state %d", impl->pending_state.res); + spa_list_remove(&impl->pending_state.link); + impl->pending_state.res = 0; + } spa_node_set_callbacks(node->node, NULL, NULL); pw_log_debug("node %p: unlink ports", node); @@ -1270,10 +1244,13 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state) return res; if (SPA_RESULT_IS_ASYNC(res)) { - struct node_pending *pending = make_pending(impl, state); + if (impl->pending_state.res != 0) { + pw_log_warn("remove pending state %d", impl->pending_state.res); + spa_list_remove(&impl->pending_state.link); + } - spa_node_wait(node->node, res, &pending->pending, - node_complete, pending); + spa_node_wait(node->node, res, &impl->pending_state, + node_complete, impl); } pw_work_queue_add(impl->work, diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 8ba002788..52877a3ec 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -430,7 +430,7 @@ static int impl_port_enum_params(struct spa_node *node, } } - if ((res = func(data, count, 1, &result)) != 0) + if ((res = func(data, count, &result)) != 0) return res; if (++count != num)