diff --git a/pipewire/server/node.c b/pipewire/server/node.c index 0ad3949df..697ac4d3a 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -709,41 +709,50 @@ struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction di pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, *n_ports); + /* first try to find an unlinked port */ spa_list_for_each(p, ports, link) { - if (spa_list_is_empty(&p->links)) { - port = p; - break; - } + if (spa_list_is_empty(&p->links)) + return p; } - if (port == NULL) { - /* no port, can we create one ? */ - if (*n_ports < max_ports) { - for (i = 0; i < max_ports && port == NULL; i++) { - if (portmap[i] == NULL) { - pw_log_debug("node %p: creating port direction %d %u", node, - direction, i); - port = portmap[i] = pw_port_new(node, direction, i); - spa_list_insert(ports, &port->link); - (*n_ports)++; - if ((res = spa_node_add_port(node->node, direction, i)) < 0) { - pw_log_error("node %p: could not add port %d", node, - i); - } else { - spa_node_port_set_io(node->node, direction, i, - &port->io); - } + /* no port, can we create one ? */ + if (*n_ports < max_ports) { + for (i = 0; i < max_ports; i++) { + if (portmap[i] == NULL) { + pw_log_debug("node %p: creating port direction %d %u", node, direction, i); + + port = pw_port_new(node, direction, i); + if (port == NULL) + goto no_mem; + + spa_list_insert(ports, &port->link); + + if ((res = spa_node_add_port(node->node, direction, i)) < 0) { + pw_log_error("node %p: could not add port %d", node, i); + pw_port_destroy(port); + continue; + } else { + spa_node_port_set_io(node->node, direction, i, &port->io); } + (*n_ports)++; + portmap[i] = port; + break; } - } else { - /* for output we can reuse an existing port */ - if (direction == PW_DIRECTION_OUTPUT && !spa_list_is_empty(ports)) { - port = spa_list_first(ports, struct pw_port, link); - } + } + } else { + if (!spa_list_is_empty(ports)) { + port = spa_list_first(ports, struct pw_port, link); + /* for output we can reuse an existing port, for input only + * when there is a mixer */ + if (direction == PW_DIRECTION_INPUT && port->mixer == NULL) + port = NULL; } } return port; + no_mem: + pw_log_error("node %p: can't create new port", node); + return NULL; } static void on_state_complete(struct pw_node *node, void *data, int res) diff --git a/pipewire/server/port.h b/pipewire/server/port.h index 4cb12843e..069dde2bb 100644 --- a/pipewire/server/port.h +++ b/pipewire/server/port.h @@ -72,6 +72,8 @@ struct pw_port { struct spa_list links; /**< list of \ref pw_link */ + void *mixer; /**< optional port buffer mixer */ + struct { struct spa_list links; /**< list of \ref pw_link only accessed from the * data thread */ diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index 239947006..b65556618 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -34,6 +34,7 @@ struct spa_graph_port; struct spa_graph { struct spa_list nodes; struct spa_list ready; + struct spa_graph_node *node; }; typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node); @@ -74,14 +75,16 @@ static inline void spa_graph_init(struct spa_graph *graph) static inline int spa_graph_node_schedule_default(struct spa_graph_node *node) { + int res; struct spa_node *n = node->user_data; if (node->action == SPA_GRAPH_ACTION_IN) - return spa_node_process_input(n); + res = spa_node_process_input(n); else if (node->action == SPA_GRAPH_ACTION_OUT) - return spa_node_process_output(n); + res = spa_node_process_output(n); else - return SPA_RESULT_ERROR; + res = SPA_RESULT_ERROR; + return res; } static inline void @@ -163,14 +166,13 @@ spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *out, in->peer = NULL; } -static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_graph_node *node) +static inline bool spa_graph_node_iterate(struct spa_graph *graph) { + bool res; struct spa_graph_port *p; - if (node->ready_link.next == NULL) - spa_list_insert(graph->ready.prev, &node->ready_link); - - while (!spa_list_is_empty(&graph->ready)) { + res = !spa_list_is_empty(&graph->ready); + if (res) { struct spa_graph_node *n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link); @@ -181,8 +183,8 @@ static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_g case SPA_GRAPH_ACTION_IN: case SPA_GRAPH_ACTION_OUT: n->state = n->schedule(n); - if (n->action == SPA_GRAPH_ACTION_IN && n == node) - continue; + if (n->action == SPA_GRAPH_ACTION_IN && n == graph->node) + break; n->action = SPA_GRAPH_ACTION_CHECK; spa_list_insert(graph->ready.prev, &n->ready_link); break; @@ -193,7 +195,7 @@ static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_g spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { struct spa_graph_node *pn = p->peer->node; if (p->io->status == SPA_RESULT_NEED_BUFFER) { - if (pn != node + if (pn != graph->node || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) { pn->action = SPA_GRAPH_ACTION_OUT; spa_list_insert(graph->ready.prev, @@ -204,14 +206,33 @@ static inline void spa_graph_node_schedule(struct spa_graph *graph, struct spa_g } } else if (n->state == SPA_RESULT_HAVE_BUFFER) { spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) - spa_graph_port_check(graph, p->peer); + spa_graph_port_check(graph, p->peer); } break; default: break; } + res = !spa_list_is_empty(&graph->ready); } + return res; +} + +static inline void spa_graph_node_pull(struct spa_graph *graph, struct spa_graph_node *node) +{ + node->action = SPA_GRAPH_ACTION_CHECK; + node->state = SPA_RESULT_NEED_BUFFER; + graph->node = node; + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); +} + +static inline void spa_graph_node_push(struct spa_graph *graph, struct spa_graph_node *node) +{ + node->action = SPA_GRAPH_ACTION_OUT; + graph->node = node; + if (node->ready_link.next == NULL) + spa_list_insert(graph->ready.prev, &node->ready_link); } #ifdef __cplusplus diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index 3eb707066..404ea0eb4 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -229,10 +229,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data) { struct data *data = user_data; - data->sink_node.action = SPA_GRAPH_ACTION_CHECK; - data->sink_node.state = SPA_RESULT_NEED_BUFFER; + spa_graph_node_pull(&data->graph, &data->sink_node); - spa_graph_node_schedule(&data->graph, &data->sink_node); + while (spa_graph_node_iterate(&data->graph)); } static void diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 5b7754fbc..99b614f0d 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -37,7 +37,7 @@ #include #include -#undef USE_GRAPH +#define USE_GRAPH static SPA_TYPE_MAP_IMPL(default_map, 4096); static SPA_LOG_IMPL(default_log); @@ -240,10 +240,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data) { struct data *data = user_data; #ifdef USE_GRAPH - data->sink_node.action = PROCESS_CHECK; - data->sink_node.state = SPA_RESULT_NEED_BUFFER; + spa_graph_node_pull(&data->graph, &data->sink_node); + while (spa_graph_node_iterate(&data->graph)); - spa_graph_node_schedule(&data->graph, &data->sink_node); #else int res; diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index 938407678..eb22eff73 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -223,8 +223,8 @@ static void on_sink_pull(struct data *data) spa_node_process_output(data->source); spa_node_process_input(data->sink); } else { - data->sink_node.action = SPA_GRAPH_ACTION_CHECK; - spa_graph_node_schedule(&data->graph, &data->sink_node); + spa_graph_node_pull(&data->graph, &data->sink_node); + while (spa_graph_node_iterate(&data->graph)); } } @@ -235,8 +235,8 @@ static void on_source_push(struct data *data) spa_node_process_output(data->source); spa_node_process_input(data->sink); } else { - data->source_node.action = SPA_GRAPH_ACTION_OUT; - spa_graph_node_schedule(&data->graph, &data->source_node); + spa_graph_node_push(&data->graph, &data->source_node); + while (spa_graph_node_iterate(&data->graph)); } }