lib: add wpipc library

Simple library that uses sockets for inter-process communication. It provides an
API to create server and client objects. Users can add custom handlers in the
server, and clients can send requests for those custom handlers.
This commit is contained in:
Julian Bouzas 2021-04-20 04:08:58 -04:00
parent 28a4229681
commit 795df4b693
24 changed files with 2486 additions and 0 deletions

View file

@ -1,2 +1,3 @@
subdir('wp') subdir('wp')
subdir('wplua') subdir('wplua')
subdir('wpipc')

84
lib/wpipc/client.c Normal file
View file

@ -0,0 +1,84 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include "private.h"
#include "protocol.h"
#include "sender.h"
#include "client.h"
#define BUFFER_SIZE 1024
static void
on_lost_connection (struct wpipc_sender *self,
int receiver_fd,
void *data)
{
wpipc_log_warn ("client: lost connection with server %d", receiver_fd);
}
/* API */
struct wpipc_client *
wpipc_client_new (const char *path, bool connect)
{
struct wpipc_sender *base;
base = wpipc_sender_new (path, BUFFER_SIZE, on_lost_connection, NULL, 0);
if (connect)
wpipc_sender_connect (base);
return (struct wpipc_client *)base;
}
void
wpipc_client_free (struct wpipc_client *self)
{
struct wpipc_sender *base = wpipc_client_to_sender (self);
wpipc_sender_free (base);
}
bool
wpipc_client_send_request (struct wpipc_client *self,
const char *name,
const struct spa_pod *args,
wpipc_sender_reply_func_t reply,
void *data)
{
struct wpipc_sender *base = wpipc_client_to_sender (self);
/* check params */
if (name == NULL)
return false;
const size_t size = wpipc_protocol_calculate_request_size (name, args);
uint8_t buffer[size];
wpipc_protocol_build_request (buffer, size, name, args);
return wpipc_sender_send (base, buffer, size, reply, data);
}
const struct spa_pod *
wpipc_client_send_request_finish (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
const char **error)
{
/* error */
if (wpipc_protocol_is_reply_error (buffer, size)) {
wpipc_protocol_parse_reply_error (buffer, size, error);
return NULL;
}
/* ok */
if (wpipc_protocol_is_reply_ok (buffer, size)) {
const struct spa_pod *value = NULL;
if (wpipc_protocol_parse_reply_ok (buffer, size, &value))
return value;
}
return NULL;
}

56
lib/wpipc/client.h Normal file
View file

@ -0,0 +1,56 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_CLIENT_H__
#define __WPIPC_CLIENT_H__
#include <spa/pod/pod.h>
#include <stddef.h>
#include "sender.h"
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
#define wpipc_client_to_sender(self) ((struct wpipc_sender *)(self))
struct wpipc_client;
WPIPC_API
struct wpipc_client *
wpipc_client_new (const char *path, bool connect);
WPIPC_API
void
wpipc_client_free (struct wpipc_client *self);
WPIPC_API
bool
wpipc_client_send_request (struct wpipc_client *self,
const char *name,
const struct spa_pod *args,
wpipc_sender_reply_func_t reply,
void *data);
/* for reply handlers only */
WPIPC_API
const struct spa_pod *
wpipc_client_send_request_finish (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
const char **error);
#ifdef __cplusplus
}
#endif
#endif

26
lib/wpipc/defs.h Normal file
View file

@ -0,0 +1,26 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_DEFS_H__
#define __WPIPC_DEFS_H__
#if defined(__GNUC__)
# define WPIPC_API_EXPORT extern __attribute__ ((visibility ("default")))
#else
# define WPIPC_API_EXPORT extern
#endif
#ifndef WPIPC_API
# define WPIPC_API WPIPC_API_EXPORT
#endif
#ifndef WPIPC_PRIVATE_API
# define WPIPC_PRIVATE_API __attribute__ ((deprecated ("Private API")))
#endif
#endif

40
lib/wpipc/meson.build Normal file
View file

@ -0,0 +1,40 @@
wpipc_lib_sources = files(
'utils.c',
'protocol.c',
'receiver.c',
'sender.c',
'client.c',
'server.c',
)
wpipc_lib_headers = files(
'protocol.h',
'receiver.h',
'sender.h',
'client.h',
'server.h',
'wpipc.h',
)
install_headers(wpipc_lib_headers,
install_dir : join_paths(get_option('includedir'), 'wpipc-' + wireplumber_api_version, 'wpipc')
)
wpipc_lib = library('wpipc-' + wireplumber_api_version,
wpipc_lib_sources,
c_args : [
'-D_GNU_SOURCE',
'-DG_LOG_USE_STRUCTURED',
'-DG_LOG_DOMAIN="wpipc"',
],
install: true,
dependencies : [dependency('threads'), spa_dep],
)
wpipc_dep = declare_dependency(
link_with: wpipc_lib,
include_directories: wp_lib_include_dir,
dependencies: [spa_dep],
)

97
lib/wpipc/private.h Normal file
View file

@ -0,0 +1,97 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_PRIVATE_H__
#define __WPIPC_PRIVATE_H__
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/types.h>
#include <stdarg.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
/* log */
#define wpipc_log_info(F, ...) \
wpipc_log(WPIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__)
#define wpipc_log_warn(F, ...) \
wpipc_log(WPIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__)
#define wpipc_log_error(F, ...) \
wpipc_log(WPIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__)
enum wpipc_log_level {
WPIPC_LOG_LEVEL_NONE = 0,
WPIPC_LOG_LEVEL_ERROR,
WPIPC_LOG_LEVEL_WARN,
WPIPC_LOG_LEVEL_INFO,
};
void
wpipc_logv (enum wpipc_log_level level,
const char *fmt,
va_list args) __attribute__ ((format (printf, 2, 0)));
void
wpipc_log (enum wpipc_log_level level,
const char *fmt,
...) __attribute__ ((format (printf, 2, 3)));
/* socket */
ssize_t
wpipc_socket_write (int fd, const uint8_t *buffer, size_t size);
ssize_t
wpipc_socket_read (int fd, uint8_t **buffer, size_t *max_size);
/* epoll thread */
struct epoll_thread;
typedef void (*wpipc_epoll_thread_event_funct_t) (struct epoll_thread *self,
int fd,
void *data);
struct epoll_thread {
int socket_fd;
int epoll_fd;
int event_fd;
pthread_t thread;
wpipc_epoll_thread_event_funct_t socket_event_func;
wpipc_epoll_thread_event_funct_t other_event_func;
void *event_data;
};
bool
wpipc_epoll_thread_init (struct epoll_thread *self,
int socket_fd,
wpipc_epoll_thread_event_funct_t sock_func,
wpipc_epoll_thread_event_funct_t other_func,
void *data);
bool
wpipc_epoll_thread_start (struct epoll_thread *self);
void
wpipc_epoll_thread_stop (struct epoll_thread *self);
void
wpipc_epoll_thread_destroy (struct epoll_thread *self);
#ifdef __cplusplus
}
#endif
#endif

217
lib/wpipc/protocol.c Normal file
View file

@ -0,0 +1,217 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <assert.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include "protocol.h"
#define SIZE_PADDING 128
enum wpipc_protocol_reply_code {
REPLY_CODE_ERROR = 0,
REPLY_CODE_OK,
};
static bool
is_reply (const uint8_t *buffer, size_t size, int code)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
int parsed_code = 0;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_int (&p, &parsed_code);
return parsed_code == code;
}
/* API */
size_t
wpipc_protocol_calculate_request_size (const char *name,
const struct spa_pod *args)
{
assert (name);
return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING;
}
void
wpipc_protocol_build_request (uint8_t *buffer,
size_t size,
const char *name,
const struct spa_pod *args)
{
const struct spa_pod none = SPA_POD_INIT_None();
struct spa_pod_builder b;
struct spa_pod_frame f;
if (args == NULL)
args = &none;
spa_pod_builder_init (&b, buffer, size);
spa_pod_builder_push_struct (&b, &f);
spa_pod_builder_string (&b, name);
spa_pod_builder_primitive (&b, args);
spa_pod_builder_pop(&b, &f);
}
bool
wpipc_protocol_parse_request (const uint8_t *buffer,
size_t size,
const char **name,
const struct spa_pod **args)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
const char *parsed_name = NULL;
struct spa_pod *parsed_args = NULL;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_string (&p, &parsed_name);
spa_pod_parser_get_pod (&p, &parsed_args);
spa_pod_parser_pop(&p, &f);
/* check name and args */
if (name == NULL || args == NULL)
return false;
if (name != NULL)
*name = parsed_name;
if (args != NULL)
*args = parsed_args;
return true;
}
size_t
wpipc_protocol_calculate_reply_ok_size (const struct spa_pod *value)
{
return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING;
}
size_t
wpipc_protocol_calculate_reply_error_size (const char *msg)
{
assert (msg);
return strlen(msg) + SIZE_PADDING;
}
void
wpipc_protocol_build_reply_ok (uint8_t *buffer,
size_t size,
const struct spa_pod *value)
{
const struct spa_pod none = SPA_POD_INIT_None();
struct spa_pod_builder b;
struct spa_pod_frame f;
if (value == NULL)
value = &none;
spa_pod_builder_init (&b, buffer, size);
spa_pod_builder_push_struct (&b, &f);
spa_pod_builder_int (&b, REPLY_CODE_OK);
spa_pod_builder_primitive (&b, value);
spa_pod_builder_pop(&b, &f);
}
void
wpipc_protocol_build_reply_error (uint8_t *buffer,
size_t size,
const char *msg)
{
struct spa_pod_builder b;
struct spa_pod_frame f;
spa_pod_builder_init (&b, buffer, size);
spa_pod_builder_push_struct (&b, &f);
spa_pod_builder_int (&b, REPLY_CODE_ERROR);
spa_pod_builder_string (&b, msg);
spa_pod_builder_pop(&b, &f);
}
bool
wpipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size)
{
return is_reply (buffer, size, REPLY_CODE_OK);
}
bool
wpipc_protocol_is_reply_error (const uint8_t *buffer, size_t size)
{
return is_reply (buffer, size, REPLY_CODE_ERROR);
}
bool
wpipc_protocol_parse_reply_ok (const uint8_t *buffer,
size_t size,
const struct spa_pod **value)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
int parsed_code = 0;
struct spa_pod *parsed_value = NULL;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_int (&p, &parsed_code);
spa_pod_parser_get_pod (&p, &parsed_value);
spa_pod_parser_pop (&p, &f);
if (value != NULL)
*value = parsed_value;
return true;
}
bool
wpipc_protocol_parse_reply_error (const uint8_t *buffer,
size_t size,
const char **msg)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
int parsed_code = 0;
const char *parsed_msg = NULL;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_int (&p, &parsed_code);
spa_pod_parser_get_string (&p, &parsed_msg);
spa_pod_parser_pop (&p, &f);
if (msg != NULL)
*msg = parsed_msg;
return true;
}

87
lib/wpipc/protocol.h Normal file
View file

@ -0,0 +1,87 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_PROTOCOL_H__
#define __WPIPC_PROTOCOL_H__
#include <spa/pod/pod.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
/* request */
WPIPC_API
size_t
wpipc_protocol_calculate_request_size (const char *name,
const struct spa_pod *args);
WPIPC_API
void
wpipc_protocol_build_request (uint8_t *buffer,
size_t size,
const char *name,
const struct spa_pod *args);
WPIPC_API
bool
wpipc_protocol_parse_request (const uint8_t *buffer,
size_t size,
const char **name,
const struct spa_pod **args);
/* reply */
WPIPC_API
size_t
wpipc_protocol_calculate_reply_ok_size (const struct spa_pod *value);
WPIPC_API
size_t
wpipc_protocol_calculate_reply_error_size (const char *msg);
WPIPC_API
void
wpipc_protocol_build_reply_ok (uint8_t *buffer,
size_t size,
const struct spa_pod *value);
WPIPC_API
void
wpipc_protocol_build_reply_error (uint8_t *buffer,
size_t size,
const char *msg);
WPIPC_API
bool
wpipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size);
WPIPC_API
bool
wpipc_protocol_is_reply_error (const uint8_t *buffer, size_t size);
WPIPC_API
bool
wpipc_protocol_parse_reply_ok (const uint8_t *buffer,
size_t size,
const struct spa_pod **value);
WPIPC_API
bool
wpipc_protocol_parse_reply_error (const uint8_t *buffer,
size_t size,
const char **msg);
#ifdef __cplusplus
}
#endif
#endif

213
lib/wpipc/receiver.c Normal file
View file

@ -0,0 +1,213 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "private.h"
#include "receiver.h"
#include "wpipc.h"
#define MAX_SENDERS 128
struct wpipc_receiver {
struct sockaddr_un addr;
int socket_fd;
uint8_t *buffer_read;
size_t buffer_size;
struct epoll_thread epoll_thread;
bool thread_running;
const struct wpipc_receiver_events *events;
void *events_data;
/* for subclasses */
void *user_data;
};
static bool
reply_message (struct wpipc_receiver *self,
int sender_fd,
uint8_t *buffer,
size_t size)
{
return self->events && self->events->handle_message ?
self->events->handle_message (self, sender_fd, buffer, size, self->events_data) :
wpipc_socket_write (sender_fd, buffer, size) == (ssize_t)size;
}
static void
socket_event_received (struct epoll_thread *t, int fd, void *data)
{
/* sender wants to connect, accept connection */
struct wpipc_receiver *self = data;
socklen_t addr_size = sizeof(self->addr);
int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size,
SOCK_CLOEXEC | SOCK_NONBLOCK);
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sender_fd;
epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
if (self->events && self->events->sender_state)
self->events->sender_state (self, sender_fd,
WPIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data);
}
static void
other_event_received (struct epoll_thread *t, int fd, void *data)
{
struct wpipc_receiver *self = data;
/* sender sends a message, read it and reply */
ssize_t size = wpipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
if (size < 0) {
wpipc_log_error ("receiver: could not read message: %s", strerror(errno));
return;
}
if (size == 0) {
/* client disconnected */
epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close (fd);
if (self->events && self->events->sender_state)
self->events->sender_state (self, fd,
WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data);
return;
}
/* reply */
if (!reply_message (self, fd, self->buffer_read, size))
wpipc_log_error ("receiver: could not reply message: %s", strerror(errno));
return;
}
/* API */
struct wpipc_receiver *
wpipc_receiver_new (const char *path,
size_t buffer_size,
const struct wpipc_receiver_events *events,
void *events_data,
size_t user_size)
{
struct wpipc_receiver *self;
int name_size;
/* check params */
if (path == NULL || buffer_size == 0)
return NULL;
unlink (path);
self = calloc (1, sizeof (struct wpipc_receiver) + user_size);
if (self == NULL)
return NULL;
self->socket_fd = -1;
/* set address */
self->addr.sun_family = AF_LOCAL;
name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
path) + 1;
if (name_size > (int) sizeof(self->addr.sun_path))
goto error;
/* create socket */
self->socket_fd =
socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
if (self->socket_fd < 0)
goto error;
/* bind socket */
if (bind (self->socket_fd, (struct sockaddr *)&self->addr,
sizeof(self->addr)) != 0)
goto error;
/* listen socket */
if (listen (self->socket_fd, MAX_SENDERS) != 0)
goto error;
/* alloc buffer read */
self->buffer_size = buffer_size;
self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
if (self->buffer_read == NULL)
goto error;
/* init epoll thread */
if (!wpipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
socket_event_received, other_event_received, self))
goto error;
self->events = events;
self->events_data = events_data;
if (user_size > 0)
self->user_data = (void *)((uint8_t *)self + sizeof (struct wpipc_receiver));
return self;
error:
if (self->buffer_read)
free (self->buffer_read);
if (self->socket_fd != -1)
close (self->socket_fd);
free (self);
return NULL;
}
void
wpipc_receiver_free (struct wpipc_receiver *self)
{
wpipc_receiver_stop (self);
wpipc_epoll_thread_destroy (&self->epoll_thread);
free (self->buffer_read);
close (self->socket_fd);
free (self);
}
bool
wpipc_receiver_start (struct wpipc_receiver *self)
{
if (wpipc_receiver_is_running (self))
return true;
self->thread_running = wpipc_epoll_thread_start (&self->epoll_thread);
return self->thread_running;
}
void
wpipc_receiver_stop (struct wpipc_receiver *self)
{
if (wpipc_receiver_is_running (self)) {
wpipc_epoll_thread_stop (&self->epoll_thread);
self->thread_running = false;
}
}
bool
wpipc_receiver_is_running (struct wpipc_receiver *self)
{
return self->thread_running;
}
void *
wpipc_receiver_get_user_data (struct wpipc_receiver *self)
{
return self->user_data;
}

78
lib/wpipc/receiver.h Normal file
View file

@ -0,0 +1,78 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_RECEIVER_H__
#define __WPIPC_RECEIVER_H__
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
struct wpipc_receiver;
enum wpipc_receiver_sender_state {
WPIPC_RECEIVER_SENDER_STATE_CONNECTED = 0,
WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED
};
struct wpipc_receiver_events {
/* emitted when a sender state changes */
void (*sender_state) (struct wpipc_receiver *self,
int sender_fd,
enum wpipc_receiver_sender_state state,
void *data);
/* emitted when message is received and needs to be handled */
bool (*handle_message) (struct wpipc_receiver *self,
int sender_fd,
const uint8_t *buffer,
size_t size,
void *data);
};
WPIPC_API
struct wpipc_receiver *
wpipc_receiver_new (const char *path,
size_t buffer_size,
const struct wpipc_receiver_events *events,
void *events_data,
size_t user_size);
WPIPC_API
void
wpipc_receiver_free (struct wpipc_receiver *self);
WPIPC_API
bool
wpipc_receiver_start (struct wpipc_receiver *self);
WPIPC_API
void
wpipc_receiver_stop (struct wpipc_receiver *self);
WPIPC_API
bool
wpipc_receiver_is_running (struct wpipc_receiver *self);
/* for subclasses only */
WPIPC_API
void *
wpipc_receiver_get_user_data (struct wpipc_receiver *self);
#ifdef __cplusplus
}
#endif
#endif

251
lib/wpipc/sender.c Normal file
View file

@ -0,0 +1,251 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "private.h"
#include "sender.h"
#define MAX_ASYNC_TASKS 128
struct wpipc_sender_task {
wpipc_sender_reply_func_t func;
void *data;
};
struct wpipc_sender {
struct sockaddr_un addr;
int socket_fd;
uint8_t *buffer_read;
size_t buffer_size;
struct epoll_thread epoll_thread;
bool is_connected;
wpipc_sender_lost_conn_func_t lost_func;
void *lost_data;
struct wpipc_sender_task async_tasks[MAX_ASYNC_TASKS];
/* for subclasses */
void *user_data;
};
static int
push_sync_task (struct wpipc_sender *self,
wpipc_sender_reply_func_t func,
void *data)
{
size_t i;
for (i = MAX_ASYNC_TASKS; i > 1; i--) {
struct wpipc_sender_task *curr = self->async_tasks + i - 1;
struct wpipc_sender_task *next = self->async_tasks + i - 2;
if (next->func != NULL && curr->func == NULL) {
curr->func = func;
curr->data = data;
return i - 1;
} else if (i - 2 == 0 && next->func == NULL) {
/* empty queue */
next->func = func;
next->data = data;
return 0;
}
}
return -1;
}
static void
pop_sync_task (struct wpipc_sender *self,
bool trigger,
const uint8_t *buffer,
size_t size)
{
size_t i;
for (i = 0; i < MAX_ASYNC_TASKS; i++) {
struct wpipc_sender_task *task = self->async_tasks + i;
if (task->func != NULL) {
if (trigger)
task->func (self, buffer, size, task->data);
task->func = NULL;
return;
}
}
}
static void
socket_event_received (struct epoll_thread *t, int fd, void *data)
{
struct wpipc_sender *self = data;
/* receiver sends a reply, read it trigger corresponding task */
ssize_t size = wpipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
if (size < 0) {
wpipc_log_error ("sender: could not read reply: %s", strerror(errno));
return;
}
if (size == 0) {
if (self->lost_func)
self->lost_func (self, fd, self->lost_data);
return;
}
/* trigger async task */
pop_sync_task (self, true, self->buffer_read, size);
return;
}
/* API */
struct wpipc_sender *
wpipc_sender_new (const char *path,
size_t buffer_size,
wpipc_sender_lost_conn_func_t lost_func,
void *lost_data,
size_t user_size)
{
struct wpipc_sender *self;
int name_size;
if (path == NULL)
return NULL;
self = calloc (1, sizeof (struct wpipc_sender) + user_size);
if (self == NULL)
return NULL;
self->socket_fd = -1;
/* set address */
self->addr.sun_family = AF_LOCAL;
name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
path) + 1;
if (name_size > (int) sizeof(self->addr.sun_path))
goto error;
/* create socket */
self->socket_fd =
socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
if (self->socket_fd < 0)
goto error;
/* alloc buffer read */
self->buffer_size = buffer_size;
self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
if (self->buffer_read == NULL)
goto error;
/* init epoll thread */
if (!wpipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
socket_event_received, NULL, self))
goto error;
self->lost_func = lost_func;
self->lost_data = lost_data;
if (user_size > 0)
self->user_data = (void *)((uint8_t *)self + sizeof (struct wpipc_sender));
return self;
error:
if (self->buffer_read)
free (self->buffer_read);
if (self->socket_fd != -1)
close (self->socket_fd);
free (self);
return NULL;
}
void
wpipc_sender_free (struct wpipc_sender *self)
{
wpipc_sender_disconnect (self);
wpipc_epoll_thread_destroy (&self->epoll_thread);
free (self->buffer_read);
close (self->socket_fd);
free (self);
}
bool
wpipc_sender_connect (struct wpipc_sender *self)
{
if (wpipc_sender_is_connected (self))
return true;
if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
sizeof(self->addr)) == 0 &&
wpipc_epoll_thread_start (&self->epoll_thread)) {
self->is_connected = true;
return true;
}
return false;
}
void
wpipc_sender_disconnect (struct wpipc_sender *self)
{
if (wpipc_sender_is_connected (self)) {
wpipc_epoll_thread_stop (&self->epoll_thread);
shutdown(self->socket_fd, SHUT_RDWR);
self->is_connected = false;
}
}
bool
wpipc_sender_is_connected (struct wpipc_sender *self)
{
return self->is_connected;
}
bool
wpipc_sender_send (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
wpipc_sender_reply_func_t func,
void *data)
{
int id = -1;
if (buffer == NULL || size == 0)
return false;
if (!wpipc_sender_is_connected (self))
return false;
/* add the task in the queue */
if (func) {
id = push_sync_task (self, func, data);
if (id == -1)
return false;
}
/* write buffer and remove task if it fails */
if (wpipc_socket_write (self->socket_fd, buffer, size) <= 0) {
if (id != -1)
self->async_tasks[id].func = NULL;
return false;
}
return true;
}
void *
wpipc_sender_get_user_data (struct wpipc_sender *self)
{
return self->user_data;
}

75
lib/wpipc/sender.h Normal file
View file

@ -0,0 +1,75 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_SENDER_H__
#define __WPIPC_SENDER_H__
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
struct wpipc_sender;
typedef void (*wpipc_sender_lost_conn_func_t) (struct wpipc_sender *self,
int receiver_fd,
void *data);
typedef void (*wpipc_sender_reply_func_t) (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
void *data);
WPIPC_API
struct wpipc_sender *
wpipc_sender_new (const char *path,
size_t buffer_size,
wpipc_sender_lost_conn_func_t lost_func,
void *lost_data,
size_t user_size);
WPIPC_API
void
wpipc_sender_free (struct wpipc_sender *self);
WPIPC_API
bool
wpipc_sender_connect (struct wpipc_sender *self);
WPIPC_API
void
wpipc_sender_disconnect (struct wpipc_sender *self);
WPIPC_API
bool
wpipc_sender_is_connected (struct wpipc_sender *self);
WPIPC_API
bool
wpipc_sender_send (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
wpipc_sender_reply_func_t reply,
void *data);
/* for subclasses only */
WPIPC_API
void *
wpipc_sender_get_user_data (struct wpipc_sender *self);
#ifdef __cplusplus
}
#endif
#endif

258
lib/wpipc/server.c Normal file
View file

@ -0,0 +1,258 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <pthread.h>
#include "private.h"
#include "protocol.h"
#include "receiver.h"
#include "server.h"
#define BUFFER_SIZE 1024
#define MAX_REQUEST_HANDLERS 128
struct wpipc_server_client_handler
{
wpipc_server_client_handler_func_t handler;
void *data;
};
struct wpipc_server_request_handler
{
const char *name;
wpipc_server_request_handler_func_t handler;
void *data;
};
struct wpipc_server_priv {
pthread_mutex_t mutex;
struct wpipc_server_client_handler client_handler;
size_t n_request_handlers;
struct wpipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS];
};
static void
sender_state (struct wpipc_receiver *base,
int sender_fd,
enum wpipc_receiver_sender_state sender_state,
void *data)
{
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
wpipc_log_info ("server: new state %d on client %d", sender_state, sender_fd);
pthread_mutex_lock (&priv->mutex);
if (priv->client_handler.handler)
priv->client_handler.handler ((struct wpipc_server *)base, sender_fd,
sender_state, priv->client_handler.data);
pthread_mutex_unlock (&priv->mutex);
}
static bool
handle_message (struct wpipc_receiver *base,
int sender_fd,
const uint8_t *buffer,
size_t size,
void *data)
{
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
const char *name = NULL;
const struct spa_pod *args = NULL;
wpipc_log_info ("server: message from client %d received", sender_fd);
/* parse */
if (!wpipc_protocol_parse_request (buffer, size, &name, &args)) {
const char *msg = "could not parse request";
const size_t s = wpipc_protocol_calculate_reply_error_size (msg);
uint8_t b[s];
wpipc_protocol_build_reply_error (b, s, msg);
return wpipc_socket_write (sender_fd, b, s) == (ssize_t)s;
}
/* handle */
size_t i;
bool res = false;
pthread_mutex_lock (&priv->mutex);
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name != NULL && strcmp (rh->name, name) == 0 &&
rh->handler != NULL) {
res = rh->handler ((struct wpipc_server *)base, sender_fd, name, args,
rh->data);
pthread_mutex_unlock (&priv->mutex);
return res;
}
}
/* handler was not found, reply with error */
res = wpipc_server_reply_error ((struct wpipc_server *)base, sender_fd,
"request handler not found");
pthread_mutex_unlock (&priv->mutex);
return res;
}
static struct wpipc_receiver_events events = {
.sender_state = sender_state,
.handle_message = handle_message,
};
/* API */
struct wpipc_server *
wpipc_server_new (const char *path, bool start)
{
struct wpipc_server_priv * priv = NULL;
struct wpipc_receiver *base = NULL;
base = wpipc_receiver_new (path, BUFFER_SIZE, &events, NULL,
sizeof (struct wpipc_server_priv));
if (base == NULL)
return NULL;
priv = wpipc_receiver_get_user_data (base);
pthread_mutex_init (&priv->mutex, NULL);
priv->n_request_handlers = 0;
if (start)
wpipc_receiver_start (base);
return (struct wpipc_server *)base;
}
void
wpipc_server_free (struct wpipc_server *self)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
pthread_mutex_destroy (&priv->mutex);
wpipc_receiver_free (base);
}
void
wpipc_server_set_client_handler (struct wpipc_server *self,
wpipc_server_client_handler_func_t handler,
void *data)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
pthread_mutex_lock (&priv->mutex);
priv->client_handler.handler = handler;
priv->client_handler.data = data;
pthread_mutex_unlock (&priv->mutex);
}
void
wpipc_server_clear_client_handler (struct wpipc_server *self)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
pthread_mutex_lock (&priv->mutex);
priv->client_handler.handler = NULL;
priv->client_handler.data = NULL;
pthread_mutex_unlock (&priv->mutex);
}
bool
wpipc_server_set_request_handler (struct wpipc_server *self,
const char *name,
wpipc_server_request_handler_func_t handler,
void *data)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
size_t i;
/* check params */
if (name == NULL)
return false;
pthread_mutex_lock (&priv->mutex);
/* make sure handler does not exist */
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name != NULL && strcmp (rh->name, name) == 0) {
pthread_mutex_unlock (&priv->mutex);
return false;
}
}
/* set handler */
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name == NULL) {
rh->name = name;
rh->handler = handler;
rh->data = data;
pthread_mutex_unlock (&priv->mutex);
return true;
}
}
pthread_mutex_unlock (&priv->mutex);
return false;
}
void
wpipc_server_clear_request_handler (struct wpipc_server *self,
const char *name)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
size_t i;
/* check params */
if (name == NULL)
return;
pthread_mutex_lock (&priv->mutex);
/* clear handler */
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name != NULL && strcmp (rh->name, name) == 0) {
rh->name = NULL;
break;
}
}
pthread_mutex_unlock (&priv->mutex);
}
bool
wpipc_server_reply_ok (struct wpipc_server *self,
int client_fd,
const struct spa_pod *value)
{
const size_t s = wpipc_protocol_calculate_reply_ok_size (value);
uint8_t b[s];
wpipc_protocol_build_reply_ok (b, s, value);
return wpipc_socket_write (client_fd, b, s) == (ssize_t)s;
}
bool
wpipc_server_reply_error (struct wpipc_server *self,
int client_fd,
const char *msg)
{
if (msg == NULL)
return false;
const size_t s = wpipc_protocol_calculate_reply_error_size (msg);
uint8_t b[s];
wpipc_protocol_build_reply_error (b, s, msg);
return wpipc_socket_write (client_fd, b, s) == (ssize_t)s;
}

85
lib/wpipc/server.h Normal file
View file

@ -0,0 +1,85 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_SERVER_H__
#define __WPIPC_SERVER_H__
#include <spa/pod/pod.h>
#include "defs.h"
#include "receiver.h"
#ifdef __cplusplus
extern "C" {
#endif
#define wpipc_server_to_receiver(self) ((struct wpipc_receiver *)(self))
struct wpipc_server;
typedef void (*wpipc_server_client_handler_func_t) (struct wpipc_server *self,
int client_fd,
enum wpipc_receiver_sender_state client_state,
void *data);
typedef bool (*wpipc_server_request_handler_func_t) (struct wpipc_server *self,
int client_fd,
const char *name,
const struct spa_pod *args,
void *data);
WPIPC_API
struct wpipc_server *
wpipc_server_new (const char *path, bool start);
WPIPC_API
void
wpipc_server_free (struct wpipc_server *self);
WPIPC_API
void
wpipc_server_set_client_handler (struct wpipc_server *self,
wpipc_server_client_handler_func_t handler,
void *data);
WPIPC_API
void
wpipc_server_clear_client_handler (struct wpipc_server *self);
WPIPC_API
bool
wpipc_server_set_request_handler (struct wpipc_server *self,
const char *name,
wpipc_server_request_handler_func_t handler,
void *data);
WPIPC_API
void
wpipc_server_clear_request_handler (struct wpipc_server *self,
const char *name);
/* for request handlers only */
WPIPC_API
bool
wpipc_server_reply_ok (struct wpipc_server *self,
int client_fd,
const struct spa_pod *value);
WPIPC_API
bool
wpipc_server_reply_error (struct wpipc_server *self,
int client_fd,
const char *msg);
#ifdef __cplusplus
}
#endif
#endif

269
lib/wpipc/utils.c Normal file
View file

@ -0,0 +1,269 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <assert.h>
#include "private.h"
#define MAX_POLL_EVENTS 128
#define MAX_LOG_MESSAGE 1024
/* log */
const char *wpipc_logger_level_text[] = {
[WPIPC_LOG_LEVEL_ERROR] = "E",
[WPIPC_LOG_LEVEL_WARN] = "W",
[WPIPC_LOG_LEVEL_INFO] = "I",
};
struct wpipc_logger {
enum wpipc_log_level level;
};
static const struct wpipc_logger *
wpipc_log_get_instance (void)
{
static struct wpipc_logger logger_ = { 0, };
static struct wpipc_logger* instance_ = NULL;
if (instance_ == NULL) {
char * val_str = NULL;
enum wpipc_log_level val = 0;
/* default to error */
logger_.level = WPIPC_LOG_LEVEL_WARN;
/* get level from env */
val_str = getenv ("WPIPC_DEBUG");
if (val_str && sscanf (val_str, "%u", &val) == 1 &&
val >= WPIPC_LOG_LEVEL_NONE)
logger_.level = val;
instance_ = &logger_;
}
return instance_;
}
void
wpipc_logv (enum wpipc_log_level level, const char *fmt, va_list args)
{
const struct wpipc_logger *logger = NULL;
logger = wpipc_log_get_instance ();
assert (logger);
if (logger->level >= level) {
assert (level > 0);
char msg[MAX_LOG_MESSAGE];
struct timespec time;
clock_gettime (CLOCK_REALTIME, &time);
vsnprintf (msg, MAX_LOG_MESSAGE, fmt, args);
fprintf (stderr, "[%s][%lu.%lu] %s\n", wpipc_logger_level_text[level],
time.tv_sec, time.tv_sec, msg);
}
}
void
wpipc_log (enum wpipc_log_level level, const char *fmt, ...)
{
va_list args;
va_start (args, fmt);
wpipc_logv (level, fmt, args);
va_end (args);
}
/* socket */
ssize_t
wpipc_socket_write (int fd, const uint8_t *buffer, size_t size)
{
size_t total_written = 0;
size_t n;
assert (fd >= 0);
assert (buffer != NULL);
assert (size > 0);
do {
n = write(fd, buffer, size);
if (n < size) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return total_written;
return -1;
}
total_written += n;
} while (total_written < size);
return total_written;
}
ssize_t
wpipc_socket_read (int fd, uint8_t **buffer, size_t *max_size)
{
ssize_t n;
ssize_t size;
size_t offset = 0;
assert (buffer);
assert (*buffer);
assert (max_size);
assert (*max_size > 0);
again:
size = *max_size - offset;
n = read (fd, *buffer + offset, size);
if (n == 0)
return 0;
/* check for errors */
if (n < 0) {
if (errno == EINTR)
goto again;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return offset;
return -1;
}
/* realloc if we need more space, and read again */
if (n >= size) {
*max_size += *max_size;
*buffer = reallocarray (*buffer, *max_size, sizeof (uint8_t));
offset += n;
goto again;
}
return offset + n;
}
/* epoll thread */
bool
wpipc_epoll_thread_init (struct epoll_thread *self,
int socket_fd,
wpipc_epoll_thread_event_funct_t sock_func,
wpipc_epoll_thread_event_funct_t other_func,
void *data)
{
struct epoll_event event;
self->socket_fd = socket_fd;
self->event_fd = -1;
self->epoll_fd = -1;
/* create event fd */
self->event_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
if (self->event_fd == -1)
goto error;
/* create epoll fd */
self->epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
if (self->epoll_fd == -1)
goto error;
/* poll socket fd */
event.events = EPOLLIN;
event.data.fd = self->socket_fd;
if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0)
goto error;
/* poll event fd */
event.events = EPOLLIN;
event.data.fd = self->event_fd;
if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0)
goto error;
self->socket_event_func = sock_func;
self->other_event_func = other_func;
self->event_data = data;
return true;
error:
if (self->epoll_fd != -1)
close (self->epoll_fd);
if (self->event_fd != -1)
close (self->event_fd);
return false;
}
static void *
epoll_thread_run (void *data)
{
struct epoll_thread *self = data;
bool exit = false;
while (!exit) {
/* wait for events */
struct epoll_event ep[MAX_POLL_EVENTS];
int n = epoll_wait (self->epoll_fd, ep, MAX_POLL_EVENTS, -1);
if (n < 0) {
wpipc_log_error ("epoll_thread: failed to wait for event: %s",
strerror(errno));
continue;
}
for (int i = 0; i < n; i++) {
/* socket fd */
if (ep[i].data.fd == self->socket_fd) {
if (self->socket_event_func)
self->socket_event_func (self, ep[i].data.fd, self->event_data);
}
/* event fd */
else if (ep[i].data.fd == self->event_fd) {
uint64_t stop = 0;
ssize_t res = read (ep[i].data.fd, &stop, sizeof(uint64_t));
if (res == sizeof(uint64_t) && stop == 1)
exit = true;
}
/* other */
else {
if (self->other_event_func)
self->other_event_func (self, ep[i].data.fd, self->event_data);
}
}
}
return NULL;
}
bool
wpipc_epoll_thread_start (struct epoll_thread *self)
{
return pthread_create (&self->thread, NULL, epoll_thread_run, self) == 0;
}
void
wpipc_epoll_thread_stop (struct epoll_thread *self)
{
uint64_t value = 1;
ssize_t res = write (self->event_fd, &value, sizeof(uint64_t));
if (res == sizeof(uint64_t))
pthread_join (self->thread, NULL);
}
void
wpipc_epoll_thread_destroy (struct epoll_thread *self)
{
close (self->epoll_fd);
close (self->event_fd);
}

18
lib/wpipc/wpipc.h Normal file
View file

@ -0,0 +1,18 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_H__
#define __WPIPC_H__
#include "protocol.h"
#include "receiver.h"
#include "sender.h"
#include "server.h"
#include "client.h"
#endif

View file

@ -40,10 +40,12 @@ endif
cc = meson.get_compiler('c') cc = meson.get_compiler('c')
glib_req_version = '>= 2.58' glib_req_version = '>= 2.58'
glib_dep = dependency('glib-2.0', version : glib_req_version)
gobject_dep = dependency('gobject-2.0', version : glib_req_version) gobject_dep = dependency('gobject-2.0', version : glib_req_version)
gmodule_dep = dependency('gmodule-2.0', version : glib_req_version) gmodule_dep = dependency('gmodule-2.0', version : glib_req_version)
gio_dep = dependency('gio-2.0', version : glib_req_version) gio_dep = dependency('gio-2.0', version : glib_req_version)
giounix_dep = dependency('gio-unix-2.0', version : glib_req_version) giounix_dep = dependency('gio-unix-2.0', version : glib_req_version)
spa_dep = dependency('libspa-0.2', version: '>= 0.2')
pipewire_dep = dependency('libpipewire-0.3', version: '>= 0.3.20') pipewire_dep = dependency('libpipewire-0.3', version: '>= 0.3.20')
mathlib = cc.find_library('m') mathlib = cc.find_library('m')

View file

@ -8,3 +8,14 @@ executable('audiotestsrc-play',
install: false, install: false,
dependencies : [giounix_dep, wp_dep, pipewire_dep], dependencies : [giounix_dep, wp_dep, pipewire_dep],
) )
executable('wpipc-client',
'wpipc-client.c',
c_args : [
'-D_GNU_SOURCE',
'-DG_LOG_USE_STRUCTURED',
'-DG_LOG_DOMAIN="wpipc-client"',
],
install: false,
dependencies : [wpipc_dep],
)

View file

@ -0,0 +1,104 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <pthread.h>
#include <assert.h>
#include <spa/pod/builder.h>
#include <wpipc/wpipc.h>
struct client_data {
struct wpipc_client *c;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool reply_received;
};
static void
reply_handler (struct wpipc_sender *self, const uint8_t *buffer, size_t size, void *p)
{
struct client_data *data = p;
const char *error = NULL;
const struct spa_pod *pod = wpipc_client_send_request_finish (self, buffer, size, &error);
if (!pod)
printf ("error: %s\n", error ? error : "unknown");
else
printf ("success!\n");
/* signal reply received */
pthread_mutex_lock (&data->mutex);
data->reply_received = true;
pthread_cond_signal (&data->cond);
pthread_mutex_unlock (&data->mutex);
}
int
main (int argc, char *argv[])
{
struct client_data data;
if (argc < 2) {
printf ("usage: <server-path>\n");
return -1;
}
/* init */
data.c = wpipc_client_new (argv[1], true);
pthread_mutex_init (&data.mutex, NULL);
pthread_cond_init (&data.cond, NULL);
data.reply_received = false;
while (true) {
char str[1024];
printf ("> ");
fgets (str, 1023, stdin);
if (strncmp (str, "help", 4) == 0) {
printf ("help\tprints this message\n");
printf ("quit\texits the client\n");
printf ("send\tsends a request, usage: send <request-name> [args]\n");
} else if (strncmp (str, "quit", 4) == 0) {
printf ("exiting...\n");
break;
} else if (strncmp (str, "send", 4) == 0) {
char request_name[128];
char request_args[1024];
int n = sscanf(str, "send %s %s", request_name, request_args);
if (n <= 0)
continue;
/* send request */
if (n >= 2) {
/* TODO: for now we always create a string pod for args */
struct {
struct spa_pod_string pod;
char str[1024];
} args;
args.pod = SPA_POD_INIT_String(1024);
strncpy (args.str, request_args, 1024);
wpipc_client_send_request (data.c, request_name, (const struct spa_pod *)&args,
reply_handler, &data);
} else {
wpipc_client_send_request (data.c, request_name, NULL, reply_handler, &data);
}
/* wait for reply */
pthread_mutex_lock (&data.mutex);
while (!data.reply_received)
pthread_cond_wait (&data.cond, &data.mutex);
pthread_mutex_unlock (&data.mutex);
}
}
/* clean up */
pthread_cond_destroy (&data.cond);
pthread_mutex_destroy (&data.mutex);
wpipc_client_free (data.c);
return 0;
}

View file

@ -1,4 +1,5 @@
subdir('wp') subdir('wp')
subdir('wplua') subdir('wplua')
subdir('wpipc')
subdir('modules') subdir('modules')
subdir('examples') subdir('examples')

124
tests/wpipc/client-server.c Normal file
View file

@ -0,0 +1,124 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <glib.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include <wpipc/wpipc.h>
#define TEST_ADDRESS "/tmp/wpipc-client-server"
static bool
increment_request_handler (struct wpipc_server *self, int client_fd,
const char *name, const struct spa_pod *args, void *data)
{
int32_t val = 0;
g_assert_true (spa_pod_is_int (args));
g_assert_true (spa_pod_get_int (args, &val) == 0);
struct spa_pod_int res = SPA_POD_INIT_Int (val + 1);
return wpipc_server_reply_ok (self, client_fd, (struct spa_pod *)&res);
}
static bool
error_request_handler (struct wpipc_server *self, int client_fd,
const char *name, const struct spa_pod *args, void *data)
{
return wpipc_server_reply_error (self, client_fd, "error message");
}
struct reply_data {
int32_t incremented;
const char *error;
int n_replies;
GMutex mutex;
};
static void
wait_for_reply (struct reply_data *data, int n_replies)
{
while (true) {
g_mutex_lock (&data->mutex);
if (data->n_replies == n_replies) {
g_mutex_unlock (&data->mutex);
break;
}
g_mutex_unlock (&data->mutex);
}
}
static void
reply_handler (struct wpipc_sender *self, const uint8_t *buffer, size_t size, void *p)
{
struct reply_data *data = p;
g_assert_nonnull (data);
g_mutex_lock (&data->mutex);
const struct spa_pod *pod = wpipc_client_send_request_finish (self, buffer, size, &data->error);
if (pod) {
g_assert_true (spa_pod_is_int (pod));
g_assert_true (spa_pod_get_int (pod, &data->incremented) == 0);
}
data->n_replies++;
g_mutex_unlock (&data->mutex);
}
static void
test_wpipc_server_client ()
{
struct wpipc_server *s = wpipc_server_new (TEST_ADDRESS, true);
g_assert_nonnull (s);
struct wpipc_client *c = wpipc_client_new (TEST_ADDRESS, true);
g_assert_nonnull (c);
struct reply_data data;
g_mutex_init (&data.mutex);
/* add request handlers */
g_assert_true (wpipc_server_set_request_handler (s, "INCREMENT", increment_request_handler, NULL));
g_assert_true (wpipc_server_set_request_handler (s, "ERROR", error_request_handler, NULL));
/* send an INCREMENT request of 3, and make sure the returned value is 4 */
data.incremented = -1;
data.error = NULL;
data.n_replies = 0;
struct spa_pod_int i = SPA_POD_INIT_Int (3);
g_assert_true (wpipc_client_send_request (c, "INCREMENT", (struct spa_pod *)&i, reply_handler, &data));
wait_for_reply (&data, 1);
g_assert_null (data.error);
g_assert_cmpint (data.incremented, ==, 4);
/* send an ERROR request, and make sure the returned value is an error */
data.error = NULL;
data.n_replies = 0;
g_assert_true (wpipc_client_send_request (c, "ERROR", NULL, reply_handler, &data));
wait_for_reply (&data, 1);
g_assert_cmpstr (data.error, ==, "error message");
/* send an unhandled request, and make sure the server replies with an error */
data.error = NULL;
data.n_replies = 0;
g_assert_true (wpipc_client_send_request (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data));
wait_for_reply (&data, 1);
g_assert_cmpstr (data.error, ==, "request handler not found");
/* clean up */
g_mutex_clear (&data.mutex);
wpipc_client_free (c);
wpipc_server_free (s);
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/wpipc/wpipc-server-client", test_wpipc_server_client);
return g_test_run ();
}

26
tests/wpipc/meson.build Normal file
View file

@ -0,0 +1,26 @@
common_deps = [wpipc_dep, glib_dep]
common_env = [
'G_TEST_SRCDIR=@0@'.format(meson.current_source_dir()),
'G_TEST_BUILDDIR=@0@'.format(meson.current_build_dir()),
]
test(
'test-wpipc-sender-receiver',
executable('test-sender-receiver', 'sender-receiver.c', dependencies: common_deps),
env: common_env,
workdir : meson.current_source_dir(),
)
test(
'test-wpipc-protocol',
executable('test-protocol', 'protocol.c', dependencies: common_deps),
env: common_env,
workdir : meson.current_source_dir(),
)
test(
'test-wpipc-client-server',
executable('test-client-server', 'client-server.c', dependencies: common_deps),
env: common_env,
workdir : meson.current_source_dir(),
)

77
tests/wpipc/protocol.c Normal file
View file

@ -0,0 +1,77 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <glib.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include <wpipc/wpipc.h>
static void
test_wpipc_protocol ()
{
uint8_t b[1024];
/* request null value */
{
wpipc_protocol_build_request (b, sizeof(b), "name", NULL);
const char *name = NULL;
const struct spa_pod *value = NULL;
g_assert_true (wpipc_protocol_parse_request (b, sizeof(b), &name, &value));
g_assert_cmpstr (name, ==, "name");
g_assert_true (spa_pod_is_none (value));
}
/* request */
{
struct spa_pod_int i = SPA_POD_INIT_Int (8);
wpipc_protocol_build_request (b, sizeof(b), "name", (struct spa_pod *)&i);
const char *name = NULL;
const struct spa_pod_int *value = NULL;
g_assert_true (wpipc_protocol_parse_request (b, sizeof(b), &name, (const struct spa_pod **)&value));
g_assert_cmpstr (name, ==, "name");
g_assert_cmpint (value->value, ==, 8);
}
/* reply error */
{
wpipc_protocol_build_reply_error (b, sizeof(b), "error message");
g_assert_true (wpipc_protocol_is_reply_error (b, sizeof(b)));
const char *msg = NULL;
g_assert_true (wpipc_protocol_parse_reply_error (b, sizeof(b), &msg));
g_assert_cmpstr (msg, ==, "error message");
}
/* reply ok null value */
{
wpipc_protocol_build_reply_ok (b, sizeof(b), NULL);
g_assert_true (wpipc_protocol_is_reply_ok (b, sizeof(b)));
const struct spa_pod *value = NULL;
g_assert_true (wpipc_protocol_parse_reply_ok (b, sizeof(b), &value));
g_assert_true (spa_pod_is_none (value));
}
/* reply ok */
{
struct spa_pod_int i = SPA_POD_INIT_Int (3);
wpipc_protocol_build_reply_ok (b, sizeof(b), (struct spa_pod *)&i);
g_assert_true (wpipc_protocol_is_reply_ok (b, sizeof(b)));
const struct spa_pod_int *value = NULL;
g_assert_true (wpipc_protocol_parse_reply_ok (b, sizeof(b), (const struct spa_pod **)&value));
g_assert_cmpint (value->value, ==, 3);
}
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/wpipc/wpipc-protocol", test_wpipc_protocol);
return g_test_run ();
}

View file

@ -0,0 +1,286 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <glib.h>
#include <wpipc/wpipc.h>
#define TEST_ADDRESS "/tmp/wpipc-sender-receiver"
struct event_data {
const uint8_t * expected_data;
size_t expected_size;
int connections;
int n_events;
GMutex mutex;
};
static void
wait_for_event (struct event_data *data, int n_events)
{
while (true) {
g_mutex_lock (&data->mutex);
if (data->n_events == n_events) {
g_mutex_unlock (&data->mutex);
break;
}
g_mutex_unlock (&data->mutex);
}
}
static void
sender_state_callback (struct wpipc_receiver *self, int sender_fd,
enum wpipc_receiver_sender_state sender_state, void *p)
{
struct event_data *data = p;
g_assert_nonnull (data);
g_mutex_lock (&data->mutex);
switch (sender_state) {
case WPIPC_RECEIVER_SENDER_STATE_CONNECTED:
data->connections++;
break;
case WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED:
data->connections--;
break;
default:
g_assert_not_reached ();
break;
}
data->n_events++;
g_mutex_unlock (&data->mutex);
}
static void
reply_callback (struct wpipc_sender *self, const uint8_t *buffer, size_t size, void *p)
{
struct event_data *data = p;
g_assert_nonnull (data);
g_assert_nonnull (buffer);
g_mutex_lock (&data->mutex);
g_assert_cmpmem (buffer, size, data->expected_data, data->expected_size);
data->n_events++;
g_mutex_unlock (&data->mutex);
}
static void
test_wpipc_receiver_basic ()
{
struct wpipc_receiver *r = wpipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0);
g_assert_nonnull (r);
/* start and stop */
g_assert_false (wpipc_receiver_is_running (r));
g_assert_true (wpipc_receiver_start (r));
g_assert_true (wpipc_receiver_is_running (r));
wpipc_receiver_stop (r);
g_assert_false (wpipc_receiver_is_running (r));
/* clean up */
wpipc_receiver_free (r);
}
static void
test_wpipc_sender_basic ()
{
struct wpipc_sender *s = wpipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0);
g_assert_nonnull (s);
/* clean up */
wpipc_sender_free (s);
}
static void
test_wpipc_sender_connect ()
{
static struct wpipc_receiver_events events = {
.sender_state = sender_state_callback,
.handle_message = NULL,
};
struct event_data data;
g_mutex_init (&data.mutex);
data.n_events = 0;
data.connections = 0;
struct wpipc_receiver *r = wpipc_receiver_new (TEST_ADDRESS, 16, &events, &data, 0);
g_assert_nonnull (r);
struct wpipc_sender *s = wpipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0);
g_assert_nonnull (s);
/* start receiver */
g_assert_true (wpipc_receiver_start (r));
/* connect sender */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
wait_for_event (&data, 1);
g_assert_cmpint (data.connections, ==, 1);
/* disconnect sender */
wpipc_sender_disconnect (s);
g_assert_false (wpipc_sender_is_connected (s));
wait_for_event (&data, 2);
g_assert_cmpint (data.connections, ==, 0);
/* stop receiver */
wpipc_receiver_stop (r);
/* clean up */
g_mutex_clear (&data.mutex);
wpipc_sender_free (s);
wpipc_receiver_free (r);
}
static void
lost_connection_handler (struct wpipc_sender *self, int receiver_fd, void *p)
{
struct event_data *data = p;
g_assert_nonnull (data);
g_mutex_lock (&data->mutex);
data->n_events++;
g_mutex_unlock (&data->mutex);
}
static void
test_wpipc_sender_lost_connection ()
{
struct event_data data;
g_mutex_init (&data.mutex);
struct wpipc_receiver *r = wpipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0);
g_assert_nonnull (r);
struct wpipc_sender *s = wpipc_sender_new (TEST_ADDRESS, 16, lost_connection_handler, &data, 0);
g_assert_nonnull (s);
/* connect sender */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
/* destroy receiver and make sure the lost connection handler is triggered */
data.n_events = 0;
wpipc_receiver_free (r);
wait_for_event (&data, 1);
/* clean up */
g_mutex_clear (&data.mutex);
wpipc_sender_free (s);
}
static void
test_wpipc_sender_send ()
{
struct wpipc_receiver *r = wpipc_receiver_new (TEST_ADDRESS, 2, NULL, NULL, 0);
g_assert_nonnull (r);
struct wpipc_sender *s = wpipc_sender_new (TEST_ADDRESS, 2, NULL, NULL, 0);
g_assert_nonnull (s);
struct event_data data;
g_mutex_init (&data.mutex);
data.n_events = 0;
/* start receiver */
g_assert_true (wpipc_receiver_start (r));
/* connect */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
/* send 1 byte message (should not realloc) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"h";
data.expected_size = 1;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"h1", 1, reply_callback, &data));
wait_for_event (&data, 1);
/* send 2 bytes message (should realloc once to 4) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"hi";
data.expected_size = 2;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"hi", 2, reply_callback, &data));
wait_for_event (&data, 1);
/* send 3 bytes message (should not realloc) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"hii";
data.expected_size = 3;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"hii", 3, reply_callback, &data));
wait_for_event (&data, 1);
/* send 28 bytes message (should realloc 3 times: first to 8, then to 16 and finally to 32) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"bigger than 16 bytes message";
data.expected_size = 28;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"bigger than 16 bytes message", 28, reply_callback, &data));
wait_for_event (&data, 1);
/* don't allow empty messages */
data.n_events = 0;
g_assert_false (wpipc_sender_send (s, (const uint8_t *)"", 0, NULL, NULL));
/* stop receiver */
wpipc_receiver_stop (r);
/* clean up */
g_mutex_clear (&data.mutex);
wpipc_sender_free (s);
wpipc_receiver_free (r);
}
static void
test_wpipc_multiple_senders_send ()
{
struct wpipc_receiver *r = wpipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0);
g_assert_nonnull (r);
struct wpipc_sender *senders[50];
struct event_data data;
g_mutex_init (&data.mutex);
data.n_events = 0;
/* start receiver */
g_assert_true (wpipc_receiver_start (r));
/* create and connect 50 senders */
for (int i = 0; i < 50; i++) {
senders[i] = wpipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0);
g_assert_nonnull (senders[i]);
g_assert_true (wpipc_sender_connect (senders[i]));
g_assert_true (wpipc_sender_is_connected (senders[i]));
}
/* send 50 messages (1 per sender) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"hello";
data.expected_size = 5;
for (int i = 0; i < 50; i++)
g_assert_true (wpipc_sender_send (senders[i], (const uint8_t *)"hello", 5, reply_callback, &data));
wait_for_event (&data, 50);
/* stop receiver */
wpipc_receiver_stop (r);
/* clean up */
g_mutex_clear (&data.mutex);
for (int i = 0; i < 50; i++)
wpipc_sender_free (senders[i]);
wpipc_receiver_free (r);
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/wpipc/receiver-basic", test_wpipc_receiver_basic);
g_test_add_func ("/wpipc/sender-basic", test_wpipc_sender_basic);
g_test_add_func ("/wpipc/sender-connect", test_wpipc_sender_connect);
g_test_add_func ("/wpipc/sender-lost-connection",
test_wpipc_sender_lost_connection);
g_test_add_func ("/wpipc/sender-send", test_wpipc_sender_send);
g_test_add_func ("/wpipc/multiple-senders-send",
test_wpipc_multiple_senders_send);
return g_test_run ();
}