diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 0d3f40b46..63b804088 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -288,14 +288,14 @@ pw_client_node_proxy_destroy(struct pw_client_node_proxy *p) } -#define PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT 0 -#define PW_CLIENT_NODE_PROXY_EVENT_SET_PARAM 1 -#define PW_CLIENT_NODE_PROXY_EVENT_EVENT 2 -#define PW_CLIENT_NODE_PROXY_EVENT_COMMAND 3 -#define PW_CLIENT_NODE_PROXY_EVENT_ADD_PORT 4 -#define PW_CLIENT_NODE_PROXY_EVENT_REMOVE_PORT 5 -#define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_PARAM 6 -#define PW_CLIENT_NODE_PROXY_EVENT_PORT_ADD_MEM 7 +#define PW_CLIENT_NODE_PROXY_EVENT_ADD_MEM 0 +#define PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT 1 +#define PW_CLIENT_NODE_PROXY_EVENT_SET_PARAM 2 +#define PW_CLIENT_NODE_PROXY_EVENT_EVENT 3 +#define PW_CLIENT_NODE_PROXY_EVENT_COMMAND 4 +#define PW_CLIENT_NODE_PROXY_EVENT_ADD_PORT 5 +#define PW_CLIENT_NODE_PROXY_EVENT_REMOVE_PORT 6 +#define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_PARAM 7 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_USE_BUFFERS 8 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_COMMAND 9 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_IO 10 @@ -305,6 +305,19 @@ pw_client_node_proxy_destroy(struct pw_client_node_proxy *p) struct pw_client_node_proxy_events { #define PW_VERSION_CLIENT_NODE_PROXY_EVENTS 0 uint32_t version; + /** + * Memory was added to a node + * + * \param mem_id the id of the memory + * \param type the memory type + * \param memfd the fd of the memory + * \param flags flags for the \a memfd + */ + void (*add_mem) (void *object, + uint32_t mem_id, + uint32_t type, + int memfd, + uint32_t flags); /** * Notify of a new transport area * @@ -387,25 +400,6 @@ struct pw_client_node_proxy_events { uint32_t port_id, uint32_t id, uint32_t flags, const struct spa_pod *param); - /** - * Memory was added for a port - * - * \param direction a port direction - * \param port_id the port id - * \param mem_id the id of the memory - * \param type the memory type - * \param memfd the fd of the memory - * \param flags flags for the \a memfd - * \param offset valid offset of mapped memory from \a memfd - * \param size valid size of mapped memory from \a memfd - */ - void (*port_add_mem) (void *object, - enum spa_direction direction, - uint32_t port_id, - uint32_t mem_id, - uint32_t type, - int memfd, - uint32_t flags); /** * Notify the port of buffers * @@ -463,6 +457,8 @@ pw_client_node_proxy_add_listener(struct pw_client_node_proxy *p, pw_proxy_add_proxy_listener((struct pw_proxy*)p, listener, events, data); } +#define pw_client_node_resource_add_mem(r,...) \ + pw_resource_notify(r,struct pw_client_node_proxy_events,add_mem,__VA_ARGS__) #define pw_client_node_resource_transport(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,transport,__VA_ARGS__) #define pw_client_node_resource_set_param(r,...) \ @@ -477,8 +473,6 @@ pw_client_node_proxy_add_listener(struct pw_client_node_proxy *p, pw_resource_notify(r,struct pw_client_node_proxy_events,remove_port,__VA_ARGS__) #define pw_client_node_resource_port_set_param(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,port_set_param,__VA_ARGS__) -#define pw_client_node_resource_port_add_mem(r,...) \ - pw_resource_notify(r,struct pw_client_node_proxy_events,port_add_mem,__VA_ARGS__) #define pw_client_node_resource_port_use_buffers(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,port_use_buffers,__VA_ARGS__) #define pw_client_node_resource_port_command(r,...) \ diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index e7782904f..0142f62a4 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -68,8 +68,6 @@ struct buffer { struct spa_buffer buffer; struct spa_meta metas[4]; struct spa_data datas[4]; - off_t offset; - size_t size; bool outstanding; }; @@ -540,11 +538,10 @@ spa_proxy_node_port_set_io(struct spa_node *node, if ((mem = pw_memblock_find(data)) == NULL) return -EINVAL; - pw_client_node_resource_port_add_mem(this->resource, - direction, port_id, - memid, - t->data.MemFd, - mem->fd, mem->flags); + pw_client_node_resource_add_mem(this->resource, + memid, + t->data.MemFd, + mem->fd, mem->flags); pw_client_node_resource_port_set_io(this->resource, this->seq, @@ -567,7 +564,6 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, struct impl *impl; struct port *port; uint32_t i, j; - size_t n_mem; struct pw_client_node_buffer *mb; struct pw_type *t; @@ -598,11 +594,10 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, if (this->resource == NULL) return 0; - n_mem = this->membase; for (i = 0; i < n_buffers; i++) { struct buffer *b = &port->buffers[i]; struct pw_memblock *m; - size_t data_size; + size_t data_size, size; void *baseptr; b->outbuf = buffers[i]; @@ -632,21 +627,20 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, } mb[i].buffer = &b->buffer; - mb[i].mem_id = n_mem++; + mb[i].mem_id = this->membase++; mb[i].offset = SPA_PTRDIFF(baseptr, m->ptr + m->offset); mb[i].size = data_size; - pw_client_node_resource_port_add_mem(this->resource, - direction, - port_id, - mb[i].mem_id, - t->data.MemFd, - m->fd, m->flags); + pw_client_node_resource_add_mem(this->resource, + mb[i].mem_id, + t->data.MemFd, + m->fd, m->flags); for (j = 0; j < buffers[i]->n_metas; j++) memcpy(&b->buffer.metas[j], &buffers[i]->metas[j], sizeof(struct spa_meta)); b->buffer.n_metas = j; + size = 0; for (j = 0; j < buffers[i]->n_datas; j++) { struct spa_data *d = &buffers[i]->datas[j]; @@ -654,18 +648,16 @@ spa_proxy_node_port_use_buffers(struct spa_node *node, if (d->type == t->data.DmaBuf || d->type == t->data.MemFd) { - pw_client_node_resource_port_add_mem(this->resource, - direction, - port_id, - n_mem, - d->type, - d->fd, - d->flags); - b->buffer.datas[j].data = SPA_UINT32_TO_PTR(n_mem); - n_mem++; + pw_client_node_resource_add_mem(this->resource, + this->membase, + d->type, + d->fd, + d->flags); + b->buffer.datas[j].data = SPA_UINT32_TO_PTR(this->membase); + this->membase++; } else if (d->type == t->data.MemPtr) { - b->buffer.datas[j].data = SPA_INT_TO_PTR(b->size); - b->size += d->maxsize; + b->buffer.datas[j].data = SPA_INT_TO_PTR(size); + size += d->maxsize; } else { b->buffer.datas[j].type = SPA_ID_INVALID; b->buffer.datas[j].data = 0; diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c index fbfec10c4..854475700 100644 --- a/src/modules/module-client-node/protocol-native.c +++ b/src/modules/module-client-node/protocol-native.c @@ -149,6 +149,31 @@ static void client_node_marshal_destroy(void *object) pw_protocol_native_end_proxy(proxy, b); } +static bool client_node_demarshal_add_mem(void *object, void *data, size_t size) +{ + struct pw_proxy *proxy = object; + struct spa_pod_parser prs; + uint32_t mem_id, type, memfd_idx, flags; + int memfd; + + spa_pod_parser_init(&prs, data, size, 0); + if (spa_pod_parser_get(&prs, + "[" + "i", &mem_id, + "I", &type, + "i", &memfd_idx, + "i", &flags, NULL) < 0) + return false; + + memfd = pw_protocol_native_get_proxy_fd(proxy, memfd_idx); + + pw_proxy_notify(proxy, struct pw_client_node_proxy_events, add_mem, + mem_id, + type, + memfd, flags); + return true; +} + static bool client_node_demarshal_transport(void *object, void *data, size_t size) { struct pw_proxy *proxy = object; @@ -294,34 +319,6 @@ static bool client_node_demarshal_port_set_param(void *object, void *data, size_ return true; } -static bool client_node_demarshal_port_add_mem(void *object, void *data, size_t size) -{ - struct pw_proxy *proxy = object; - struct spa_pod_parser prs; - uint32_t direction, port_id, mem_id, type, memfd_idx, flags; - int memfd; - - spa_pod_parser_init(&prs, data, size, 0); - if (spa_pod_parser_get(&prs, - "[" - "i", &direction, - "i", &port_id, - "i", &mem_id, - "I", &type, - "i", &memfd_idx, - "i", &flags, NULL) < 0) - return false; - - memfd = pw_protocol_native_get_proxy_fd(proxy, memfd_idx); - - pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_add_mem, direction, - port_id, - mem_id, - type, - memfd, flags); - return true; -} - static bool client_node_demarshal_port_use_buffers(void *object, void *data, size_t size) { struct pw_proxy *proxy = object; @@ -432,6 +429,26 @@ static bool client_node_demarshal_port_set_io(void *object, void *data, size_t s return true; } +static void +client_node_marshal_add_mem(void *object, + uint32_t mem_id, + uint32_t type, + int memfd, uint32_t flags) +{ + struct pw_resource *resource = object; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_ADD_MEM); + + spa_pod_builder_struct(b, + "i", mem_id, + "I", type, + "i", pw_protocol_native_add_resource_fd(resource, memfd), + "i", flags); + + pw_protocol_native_end_resource(resource, b); +} + static void client_node_marshal_transport(void *object, uint32_t node_id, int readfd, int writefd, struct pw_client_node_transport *transport) { @@ -556,30 +573,6 @@ client_node_marshal_port_set_param(void *object, pw_protocol_native_end_resource(resource, b); } -static void -client_node_marshal_port_add_mem(void *object, - enum spa_direction direction, - uint32_t port_id, - uint32_t mem_id, - uint32_t type, - int memfd, uint32_t flags) -{ - struct pw_resource *resource = object; - struct spa_pod_builder *b; - - b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_ADD_MEM); - - spa_pod_builder_struct(b, - "i", direction, - "i", port_id, - "i", mem_id, - "I", type, - "i", pw_protocol_native_add_resource_fd(resource, memfd), - "i", flags); - - pw_protocol_native_end_resource(resource, b); -} - static void client_node_marshal_port_use_buffers(void *object, uint32_t seq, @@ -838,6 +831,7 @@ static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_ static const struct pw_client_node_proxy_events pw_protocol_native_client_node_event_marshal = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + &client_node_marshal_add_mem, &client_node_marshal_transport, &client_node_marshal_set_param, &client_node_marshal_event_event, @@ -845,13 +839,13 @@ static const struct pw_client_node_proxy_events pw_protocol_native_client_node_e &client_node_marshal_add_port, &client_node_marshal_remove_port, &client_node_marshal_port_set_param, - &client_node_marshal_port_add_mem, &client_node_marshal_port_use_buffers, &client_node_marshal_port_command, &client_node_marshal_port_set_io, }; static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_event_demarshal[] = { + { &client_node_demarshal_add_mem, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_transport, 0 }, { &client_node_demarshal_set_param, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_event_event, PW_PROTOCOL_NATIVE_REMAP }, @@ -859,7 +853,6 @@ static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_ { &client_node_demarshal_add_port, 0 }, { &client_node_demarshal_remove_port, 0 }, { &client_node_demarshal_port_set_param, PW_PROTOCOL_NATIVE_REMAP }, - { &client_node_demarshal_port_add_mem, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_use_buffers, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_command, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_set_io, PW_PROTOCOL_NATIVE_REMAP }, diff --git a/src/pipewire/map.h b/src/pipewire/map.h index 44b76f850..2e77cdb4a 100644 --- a/src/pipewire/map.h +++ b/src/pipewire/map.h @@ -132,7 +132,7 @@ static inline bool pw_map_insert_at(struct pw_map *map, uint32_t id, void *data) return true; } -/** Remove and item at index +/** Remove an item at index * \param map the map to remove from * \param id the index to remove * \memberof pw_map diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 7c911973a..d812e7535 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -50,6 +50,7 @@ struct mem_id { uint32_t id; int fd; uint32_t flags; + uint32_t ref; }; struct buffer_id { @@ -58,6 +59,8 @@ struct buffer_id { struct spa_buffer *buf; void *ptr; struct pw_map_range map; + uint32_t n_mem; + struct mem_id **mem; }; struct port { @@ -66,7 +69,6 @@ struct port { struct pw_port *port; - struct pw_array mem_ids; struct pw_array buffer_ids; bool in_order; }; @@ -88,6 +90,8 @@ struct node_data { struct spa_graph_node in_node; struct port *in_ports; + struct pw_array mem_ids; + struct pw_node *node; struct spa_hook node_listener; @@ -411,7 +415,7 @@ void pw_remote_disconnect(struct pw_remote *remote) pw_protocol_client_disconnect (remote->conn); spa_list_for_each_safe(proxy, t2, &remote->proxy_list, link) - pw_proxy_destroy(proxy); + pw_proxy_destroy(proxy); remote->core_proxy = NULL; pw_map_clear(&remote->objects); @@ -517,10 +521,42 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) } } +static struct mem_id *find_mem(struct pw_array *mem_ids, uint32_t id) +{ + struct mem_id *mid; + + pw_array_for_each(mid, mem_ids) { + if (mid->id == id) + return mid; + } + return NULL; +} + +static void clear_memid(struct node_data *data, struct mem_id *mid) +{ + if (mid->fd != -1) { + bool has_ref = false; + int fd; + + fd = mid->fd; + mid->fd = -1; + + pw_array_for_each(mid, &data->mem_ids) { + if (mid->fd == fd) { + has_ref = true; + break; + } + } + if (!has_ref) + close(fd); + } +} + static void clean_transport(struct pw_proxy *proxy) { struct node_data *data = proxy->user_data; struct pw_port *port; + struct mem_id *mid; if (data->trans == NULL) return; @@ -536,6 +572,10 @@ static void clean_transport(struct pw_proxy *proxy) spa_graph_port_remove(&data->out_ports[port->port_id].input); } + pw_array_for_each(mid, &data->mem_ids) + clear_memid(data, mid); + pw_array_clear(&data->mem_ids); + free(data->in_ports); free(data->out_ports); pw_client_node_transport_destroy(data->trans); @@ -546,8 +586,6 @@ static void clean_transport(struct pw_proxy *proxy) static void port_init(struct port *port) { - pw_array_init(&port->mem_ids, 64); - pw_array_ensure_size(&port->mem_ids, sizeof(struct mem_id) * 64); pw_array_init(&port->buffer_ids, 32); pw_array_ensure_size(&port->buffer_ids, sizeof(struct buffer_id) * 64); port->in_order = true; @@ -567,6 +605,30 @@ static struct port *find_port(struct node_data *data, enum spa_direction directi } } +static void client_node_add_mem(void *object, + uint32_t mem_id, + uint32_t type, int memfd, uint32_t flags) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mem_id *m; + + m = find_mem(&data->mem_ids, mem_id); + if (m) { + pw_log_debug("update mem %u, fd %d, flags %d", + mem_id, memfd, flags); + clear_memid(data, m); + } else { + m = pw_array_add(&data->mem_ids, sizeof(struct mem_id)); + pw_log_debug("add mem %u, fd %d, flags %d", + mem_id, memfd, flags); + } + m->id = mem_id; + m->fd = memfd; + m->flags = flags; + m->ref = 0; +} + static void client_node_transport(void *object, uint32_t node_id, int readfd, int writefd, struct pw_client_node_transport *transport) @@ -836,34 +898,10 @@ client_node_port_set_param(void *object, pw_client_node_proxy_done(data->node_proxy, seq, res); } -static struct mem_id *find_mem(struct port *port, uint32_t id) -{ - struct mem_id *mid; - - pw_array_for_each(mid, &port->mem_ids) { - if (mid->id == id) - return mid; - } - return NULL; -} - -static void clear_memid(struct mem_id *mid) -{ - close(mid->fd); -} - -static void clear_mems(struct port *port) -{ - struct mem_id *mid; - - pw_array_for_each(mid, &port->mem_ids) - clear_memid(mid); - port->mem_ids.size = 0; -} - -static void clear_buffers(struct port *port) +static void clear_buffers(struct node_data *data, struct port *port) { struct buffer_id *bid; + int i; pw_log_debug("port %p: clear buffers", port); @@ -872,6 +910,14 @@ static void clear_buffers(struct port *port) if (munmap(bid->ptr, bid->map.size) < 0) pw_log_warn("failed to unmap: %m"); } + if (bid->mem != NULL) { + for (i = 0; i < bid->n_mem; i++) { + if (--bid->mem[i]->ref == 0) + clear_memid(data, bid->mem[i]); + } + bid->mem = NULL; + bid->n_mem = 0; + } bid->ptr = NULL; free(bid->buf); bid->buf = NULL; @@ -879,40 +925,6 @@ static void clear_buffers(struct port *port) port->buffer_ids.size = 0; } -static void clear_port(struct port *port) -{ - clear_buffers(port); - clear_mems(port); - pw_array_clear(&port->mem_ids); - pw_array_clear(&port->buffer_ids); -} - -static void -client_node_port_add_mem(void *object, - enum spa_direction direction, uint32_t port_id, - uint32_t mem_id, - uint32_t type, int memfd, uint32_t flags) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct mem_id *m; - struct port *port = find_port(data, direction, port_id); - - m = find_mem(port, mem_id); - if (m) { - pw_log_debug("update mem %u, fd %d, flags %d", - mem_id, memfd, flags); - clear_memid(m); - } else { - m = pw_array_add(&port->mem_ids, sizeof(struct mem_id)); - pw_log_debug("add mem %u, fd %d, flags %d", - mem_id, memfd, flags); - } - m->id = mem_id; - m->fd = memfd; - m->flags = flags; -} - static void client_node_port_use_buffers(void *object, uint32_t seq, @@ -940,14 +952,14 @@ client_node_port_use_buffers(void *object, prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); /* clear previous buffers */ - clear_buffers(port); + clear_buffers(data, port); bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); for (i = 0; i < n_buffers; i++) { off_t offset; - struct mem_id *mid = find_mem(port, buffers[i].mem_id); + struct mem_id *mid = find_mem(&data->mem_ids, buffers[i].mem_id); if (mid == NULL) { pw_log_warn("unknown memory id %u", buffers[i].mem_id); continue; @@ -975,18 +987,26 @@ client_node_port_use_buffers(void *object, size_t size; size = sizeof(struct spa_buffer); + size += sizeof(struct mem_id *); for (j = 0; j < buffers[i].buffer->n_metas; j++) size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) + for (j = 0; j < buffers[i].buffer->n_datas; j++) { size += sizeof(struct spa_data); + size += sizeof(struct mem_id *); + } b = bid->buf = malloc(size); memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = - SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct mem_id*); + bid->n_mem = 0; + + mid->ref++; + bid->mem[bid->n_mem++] = mid; } bid->id = b->id; @@ -1012,10 +1032,13 @@ client_node_port_use_buffers(void *object, struct spa_chunk); if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { - struct mem_id *bmid = find_mem(port, SPA_PTR_TO_UINT32(d->data)); + struct mem_id *bmid = find_mem(&data->mem_ids, + SPA_PTR_TO_UINT32(d->data)); d->data = NULL; d->fd = bmid->fd; + bmid->ref++; + bid->mem[bid->n_mem++] = bmid; pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); } else if (d->type == t->data.MemPtr) { d->data = SPA_MEMBER(bid->ptr, @@ -1031,9 +1054,6 @@ client_node_port_use_buffers(void *object, res = pw_port_use_buffers(port->port, bufs, n_buffers); - if (n_buffers == 0) - clear_mems(port); - done: pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -1078,7 +1098,7 @@ client_node_port_set_io(void *object, if (port == NULL) return; - mid = find_mem(port, memid); + mid = find_mem(&data->mem_ids, memid); if (mid == NULL) { pw_log_warn("unknown memory id %u", memid); return; @@ -1097,12 +1117,12 @@ client_node_port_set_io(void *object, id, SPA_MEMBER(ptr, r.start, void), size); - } static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + .add_mem = client_node_add_mem, .transport = client_node_transport, .set_param = client_node_set_param, .event = client_node_event, @@ -1110,7 +1130,6 @@ static const struct pw_client_node_proxy_events client_node_events = { .add_port = client_node_add_port, .remove_port = client_node_remove_port, .port_set_param = client_node_port_set_param, - .port_add_mem = client_node_port_add_mem, .port_use_buffers = client_node_port_use_buffers, .port_command = client_node_port_command, .port_set_io = client_node_port_set_io, @@ -1166,6 +1185,12 @@ static const struct pw_node_events node_events = { .have_output = node_have_output, }; +static void clear_port(struct node_data *data, struct port *port) +{ + clear_buffers(data, port); + pw_array_clear(&port->buffer_ids); +} + static void node_proxy_destroy(void *_data) { struct node_data *data = _data; @@ -1174,9 +1199,9 @@ static void node_proxy_destroy(void *_data) if (data->trans) { for (i = 0; i < data->trans->area->max_input_ports; i++) - clear_port(&data->in_ports[i]); + clear_port(data, &data->in_ports[i]); for (i = 0; i < data->trans->area->max_output_ports; i++) - clear_port(&data->out_ports[i]); + clear_port(data, &data->out_ports[i]); } clean_transport(proxy); @@ -1243,6 +1268,9 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, data->in_node_impl = node_impl; data->out_node_impl = node_impl; + pw_array_init(&data->mem_ids, 64); + pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64); + spa_graph_node_init(&data->in_node); spa_graph_node_set_implementation(&data->in_node, &data->in_node_impl); spa_graph_node_init(&data->out_node); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 78c661966..0ac755d51 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -46,6 +46,7 @@ struct mem_id { uint32_t id; int fd; uint32_t flags; + uint32_t ref; }; struct buffer_id { @@ -55,6 +56,8 @@ struct buffer_id { struct spa_buffer *buf; void *ptr; struct pw_map_range map; + uint32_t n_mem; + struct mem_id **mem; }; struct stream { @@ -131,7 +134,7 @@ static void clear_mems(struct pw_stream *stream) struct mem_id *mid; pw_array_for_each(mid, &impl->mem_ids) - clear_memid(impl, mid); + clear_memid(impl, mid); impl->mem_ids.size = 0; } @@ -793,10 +796,9 @@ client_node_port_set_param(void *data, } static void -client_node_port_add_mem(void *data, - enum spa_direction direction, uint32_t port_id, - uint32_t mem_id, - uint32_t type, int memfd, uint32_t flags) +client_node_add_mem(void *data, + uint32_t mem_id, + uint32_t type, int memfd, uint32_t flags) { struct stream *impl = data; struct pw_stream *stream = &impl->this; @@ -871,18 +873,26 @@ client_node_port_use_buffers(void *data, size_t size; size = sizeof(struct spa_buffer); + size += sizeof(struct mem_id *); for (j = 0; j < buffers[i].buffer->n_metas; j++) size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) + for (j = 0; j < buffers[i].buffer->n_datas; j++) { size += sizeof(struct spa_data); + size += sizeof(struct mem_id *); + } b = bid->buf = malloc(size); memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = - SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct mem_id*); + bid->n_mem = 0; + + mid->ref++; + bid->mem[bid->n_mem++] = mid; } bid->id = b->id; @@ -913,6 +923,8 @@ client_node_port_use_buffers(void *data, struct mem_id *bmid = find_mem(stream, SPA_PTR_TO_UINT32(d->data)); d->data = NULL; d->fd = bmid->fd; + bmid->ref++; + bid->mem[bid->n_mem++] = bmid; pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); } else if (d->type == t->data.MemPtr) { d->data = SPA_MEMBER(bid->ptr, @@ -967,6 +979,7 @@ static void client_node_transport(void *data, uint32_t node_id, static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + .add_mem = client_node_add_mem, .transport = client_node_transport, .set_param = client_node_set_param, .event = client_node_event, @@ -974,7 +987,6 @@ static const struct pw_client_node_proxy_events client_node_events = { .add_port = client_node_add_port, .remove_port = client_node_remove_port, .port_set_param = client_node_port_set_param, - .port_add_mem = client_node_port_add_mem, .port_use_buffers = client_node_port_use_buffers, .port_command = client_node_port_command, };