From 8dfb22d12bb09aa9c6dfbc75a2d8f3b679e08112 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 12 Jun 2023 09:09:11 +0200 Subject: [PATCH] module-netjack2: add opus support --- src/modules/meson.build | 4 +- src/modules/module-netjack2-driver.c | 7 +- src/modules/module-netjack2-manager.c | 34 ++- src/modules/module-netjack2/peer.c | 333 +++++++++++++++++++++++--- 4 files changed, 331 insertions(+), 47 deletions(-) diff --git a/src/modules/meson.build b/src/modules/meson.build index 30279474f..d8f9ed622 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -208,7 +208,7 @@ pipewire_module_netjack2_driver = shared_library('pipewire-module-netjack2-drive install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [spa_dep, mathlib, dl_lib, pipewire_dep], + dependencies : [spa_dep, mathlib, dl_lib, pipewire_dep, opus_dep], ) pipewire_module_netjack2_manager = shared_library('pipewire-module-netjack2-manager', @@ -217,7 +217,7 @@ pipewire_module_netjack2_manager = shared_library('pipewire-module-netjack2-mana install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [spa_dep, mathlib, dl_lib, pipewire_dep], + dependencies : [spa_dep, mathlib, dl_lib, pipewire_dep, opus_dep], ) pipewire_module_profiler = shared_library('pipewire-module-profiler', diff --git a/src/modules/module-netjack2-driver.c b/src/modules/module-netjack2-driver.c index 1a7048095..0aa92d304 100644 --- a/src/modules/module-netjack2-driver.c +++ b/src/modules/module-netjack2-driver.c @@ -869,9 +869,7 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p peer->other_stream = 's'; peer->send_volume = &impl->sink.volume; peer->recv_volume = &impl->source.volume; - peer->buffer_size = peer->params.period_size * sizeof(float) * - SPA_MAX(peer->params.send_midi_channels, peer->params.recv_midi_channels); - peer->buffer = calloc(1, peer->buffer_size); + netjack2_init(peer); int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + peer->params.period_size * sizeof(float) * @@ -1049,9 +1047,8 @@ static int send_stop_driver(struct impl *impl) pw_filter_destroy(impl->source.filter); if (impl->sink.filter) pw_filter_destroy(impl->sink.filter); - free(impl->peer.buffer); - impl->peer.buffer = NULL; + netjack2_cleanup(&impl->peer); return 0; } diff --git a/src/modules/module-netjack2-manager.c b/src/modules/module-netjack2-manager.c index 1d1a2a4dd..02a3e2999 100644 --- a/src/modules/module-netjack2-manager.c +++ b/src/modules/module-netjack2-manager.c @@ -60,6 +60,8 @@ * placed per stream. * - `netjack2.sample-rate`: the sample rate to use, default 48000 * - `netjack2.period-size`: the buffer size to use, default 1024 + * - `netjack2.encoding`: the encoding, float|opus, default float + * - `netjack2.kbps`: the number of kilobits per second when encoding, default 64 * - `audio.channels`: the number of audio ports. Can also be added to the stream props. * - `midi.ports`: the number of midi ports. Can also be added to the stream props. * - `source.props`: Extra properties for the source filter. @@ -88,6 +90,8 @@ * #netjack2.connect = true * #netjack2.sample-rate = 48000 * #netjack2.period-size = 1024 + * #netjack2.encoding = float # float|opus + * #netjack2.kbps = 64 * #midi.ports = 0 * #audio.channels = 2 * #audio.position = [ FL FR ] @@ -123,6 +127,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define DEFAULT_SAMPLE_RATE 48000 #define DEFAULT_PERIOD_SIZE 1024 +#define DEFAULT_ENCODING "float" +#define DEFAULT_KBPS 64 #define DEFAULT_CHANNELS 2 #define DEFAULT_POSITION "[ FL FR ]" #define DEFAULT_MIDI_PORTS 1 @@ -232,6 +238,8 @@ struct impl { uint32_t dscp; uint32_t period_size; uint32_t samplerate; + uint32_t encoding; + uint32_t kbps; struct pw_impl_module *module; struct spa_hook module_listener; @@ -364,8 +372,7 @@ static void follower_free(struct follower *follower) if (follower->socket) pw_loop_destroy_source(impl->data_loop->loop, follower->socket); - free(follower->peer.buffer); - + netjack2_cleanup(&follower->peer); free(follower); } @@ -900,8 +907,8 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param snprintf(peer->params.driver_name, sizeof(peer->params.driver_name), "%s", pw_get_host_name()); peer->params.sample_rate = follower->samplerate; peer->params.period_size = follower->period_size; - peer->params.sample_encoder = NJ2_ENCODER_FLOAT; - peer->params.kbps = 0; + peer->params.sample_encoder = impl->encoding; + peer->params.kbps = impl->kbps; if (peer->params.send_audio_channels < 0) peer->params.send_audio_channels = follower->sink.info.channels; @@ -947,9 +954,7 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param peer->other_stream = 'r'; peer->send_volume = &follower->sink.volume; peer->recv_volume = &follower->source.volume; - peer->buffer_size = peer->params.period_size * sizeof(float) * - SPA_MAX(peer->params.send_midi_channels, peer->params.recv_midi_channels); - peer->buffer = calloc(1, peer->buffer_size); + netjack2_init(peer); int bufsize = NETWORK_MAX_LATENCY * (peer->params.mtu + follower->period_size * sizeof(float) * @@ -1245,6 +1250,21 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) DEFAULT_SAMPLE_RATE); impl->period_size = pw_properties_get_uint32(impl->props, "netjack2.period-size", DEFAULT_PERIOD_SIZE); + if ((str = pw_properties_get(impl->props, "netjack2.encoding")) == NULL) + str = DEFAULT_ENCODING; + if (spa_streq(str, "float")) + impl->encoding = NJ2_ENCODER_FLOAT; +#ifdef HAVE_OPUS + else if (spa_streq(str, "opus")) + impl->encoding = NJ2_ENCODER_OPUS; +#endif + else { + pw_log_error("invalid netjack2.encoding '%s'", str); + res = -EINVAL; + goto error; + } + impl->kbps = pw_properties_get_uint32(impl->props, "netjack2.kbps", + DEFAULT_KBPS); if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); diff --git a/src/modules/module-netjack2/peer.c b/src/modules/module-netjack2/peer.c index 0a11f7b51..f5fd777fc 100644 --- a/src/modules/module-netjack2/peer.c +++ b/src/modules/module-netjack2/peer.c @@ -1,6 +1,11 @@ #include +#ifdef HAVE_OPUS +#include +#include +#endif + #define MAX_BUFFER_FRAMES 8192 struct volume { @@ -63,12 +68,109 @@ struct netjack2_peer { struct volume *send_volume; struct volume *recv_volume; - void *buffer; - uint32_t buffer_size; + void *midi_data; + uint32_t midi_size; + + float *empty; +#ifdef HAVE_OPUS + void *encoded_data; + uint32_t encoded_size; + uint32_t max_encoded_size; + OpusCustomMode *opus_config; + OpusCustomEncoder **opus_enc; + OpusCustomDecoder **opus_dec; +#endif unsigned fix_midi:1; }; +static int netjack2_init(struct netjack2_peer *peer) +{ + int res = 0; + + peer->empty = calloc(MAX_BUFFER_FRAMES, sizeof(float)); + + peer->midi_size = peer->params.period_size * sizeof(float) * + SPA_MAX(peer->params.send_midi_channels, peer->params.recv_midi_channels); + peer->midi_data = calloc(1, peer->midi_size); + + if (peer->params.sample_encoder == NJ2_ENCODER_OPUS) { +#ifdef HAVE_OPUS + int32_t i; + peer->max_encoded_size = (peer->params.kbps * peer->params.period_size * 1024) / + (peer->params.sample_rate * 8) + sizeof(uint16_t); + peer->encoded_size = peer->max_encoded_size * + SPA_MAX(peer->params.send_audio_channels, peer->params.recv_audio_channels); + if ((peer->encoded_data = calloc(1, peer->encoded_size)) == NULL) + goto error_errno; + if ((peer->opus_config = opus_custom_mode_create(peer->params.sample_rate, + peer->params.period_size, &res)) == NULL) + goto error_opus; + if ((peer->opus_enc = calloc(peer->params.send_audio_channels, + sizeof(OpusCustomEncoder*))) == NULL) + goto error_errno; + + for (i = 0; i < peer->params.send_audio_channels; i++) { + if ((peer->opus_enc[i] = opus_custom_encoder_create(peer->opus_config, + 1, &res)) == NULL) + goto error_opus; + opus_custom_encoder_ctl(peer->opus_enc[i], + OPUS_SET_BITRATE(peer->params.kbps*1024)); // bits per second + opus_custom_encoder_ctl(peer->opus_enc[i], + OPUS_SET_COMPLEXITY(10)); + opus_custom_encoder_ctl(peer->opus_enc[i], + OPUS_SET_SIGNAL(OPUS_SIGNAL_MUSIC)); + opus_custom_encoder_ctl(peer->opus_enc[i], + OPUS_SET_SIGNAL(OPUS_APPLICATION_RESTRICTED_LOWDELAY)); + } + if ((peer->opus_dec = calloc(peer->params.recv_audio_channels, + sizeof(OpusCustomDecoder*))) == NULL) + goto error_errno; + for (i = 0; i < peer->params.recv_audio_channels; i++) { + if ((peer->opus_dec[i] = opus_custom_decoder_create(peer->opus_config, + 1, &res)) == NULL) + goto error_opus; + } +#else + return -ENOTSUP; +#endif + + } + return res; +#ifdef HAVE_OPUS +error_errno: + pw_log_warn("error: %m"); + return -errno; +error_opus: + pw_log_warn("error: %d", res); + return -EINVAL; +#endif +} + +static void netjack2_cleanup(struct netjack2_peer *peer) +{ + + free(peer->empty); + free(peer->midi_data); +#ifdef HAVE_OPUS + int32_t i; + for (i = 0; i < peer->params.send_audio_channels; i++) { + if (peer->opus_enc[i]) + opus_custom_encoder_destroy(peer->opus_enc[i]); + } + free(peer->opus_enc); + for (i = 0; i < peer->params.recv_audio_channels; i++) { + if (peer->opus_dec[i]) + opus_custom_decoder_destroy(peer->opus_dec[i]); + } + free(peer->opus_dec); + if (peer->opus_config) + opus_custom_mode_destroy(peer->opus_config); + free(peer->encoded_data); +#endif + spa_zero(*peer); +} + struct data_info { uint32_t id; void *data; @@ -217,23 +319,25 @@ static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, struct data_info *info, uint32_t n_info) { struct nj2_packet_header header; - uint8_t buffer[peer->params.mtu]; - uint32_t i, num_packets, active_ports, data_size, res1, res2; + uint8_t buffer[peer->params.mtu], *midi_data; + uint32_t i, num_packets, active_ports, midi_size; uint32_t max_size; active_ports = peer->params.send_midi_channels; if (active_ports <= 0) return 0; - data_size = 0; + midi_size = 0; + midi_data = peer->midi_data; + for (i = 0; i < active_ports; i++) { struct nj2_midi_buffer *mbuf; void *data = (i < n_info && info) ? info[i].data : NULL; - mbuf = SPA_PTROFF(peer->buffer, data_size, struct nj2_midi_buffer); + mbuf = SPA_PTROFF(midi_data, midi_size, struct nj2_midi_buffer); midi_to_netjack2(peer, mbuf, data, nframes); - data_size += sizeof(*mbuf) + midi_size += sizeof(*mbuf) + mbuf->event_count * sizeof(struct nj2_midi_event) + mbuf->write_pos; @@ -243,10 +347,7 @@ static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, /* Note: jack2 calculates the packet max_size and num packets with * different values... */ max_size = peer->params.mtu - sizeof(header); - - res1 = data_size % max_size; - res2 = data_size / max_size; - num_packets = (res1) ? res2 + 1 : res2; + num_packets = (midi_size + max_size-1) / max_size; strcpy(header.type, "header"); header.data_type = htonl('m'); @@ -259,7 +360,7 @@ static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, for (i = 0; i < num_packets; i++) { uint32_t is_last = ((i == num_packets - 1) && peer->params.send_audio_channels == 0) ? 1 : 0; - uint32_t size = data_size - i * max_size; + uint32_t size = midi_size - i * max_size; uint32_t copy_size = SPA_MIN(size, max_size); uint32_t packet_size = sizeof(header) + copy_size; @@ -268,7 +369,7 @@ static int netjack2_send_midi(struct netjack2_peer *peer, uint32_t nframes, header.packet_size = htonl(packet_size); memcpy(buffer, &header, sizeof(header)); memcpy(SPA_PTROFF(buffer, sizeof(header), void), - SPA_PTROFF(peer->buffer, i * max_size, void), + SPA_PTROFF(midi_data, i * max_size, void), copy_size); send(peer->fd, buffer, packet_size, 0); //nj2_dump_packet_header(&header); @@ -333,13 +434,97 @@ static int netjack2_send_audio(struct netjack2_peer *peer, uint32_t nframes, return 0; } +static int netjack2_send_opus(struct netjack2_peer *peer, uint32_t nframes, + struct data_info *info, uint32_t n_info) +{ +#ifdef HAVE_OPUS + struct nj2_packet_header header; + uint8_t buffer[peer->params.mtu], *encoded_data; + uint32_t i, j, active_ports, num_packets, max_size, max_encoded; + uint32_t sub_period_bytes, last_period_bytes; + + active_ports = peer->params.send_audio_channels; + if (active_ports <= 0) + return 0; + + max_encoded = peer->max_encoded_size; + + max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); + num_packets = ((active_ports * max_encoded) + max_size-1) / max_size; + + sub_period_bytes = max_encoded / num_packets; + last_period_bytes = sub_period_bytes + max_encoded % num_packets; + + encoded_data = peer->encoded_data; + + for (i = 0; i < active_ports; i++) { + uint16_t *ap = SPA_PTROFF(encoded_data, i * max_encoded, uint16_t); + void *pcm; + int res; + + if (i >= n_info || (pcm = info[i].data) == NULL) + pcm = peer->empty; + + res = opus_custom_encode_float(peer->opus_enc[i], + pcm, nframes, (unsigned char*)&ap[1], max_encoded - 2); + + if (res < 0 || res > 0xffff) { + pw_log_warn("encoding error %d", res); + ap[0] = 0; + } else { + ap[0] = htons(res); + } + } + + strcpy(header.type, "header"); + header.data_type = htonl('a'); + header.data_stream = htonl(peer->our_stream); + header.id = htonl(peer->params.id); + header.cycle = htonl(peer->cycle); + header.active_ports = htonl(active_ports); + header.num_packets = htonl(num_packets); + header.frames = htonl(nframes); + + for (i = 0; i < num_packets; i++) { + uint32_t is_last = (i == num_packets - 1) ? 1 : 0; + uint32_t data_size, packet_size; + + data_size = is_last ? last_period_bytes : sub_period_bytes; + packet_size = sizeof(header) + active_ports * data_size; + + header.sub_cycle = htonl(i); + header.is_last = htonl(is_last); + header.packet_size = htonl(packet_size); + memcpy(buffer, &header, sizeof(header)); + for (j = 0; j < active_ports; j++) { + memcpy(SPA_PTROFF(buffer, sizeof(header) + j * data_size, void), + SPA_PTROFF(encoded_data, + j * max_encoded + i * sub_period_bytes, void), + data_size); + } + send(peer->fd, buffer, packet_size, 0); + //nj2_dump_packet_header(&header); + } + return 0; +#else + return -ENOTSUP; +#endif +} + static int netjack2_send_data(struct netjack2_peer *peer, uint32_t nframes, struct data_info *midi, uint32_t n_midi, struct data_info *audio, uint32_t n_audio) { netjack2_send_sync(peer, nframes); netjack2_send_midi(peer, nframes, midi, n_midi); - netjack2_send_audio(peer, nframes, audio, n_audio); + switch (peer->params.sample_encoder) { + case NJ2_ENCODER_FLOAT: + netjack2_send_audio(peer, nframes, audio, n_audio); + break; + case NJ2_ENCODER_OPUS: + netjack2_send_opus(peer, nframes, audio, n_audio); + break; + } return 0; } @@ -422,50 +607,51 @@ static int netjack2_recv_midi(struct netjack2_peer *peer, struct nj2_packet_head struct data_info *info, uint32_t n_info) { ssize_t len; - uint32_t i, active_ports, sub_cycle, max_size, offset, buffer_size; + uint32_t i, active_ports, sub_cycle, max_size, offset, midi_size; uint32_t packet_size = SPA_MIN(ntohl(header->packet_size), peer->params.mtu); - uint8_t buffer[packet_size], *buffer_data = buffer; - - //nj2_dump_packet_header(header); + uint8_t buffer[packet_size], *data = buffer, *midi_data; if ((len = recv(peer->fd, buffer, packet_size, 0)) < 0) return -errno; - active_ports = peer->params.send_midi_channels; + active_ports = peer->params.recv_midi_channels; + if (active_ports == 0) + return 0; + sub_cycle = ntohl(header->sub_cycle); peer->sync.num_packets = ntohl(header->num_packets); max_size = peer->params.mtu - sizeof(*header); offset = max_size * sub_cycle; - buffer_data += sizeof(*header); + data += sizeof(*header); len -= sizeof(*header); - if (offset + len < peer->buffer_size) - memcpy(SPA_PTROFF(peer->buffer, offset, void), buffer_data, len); + midi_data = peer->midi_data; + midi_size = peer->midi_size; + + if (offset + len < midi_size) + memcpy(SPA_PTROFF(midi_data, offset, void), data, len); if (++(*count) < peer->sync.num_packets) return 0; - buffer_data = peer->buffer; - buffer_size = peer->buffer_size; - for (i = 0; i < active_ports; i++) { - struct nj2_midi_buffer *mbuf = (struct nj2_midi_buffer *)buffer_data; + struct nj2_midi_buffer *mbuf = (struct nj2_midi_buffer *)midi_data; nj2_midi_buffer_ntoh(mbuf, mbuf); size_t used = sizeof(*mbuf) + mbuf->event_count * sizeof(struct nj2_midi_event) + mbuf->write_pos; - if (used > buffer_size) + if (used > midi_size) break; if (i < n_info && info[i].data != NULL) { netjack2_to_midi(info[i].data, peer->params.period_size * sizeof(float), mbuf); info[i].filled = true; } - buffer_data += used; - buffer_size -= used; + midi_data += used; + midi_size -= used; } return 0; } @@ -513,18 +699,92 @@ static int netjack2_recv_audio(struct netjack2_peer *peer, struct nj2_packet_hea sub_cycle * sub_period_size * sizeof(float), float); do_volume(dst, (float*)&ap[1], peer->recv_volume, active_port, sub_period_size, true); - info[active_port].filled = peer->sync.is_last; + info[active_port].filled = true; } } return 0; } +static int netjack2_recv_opus(struct netjack2_peer *peer, struct nj2_packet_header *header, + uint32_t *count, struct data_info *info, uint32_t n_info) +{ +#ifdef HAVE_OPUS + ssize_t len; + uint32_t i, active_ports, sub_cycle, max_size, encoded_size, max_encoded; + uint32_t packet_size = SPA_MIN(ntohl(header->packet_size), peer->params.mtu); + uint8_t buffer[packet_size], *data = buffer, *encoded_data; + uint32_t sub_period_bytes, last_period_bytes, data_size, num_packets; + + if ((len = recv(peer->fd, buffer, packet_size, 0)) < 0) + return -errno; + + active_ports = peer->params.recv_audio_channels; + if (active_ports == 0) + return 0; + + sub_cycle = ntohl(header->sub_cycle); + peer->sync.num_packets = ntohl(header->num_packets); + + max_encoded = peer->max_encoded_size; + + max_size = PACKET_AVAILABLE_SIZE(peer->params.mtu); + num_packets = ((active_ports * max_encoded) + max_size-1) / max_size; + + sub_period_bytes = max_encoded / num_packets; + last_period_bytes = sub_period_bytes + max_encoded % num_packets; + + data += sizeof(*header); + len -= sizeof(*header); + + if (sub_cycle == peer->sync.num_packets-1) + data_size = last_period_bytes; + else + data_size = sub_period_bytes; + + encoded_data = peer->encoded_data; + encoded_size = peer->encoded_size; + + if ((active_ports-1) * max_encoded + sub_cycle * sub_period_bytes + data_size > encoded_size) + return -ENOSPC; + + for (i = 0; i < active_ports; i++) { + memcpy(SPA_PTROFF(encoded_data, + i * max_encoded + sub_cycle * sub_period_bytes, void), + SPA_PTROFF(data, i * data_size, void), + data_size); + } + if (++(*count) < peer->sync.num_packets) + return 0; + + for (i = 0; i < active_ports; i++) { + uint16_t *ap = SPA_PTROFF(encoded_data, i * max_encoded, uint16_t); + void *pcm; + int res; + + if (i >= n_info || (pcm = info[i].data) == NULL) + continue; + + res = opus_custom_decode_float(peer->opus_dec[i], + (unsigned char*)&ap[1], ntohs(ap[0]), + pcm, peer->sync.frames); + + if (res < 0 || res > 0xffff || res != peer->sync.frames) + pw_log_warn("decoding error %d", res); + else + info[i].filled = true; + } + return 0; +#else + return -ENOTSUP; +#endif +} + static int netjack2_recv_data(struct netjack2_peer *peer, struct data_info *midi, uint32_t n_midi, struct data_info *audio, uint32_t n_audio) { ssize_t len; - uint32_t i, count = 0; + uint32_t i, audio_count = 0, midi_count = 0; struct nj2_packet_header header; while (!peer->sync.is_last) { @@ -546,10 +806,17 @@ static int netjack2_recv_data(struct netjack2_peer *peer, switch (ntohl(header.data_type)) { case 'm': - netjack2_recv_midi(peer, &header, &count, midi, n_midi); + netjack2_recv_midi(peer, &header, &midi_count, midi, n_midi); break; case 'a': - netjack2_recv_audio(peer, &header, &count, audio, n_audio); + switch (peer->params.sample_encoder) { + case NJ2_ENCODER_FLOAT: + netjack2_recv_audio(peer, &header, &audio_count, audio, n_audio); + break; + case NJ2_ENCODER_OPUS: + netjack2_recv_opus(peer, &header, &audio_count, audio, n_audio); + break; + } break; case 's': pw_log_info("missing last data packet");