From 7da031c969f3e44d7cc6caec0d3cc46dd1a6e221 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 28 Feb 2023 16:14:19 +0100 Subject: [PATCH] module-rtp: add new rtp-session module The module uses the apple session setup for managing peer connections. Make a generic rtp stream object, make midi and audio implementations. --- src/modules/meson.build | 10 + src/modules/module-rtp-session.c | 1123 ++++++++++++++++++++++++++++++ src/modules/module-rtp/audio.c | 303 ++++++++ src/modules/module-rtp/midi.c | 472 +++++++++++++ src/modules/module-rtp/stream.c | 455 ++++++++++++ src/modules/module-rtp/stream.h | 38 + 6 files changed, 2401 insertions(+) create mode 100644 src/modules/module-rtp-session.c create mode 100644 src/modules/module-rtp/audio.c create mode 100644 src/modules/module-rtp/midi.c create mode 100644 src/modules/module-rtp/stream.c create mode 100644 src/modules/module-rtp/stream.h diff --git a/src/modules/meson.build b/src/modules/meson.build index 717fbf2a6..a133d0338 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -536,6 +536,16 @@ pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink', dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], ) +pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-session', + [ 'module-rtp/stream.c', + 'module-rtp-session.c' ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], +) + build_module_roc = roc_dep.found() if build_module_roc pipewire_module_roc_sink = shared_library('pipewire-module-roc-sink', diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c new file mode 100644 index 000000000..a52316646 --- /dev/null +++ b/src/modules/module-rtp-session.c @@ -0,0 +1,1123 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +/** \page page_module_rtp_sink PipeWire Module: RTP sink + * + * The `rtp-sink` module creates a PipeWire sink that sends audio + * RTP packets. + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `source.ip =`: source IP address, default "0.0.0.0" + * - `destination.ip =`: destination IP address, default "224.0.0.56" + * - `destination.port =`: destination port, default random beteen 46000 and 47024 + * - `local.ifname = `: interface name to use + * - `net.mtu = `: MTU to use, default 1280 + * - `net.ttl = `: TTL to use, default 1 + * - `net.loop = `: loopback multicast, default false + * - `sess.min-ptime = `: minimum packet time in milliseconds, default 2 + * - `sess.max-ptime = `: maximum packet time in milliseconds, default 20 + * - `sess.name = `: a session name + * - `sess.ts-offset = `: an offset to apply to the timestamp, default -1 = random offset + * - `sess.ts-refclk = `: the name of a reference clock + * - `rtp.media = `: the media type audio|midi, default audio + * - `stream.props = {}`: properties to be passed to the stream + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_REMOTE_NAME + * - \ref PW_KEY_AUDIO_FORMAT + * - \ref PW_KEY_AUDIO_RATE + * - \ref PW_KEY_AUDIO_CHANNELS + * - \ref SPA_KEY_AUDIO_POSITION + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_MEDIA_NAME + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL + * - \ref PW_KEY_MEDIA_CLASS + * + * ## Example configuration + *\code{.unparsed} + * context.modules = [ + * { name = libpipewire-module-rtp-sink + * args = { + * #local.ifname = "eth0" + * #source.ip = "0.0.0.0" + * #destination.ip = "224.0.0.56" + * #destination.port = 46000 + * #net.mtu = 1280 + * #net.ttl = 1 + * #net.loop = false + * #sess.min-ptime = 2 + * #sess.max-ptime = 20 + * #sess.name = "PipeWire RTP stream" + * #rtp.media = "audio" + * #audio.format = "S16BE" + * #audio.rate = 48000 + * #audio.channels = 2 + * #audio.position = [ FL FR ] + * stream.props = { + * node.name = "rtp-sink" + * } + * } + *} + *] + *\endcode + * + * \since 0.3.60 + */ + +#define NAME "rtp-sink" + +PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +#define BUFFER_SIZE (1u<<20) +#define BUFFER_MASK (BUFFER_SIZE-1) + +#define DEFAULT_FORMAT "S16BE" +#define DEFAULT_RATE 48000 +#define DEFAULT_CHANNELS 2 +#define DEFAULT_POSITION "[ FL FR ]" + +#define DEFAULT_SOURCE_IP "0.0.0.0" +#define DEFAULT_SOURCE_PORT 0 +#define DEFAULT_DESTINATION_IP "224.0.0.56" +#define DEFAULT_DESTINATION_PORT 46000 +#define DEFAULT_TTL 1 +#define DEFAULT_MTU 1280 +#define DEFAULT_LOOP false + +#define DEFAULT_TS_OFFSET -1 + +#define USAGE "source.ip= " \ + "destination.ip= " \ + "destination.port= " \ + "local.ifname= " \ + "net.mtu= " \ + "net.ttl= " \ + "net.loop= " \ + "sess.name= " \ + "sess.min-ptime= " \ + "sess.max-ptime= " \ + "rtp.media= " \ + "audio.format= " \ + "audio.rate= " \ + "audio.channels= "\ + "audio.position= " \ + "stream.props= { key=value ... }" + +static const struct spa_dict_item module_info[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "RTP Sink" }, + { PW_KEY_MODULE_USAGE, USAGE }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct session { + struct impl *impl; + struct spa_list link; + + struct sockaddr_storage ctrl_addr; + socklen_t ctrl_len; + struct sockaddr_storage data_addr; + socklen_t data_len; + + struct rtp_stream *send; + struct spa_hook send_listener; + struct rtp_stream *recv; + struct spa_hook recv_listener; + + char *name; + + uint32_t initiator; + uint32_t remote_ssrc; + + uint32_t ssrc; + uint32_t ts_offset; + + unsigned ctrl_ready:1; + unsigned data_ready:1; +}; + +struct impl { + struct pw_context *context; + + struct pw_impl_module *module; + struct spa_hook module_listener; + struct pw_properties *props; + + struct pw_properties *stream_props; + + struct pw_loop *loop; + struct pw_loop *data_loop; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; + + struct spa_source *timer; + + struct spa_source *ctrl_source; + struct spa_source *data_source; + + char *ifname; + char *session_name; + uint32_t mtu; + bool ttl; + bool mcast_loop; + int64_t ts_offset; + char *ts_refclk; + int payload; + + struct sockaddr_storage ctrl_addr; + socklen_t ctrl_len; + struct sockaddr_storage src_addr; + socklen_t src_len; + + struct spa_list sessions; + uint32_t n_sessions; +}; + +static ssize_t send_packet(int fd, struct msghdr *msg) +{ + ssize_t n; + n = sendmsg(fd, msg, MSG_NOSIGNAL); + if (n < 0) { + switch (errno) { + case ECONNREFUSED: + case ECONNRESET: + pw_log_debug("remote end not listening"); + break; + default: + pw_log_debug("sendmsg() failed: %m"); + break; + } + } + return n; +} + +static void send_apple_midi_cmd_in(struct session *sess, bool ctrl) +{ + struct impl *impl = sess->impl; + struct iovec iov[3]; + struct msghdr msg; + struct rtp_apple_midi hdr; + + spa_zero(hdr); + hdr.cmd = htonl(APPLE_MIDI_CMD_IN); + hdr.protocol = htonl(2); + hdr.initiator = htonl(sess->initiator); + hdr.ssrc = htonl(sess->ssrc); + + iov[0].iov_base = &hdr; + iov[0].iov_len = sizeof(hdr); + iov[1].iov_base = sess->name; + iov[1].iov_len = strlen(sess->name); + + spa_zero(msg); + msg.msg_name = ctrl ? &sess->ctrl_addr : &sess->data_addr; + msg.msg_namelen = ctrl ? sess->ctrl_len : sess->data_len; + msg.msg_iov = iov; + msg.msg_iovlen = 2; + + send_packet(ctrl ? impl->ctrl_source->fd : impl->data_source->fd, &msg); +} + +static void send_apple_midi_cmd_by(struct session *sess, bool ctrl) +{ + struct impl *impl = sess->impl; + struct iovec iov[3]; + struct msghdr msg; + struct rtp_apple_midi hdr; + + spa_zero(hdr); + hdr.cmd = htonl(APPLE_MIDI_CMD_BY); + hdr.protocol = htonl(2); + hdr.initiator = htonl(sess->initiator); + hdr.ssrc = htonl(sess->ssrc); + + iov[0].iov_base = &hdr; + iov[0].iov_len = sizeof(hdr); + + spa_zero(msg); + msg.msg_name = ctrl ? &sess->ctrl_addr : &sess->data_addr; + msg.msg_namelen = ctrl ? sess->ctrl_len : sess->data_len; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + send_packet(ctrl ? impl->ctrl_source->fd : impl->data_source->fd, &msg); +} + +static void send_destroy(void *data) +{ +} + +static void send_state_changed(void *data, bool started, const char *error) +{ + struct session *sess = data; + + pw_log_info("send initiator:%08x state %d", sess->initiator, started); + + if (started) { + if (!sess->ctrl_ready) + send_apple_midi_cmd_in(sess, true); + else if (!sess->data_ready) + send_apple_midi_cmd_in(sess, false); + } else { + if (sess->ctrl_ready) + send_apple_midi_cmd_by(sess, true); + if (sess->data_ready) + send_apple_midi_cmd_by(sess, false); + } +} + +static void send_send_packet(void *data, struct iovec *iov, size_t iovlen) +{ + struct session *sess = data; + struct impl *impl = sess->impl; + struct msghdr msg; + + spa_zero(msg); + msg.msg_name = &sess->data_addr; + msg.msg_namelen = sess->data_len; + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + send_packet(impl->data_source->fd, &msg); +} + +static void recv_destroy(void *data) +{ +} +static void recv_state_changed(void *data, bool started, const char *error) +{ + struct session *sess = data; + pw_log_info("send initiator:%08x state %d", sess->initiator, started); + + if (started) { + if (!sess->ctrl_ready) + send_apple_midi_cmd_in(sess, true); + else if (!sess->data_ready) + send_apple_midi_cmd_in(sess, false); + } else { + if (sess->ctrl_ready) + send_apple_midi_cmd_by(sess, true); + if (sess->data_ready) + send_apple_midi_cmd_by(sess, false); + } +} + +static const struct rtp_stream_events send_stream_events = { + RTP_VERSION_STREAM_EVENTS, + .destroy = send_destroy, + .state_changed = send_state_changed, + .send_packet = send_send_packet, +}; + +static const struct rtp_stream_events recv_stream_events = { + RTP_VERSION_STREAM_EVENTS, + .destroy = recv_destroy, + .state_changed = recv_state_changed, +}; + +static void free_session(struct session *sess) +{ + spa_list_remove(&sess->link); + sess->impl->n_sessions--; + + if (sess->send) + rtp_stream_destroy(sess->send); + if (sess->recv) + rtp_stream_destroy(sess->recv); + free(sess); +} + +static bool cmp_ip(const struct sockaddr_storage *sa, const struct sockaddr_storage *sb) +{ + if (sa->ss_family == AF_INET && sb->ss_family == AF_INET) { + struct sockaddr_in *ia = (struct sockaddr_in*)sa; + struct sockaddr_in *ib = (struct sockaddr_in*)sb; + return ia->sin_addr.s_addr == ib->sin_addr.s_addr; + } else if (sa->ss_family == AF_INET6 && sb->ss_family == AF_INET6) { + struct sockaddr_in6 *ia = (struct sockaddr_in6*)sa; + struct sockaddr_in6 *ib = (struct sockaddr_in6*)sb; + return ia->sin6_addr.s6_addr == ib->sin6_addr.s6_addr; + } + return false; +} + +static int get_ip(const struct sockaddr_storage *sa, char *ip, size_t len, uint16_t *port) +{ + if (sa->ss_family == AF_INET) { + struct sockaddr_in *in = (struct sockaddr_in*)sa; + inet_ntop(sa->ss_family, &in->sin_addr, ip, len); + *port = ntohs(in->sin_port); + } else if (sa->ss_family == AF_INET6) { + struct sockaddr_in6 *in = (struct sockaddr_in6*)sa; + inet_ntop(sa->ss_family, &in->sin6_addr, ip, len); + *port = ntohs(in->sin6_port); + } else + return -EIO; + return 0; +} + +static struct session *make_session(struct impl *impl, const char *name) +{ + struct session *sess; + char addr[64]; + struct pw_properties *props; + uint16_t port = 0; + + sess = calloc(1, sizeof(struct session)); + if (sess == NULL) + return NULL; + + spa_list_append(&impl->sessions, &sess->link); + impl->n_sessions++; + + sess->impl = impl; + sess->initiator = pw_rand32(); + sess->ssrc = pw_rand32(); + sess->ts_offset = impl->ts_offset < 0 ? pw_rand32() : impl->ts_offset; + sess->name = strdup(name); + + props = pw_properties_copy(impl->stream_props); + if (props == NULL) + return NULL; + + get_ip(&sess->ctrl_addr, addr, sizeof(addr), &port); + pw_properties_set(props, "rtp.destination.ip", addr); + pw_properties_setf(props, "rtp.destination.port", "%u", port); + pw_properties_setf(props, "rtp.ts-offset", "%u", sess->ts_offset); + pw_properties_setf(props, "rtp.sender-ssrc", "%u", sess->ssrc); + pw_properties_setf(props, "rtp.receiver-ssrc", "%u", sess->remote_ssrc); + + sess->send = rtp_stream_new(impl->core, + PW_DIRECTION_INPUT, pw_properties_copy(props), + &send_stream_events, sess); + sess->recv = rtp_stream_new(impl->core, + PW_DIRECTION_OUTPUT, pw_properties_copy(props), + &recv_stream_events, sess); + + return sess; +} + +static struct session *find_session_by_addr_name(struct impl *impl, + const struct sockaddr_storage *sa, const char *name) +{ + struct session *sess; + spa_list_for_each(sess, &impl->sessions, link) { + if (cmp_ip(sa, &sess->ctrl_addr) && + spa_streq(sess->name, name)) + return sess; + } + return NULL; +} +static struct session *find_session_by_initiator(struct impl *impl, uint32_t initiator) +{ + struct session *sess; + spa_list_for_each(sess, &impl->sessions, link) { + if (sess->initiator == initiator) + return sess; + } + return NULL; +} + +static struct session *find_session_by_ssrc(struct impl *impl, uint32_t ssrc) +{ + struct session *sess; + spa_list_for_each(sess, &impl->sessions, link) { + if (sess->remote_ssrc == ssrc) + return sess; + } + return NULL; +} + +static void parse_apple_midi_cmd_in(struct impl *impl, bool ctrl, uint8_t *buffer, + ssize_t len, struct sockaddr_storage *sa, socklen_t salen) +{ + struct rtp_apple_midi *hdr = (struct rtp_apple_midi*)buffer; + struct iovec iov[3]; + struct msghdr msg; + struct rtp_apple_midi reply; + struct session *sess; + bool success = true; + uint32_t initiator = ntohl(hdr->initiator); + char addr[128]; + uint16_t port = 0; + + get_ip(sa, addr, sizeof(addr), &port); + pw_log_info("IN from %s:%d %s", addr, port, hdr->name); + + sess = find_session_by_initiator(impl, initiator); + if (sess == NULL) + sess = find_session_by_addr_name(impl, sa, hdr->name); + + if (ctrl) { + if (sess != NULL) { + if (sess->ctrl_ready) { + pw_log_warn("receive ctrl IN from existing session %08x", initiator); + success = false; + } + } else { + pw_log_info("got control IN request %08x", initiator); + sess = make_session(impl, hdr->name); + if (sess == NULL) { + pw_log_warn("failed to make session: %m"); + success = false; + } + } + if (success) { + sess->initiator = initiator; + sess->remote_ssrc = ntohl(hdr->ssrc); + sess->ctrl_addr = *sa; + sess->ctrl_len = salen; + sess->ctrl_ready = true; + } + } + else { + if (sess == NULL) { + pw_log_warn("receive data IN from nonexisting session %08x", initiator); + success = false; + } else { + pw_log_info("got data IN request %08x", initiator); + sess->data_addr = *sa; + sess->data_len = salen; + sess->data_ready = true; + } + } + + reply = *hdr; + if (success) { + reply.cmd = htonl(APPLE_MIDI_CMD_OK); + reply.ssrc = htonl(sess->ssrc); + } else + reply.cmd = htonl(APPLE_MIDI_CMD_NO); + + iov[0].iov_base = &reply; + iov[0].iov_len = sizeof(reply); + iov[1].iov_base = sess->name; + iov[1].iov_len = strlen(sess->name); + + spa_zero(msg); + msg.msg_name = sa; + msg.msg_namelen = salen; + msg.msg_iov = iov; + msg.msg_iovlen = 2; + + pw_log_debug("send %p %u", msg.msg_name, msg.msg_namelen); + + send_packet(ctrl ? impl->ctrl_source->fd : impl->data_source->fd, &msg); +} + +static void parse_apple_midi_cmd_ok(struct impl *impl, bool ctrl, uint8_t *buffer, + ssize_t len, struct sockaddr_storage *sa, socklen_t salen) +{ + struct rtp_apple_midi *hdr = (struct rtp_apple_midi*)buffer; + uint32_t initiator = ntohl(hdr->initiator); + struct session *sess; + + sess = find_session_by_initiator(impl, initiator); + if (sess == NULL) { + pw_log_warn("received OK from nonexisting session %u", initiator); + return; + } + + if (ctrl) { + pw_log_info("got ctrl OK %08x %d", initiator, sess->data_ready); + sess->ctrl_ready = true; + if (!sess->data_ready) + send_apple_midi_cmd_in(sess, false); + } else { + pw_log_info("got data OK %08x", initiator); + sess->data_ready = true; + } +} + +static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffer, + ssize_t len, struct sockaddr_storage *sa, socklen_t salen) +{ + struct rtp_apple_midi_ck *hdr = (struct rtp_apple_midi_ck*)buffer; + struct iovec iov[3]; + struct msghdr msg; + struct rtp_apple_midi_ck reply; + struct timespec ts; + struct session *sess; + uint64_t now; + uint32_t ssrc = ntohl(hdr->ssrc); + + sess = find_session_by_ssrc(impl, ssrc); + if (sess == NULL) { + pw_log_warn("unknown SSRC %u", ssrc); + return; + } + + pw_log_debug("got CK count %d", hdr->count); + + clock_gettime(CLOCK_MONOTONIC, &ts); + now = SPA_TIMESPEC_TO_NSEC(&ts); + reply = *hdr; + reply.ssrc = htonl(sess->ssrc); + reply.count++; + iov[0].iov_base = &reply; + iov[0].iov_len = sizeof(reply); + + switch (hdr->count) { + case 0: + reply.ts2_h = htonl(now >> 32); + reply.ts2_l = htonl(now); + break; + case 1: + reply.ts3_h = htonl(now >> 32); + reply.ts3_l = htonl(now); + break; + case 2: + return; + } + + spa_zero(msg); + msg.msg_name = sa; + msg.msg_namelen = salen; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + pw_log_debug("send %p %u", msg.msg_name, msg.msg_namelen); + + send_packet(ctrl ? impl->ctrl_source->fd : impl->data_source->fd, &msg); +} + +static void parse_apple_midi_cmd(struct impl *impl, bool ctrl, uint8_t *buffer, + ssize_t len, struct sockaddr_storage *sa, socklen_t salen) +{ + struct rtp_apple_midi *hdr = (struct rtp_apple_midi*)buffer; + switch (ntohl(hdr->cmd)) { + case APPLE_MIDI_CMD_IN: + parse_apple_midi_cmd_in(impl, ctrl, buffer, len, sa, salen); + break; + case APPLE_MIDI_CMD_OK: + parse_apple_midi_cmd_ok(impl, ctrl, buffer, len, sa, salen); + break; + case APPLE_MIDI_CMD_CK: + parse_apple_midi_cmd_ck(impl, ctrl, buffer, len, sa, salen); + break; + default: + break; + } +} + +static void +on_ctrl_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + ssize_t len; + uint8_t buffer[2048]; + + if (mask & SPA_IO_IN) { + struct sockaddr_storage sa; + socklen_t salen = sizeof(sa); + + if ((len = recvfrom(fd, buffer, sizeof(buffer), 0, + (struct sockaddr*)&sa, &salen)) < 0) + goto receive_error; + + if (len < 12) + goto short_packet; + + if (buffer[0] == 0xff && buffer[1] == 0xff) { + parse_apple_midi_cmd(impl, true, buffer, len, &sa, salen); + } else { + spa_debug_mem(0, buffer, len); + } + } + return; + +receive_error: + pw_log_warn("recv error: %m"); + return; +short_packet: + pw_log_warn("short packet received"); + return; +} + +static void +on_data_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + ssize_t len; + uint8_t buffer[2048]; + + if (mask & SPA_IO_IN) { + struct sockaddr_storage sa; + socklen_t salen = sizeof(sa); + + if ((len = recvfrom(fd, buffer, sizeof(buffer), 0, + (struct sockaddr*)&sa, &salen)) < 0) + goto receive_error; + + if (len < 12) + goto short_packet; + + if (buffer[0] == 0xff && buffer[1] == 0xff) { + parse_apple_midi_cmd(impl, false, buffer, len, &sa, salen); + } else { + struct rtp_header *hdr = (struct rtp_header*)buffer; + struct session *sess = find_session_by_ssrc(impl, ntohl(hdr->ssrc)); + if (sess == NULL) + goto unknown_ssrc; + + rtp_stream_receive_packet(sess->recv, buffer, len); + } + } + return; + +receive_error: + pw_log_warn("recv error: %m"); + return; +short_packet: + pw_log_warn("short packet received"); + return; +unknown_ssrc: + pw_log_warn("unknown SSRC"); + return; +} + +static int make_socket(const struct sockaddr_storage* sa, socklen_t salen, + bool loop, int ttl, char *ifname) +{ + int af, fd, val, res; + struct ifreq req; + struct sockaddr_storage src = *sa; + bool is_multicast = false; + + af = sa->ss_family; + if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { + pw_log_error("socket failed: %m"); + return -errno; + } +#ifdef SO_TIMESTAMP + val = 1; + if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &val, sizeof(val)) < 0) { + res = -errno; + pw_log_error("setsockopt failed: %m"); + goto error; + } +#endif + val = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { + res = -errno; + pw_log_error("setsockopt failed: %m"); + goto error; + } + + spa_zero(req); + if (ifname) { + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", ifname); + res = ioctl(fd, SIOCGIFINDEX, &req); + if (res < 0) + pw_log_warn("SIOCGIFINDEX %s failed: %m", ifname); + } + res = 0; + if (af == AF_INET) { + static const uint32_t ipv4_mcast_mask = 0xe0000000; + struct sockaddr_in *sa4 = (struct sockaddr_in*)&src; + if ((ntohl(sa4->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) { + struct ip_mreqn mr4; + memset(&mr4, 0, sizeof(mr4)); + mr4.imr_multiaddr = sa4->sin_addr; + mr4.imr_ifindex = req.ifr_ifindex; + res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); + is_multicast = true; + } else { + sa4->sin_addr.s_addr = INADDR_ANY; + } + } else if (af == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)&src; + if (sa6->sin6_addr.s6_addr[0] == 0xff) { + struct ipv6_mreq mr6; + memset(&mr6, 0, sizeof(mr6)); + mr6.ipv6mr_multiaddr = sa6->sin6_addr; + mr6.ipv6mr_interface = req.ifr_ifindex; + res = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); + is_multicast = true; + } else { + sa6->sin6_addr = in6addr_any; + } + } else { + res = -EINVAL; + goto error; + } + + if (res < 0) { + res = -errno; + pw_log_error("join mcast failed: %m"); + goto error; + } + if (is_multicast) { + val = loop; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_MULTICAST_LOOP) failed: %m"); + + val = ttl; + if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_MULTICAST_TTL) failed: %m"); + } + + if (bind(fd, (struct sockaddr*)&src, salen) < 0) { + res = -errno; + pw_log_error("bind() failed: %m"); + goto error; + } + /* FIXME AES67 wants IPTOS_DSCP_AF41 */ + val = IPTOS_LOWDELAY; + if (setsockopt(fd, IPPROTO_IP, IP_TOS, &val, sizeof(val)) < 0) + pw_log_warn("setsockopt(IP_TOS) failed: %m"); + + return fd; +error: + return res; +} + +static int setup_apple_session(struct impl *impl) +{ + int fd; + + if ((fd = make_socket(&impl->ctrl_addr, impl->ctrl_len, + impl->mcast_loop, impl->ttl, impl->ifname)) < 0) { + return fd; + } + impl->ctrl_source = pw_loop_add_io(impl->loop, fd, + SPA_IO_IN, true, on_ctrl_io, impl); + + if (impl->ctrl_source == NULL) + return -errno; + + if ((fd = make_socket(&impl->src_addr, impl->src_len, + impl->mcast_loop, impl->ttl, impl->ifname)) < 0) + return fd; + + impl->data_source = pw_loop_add_io(impl->data_loop, fd, + SPA_IO_IN, true, on_data_io, impl); + if (impl->data_source == NULL) + return -errno; + + return 0; +} + +static void core_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->core_listener); + impl->core = NULL; + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct impl *impl) +{ + struct session *sess; + + spa_list_consume(sess, &impl->sessions, link) + free_session(sess); + + if (impl->core && impl->do_disconnect) + pw_core_disconnect(impl->core); + + if (impl->timer) + pw_loop_destroy_source(impl->loop, impl->timer); + if (impl->ctrl_source) + pw_loop_destroy_source(impl->loop, impl->ctrl_source); + if (impl->data_source) + pw_loop_destroy_source(impl->data_loop, impl->data_source); + + pw_properties_free(impl->stream_props); + pw_properties_free(impl->props); + + free(impl->ifname); + free(impl->ts_refclk); + free(impl->session_name); + free(impl); +} + +static void module_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->module_listener); + impl_destroy(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct impl *impl = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static int parse_address(const char *address, uint16_t port, + struct sockaddr_storage *addr, socklen_t *len) +{ + struct sockaddr_in *sa4 = (struct sockaddr_in*)addr; + struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)addr; + + if (inet_pton(AF_INET, address, &sa4->sin_addr) > 0) { + sa4->sin_family = AF_INET; + sa4->sin_port = htons(port); + *len = sizeof(*sa4); + } else if (inet_pton(AF_INET6, address, &sa6->sin6_addr) > 0) { + sa6->sin6_family = AF_INET6; + sa6->sin6_port = htons(port); + *len = sizeof(*sa6); + } else + return -EINVAL; + + return 0; +} + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct impl *impl; + struct pw_properties *props = NULL, *stream_props = NULL; + uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); + uint32_t pid = getpid(); + char addr[64]; + uint16_t port; + const char *str; + int res = 0; + + PW_LOG_TOPIC_INIT(mod_topic); + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return -errno; + + if (args == NULL) + args = ""; + + spa_list_init(&impl->sessions); + + props = pw_properties_new_string(args); + if (props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + impl->props = props; + + stream_props = pw_properties_new(NULL, NULL); + if (stream_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + impl->stream_props = stream_props; + + impl->module = module; + impl->context = context; + impl->loop = pw_context_get_main_loop(context); + impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context)); + + if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) + pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); + if (pw_properties_get(stream_props, PW_KEY_NODE_NETWORK) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_NETWORK, "true"); + + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp-session-%u-%u", pid, id); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, + pw_properties_get(props, PW_KEY_NODE_NAME)); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Session Stream"); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_AUDIO_FORMAT); + copy_props(impl, props, PW_KEY_AUDIO_RATE); + copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + str = pw_properties_get(props, "local.ifname"); + impl->ifname = str ? strdup(str) : NULL; + + port = DEFAULT_SOURCE_PORT; + port = pw_properties_get_uint32(props, "control.port", port); + if ((str = pw_properties_get(props, "control.ip")) == NULL) + str = DEFAULT_SOURCE_IP; + + if ((res = parse_address(str, port, &impl->ctrl_addr, &impl->ctrl_len)) < 0) { + pw_log_error("invalid control.ip %s: %s", str, spa_strerror(res)); + goto out; + } + if ((res = parse_address(str, port ? port+1 : 0, &impl->src_addr, &impl->src_len)) < 0) { + pw_log_error("invalid data.ip %s: %s", str, spa_strerror(res)); + goto out; + } + + impl->mtu = pw_properties_get_uint32(props, "net.mtu", DEFAULT_MTU); + impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); + impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); + + impl->ts_offset = pw_properties_get_int64(props, + "sess.ts-offset", DEFAULT_TS_OFFSET); + + str = pw_properties_get(props, "sess.ts-refclk"); + impl->ts_refclk = str ? strdup(str) : NULL; + + if ((str = pw_properties_get(props, "sess.name")) == NULL) + pw_properties_setf(props, "sess.name", "PipeWire RTP Stream on %s", + pw_get_host_name()); + str = pw_properties_get(props, "sess.name"); + impl->session_name = str ? strdup(str) : NULL; + + pw_properties_set(stream_props, "rtp.session", impl->session_name); + get_ip(&impl->src_addr, addr, sizeof(addr), &port); + pw_properties_set(stream_props, "rtp.source.ip", addr); + pw_properties_setf(stream_props, "rtp.source.port", "%u", port); + pw_properties_setf(stream_props, "rtp.mtu", "%u", impl->mtu); + pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl); + + if ((str = pw_properties_get(props, "rtp.media")) != NULL) + pw_properties_set(stream_props, "rtp.media", str); + if ((str = pw_properties_get(props, "sess.min-ptime")) != NULL) + pw_properties_set(stream_props, "rtp.min-ptime", str); + if ((str = pw_properties_get(props, "sess.max-ptime")) != NULL) + pw_properties_set(stream_props, "rtp.max-ptime", str); + if (impl->ts_refclk != NULL) + pw_properties_set(stream_props, "rtp.ts-refclk", impl->ts_refclk); + + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); + if (impl->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + impl->core = pw_context_connect(impl->context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + impl->do_disconnect = true; + } + if (impl->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)impl->core, + &impl->core_proxy_listener, + &core_proxy_events, impl); + pw_core_add_listener(impl->core, + &impl->core_listener, + &core_events, impl); + + str = pw_properties_get(props, "destination.ip"); + if (str != NULL) { + struct session *sess; + + port = pw_properties_get_uint32(props, "destination.port", 0); + if (port == 0) { + pw_log_error("invalid destination.port"); + goto out; + } + sess = make_session(impl, impl->session_name); + if (sess == NULL) { + res = -errno; + pw_log_error("can't create session: %m"); + goto out; + } + if ((res = parse_address(str, port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) { + pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); + goto out; + } + if ((res = parse_address(str, port+1, &sess->data_addr, &sess->data_len)) < 0) { + pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); + goto out; + } + } + if ((res = setup_apple_session(impl)) < 0) + goto out; + + + pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_info)); + + return 0; +out: + impl_destroy(impl); + return res; +} diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c new file mode 100644 index 000000000..247a985b6 --- /dev/null +++ b/src/modules/module-rtp/audio.c @@ -0,0 +1,303 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +static void process_audio_playback(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t wanted, timestamp, target_buffer, stride, maxsize; + int32_t avail; + + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); + return; + } + d = buf->buffer->datas; + + stride = impl->stride; + + maxsize = d[0].maxsize / stride; + wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; + + if (impl->io_position && impl->direct_timestamp) { + /* in direct mode, read directly from the timestamp index, + * because sender and receiver are in sync, this would keep + * target_buffer of samples available. */ + spa_ringbuffer_read_update(&impl->ring, + impl->io_position->clock.position); + } + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); + + target_buffer = impl->target_buffer; + + if (avail < (int32_t)wanted) { + enum spa_log_level level; + memset(d[0].data, 0, wanted * stride); + if (impl->have_sync) { + impl->have_sync = false; + level = SPA_LOG_LEVEL_WARN; + } else { + level = SPA_LOG_LEVEL_DEBUG; + } + pw_log(level, "underrun %d/%u < %u", + avail, target_buffer, wanted); + } else { + float error, corr; + if (impl->first) { + if ((uint32_t)avail > target_buffer) { + uint32_t skip = avail - target_buffer; + pw_log_debug("first: avail:%d skip:%u target:%u", + avail, skip, target_buffer); + timestamp += skip; + avail = target_buffer; + } + impl->first = false; + } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) { + pw_log_warn("overrun %u > %u", avail, target_buffer * 8); + timestamp += avail - target_buffer; + avail = target_buffer; + } + if (!impl->direct_timestamp) { + /* when not using direct timestamp and clocks are not + * in sync, try to adjust our playback rate to keep the + * requested target_buffer bytes in the ringbuffer */ + error = (float)target_buffer - (float)avail; + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + + corr = spa_dll_update(&impl->dll, error); + + pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, + target_buffer, error, corr); + + if (impl->io_rate_match) { + SPA_FLAG_SET(impl->io_rate_match->flags, + SPA_IO_RATE_MATCH_FLAG_ACTIVE); + impl->io_rate_match->rate = 1.0f / corr; + } + } + spa_ringbuffer_read_data(&impl->ring, + impl->buffer, + BUFFER_SIZE, + (timestamp * stride) & BUFFER_MASK, + d[0].data, wanted * stride); + + timestamp += wanted; + spa_ringbuffer_read_update(&impl->ring, timestamp); + } + d[0].chunk->size = wanted * stride; + d[0].chunk->stride = stride; + d[0].chunk->offset = 0; + buf->size = wanted; + + pw_stream_queue_buffer(impl->stream, buf); +} + +static void receive_rtp_audio(struct impl *impl, uint8_t *buffer, ssize_t len) +{ + struct rtp_header *hdr; + ssize_t hlen, plen; + uint16_t seq; + uint32_t timestamp, samples, write, expected_write; + uint32_t stride = impl->stride; + int32_t filled; + + if (len < 12) + goto short_packet; + + hdr = (struct rtp_header*)buffer; + if (hdr->v != 2) + goto invalid_version; + + + hlen = 12 + hdr->cc * 4; + if (hlen > len) + goto invalid_len; + + if (impl->have_ssrc && impl->ssrc != hdr->ssrc) + goto unexpected_ssrc; + impl->ssrc = hdr->ssrc; + impl->have_ssrc = true; + + seq = ntohs(hdr->sequence_number); + if (impl->have_seq && impl->seq != seq) { + pw_log_info("unexpected seq (%d != %d) SSRC:%u", + seq, impl->seq, hdr->ssrc); + impl->have_sync = false; + } + impl->seq = seq + 1; + impl->have_seq = true; + + timestamp = ntohl(hdr->timestamp) - impl->ts_offset; + + impl->receiving = true; + + plen = len - hlen; + samples = plen / stride; + + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); + + /* we always write to timestamp + delay */ + write = timestamp + impl->target_buffer; + + if (!impl->have_sync) { + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u direct:%d", + write, impl->seq-1, impl->ts_offset, impl->ssrc, + impl->direct_timestamp); + + /* we read from timestamp, keeping target_buffer of data + * in the ringbuffer. */ + impl->ring.readindex = timestamp; + impl->ring.writeindex = write; + filled = impl->target_buffer; + + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + memset(impl->buffer, 0, BUFFER_SIZE); + impl->have_sync = true; + } else if (expected_write != write) { + pw_log_debug("unexpected write (%u != %u)", + write, expected_write); + } + + if (filled + samples > BUFFER_SIZE / stride) { + pw_log_debug("capture overrun %u + %u > %u", filled, samples, + BUFFER_SIZE / stride); + impl->have_sync = false; + } else { + pw_log_debug("got samples:%u", samples); + spa_ringbuffer_write_data(&impl->ring, + impl->buffer, + BUFFER_SIZE, + (write * stride) & BUFFER_MASK, + &buffer[hlen], (samples * stride)); + write += samples; + spa_ringbuffer_write_update(&impl->ring, write); + } + return; + +short_packet: + pw_log_warn("short packet received"); + return; +invalid_version: + pw_log_warn("invalid RTP version"); + spa_debug_mem(0, buffer, len); + return; +invalid_len: + pw_log_warn("invalid RTP length"); + return; +unexpected_ssrc: + pw_log_warn("unexpected SSRC (expected %u != %u)", + impl->ssrc, hdr->ssrc); + return; +} + +static inline void +set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, + uint32_t offset, struct iovec *iov, uint32_t len) +{ + iov[0].iov_len = SPA_MIN(len, size - offset); + iov[0].iov_base = SPA_PTROFF(buffer, offset, void); + iov[1].iov_len = len - iov[0].iov_len; + iov[1].iov_base = buffer; +} + +static void flush_audio_packets(struct impl *impl) +{ + int32_t avail; + uint32_t stride, timestamp; + struct iovec iov[3]; + struct rtp_header header; + int32_t tosend; + + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); + tosend = impl->psamples; + + if (avail < tosend) + return; + + stride = impl->stride; + + spa_zero(header); + header.v = 2; + header.pt = impl->payload; + header.ssrc = htonl(impl->ssrc); + + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header); + + while (avail >= tosend) { + header.sequence_number = htons(impl->seq); + header.timestamp = htonl(impl->ts_offset + timestamp); + + set_iovec(&impl->ring, + impl->buffer, BUFFER_SIZE, + (timestamp * stride) & BUFFER_MASK, + &iov[1], tosend * stride); + + pw_log_trace("sending %d timestamp:%d", tosend, timestamp); + + rtp_stream_emit_send_packet(impl, iov, 3); + + impl->seq++; + timestamp += tosend; + avail -= tosend; + } + spa_ringbuffer_read_update(&impl->ring, timestamp); +} + +static void process_audio_capture(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t offs, size, timestamp, expected_timestamp, stride; + int32_t filled, wanted; + + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); + return; + } + d = buf->buffer->datas; + + offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); + size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); + stride = impl->stride; + wanted = size / stride; + + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); + if (SPA_LIKELY(impl->io_position)) + timestamp = impl->io_position->clock.position; + else + timestamp = expected_timestamp; + + if (impl->have_sync) { + if (expected_timestamp != timestamp) { + pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); + impl->have_sync = false; + } else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) { + pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride); + impl->have_sync = false; + } + } + if (!impl->have_sync) { + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", + timestamp, impl->seq, impl->ts_offset, impl->ssrc); + impl->ring.readindex = impl->ring.writeindex = timestamp; + memset(impl->buffer, 0, BUFFER_SIZE); + impl->have_sync = true; + } + + spa_ringbuffer_write_data(&impl->ring, + impl->buffer, + BUFFER_SIZE, + (timestamp * stride) & BUFFER_MASK, + SPA_PTROFF(d[0].data, offs, void), wanted * stride); + timestamp += wanted; + spa_ringbuffer_write_update(&impl->ring, timestamp); + + pw_stream_queue_buffer(impl->stream, buf); + + flush_audio_packets(impl); +} diff --git a/src/modules/module-rtp/midi.c b/src/modules/module-rtp/midi.c new file mode 100644 index 000000000..7095927f6 --- /dev/null +++ b/src/modules/module-rtp/midi.c @@ -0,0 +1,472 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +static void process_midi_playback(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t timestamp, duration, maxsize, read, rate; + struct spa_pod_builder b; + struct spa_pod_frame f[1]; + void *ptr; + struct spa_pod *pod; + struct spa_pod_control *c; + + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); + return; + } + d = buf->buffer->datas; + + maxsize = d[0].maxsize; + + /* we always use the graph position to select events, the receiver side is + * responsible for smoothing out the RTP timestamps to graph time */ + duration = impl->io_position->clock.duration; + if (impl->io_position) { + timestamp = impl->io_position->clock.position; + rate = impl->io_position->clock.rate.denom; + } else { + timestamp = 0; + rate = impl->rate; + } + + /* we copy events into the buffer based on the rtp timestamp + delay. */ + spa_pod_builder_init(&b, d[0].data, maxsize); + spa_pod_builder_push_sequence(&b, &f[0], 0); + + while (true) { + int32_t avail = spa_ringbuffer_get_read_index(&impl->ring, &read); + if (avail <= 0) + break; + + ptr = SPA_PTROFF(impl->buffer, read & BUFFER_MASK2, void); + + if ((pod = spa_pod_from_data(ptr, avail, 0, avail)) == NULL) + goto done; + if (!spa_pod_is_sequence(pod)) + goto done; + + /* the ringbuffer contains series of sequences, one for each + * received packet */ + SPA_POD_SEQUENCE_FOREACH((struct spa_pod_sequence*)pod, c) { + /* try to render with given delay */ + uint32_t target = c->offset + impl->target_buffer; + target = (uint64_t)target * rate / impl->rate; + if (timestamp != 0) { + /* skip old packets */ + if (target < timestamp) + continue; + /* event for next cycle */ + if (target >= timestamp + duration) + goto complete; + } else { + timestamp = target; + } + spa_pod_builder_control(&b, target - timestamp, SPA_CONTROL_Midi); + spa_pod_builder_bytes(&b, + SPA_POD_BODY(&c->value), + SPA_POD_BODY_SIZE(&c->value)); + } + /* we completed a sequence (one RTP packet), advance ringbuffer + * and go to the next packet */ + read += SPA_PTRDIFF(c, ptr); + spa_ringbuffer_read_update(&impl->ring, read); + } +complete: + spa_pod_builder_pop(&b, &f[0]); + + if (b.state.offset > maxsize) { + pw_log_warn("overflow buffer %u %u", b.state.offset, maxsize); + b.state.offset = 0; + } + d[0].chunk->size = b.state.offset; + d[0].chunk->stride = 1; + d[0].chunk->offset = 0; +done: + pw_stream_queue_buffer(impl->stream, buf); +} + +static int parse_varlen(uint8_t *p, uint32_t avail, uint32_t *result) +{ + uint32_t value = 0, offs = 0; + while (offs < avail) { + uint8_t b = p[offs++]; + value = (value << 7) | (b & 0x7f); + if ((b & 0x80) == 0) + break; + } + *result = value; + return offs; +} + +static int get_midi_size(uint8_t *p, uint32_t avail) +{ + int size; + uint32_t offs = 0, value; + + switch (p[offs++]) { + case 0xc0 ... 0xdf: + size = 2; + break; + case 0x80 ... 0xbf: + case 0xe0 ... 0xef: + size = 3; + break; + case 0xff: + case 0xf0: + case 0xf7: + size = parse_varlen(&p[offs], avail - offs, &value); + size += value + 1; + break; + default: + return -EINVAL; + } + return size; +} + +static double get_time(struct impl *impl) +{ + struct timespec ts; + struct spa_io_position *pos; + double t; + + clock_gettime(CLOCK_MONOTONIC, &ts); + if ((pos = impl->io_position) != NULL) { + t = pos->clock.position / (double) pos->clock.rate.denom; + t += (SPA_TIMESPEC_TO_NSEC(&ts) - pos->clock.nsec) / (double)SPA_NSEC_PER_SEC; + } else { + t = SPA_TIMESPEC_TO_NSEC(&ts); + } + return t; +} + +static void receive_midi(struct impl *impl, uint8_t *packet, + uint32_t timestamp, uint32_t payload_offset, uint32_t plen) +{ + uint32_t write; + struct rtp_midi_header *hdr; + int32_t filled; + struct spa_pod_builder b; + struct spa_pod_frame f[1]; + void *ptr; + uint32_t offs = payload_offset, len, end; + bool first = true; + + if (impl->direct_timestamp) { + /* in direct timestamp we attach the RTP timestamp directly on the + * midi events and render them in the corresponding cycle */ + if (!impl->have_sync) { + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u direct:%d", + timestamp, impl->seq-1, impl->ts_offset, impl->ssrc, + impl->direct_timestamp); + impl->have_sync = true; + } + } else { + /* in non-direct timestamp mode, we relate the graph clock against + * the RTP timestamps */ + double ts = timestamp / (float) impl->rate; + double t = get_time(impl); + double elapsed, estimated, diff; + + /* the elapsed time between RTP timestamps */ + elapsed = ts - impl->last_timestamp; + /* for that elapsed time, our clock should have advanced + * by this amount since the last estimation */ + estimated = impl->last_time + elapsed * impl->corr; + /* calculate the diff between estimated and current clock time in + * samples */ + diff = (estimated - t) * impl->rate; + + /* no sync or we drifted too far, resync */ + if (!impl->have_sync || fabs(diff) > impl->target_buffer) { + impl->corr = 1.0; + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256, impl->rate); + + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u direct:%d", + timestamp, impl->seq-1, impl->ts_offset, impl->ssrc, + impl->direct_timestamp); + impl->have_sync = true; + impl->ring.readindex = impl->ring.writeindex; + } else { + /* update our new rate correction */ + impl->corr = spa_dll_update(&impl->dll, diff); + /* our current time is now the estimated time */ + t = estimated; + } + pw_log_debug("%f %f %f %f", t, estimated, diff, impl->corr); + + timestamp = t * impl->rate; + + impl->last_timestamp = ts; + impl->last_time = t; + } + + filled = spa_ringbuffer_get_write_index(&impl->ring, &write); + if (filled > (int32_t)BUFFER_SIZE2) { + pw_log_warn("overflow"); + return; + } + + hdr = (struct rtp_midi_header *)&packet[offs++]; + len = hdr->len; + if (hdr->b) { + len = (len << 8) | hdr->len_b; + offs++; + } + end = len + offs; + if (end > plen) { + pw_log_warn("invalid packet %d > %d", end, plen); + return; + } + + ptr = SPA_PTROFF(impl->buffer, write & BUFFER_MASK2, void); + + /* each packet is written as a sequence of events. The offset is + * the RTP timestamp */ + spa_pod_builder_init(&b, ptr, BUFFER_SIZE2 - filled); + spa_pod_builder_push_sequence(&b, &f[0], 0); + + while (offs < end) { + uint32_t delta; + int size; + + if (first && !hdr->z) + delta = 0; + else + offs += parse_varlen(&packet[offs], end - offs, &delta); + + timestamp += delta * impl->corr; + spa_pod_builder_control(&b, timestamp, SPA_CONTROL_Midi); + + size = get_midi_size(&packet[offs], end - offs); + + if (size <= 0 || offs + size > end) { + pw_log_warn("invalid size (%08x) %d (%u %u)", + packet[offs], size, offs, end); + break; + } + + spa_pod_builder_bytes(&b, &packet[offs], size); + + offs += size; + first = false; + } + spa_pod_builder_pop(&b, &f[0]); + + write += b.state.offset; + spa_ringbuffer_write_update(&impl->ring, write); +} + +static void receive_rtp_midi(struct impl *impl, uint8_t *buffer, ssize_t len) +{ + struct rtp_header *hdr; + ssize_t hlen; + uint16_t seq; + uint32_t timestamp; + + if (len < 12) + goto short_packet; + + hdr = (struct rtp_header*)buffer; + if (hdr->v != 2) + goto invalid_version; + + hlen = 12 + hdr->cc * 4; + if (hlen > len) + goto invalid_len; + + if (impl->have_ssrc && impl->ssrc != hdr->ssrc) + goto unexpected_ssrc; + impl->ssrc = hdr->ssrc; + impl->have_ssrc = true; + + seq = ntohs(hdr->sequence_number); + if (impl->have_seq && impl->seq != seq) { + pw_log_info("unexpected seq (%d != %d) SSRC:%u", + seq, impl->seq, hdr->ssrc); + impl->have_sync = false; + } + impl->seq = seq + 1; + impl->have_seq = true; + + timestamp = ntohl(hdr->timestamp) - impl->ts_offset; + + impl->receiving = true; + + receive_midi(impl, buffer, timestamp, hlen, len); + return; + +short_packet: + pw_log_warn("short packet received"); + return; +invalid_version: + pw_log_warn("invalid RTP version"); + spa_debug_mem(0, buffer, len); + return; +invalid_len: + pw_log_warn("invalid RTP length"); + return; +unexpected_ssrc: + pw_log_warn("unexpected SSRC (expected %u != %u)", + impl->ssrc, hdr->ssrc); + return; +} + +static int write_event(uint8_t *p, uint32_t value, void *ev, uint32_t size) +{ + uint64_t buffer; + uint8_t b; + int count = 0; + + buffer = value & 0x7f; + while ((value >>= 7)) { + buffer <<= 8; + buffer |= ((value & 0x7f) | 0x80); + } + do { + b = buffer & 0xff; + p[count++] = b; + buffer >>= 8; + } while (b & 0x80); + + memcpy(&p[count], ev, size); + return count + size; +} + +static void flush_midi_packets(struct impl *impl, + struct spa_pod_sequence *sequence, uint32_t timestamp) +{ + struct spa_pod_control *c; + struct rtp_header header; + struct rtp_midi_header midi_header; + struct iovec iov[3]; + uint32_t len, prev_offset, base; + + spa_zero(header); + header.v = 2; + header.pt = impl->payload; + header.ssrc = htonl(impl->ssrc); + + spa_zero(midi_header); + + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header); + iov[1].iov_base = &midi_header; + iov[1].iov_len = sizeof(midi_header); + iov[2].iov_base = impl->buffer; + iov[2].iov_len = 0; + + prev_offset = len = base = 0; + + SPA_POD_SEQUENCE_FOREACH(sequence, c) { + void *ev; + uint32_t size, delta; + + if (c->type != SPA_CONTROL_Midi) + continue; + + ev = SPA_POD_BODY(&c->value), + size = SPA_POD_BODY_SIZE(&c->value); + + if (len > 0 && (len + size > impl->mtu || + c->offset - base > impl->psamples)) { + /* flush packet when we have one and when it's either + * too large or has too much data. */ + if (len < 16) { + midi_header.b = 0; + midi_header.len = len; + iov[1].iov_len = sizeof(midi_header) - 1; + } else { + midi_header.b = 1; + midi_header.len = (len >> 8) & 0xf; + midi_header.len_b = len & 0xff; + iov[1].iov_len = sizeof(midi_header); + } + iov[2].iov_len = len; + + pw_log_debug("sending %d timestamp:%d %u %u", + len, timestamp + base, + c->offset, impl->psamples); + rtp_stream_emit_send_packet(impl, iov, 3); + + impl->seq++; + len = 0; + } + if (len == 0) { + /* start new packet */ + base = prev_offset = c->offset; + header.sequence_number = htons(impl->seq); + header.timestamp = htonl(impl->ts_offset + timestamp + base); + + memcpy(&impl->buffer[len], ev, size); + len += size; + } else { + delta = c->offset - prev_offset; + prev_offset = c->offset; + len += write_event(&impl->buffer[len], delta, ev, size); + } + } + if (len > 0) { + /* flush last packet */ + if (len < 16) { + midi_header.b = 0; + midi_header.len = len; + iov[1].iov_len = sizeof(midi_header) - 1; + } else { + midi_header.b = 1; + midi_header.len = (len >> 8) & 0xf; + midi_header.len_b = len & 0xff; + iov[1].iov_len = sizeof(midi_header); + } + iov[2].iov_len = len; + + pw_log_debug("sending %d timestamp:%d", len, base); + rtp_stream_emit_send_packet(impl, iov, 3); + impl->seq++; + } +} + +static void process_midi_capture(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t offs, size, timestamp; + struct spa_pod *pod; + void *ptr; + + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); + return; + } + d = buf->buffer->datas; + + offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); + size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); + + if (SPA_LIKELY(impl->io_position)) + timestamp = impl->io_position->clock.position; + else + timestamp = 0; + + ptr = SPA_PTROFF(d[0].data, offs, void); + + if ((pod = spa_pod_from_data(ptr, size, 0, size)) == NULL) + goto done; + if (!spa_pod_is_sequence(pod)) + goto done; + + if (!impl->have_sync) { + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", + timestamp, impl->seq, impl->ts_offset, impl->ssrc); + impl->have_sync = true; + } + + flush_midi_packets(impl, (struct spa_pod_sequence*)pod, timestamp); + +done: + pw_stream_queue_buffer(impl->stream, buf); +} diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c new file mode 100644 index 000000000..192c4e047 --- /dev/null +++ b/src/modules/module-rtp/stream.c @@ -0,0 +1,455 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#define DEFAULT_FORMAT "S16BE" +#define DEFAULT_RATE 48000 +#define DEFAULT_CHANNELS 2 +#define DEFAULT_POSITION "[ FL FR ]" + +#define BUFFER_SIZE (1u<<22) +#define BUFFER_MASK (BUFFER_SIZE-1) +#define BUFFER_SIZE2 (BUFFER_SIZE>>1) +#define BUFFER_MASK2 (BUFFER_SIZE2-1) + +#define DEFAULT_MTU 1280 +#define DEFAULT_MIN_PTIME 2 +#define DEFAULT_MAX_PTIME 20 + +#define ERROR_MSEC 2 +#define DEFAULT_SESS_LATENCY 100 + +#define rtp_stream_emit(s,m,v,...) spa_hook_list_call(&s->listener_list, \ + struct rtp_stream_events, m, v, ##__VA_ARGS__) +#define rtp_stream_emit_destroy(s) rtp_stream_emit(s, destroy, 0) +#define rtp_stream_emit_state_changed(s,n,e) rtp_stream_emit(s, state_changed,0,n,e) +#define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l) + +struct impl { + struct spa_audio_info info; + + struct pw_stream *stream; + struct spa_hook stream_listener; + struct pw_stream_events stream_events; + + struct spa_hook_list listener_list; + struct spa_hook listener; + + const struct format_info *format_info; + + uint32_t rate; + uint32_t stride; + uint8_t payload; + uint32_t ssrc; + uint16_t seq; + unsigned have_ssrc:1; + unsigned have_seq:1; + uint32_t ts_offset; + uint32_t psamples; + uint32_t mtu; + + struct spa_ringbuffer ring; + uint8_t buffer[BUFFER_SIZE]; + + struct spa_io_rate_match *io_rate_match; + struct spa_io_position *io_position; + struct spa_dll dll; + double corr; + uint32_t target_buffer; + float max_error; + + float last_timestamp; + float last_time; + + unsigned direct_timestamp:1; + unsigned always_process:1; + unsigned started:1; + unsigned have_sync:1; + unsigned receiving:1; + unsigned first:1; + + void (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len); +}; + +#include "module-rtp/audio.c" +#include "module-rtp/midi.c" + +struct format_info { + uint32_t media_subtype; + uint32_t format; + uint32_t size; + const char *mime; + const char *media_type; +}; + +static const struct format_info audio_format_info[] = { + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_U8, 1, "L8", "audio" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, + { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, +}; + +static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + struct impl *impl = data; + switch (id) { + case SPA_IO_RateMatch: + impl->io_rate_match = area; + break; + case SPA_IO_Position: + impl->io_position = area; + break; + } +} + +static void stream_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->stream_listener); + impl->stream = NULL; +} + +static int stream_start(struct impl *impl) +{ + if (impl->started) + return 0; + + rtp_stream_emit_state_changed(impl, true, NULL); + + impl->started = true; + return 0; +} + +static int stream_stop(struct impl *impl) +{ + if (!impl->started) + return 0; + + rtp_stream_emit_state_changed(impl, false, NULL); + + impl->started = false; + return 0; +} + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct impl *impl = d; + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + pw_log_info("stream disconnected"); + break; + case PW_STREAM_STATE_ERROR: + pw_log_error("stream error: %s", error); + rtp_stream_emit_state_changed(impl, false, error); + break; + case PW_STREAM_STATE_STREAMING: + if ((errno = -stream_start(impl)) < 0) + pw_log_error("failed to start RTP stream: %m"); + break; + case PW_STREAM_STATE_PAUSED: + if (!impl->always_process) + stream_stop(impl); + impl->have_sync = false; + break; + default: + break; + } +} + +static const struct pw_stream_events stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = on_stream_state_changed, + .io_changed = stream_io_changed, +}; + +static const struct format_info *find_audio_format_info(const struct spa_audio_info *info) +{ + SPA_FOR_EACH_ELEMENT_VAR(audio_format_info, f) + if (f->media_subtype == info->media_subtype && + (f->format == 0 || f->format == info->info.raw.format)) + return f; + return NULL; +} + +static inline uint32_t format_from_name(const char *name, size_t len) +{ + int i; + for (i = 0; spa_type_audio_format[i].name; i++) { + if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) + return spa_type_audio_format[i].type; + } + return SPA_AUDIO_FORMAT_UNKNOWN; +} + +static uint32_t channel_from_name(const char *name) +{ + int i; + for (i = 0; spa_type_audio_channel[i].name; i++) { + if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) + return spa_type_audio_channel[i].type; + } + return SPA_AUDIO_CHANNEL_UNKNOWN; +} + +static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) +{ + struct spa_json it[2]; + char v[256]; + + spa_json_init(&it[0], val, len); + if (spa_json_enter_array(&it[0], &it[1]) <= 0) + spa_json_init(&it[1], val, len); + + info->channels = 0; + while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && + info->channels < SPA_AUDIO_MAX_CHANNELS) { + info->position[info->channels++] = channel_from_name(v); + } +} + +static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) +{ + const char *str; + + spa_zero(*info); + if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) + str = DEFAULT_FORMAT; + info->format = format_from_name(str, strlen(str)); + + info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); + if (info->rate == 0) + info->rate = DEFAULT_RATE; + + info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); + info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); + if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) + parse_position(info, str, strlen(str)); + if (info->channels == 0) + parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); +} + +static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) +{ + return msec * impl->rate / 1000; +} + +struct rtp_stream *rtp_stream_new(struct pw_core *core, + enum pw_direction direction, struct pw_properties *props, + const struct rtp_stream_events *events, void *data) +{ + struct impl *impl; + const char *str; + uint8_t buffer[1024]; + struct spa_pod_builder b; + uint32_t n_params, min_samples, max_samples; + float min_ptime, max_ptime; + const struct spa_pod *params[1]; + enum pw_stream_flags flags; + int latency_msec; + int res; + + impl = calloc(1, sizeof(*impl)); + if (impl == NULL) { + res = -errno; + goto out; + return NULL; + } + impl->first = true; + spa_hook_list_init(&impl->listener_list); + impl->stream_events = stream_events; + + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + if ((str = pw_properties_get(props, "rtp.media")) != NULL) { + if (spa_streq(str, "audio")) { + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + impl->payload = 127; + } + else if (spa_streq(str, "midi")) { + impl->info.media_type = SPA_MEDIA_TYPE_application; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; + impl->payload = 0x61; + } + else { + pw_log_error("unsupported media type:%s", str); + res = -EINVAL; + goto out; + } + } + + switch (impl->info.media_type) { + case SPA_MEDIA_TYPE_audio: + parse_audio_info(props, &impl->info.info.raw); + impl->format_info = find_audio_format_info(&impl->info); + if (impl->format_info == NULL) { + pw_log_error("unsupported audio format:%d channels:%d", + impl->info.info.raw.format, impl->info.info.raw.channels); + res = -EINVAL; + goto out; + } + impl->stride = impl->format_info->size * impl->info.info.raw.channels; + impl->rate = impl->info.info.raw.rate; + break; + case SPA_MEDIA_TYPE_application: + impl->format_info = find_audio_format_info(&impl->info); + if (impl->format_info == NULL) { + res = -EINVAL; + goto out; + } + pw_properties_set(props, PW_KEY_FORMAT_DSP, "8 bit raw midi"); + impl->stride = impl->format_info->size; + impl->rate = pw_properties_get_uint32(props, "midi.rate", 10000); + if (impl->rate == 0) + impl->rate = 10000; + break; + default: + spa_assert_not_reached(); + break; + } + + pw_properties_setf(props, "rtp.media", "%s", impl->format_info->media_type); + pw_properties_setf(props, "rtp.mime", "%s", impl->format_info->mime); + + if (direction == PW_DIRECTION_INPUT) + impl->ssrc = pw_properties_get_uint32(props, "rtp.sender-ssrc", pw_rand32()); + else + impl->ssrc = pw_properties_get_uint32(props, "rtp.receiver-ssrc", pw_rand32()); + impl->payload = pw_properties_get_uint32(props, "rtp.payload", impl->payload); + impl->mtu = pw_properties_get_uint32(props, "rtp.mtu", DEFAULT_MTU); + + str = pw_properties_get(props, "rtp.min-ptime"); + if (!spa_atof(str, &min_ptime)) + min_ptime = DEFAULT_MIN_PTIME; + str = pw_properties_get(props, "rtp.max-ptime"); + if (!spa_atof(str, &max_ptime)) + max_ptime = DEFAULT_MAX_PTIME; + + min_samples = min_ptime * impl->rate / 1000; + max_samples = max_ptime * impl->rate / 1000; + + impl->psamples = impl->mtu / impl->stride; + impl->psamples = SPA_CLAMP(impl->psamples, min_samples, max_samples); + + latency_msec = pw_properties_get_uint32(props, + "sess.latency.msec", DEFAULT_SESS_LATENCY); + impl->target_buffer = msec_to_samples(impl, latency_msec); + impl->max_error = msec_to_samples(impl, ERROR_MSEC); + + pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", impl->rate); + pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", + impl->target_buffer / 2, impl->rate); + + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + impl->corr = 1.0; + + impl->stream = pw_stream_new(core, "rtp-session", props); + props = NULL; + if (impl->stream == NULL) { + res = -errno; + pw_log_error("can't create stream: %m"); + goto out; + } + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + + flags = PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS; + + switch (impl->info.media_type) { + case SPA_MEDIA_TYPE_audio: + params[n_params++] = spa_format_audio_build(&b, + SPA_PARAM_EnumFormat, &impl->info); + flags |= PW_STREAM_FLAG_AUTOCONNECT; + if (direction == SPA_DIRECTION_INPUT) + impl->stream_events.process = process_audio_capture; + else + impl->stream_events.process = process_audio_playback; + impl->receive_rtp = receive_rtp_audio; + break; + case SPA_MEDIA_TYPE_application: + params[n_params++] = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); + if (direction == SPA_DIRECTION_INPUT) + impl->stream_events.process = process_midi_capture; + else + impl->stream_events.process = process_midi_playback; + impl->receive_rtp = receive_rtp_midi; + break; + default: + res = -EINVAL; + goto out; + } + + pw_stream_add_listener(impl->stream, + &impl->stream_listener, + &impl->stream_events, impl); + + if ((res = pw_stream_connect(impl->stream, + direction, + PW_ID_ANY, + flags, + params, n_params)) < 0) { + pw_log_error("can't connect stream: %s", spa_strerror(res)); + goto out; + } + + if (impl->always_process && + (res = stream_start(impl)) < 0) + goto out; + + spa_hook_list_append(&impl->listener_list, &impl->listener, events, data); + + return (struct rtp_stream*)impl; +out: + pw_properties_free(props); + errno = -res; + return NULL; +} + +void rtp_stream_destroy(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + + rtp_stream_emit_destroy(impl); + + if (impl->stream) + pw_stream_destroy(impl->stream); + + spa_hook_list_clean(&impl->listener_list); + free(impl); +} + +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) +{ + struct impl *impl = (struct impl*)s; + impl->receive_rtp(impl, buffer, len); + return 0; +} diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h new file mode 100644 index 000000000..445c609bd --- /dev/null +++ b/src/modules/module-rtp/stream.h @@ -0,0 +1,38 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_RTP_STREAM_H +#define PIPEWIRE_RTP_STREAM_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct rtp_stream; + +struct rtp_stream_events { +#define RTP_VERSION_STREAM_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + + void (*state_changed) (void *data, bool started, const char *error); + + void (*send_packet) (void *data, struct iovec *iov, size_t iovlen); +}; + +struct rtp_stream *rtp_stream_new(struct pw_core *core, + enum pw_direction direction, struct pw_properties *props, + const struct rtp_stream_events *events, void *data); + +void rtp_stream_destroy(struct rtp_stream *s); + +int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len); + + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_RTP_STREAM_H */