diff --git a/pinos/client/connection.c b/pinos/client/connection.c index 3411cf6b5..27aed319d 100644 --- a/pinos/client/connection.c +++ b/pinos/client/connection.c @@ -151,6 +151,15 @@ connection_parse_node_command (PinosConnection *conn, PinosControlCmdNodeCommand cmd->command = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command), SpaNodeCommand); } +static void +connection_parse_port_command (PinosConnection *conn, PinosControlCmdPortCommand *cmd) +{ + void *p = conn->in.data; + memcpy (cmd, p, sizeof (PinosControlCmdPortCommand)); + if (cmd->command) + cmd->command = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command), SpaNodeCommand); +} + static void * connection_ensure_size (PinosConnection *conn, ConnectionBuffer *buf, size_t size) { @@ -363,6 +372,27 @@ connection_add_node_command (PinosConnection *conn, PinosControlCmdNodeCommand * memcpy (p, cm->command, cm->command->size); } +static void +connection_add_port_command (PinosConnection *conn, PinosControlCmdPortCommand *cm) +{ + size_t len; + void *p; + PinosControlCmdPortCommand *d; + + /* calculate length */ + len = sizeof (PinosControlCmdPortCommand); + len += cm->command->size; + + p = connection_add_cmd (conn, PINOS_CONTROL_CMD_PORT_COMMAND, len); + memcpy (p, cm, sizeof (PinosControlCmdPortCommand)); + d = p; + + p = SPA_MEMBER (d, sizeof (PinosControlCmdPortCommand), void); + d->command = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + memcpy (p, cm->command, cm->command->size); +} + static gboolean refill_buffer (PinosConnection *conn, ConnectionBuffer *buf) { @@ -595,6 +625,10 @@ pinos_connection_parse_cmd (PinosConnection *conn, connection_parse_node_command (conn, command); break; + case PINOS_CONTROL_CMD_PORT_COMMAND: + connection_parse_port_command (conn, command); + break; + case PINOS_CONTROL_CMD_INVALID: return FALSE; } @@ -742,6 +776,10 @@ pinos_connection_add_cmd (PinosConnection *conn, connection_add_node_command (conn, command); break; + case PINOS_CONTROL_CMD_PORT_COMMAND: + connection_add_port_command (conn, command); + break; + case PINOS_CONTROL_CMD_INVALID: return FALSE; } diff --git a/pinos/client/connection.h b/pinos/client/connection.h index 165e58d9d..6517ace87 100644 --- a/pinos/client/connection.h +++ b/pinos/client/connection.h @@ -49,6 +49,7 @@ typedef enum { PINOS_CONTROL_CMD_SET_PROPERTY = 35, PINOS_CONTROL_CMD_NODE_COMMAND = 36, + PINOS_CONTROL_CMD_PORT_COMMAND = 37, /* both */ PINOS_CONTROL_CMD_ADD_MEM = 64, @@ -132,6 +133,12 @@ typedef struct { SpaNodeCommand *command; } PinosControlCmdNodeCommand; +/* PINOS_CONTROL_CMD_PORT_COMMAND */ +typedef struct { + uint32_t port_id; + SpaNodeCommand *command; +} PinosControlCmdPortCommand; + /* PINOS_CONTROL_CMD_ADD_MEM */ typedef struct { SpaDirection direction; diff --git a/pinos/client/introspect.h b/pinos/client/introspect.h index bad88de0c..d41873830 100644 --- a/pinos/client/introspect.h +++ b/pinos/client/introspect.h @@ -23,6 +23,8 @@ #include #include +#include + G_BEGIN_DECLS /** @@ -59,9 +61,9 @@ const gchar * pinos_node_state_as_string (PinosNodeState state); * The direction of a port */ typedef enum { - PINOS_DIRECTION_INVALID = 0, - PINOS_DIRECTION_INPUT = 1, - PINOS_DIRECTION_OUTPUT = 2 + PINOS_DIRECTION_INVALID = SPA_DIRECTION_INVALID, + PINOS_DIRECTION_INPUT = SPA_DIRECTION_INPUT, + PINOS_DIRECTION_OUTPUT = SPA_DIRECTION_OUTPUT } PinosDirection; const gchar * pinos_direction_as_string (PinosDirection direction); diff --git a/pinos/client/meson.build b/pinos/client/meson.build index dc12c9621..d6ed7a236 100644 --- a/pinos/client/meson.build +++ b/pinos/client/meson.build @@ -4,8 +4,9 @@ pinos_headers = [ 'introspect.h', 'pinos.h', 'properties.h', - 'stream.h', 'ringbuffer.h', + 'rtkit.h', + 'stream.h', 'subscribe.h', 'thread-mainloop.h', ] @@ -20,6 +21,7 @@ pinos_sources = [ 'stream.c', 'pinos.c', 'ringbuffer.c', + 'rtkit.c', 'subscribe.c', 'thread-mainloop.c', gdbus_target, diff --git a/pinos/client/rtkit.c b/pinos/client/rtkit.c new file mode 100644 index 000000000..cb2f51a24 --- /dev/null +++ b/pinos/client/rtkit.c @@ -0,0 +1,104 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#include "rtkit.h" + +static pid_t _gettid(void) { + return (pid_t) syscall(SYS_gettid); +} + +gboolean +pinos_rtkit_make_realtime (GDBusConnection *system_bus, + pid_t thread, + gint priority, + GError **error) +{ + GVariant *v; + + if (thread == 0) + thread = _gettid(); + + v = g_dbus_connection_call_sync (system_bus, + RTKIT_SERVICE_NAME, + RTKIT_OBJECT_PATH, + "org.freedesktop.RealtimeKit1", + "MakeThreadRealtime", + g_variant_new ("(tu)", + (guint64) thread, + (guint32) priority), + NULL, + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, + error); + if (v) + g_variant_unref (v); + + return v != NULL; +} + +gboolean +pinos_rtkit_make_high_priority (GDBusConnection *system_bus, + pid_t thread, + gint nice_level, + GError **error) +{ + GVariant *v; + + if (thread == 0) + thread = _gettid(); + + v = g_dbus_connection_call_sync (system_bus, + RTKIT_SERVICE_NAME, + RTKIT_OBJECT_PATH, + "org.freedesktop.RealtimeKit1", + "MakeThreadHighPriority", + g_variant_new ("(tu)", + (guint64) thread, + (guint32) nice_level), + NULL, + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, + error); + if (v) + g_variant_unref (v); + + return v != NULL; +} + +int pinos_rtkit_get_max_realtime_priority (GDBusConnection *system_bus) +{ + return 0; +} + +int pinos_rtkit_get_min_nice_level (GDBusConnection *system_bus, int* min_nice_level) +{ + return 0; +} + +/* Return the maximum value of RLIMIT_RTTIME to set before attempting a + * realtime request. A negative value is an errno style error code. + */ +long long rtkit_get_rttime_usec_max (GDBusConnection *system_bus) +{ + return 0; +} diff --git a/pinos/client/rtkit.h b/pinos/client/rtkit.h new file mode 100644 index 000000000..87b3da923 --- /dev/null +++ b/pinos/client/rtkit.h @@ -0,0 +1,68 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PINOS_RTKIT_H__ +#define __PINOS_RTKIT_H__ + +#include +#include + +G_BEGIN_DECLS + +#define RTKIT_SERVICE_NAME "org.freedesktop.RealtimeKit1" +#define RTKIT_OBJECT_PATH "/org/freedesktop/RealtimeKit1" + +/* This is mostly equivalent to sched_setparam(thread, SCHED_RR, { + * .sched_priority = priority }). 'thread' needs to be a kernel thread + * id as returned by gettid(), not a pthread_t! If 'thread' is 0 the + * current thread is used. + */ +gboolean pinos_rtkit_make_realtime (GDBusConnection *system_bus, + pid_t thread, + gint priority, + GError **error); + + +/* This is mostly equivalent to setpriority(PRIO_PROCESS, thread, + * nice_level). 'thread' needs to be a kernel thread id as returned by + * gettid(), not a pthread_t! If 'thread' is 0 the current thread is + * used. */ +gboolean pinos_rtkit_make_high_priority (GDBusConnection *system_bus, + pid_t thread, + gint nice_level, + GError **error); + +/* Return the maximum value of realtime priority available. Realtime requests + * above this value will fail. A negative value is an errno style error code. + */ +int pinos_rtkit_get_max_realtime_priority (GDBusConnection *system_bus); + +/* Retreive the minimum value of nice level available. High prio requests + * below this value will fail. The returned value is a negative errno + * style error code, or 0 on success.*/ +int pinos_rtkit_get_min_nice_level (GDBusConnection *system_bus, int* min_nice_level); + +/* Return the maximum value of RLIMIT_RTTIME to set before attempting a + * realtime request. A negative value is an errno style error code. + */ +long long rtkit_get_rttime_usec_max (GDBusConnection *system_bus); + +G_END_DECLS + +#endif /* __PINOS_RTKIT_H__ */ diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 3b6f5675c..57cc74295 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -874,7 +874,7 @@ handle_node_command (PinosStream *stream, break; case SPA_NODE_COMMAND_PAUSE: { - g_debug ("stream %p: pause", stream); + g_debug ("stream %p: pause %d", stream, seq); add_state_change (stream, SPA_NODE_STATE_PAUSED); add_async_complete (stream, seq, SPA_RESULT_OK); @@ -882,12 +882,12 @@ handle_node_command (PinosStream *stream, if (!pinos_connection_flush (priv->conn)) g_warning ("stream %p: error writing connection", stream); - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL); break; } case SPA_NODE_COMMAND_START: { - g_debug ("stream %p: start", stream); + g_debug ("stream %p: start %d", stream, seq); add_state_change (stream, SPA_NODE_STATE_STREAMING); add_async_complete (stream, seq, SPA_RESULT_OK); @@ -911,7 +911,6 @@ handle_node_command (PinosStream *stream, g_warning ("stream %p: error writing connection", stream); break; } - case SPA_NODE_COMMAND_CLOCK_UPDATE: { SpaNodeCommandClockUpdate *cu = (SpaNodeCommandClockUpdate *) command; @@ -945,7 +944,7 @@ parse_connection (PinosStream *stream) case PINOS_CONTROL_CMD_PORT_STATUS_CHANGE: case PINOS_CONTROL_CMD_NODE_STATE_CHANGE: case PINOS_CONTROL_CMD_PROCESS_BUFFER: - g_warning ("got unexpected connection %d", cmd); + g_warning ("got unexpected command %d", cmd); break; case PINOS_CONTROL_CMD_ADD_PORT: @@ -968,6 +967,7 @@ parse_connection (PinosStream *stream) priv->pending_seq = p.seq; g_object_notify (G_OBJECT (stream), "format"); + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); break; } case PINOS_CONTROL_CMD_SET_PROPERTY: @@ -1114,6 +1114,11 @@ parse_connection (PinosStream *stream) if (!pinos_connection_flush (conn)) g_warning ("stream %p: error writing connection", stream); + + if (p.n_buffers) + stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL); + else + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); break; } case PINOS_CONTROL_CMD_NODE_EVENT: @@ -1136,7 +1141,15 @@ parse_connection (PinosStream *stream) handle_node_command (stream, p.seq, p.command); break; } + case PINOS_CONTROL_CMD_PORT_COMMAND: + { + PinosControlCmdPortCommand p; + if (!pinos_connection_parse_cmd (conn, &p)) + break; + + break; + } case PINOS_CONTROL_CMD_INVALID: g_warning ("unhandled command %d", cmd); break; @@ -1167,7 +1180,7 @@ parse_rtconnection (PinosStream *stream) case PINOS_CONTROL_CMD_ADD_MEM: case PINOS_CONTROL_CMD_USE_BUFFERS: case PINOS_CONTROL_CMD_NODE_COMMAND: - g_warning ("got unexpected connection %d", cmd); + g_warning ("got unexpected command %d", cmd); break; case PINOS_CONTROL_CMD_PROCESS_BUFFER: @@ -1202,6 +1215,10 @@ parse_rtconnection (PinosStream *stream) handle_rtnode_event (stream, p.event); break; } + case PINOS_CONTROL_CMD_PORT_COMMAND: + { + break; + } } } @@ -1344,7 +1361,7 @@ on_node_proxy (GObject *source_object, do_node_init (stream); - stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + stream_set_state (stream, PINOS_STREAM_STATE_CONFIGURE, NULL); g_object_unref (stream); return; @@ -1664,9 +1681,7 @@ pinos_stream_start (PinosStream *stream) g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); priv = stream->priv; - g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_READY, FALSE); - - stream_set_state (stream, PINOS_STREAM_STATE_STARTING, NULL); + //g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_PAUSED, FALSE); g_main_context_invoke (priv->context->priv->context, (GSourceFunc) do_start, @@ -1709,8 +1724,8 @@ pinos_stream_stop (PinosStream *stream) static void on_node_removed (GObject *source_object, - GAsyncResult *res, - gpointer user_data) + GAsyncResult *res, + gpointer user_data) { PinosStream *stream = user_data; PinosStreamPrivate *priv = stream->priv; diff --git a/pinos/client/stream.h b/pinos/client/stream.h index 1ac07ff29..6237dc786 100644 --- a/pinos/client/stream.h +++ b/pinos/client/stream.h @@ -47,9 +47,10 @@ typedef enum { PINOS_STREAM_STATE_ERROR = -1, PINOS_STREAM_STATE_UNCONNECTED = 0, PINOS_STREAM_STATE_CONNECTING = 1, - PINOS_STREAM_STATE_READY = 2, - PINOS_STREAM_STATE_STARTING = 3, - PINOS_STREAM_STATE_STREAMING = 4 + PINOS_STREAM_STATE_CONFIGURE = 2, + PINOS_STREAM_STATE_READY = 3, + PINOS_STREAM_STATE_PAUSED = 4, + PINOS_STREAM_STATE_STREAMING = 5 } PinosStreamState; const gchar * pinos_stream_state_as_string (PinosStreamState state); diff --git a/pinos/gst/gstpinospool.c b/pinos/gst/gstpinospool.c index 2019bb11d..b941e6ec3 100644 --- a/pinos/gst/gstpinospool.c +++ b/pinos/gst/gstpinospool.c @@ -92,6 +92,7 @@ release_buffer (GstBufferPool * pool, GstBuffer *buffer) GST_DEBUG ("release buffer %p", buffer); GST_OBJECT_LOCK (pool); g_queue_push_tail (&p->available, buffer); + g_cond_signal (&p->cond); GST_OBJECT_UNLOCK (pool); } diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 9c8198036..224876a21 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -450,6 +450,7 @@ on_new_buffer (GObject *gobject, buf = g_hash_table_lookup (pinossink->buf_ids, GINT_TO_POINTER (id)); if (buf) { + gst_buffer_unref (buf); pinos_thread_main_loop_signal (pinossink->loop, FALSE); } } @@ -469,9 +470,10 @@ on_stream_notify (GObject *gobject, switch (state) { case PINOS_STREAM_STATE_UNCONNECTED: case PINOS_STREAM_STATE_CONNECTING: - case PINOS_STREAM_STATE_STARTING: - case PINOS_STREAM_STATE_STREAMING: + case PINOS_STREAM_STATE_CONFIGURE: case PINOS_STREAM_STATE_READY: + case PINOS_STREAM_STATE_PAUSED: + case PINOS_STREAM_STATE_STREAMING: break; case PINOS_STREAM_STATE_ERROR: GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, @@ -487,6 +489,7 @@ on_format_notify (GObject *gobject, GParamSpec *pspec, gpointer user_data) { +#if 0 GstPinosSink *pinossink = user_data; GstStructure *config; GstCaps *caps; @@ -514,6 +517,7 @@ on_format_notify (GObject *gobject, param_meta_enable.type = SPA_META_TYPE_HEADER; pinos_stream_finish_format (pinossink->stream, SPA_RESULT_OK, port_params, 2); +#endif } static gboolean @@ -634,10 +638,12 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) d->offset = mem->offset; d->size = mem->size; } + gst_buffer_ref (buffer); if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) g_warning ("can't send buffer"); + done: pinos_thread_main_loop_unlock (pinossink->loop); diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 1e6d17c3d..e2fa1abcd 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -451,38 +451,39 @@ on_new_buffer (GObject *gobject, { GstPinosSrc *pinossrc = user_data; GstBuffer *buf; + ProcessMemData *data; + SpaMetaHeader *h; + guint i; GST_LOG_OBJECT (pinossrc, "got new buffer"); buf = g_hash_table_lookup (pinossrc->buf_ids, GINT_TO_POINTER (id)); - - if (buf) { - ProcessMemData *data; - SpaMetaHeader *h; - guint i; - - data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark); - h = data->header; - if (h) { - GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset); - - if (GST_CLOCK_TIME_IS_VALID (h->pts)) { - GST_BUFFER_PTS (buf) = h->pts; - if (GST_BUFFER_PTS (buf) + h->dts_offset > 0) - GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + h->dts_offset; - } - GST_BUFFER_OFFSET (buf) = h->seq; - } - for (i = 0; i < data->buf->n_datas; i++) { - SpaData *d = &data->buf->datas[i]; - GstMemory *mem = gst_buffer_peek_memory (buf, i); - mem->offset = d->offset; - mem->size = d->size; - } - g_queue_push_tail (&pinossrc->queue, buf); - - pinos_thread_main_loop_signal (pinossrc->loop, FALSE); + if (buf == NULL) { + g_warning ("unknown buffer %d", id); + return; } + + data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), + process_mem_data_quark); + h = data->header; + if (h) { + GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset); + + if (GST_CLOCK_TIME_IS_VALID (h->pts)) { + GST_BUFFER_PTS (buf) = h->pts; + if (GST_BUFFER_PTS (buf) + h->dts_offset > 0) + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + h->dts_offset; + } + GST_BUFFER_OFFSET (buf) = h->seq; + } + for (i = 0; i < data->buf->n_datas; i++) { + SpaData *d = &data->buf->datas[i]; + GstMemory *mem = gst_buffer_peek_memory (buf, i); + mem->offset = d->offset; + mem->size = d->size; + } + g_queue_push_tail (&pinossrc->queue, buf); + + pinos_thread_main_loop_signal (pinossrc->loop, FALSE); return; } @@ -494,14 +495,15 @@ on_stream_notify (GObject *gobject, GstPinosSrc *pinossrc = user_data; PinosStreamState state = pinos_stream_get_state (pinossrc->stream); - GST_DEBUG ("got stream state %d", state); + GST_DEBUG ("got stream state %s", pinos_stream_state_as_string (state)); switch (state) { case PINOS_STREAM_STATE_UNCONNECTED: case PINOS_STREAM_STATE_CONNECTING: - case PINOS_STREAM_STATE_STARTING: - case PINOS_STREAM_STATE_STREAMING: + case PINOS_STREAM_STATE_CONFIGURE: case PINOS_STREAM_STATE_READY: + case PINOS_STREAM_STATE_PAUSED: + case PINOS_STREAM_STATE_STREAMING: break; case PINOS_STREAM_STATE_ERROR: GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, @@ -529,6 +531,8 @@ parse_stream_properties (GstPinosSrc *pinossrc, PinosProperties *props) pinossrc->max_latency = var ? (GstClockTime) atoi (var) : GST_CLOCK_TIME_NONE; GST_OBJECT_UNLOCK (pinossrc); + GST_DEBUG_OBJECT (pinossrc, "live %d", is_live); + gst_base_src_set_live (GST_BASE_SRC (pinossrc), is_live); } @@ -539,10 +543,12 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc) PinosProperties *props; pinos_thread_main_loop_lock (pinossrc->loop); + GST_DEBUG_OBJECT (pinossrc, "doing stream start"); res = pinos_stream_start (pinossrc->stream); while (TRUE) { PinosStreamState state = pinos_stream_get_state (pinossrc->stream); + GST_DEBUG_OBJECT (pinossrc, "waiting for STREAMING, now %s", pinos_stream_state_as_string (state)); if (state == PINOS_STREAM_STATE_STREAMING) break; @@ -559,6 +565,7 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc) pinos_properties_free (props); pinos_thread_main_loop_lock (pinossrc->loop); + GST_DEBUG_OBJECT (pinossrc, "signal started"); pinossrc->started = TRUE; pinos_thread_main_loop_signal (pinossrc->loop, FALSE); pinos_thread_main_loop_unlock (pinossrc->loop); @@ -582,6 +589,8 @@ wait_negotiated (GstPinosSrc *this) while (TRUE) { state = pinos_stream_get_state (this->stream); + GST_DEBUG_OBJECT (this, "waiting for started signal, state now %s", + pinos_stream_state_as_string (state)); if (state == PINOS_STREAM_STATE_ERROR) break; @@ -590,6 +599,7 @@ wait_negotiated (GstPinosSrc *this) pinos_thread_main_loop_wait (this->loop); } + GST_DEBUG_OBJECT (this, "got started signal"); pinos_thread_main_loop_unlock (this->loop); return state; @@ -642,6 +652,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) while (TRUE) { PinosStreamState state = pinos_stream_get_state (pinossrc->stream); + GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pinos_stream_state_as_string (state)); if (state == PINOS_STREAM_STATE_UNCONNECTED) break; @@ -665,7 +676,9 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) while (TRUE) { PinosStreamState state = pinos_stream_get_state (pinossrc->stream); - if (state == PINOS_STREAM_STATE_READY) + GST_DEBUG_OBJECT (basesrc, "waiting for PAUSED, now %s", pinos_stream_state_as_string (state)); + if (state == PINOS_STREAM_STATE_PAUSED || + state == PINOS_STREAM_STATE_STREAMING) break; if (state == PINOS_STREAM_STATE_ERROR) @@ -726,6 +739,7 @@ on_format_notify (GObject *gobject, g_object_get (gobject, "format", &format, NULL); caps = gst_caps_from_format (format); + GST_DEBUG_OBJECT (pinossrc, "we got format %" GST_PTR_FORMAT, caps); res = gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); gst_caps_unref (caps); @@ -738,8 +752,10 @@ on_format_notify (GObject *gobject, param_meta.param.size = sizeof (param_meta); param_meta.type = SPA_META_TYPE_HEADER; + GST_DEBUG_OBJECT (pinossrc, "doing finish format"); pinos_stream_finish_format (pinossrc->stream, SPA_RESULT_OK, params, 1); } else { + GST_WARNING_OBJECT (pinossrc, "finish format with error"); pinos_stream_finish_format (pinossrc->stream, SPA_RESULT_INVALID_MEDIA_TYPE, NULL, 0); } } @@ -948,7 +964,7 @@ on_context_notify (GObject *gobject, GstPinosSrc *pinossrc = user_data; PinosContextState state = pinos_context_get_state (pinossrc->ctx); - GST_DEBUG ("got context state %d", state); + GST_DEBUG ("got context state %s", pinos_context_state_as_string (state)); switch (state) { case PINOS_CONTEXT_STATE_UNCONNECTED: @@ -1000,6 +1016,7 @@ gst_pinos_src_open (GstPinosSrc * pinossrc) while (TRUE) { PinosContextState state = pinos_context_get_state (pinossrc->ctx); + GST_DEBUG ("waiting for CONNECTED, now %s", pinos_context_state_as_string (state)); if (state == PINOS_CONTEXT_STATE_CONNECTED) break; diff --git a/pinos/modules/spa/spa-alsa-monitor.c b/pinos/modules/spa/spa-alsa-monitor.c index 5a87d32f1..a947985bd 100644 --- a/pinos/modules/spa/spa-alsa-monitor.c +++ b/pinos/modules/spa/spa-alsa-monitor.c @@ -125,8 +125,8 @@ add_item (PinosSpaALSAMonitor *this, SpaMonitorItem *item) return; } if ((res = spa_handle_get_interface (handle, priv->uri.clock, &clock_iface)) < 0) { - g_error ("can't get CLOCK interface: %d", res); - return; + g_debug ("can't get CLOCK interface: %d", res); + clock_iface = NULL; } if (item->info) { diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 4ae6b2edd..fce02e112 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -45,7 +45,7 @@ #define MAX_INPUTS 64 #define MAX_OUTPUTS 64 -#define MAX_BUFFERS 16 +#define MAX_BUFFERS 64 #define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) #define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) @@ -57,16 +57,20 @@ #define CHECK_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->out_ports[p].valid) #define CHECK_PORT(this,d,p) (CHECK_IN_PORT (this,d,p) || CHECK_OUT_PORT (this,d,p)) +#define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers) + typedef struct _SpaProxy SpaProxy; typedef struct _ProxyBuffer ProxyBuffer; struct _ProxyBuffer { - SpaBuffer *outbuf; - SpaBuffer buffer; - SpaMeta metas[4]; - SpaData datas[4]; - off_t offset; - size_t size; + SpaBuffer *outbuf; + SpaBuffer buffer; + SpaMeta metas[4]; + SpaData datas[4]; + off_t offset; + size_t size; + bool outstanding; + ProxyBuffer *next; }; typedef struct { @@ -85,7 +89,6 @@ typedef struct { size_t buffer_mem_size; void *buffer_mem_ptr; - uint32_t buffer_id; SpaQueue ready; } SpaProxyPort; @@ -654,7 +657,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, b->buffer.datas = b->datas; b->buffer.metas = b->metas; - b->size = pinos_serialize_buffer_get_size (buffers[i]); + b->size = SPA_ROUND_UP_N (pinos_serialize_buffer_get_size (buffers[i]), 64); b->offset = size; for (j = 0; j < buffers[i]->n_metas; j++) { @@ -810,7 +813,7 @@ copy_meta_in (SpaProxy *this, SpaProxyPort *port, uint32_t buffer_id) unsigned int i; for (i = 0; i < b->outbuf->n_metas; i++) { - SpaMeta *sm = &b->metas[i]; + SpaMeta *sm = &b->buffer.metas[i]; SpaMeta *dm = &b->outbuf->metas[i]; memcpy (dm->data, sm->data, dm->size); } @@ -852,7 +855,6 @@ spa_proxy_node_port_push_input (SpaNode *node, SpaProxyPort *port; unsigned int i; bool have_error = false; - bool have_enough = false; PinosControlCmdProcessBuffer pb; if (node == NULL || n_info == 0 || info == NULL) @@ -874,7 +876,7 @@ spa_proxy_node_port_push_input (SpaNode *node, have_error = true; continue; } - if (info[i].buffer_id >= port->n_buffers) { + if (!CHECK_PORT_BUFFER (this, info[i].buffer_id, port)) { if (port->n_buffers == 0) info[i].status = SPA_RESULT_NO_BUFFERS; else @@ -895,8 +897,6 @@ spa_proxy_node_port_push_input (SpaNode *node, if (have_error) return SPA_RESULT_ERROR; - if (have_enough) - return SPA_RESULT_HAVE_ENOUGH_INPUT; if (!pinos_connection_flush (this->rtconn)) spa_log_error (this->log, "proxy %p: error writing connection\n", this); @@ -913,7 +913,6 @@ spa_proxy_node_port_pull_output (SpaNode *node, SpaProxyPort *port; unsigned int i; bool have_error = false; - bool need_more = false; if (node == NULL || n_info == 0 || info == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -921,6 +920,8 @@ spa_proxy_node_port_pull_output (SpaNode *node, this = SPA_CONTAINER_OF (node, SpaProxy, node); for (i = 0; i < n_info; i++) { + ProxyBuffer *b; + if (!CHECK_OUT_PORT (this, SPA_DIRECTION_OUTPUT, info[i].port_id)) { spa_log_warn (this->log, "invalid port %u\n", info[i].port_id); info[i].status = SPA_RESULT_INVALID_PORT; @@ -936,15 +937,19 @@ spa_proxy_node_port_pull_output (SpaNode *node, continue; } - info[i].buffer_id = port->buffer_id; - info[i].status = SPA_RESULT_OK; + SPA_QUEUE_POP_HEAD (&port->ready, ProxyBuffer, next, b); + if (b == NULL) { + info[i].status = SPA_RESULT_UNEXPECTED; + have_error = true; + continue; + } + b->outstanding = true; - port->buffer_id = SPA_ID_INVALID; + info[i].buffer_id = b->outbuf->id; + info[i].status = SPA_RESULT_OK; } if (have_error) return SPA_RESULT_ERROR; - if (need_more) - return SPA_RESULT_NEED_MORE_INPUT; return SPA_RESULT_OK; } @@ -987,6 +992,7 @@ spa_proxy_node_port_send_command (SpaNode *node, SpaNodeCommand *command) { SpaProxy *this; + SpaResult res = SPA_RESULT_OK; if (node == NULL || command == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -994,11 +1000,34 @@ spa_proxy_node_port_send_command (SpaNode *node, this = SPA_CONTAINER_OF (node, SpaProxy, node); switch (command->type) { + case SPA_NODE_COMMAND_INVALID: + return SPA_RESULT_INVALID_COMMAND; + + case SPA_NODE_COMMAND_START: + case SPA_NODE_COMMAND_PAUSE: + case SPA_NODE_COMMAND_FLUSH: + case SPA_NODE_COMMAND_DRAIN: + case SPA_NODE_COMMAND_MARKER: + { + PinosControlCmdPortCommand cpc; + + cpc.port_id = port_id; + cpc.command = command; + pinos_connection_add_cmd (this->rtconn, PINOS_CONTROL_CMD_PORT_COMMAND, &cpc); + + if (!pinos_connection_flush (this->rtconn)) { + spa_log_error (this->log, "proxy %p: error writing connection\n", this); + res = SPA_RESULT_ERROR; + } + break; + } + default: spa_log_warn (this->log, "unhandled command %d\n", command->type); + res = SPA_RESULT_NOT_IMPLEMENTED; break; } - return SPA_RESULT_NOT_IMPLEMENTED; + return res; } static SpaResult @@ -1039,6 +1068,7 @@ parse_connection (SpaProxy *this) case PINOS_CONTROL_CMD_SET_FORMAT: case PINOS_CONTROL_CMD_SET_PROPERTY: case PINOS_CONTROL_CMD_NODE_COMMAND: + case PINOS_CONTROL_CMD_PORT_COMMAND: case PINOS_CONTROL_CMD_PROCESS_BUFFER: spa_log_error (this->log, "proxy %p: got unexpected command %d\n", this, cmd); break; @@ -1154,6 +1184,7 @@ parse_rtconnection (SpaProxy *this) { PinosControlCmdProcessBuffer cmd; SpaProxyPort *port; + ProxyBuffer *b; if (!pinos_connection_parse_cmd (conn, &cmd)) break; @@ -1163,12 +1194,14 @@ parse_rtconnection (SpaProxy *this) port = cmd.direction == SPA_DIRECTION_INPUT ? &this->in_ports[cmd.port_id] : &this->out_ports[cmd.port_id]; - if (port->buffer_id != SPA_ID_INVALID) - spa_log_warn (this->log, "proxy %p: unprocessed buffer: %d\n", this, port->buffer_id); + if (!CHECK_PORT_BUFFER (this, cmd.buffer_id, port)) + break; copy_meta_in (this, port, cmd.buffer_id); - port->buffer_id = cmd.buffer_id; + b = &port->buffers[cmd.buffer_id]; + b->next = NULL; + SPA_QUEUE_PUSH_TAIL (&port->ready, ProxyBuffer, next, b); break; } case PINOS_CONTROL_CMD_NODE_EVENT: @@ -1181,6 +1214,15 @@ parse_rtconnection (SpaProxy *this) handle_node_event (this, cne.event); break; } + case PINOS_CONTROL_CMD_PORT_COMMAND: + { + PinosControlCmdPortCommand cm; + + if (!pinos_connection_parse_cmd (conn, &cm)) + break; + + break; + } } } diff --git a/pinos/server/daemon.c b/pinos/server/daemon.c index 0d4901eca..eeb5b2ac2 100644 --- a/pinos/server/daemon.c +++ b/pinos/server/daemon.c @@ -201,28 +201,14 @@ no_node: static void -on_node_remove_signal (PinosNode *node, +on_link_port_unlinked (PinosLink *link, + PinosPort *port, PinosDaemon *this) { - g_debug ("daemon %p: node %p remove", this, node); -} + g_debug ("daemon %p: link %p: port %p unlinked", this, link, port); -static void -on_link_input_unlinked (PinosLink *link, - PinosPort *port, - PinosDaemon *this) -{ - g_debug ("daemon %p: link %p: input unlinked", this, link); -} - -static void -on_link_output_unlinked (PinosLink *link, - PinosPort *port, - PinosDaemon *this) -{ - g_debug ("daemon %p: link %p: output unlinked", this, link); - - try_link_port (link->input->node, link->input, this); + if (port->direction == PINOS_DIRECTION_OUTPUT && link->input) + try_link_port (link->input->node, link->input, this); } static void @@ -300,9 +286,9 @@ try_link_port (PinosNode *node, PinosPort *port, PinosDaemon *this) goto error; if (port->direction == PINOS_DIRECTION_OUTPUT) - link = pinos_node_link (node, port, target, NULL, NULL, &error); + link = pinos_port_link (port, target, NULL, NULL, &error); else - link = pinos_node_link (target->node, target, port, NULL, NULL, &error); + link = pinos_port_link (target, port, NULL, NULL, &error); if (link == NULL) goto error; @@ -311,9 +297,7 @@ try_link_port (PinosNode *node, PinosPort *port, PinosDaemon *this) if (client) pinos_client_add_object (client, G_OBJECT (link)); - g_signal_connect (target->node, "remove", (GCallback) on_node_remove_signal, this); - g_signal_connect (link, "input-unlinked", (GCallback) on_link_input_unlinked, this); - g_signal_connect (link, "output-unlinked", (GCallback) on_link_output_unlinked, this); + g_signal_connect (link, "port-unlinked", (GCallback) on_link_port_unlinked, this); g_signal_connect (link, "notify::state", (GCallback) on_link_state_notify, this); pinos_link_activate (link); @@ -771,6 +755,9 @@ pinos_daemon_find_port (PinosDaemon *daemon, for (nodes = priv->nodes; nodes; nodes = g_list_next (nodes)) { PinosNode *n = nodes->data; + if (n->flags & PINOS_NODE_FLAG_REMOVING) + continue; + g_debug ("node path \"%s\"", pinos_node_get_object_path (n)); if (have_name) { diff --git a/pinos/server/data-loop.c b/pinos/server/data-loop.c index 59fae1931..8db6cc28c 100644 --- a/pinos/server/data-loop.c +++ b/pinos/server/data-loop.c @@ -17,15 +17,20 @@ * Boston, MA 02110-1301, USA. */ +#include #include #include #include #include #include +#include +#include +#include #include #include "spa/include/spa/ringbuffer.h" +#include "pinos/client/rtkit.h" #include "pinos/server/data-loop.h" #define PINOS_DATA_LOOP_GET_PRIVATE(loop) \ @@ -60,6 +65,7 @@ struct _PinosDataLoopPrivate gboolean running; pthread_t thread; + }; G_DEFINE_TYPE (PinosDataLoop, pinos_data_loop, G_TYPE_OBJECT); @@ -74,6 +80,52 @@ enum LAST_SIGNAL }; +static void +make_realtime (PinosDataLoop *this) +{ + struct sched_param sp; + GDBusConnection *system_bus; + GError *error = NULL; + struct rlimit rl; + int r, rtprio; + long long rttime; + + rtprio = 20; + rttime = 20000; + + spa_zero (sp); + sp.sched_priority = rtprio; + + if (pthread_setschedparam (pthread_self(), SCHED_RR|SCHED_RESET_ON_FORK, &sp) == 0) { + g_debug ("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); + return; + } + system_bus = g_bus_get_sync (G_BUS_TYPE_SYSTEM, NULL, NULL); + + rl.rlim_cur = rl.rlim_max = rttime; + if ((r = setrlimit (RLIMIT_RTTIME, &rl)) < 0) + g_debug ("setrlimit() failed: %s", strerror (errno)); + + if (rttime >= 0) { + r = getrlimit (RLIMIT_RTTIME, &rl); + if (r >= 0 && (long long) rl.rlim_max > rttime) { + g_debug ("Clamping rlimit-rttime to %lld for RealtimeKit", rttime); + rl.rlim_cur = rl.rlim_max = rttime; + + if ((r = setrlimit (RLIMIT_RTTIME, &rl)) < 0) + g_debug ("setrlimit() failed: %s", strerror (errno)); + } + } + + if (!pinos_rtkit_make_realtime (system_bus, 0, rtprio, &error)) { + g_debug ("could not make thread realtime: %s", error->message); + g_clear_error (&error); + } else { + g_debug ("thread made realtime"); + } + g_object_unref (system_bus); +} + static void * loop (void *user_data) { @@ -82,11 +134,14 @@ loop (void *user_data) SpaPoll *p = &this->poll; unsigned int i, j; + make_realtime (this); + g_debug ("data-loop %p: enter thread", this); while (priv->running) { SpaPollNotifyData ndata; unsigned int n_idle = 0; int r; + struct timespec ts; /* prepare */ for (i = 0; i < priv->n_poll; i++) { @@ -161,6 +216,9 @@ loop (void *user_data) continue; } +// clock_gettime (CLOCK_MONOTONIC, &ts); +// fprintf (stderr, "%llu\n", SPA_TIMESPEC_TO_TIME (&ts)); + /* after */ for (i = 0; i < priv->n_poll; i++) { SpaPollItem *p = &priv->poll[i]; @@ -284,9 +342,6 @@ do_remove_item (SpaPoll *poll, if (!in_thread) wakeup_thread (this); } - if (priv->n_poll == 0) { - stop_thread (this, in_thread); - } return SPA_RESULT_OK; } diff --git a/pinos/server/link.c b/pinos/server/link.c index cd751e911..808f2a30b 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -74,8 +74,7 @@ enum enum { SIGNAL_REMOVE, - SIGNAL_INPUT_UNLINKED, - SIGNAL_OUTPUT_UNLINKED, + SIGNAL_PORT_UNLINKED, LAST_SIGNAL }; @@ -388,10 +387,14 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) "error get input port info: %d", res); goto error; } + spa_debug_port_info (oinfo); + spa_debug_port_info (iinfo); + in_flags = iinfo->flags; out_flags = oinfo->flags; if (out_flags & SPA_PORT_INFO_FLAG_LIVE) { + g_debug ("setting link as live"); this->output->node->live = true; this->input->node->live = true; } @@ -430,9 +433,6 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) } else return SPA_RESULT_OK; - spa_debug_port_info (oinfo); - spa_debug_port_info (iinfo); - if (priv->buffers == NULL) { SpaAllocParamBuffers *in_alloc, *out_alloc; guint max_buffers = MAX_BUFFERS; @@ -486,7 +486,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) } hdr_size += n_metas * sizeof (SpaMeta); - buf_size = hdr_size + minsize; + buf_size = SPA_ROUND_UP_N (hdr_size + minsize, 64); priv->n_buffers = max_buffers; pinos_memblock_alloc (PINOS_MEMBLOCK_FLAG_WITH_FD | @@ -543,7 +543,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) d->data = NULL; } } - g_debug ("allocated %d buffers %p", priv->n_buffers, priv->buffers); + g_debug ("allocated %d buffers %p %zd", priv->n_buffers, priv->buffers, minsize); priv->allocated = TRUE; } @@ -665,11 +665,18 @@ do_start (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) } static SpaResult -check_states (PinosLink *this, SpaResult res) +check_states (PinosLink *this, + gpointer user_data, + SpaResult res) { SpaNodeState in_state, out_state; PinosLinkPrivate *priv = this->priv; + if (res != SPA_RESULT_OK) { + g_warning ("link %p: error: %d", this, res); + return res; + } + again: if (this->input == NULL || this->output == NULL) return SPA_RESULT_OK; @@ -745,15 +752,12 @@ typedef struct { } UnlinkedData; static void -on_input_unlinked (UnlinkedData *data) +on_port_unlinked (PinosPort *port, PinosLink *this, SpaResult res, gulong id) { - g_signal_emit (data->link, signals[SIGNAL_INPUT_UNLINKED], 0, data->port); -} + g_signal_emit (this, signals[SIGNAL_PORT_UNLINKED], 0, port); -static void -on_output_unlinked (UnlinkedData *data) -{ - g_signal_emit (data->link, signals[SIGNAL_OUTPUT_UNLINKED], 0, data->port); + if (this->input == NULL || this->output == NULL) + pinos_link_update_state (this, PINOS_LINK_STATE_UNLINKED); } static void @@ -761,57 +765,34 @@ on_node_remove (PinosNode *node, PinosLink *this) { PinosLinkPrivate *priv = this->priv; SpaResult res = SPA_RESULT_OK; - UnlinkedData data; - - data.link = this; + PinosPort *port, *other; g_signal_handlers_disconnect_by_data (node, this); - if (node == this->input->node) { - if (this->input->allocated) { - priv->buffers = NULL; - priv->n_buffers = 0; - if ((res = spa_node_port_use_buffers (this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port, - priv->buffers, - priv->n_buffers)) < 0) { - g_warning ("link %p: failed to clear output buffers: %d", this, res); - } - } - data.port = this->input; - this->input = NULL; - pinos_main_loop_defer (priv->main_loop, - this, - res, - (PinosDeferFunc) on_input_unlinked, - g_memdup (&data, sizeof (UnlinkedData)), - g_free); - } else { - if (this->output->allocated) { - priv->buffers = NULL; - priv->n_buffers = 0; + if (this->input && node == this->input->node) { + port = this->input; + other = this->output; + } else if (this->output && node == this->output->node) { + port = this->output; + other = this->input; + } else + return; - if ((res = spa_node_port_use_buffers (this->input->node->node, - SPA_DIRECTION_INPUT, - this->input->port, - priv->buffers, - priv->n_buffers)) < 0) { - g_warning ("link %p: failed to clear input buffers: %d", this, res); - } - } - data.port = this->output; - this->output = NULL; - pinos_main_loop_defer (priv->main_loop, - this, - res, - (PinosDeferFunc) on_output_unlinked, - g_memdup (&data, sizeof (UnlinkedData)), - g_free); + if (port->allocated) { + priv->buffers = NULL; + priv->n_buffers = 0; + + g_debug ("link %p: clear input allocated buffers on port %p", link, other); + pinos_port_clear_buffers (other); } - if (this->input == NULL || this->output == NULL) - pinos_link_update_state (this, PINOS_LINK_STATE_UNLINKED); + res = pinos_port_unlink (port, this); + pinos_main_loop_defer (priv->main_loop, + port, + res, + (PinosDeferFunc) on_port_unlinked, + g_object_ref (this), + g_object_unref); } static void @@ -854,28 +835,11 @@ pinos_link_dispose (GObject * object) g_signal_emit (this, signals[SIGNAL_REMOVE], 0, NULL); - if (this->input) { - if (!this->input->allocated) { - spa_node_port_use_buffers (this->input->node->node, - SPA_DIRECTION_INPUT, - this->input->port, - NULL, 0); - this->input->buffers = NULL; - this->input->n_buffers = 0; - } - this->input = NULL; - } - if (this->output) { - if (!this->output->allocated) { - spa_node_port_use_buffers (this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port, - NULL, 0); - this->output->buffers = NULL; - this->output->n_buffers = 0; - } - this->output = NULL; - } + if (this->input) + pinos_port_unlink (this->input, this); + if (this->output) + pinos_port_unlink (this->output, this); + link_unregister_object (this); pinos_main_loop_defer_cancel (priv->main_loop, this, 0); @@ -894,9 +858,8 @@ pinos_link_finalize (GObject * object) g_clear_object (&priv->iface); g_free (priv->object_path); - if (priv->allocated) { + if (priv->allocated) pinos_memblock_free (&priv->buffer_mem); - } G_OBJECT_CLASS (pinos_link_parent_class)->finalize (object); } @@ -982,17 +945,7 @@ pinos_link_class_init (PinosLinkClass * klass) G_TYPE_NONE, 0, G_TYPE_NONE); - signals[SIGNAL_INPUT_UNLINKED] = g_signal_new ("input-unlinked", - G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, - 0, - NULL, - NULL, - g_cclosure_marshal_generic, - G_TYPE_NONE, - 1, - G_TYPE_POINTER); - signals[SIGNAL_OUTPUT_UNLINKED] = g_signal_new ("output-unlinked", + signals[SIGNAL_PORT_UNLINKED] = g_signal_new ("port-unlinked", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, @@ -1067,7 +1020,7 @@ pinos_link_activate (PinosLink *this) g_return_val_if_fail (PINOS_IS_LINK (this), FALSE); spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue)); - check_states (this, SPA_RESULT_OK); + check_states (this, NULL, SPA_RESULT_OK); return TRUE; } diff --git a/pinos/server/link.h b/pinos/server/link.h index 03b8cbd7d..49df39816 100644 --- a/pinos/server/link.h +++ b/pinos/server/link.h @@ -53,7 +53,7 @@ struct _PinosLink { PinosPort *output; PinosPort *input; - uint32_t queue[16]; + uint32_t queue[64]; SpaRingbuffer ringbuffer; gint in_ready; diff --git a/pinos/server/main-loop.c b/pinos/server/main-loop.c index daa7122cf..375abc0b2 100644 --- a/pinos/server/main-loop.c +++ b/pinos/server/main-loop.c @@ -389,11 +389,9 @@ process_work_queue (PinosMainLoop *this) for (item = priv->work.head, prev = NULL; item; prev = item, item = next) { next = item->next; - g_debug ("main-loop %p: peek work queue item %p seq %d, prev %p next %p", this, item->obj, item->seq, prev, next); if (item->seq != SPA_ID_INVALID) continue; - g_debug ("main-loop %p: process work item %p", this, item->obj); if (priv->work.tail == item) priv->work.tail = prev; if (prev == NULL) @@ -401,8 +399,10 @@ process_work_queue (PinosMainLoop *this) else prev->next = next; - if (item->func) + if (item->func) { + g_debug ("main-loop %p: process work item %p", this, item->obj); item->func (item->obj, item->data, item->res, item->id); + } if (item->notify) item->notify (item->data); @@ -484,7 +484,7 @@ pinos_main_loop_defer_cancel (PinosMainLoop *loop, priv->work_id = g_idle_add ((GSourceFunc) process_work_queue, loop); } -void +gboolean pinos_main_loop_defer_complete (PinosMainLoop *loop, gpointer obj, uint32_t seq, @@ -494,11 +494,9 @@ pinos_main_loop_defer_complete (PinosMainLoop *loop, PinosMainLoopPrivate *priv; gboolean have_work = FALSE; - g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + g_return_val_if_fail (PINOS_IS_MAIN_LOOP (loop), FALSE); priv = loop->priv; - g_debug ("main-loop %p: async complete %d %d for object %p", loop, seq, res, obj); - for (item = priv->work.head; item; item = item->next) { if (item->obj == obj && item->seq == seq) { g_debug ("main-loop %p: found defered %d for object %p", loop, seq, obj); @@ -512,6 +510,8 @@ pinos_main_loop_defer_complete (PinosMainLoop *loop, if (priv->work_id == 0 && have_work) priv->work_id = g_idle_add ((GSourceFunc) process_work_queue, loop); + + return have_work; } GMainLoop * diff --git a/pinos/server/main-loop.h b/pinos/server/main-loop.h index b480fc83a..0b65e3ecd 100644 --- a/pinos/server/main-loop.h +++ b/pinos/server/main-loop.h @@ -85,7 +85,7 @@ gulong pinos_main_loop_defer (PinosMainLoop *loo void pinos_main_loop_defer_cancel (PinosMainLoop *loop, gpointer obj, gulong id); -void pinos_main_loop_defer_complete (PinosMainLoop *loop, +gboolean pinos_main_loop_defer_complete (PinosMainLoop *loop, gpointer obj, uint32_t seq, SpaResult res); diff --git a/pinos/server/node.c b/pinos/server/node.c index 981a22718..ae24726e0 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -53,18 +53,6 @@ free_node_port (PinosPort *np) g_slice_free (PinosPort, np); } -static PinosPort * -find_node_port (GList *ports, PinosNode *node, uint32_t port) -{ - GList *walk; - for (walk = ports; walk; walk = g_list_next (walk)) { - PinosPort *np = walk->data; - if (np->node == node && np->port == port) - return np; - } - return NULL; -} - struct _PinosNodePrivate { PinosDaemon *daemon; @@ -74,6 +62,8 @@ struct _PinosNodePrivate gchar *object_path; gchar *name; + uint32_t seq; + gboolean async_init; unsigned int max_input_ports; unsigned int max_output_ports; @@ -340,14 +330,12 @@ do_read_link (SpaPoll *poll, size_t offset; SpaResult res; - if (spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) { + if (link->input == NULL) + return SPA_RESULT_OK; + + while (link->in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) { SpaPortInputInfo iinfo[1]; - if (link->in_ready <= 0 || link->input == NULL) - return SPA_RESULT_OK; - - link->in_ready--; - iinfo[0].port_id = link->input->port; iinfo[0].buffer_id = link->queue[offset]; iinfo[0].flags = SPA_PORT_INPUT_FLAG_NONE; @@ -356,6 +344,7 @@ do_read_link (SpaPoll *poll, g_warning ("node %p: error pushing buffer: %d, %d", this, res, iinfo[0].status); spa_ringbuffer_read_advance (&link->ringbuffer, 1); + link->in_ready--; } return SPA_RESULT_OK; } @@ -379,8 +368,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) SpaNodeEventAsyncComplete *ac = (SpaNodeEventAsyncComplete *) event; g_debug ("node %p: async complete event %d %d", this, ac->seq, ac->res); - pinos_main_loop_defer_complete (priv->main_loop, this, ac->seq, ac->res); - g_signal_emit (this, signals[SIGNAL_ASYNC_COMPLETE], 0, ac->seq, ac->res); + if (!pinos_main_loop_defer_complete (priv->main_loop, this, ac->seq, ac->res)) + g_signal_emit (this, signals[SIGNAL_ASYNC_COMPLETE], 0, ac->seq, ac->res); break; } @@ -410,8 +399,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event; SpaPortOutputInfo oinfo[1] = { 0, }; SpaResult res; - gboolean pushed = FALSE; guint i; + gboolean pushed = FALSE; oinfo[0].port_id = ho->port_id; @@ -422,9 +411,12 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) for (i = 0; i < priv->rt.links->len; i++) { PinosLink *link = g_ptr_array_index (priv->rt.links, i); + PinosPort *output = link->output; + PinosPort *input = link->input; size_t offset; - if (link->output == NULL || link->output->port != ho->port_id) + if (output == NULL || input == NULL || + output->node->node != node || output->port != ho->port_id) continue; if (spa_ringbuffer_get_write_offset (&link->ringbuffer, &offset) > 0) { @@ -441,7 +433,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } } if (!pushed) { - g_debug ("node %p: discarded buffer %u", this, oinfo[0].buffer_id); if ((res = spa_node_port_reuse_buffer (node, oinfo[0].port_id, oinfo[0].buffer_id)) < 0) g_warning ("node %p: error reuse buffer: %d", node, res); } @@ -456,7 +447,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) for (i = 0; i < priv->rt.links->len; i++) { PinosLink *link = g_ptr_array_index (priv->rt.links, i); - if (link->input == NULL || link->input->port != rb->port_id) + if (link->input == NULL || link->input->port != rb->port_id || link->output == NULL) continue; if ((res = spa_node_port_reuse_buffer (link->output->node->node, @@ -1015,7 +1006,11 @@ pinos_node_remove (PinosNode *node) { g_return_if_fail (PINOS_IS_NODE (node)); + if (node->flags & PINOS_NODE_FLAG_REMOVING) + return; + g_debug ("node %p: remove", node); + node->flags |= PINOS_NODE_FLAG_REMOVING; g_signal_emit (node, signals[SIGNAL_REMOVE], 0, NULL); } @@ -1071,64 +1066,6 @@ pinos_node_get_free_port (PinosNode *node, } -static SpaResult -do_remove_link (SpaPoll *poll, - bool async, - uint32_t seq, - size_t size, - void *data, - void *user_data) -{ - PinosNode *this = user_data; - PinosNodePrivate *priv = this->priv; - PinosLink *link = ((PinosLink**)data)[0]; - - g_ptr_array_remove_fast (priv->rt.links, link); - - return SPA_RESULT_OK; -} - -static void -on_remove_link (PinosLink *link, PinosNode *node) -{ - PinosPort *p; - PinosNode *n; - - if (link->output) { - n = link->output->node; - if ((p = find_node_port (n->priv->output_ports, n, link->output->port))) - if (g_ptr_array_remove_fast (p->links, link)) - n->priv->n_used_output_links--; - - spa_poll_invoke (&n->priv->data_loop->poll, - do_remove_link, - SPA_ID_INVALID, - sizeof (PinosLink *), - &link, - n); - - if (n->priv->n_used_output_links == 0 && - n->priv->n_used_input_links == 0) - pinos_node_report_idle (n); - } - if (link->input->node) { - n = link->input->node; - if ((p = find_node_port (n->priv->input_ports, n, link->input->port))) - if (g_ptr_array_remove_fast (p->links, link)) - n->priv->n_used_input_links--; - - spa_poll_invoke (&n->priv->data_loop->poll, - do_remove_link, - SPA_ID_INVALID, - sizeof (PinosLink *), - &link, - n); - - if (n->priv->n_used_output_links == 0 && - n->priv->n_used_input_links == 0) - pinos_node_report_idle (n); - } -} static SpaResult do_add_link (SpaPoll *poll, @@ -1147,67 +1084,79 @@ do_add_link (SpaPoll *poll, return SPA_RESULT_OK; } +static PinosLink * +find_link (PinosPort *output_port, PinosPort *input_port) +{ + guint i; + + for (i = 0; i < output_port->links->len; i++) { + PinosLink *pl = g_ptr_array_index (output_port->links, i); + if (pl->input == input_port) { + return pl; + } + } + return NULL; +} + +PinosLink * +pinos_port_get_link (PinosPort *output_port, + PinosPort *input_port) +{ + g_return_val_if_fail (output_port != NULL, NULL); + g_return_val_if_fail (input_port != NULL, NULL); + + return find_link (output_port, input_port); +} + /** - * pinos_node_link: - * @output_node: a #PinosNode + * pinos_port_link: * @output_port: an output port * @input_port: an input port * @format_filter: a format filter * @properties: extra properties * @error: an error or %NULL * - * Make a link between @output_port and @input_port with the given ids + * Make a link between @output_port and @input_port * * If the ports were already linked, the existing links will be returned. * - * If the output id was linked to a different input node or id, it - * will be relinked. - * * Returns: a new #PinosLink or %NULL and @error is set. */ PinosLink * -pinos_node_link (PinosNode *output_node, - PinosPort *output_port, +pinos_port_link (PinosPort *output_port, PinosPort *input_port, GPtrArray *format_filter, PinosProperties *properties, GError **error) { PinosNodePrivate *priv; - PinosNode *input_node; - PinosLink *link = NULL; - guint i; + PinosNode *input_node, *output_node; + PinosLink *link; - g_return_val_if_fail (PINOS_IS_NODE (output_node), NULL); g_return_val_if_fail (output_port != NULL, NULL); g_return_val_if_fail (input_port != NULL, NULL); + output_node = output_port->node; priv = output_node->priv; input_node = input_port->node; - g_debug ("node %p: link %u %p:%u", output_node, output_port->port, input_node, input_port->port); + g_debug ("port link %p:%u -> %p:%u", output_node, output_port->port, input_node, input_port->port); if (output_node == input_node) goto same_node; - for (i = 0; i < output_port->links->len; i++) { - PinosLink *pl = g_ptr_array_index (output_port->links, i); - if (pl->input->node == input_node && pl->input == input_port) { - link = pl; - break; - } - } + if (input_port->links->len > 0) + goto was_linked; + + link = find_link (output_port, input_port); if (link) { - /* FIXME */ - link->input->node = input_node; - link->input = input_port; g_object_ref (link); } else { input_node->live = output_node->live; if (output_node->clock) input_node->clock = output_node->clock; - g_debug ("node %p: clock %p", output_node, output_node->clock); + g_debug ("node %p: clock %p, live %d", output_node, output_node->clock, output_node->live); link = g_object_new (PINOS_TYPE_LINK, "daemon", priv->daemon, @@ -1217,11 +1166,6 @@ pinos_node_link (PinosNode *output_node, "properties", properties, NULL); - g_signal_connect (link, - "remove", - (GCallback) on_remove_link, - output_node); - g_ptr_array_add (output_port->links, link); g_ptr_array_add (input_port->links, link); @@ -1251,6 +1195,187 @@ same_node: "can't link a node to itself"); return NULL; } +was_linked: + { + g_set_error (error, + PINOS_ERROR, + PINOS_ERROR_NODE_LINK, + "input port was already linked"); + return NULL; + } +} + +static SpaResult +pinos_port_pause (PinosPort *port) +{ + SpaNodeCommand cmd; + + cmd.type = SPA_NODE_COMMAND_PAUSE; + cmd.size = sizeof (cmd); + return spa_node_port_send_command (port->node->node, + port->direction, + port->port, + &cmd); +} + +static SpaResult +do_remove_link_done (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosPort *port = user_data; + PinosNode *this = port->node; + PinosNodePrivate *priv = this->priv; + PinosLink *link = ((PinosLink**)data)[0]; + + g_debug ("port %p: finish unlink", port); + if (port->direction == PINOS_DIRECTION_OUTPUT) { + if (g_ptr_array_remove_fast (port->links, link)) + priv->n_used_output_links--; + link->output = NULL; + } else { + if (g_ptr_array_remove_fast (port->links, link)) + priv->n_used_input_links--; + link->input = NULL; + } + + if (priv->n_used_output_links == 0 && + priv->n_used_input_links == 0) { + pinos_node_report_idle (this); + } + + if (!port->allocated) { + g_debug ("port %p: clear buffers on port", port); + spa_node_port_use_buffers (port->node->node, + port->direction, + port->port, + NULL, 0); + port->buffers = NULL; + port->n_buffers = 0; + } + + pinos_main_loop_defer_complete (priv->main_loop, + port, + seq, + SPA_RESULT_OK); + g_object_unref (link); + g_object_unref (port->node); + + return SPA_RESULT_OK; +} + +static SpaResult +do_remove_link (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosPort *port = user_data; + PinosNode *this = port->node; + PinosNodePrivate *priv = this->priv; + PinosLink *link = ((PinosLink**)data)[0]; + SpaResult res; + + pinos_port_pause (port); + + g_ptr_array_remove_fast (priv->rt.links, link); + + res = spa_poll_invoke (&priv->main_loop->poll, + do_remove_link_done, + seq, + sizeof (PinosLink *), + &link, + port); + return res; +} + +SpaResult +pinos_port_unlink (PinosPort *port, PinosLink *link) +{ + SpaResult res; + + g_debug ("port %p: start unlink %p", port, link); + + g_object_ref (link); + g_object_ref (port->node); + res = spa_poll_invoke (&port->node->priv->data_loop->poll, + do_remove_link, + port->node->priv->seq++, + sizeof (PinosLink *), + &link, + port); + return res; +} + +static SpaResult +do_clear_buffers_done (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosPort *port = user_data; + PinosNode *this = port->node; + PinosNodePrivate *priv = this->priv; + SpaResult res; + + g_debug ("port %p: clear buffers finish", port); + + res = spa_node_port_use_buffers (port->node->node, + port->direction, + port->port, + NULL, 0); + port->buffers = NULL; + port->n_buffers = 0; + + pinos_main_loop_defer_complete (priv->main_loop, + port, + seq, + res); + return res; +} + +static SpaResult +do_clear_buffers (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + PinosPort *port = user_data; + PinosNode *this = port->node; + PinosNodePrivate *priv = this->priv; + SpaResult res; + + pinos_port_pause (port); + + res = spa_poll_invoke (&priv->main_loop->poll, + do_clear_buffers_done, + seq, + 0, NULL, + port); + return res; +} + +SpaResult +pinos_port_clear_buffers (PinosPort *port) +{ + SpaResult res; + + g_debug ("port %p: clear buffers", port); + res = spa_poll_invoke (&port->node->priv->data_loop->poll, + do_clear_buffers, + port->node->priv->seq++, + 0, NULL, + port); + return res; } /** diff --git a/pinos/server/node.h b/pinos/server/node.h index 7632c76a3..f4f95a6ad 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -29,6 +29,12 @@ typedef struct _PinosNode PinosNode; typedef struct _PinosNodeClass PinosNodeClass; typedef struct _PinosNodePrivate PinosNodePrivate; + +typedef enum { + PINOS_NODE_FLAG_NONE = 0, + PINOS_NODE_FLAG_REMOVING = (1 << 0), +} PinosNodeFlags; + #include #include @@ -65,6 +71,8 @@ struct _PinosPort { struct _PinosNode { GObject object; + PinosNodeFlags flags; + SpaNode *node; bool live; @@ -101,12 +109,6 @@ const gchar * pinos_node_get_object_path (PinosNode *node); PinosPort * pinos_node_get_free_port (PinosNode *node, PinosDirection direction); -PinosLink * pinos_node_link (PinosNode *output_node, - PinosPort *output_port, - PinosPort *input_port, - GPtrArray *format_filter, - PinosProperties *properties, - GError **error); GList * pinos_node_get_ports (PinosNode *node, PinosDirection direction); @@ -118,6 +120,17 @@ void pinos_node_report_error (PinosNode *node, GError void pinos_node_report_idle (PinosNode *node); void pinos_node_report_busy (PinosNode *node); +PinosLink * pinos_port_link (PinosPort *output_port, + PinosPort *input_port, + GPtrArray *format_filter, + PinosProperties *properties, + GError **error); +SpaResult pinos_port_unlink (PinosPort *port, + PinosLink *link); + +SpaResult pinos_port_clear_buffers (PinosPort *port); + + G_END_DECLS #endif /* __PINOS_NODE_H__ */ diff --git a/spa/include/spa/defs.h b/spa/include/spa/defs.h index ca56dc9b6..3dcc30167 100644 --- a/spa/include/spa/defs.h +++ b/spa/include/spa/defs.h @@ -116,6 +116,8 @@ typedef void (*SpaNotify) (void *data); # define SPA_PRINTF_FUNC(fmt, arg1) #endif +#define SPA_ROUND_UP_N(num,align) ((((num) + ((align) - 1)) & ~((align) - 1))) + #ifndef SPA_LIKELY #ifdef __GNUC__ #define SPA_LIKELY(x) (__builtin_expect(!!(x),1)) @@ -126,7 +128,6 @@ typedef void (*SpaNotify) (void *data); #endif #endif - #define spa_return_if_fail(expr) \ do { \ if (SPA_UNLIKELY (!(expr))) \ @@ -150,6 +151,8 @@ typedef void (*SpaNotify) (void *data); /* Does exactly nothing */ #define spa_nop() do {} while (false) +#define spa_memzero(x,l) (memset((x), 0, (l))) +#define spa_zero(x) (spa_memzero(&(x), sizeof(x))) #ifdef __cplusplus } /* extern "C" */ diff --git a/spa/include/spa/log.h b/spa/include/spa/log.h index 8bd3f726a..01c8f953a 100644 --- a/spa/include/spa/log.h +++ b/spa/include/spa/log.h @@ -111,8 +111,8 @@ struct _SpaLog { #if __STDC_VERSION__ >= 199901L -#define spa_log_log(l,lev,...) \ - if ((l) && (l)->level >= lev) \ +#define spa_log_log(l,lev,...) \ + if (SPA_UNLIKELY (spa_log_level_enabled (l, lev))) \ (l)->log((l),lev,__VA_ARGS__) #define spa_log_error(l,...) spa_log_log(l,SPA_LOG_LEVEL_ERROR,__FILE__,__LINE__,__func__,__VA_ARGS__) @@ -126,7 +126,7 @@ struct _SpaLog { #define SPA_LOG_FUNC(name,lev) \ static inline void spa_log_##name (SpaLog *l, const char *format, ...) \ { \ - if (spa_log_level_enabled (l, lev)) { \ + if (SPA_UNLIKELY (spa_log_level_enabled (l, lev))) { \ va_list varargs; \ va_start (varargs, format); \ (l)->logv((l),lev,__FILE__,__LINE__,__func__,format,varargs); \ diff --git a/spa/include/spa/queue.h b/spa/include/spa/queue.h index 7bdd72256..ca504160d 100644 --- a/spa/include/spa/queue.h +++ b/spa/include/spa/queue.h @@ -52,6 +52,9 @@ struct _SpaQueue { (q)->length++; \ } while (0); +#define SPA_QUEUE_PEEK_HEAD(q,t,i) \ + ((i) = (t*)((q)->head)); + #define SPA_QUEUE_POP_HEAD(q,t,next,i) \ do { \ if (((i) = (t*)((q)->head)) == NULL) \ diff --git a/spa/include/spa/ringbuffer.h b/spa/include/spa/ringbuffer.h index ef5743eff..045700a94 100644 --- a/spa/include/spa/ringbuffer.h +++ b/spa/include/spa/ringbuffer.h @@ -67,7 +67,7 @@ static inline SpaResult spa_ringbuffer_init (SpaRingbuffer *rbuf, size_t size) { - if ((size & (size - 1)) != 0) + if (SPA_UNLIKELY ((size & (size - 1)) != 0)) return SPA_RESULT_ERROR; rbuf->size = size; @@ -127,7 +127,7 @@ spa_ringbuffer_get_read_areas (SpaRingbuffer *rbuf, areas[0].offset = r; areas[1].offset = 0; - if (end > rbuf->size) { + if (SPA_UNLIKELY (end > rbuf->size)) { areas[0].len = rbuf->size - r; areas[1].len = end - rbuf->size; } else { @@ -185,7 +185,7 @@ spa_ringbuffer_get_write_areas (SpaRingbuffer *rbuf, areas[0].offset = w; areas[1].offset = 0; - if (end > rbuf->size) { + if (SPA_UNLIKELY (end > rbuf->size)) { areas[0].len = rbuf->size - w; areas[1].len = end - rbuf->size; } else { diff --git a/spa/plugins/alsa/alsa-monitor.c b/spa/plugins/alsa/alsa-monitor.c index ab961b07f..37ecf2f33 100644 --- a/spa/plugins/alsa/alsa-monitor.c +++ b/spa/plugins/alsa/alsa-monitor.c @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -32,6 +33,7 @@ #include #include +extern const SpaHandleFactory spa_alsa_sink_factory; extern const SpaHandleFactory spa_alsa_source_factory; typedef struct _SpaALSAMonitor SpaALSAMonitor; @@ -98,11 +100,16 @@ path_get_card_id (const char *path) return e + 5; } +#define CHECK(s,msg) if ((err = (s)) < 0) { spa_log_error (state->log, msg ": %s\n", snd_strerror(err)); return err; } + static int -fill_item (ALSAItem *item, struct udev_device *udevice) +fill_item (SpaALSAMonitor *this, ALSAItem *item, struct udev_device *udevice) { + int err; unsigned int i; const char *str; + snd_pcm_t *hndl; + char device[64]; if (item->udevice) udev_device_unref (item->udevice); @@ -120,12 +127,39 @@ fill_item (ALSAItem *item, struct udev_device *udevice) if ((str = path_get_card_id (udev_device_get_property_value (udevice, "DEVPATH"))) == NULL) return -1; + snprintf (device, 63, "hw:%s", str); + + if ((err = snd_pcm_open (&hndl, + device, + SND_PCM_STREAM_PLAYBACK, + SND_PCM_NONBLOCK | + SND_PCM_NO_AUTO_RESAMPLE | + SND_PCM_NO_AUTO_CHANNELS | + SND_PCM_NO_AUTO_FORMAT)) < 0) { + spa_log_error (this->log, "PLAYBACK open failed: %s\n", snd_strerror(err)); + if ((err = snd_pcm_open (&hndl, + device, + SND_PCM_STREAM_CAPTURE, + SND_PCM_NONBLOCK | + SND_PCM_NO_AUTO_RESAMPLE | + SND_PCM_NO_AUTO_CHANNELS | + SND_PCM_NO_AUTO_FORMAT)) < 0) { + spa_log_error (this->log, "CAPTURE open failed: %s\n", snd_strerror(err)); + return -1; + } else { + item->item.factory = &spa_alsa_source_factory; + snd_pcm_close (hndl); + } + } else { + item->item.factory = &spa_alsa_sink_factory; + snd_pcm_close (hndl); + } + item->item.id = udev_device_get_syspath (item->udevice); item->item.flags = 0; item->item.state = SPA_MONITOR_ITEM_STATE_AVAILABLE; item->item.klass = "Audio/Device"; item->item.info = &item->info; - item->item.factory = &spa_alsa_source_factory; item->info.items = item->info_items; i = 0; @@ -219,7 +253,7 @@ alsa_on_fd_events (SpaPollNotifyData *data) SpaMonitorItem *item; dev = udev_monitor_receive_device (this->umonitor); - if (fill_item (&this->uitem, dev) < 0) + if (fill_item (this, &this->uitem, dev) < 0) return 0; if ((str = udev_device_get_action (dev)) == NULL) @@ -325,7 +359,7 @@ again: } if (*state == (void*)1) { - fill_item (&this->uitem, NULL); + fill_item (this, &this->uitem, NULL); return SPA_RESULT_ENUM_END; } @@ -335,7 +369,7 @@ again: if ((*state = udev_list_entry_get_next (devices)) == NULL) *state = (void*)1; - if (fill_item (&this->uitem, dev) < 0) + if (fill_item (this, &this->uitem, dev) < 0) goto again; *item = &this->uitem.item; diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 4274e4447..acc4a12d9 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -32,7 +32,7 @@ typedef struct _SpaALSAState SpaALSASink; static const char default_device[] = "default"; static const uint32_t default_buffer_time = 10000; -static const uint32_t default_period_time = 5000; +static const uint32_t default_period_time = 1000; static const bool default_period_event = 0; static void @@ -47,6 +47,7 @@ reset_alsa_sink_props (SpaALSAProps *props) static void update_state (SpaALSASink *this, SpaNodeState state) { + spa_log_info (this->log, "update state %d\n", state); this->node.state = state; } @@ -149,6 +150,62 @@ spa_alsa_sink_node_set_props (SpaNode *node, return res; } +static SpaResult +do_send_event (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + SpaALSASink *this = user_data; + + this->event_cb (&this->node, data, this->user_data); + + return SPA_RESULT_OK; +} + +static SpaResult +do_command (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + SpaALSASink *this = user_data; + SpaResult res; + SpaNodeCommand *cmd = data; + SpaNodeEventAsyncComplete ac; + + switch (cmd->type) { + case SPA_NODE_COMMAND_START: + case SPA_NODE_COMMAND_PAUSE: + res = spa_node_port_send_command (&this->node, + SPA_DIRECTION_INPUT, + 0, + cmd); + break; + default: + res = SPA_RESULT_NOT_IMPLEMENTED; + break; + } + + if (async) { + ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + ac.event.size = sizeof (SpaNodeEventAsyncComplete); + ac.seq = seq; + ac.res = res; + spa_poll_invoke (this->main_loop, + do_send_event, + SPA_ID_INVALID, + sizeof (ac), + &ac, + this); + } + return res; +} + static SpaResult spa_alsa_sink_node_send_command (SpaNode *node, SpaNodeCommand *command) @@ -165,15 +222,22 @@ spa_alsa_sink_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - spa_alsa_start (this); - - update_state (this, SPA_NODE_STATE_STREAMING); - break; case SPA_NODE_COMMAND_PAUSE: - spa_alsa_pause (this); + { + if (!this->have_format) + return SPA_RESULT_NO_FORMAT; - update_state (this, SPA_NODE_STATE_PAUSED); - break; + if (this->n_buffers == 0) + return SPA_RESULT_NO_BUFFERS; + + return spa_poll_invoke (this->data_loop, + do_command, + ++this->seq, + command->size, + command, + this); + + } case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: @@ -297,6 +361,16 @@ spa_alsa_sink_node_port_enum_formats (SpaNode *node, return SPA_RESULT_OK; } +static SpaResult +spa_alsa_clear_buffers (SpaALSASink *this) +{ + if (this->n_buffers > 0) { + SPA_QUEUE_INIT (&this->ready); + this->n_buffers = 0; + } + return SPA_RESULT_OK; +} + static SpaResult spa_alsa_sink_node_port_set_format (SpaNode *node, SpaDirection direction, @@ -316,8 +390,11 @@ spa_alsa_sink_node_port_set_format (SpaNode *node, return SPA_RESULT_INVALID_PORT; if (format == NULL) { + spa_log_info (this->log, "clear format\n"); + spa_alsa_pause (this, false); + spa_alsa_clear_buffers (this); + spa_alsa_close (this); this->have_format = false; - this->n_buffers = 0; update_state (this, SPA_NODE_STATE_CONFIGURE); return SPA_RESULT_OK; } @@ -325,22 +402,27 @@ spa_alsa_sink_node_port_set_format (SpaNode *node, if ((res = spa_format_audio_parse (format, &this->current_format)) < 0) return res; - if (spa_alsa_set_format (this, &this->current_format, false) < 0) + if (spa_alsa_set_format (this, &this->current_format, flags) < 0) return SPA_RESULT_ERROR; - this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - this->info.maxbuffering = -1; - this->info.latency = -1; - this->info.n_params = 1; + this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | + SPA_PORT_INFO_FLAG_LIVE; + this->info.maxbuffering = this->buffer_frames * this->frame_size; + this->info.latency = (this->period_frames * SPA_NSEC_PER_SEC) / this->rate; + this->info.n_params = 2; this->info.params = this->params; this->params[0] = &this->param_buffers.param; this->param_buffers.param.type = SPA_ALLOC_PARAM_TYPE_BUFFERS; this->param_buffers.param.size = sizeof (this->param_buffers); - this->param_buffers.minsize = 1; + this->param_buffers.minsize = this->period_frames * this->frame_size; this->param_buffers.stride = 0; this->param_buffers.min_buffers = 1; - this->param_buffers.max_buffers = 8; + this->param_buffers.max_buffers = 32; this->param_buffers.align = 16; + this->params[1] = &this->param_meta.param; + this->param_meta.param.type = SPA_ALLOC_PARAM_TYPE_META_ENABLE; + this->param_meta.param.size = sizeof (this->param_meta); + this->param_meta.type = SPA_META_TYPE_HEADER; this->info.extra = NULL; this->have_format = true; @@ -419,7 +501,54 @@ spa_alsa_sink_node_port_use_buffers (SpaNode *node, SpaBuffer **buffers, uint32_t n_buffers) { - return SPA_RESULT_NOT_IMPLEMENTED; + SpaALSASink *this; + int i; + + if (node == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + if (!CHECK_PORT (this, direction, port_id)) + return SPA_RESULT_INVALID_PORT; + + this = SPA_CONTAINER_OF (node, SpaALSASink, node); + + spa_log_info (this->log, "use buffers %d\n", n_buffers); + + if (!this->have_format) + return SPA_RESULT_NO_FORMAT; + + if (n_buffers == 0) { + spa_alsa_pause (this, false); + spa_alsa_clear_buffers (this); + update_state (this, SPA_NODE_STATE_READY); + return SPA_RESULT_OK; + } + + for (i = 0; i < n_buffers; i++) { + SpaALSABuffer *b = &this->buffers[i]; + b->outbuf = buffers[i]; + b->outstanding = true; + + b->h = spa_buffer_find_meta (b->outbuf, SPA_META_TYPE_HEADER); + + switch (buffers[i]->datas[0].type) { + case SPA_DATA_TYPE_MEMFD: + case SPA_DATA_TYPE_DMABUF: + case SPA_DATA_TYPE_MEMPTR: + if (buffers[i]->datas[0].data == NULL) { + spa_log_error (this->log, "alsa-source: need mapped memory\n"); + continue; + } + break; + default: + break; + } + } + this->n_buffers = n_buffers; + + update_state (this, SPA_NODE_STATE_PAUSED); + + return SPA_RESULT_OK; } static SpaResult @@ -475,7 +604,7 @@ spa_alsa_sink_node_port_push_input (SpaNode *node, { SpaALSASink *this; unsigned int i; - bool have_error = false, have_enough = false; + bool have_error = false; if (node == NULL || n_info == 0 || info == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -483,32 +612,38 @@ spa_alsa_sink_node_port_push_input (SpaNode *node, this = SPA_CONTAINER_OF (node, SpaALSASink, node); for (i = 0; i < n_info; i++) { + SpaALSABuffer *b; + if (info[i].port_id != 0) { info[i].status = SPA_RESULT_INVALID_PORT; have_error = true; continue; } - - if (info[i].buffer_id != SPA_ID_INVALID) { - if (!this->have_format) { - info[i].status = SPA_RESULT_NO_FORMAT; - have_error = true; - continue; - } - - if (this->ready.length != 0) { - info[i].status = SPA_RESULT_HAVE_ENOUGH_INPUT; - have_enough = true; - continue; - } - SPA_QUEUE_PUSH_TAIL (&this->ready, SpaALSABuffer, next, &this->buffers[info[i].buffer_id]); + if (!this->have_format) { + info[i].status = SPA_RESULT_NO_FORMAT; + have_error = true; + continue; } + + if (info[i].buffer_id >= this->n_buffers) { + info[i].status = SPA_RESULT_INVALID_BUFFER_ID; + have_error = true; + continue; + } + + b = &this->buffers[info[i].buffer_id]; + if (!b->outstanding) { + info[i].status = SPA_RESULT_INVALID_BUFFER_ID; + have_error = true; + continue; + } + b->outstanding = false; + b->next = NULL; + SPA_QUEUE_PUSH_TAIL (&this->ready, SpaALSABuffer, next, b); info[i].status = SPA_RESULT_OK; } if (have_error) return SPA_RESULT_ERROR; - if (have_enough) - return SPA_RESULT_HAVE_ENOUGH_INPUT; return SPA_RESULT_OK; } @@ -535,7 +670,37 @@ spa_alsa_sink_node_port_send_command (SpaNode *node, uint32_t port_id, SpaNodeCommand *command) { - return SPA_RESULT_NOT_IMPLEMENTED; + SpaALSASink *this; + SpaResult res; + + if (node == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + this = SPA_CONTAINER_OF (node, SpaALSASink, node); + + if (port_id != 0) + return SPA_RESULT_INVALID_PORT; + + switch (command->type) { + case SPA_NODE_COMMAND_PAUSE: + { + if (SPA_RESULT_IS_OK (res = spa_alsa_pause (this, false))) { + update_state (this, SPA_NODE_STATE_PAUSED); + } + break; + } + case SPA_NODE_COMMAND_START: + { + if (SPA_RESULT_IS_OK (res = spa_alsa_start (this, false))) { + update_state (this, SPA_NODE_STATE_STREAMING); + } + break; + } + default: + res = SPA_RESULT_NOT_IMPLEMENTED; + break; + } + return res; } @@ -615,6 +780,10 @@ alsa_sink_init (const SpaHandleFactory *factory, this->map = support[i].data; else if (strcmp (support[i].uri, SPA_LOG_URI) == 0) this->log = support[i].data; + else if (strcmp (support[i].uri, SPA_POLL__DataLoop) == 0) + this->data_loop = support[i].data; + else if (strcmp (support[i].uri, SPA_POLL__MainLoop) == 0) + this->main_loop = support[i].data; } if (this->map == NULL) { spa_log_error (this->log, "an id-map is needed"); @@ -628,7 +797,14 @@ alsa_sink_init (const SpaHandleFactory *factory, this->stream = SND_PCM_STREAM_PLAYBACK; reset_alsa_sink_props (&this->props[1]); - this->status.flags = SPA_PORT_STATUS_FLAG_NEED_INPUT; + this->status.flags = SPA_PORT_STATUS_FLAG_NONE; + + for (i = 0; info && i < info->n_items; i++) { + if (!strcmp (info->items[i].key, "alsa.card")) { + snprintf (this->props[1].device, 63, "hw:%s", info->items[i].value); + this->props[1].props.unset_mask &= ~1; + } + } update_state (this, SPA_NODE_STATE_CONFIGURE); diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 7fdc025b9..c37213f61 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -38,8 +38,8 @@ update_state (SpaALSASource *this, SpaNodeState state) } static const char default_device[] = "hw:0"; -static const uint32_t default_buffer_time = 1000; -static const uint32_t default_period_time = 100; +static const uint32_t default_buffer_time = 10000; +static const uint32_t default_period_time = 1000; static const bool default_period_event = 0; static void @@ -178,7 +178,7 @@ do_start (SpaPoll *poll, SpaResult res; SpaNodeEventAsyncComplete ac; - if (SPA_RESULT_IS_OK (res = spa_alsa_start (this))) { + if (SPA_RESULT_IS_OK (res = spa_alsa_start (this, false))) { update_state (this, SPA_NODE_STATE_STREAMING); } @@ -209,7 +209,7 @@ do_pause (SpaPoll *poll, SpaResult res; SpaNodeEventAsyncComplete ac; - if (SPA_RESULT_IS_OK (res = spa_alsa_pause (this))) { + if (SPA_RESULT_IS_OK (res = spa_alsa_pause (this, false))) { update_state (this, SPA_NODE_STATE_PAUSED); } @@ -441,7 +441,7 @@ spa_alsa_source_node_port_set_format (SpaNode *node, return SPA_RESULT_INVALID_PORT; if (format == NULL) { - spa_alsa_pause (this); + spa_alsa_pause (this, false); spa_alsa_clear_buffers (this); spa_alsa_close (this); this->have_format = false; @@ -697,6 +697,7 @@ spa_alsa_source_node_port_pull_output (SpaNode *node, info[i].buffer_id = b->outbuf->id; info[i].status = SPA_RESULT_OK; + spa_log_debug (this->log, "pull buffer %u\n", b->outbuf->id); } if (have_error) return SPA_RESULT_ERROR; @@ -726,6 +727,7 @@ spa_alsa_source_node_port_reuse_buffer (SpaNode *node, if (buffer_id >= this->n_buffers) return SPA_RESULT_INVALID_BUFFER_ID; + spa_log_debug (this->log, "recycle buffer %u\n", buffer_id); recycle_buffer (this, buffer_id); return SPA_RESULT_OK; @@ -737,7 +739,37 @@ spa_alsa_source_node_port_send_command (SpaNode *node, uint32_t port_id, SpaNodeCommand *command) { - return SPA_RESULT_NOT_IMPLEMENTED; + SpaALSASource *this; + SpaResult res; + + if (node == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + this = SPA_CONTAINER_OF (node, SpaALSASource, node); + + if (port_id != 0) + return SPA_RESULT_INVALID_PORT; + + switch (command->type) { + case SPA_NODE_COMMAND_PAUSE: + { + if (SPA_RESULT_IS_OK (res = spa_alsa_pause (this, false))) { + update_state (this, SPA_NODE_STATE_PAUSED); + } + break; + } + case SPA_NODE_COMMAND_START: + { + if (SPA_RESULT_IS_OK (res = spa_alsa_start (this, false))) { + update_state (this, SPA_NODE_STATE_STREAMING); + } + break; + } + default: + res = SPA_RESULT_NOT_IMPLEMENTED; + break; + } + return res; } static const SpaNode alsasource_node = { diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index 1f3446e3a..d89c25847 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -33,18 +33,11 @@ spa_alsa_open (SpaALSAState *state) SND_PCM_NO_AUTO_CHANNELS | SND_PCM_NO_AUTO_FORMAT), "open failed"); - if ((state->poll.n_fds = snd_pcm_poll_descriptors_count (state->hndl)) <= 0) { - spa_log_error (state->log, "Invalid poll descriptors count %d\n", state->poll.n_fds); - return SPA_RESULT_ERROR; - } - if ((err = snd_pcm_poll_descriptors (state->hndl, (struct pollfd *)state->fds, state->poll.n_fds)) < 0) { - spa_log_error (state->log, "Unable to obtain poll descriptors for playback: %s\n", snd_strerror(err)); - return SPA_RESULT_ERROR; - } state->poll.id = 0; state->poll.enabled = false; state->poll.fds = state->fds; + state->poll.n_fds = 0; state->poll.idle_cb = NULL; state->poll.before_cb = NULL; state->poll.after_cb = alsa_on_fd_events; @@ -242,71 +235,105 @@ set_swparams (SpaALSAState *state) static int xrun_recovery (SpaALSAState *state, snd_pcm_t *hndl, int err) { - if (err == -EPIPE) { /* under-run */ - err = snd_pcm_prepare(hndl); - if (err < 0) - spa_log_error (state->log, "Can't recovery from underrun, prepare failed: %s\n", snd_strerror(err)); - return 0; - } else if (err == -ESTRPIPE) { - while ((err = snd_pcm_resume(hndl)) == -EAGAIN) - sleep(1); /* wait until the suspend flag is released */ - if (err < 0) { - err = snd_pcm_prepare(hndl); - if (err < 0) - spa_log_error (state->log, "Can't recovery from suspend, prepare failed: %s\n", snd_strerror(err)); - } - return 0; + snd_pcm_status_t *status; + + snd_pcm_status_alloca (&status); + + if ((err = snd_pcm_status (hndl, status)) < 0) { + spa_log_error (state->log, "snd_pcm_status error: %s\n", snd_strerror (err)); } + + if (snd_pcm_status_get_state (status) == SND_PCM_STATE_SUSPENDED) { + spa_log_info (state->log, "SUSPENDED, trying to resume\n"); + + if ((err = snd_pcm_prepare (hndl)) < 0) { + spa_log_error (state->log, "snd_pcm_prepare error: %s\n", snd_strerror (err)); + } + } + if (snd_pcm_status_get_state (status) == SND_PCM_STATE_XRUN) { + spa_log_info (state->log, "XRUN\n"); + } + + if (spa_alsa_pause (state, true) != SPA_RESULT_OK) + return -1; + if (spa_alsa_start (state, true) != SPA_RESULT_OK) + return -1; + return err; } -static void -pull_input (SpaALSAState *state, void *data, snd_pcm_uframes_t frames) -{ - SpaNodeEventNeedInput ni; - - ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; - ni.event.size = sizeof (ni); - ni.port_id = 0; - state->event_cb (&state->node, &ni.event, state->user_data); -} - static int mmap_write (SpaALSAState *state) { snd_pcm_t *hndl = state->hndl; int err; - snd_pcm_sframes_t avail, commitres; + snd_pcm_sframes_t avail; snd_pcm_uframes_t offset, frames, size; const snd_pcm_channel_area_t *my_areas; + SpaNodeEventNeedInput ni; + SpaALSABuffer *b; + snd_pcm_status_t *status; - if ((avail = snd_pcm_avail_update (hndl)) < 0) { - if ((err = xrun_recovery (state, hndl, avail)) < 0) { - spa_log_error (state->log, "Write error: %s\n", snd_strerror (err)); - return -1; - } + snd_pcm_status_alloca (&status); + + if ((err = snd_pcm_status (hndl, status)) < 0) { + spa_log_error (state->log, "snd_pcm_status error: %s\n", snd_strerror (err)); + return -1; } + avail = snd_pcm_status_get_avail (status); + size = avail; while (size > 0) { frames = size; if ((err = snd_pcm_mmap_begin (hndl, &my_areas, &offset, &frames)) < 0) { - if ((err = xrun_recovery (state, hndl, err)) < 0) { - spa_log_error (state->log, "MMAP begin avail error: %s\n", snd_strerror(err)); - return -1; - } + spa_log_error (state->log, "snd_pcm_mmap_begin error: %s\n", snd_strerror(err)); + return -1; } - pull_input (state, - (uint8_t *)my_areas[0].addr + (offset * sizeof (uint16_t) * 2), - frames); + ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; + ni.event.size = sizeof (ni); + ni.port_id = 0; + state->event_cb (&state->node, &ni.event, state->user_data); - commitres = snd_pcm_mmap_commit (hndl, offset, frames); - if (commitres < 0 || (snd_pcm_uframes_t)commitres != frames) { - if ((err = xrun_recovery (state, hndl, commitres >= 0 ? -EPIPE : commitres)) < 0) { - spa_log_error (state->log, "MMAP commit error: %s\n", snd_strerror(err)); - return -1; + SPA_QUEUE_PEEK_HEAD (&state->ready, SpaALSABuffer, b); + + if (b) { + uint8_t *src; + size_t n_bytes; + + src = SPA_MEMBER (b->outbuf->datas[0].data, b->outbuf->datas[0].offset + state->ready_offset, void); + n_bytes = SPA_MIN (b->outbuf->datas[0].size - state->ready_offset, frames * state->frame_size); + frames = SPA_MIN (frames, n_bytes / state->frame_size); + + memcpy ((uint8_t *)my_areas[0].addr + (offset * state->frame_size), + src, + n_bytes); + + state->ready_offset += n_bytes; + if (state->ready_offset >= b->outbuf->datas[0].size) { + SpaNodeEventReuseBuffer rb; + + SPA_QUEUE_POP_HEAD (&state->ready, SpaALSABuffer, next, b); + b->outstanding = true; + + rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; + rb.event.size = sizeof (rb); + rb.port_id = 0; + rb.buffer_id = b->outbuf->id; + state->event_cb (&state->node, &rb.event, state->user_data); + + state->ready_offset = 0; } + } else { + spa_log_warn (state->log, "underrun\n"); + snd_pcm_areas_silence (my_areas, offset, state->channels, frames, state->format); + } + + if ((err = snd_pcm_mmap_commit (hndl, offset, frames)) < 0) { + spa_log_error (state->log, "snd_pcm_mmap_commit error: %s\n", snd_strerror(err)); + if (err != -EPIPE && err != -ESTRPIPE) + return -1; } size -= frames; } @@ -318,7 +345,7 @@ mmap_read (SpaALSAState *state) { snd_pcm_t *hndl = state->hndl; int err; - snd_pcm_sframes_t avail, commitres; + snd_pcm_sframes_t avail; snd_pcm_uframes_t offset, frames, size; snd_pcm_status_t *status; const snd_pcm_channel_area_t *my_areas; @@ -330,8 +357,10 @@ mmap_read (SpaALSAState *state) snd_pcm_status_alloca(&status); - if ((err = snd_pcm_status (hndl, status)) < 0) + if ((err = snd_pcm_status (hndl, status)) < 0) { + spa_log_error (state->log, "snd_pcm_status error: %s\n", snd_strerror(err)); return err; + } avail = snd_pcm_status_get_avail (status); snd_pcm_status_get_htstamp (status, &htstamp); @@ -361,10 +390,8 @@ mmap_read (SpaALSAState *state) while (size > 0) { frames = size; if ((err = snd_pcm_mmap_begin (hndl, &my_areas, &offset, &frames)) < 0) { - if ((err = xrun_recovery (state, hndl, err)) < 0) { - spa_log_error (state->log, "MMAP begin avail error: %s\n", snd_strerror (err)); - return -1; - } + spa_log_error (state->log, "snd_pcm_mmap_begin error: %s\n", snd_strerror (err)); + return -1; } if (b) { @@ -374,16 +401,11 @@ mmap_read (SpaALSAState *state) (uint8_t *)my_areas[0].addr + (offset * state->frame_size), n_bytes); dest += n_bytes; - } else { - snd_pcm_areas_silence (my_areas, offset, state->channels, frames, state->format); } - commitres = snd_pcm_mmap_commit (hndl, offset, frames); - if (commitres < 0 || (snd_pcm_uframes_t)commitres != frames) { - if ((err = xrun_recovery (state, hndl, commitres >= 0 ? -EPIPE : commitres)) < 0) { - spa_log_error (state->log, "MMAP commit error: %s\n", snd_strerror(err)); - return -1; - } + if ((err = snd_pcm_mmap_commit (hndl, offset, frames)) < 0) { + spa_log_error (state->log, "snd_pcm_mmap_commit error: %s\n", snd_strerror(err)); + return -1; } size -= frames; } @@ -413,34 +435,27 @@ alsa_on_fd_events (SpaPollNotifyData *data) SpaALSAState *state = data->user_data; snd_pcm_t *hndl = state->hndl; int err; - unsigned short revents; + unsigned short revents = 0; snd_pcm_poll_descriptors_revents (hndl, (struct pollfd *)data->fds, data->n_fds, &revents); if (revents & POLLERR) { - if (snd_pcm_state (hndl) == SND_PCM_STATE_XRUN || - snd_pcm_state (hndl) == SND_PCM_STATE_SUSPENDED) { - err = snd_pcm_state (hndl) == SND_PCM_STATE_XRUN ? -EPIPE : -ESTRPIPE; - if ((err = xrun_recovery (state, hndl, err)) < 0) { - spa_log_error (state->log, "error: %s\n", snd_strerror (err)); - return -1; - } - } else { - spa_log_error (state->log, "Wait for poll failed\n"); + if ((err = xrun_recovery (state, hndl, err)) < 0) { + spa_log_error (state->log, "error: %s\n", snd_strerror (err)); return -1; } } if (state->stream == SND_PCM_STREAM_CAPTURE) { if (!(revents & POLLIN)) - return -1; + return 0; mmap_read (state); } else { if (!(revents & POLLOUT)) - return -1; + return 0; mmap_write (state); } @@ -449,7 +464,7 @@ alsa_on_fd_events (SpaPollNotifyData *data) } SpaResult -spa_alsa_start (SpaALSAState *state) +spa_alsa_start (SpaALSAState *state, bool xrun_recover) { int err; @@ -457,31 +472,58 @@ spa_alsa_start (SpaALSAState *state) return SPA_RESULT_OK; CHECK (set_swparams (state), "swparams"); - snd_pcm_dump (state->hndl, state->output); + if (!xrun_recover) + snd_pcm_dump (state->hndl, state->output); + + if ((err = snd_pcm_prepare (state->hndl)) < 0) { + spa_log_error (state->log, "snd_pcm_prepare error: %s\n", snd_strerror (err)); + return SPA_RESULT_ERROR; + } + + if ((state->poll.n_fds = snd_pcm_poll_descriptors_count (state->hndl)) <= 0) { + spa_log_error (state->log, "Invalid poll descriptors count %d\n", state->poll.n_fds); + return SPA_RESULT_ERROR; + } + if ((err = snd_pcm_poll_descriptors (state->hndl, (struct pollfd *)state->fds, state->poll.n_fds)) < 0) { + spa_log_error (state->log, "snd_pcm_poll_descriptors: %s\n", snd_strerror(err)); + return SPA_RESULT_ERROR; + } + + if (!xrun_recover) { + state->poll.enabled = true; + spa_poll_update_item (state->data_loop, &state->poll); + } if (state->stream == SND_PCM_STREAM_PLAYBACK) { mmap_write (state); } - state->poll.enabled = true; - spa_poll_update_item (state->data_loop, &state->poll); + if ((err = snd_pcm_start (state->hndl)) < 0) { + spa_log_error (state->log, "snd_pcm_start: %s\n", snd_strerror (err)); + return SPA_RESULT_ERROR; + } - err = snd_pcm_start (state->hndl); state->started = true; return SPA_RESULT_OK; } SpaResult -spa_alsa_pause (SpaALSAState *state) +spa_alsa_pause (SpaALSAState *state, bool xrun_recover) { + int err; + if (!state->started) return SPA_RESULT_OK; - state->poll.enabled = false; - spa_poll_update_item (state->data_loop, &state->poll); + if (!xrun_recover) { + state->poll.enabled = false; + spa_poll_update_item (state->data_loop, &state->poll); + } + + if ((err = snd_pcm_drop (state->hndl)) < 0) + spa_log_error (state->log, "snd_pcm_drop %s\n", snd_strerror (err)); - snd_pcm_drop (state->hndl); state->started = false; return SPA_RESULT_OK; diff --git a/spa/plugins/alsa/alsa-utils.h b/spa/plugins/alsa/alsa-utils.h index 26d0a9bd6..da920e726 100644 --- a/spa/plugins/alsa/alsa-utils.h +++ b/spa/plugins/alsa/alsa-utils.h @@ -47,7 +47,7 @@ typedef struct { bool period_event; } SpaALSAProps; -#define MAX_BUFFERS 16 +#define MAX_BUFFERS 64 struct _SpaALSABuffer { SpaBuffer *outbuf; @@ -106,6 +106,7 @@ struct _SpaALSAState { SpaQueue free; SpaQueue ready; + size_t ready_offset; bool started; SpaPollFd fds[16]; @@ -120,8 +121,8 @@ int spa_alsa_set_format (SpaALSAState *state, SpaFormatAudio *fmt, SpaPortFormatFlags flags); -SpaResult spa_alsa_start (SpaALSAState *state); -SpaResult spa_alsa_pause (SpaALSAState *state); +SpaResult spa_alsa_start (SpaALSAState *state, bool xrun_recover); +SpaResult spa_alsa_pause (SpaALSAState *state, bool xrun_recover); SpaResult spa_alsa_close (SpaALSAState *state); #ifdef __cplusplus diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index ec503cbfa..2ecd11b15 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -150,6 +150,7 @@ struct _SpaV4l2Source { static void update_state (SpaV4l2Source *this, SpaNodeState state) { + spa_log_info (this->log, "state: %d\n", state); this->node.state = state; } #include "v4l2-utils.c" @@ -225,7 +226,7 @@ spa_v4l2_source_node_set_props (SpaNode *node, } static SpaResult -do_send_event (SpaPoll *poll, +do_pause_done (SpaPoll *poll, bool async, uint32_t seq, size_t size, @@ -233,8 +234,70 @@ do_send_event (SpaPoll *poll, void *user_data) { SpaV4l2Source *this = user_data; + SpaV4l2State *state = &this->state[0]; + SpaNodeEventAsyncComplete *ac = data; - this->event_cb (&this->node, data, this->user_data); + + if (SPA_RESULT_IS_OK (ac->res)) + ac->res = spa_v4l2_stream_off (this); + + if (SPA_RESULT_IS_OK (ac->res)) { + state->started = false; + update_state (this, SPA_NODE_STATE_PAUSED); + } + this->event_cb (&this->node, &ac->event, this->user_data); + + return SPA_RESULT_OK; +} + +static SpaResult +do_pause (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + SpaV4l2Source *this = user_data; + SpaResult res; + SpaNodeEventAsyncComplete ac; + SpaNodeCommand *cmd = data; + + res = spa_node_port_send_command (&this->node, + SPA_DIRECTION_OUTPUT, + 0, + cmd); + + ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + ac.event.size = sizeof (SpaNodeEventAsyncComplete); + ac.seq = seq; + ac.res = res; + spa_poll_invoke (this->state[0].main_loop, + do_pause_done, + seq, + sizeof (ac), + &ac, + this); + return SPA_RESULT_OK; +} + +static SpaResult +do_start_done (SpaPoll *poll, + bool async, + uint32_t seq, + size_t size, + void *data, + void *user_data) +{ + SpaV4l2Source *this = user_data; + SpaV4l2State *state = &this->state[0]; + SpaNodeEventAsyncComplete *ac = data; + + if (SPA_RESULT_IS_OK (ac->res)) { + state->started = true; + update_state (this, SPA_NODE_STATE_STREAMING); + } + this->event_cb (&this->node, &ac->event, this->user_data); return SPA_RESULT_OK; } @@ -250,51 +313,25 @@ do_start (SpaPoll *poll, SpaV4l2Source *this = user_data; SpaResult res; SpaNodeEventAsyncComplete ac; + SpaNodeCommand *cmd = data; - res = spa_v4l2_start (this); + res = spa_node_port_send_command (&this->node, + SPA_DIRECTION_OUTPUT, + 0, + cmd); - if (async) { - ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; - ac.event.size = sizeof (SpaNodeEventAsyncComplete); - ac.seq = seq; - ac.res = res; - spa_poll_invoke (this->state[0].main_loop, - do_send_event, - SPA_ID_INVALID, - sizeof (ac), - &ac, - this); - } - return res; -} + ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + ac.event.size = sizeof (SpaNodeEventAsyncComplete); + ac.seq = seq; + ac.res = res; + spa_poll_invoke (this->state[0].main_loop, + do_start_done, + seq, + sizeof (ac), + &ac, + this); -static SpaResult -do_pause (SpaPoll *poll, - bool async, - uint32_t seq, - size_t size, - void *data, - void *user_data) -{ - SpaV4l2Source *this = user_data; - SpaResult res; - SpaNodeEventAsyncComplete ac; - - res = spa_v4l2_pause (this); - - if (async) { - ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; - ac.event.size = sizeof (SpaNodeEventAsyncComplete); - ac.seq = seq; - ac.res = res; - spa_poll_invoke (this->state[0].main_loop, - do_send_event, - SPA_ID_INVALID, - sizeof (ac), - &ac, - this); - } - return res; + return SPA_RESULT_OK; } static SpaResult @@ -315,6 +352,7 @@ spa_v4l2_source_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_START: { SpaV4l2State *state = &this->state[0]; + SpaResult res; if (state->current_format == NULL) return SPA_RESULT_NO_FORMAT; @@ -322,11 +360,17 @@ spa_v4l2_source_node_send_command (SpaNode *node, if (state->n_buffers == 0) return SPA_RESULT_NO_BUFFERS; + if (state->started) + return SPA_RESULT_OK; + + if ((res = spa_v4l2_stream_on (this)) < 0) + return res; + return spa_poll_invoke (this->state[0].data_loop, do_start, ++this->seq, - 0, - NULL, + command->size, + command, this); } case SPA_NODE_COMMAND_PAUSE: @@ -339,13 +383,17 @@ spa_v4l2_source_node_send_command (SpaNode *node, if (state->n_buffers == 0) return SPA_RESULT_NO_BUFFERS; + if (!state->started) + return SPA_RESULT_OK; + return spa_poll_invoke (this->state[0].data_loop, do_pause, ++this->seq, - 0, - NULL, + command->size, + command, this); } + case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: @@ -497,7 +545,7 @@ spa_v4l2_source_node_port_set_format (SpaNode *node, state = &this->state[port_id]; if (format == NULL) { - spa_v4l2_pause (this); + spa_v4l2_stream_off (this); spa_v4l2_clear_buffers (this); spa_v4l2_close (this); state->current_format = NULL; @@ -797,7 +845,31 @@ spa_v4l2_source_node_port_send_command (SpaNode *node, uint32_t port_id, SpaNodeCommand *command) { - return SPA_RESULT_NOT_IMPLEMENTED; + SpaV4l2Source *this; + SpaResult res; + + if (node == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + this = SPA_CONTAINER_OF (node, SpaV4l2Source, node); + + if (port_id != 0) + return SPA_RESULT_INVALID_PORT; + + switch (command->type) { + case SPA_NODE_COMMAND_PAUSE: + res = spa_v4l2_port_set_enabled (this, false); + break; + + case SPA_NODE_COMMAND_START: + res = spa_v4l2_port_set_enabled (this, true); + break; + + default: + res = SPA_RESULT_NOT_IMPLEMENTED; + break; + } + return res; } static const SpaNode v4l2source_node = { diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 7ff9f0434..55f4edf60 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -1134,46 +1134,38 @@ spa_v4l2_alloc_buffers (SpaV4l2Source *this, } static SpaResult -spa_v4l2_start (SpaV4l2Source *this) +spa_v4l2_stream_on (SpaV4l2Source *this) { SpaV4l2State *state = &this->state[0]; enum v4l2_buf_type type; - if (state->started) - return SPA_RESULT_OK; - type = V4L2_BUF_TYPE_VIDEO_CAPTURE; if (xioctl (state->fd, VIDIOC_STREAMON, &type) < 0) { - perror ("VIDIOC_STREAMON"); + spa_log_error (this->log, "VIDIOC_STREAMON: %s", strerror (errno)); return SPA_RESULT_ERROR; } - state->started = true; - update_state (this, SPA_NODE_STATE_STREAMING); - - state->poll.enabled = true; - spa_poll_update_item (state->data_loop, &state->poll); - return SPA_RESULT_OK; } static SpaResult -spa_v4l2_pause (SpaV4l2Source *this) +spa_v4l2_port_set_enabled (SpaV4l2Source *this, bool enabled) +{ + SpaV4l2State *state = &this->state[0]; + state->poll.enabled = enabled; + spa_poll_update_item (state->data_loop, &state->poll); + return SPA_RESULT_OK; +} + +static SpaResult +spa_v4l2_stream_off (SpaV4l2Source *this) { SpaV4l2State *state = &this->state[0]; enum v4l2_buf_type type; int i; - if (!state->started) - return SPA_RESULT_OK; - - state->started = false; - - state->poll.enabled = false; - spa_poll_update_item (state->data_loop, &state->poll); - type = V4L2_BUF_TYPE_VIDEO_CAPTURE; if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) { - perror ("VIDIOC_STREAMOFF"); + spa_log_error (this->log, "VIDIOC_STREAMOFF: %s", strerror (errno)); return SPA_RESULT_ERROR; } for (i = 0; i < state->n_buffers; i++) { @@ -1182,9 +1174,8 @@ spa_v4l2_pause (SpaV4l2Source *this) b = &state->buffers[i]; if (!b->outstanding) if (xioctl (state->fd, VIDIOC_QBUF, &b->v4l2_buffer) < 0) - perror ("VIDIOC_QBUF"); + spa_log_warn (this->log, "VIDIOC_QBUF: %s", strerror (errno)); } - update_state (this, SPA_NODE_STATE_PAUSED); return SPA_RESULT_OK; }