diff --git a/pipewire-jack b/pipewire-jack index 135b72ce7..3a9035a44 160000 --- a/pipewire-jack +++ b/pipewire-jack @@ -1 +1 @@ -Subproject commit 135b72ce70b5da10424f664edcb531bf530618b1 +Subproject commit 3a9035a44cb1927ac1b20937d3d1e9c6e32ffe5c diff --git a/spa/include/spa/buffer/buffer.h b/spa/include/spa/buffer/buffer.h index 1a123ba8e..db52bc56d 100644 --- a/spa/include/spa/buffer/buffer.h +++ b/spa/include/spa/buffer/buffer.h @@ -48,7 +48,7 @@ enum spa_data_type { SPA_DATA_LAST, /**< not part of ABI */ }; -/** Chunk of memory */ +/** Chunk of memory, can change for each buffer */ struct spa_chunk { uint32_t offset; /**< offset of valid data. Should be taken * modulo the data maxsize to get the offset @@ -56,15 +56,19 @@ struct spa_chunk { uint32_t size; /**< size of valid data. Should be clamped to * maxsize. */ int32_t stride; /**< stride of valid data */ - int32_t dummy; /**< dummy field for alignment */ +#define SPA_CHUNK_FLAG_NONE 0 +#define SPA_CHUNK_FLAG_CORRUPTED (1u<<0) /**< chunk data is corrupted in some way */ + int32_t flags; /**< chunk flags */ }; -/** Data for a buffer */ +/** Data for a buffer this stays constant for a buffer */ struct spa_data { uint32_t type; /**< memory type, one of enum spa_data_type */ #define SPA_DATA_FLAG_NONE 0 -#define SPA_DATA_FLAG_CORRUPTED (1u<<0) /**< data is corrupted in some way */ -#define SPA_DATA_FLAG_DYNAMIC (1u<<1) /**< data pointer can be changed */ +#define SPA_DATA_FLAG_READABLE (1u<<0) /**< data is readable */ +#define SPA_DATA_FLAG_WRITABLE (1u<<1) /**< data is writable */ +#define SPA_DATA_FLAG_DYNAMIC (1u<<2) /**< data pointer can be changed */ +#define SPA_DATA_FLAG_READWRITE (SPA_DATA_FLAG_READABLE|SPA_DATA_FLAG_WRITABLE) uint32_t flags; /**< data flags */ int64_t fd; /**< optional fd for data */ uint32_t mapoffset; /**< offset to map fd at */ diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 166dac451..8811592d0 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -1202,6 +1202,9 @@ static int mmap_read(struct impl *this) d[0].chunk->offset = 0; d[0].chunk->size = buf.bytesused; d[0].chunk->stride = port->fmt.fmt.pix.bytesperline; + d[0].chunk->flags = 0; + if (buf.flags & V4L2_BUF_FLAG_ERROR) + d[0].flags |= SPA_CHUNK_FLAG_CORRUPTED; SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUTSTANDING); @@ -1399,6 +1402,7 @@ mmap_init(struct impl *this, d[0].chunk->offset = 0; d[0].chunk->size = 0; d[0].chunk->stride = port->fmt.fmt.pix.bytesperline; + d[0].chunk->flags = 0; if (port->export_buf) { struct v4l2_exportbuffer expbuf; @@ -1412,11 +1416,13 @@ mmap_init(struct impl *this, continue; } d[0].type = SPA_DATA_DmaBuf; + d[0].flags = SPA_DATA_FLAG_READABLE; d[0].fd = expbuf.fd; d[0].data = NULL; SPA_FLAG_SET(b->flags, BUFFER_FLAG_ALLOCATED); } else { d[0].type = SPA_DATA_MemPtr; + d[0].flags = SPA_DATA_FLAG_READABLE; d[0].fd = -1; d[0].data = mmap(NULL, b->v4l2_buffer.length, diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 9fe254f87..05d8c8b9a 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -71,11 +71,17 @@ struct pw_client_node_proxy_events { * \param node_id the node id created for this client node * \param readfd fd for signal data can be read * \param writefd fd for signal data can be written + * \param mem_id id for activation memory + * \param offset offset of activation memory + * \param size size of activation memory */ int (*transport) (void *object, uint32_t node_id, int readfd, - int writefd); + int writefd, + uint32_t mem_id, + uint32_t offset, + uint32_t size); /** * Notify of a property change * diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 95c2d8445..2524fd5df 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -120,6 +120,7 @@ struct node { struct spa_callbacks callbacks; struct pw_resource *resource; + struct pw_client *client; struct spa_source data_source; int writefd; @@ -164,8 +165,6 @@ struct impl { #define pw_client_node_resource(r,m,v,...) \ pw_resource_call_res(r,struct pw_client_node_proxy_events,m,v,__VA_ARGS__) -#define pw_client_node_resource_add_mem(r,...) \ - pw_client_node_resource(r,add_mem,0,__VA_ARGS__) #define pw_client_node_resource_transport(r,...) \ pw_client_node_resource(r,transport,0,__VA_ARGS__) #define pw_client_node_resource_set_param(r,...) \ @@ -249,7 +248,7 @@ static int clear_buffers(struct node *this, struct mix *mix) uint32_t id; id = SPA_PTR_TO_UINT32(b->buffer.datas[j].data); - m = pw_mempool_find_id(this->resource->client->pool, id); + m = pw_mempool_find_id(this->client->pool, id); if (m) { pw_log_debug(NAME " %p: mem %d", impl, m->id); pw_memblock_unref(m); @@ -352,7 +351,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) if (mem_offset + size > mem->map->size) return -EINVAL; - m = pw_mempool_import_block(this->resource->client->pool, mem); + m = pw_mempool_import_block(this->client->pool, mem); if (m == NULL) return -errno; @@ -658,7 +657,7 @@ static int do_port_set_io(struct impl *impl, if (mem_offset + size > mem->map->size) return -EINVAL; - m = pw_mempool_import_block(this->resource->client->pool, mem); + m = pw_mempool_import_block(this->client->pool, mem); if (m == NULL) return -errno; @@ -762,7 +761,7 @@ do_port_use_buffers(struct impl *impl, data_size += d->maxsize; } - m = pw_mempool_import_block(this->resource->client->pool, mem); + m = pw_mempool_import_block(this->client->pool, mem); if (m == NULL) return -errno; @@ -786,10 +785,18 @@ do_port_use_buffers(struct impl *impl, if (d->type == SPA_DATA_DmaBuf || d->type == SPA_DATA_MemFd) { - m = pw_mempool_import(this->resource->client->pool, - d->type, d->fd, d->flags); + uint32_t flags = PW_MEMBLOCK_FLAG_DONT_CLOSE; + + if (d->flags & SPA_DATA_FLAG_READABLE) + flags |= PW_MEMBLOCK_FLAG_READABLE; + if (d->flags & SPA_DATA_FLAG_WRITABLE) + flags |= PW_MEMBLOCK_FLAG_WRITABLE; + + m = pw_mempool_import(this->client->pool, + flags, d->type, d->fd); if (m == NULL) return -errno; + b->buffer.datas[j].type = SPA_DATA_MemId; b->buffer.datas[j].data = SPA_UINT32_TO_PTR(m->id); } else if (d->type == SPA_DATA_MemPtr) { @@ -896,7 +903,7 @@ client_node_get_node(void *data, impl->bind_node_version = version; impl->bind_node_id = new_id; - pw_map_insert_at(&this->resource->client->objects, new_id, NULL); + pw_map_insert_at(&this->client->objects, new_id, NULL); return NULL; } @@ -1160,19 +1167,15 @@ void pw_client_node_registered(struct pw_client_node *this, struct pw_global *gl struct pw_memblock *m; pw_log_debug(NAME " %p: %d", this, node_id); - pw_client_node_resource_transport(this->resource, - node_id, - impl->other_fds[0], - impl->other_fds[1]); - m = pw_mempool_import_block(this->resource->client->pool, node->activation); if (m == NULL) { pw_log_debug(NAME " %p: can't import block: %m", this); return; } - pw_client_node_resource_set_activation(this->resource, + pw_client_node_resource_transport(this->resource, node_id, + impl->other_fds[0], impl->other_fds[1], m->id, 0, @@ -1208,11 +1211,12 @@ static void node_initialized(void *data) size = sizeof(struct spa_io_buffers) * MAX_AREAS; - if (pw_mempool_alloc(impl->core->pool, - PW_MEMBLOCK_FLAG_READWRITE | - PW_MEMBLOCK_FLAG_MAP | - PW_MEMBLOCK_FLAG_SEAL, - size, &impl->io_areas) < 0) + impl->io_areas = pw_mempool_alloc(impl->core->pool, + PW_MEMBLOCK_FLAG_READWRITE | + PW_MEMBLOCK_FLAG_MAP | + PW_MEMBLOCK_FLAG_SEAL, + SPA_DATA_MemFd, size); + if (impl->io_areas == NULL) return; pw_log_debug(NAME " %p: io areas %p", node, impl->io_areas->map->ptr); @@ -1475,7 +1479,7 @@ static void node_peer_added(void *data, struct pw_node *peer) if (this->resource == NULL) return; - m = pw_mempool_import_block(this->resource->client->pool, peer->activation); + m = pw_mempool_import_block(this->client->pool, peer->activation); if (m == NULL) { pw_log_debug(NAME " %p: can't ensure mem: %m", this); return; @@ -1500,7 +1504,7 @@ static void node_peer_removed(void *data, struct pw_node *peer) if (this->resource == NULL) return; - m = pw_mempool_find_fd(this->resource->client->pool, + m = pw_mempool_find_fd(this->client->pool, peer->activation->fd); if (m == NULL) { pw_log_warn(NAME " %p: unknown peer %p fd:%d", &impl->this, peer, @@ -1596,6 +1600,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, node_init(&impl->node, NULL, support, n_support); impl->node.impl = impl; impl->node.resource = resource; + impl->node.client = client; this->flags = do_register ? 0 : 1; pw_map_init(&impl->io_map, 64, 64); @@ -1606,7 +1611,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, this->resource = resource; this->parent = parent; this->node = pw_spa_node_new(core, - pw_resource_get_client(this->resource), + client, parent, name, PW_SPA_NODE_FLAG_ASYNC | diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c index 89ca7b052..5d84bc509 100644 --- a/src/modules/module-client-node/protocol-native.c +++ b/src/modules/module-client-node/protocol-native.c @@ -266,14 +266,17 @@ static int client_node_demarshal_transport(void *object, const struct pw_protoco { struct pw_proxy *proxy = object; struct spa_pod_parser prs; - uint32_t node_id, ridx, widx; + uint32_t node_id, ridx, widx, mem_id, offset, sz; int readfd, writefd; spa_pod_parser_init(&prs, msg->data, msg->size); if (spa_pod_parser_get_struct(&prs, SPA_POD_Int(&node_id), SPA_POD_Int(&ridx), - SPA_POD_Int(&widx)) < 0) + SPA_POD_Int(&widx), + SPA_POD_Int(&mem_id), + SPA_POD_Int(&offset), + SPA_POD_Int(&sz)) < 0) return -EINVAL; readfd = pw_protocol_native_get_proxy_fd(proxy, ridx); @@ -283,7 +286,8 @@ static int client_node_demarshal_transport(void *object, const struct pw_protoco return -EINVAL; pw_proxy_notify(proxy, struct pw_client_node_proxy_events, transport, 0, node_id, - readfd, writefd); + readfd, writefd, mem_id, + offset, sz); return 0; } @@ -540,7 +544,8 @@ static int client_node_demarshal_set_io(void *object, const struct pw_protocol_n return 0; } -static int client_node_marshal_transport(void *object, uint32_t node_id, int readfd, int writefd) +static int client_node_marshal_transport(void *object, uint32_t node_id, int readfd, int writefd, + uint32_t mem_id, uint32_t offset, uint32_t size) { struct pw_protocol_native_message *msg; struct pw_resource *resource = object; @@ -551,7 +556,10 @@ static int client_node_marshal_transport(void *object, uint32_t node_id, int rea spa_pod_builder_add_struct(b, SPA_POD_Int(node_id), SPA_POD_Int(pw_protocol_native_add_resource_fd(resource, readfd)), - SPA_POD_Int(pw_protocol_native_add_resource_fd(resource, writefd))); + SPA_POD_Int(pw_protocol_native_add_resource_fd(resource, writefd)), + SPA_POD_Int(mem_id), + SPA_POD_Int(offset), + SPA_POD_Int(size)); return pw_protocol_native_end_resource(resource, b); } diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index 0f1532b23..4e06bd3f2 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -51,11 +51,6 @@ struct buffer { struct pw_memmap *mem; }; -struct io { - uint32_t id; - struct pw_memmap *mem; -}; - struct mix { struct spa_list link; struct pw_port *port; @@ -63,7 +58,6 @@ struct mix { struct pw_port_mix mix; struct pw_array buffers; bool active; - struct io ios[MAX_IO]; }; struct link { @@ -79,6 +73,7 @@ struct node_data { uint32_t remote_id; int rtwritefd; + struct pw_memmap *activation; struct mix mix_pool[MAX_MIX]; struct spa_list mix[2]; @@ -94,7 +89,6 @@ struct node_data { struct spa_hook proxy_listener; struct pw_proxy *proxy; - struct io ios[MAX_IO]; struct spa_io_position *position; struct pw_array links; @@ -102,50 +96,6 @@ struct node_data { /** \endcond */ -static void init_ios(struct io *ios) -{ - int i; - for (i = 0; i < MAX_IO; i++) - ios[i].id = SPA_ID_INVALID; -} - -static void clear_io(struct io *io) -{ - pw_log_debug("%p clear id:%u mem:%p", io, io->id, io->mem); - pw_memmap_free(io->mem); - io->mem = NULL; - io->id = SPA_ID_INVALID; -} - -static struct io *update_io(struct node_data *data, struct io *ios, - uint32_t id, struct pw_memmap *mem) -{ - int i; - struct io *io, *f = NULL; - - pw_log_debug("node %p: update id:%u mem:%p", data, id, mem); - - for (i = 0; i < MAX_IO; i++) { - io = &ios[i]; - if (io->id == SPA_ID_INVALID && f == NULL) - f = io; - else if (io->id == id) { - if (io->mem && io->mem != mem) - clear_io(io); - f = io; - break; - } - } - if (f == NULL) - return NULL; - - io = f; - io->id = id; - io->mem = mem; - - return io; -} - static struct link *find_activation(struct pw_array *links, uint32_t node_id) { struct link *l; @@ -179,6 +129,7 @@ static void clean_transport(struct node_data *data) } pw_array_clear(&data->links); + pw_memmap_free(data->activation); close(data->rtwritefd); data->remote_id = SPA_ID_INVALID; data->have_transport = false; @@ -192,7 +143,6 @@ static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) mix->active = false; pw_array_init(&mix->buffers, 32); pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); - init_ios(mix->ios); } static int @@ -278,7 +228,7 @@ static struct mix *ensure_mix(struct node_data *data, static int client_node_transport(void *object, uint32_t node_id, - int readfd, int writefd) + int readfd, int writefd, uint32_t mem_id, uint32_t offset, uint32_t size) { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; @@ -286,16 +236,25 @@ static int client_node_transport(void *object, uint32_t node_id, clean_transport(data); - data->have_transport = true; - data->remote_id = node_id; + data->activation = pw_mempool_map_id(proxy->remote->pool, mem_id, + PW_MEMMAP_FLAG_READWRITE, offset, size, NULL); + if (data->activation == NULL) { + pw_log_debug("remote-node %p: can't map activation: %m", proxy); + return -errno; + } - pw_log_debug("remote-node %p: create transport with fds %d %d for node %u", - proxy, readfd, writefd, node_id); + data->remote_id = node_id; + data->node->rt.activation = data->activation->ptr; + + pw_log_debug("remote-node %p: fds:%d %d node:%u activation:%p", + proxy, readfd, writefd, node_id, data->activation->ptr); data->rtwritefd = writefd; close(data->node->source.fd); data->node->source.fd = readfd; + data->have_transport = true; + if (data->node->active) pw_client_node_proxy_set_active(data->node_proxy, true); @@ -441,14 +400,17 @@ client_node_set_io(void *object, struct node_data *data = proxy->user_data; struct pw_memmap *mm; void *ptr; + uint32_t tag[5] = { data->remote_id, id, }; if (memid == SPA_ID_INVALID) { - size = 0; + if ((mm = pw_mempool_find_tag(proxy->remote->pool, tag)) != NULL) + pw_memmap_free(mm); mm = ptr = NULL; + size = 0; } else { mm = pw_mempool_map_id(proxy->remote->pool, memid, - PROT_READ|PROT_WRITE, offset, size); + PW_MEMMAP_FLAG_READWRITE, offset, size, tag); if (mm == NULL) { pw_log_warn("can't map memory id %u: %m", memid); return -errno; @@ -459,8 +421,6 @@ client_node_set_io(void *object, pw_log_debug("node %p: set io %s %p", proxy, spa_debug_type_find_name(spa_type_io, id), ptr); - update_io(data, data->ios, id, mm); - switch (id) { case SPA_IO_Position: data->position = ptr; @@ -535,7 +495,7 @@ static int clear_buffers(struct node_data *data, struct mix *mix) struct buffer *b; int res; - pw_log_debug("port %p: clear buffers %d", port, mix->mix_id); + pw_log_debug("port %p: clear buffers mix:%d", port, mix->mix_id); if ((res = pw_port_use_buffers(port, mix->mix_id, NULL, 0)) < 0) { pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res)); return res; @@ -609,7 +569,7 @@ client_node_port_use_buffers(void *object, goto error_exit; } - prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + prot = PW_MEMMAP_FLAG_READ | (direction == SPA_DIRECTION_OUTPUT ? PW_MEMMAP_FLAG_WRITE : 0); /* clear previous buffers */ clear_buffers(data, mix); @@ -622,7 +582,7 @@ client_node_port_use_buffers(void *object, struct pw_memmap *mm; mm = pw_mempool_map_id(proxy->remote->pool, buffers[i].mem_id, - prot, buffers[i].offset, buffers[i].size); + prot, buffers[i].offset, buffers[i].size, NULL); if (mm == NULL) { res = -errno; goto error_exit_cleanup; @@ -735,6 +695,7 @@ client_node_port_set_io(void *object, struct pw_memmap *mm; void *ptr; int res = 0; + uint32_t tag[5] = { data->remote_id, direction, port_id, mix_id, id }; mix = ensure_mix(data, direction, port_id, mix_id); if (mix == NULL) { @@ -743,12 +704,15 @@ client_node_port_set_io(void *object, } if (memid == SPA_ID_INVALID) { + if ((mm = pw_mempool_find_tag(proxy->remote->pool, tag)) != NULL) + pw_memmap_free(mm); + mm = ptr = NULL; size = 0; } else { mm = pw_mempool_map_id(proxy->remote->pool, memid, - PROT_READ|PROT_WRITE, offset, size); + PW_MEMMAP_FLAG_READWRITE, offset, size, tag); if (mm == NULL) { res = -errno; goto error_exit; @@ -759,8 +723,6 @@ client_node_port_set_io(void *object, pw_log_debug("port %p: set io:%s new:%p old:%p", mix->port, spa_debug_type_find_name(spa_type_io, id), ptr, mix->mix.io); - update_io(data, mix->ios, id, mm); - if (id == SPA_IO_Buffers) { if (ptr == NULL && mix->mix.io) deactivate_mix(data, mix); @@ -799,6 +761,7 @@ static int link_signal_func(void *user_data) if (write(link->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd)) pw_log_warn("link %p: write failed %m", link); + return 0; } @@ -825,7 +788,7 @@ client_node_set_activation(void *object, } else { mm = pw_mempool_map_id(proxy->remote->pool, memid, - PROT_READ|PROT_WRITE, offset, size); + PW_MEMMAP_FLAG_READWRITE, offset, size, NULL); if (mm == NULL) { res = -errno; goto error_exit; @@ -837,6 +800,8 @@ client_node_set_activation(void *object, if (data->remote_id == node_id) { pw_log_debug("node %p: our activation %u: %u %u %u %p", node, node_id, memid, offset, size, ptr); + if (mm) + pw_memmap_free(mm); close(signalfd); return 0; } @@ -1098,7 +1063,6 @@ static struct pw_proxy *node_export(struct pw_remote *remote, void *object, bool data->core = pw_node_get_core(node); data->node_proxy = (struct pw_client_node_proxy *)proxy; data->remote_id = SPA_ID_INVALID; - init_ios(data->ios); node->exported = true; diff --git a/src/pipewire/control.c b/src/pipewire/control.c index 9b0568f4a..db83b636e 100644 --- a/src/pipewire/control.c +++ b/src/pipewire/control.c @@ -183,12 +183,15 @@ int pw_control_add_link(struct pw_control *control, uint32_t cmix, size = SPA_MAX(control->size, other->size); if (impl->mem == NULL) { - if ((res = pw_mempool_alloc(control->core->pool, + impl->mem = pw_mempool_alloc(control->core->pool, PW_MEMBLOCK_FLAG_READWRITE | PW_MEMBLOCK_FLAG_SEAL | PW_MEMBLOCK_FLAG_MAP, - size, &impl->mem)) < 0) + SPA_DATA_MemFd, size); + if (impl->mem == NULL) { + res = -errno; goto exit; + } } if (spa_list_is_empty(&control->links)) { diff --git a/src/pipewire/link.c b/src/pipewire/link.c index accada940..515b5b0ca 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -388,7 +388,6 @@ static int alloc_buffers(struct pw_link *this, uint32_t flags, struct allocation *allocation) { - int res; struct spa_buffer **buffers, *bp; uint32_t i; uint32_t n_metas; @@ -428,6 +427,7 @@ static int alloc_buffers(struct pw_link *this, if (data_sizes[i] > 0) { d->type = SPA_DATA_MemPtr; d->maxsize = data_sizes[i]; + SPA_FLAG_SET(d->flags, SPA_DATA_FLAG_READWRITE); } else { d->type = SPA_ID_INVALID; d->maxsize = 0; @@ -445,12 +445,14 @@ static int alloc_buffers(struct pw_link *this, /* pointer to buffer structures */ bp = SPA_MEMBER(buffers, n_buffers * sizeof(struct spa_buffer *), struct spa_buffer); - if ((res = pw_mempool_alloc(this->core->pool, - PW_MEMBLOCK_FLAG_READWRITE | - PW_MEMBLOCK_FLAG_SEAL | - PW_MEMBLOCK_FLAG_MAP, - n_buffers * info.mem_size, &m)) < 0) - return res; + m = pw_mempool_alloc(this->core->pool, + PW_MEMBLOCK_FLAG_READWRITE | + PW_MEMBLOCK_FLAG_SEAL | + PW_MEMBLOCK_FLAG_MAP, + SPA_DATA_MemFd, + n_buffers * info.mem_size); + if (m == NULL) + return -errno; pw_log_debug("layout buffers %p data %p", bp, m->map->ptr); spa_buffer_alloc_layout_array(&info, n_buffers, buffers, bp, m->map->ptr); @@ -756,9 +758,13 @@ do_activate_link(struct spa_loop *loop, spa_list_append(&this->input->rt.mix_list, &this->rt.in_mix.rt_link); if (impl->inode != impl->onode) { + uint32_t required; + this->rt.target.activation = impl->inode->rt.activation; spa_list_append(&impl->onode->rt.target_list, &this->rt.target.link); - this->rt.target.activation->state[0].required++; + required = ++this->rt.target.activation->state[0].required; + pw_log_trace("link %p: node:%p required:%d", this, + impl->inode, required); } return 0; } @@ -965,6 +971,7 @@ do_deactivate_link(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); pw_log_trace("link %p: disable %p and %p", this, &this->rt.in_mix, &this->rt.out_mix); @@ -972,8 +979,12 @@ do_deactivate_link(struct spa_loop *loop, spa_list_remove(&this->rt.in_mix.rt_link); if (this->input->node != this->output->node) { + uint32_t required; + spa_list_remove(&this->rt.target.link); - this->rt.target.activation->state[0].required--; + required = --this->rt.target.activation->state[0].required; + pw_log_trace("link %p: node:%p required:%d", this, + impl->inode, required); } return 0; diff --git a/src/pipewire/mem.c b/src/pipewire/mem.c index 8728b7416..ccf07d158 100644 --- a/src/pipewire/mem.c +++ b/src/pipewire/mem.c @@ -272,8 +272,8 @@ static struct mapping * memblock_map(struct memblock *b, ptr = mmap(NULL, size, prot, MAP_SHARED, b->this.fd, offset); if (ptr == MAP_FAILED) { - pw_log_error("pool %p: Failed to mmap memory %d size:%d: %m", - p, b->this.fd, size); + pw_log_error("pool %p: Failed to mmap memory fd:%d offset:%u size:%u: %m", + p, b->this.fd, offset, size); return NULL; } @@ -300,7 +300,7 @@ static void mapping_unmap(struct mapping *m) struct memblock *b = m->block; struct mempool *p = SPA_CONTAINER_OF(b->this.pool, struct mempool, this); - pw_log_debug("pool %p: map:%p fd:%d ptr:%p size:%d", p, m, b->this.fd, m->ptr, m->size); + pw_log_debug("pool %p: mapping:%p fd:%d ptr:%p size:%d", p, m, b->this.fd, m->ptr, m->size); munmap(m->ptr, m->size); spa_list_remove(&m->link); @@ -311,7 +311,7 @@ static void mapping_unmap(struct mapping *m) SPA_EXPORT struct pw_memmap * pw_memblock_map(struct pw_memblock *block, - enum pw_memmap_flags flags, uint32_t offset, uint32_t size) + enum pw_memmap_flags flags, uint32_t offset, uint32_t size, uint32_t tag[5]) { struct memblock *b = SPA_CONTAINER_OF(block, struct memblock, this); struct mempool *p = SPA_CONTAINER_OF(block->pool, struct mempool, this); @@ -341,6 +341,8 @@ struct pw_memmap * pw_memblock_map(struct pw_memblock *block, mm->this.offset = offset; mm->this.size = size; mm->this.ptr = SPA_MEMBER(m->ptr, range.start, void); + if (tag) + memcpy(mm->this.tag, tag, sizeof(mm->this.tag)); spa_list_append(&b->maps, &mm->link); @@ -352,7 +354,7 @@ struct pw_memmap * pw_memblock_map(struct pw_memblock *block, SPA_EXPORT struct pw_memmap * pw_mempool_map_id(struct pw_mempool *pool, - uint32_t id, enum pw_memmap_flags flags, uint32_t offset, uint32_t size) + uint32_t id, enum pw_memmap_flags flags, uint32_t offset, uint32_t size, uint32_t tag[5]) { struct mempool *impl = SPA_CONTAINER_OF(pool, struct mempool, this); struct memblock *b; @@ -362,7 +364,7 @@ struct pw_memmap * pw_mempool_map_id(struct pw_mempool *pool, errno = -ENOENT; return NULL; } - return pw_memblock_map(&b->this, flags, offset, size); + return pw_memblock_map(&b->this, flags, offset, size, tag); } SPA_EXPORT @@ -373,7 +375,7 @@ int pw_memmap_free(struct pw_memmap *map) struct memblock *b = m->block; struct mempool *p = SPA_CONTAINER_OF(b->this.pool, struct mempool, this); - pw_log_debug("pool %p: map:%p fd:%d ptr:%p map:%p ref:%d", p, + pw_log_debug("pool %p: map:%p fd:%d ptr:%p mapping:%p ref:%d", p, &mm->this, b->this.fd, mm->this.ptr, m, m->ref); spa_list_remove(&mm->link); @@ -387,26 +389,24 @@ int pw_memmap_free(struct pw_memmap *map) } /** Create a new memblock + * \param pool the pool to use * \param flags memblock flags + * \param type the requested memory type one of enum spa_data_type * \param size size to allocate - * \param[out] mem memblock structure to fill - * \return 0 on success, < 0 on error + * \return a memblock structure or NULL with errno on error * \memberof pw_memblock */ SPA_EXPORT -int pw_mempool_alloc(struct pw_mempool *pool, enum pw_memblock_flags flags, - size_t size, struct pw_memblock **mem) +struct pw_memblock * pw_mempool_alloc(struct pw_mempool *pool, enum pw_memblock_flags flags, + uint32_t type, size_t size) { struct mempool *impl = SPA_CONTAINER_OF(pool, struct mempool, this); struct memblock *b; int res; - spa_return_val_if_fail(pool != NULL, -EINVAL); - spa_return_val_if_fail(mem != NULL, -EINVAL); - b = calloc(1, sizeof(struct memblock)); if (b == NULL) - return -errno; + return NULL; b->this.ref = 1; b->this.pool = pool; @@ -445,7 +445,7 @@ int pw_mempool_alloc(struct pw_mempool *pool, enum pw_memblock_flags flags, } } #endif - b->this.type = SPA_DATA_MemFd; + b->this.type = type; if (flags & PW_MEMBLOCK_FLAG_MAP && size > 0) { enum pw_memmap_flags fl = 0; @@ -455,28 +455,29 @@ int pw_mempool_alloc(struct pw_mempool *pool, enum pw_memblock_flags flags, if (flags & PW_MEMBLOCK_FLAG_WRITABLE) fl |= PW_MEMMAP_FLAG_WRITE; - b->this.map = pw_memblock_map(&b->this, fl, 0, size); + b->this.map = pw_memblock_map(&b->this, fl, 0, size, NULL); if (b->this.map == NULL) { res = -errno; pw_log_warn("Failed to map: %m"); goto error_close; } + b->this.ref--; } b->this.id = pw_map_insert_new(&impl->map, b); spa_list_append(&impl->blocks, &b->link); - pw_log_debug("mem %p: alloc id:%d", &b->this, b->this.id); + pw_log_debug("pool %p: mem %p alloc id:%d type:%u", pool, &b->this, b->this.id, type); pw_mempool_emit_added(impl, &b->this); - *mem = &b->this; - return 0; + return &b->this; error_close: close(b->this.fd); error_free: free(b); - return res; + errno = -res; + return NULL; } static struct memblock * mempool_find_fd(struct pw_mempool *pool, int fd) @@ -495,7 +496,7 @@ static struct memblock * mempool_find_fd(struct pw_mempool *pool, int fd) SPA_EXPORT struct pw_memblock * pw_mempool_import(struct pw_mempool *pool, - uint32_t type, int fd, uint32_t flags) + enum pw_memblock_flags flags, uint32_t type, int fd) { struct mempool *impl = SPA_CONTAINER_OF(pool, struct mempool, this); struct memblock *b; @@ -521,7 +522,8 @@ struct pw_memblock * pw_mempool_import(struct pw_mempool *pool, b->this.id = pw_map_insert_new(&impl->map, b); spa_list_append(&impl->blocks, &b->link); - pw_log_debug("pool %p: import %p id:%u fd:%d", pool, b, b->this.id, fd); + pw_log_debug("pool %p: import %p id:%u flags:%08x type:%u fd:%d", + pool, b, b->this.id, flags, type, fd); pw_mempool_emit_added(impl, &b->this); @@ -532,12 +534,13 @@ SPA_EXPORT struct pw_memblock * pw_mempool_import_block(struct pw_mempool *pool, struct pw_memblock *mem) { - return pw_mempool_import(pool, mem->type, mem->fd, - mem->flags | PW_MEMBLOCK_FLAG_DONT_CLOSE); + return pw_mempool_import(pool, + mem->flags | PW_MEMBLOCK_FLAG_DONT_CLOSE, + mem->type, mem->fd); } -int pw_mempool_remove_id(struct pw_mempool *pool, uint32_t id) +int pw_mempool_unref_id(struct pw_mempool *pool, uint32_t id) { struct mempool *impl = SPA_CONTAINER_OF(pool, struct mempool, this); struct memblock *b; @@ -568,6 +571,8 @@ void pw_memblock_free(struct pw_memblock *block) pool, block, block->id, block->fd, block->ref); block->ref++; + if (block->map) + block->ref++; pw_map_remove(&impl->map, block->id); spa_list_remove(&b->link); @@ -628,3 +633,21 @@ struct pw_memblock * pw_mempool_find_fd(struct pw_mempool *pool, int fd) return &b->this; } + +SPA_EXPORT +struct pw_memmap * pw_mempool_find_tag(struct pw_mempool *pool, uint32_t tag[5]) +{ + struct mempool *impl = SPA_CONTAINER_OF(pool, struct mempool, this); + struct memblock *b; + struct memmap *mm; + + spa_list_for_each(b, &impl->blocks, link) { + spa_list_for_each(mm, &b->maps, link) { + if (memcmp(tag, mm->this.tag, sizeof(mm->this.tag)) == 0) { + pw_log_debug("pool %p: found %p", pool, mm); + return &mm->this; + } + } + } + return NULL; +} diff --git a/src/pipewire/mem.h b/src/pipewire/mem.h index 2bb61e830..9e0793153 100644 --- a/src/pipewire/mem.h +++ b/src/pipewire/mem.h @@ -62,62 +62,76 @@ struct pw_mempool { * Memory block structure */ struct pw_memblock { struct pw_mempool *pool; /**< owner pool */ - enum pw_memblock_flags flags; - int ref; /**< refcount */ uint32_t id; /**< unique id */ - uint32_t type; /**< type of the fd */ - uint32_t size; /**< size of memory */ + int ref; /**< refcount */ + uint32_t flags; /**< flags for the memory block on of enum pw_memblock_flags */ + uint32_t type; /**< type of the fd, one of enum spa_data_type */ int fd; /**< fd */ + uint32_t size; /**< size of memory */ struct pw_memmap *map; /**< optional map when PW_MEMBLOCK_FLAG_MAP was given */ }; -/** parameters to map a memory range */ -struct pw_map_range { - uint32_t start; /** offset in first page with start of data */ - uint32_t offset; /** page aligned offset to map */ - uint32_t size; /** size to map */ -}; - /** a mapped region of a pw_memblock */ struct pw_memmap { struct pw_memblock *block; /**< owner memblock */ - enum pw_memmap_flags flags; /**< flags used when mapping */ + void *ptr; /**< mapped pointer */ + uint32_t flags; /**< flags for the mapping on of enum pw_memmap_flags */ uint32_t offset; /**< offset in memblock */ uint32_t size; /**< size in memblock */ - void *ptr; /**< mapped pointer */ + uint32_t tag[5]; /**< user tag */ }; struct pw_mempool_events { #define PW_VERSION_MEMPOOL_EVENTS 0 uint32_t version; + /** the pool is destroyed */ void (*destroy) (void *data); + /** a new memory block is added to the pool */ void (*added) (void *data, struct pw_memblock *block); + /** a memory block is removed from the pool */ void (*removed) (void *data, struct pw_memblock *block); }; +/** Create a new memory pool */ struct pw_mempool *pw_mempool_new(struct pw_properties *props); +/** Listen for events */ void pw_mempool_add_listener(struct pw_mempool *pool, struct spa_hook *listener, const struct pw_mempool_events *events, void *data); +/** Destroy a pool */ void pw_mempool_destroy(struct pw_mempool *pool); -int pw_mempool_alloc(struct pw_mempool *pool, enum pw_memblock_flags flags, - size_t size, struct pw_memblock **mem); +/** Allocate a memory block from the pool */ +struct pw_memblock * pw_mempool_alloc(struct pw_mempool *pool, + enum pw_memblock_flags flags, uint32_t type, size_t size); +/** Import a block from another pool */ struct pw_memblock * pw_mempool_import_block(struct pw_mempool *pool, struct pw_memblock *mem); -struct pw_memblock * pw_mempool_import(struct pw_mempool *pool, - uint32_t type, int fd, enum pw_memblock_flags flags); -/** Find memblock for given \a id */ -int pw_mempool_remove_id(struct pw_mempool *pool, uint32_t id); +/** Import an fd into the pool */ +struct pw_memblock * pw_mempool_import(struct pw_mempool *pool, + enum pw_memblock_flags flags, uint32_t type, int fd); + +/** Free a memblock regardless of the refcount and destroy all mappings */ +void pw_memblock_free(struct pw_memblock *mem); + +/** Unref a memblock */ +static inline void pw_memblock_unref(struct pw_memblock *mem) +{ + if (--mem->ref == 0) + pw_memblock_free(mem); +} + +/** Unref a memblock for given \a id */ +int pw_mempool_unref_id(struct pw_mempool *pool, uint32_t id); /** Find memblock for given \a ptr */ struct pw_memblock * pw_mempool_find_ptr(struct pw_mempool *pool, const void *ptr); @@ -128,21 +142,30 @@ struct pw_memblock * pw_mempool_find_id(struct pw_mempool *pool, uint32_t id); /** Find memblock for given \a fd */ struct pw_memblock * pw_mempool_find_fd(struct pw_mempool *pool, int fd); + +/** Map a region of a memory block */ struct pw_memmap * pw_memblock_map(struct pw_memblock *block, - enum pw_memmap_flags flags, uint32_t offset, uint32_t size); + enum pw_memmap_flags flags, uint32_t offset, uint32_t size, + uint32_t tag[5]); +/** Map a region of a memory block with \a id */ struct pw_memmap * pw_mempool_map_id(struct pw_mempool *pool, uint32_t id, - enum pw_memmap_flags flags, uint32_t offset, uint32_t size); + enum pw_memmap_flags flags, uint32_t offset, uint32_t size, + uint32_t tag[5]); +/** find a map with the given tag */ +struct pw_memmap * pw_mempool_find_tag(struct pw_mempool *pool, uint32_t tag[5]); + +/** Unmap a region */ int pw_memmap_free(struct pw_memmap *map); -void pw_memblock_free(struct pw_memblock *mem); -static inline void pw_memblock_unref(struct pw_memblock *mem) -{ - if (--mem->ref == 0) - pw_memblock_free(mem); -} +/** parameters to map a memory range */ +struct pw_map_range { + uint32_t start; /** offset in first page with start of data */ + uint32_t offset; /** page aligned offset to map */ + uint32_t size; /** size to map */ +}; #define PW_MAP_RANGE_INIT (struct pw_map_range){ 0, } diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 7a66b9b18..635c01d7c 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -96,27 +96,44 @@ static void node_deactivate(struct pw_node *this) static void add_node(struct pw_node *this, struct pw_node *driver) { - pw_log_trace("node %p: add to driver %p", this, driver); + uint32_t rdriver, rnode; + + if (this->exported) + return; + + pw_log_trace("node %p: add to driver %p %p %p", this, driver, + driver->rt.activation, this->rt.activation); /* signal the driver */ this->rt.driver_target.activation = driver->rt.activation; this->rt.driver_target.node = driver; this->rt.driver_target.data = driver; spa_list_append(&this->rt.target_list, &this->rt.driver_target.link); - this->rt.driver_target.activation->state[0].required++; + rdriver = ++this->rt.driver_target.activation->state[0].required; spa_list_append(&driver->rt.target_list, &this->rt.target.link); - this->rt.activation->state[0].required++; + rnode = ++this->rt.activation->state[0].required; + + pw_log_trace("node %p: required driver:%d node:%d", this, rdriver, rnode); } static void remove_node(struct pw_node *this) { - pw_log_trace("node %p: remove from driver %p", this, - this->rt.driver_target.data); + uint32_t rdriver, rnode; + + if (this->exported) + return; + + pw_log_trace("node %p: remove from driver %p %p %p", + this, this->rt.driver_target.data, + this->rt.driver_target.activation, this->rt.activation); + spa_list_remove(&this->rt.driver_target.link); - this->rt.driver_target.activation->state[0].required--; + rdriver = --this->rt.driver_target.activation->state[0].required; spa_list_remove(&this->rt.target.link); - this->rt.activation->state[0].required--; + rnode = --this->rt.activation->state[0].required; + + pw_log_trace("node %p: required driver:%d node:%d", this, rdriver, rnode); } static int @@ -862,12 +879,15 @@ struct pw_node *pw_node_new(struct pw_core *core, size = sizeof(struct pw_node_activation); - if ((res = pw_mempool_alloc(this->core->pool, - PW_MEMBLOCK_FLAG_READWRITE | - PW_MEMBLOCK_FLAG_SEAL | - PW_MEMBLOCK_FLAG_MAP, - size, &this->activation)) < 0) + this->activation = pw_mempool_alloc(this->core->pool, + PW_MEMBLOCK_FLAG_READWRITE | + PW_MEMBLOCK_FLAG_SEAL | + PW_MEMBLOCK_FLAG_MAP, + SPA_DATA_MemFd, size); + if (this->activation == NULL) { + res = -errno; goto error_clean; + } impl->work = pw_work_queue_new(this->core->main_loop); diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 75478430d..5bbd8a0da 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -1031,7 +1031,7 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, struct pw_node *node = port->node; struct pw_port_mix *mix = NULL; - pw_log_debug("port %p: %d:%d.%d: %d buffers %d", port, + pw_log_debug("port %p: %d:%d.%d: %d buffers state:%d", port, port->direction, port->port_id, mix_id, n_buffers, port->state); if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index a22097be4..e8ba41608 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -150,7 +150,7 @@ static void core_event_add_mem(void *data, uint32_t id, uint32_t type, int fd, u pw_log_debug("remote %p: add mem %u type:%u fd:%d flags:%u", this, id, type, fd, flags); - m = pw_mempool_import(this->pool, type, fd, flags); + m = pw_mempool_import(this->pool, flags, type, fd); if (m->id != id) { pw_log_error("remote %p: invalid mem id %u, expected %u", this, id, m->id); @@ -162,7 +162,7 @@ static void core_event_remove_mem(void *data, uint32_t id) { struct pw_remote *this = data; pw_log_debug("remote %p: remove mem %u", this, id); - pw_mempool_remove_id(this->pool, id); + pw_mempool_unref_id(this->pool, id); } static const struct pw_core_proxy_events core_proxy_events = {