diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c index 91305fac5..4e78cb432 100644 --- a/src/modules/rtp/module-rtp-recv.c +++ b/src/modules/rtp/module-rtp-recv.c @@ -90,12 +90,11 @@ struct session { pa_memblockq *memblockq; bool first_packet; - uint32_t ssrc; uint32_t offset; struct pa_sdp_info sdp_info; - pa_rtp_context rtp_context; + pa_rtp_context *rtp_context; pa_rtpoll_item *rtpoll_item; @@ -205,6 +204,7 @@ static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) { /* Called from I/O thread context */ static int rtpoll_work_cb(pa_rtpoll_item *i) { pa_memchunk chunk; + uint32_t timestamp; int64_t k, j, delta; struct timeval now = { 0, 0 }; struct session *s; @@ -224,37 +224,30 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { p->revents = 0; - if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0) + if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0) return 0; - if (s->sdp_info.payload != s->rtp_context.payload || - !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) { + if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) { pa_memblock_unref(chunk.memblock); return 0; } if (!s->first_packet) { s->first_packet = true; - - s->ssrc = s->rtp_context.ssrc; - s->offset = s->rtp_context.timestamp; - } else { - if (s->ssrc != s->rtp_context.ssrc) { - pa_memblock_unref(chunk.memblock); - return 0; - } + s->offset = timestamp; } /* Check whether there was a timestamp overflow */ - k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset; - j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp; + k = (int64_t) timestamp - (int64_t) s->offset; + j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp; if ((k < 0 ? -k : k) < (j < 0 ? -j : j)) delta = k; else delta = j; - pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true); + pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE, + true); if (now.tv_sec == 0) { PA_ONCE_BEGIN { @@ -274,7 +267,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { pa_memblock_unref(chunk.memblock); /* The next timestamp we expect */ - s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size); + s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context)); pa_atomic_store(&s->timestamp, (int) now.tv_sec); @@ -383,18 +376,12 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { /* Called from I/O thread context */ static void sink_input_attach(pa_sink_input *i) { struct session *s; - struct pollfd *p; pa_sink_input_assert_ref(i); pa_assert_se(s = i->userdata); pa_assert(!s->rtpoll_item); - s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1); - - p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL); - p->fd = s->rtp_context.fd; - p->events = POLLIN; - p->revents = 0; + s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll); pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb); pa_rtpoll_item_set_userdata(s->rtpoll_item, s); @@ -582,7 +569,8 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in pa_memblock_unref(silence.memblock); - pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec)); + if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, pa_frame_size(&s->sdp_info.sample_spec)))) + goto fail; pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s); u->n_sessions++; @@ -617,7 +605,7 @@ static void session_free(struct session *s) { pa_memblockq_free(s->memblockq); pa_sdp_info_destroy(&s->sdp_info); - pa_rtp_context_destroy(&s->rtp_context); + pa_rtp_context_free(s->rtp_context); pa_xfree(s); } diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c index b609b7c78..e647a9d3b 100644 --- a/src/modules/rtp/module-rtp-send.c +++ b/src/modules/rtp/module-rtp-send.c @@ -107,7 +107,7 @@ struct userdata { pa_source_output *source_output; pa_memblockq *memblockq; - pa_rtp_context rtp_context; + pa_rtp_context *rtp_context; pa_sap_context sap_context; pa_time_event *sap_event; @@ -143,7 +143,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) return; } - pa_rtp_send(&u->rtp_context, u->memblockq); + pa_rtp_send(u->rtp_context, u->memblockq); } static pa_source_output_flags_t get_dont_inhibit_auto_suspend_flag(pa_source *source, @@ -488,11 +488,12 @@ int pa__init(pa_module*m) { pa_xfree(n); - if (pa_rtp_context_init_send(&u->rtp_context, fd, payload, mtu, pa_frame_size(&ss)) < 0) + if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, pa_frame_size(&ss)))) goto fail; pa_sap_context_init_send(&u->sap_context, sap_fd, p); - pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, SSRC=0x%08x, payload=%u, initial sequence #%u", mtu, dst_addr, port, src_addr, ttl, u->rtp_context.ssrc, payload, u->rtp_context.sequence); + pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, payload=%u", + mtu, dst_addr, port, src_addr, ttl, payload); pa_log_info("SDP-Data:\n%s\nEOF", p); pa_sap_send(&u->sap_context, 0); @@ -534,7 +535,7 @@ void pa__done(pa_module*m) { pa_source_output_unref(u->source_output); } - pa_rtp_context_destroy(&u->rtp_context); + pa_rtp_context_free(u->rtp_context); pa_sap_send(&u->sap_context, 1); pa_sap_context_destroy(&u->sap_context); diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index c5b131898..5a066d92b 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -40,13 +40,31 @@ #include #include #include +#include #include "rtp.h" -int pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size) { - pa_assert(c); +typedef struct pa_rtp_context { + int fd; + uint16_t sequence; + uint32_t timestamp; + uint32_t ssrc; + uint8_t payload; + size_t frame_size; + size_t mtu; + + uint8_t *recv_buf; + size_t recv_buf_size; + pa_memchunk memchunk; +} pa_rtp_context; + +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, size_t frame_size) { + pa_rtp_context *c; + pa_assert(fd >= 0); + c = pa_xnew0(pa_rtp_context, 1); + c->fd = fd; c->sequence = (uint16_t) (rand()*rand()); c->timestamp = 0; @@ -59,7 +77,7 @@ int pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t c->recv_buf_size = 0; pa_memchunk_reset(&c->memchunk); - return 0; + return c; } #define MAX_IOVECS 16 @@ -151,19 +169,23 @@ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { return 0; } -pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size) { - pa_assert(c); +pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, size_t frame_size) { + pa_rtp_context *c; + + c = pa_xnew0(pa_rtp_context, 1); c->fd = fd; + c->payload = payload; c->frame_size = frame_size; c->recv_buf_size = 2000; c->recv_buf = pa_xmalloc(c->recv_buf_size); pa_memchunk_reset(&c->memchunk); + return c; } -int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp) { +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { int size; size_t audio_length; size_t metadata_length; @@ -171,6 +193,8 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct struct cmsghdr *cm; struct iovec iov; uint32_t header; + uint32_t ssrc; + uint8_t payload; unsigned cc; ssize_t r; uint8_t aux[1024]; @@ -246,12 +270,12 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct } memcpy(&header, iov.iov_base, sizeof(uint32_t)); - memcpy(&c->timestamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t)); - memcpy(&c->ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t)); + memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t)); + memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t)); header = ntohl(header); - c->timestamp = ntohl(c->timestamp); - c->ssrc = ntohl(c->ssrc); + *rtp_tstamp = ntohl(*rtp_tstamp); + ssrc = ntohl(c->ssrc); if ((header >> 30) != 2) { pa_log_warn("Unsupported RTP version."); @@ -268,12 +292,22 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct goto fail; } + if (ssrc != c->ssrc) { + pa_log_debug("Got unexpected SSRC"); + goto fail; + } + cc = (header >> 24) & 0xF; - c->payload = (uint8_t) ((header >> 16) & 127U); + payload = (uint8_t) ((header >> 16) & 127U); c->sequence = (uint16_t) (header & 0xFFFFU); metadata_length = 12 + cc * 4; + if (payload != c->payload) { + pa_log_debug("Got unexpected payload: %u", payload); + goto fail; + } + if (metadata_length > (unsigned) size) { pa_log_warn("RTP packet too short. (CSRC)"); goto fail; @@ -388,7 +422,7 @@ int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) { return ss->format == PA_SAMPLE_S16BE; } -void pa_rtp_context_destroy(pa_rtp_context *c) { +void pa_rtp_context_free(pa_rtp_context *c) { pa_assert(c); pa_assert_se(pa_close(c->fd) == 0); @@ -397,8 +431,7 @@ void pa_rtp_context_destroy(pa_rtp_context *c) { pa_memblock_unref(c->memchunk.memblock); pa_xfree(c->recv_buf); - c->recv_buf = NULL; - c->recv_buf_size = 0; + pa_xfree(c); } const char* pa_rtp_format_to_string(pa_sample_format_t f) { @@ -418,3 +451,21 @@ pa_sample_format_t pa_rtp_string_to_format(const char *s) { else return PA_SAMPLE_INVALID; } + +size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) { + return c->frame_size; +} + +pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) { + pa_rtpoll_item *item; + struct pollfd *p; + + item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1); + + p = pa_rtpoll_item_get_pollfd(item, NULL); + p->fd = c->fd; + p->events = POLLIN; + p->revents = 0; + + return item; +} diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h index a8e6d1e74..e3146ec07 100644 --- a/src/modules/rtp/rtp.h +++ b/src/modules/rtp/rtp.h @@ -25,31 +25,24 @@ #include #include #include +#include -typedef struct pa_rtp_context { - int fd; - uint16_t sequence; - uint32_t timestamp; - uint32_t ssrc; - uint8_t payload; - size_t frame_size; - size_t mtu; - - uint8_t *recv_buf; - size_t recv_buf_size; - pa_memchunk memchunk; -} pa_rtp_context; +typedef struct pa_rtp_context pa_rtp_context; int pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size); +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, size_t frame_size); /* If the memblockq doesn't have a silence memchunk set, then the caller must * guarantee that the current read index doesn't point to a hole. */ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q); -pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size); -int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp); +pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, size_t frame_size); +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp); -void pa_rtp_context_destroy(pa_rtp_context *c); +void pa_rtp_context_free(pa_rtp_context *c); + +size_t pa_rtp_context_get_frame_size(pa_rtp_context *c); +pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll); pa_sample_spec* pa_rtp_sample_spec_fixup(pa_sample_spec *ss); int pa_rtp_sample_spec_valid(const pa_sample_spec *ss);