diff --git a/src/Makefile.am b/src/Makefile.am index 63e59c58f..f8efd9ef2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -683,7 +683,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \ pulsecore/memblock.c pulsecore/memblock.h \ pulsecore/memblockq.c pulsecore/memblockq.h \ pulsecore/memchunk.c pulsecore/memchunk.h \ - pulsecore/native-common.h \ + pulsecore/native-common.c pulsecore/native-common.h \ pulsecore/once.c pulsecore/once.h \ pulsecore/packet.c pulsecore/packet.h \ pulsecore/parseaddr.c pulsecore/parseaddr.h \ diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c index 53c440225..5c8b84ae5 100644 --- a/src/modules/module-tunnel.c +++ b/src/modules/module-tunnel.c @@ -1778,7 +1778,7 @@ static void pstream_die_callback(pa_pstream *p, void *userdata) { } /* Called from main context */ -static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data, void *userdata) { +static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) { struct userdata *u = userdata; pa_assert(p); diff --git a/src/pulse/context.c b/src/pulse/context.c index b8f51a65f..ef3941604 100644 --- a/src/pulse/context.c +++ b/src/pulse/context.c @@ -69,6 +69,7 @@ void pa_command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void pa_command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void pa_command_disable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); +static void pa_command_register_memfd_shmid(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_REQUEST] = pa_command_request, @@ -90,6 +91,7 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = pa_command_stream_buffer_attr, [PA_COMMAND_ENABLE_SRBCHANNEL] = pa_command_enable_srbchannel, [PA_COMMAND_DISABLE_SRBCHANNEL] = pa_command_disable_srbchannel, + [PA_COMMAND_REGISTER_MEMFD_SHMID] = pa_command_register_memfd_shmid, }; static void context_free(pa_context *c); @@ -330,7 +332,7 @@ static void pstream_die_callback(pa_pstream *p, void *userdata) { pa_context_fail(c, PA_ERR_CONNECTIONTERMINATED); } -static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data, void *userdata) { +static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) { pa_context *c = userdata; pa_assert(p); @@ -1432,8 +1434,7 @@ static void pa_command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uin pa_context *c = userdata; #ifdef HAVE_CREDS - const int *fds; - int nfd; + pa_cmsg_ancil_data *ancil = NULL; pa_assert(pd); pa_assert(command == PA_COMMAND_ENABLE_SRBCHANNEL); @@ -1441,26 +1442,34 @@ static void pa_command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uin pa_assert(c); pa_assert(PA_REFCNT_VALUE(c) >= 1); - /* Currently only one srb channel is supported, might change in future versions */ - if (c->srb_template.readfd != -1) { - pa_context_fail(c, PA_ERR_PROTOCOL); - return; - } + ancil = pa_pdispatch_take_ancil_data(pd); + if (!ancil) + goto fail; - fds = pa_pdispatch_fds(pd, &nfd); - if (nfd != 2 || !fds || fds[0] == -1 || fds[1] == -1) { - pa_context_fail(c, PA_ERR_PROTOCOL); - return; - } + /* Currently only one srb channel is supported, might change in future versions */ + if (c->srb_template.readfd != -1) + goto fail; + + if (ancil->nfd != 2 || ancil->fds[0] == -1 || ancil->fds[1] == -1) + goto fail; pa_context_ref(c); - c->srb_template.readfd = fds[0]; - c->srb_template.writefd = fds[1]; + c->srb_template.readfd = ancil->fds[0]; + c->srb_template.writefd = ancil->fds[1]; c->srb_setup_tag = tag; pa_context_unref(c); + ancil->close_fds_on_cleanup = false; + return; + +fail: + if (ancil) + pa_cmsg_ancil_data_close_fds(ancil); + + pa_context_fail(c, PA_ERR_PROTOCOL); + return; #else pa_assert(c); pa_context_fail(c, PA_ERR_PROTOCOL); @@ -1493,6 +1502,18 @@ static void pa_command_disable_srbchannel(pa_pdispatch *pd, uint32_t command, ui pa_pstream_send_tagstruct(c->pstream, t2); } +static void pa_command_register_memfd_shmid(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { + pa_context *c = userdata; + + pa_assert(pd); + pa_assert(command == PA_COMMAND_REGISTER_MEMFD_SHMID); + pa_assert(t); + pa_assert(c); + pa_assert(PA_REFCNT_VALUE(c) >= 1); + + if (pa_common_command_register_memfd_shmid(c->pstream, pd, c->version, command, t)) + pa_context_fail(c, PA_ERR_PROTOCOL); +} void pa_command_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { pa_context *c = userdata; diff --git a/src/pulsecore/creds.h b/src/pulsecore/creds.h index 64d838730..9fdbb4f7e 100644 --- a/src/pulsecore/creds.h +++ b/src/pulsecore/creds.h @@ -49,9 +49,14 @@ struct pa_cmsg_ancil_data { pa_creds creds; bool creds_valid; int nfd; + + /* Don't close these fds by your own. Check pa_cmsg_ancil_data_close_fds() */ int fds[MAX_ANCIL_DATA_FDS]; + bool close_fds_on_cleanup; }; +void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil); + #else #undef HAVE_CREDS #endif diff --git a/src/pulsecore/iochannel.c b/src/pulsecore/iochannel.c index d1bb109ea..e62750b11 100644 --- a/src/pulsecore/iochannel.c +++ b/src/pulsecore/iochannel.c @@ -450,6 +450,7 @@ ssize_t pa_iochannel_read_with_ancil_data(pa_iochannel*io, void*data, size_t l, } memcpy(ancil_data->fds, CMSG_DATA(cmh), nfd * sizeof(int)); ancil_data->nfd = nfd; + ancil_data->close_fds_on_cleanup = true; } } diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 8a7f5f38d..57b0645ba 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -1107,7 +1107,8 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i); -/* Should be called locked */ +/* Should be called locked + * Caller owns passed @memfd_fd and must close it down when appropriate. */ static pa_memimport_segment* segment_attach(pa_memimport *i, pa_mem_type_t type, uint32_t shm_id, int memfd_fd, bool writable) { pa_memimport_segment* seg; @@ -1196,7 +1197,9 @@ void pa_memimport_free(pa_memimport *i) { * memory region) as its value. * * Note! check comments at 'pa_shm->fd', 'segment_is_permanent()', - * and 'pa_pstream_register_memfd_mempool()' for further details. */ + * and 'pa_pstream_register_memfd_mempool()' for further details. + * + * Caller owns passed @memfd_fd and must close it down when appropriate. */ int pa_memimport_attach_memfd(pa_memimport *i, uint32_t shm_id, int memfd_fd, bool writable) { pa_memimport_segment *seg; int ret = -1; diff --git a/src/pulsecore/native-common.c b/src/pulsecore/native-common.c new file mode 100644 index 000000000..282a4ed3b --- /dev/null +++ b/src/pulsecore/native-common.c @@ -0,0 +1,78 @@ +/*** + This file is part of PulseAudio. + + Copyright 2016 Ahmed S. Darwish + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, see . +***/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include +#include + +#include "native-common.h" + +/* + * Command handlers shared between client and server + */ + +/* Check pa_pstream_register_memfd_mempool() for further details */ +int pa_common_command_register_memfd_shmid(pa_pstream *p, pa_pdispatch *pd, uint32_t version, + uint32_t command, pa_tagstruct *t) { +#if defined(HAVE_CREDS) && defined(HAVE_MEMFD) + pa_cmsg_ancil_data *ancil = NULL; + unsigned shm_id; + int ret = -1; + + pa_assert(pd); + pa_assert(command == PA_COMMAND_REGISTER_MEMFD_SHMID); + pa_assert(t); + + ancil = pa_pdispatch_take_ancil_data(pd); + if (!ancil) + goto finish; + + /* Upon fd leaks and reaching our open fd limit, recvmsg(2) + * just strips all passed fds from the ancillary data */ + if (ancil->nfd == 0) { + pa_log("Expected 1 memfd fd to be received over pipe; got 0"); + pa_log("Did we reach our open file descriptors limit?"); + goto finish; + } + + if (ancil->nfd != 1 || ancil->fds[0] == -1) + goto finish; + + if (version < 31 || pa_tagstruct_getu32(t, &shm_id) < 0 || !pa_tagstruct_eof(t)) + goto finish; + + pa_pstream_attach_memfd_shmid(p, shm_id, ancil->fds[0]); + + ret = 0; +finish: + if (ancil) + pa_cmsg_ancil_data_close_fds(ancil); + + return ret; +#else + return -1; +#endif +} diff --git a/src/pulsecore/native-common.h b/src/pulsecore/native-common.h index dc62895fc..70338b9f3 100644 --- a/src/pulsecore/native-common.h +++ b/src/pulsecore/native-common.h @@ -24,6 +24,10 @@ #include #include +#include +#include +#include + PA_C_DECL_BEGIN enum { @@ -179,6 +183,10 @@ enum { PA_COMMAND_ENABLE_SRBCHANNEL, PA_COMMAND_DISABLE_SRBCHANNEL, + /* Supported since protocol v31 (9.0) + * BOTH DIRECTIONS */ + PA_COMMAND_REGISTER_MEMFD_SHMID, + PA_COMMAND_MAX }; @@ -193,6 +201,9 @@ enum { #define PA_NATIVE_DEFAULT_UNIX_SOCKET "native" +int pa_common_command_register_memfd_shmid(pa_pstream *p, pa_pdispatch *pd, uint32_t version, + uint32_t command, pa_tagstruct *t); + PA_C_DECL_END #endif diff --git a/src/pulsecore/pdispatch.c b/src/pulsecore/pdispatch.c index f1368759f..ab632a5ab 100644 --- a/src/pulsecore/pdispatch.c +++ b/src/pulsecore/pdispatch.c @@ -195,6 +195,10 @@ static const char *command_names[PA_COMMAND_MAX] = { /* BOTH DIRECTIONS */ [PA_COMMAND_ENABLE_SRBCHANNEL] = "ENABLE_SRBCHANNEL", [PA_COMMAND_DISABLE_SRBCHANNEL] = "DISABLE_SRBCHANNEL", + + /* Supported since protocol v31 (9.0) */ + /* BOTH DIRECTIONS */ + [PA_COMMAND_REGISTER_MEMFD_SHMID] = "REGISTER_MEMFD_SHMID", }; #endif @@ -219,7 +223,7 @@ struct pa_pdispatch { PA_LLIST_HEAD(struct reply_info, replies); pa_pdispatch_drain_cb_t drain_callback; void *drain_userdata; - const pa_cmsg_ancil_data *ancil_data; + pa_cmsg_ancil_data *ancil_data; bool use_rtclock; }; @@ -289,7 +293,7 @@ static void run_action(pa_pdispatch *pd, struct reply_info *r, uint32_t command, pa_pdispatch_unref(pd); } -int pa_pdispatch_run(pa_pdispatch *pd, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data, void *userdata) { +int pa_pdispatch_run(pa_pdispatch *pd, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) { uint32_t tag, command; pa_tagstruct *ts = NULL; int ret = -1; @@ -448,18 +452,24 @@ const pa_creds * pa_pdispatch_creds(pa_pdispatch *pd) { return NULL; } -const int * pa_pdispatch_fds(pa_pdispatch *pd, int *nfd) { +/* Should be called only once during the dispatcher lifetime + * + * If the returned ancillary data contains any fds, caller maintains sole + * responsibility of closing them down using pa_cmsg_ancil_data_close_fds() */ +pa_cmsg_ancil_data *pa_pdispatch_take_ancil_data(pa_pdispatch *pd) { + pa_cmsg_ancil_data *ancil; + pa_assert(pd); pa_assert(PA_REFCNT_VALUE(pd) >= 1); - pa_assert(nfd); - if (pd->ancil_data) { - *nfd = pd->ancil_data->nfd; - return pd->ancil_data->fds; - } + ancil = pd->ancil_data; - *nfd = 0; - return NULL; + /* iochannel guarantees us that nfd will always be capped */ + if (ancil) + pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS); + + pd->ancil_data = NULL; + return ancil; } #endif diff --git a/src/pulsecore/pdispatch.h b/src/pulsecore/pdispatch.h index 9cb341968..af7698191 100644 --- a/src/pulsecore/pdispatch.h +++ b/src/pulsecore/pdispatch.h @@ -39,7 +39,7 @@ pa_pdispatch* pa_pdispatch_new(pa_mainloop_api *m, bool use_rtclock, const pa_pd void pa_pdispatch_unref(pa_pdispatch *pd); pa_pdispatch* pa_pdispatch_ref(pa_pdispatch *pd); -int pa_pdispatch_run(pa_pdispatch *pd, pa_packet *p, const pa_cmsg_ancil_data *ancil_data, void *userdata); +int pa_pdispatch_run(pa_pdispatch *pd, pa_packet *p, pa_cmsg_ancil_data *ancil_data, void *userdata); void pa_pdispatch_register_reply(pa_pdispatch *pd, uint32_t tag, int timeout, pa_pdispatch_cb_t callback, void *userdata, pa_free_cb_t free_cb); @@ -51,7 +51,6 @@ void pa_pdispatch_set_drain_callback(pa_pdispatch *pd, pa_pdispatch_drain_cb_t c void pa_pdispatch_unregister_reply(pa_pdispatch *pd, void *userdata); const pa_creds * pa_pdispatch_creds(pa_pdispatch *pd); - -const int * pa_pdispatch_fds(pa_pdispatch *pd, int *nfd); +pa_cmsg_ancil_data *pa_pdispatch_take_ancil_data(pa_pdispatch *pd); #endif diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index c9182d01c..ffa5c4dcd 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -301,6 +301,7 @@ static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_ static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_set_port_latency_offset(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_enable_srbchannel(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); +static void command_register_memfd_shmid(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = NULL, @@ -406,6 +407,8 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ENABLE_SRBCHANNEL] = command_enable_srbchannel, + [PA_COMMAND_REGISTER_MEMFD_SHMID] = command_register_memfd_shmid, + [PA_COMMAND_EXTENSION] = command_extension }; @@ -2646,7 +2649,7 @@ static void setup_srbchannel(pa_native_connection *c) { pa_tagstruct_putu32(t, (size_t) srb); /* tag */ fdlist[0] = srbt.readfd; fdlist[1] = srbt.writefd; - pa_pstream_send_tagstruct_with_fds(c->pstream, t, 2, fdlist); + pa_pstream_send_tagstruct_with_fds(c->pstream, t, 2, fdlist, false); /* Send ringbuffer memblock to client */ mc.memblock = srbt.memblock; @@ -2805,6 +2808,16 @@ static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_ta setup_srbchannel(c); } +static void command_register_memfd_shmid(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { + pa_native_connection *c = PA_NATIVE_CONNECTION(userdata); + + pa_native_connection_assert_ref(c); + pa_assert(t); + + if (pa_common_command_register_memfd_shmid(c->pstream, pd, c->version, command, t)) + protocol_error(c); +} + static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { pa_native_connection *c = PA_NATIVE_CONNECTION(userdata); const char *name = NULL; @@ -4910,7 +4923,7 @@ static void command_set_port_latency_offset(pa_pdispatch *pd, uint32_t command, /*** pstream callbacks ***/ -static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data, void *userdata) { +static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) { pa_native_connection *c = PA_NATIVE_CONNECTION(userdata); pa_assert(p); diff --git a/src/pulsecore/pstream-util.c b/src/pulsecore/pstream-util.c index e87450337..449ea1f85 100644 --- a/src/pulsecore/pstream-util.c +++ b/src/pulsecore/pstream-util.c @@ -21,13 +21,16 @@ #include #endif -#include +#include #include +#include +#include +#include #include #include "pstream-util.h" -static void pa_pstream_send_tagstruct_with_ancil_data(pa_pstream *p, pa_tagstruct *t, const pa_cmsg_ancil_data *ancil_data) { +static void pa_pstream_send_tagstruct_with_ancil_data(pa_pstream *p, pa_tagstruct *t, pa_cmsg_ancil_data *ancil_data) { size_t length; const uint8_t *data; pa_packet *packet; @@ -58,12 +61,21 @@ void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_pstream_send_tagstruct_with_ancil_data(p, t, NULL); } -void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds) { +/* @close_fds: If set then the pstreams code, after invoking a sendmsg(), + * will close all passed fds. + * + * Such fds cannot be closed here as this might lead to freeing them + * before they're actually passed to the other end. The internally-used + * pa_pstream_send_packet() does not do any actual writes and just + * defers write events over the pstream. */ +void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds, + bool close_fds) { if (nfd > 0) { pa_cmsg_ancil_data a; a.nfd = nfd; a.creds_valid = false; + a.close_fds_on_cleanup = close_fds; pa_assert(nfd <= MAX_ANCIL_DATA_FDS); memcpy(a.fds, fds, sizeof(int) * nfd); pa_pstream_send_tagstruct_with_ancil_data(p, t, &a); @@ -78,7 +90,8 @@ void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_pstream_send_tagstruct_with_ancil_data(p, t, NULL); } -void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds) { +void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds, + bool close_fds) { pa_assert_not_reached(); } @@ -102,3 +115,82 @@ void pa_pstream_send_simple_ack(pa_pstream *p, uint32_t tag) { pa_tagstruct_putu32(t, tag); pa_pstream_send_tagstruct(p, t); } + +/* Before sending blocks from a memfd-backed pool over the pipe, we + * must call this method first. + * + * This is needed to transfer memfd blocks without passing their fd + * every time, thus minimizing overhead and avoiding fd leaks. + * + * On registration a packet is sent with the memfd fd as ancil data; + * such packet has an ID that uniquely identifies the pool's memfd + * region. Upon arrival the other end creates a permanent mapping + * between that ID and the passed memfd memory area. + * + * By doing so, we won't need to reference the pool's memfd fd any + * further - just its ID. Both endpoints can then close their fds. */ +int pa_pstream_register_memfd_mempool(pa_pstream *p, pa_mempool *pool, const char **fail_reason) { +#if defined(HAVE_CREDS) && defined(HAVE_MEMFD) + unsigned shm_id; + int memfd_fd, ret = -1; + pa_tagstruct *t; + bool per_client_mempool; + + pa_assert(p); + pa_assert(fail_reason); + + *fail_reason = NULL; + per_client_mempool = pa_mempool_is_per_client(pool); + + pa_pstream_ref(p); + + if (!pa_mempool_is_shared(pool)) { + *fail_reason = "mempool is not shared"; + goto finish; + } + + if (!pa_mempool_is_memfd_backed(pool)) { + *fail_reason = "mempool is not memfd-backed"; + goto finish; + } + + if (pa_mempool_get_shm_id(pool, &shm_id)) { + *fail_reason = "could not extract pool SHM ID"; + goto finish; + } + + if (!pa_pstream_get_memfd(p)) { + *fail_reason = "pipe does not support memfd transport"; + goto finish; + } + + memfd_fd = (per_client_mempool) ? pa_mempool_take_memfd_fd(pool) : + pa_mempool_get_memfd_fd(pool); + + /* Note! For per-client mempools we've taken ownership of the memfd + * fd, and we're thus the sole code path responsible for closing it. + * In case of any failure, it MUST be closed. */ + + if (pa_pstream_attach_memfd_shmid(p, shm_id, memfd_fd)) { + *fail_reason = "could not attach memfd SHM ID to pipe"; + + if (per_client_mempool) + pa_assert_se(pa_close(memfd_fd) == 0); + goto finish; + } + + t = pa_tagstruct_new(); + pa_tagstruct_putu32(t, PA_COMMAND_REGISTER_MEMFD_SHMID); + pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */ + pa_tagstruct_putu32(t, shm_id); + pa_pstream_send_tagstruct_with_fds(p, t, 1, &memfd_fd, per_client_mempool); + + ret = 0; +finish: + pa_pstream_unref(p); + return ret; + +#else + pa_assert_not_reached(); +#endif +} diff --git a/src/pulsecore/pstream-util.h b/src/pulsecore/pstream-util.h index 136631497..1191d4803 100644 --- a/src/pulsecore/pstream-util.h +++ b/src/pulsecore/pstream-util.h @@ -27,11 +27,13 @@ /* The tagstruct is freed!*/ void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_creds *creds); -void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds); +void pa_pstream_send_tagstruct_with_fds(pa_pstream *p, pa_tagstruct *t, int nfd, const int *fds, bool close_fds); #define pa_pstream_send_tagstruct(p, t) pa_pstream_send_tagstruct_with_creds((p), (t), NULL) void pa_pstream_send_error(pa_pstream *p, uint32_t tag, uint32_t error); void pa_pstream_send_simple_ack(pa_pstream *p, uint32_t tag); +int pa_pstream_register_memfd_mempool(pa_pstream *p, pa_mempool *pool, const char **fail_reason); + #endif diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 0fb37a03b..1ea3c5bb0 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -32,6 +32,7 @@ #include +#include #include #include #include @@ -44,6 +45,7 @@ /* We piggyback information if audio data blocks are stored in SHM on the seek mode */ #define PA_FLAG_SHMDATA 0x80000000LU +#define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU #define PA_FLAG_SHMRELEASE 0x40000000LU #define PA_FLAG_SHMREVOKE 0xC0000000LU #define PA_FLAG_SHMMASK 0xFF000000LU @@ -143,7 +145,17 @@ struct pa_pstream { struct pstream_read readio, readsrb; - bool use_shm; + /* @use_shm: beside copying the full audio data to the other + * PA end, this pipe supports just sending references of the + * same audio data blocks if they reside in a SHM pool. + * + * @use_memfd: pipe supports sending SHM memfd block references + * + * @registered_memfd_ids: registered memfd pools SHM IDs. Check + * pa_pstream_register_memfd_mempool() for more information. */ + bool use_shm, use_memfd; + pa_idxset *registered_memfd_ids; + pa_memimport *import; pa_memexport *export; @@ -168,11 +180,33 @@ struct pa_pstream { pa_mempool *mempool; #ifdef HAVE_CREDS - pa_cmsg_ancil_data read_ancil_data, write_ancil_data; + pa_cmsg_ancil_data read_ancil_data, *write_ancil_data; bool send_ancil_data_now; #endif }; +#ifdef HAVE_CREDS +/* Don't close the ancillary fds by your own! Always call this method; + * it guarantees necessary cleanups after fds close.. This method is + * also multiple-invocations safe. */ +void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) { + if (ancil && ancil->close_fds_on_cleanup) { + int i; + + pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS); + + for (i = 0; i < ancil->nfd; i++) + if (ancil->fds[i] != -1) { + pa_assert_se(pa_close(ancil->fds[i]) == 0); + ancil->fds[i] = -1; + } + + ancil->nfd = 0; + ancil->close_fds_on_cleanup = false; + } +} +#endif + static int do_write(pa_pstream *p); static int do_read(pa_pstream *p, struct pstream_read *re); @@ -287,6 +321,35 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo return p; } +/* Attach memfd<->SHM_ID mapping to given pstream and its memimport. + * Check pa_pstream_register_memfd_mempool() for further info. + * + * Caller owns the passed @memfd_fd and must close it down when appropriate. */ +int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) { + int err = -1; + + pa_assert(memfd_fd != -1); + + if (!p->use_memfd) { + pa_log_warn("Received memfd ID registration request over a pipe " + "that does not support memfds"); + return err; + } + + if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) { + pa_log_warn("previously registered memfd SHM ID = %u", shm_id); + return err; + } + + if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) { + pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id); + return err; + } + + pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0); + return 0; +} + static void item_free(void *item) { struct item_info *i = item; pa_assert(i); @@ -299,6 +362,15 @@ static void item_free(void *item) { pa_packet_unref(i->packet); } +#ifdef HAVE_CREDS + /* On error recovery paths, there might be lingering items + * on the pstream send queue and they are usually freed with + * a call to 'pa_queue_free(p->send_queue, item_free)'. Make + * sure we do not leak any fds in that case! */ + if (i->with_ancil_data) + pa_cmsg_ancil_data_close_fds(&i->ancil_data); +#endif + if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0) pa_xfree(i); } @@ -328,18 +400,25 @@ static void pstream_free(pa_pstream *p) { if (p->readio.packet) pa_packet_unref(p->readio.packet); + if (p->registered_memfd_ids) + pa_idxset_free(p->registered_memfd_ids, NULL); + pa_xfree(p); } -void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data) { +void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) { struct item_info *i; pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); pa_assert(packet); - if (p->dead) + if (p->dead) { +#ifdef HAVE_CREDS + pa_cmsg_ancil_data_close_fds(ancil_data); +#endif return; + } if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) i = pa_xnew(struct item_info, 1); @@ -556,22 +635,40 @@ static void prepare_next_write_item(pa_pstream *p) { &shm_id, &offset, &length) >= 0) { - pa_assert(type == PA_MEM_TYPE_SHARED_POSIX); - flags |= PA_FLAG_SHMDATA; - if (pa_mempool_is_remote_writable(current_pool)) - flags |= PA_FLAG_SHMWRITABLE; - send_payload = false; + if (type == PA_MEM_TYPE_SHARED_POSIX) + send_payload = false; - shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); - shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); - shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); - shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) { + if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) { + flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK; + send_payload = false; + } else { + if (pa_log_ratelimit(PA_LOG_ERROR)) { + pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id); + pa_log("Fallig back to copying full block data over socket"); + } + } + } - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size); - p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size; + if (send_payload) { + pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0); + } else { + flags |= PA_FLAG_SHMDATA; + if (pa_mempool_is_remote_writable(current_pool)) + flags |= PA_FLAG_SHMWRITABLE; + + shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); + shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); + shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); + shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size); + p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size; + } } /* else */ +/* FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */ /* pa_log_warn("Failed to export memory block."); */ if (current_export != p->export) @@ -590,7 +687,7 @@ static void prepare_next_write_item(pa_pstream *p) { #ifdef HAVE_CREDS if ((p->send_ancil_data_now = p->write.current->with_ancil_data)) - p->write_ancil_data = p->write.current->ancil_data; + p->write_ancil_data = &p->write.current->ancil_data; #endif } @@ -650,14 +747,16 @@ static int do_write(pa_pstream *p) { #ifdef HAVE_CREDS if (p->send_ancil_data_now) { - if (p->write_ancil_data.creds_valid) { - pa_assert(p->write_ancil_data.nfd == 0); - if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data.creds)) < 0) + if (p->write_ancil_data->creds_valid) { + pa_assert(p->write_ancil_data->nfd == 0); + if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0) goto fail; } else - if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data.nfd, p->write_ancil_data.fds)) < 0) + if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0) goto fail; + + pa_cmsg_ancil_data_close_fds(p->write_ancil_data); p->send_ancil_data_now = false; } else #endif @@ -688,6 +787,10 @@ static int do_write(pa_pstream *p) { return (size_t) r == l ? 1 : 0; fail: +#ifdef HAVE_CREDS + if (p->send_ancil_data_now) + pa_cmsg_ancil_data_close_fds(p->write_ancil_data); +#endif if (release_memblock) pa_memblock_release(release_memblock); @@ -768,6 +871,7 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS); p->read_ancil_data.nfd = b.nfd; memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd); + p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup; } } #else @@ -844,7 +948,7 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { return -1; } - if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { + if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) { if (length != sizeof(re->shm_info)) { pa_log_warn("Received SHM memblock frame with invalid frame length."); @@ -887,19 +991,28 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { pa_packet_unref(re->packet); } else { - pa_memblock *b; + pa_memblock *b = NULL; uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); - pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); + uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]); + pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ? + PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX; + pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0); pa_assert(p->import); - if (!(b = pa_memimport_get(p->import, - PA_MEM_TYPE_SHARED_POSIX, - ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), - ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), - !!(flags & PA_FLAG_SHMWRITABLE)))) { + if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd && + !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) { + + if (pa_log_ratelimit(PA_LOG_ERROR)) + pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id); + + } else if (!(b = pa_memimport_get(p->import, + type, + ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), + shm_id, + ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), + !!(flags & PA_FLAG_SHMWRITABLE)))) { if (pa_log_ratelimit(PA_LOG_DEBUG)) pa_log_debug("Failed to import memory block."); @@ -942,6 +1055,13 @@ frame_done: re->data = NULL; #ifdef HAVE_CREDS + /* FIXME: Close received ancillary data fds if the pstream's + * receive_packet_callback did not do so. + * + * Malicious clients can attach fds to unknown commands, or attach them + * to commands that does not expect fds. By doing so, server will reach + * its open fd limit and future clients' SHM transfers will always fail. + */ p->read_ancil_data.creds_valid = false; p->read_ancil_data.nfd = 0; #endif @@ -1090,6 +1210,18 @@ void pa_pstream_enable_shm(pa_pstream *p, bool enable) { } } +void pa_pstream_enable_memfd(pa_pstream *p) { + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p->use_shm); + + p->use_memfd = true; + + if (!p->registered_memfd_ids) { + p->registered_memfd_ids = pa_idxset_new(NULL, NULL); + } +} + bool pa_pstream_get_shm(pa_pstream *p) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); @@ -1097,6 +1229,13 @@ bool pa_pstream_get_shm(pa_pstream *p) { return p->use_shm; } +bool pa_pstream_get_memfd(pa_pstream *p) { + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + + return p->use_memfd; +} + void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL); diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h index f4e1462e9..2bff270ad 100644 --- a/src/pulsecore/pstream.h +++ b/src/pulsecore/pstream.h @@ -36,7 +36,7 @@ typedef struct pa_pstream pa_pstream; -typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data, void *userdata); +typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata); typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata); typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata); typedef void (*pa_pstream_block_id_cb_t)(pa_pstream *p, uint32_t block_id, void *userdata); @@ -48,7 +48,9 @@ void pa_pstream_unref(pa_pstream*p); void pa_pstream_unlink(pa_pstream *p); -void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data); +int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd); + +void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data); void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk); void pa_pstream_send_release(pa_pstream *p, uint32_t block_id); void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id); @@ -63,7 +65,9 @@ void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, bool pa_pstream_is_pending(pa_pstream *p); void pa_pstream_enable_shm(pa_pstream *p, bool enable); +void pa_pstream_enable_memfd(pa_pstream *p); bool pa_pstream_get_shm(pa_pstream *p); +bool pa_pstream_get_memfd(pa_pstream *p); /* Enables shared ringbuffer channel. Note that the srbchannel is now owned by the pstream. Setting srb to NULL will free any existing srbchannel. */ diff --git a/src/tests/connect-stress.c b/src/tests/connect-stress.c index 055ef1326..a243df9ea 100644 --- a/src/tests/connect-stress.c +++ b/src/tests/connect-stress.c @@ -63,7 +63,8 @@ static const pa_sample_spec sample_spec = { static void context_state_callback(pa_context *c, void *userdata); -static void connect(const char *name, int *try) { +/* Note: don't conflict with connect(2) declaration */ +static void _connect(const char *name, int *try) { int ret; pa_mainloop_api *api; @@ -87,7 +88,7 @@ static void connect(const char *name, int *try) { fail_unless(ret == 0); } -static void disconnect(void) { +static void _disconnect(void) { int i; fail_unless(mainloop != NULL); @@ -201,9 +202,9 @@ START_TEST (connect_stress_test) { streams[i] = NULL; for (i = 0; i < NTESTS; i++) { - connect(bname, &i); + _connect(bname, &i); usleep(rand() % 500000); - disconnect(); + _disconnect(); usleep(rand() % 500000); } diff --git a/src/tests/srbchannel-test.c b/src/tests/srbchannel-test.c index 9fc5d45b8..0e7b0ce90 100644 --- a/src/tests/srbchannel-test.c +++ b/src/tests/srbchannel-test.c @@ -34,7 +34,7 @@ static unsigned packets_received; static unsigned packets_checksum; static size_t packets_length; -static void packet_received(pa_pstream *p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data, void *userdata) { +static void packet_received(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) { const uint8_t *pdata; size_t plen; unsigned i;