util: change the sources API to source_new and sink_add_source

This is a more explicit API that makes it more obvious where the ref/unref
calls need to go. The sink now has two refs to the sources as well (epoll data
pointer and the list) which means a caller cannot just source_unref() anymore
without removing a source. Since this is how we've been using it anyway - meh.

Signed-off-by: Peter Hutterer <peter.hutterer@who-t.net>
This commit is contained in:
Peter Hutterer 2020-08-07 13:07:10 +10:00
parent 9d18f04e9b
commit 9c2c912353
6 changed files with 139 additions and 69 deletions

View file

@ -741,14 +741,16 @@ error:
int
ei_set_connection(struct ei *ei, int fd)
{
struct source *source = source_add_autoclose(ei->sink, fd, connection_dispatch, ei);
if (!source)
return -ENOMEM;
struct source *source = source_new(fd, connection_dispatch, ei);
int rc = sink_add_source(ei->sink, source);
if (rc == 0) {
ei->source = source_ref(source);
ei->state = EI_STATE_BACKEND;
}
ei->source = source_ref(source);
ei->state = EI_STATE_BACKEND;
source_unref(source);
return 0;
return rc;
}
_public_ void

View file

@ -598,16 +598,24 @@ eis_client_new(struct eis *eis, int fd)
client->id = ++client_id;
list_init(&client->devices);
struct source *s = source_add_autoclose(eis->sink, fd,
client_dispatch, client);
struct source *s = source_new(fd, client_dispatch, client);
int rc = sink_add_source(eis->sink, s);
if (rc != 0) {
source_unref(s);
return NULL;
}
client->source = source_ref(s);
client->state = EIS_CLIENT_STATE_HELLO;
int rc = client_send_hello(client);
rc = client_send_hello(client);
if (rc != 0)
client = eis_client_unref(client);
eis_add_client(eis, eis_client_ref(client));
source_unref(s);
return client;
}

View file

@ -131,11 +131,16 @@ eis_setup_backend_socket(struct eis *eis, const char *socketpath)
if (listen(sockfd, 2) == -1)
return -errno;
struct source *s = source_add_autoclose(eis->sink, sockfd, listener_dispatch, eis_socket);
eis_socket->listener = source_ref(s);
eis_socket->socketpath = steal(&path);
eis->backend = steal(&eis_socket);
eis->backend_interface = interface;
struct source *s = source_new(sockfd, listener_dispatch, eis_socket);
int rc = sink_add_source(eis->sink, s);
if (rc == 0) {
eis_socket->listener = source_ref(s);
eis_socket->socketpath = steal(&path);
eis->backend = steal(&eis_socket);
eis->backend_interface = interface;
}
return 0;
source_unref(s);
return rc;
}

View file

@ -53,6 +53,7 @@ struct source {
void *user_data;
enum source_close_behavior close_behavior;
int fd;
bool is_active;
};
OBJECT_IMPLEMENT_REF(source);
@ -68,68 +69,49 @@ OBJECT_IMPLEMENT_SETTER(source, user_data, void*);
void
source_remove(struct source *source)
{
if (source->fd != -1) {
epoll_ctl(source->sink->epollfd, EPOLL_CTL_DEL, source->fd, NULL);
if (source->close_behavior == SOURCE_CLOSE_FD_ON_REMOVE)
source->fd = xclose(source->fd);
}
if (!source->is_active)
return;
if (!list_empty(&source->link)) {
list_remove(&source->link);
/* Note: epollfd was the owner of the source, new owner is
* the removed list */
list_append(&source->sink->sources_removed, &source->link);
}
epoll_ctl(source->sink->epollfd, EPOLL_CTL_DEL, source->fd, NULL);
if (source->close_behavior == SOURCE_CLOSE_FD_ON_REMOVE)
source->fd = xclose(source->fd);
source->is_active = false;
source_unref(source);
/* Note: sources list was the owner of the source, new owner
is the removed list */
list_remove(&source->link);
list_append(&source->sink->sources_removed, &source->link);
source->sink = NULL;
}
/* Ignore, use source_unref() */
static void
source_destroy(struct source *source)
{
if (source->fd != -1) {
source_remove(source);
if (source->close_behavior == SOURCE_CLOSE_FD_ON_DESTROY)
source->fd = xclose(source->fd);
}
/* We expect source_remove() to be called before we ever get here */
assert(!source->is_active);
if (source->close_behavior == SOURCE_CLOSE_FD_ON_DESTROY)
source->fd = xclose(source->fd);
}
OBJECT_IMPLEMENT_CREATE(source);
static inline struct source *
source_add(struct sink *sink, int sourcefd,
source_dispatch_t dispatch,
void *user_data,
enum source_close_behavior close_behavior)
struct source *
source_new(int sourcefd, source_dispatch_t dispatch, void *user_data)
{
struct source *source = source_create(NULL);
source->dispatch = dispatch;
source->user_data = user_data;
source->fd = sourcefd;
source->sink = sink;
source->close_behavior = close_behavior;
struct epoll_event e = {
.events = EPOLLIN,
.data.ptr = source, /* epollfd is the owner of this source */
};
if (epoll_ctl(sink->epollfd, EPOLL_CTL_ADD, sourcefd, &e) < 0) {
free(source);
return NULL;
}
list_append(&sink->sources, &source->link);
source->close_behavior = SOURCE_CLOSE_FD_ON_REMOVE;
source->is_active = false;
list_init(&source->link);
return source;
}
struct source *
source_add_autoclose(struct sink *sink, int sourcefd,
source_dispatch_t dispatch,
void *user_data)
{
return source_add(sink, sourcefd, dispatch, user_data, SOURCE_CLOSE_FD_ON_REMOVE);
}
static void
@ -196,3 +178,25 @@ sink_dispatch(struct sink *sink)
return 0;
}
int
sink_add_source(struct sink *sink, struct source *source)
{
struct epoll_event e = {
.events = EPOLLIN,
.data.ptr = source_ref(source),
};
if (epoll_ctl(sink->epollfd, EPOLL_CTL_ADD, source_get_fd(source), &e) < 0) {
source_unref(source);
return -errno;
}
source->is_active = true;
source->sink = sink;
source_ref(source);
list_append(&sink->sources, &source->link);
return 0;
}

View file

@ -39,7 +39,8 @@ struct sink;
typedef void (*source_dispatch_t)(struct source *source, void *user_data);
/**
* Remove source from its sink without destroying it.
* Remove source from its sink without destroying it, a source may be
* re-added to a sink later.
*/
void source_remove(struct source *source);
@ -47,8 +48,12 @@ struct source *
source_ref(struct source *source);
/**
* Unref source. When the last reference is dropped, the source is removed
* from the sink and resources are released.
* Unref source. When the last reference is dropped, resources
* are released.
*
* Note that due to implementation details, it is not possible to get the
* refcount to zero by calling source_unref() in the caller, you *must*
* remove a source with source_remove() to be able to release it fully.
*/
struct source *
source_unref(struct source *source);
@ -62,13 +67,21 @@ source_get_user_data(struct source *source);
void
source_set_user_data(struct source *source, void *user_data);
/**
* Add a new source that closes the connected fd on removal of the source.
* Create a new source for the given file descriptor with the given dispatch
* callback. The source's default behavior is that the fd is closed on the
* call to source_remove().
*
* This source does not generate events until added to a sink with
* sink_add_source().
*
* The returned source has a refcount of 1, use source_unref() to release th
* memory.
*/
struct source *
source_add_autoclose(struct sink *sink, int sourcefd,
source_dispatch_t dispatch,
void *user_data);
source_new(int fd, source_dispatch_t dispatch, void *user_data);
struct sink *
sink_new(void);
@ -79,6 +92,13 @@ sink_unref(struct sink *sink);
int
sink_dispatch(struct sink *sink);
/**
* Add the source to the given sink. Use source_remove() to remove the
* source.
*/
int
sink_add_source(struct sink *sink, struct source *source);
/**
* The epollfd to monitor for this sink.
*/

View file

@ -37,6 +37,8 @@
DEFINE_TRIVIAL_CLEANUP_FUNC(struct sink *, sink_unref);
#define _cleanup_sink_ _cleanup_(sink_unrefp)
DEFINE_TRIVIAL_CLEANUP_FUNC(struct source *, source_unref);
#define _cleanup_source_ _cleanup_(source_unrefp)
static MunitResult
test_sink(const MunitParameter params[], void *user_data)
@ -84,11 +86,12 @@ test_source(const MunitParameter params[], void *user_data)
munit_assert_int(rc, !=, -1);
struct buffer buffer = {0};
struct source *s = source_add_autoclose(sink, fd[0], read_buffer, &buffer);
s = source_ref(s);
struct source *s = source_new(fd[0], read_buffer, &buffer);
munit_assert_int(source_get_fd(s), ==, fd[0]);
sink_add_source(sink, s);
/* Nothing to read yet, dispatch is a noop */
sink_dispatch(sink);
munit_assert_int(buffer.len, ==, 0);
@ -126,13 +129,41 @@ test_source(const MunitParameter params[], void *user_data)
return MUNIT_OK;
}
static void
drain_data(struct source *source, void *user_data)
{
char buf[1024] = {0};
read(source_get_fd(source), buf, sizeof(buf));
}
static MunitResult
test_source_readd(const MunitParameter params[], void *user_data)
{
_cleanup_sink_ struct sink *sink = sink_new();
int fd[2];
int rc = pipe2(fd, O_CLOEXEC|O_NONBLOCK);
munit_assert_int(rc, !=, -1);
_cleanup_source_ struct source *s = source_new(fd[0], drain_data, NULL);
sink_add_source(sink, s);
sink_dispatch(sink);
/* remove and re-add without calling dispatch */
source_remove(s);
sink_add_source(sink, s);
source_remove(s);
return MUNIT_OK;
}
static MunitTest iotest_tests[] = {
{ .name = "sink", .test = test_sink},
{ .name = "source", .test = test_source},
{ .name = "/sink", .test = test_sink},
{ .name = "/source", .test = test_source},
{ .name = "/source/readd", .test = test_source_readd},
};
static const MunitSuite iotest_suite = {
"sources",
"/sources",
iotest_tests,
NULL,
1,