remove wpipc

This component has been split out to form a separate project,
moved to https://git.automotivelinux.org/src/pipewire-ic-ipc/
This commit is contained in:
George Kiagiadakis 2021-08-19 16:53:28 +03:00
parent e6f2dc17ff
commit 8949e98d1f
29 changed files with 1 additions and 2916 deletions

View file

@ -176,16 +176,6 @@ build_on_fedora_no_docs:
variables:
BUILD_OPTIONS: -Dintrospection=enabled -Ddoc=disabled -Dsystem-lua=false
build_on_fedora_wpipc:
extends:
- .fedora
- .not_coverity
- .fdo.distribution-image@fedora
- .build
stage: build
variables:
BUILD_OPTIONS: -Dintrospection=disabled -Ddoc=disabled -Dsystem-lua=false -Dwpipc=enabled
build_on_ubuntu_with_gir:
extends:
- .ubuntu
@ -216,7 +206,7 @@ build_with_coverity:
script:
- export PATH=/opt/coverity/bin:$PATH
- meson "$WP_BUILD_DIR" . --prefix="$PREFIX"
-Dintrospection=disabled -Ddoc=disabled -Dwpipc=enabled
-Dintrospection=disabled -Ddoc=disabled
- cov-configure --config coverity_conf.xml
--comptype gcc --compiler cc --template
--xml-option=append_arg@C:--ppp_translator

View file

@ -95,16 +95,6 @@ Additional options
Directory for user systemd units.
.. option:: -Dwpipc=[enabled|disabled|auto]
Build the wpipc library and module-ipc. The default is **disabled**
**enabled** and **auto** currently mean the same thing.
wpipc is small library to send commands directly to WirePlumber; it is
only useful in specific embedded systems and not recommended for generic use
(use the PipeWire protocol instead)
Installation
------------

View file

@ -1,3 +1,2 @@
subdir('wp')
subdir('wplua')
subdir('wpipc')

View file

@ -1,84 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include "private.h"
#include "protocol.h"
#include "sender.h"
#include "client.h"
#define BUFFER_SIZE 1024
static void
on_lost_connection (struct wpipc_sender *self,
int receiver_fd,
void *data)
{
wpipc_log_warn ("client: lost connection with server %d", receiver_fd);
}
/* API */
struct wpipc_client *
wpipc_client_new (const char *path, bool connect)
{
struct wpipc_sender *base;
base = wpipc_sender_new (path, BUFFER_SIZE, on_lost_connection, NULL, 0);
if (connect)
wpipc_sender_connect (base);
return (struct wpipc_client *)base;
}
void
wpipc_client_free (struct wpipc_client *self)
{
struct wpipc_sender *base = wpipc_client_to_sender (self);
wpipc_sender_free (base);
}
bool
wpipc_client_send_request (struct wpipc_client *self,
const char *name,
const struct spa_pod *args,
wpipc_sender_reply_func_t reply,
void *data)
{
struct wpipc_sender *base = wpipc_client_to_sender (self);
/* check params */
if (name == NULL)
return false;
const size_t size = wpipc_protocol_calculate_request_size (name, args);
uint8_t buffer[size];
wpipc_protocol_build_request (buffer, size, name, args);
return wpipc_sender_send (base, buffer, size, reply, data);
}
const struct spa_pod *
wpipc_client_send_request_finish (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
const char **error)
{
/* error */
if (wpipc_protocol_is_reply_error (buffer, size)) {
wpipc_protocol_parse_reply_error (buffer, size, error);
return NULL;
}
/* ok */
if (wpipc_protocol_is_reply_ok (buffer, size)) {
const struct spa_pod *value = NULL;
if (wpipc_protocol_parse_reply_ok (buffer, size, &value))
return value;
}
return NULL;
}

View file

@ -1,56 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_CLIENT_H__
#define __WPIPC_CLIENT_H__
#include <spa/pod/pod.h>
#include <stddef.h>
#include "sender.h"
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
#define wpipc_client_to_sender(self) ((struct wpipc_sender *)(self))
struct wpipc_client;
WPIPC_API
struct wpipc_client *
wpipc_client_new (const char *path, bool connect);
WPIPC_API
void
wpipc_client_free (struct wpipc_client *self);
WPIPC_API
bool
wpipc_client_send_request (struct wpipc_client *self,
const char *name,
const struct spa_pod *args,
wpipc_sender_reply_func_t reply,
void *data);
/* for reply handlers only */
WPIPC_API
const struct spa_pod *
wpipc_client_send_request_finish (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
const char **error);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -1,26 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_DEFS_H__
#define __WPIPC_DEFS_H__
#if defined(__GNUC__)
# define WPIPC_API_EXPORT extern __attribute__ ((visibility ("default")))
#else
# define WPIPC_API_EXPORT extern
#endif
#ifndef WPIPC_API
# define WPIPC_API WPIPC_API_EXPORT
#endif
#ifndef WPIPC_PRIVATE_API
# define WPIPC_PRIVATE_API __attribute__ ((deprecated ("Private API")))
#endif
#endif

View file

@ -1,54 +0,0 @@
if get_option('wpipc').disabled()
wpipc_dep = disabler()
subdir_done()
endif
wpipc_lib_sources = files(
'utils.c',
'protocol.c',
'receiver.c',
'sender.c',
'client.c',
'server.c',
)
wpipc_lib_headers = files(
'protocol.h',
'receiver.h',
'sender.h',
'client.h',
'server.h',
'wpipc.h',
)
wpipc_api_version = '0.1'
wpipc_so_version = '0'
wpipc_headers_dir = get_option('includedir') / 'wpipc-' + wpipc_api_version / 'wpipc'
install_headers(wpipc_lib_headers,
install_dir : wpipc_headers_dir
)
wpipc_lib = library('wpipc-' + wpipc_api_version,
wpipc_lib_sources,
c_args : [
'-D_GNU_SOURCE',
'-DG_LOG_USE_STRUCTURED',
'-DG_LOG_DOMAIN="wpipc"',
],
install: true,
dependencies : [threads_dep, spa_dep],
soversion: wpipc_so_version,
version: meson.project_version(),
)
wpipc_dep = declare_dependency(
link_with: wpipc_lib,
include_directories: wp_lib_include_dir,
dependencies: [spa_dep],
)
pkgconfig.generate(wpipc_lib,
subdirs: 'wpipc-' + wpipc_api_version
)

View file

@ -1,102 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_PRIVATE_H__
#define __WPIPC_PRIVATE_H__
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/types.h>
#include <stdarg.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
/* log */
#define wpipc_log_info(F, ...) \
wpipc_log(WPIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__)
#define wpipc_log_warn(F, ...) \
wpipc_log(WPIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__)
#define wpipc_log_error(F, ...) \
wpipc_log(WPIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__)
enum wpipc_log_level {
WPIPC_LOG_LEVEL_NONE = 0,
WPIPC_LOG_LEVEL_ERROR,
WPIPC_LOG_LEVEL_WARN,
WPIPC_LOG_LEVEL_INFO,
};
void
wpipc_logv (enum wpipc_log_level level,
const char *fmt,
va_list args) __attribute__ ((format (printf, 2, 0)));
void
wpipc_log (enum wpipc_log_level level,
const char *fmt,
...) __attribute__ ((format (printf, 2, 3)));
/* socket path */
int
wpipc_construct_socket_path (const char *name, char *buf, size_t buf_size);
/* socket */
ssize_t
wpipc_socket_write (int fd, const uint8_t *buffer, size_t size);
ssize_t
wpipc_socket_read (int fd, uint8_t **buffer, size_t *max_size);
/* epoll thread */
struct epoll_thread;
typedef void (*wpipc_epoll_thread_event_func_t) (struct epoll_thread *self,
int fd,
void *data);
struct epoll_thread {
int socket_fd;
int epoll_fd;
int event_fd;
pthread_t thread;
wpipc_epoll_thread_event_func_t socket_event_func;
wpipc_epoll_thread_event_func_t other_event_func;
void *event_data;
};
bool
wpipc_epoll_thread_init (struct epoll_thread *self,
int socket_fd,
wpipc_epoll_thread_event_func_t sock_func,
wpipc_epoll_thread_event_func_t other_func,
void *data);
bool
wpipc_epoll_thread_start (struct epoll_thread *self);
void
wpipc_epoll_thread_stop (struct epoll_thread *self);
void
wpipc_epoll_thread_destroy (struct epoll_thread *self);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -1,224 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <assert.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include "protocol.h"
#define SIZE_PADDING 128
enum wpipc_protocol_reply_code {
REPLY_CODE_ERROR = 0,
REPLY_CODE_OK,
};
static bool
is_reply (const uint8_t *buffer, size_t size, int code)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
int parsed_code = 0;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_int (&p, &parsed_code);
return parsed_code == code;
}
/* API */
size_t
wpipc_protocol_calculate_request_size (const char *name,
const struct spa_pod *args)
{
assert (name);
return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING;
}
void
wpipc_protocol_build_request (uint8_t *buffer,
size_t size,
const char *name,
const struct spa_pod *args)
{
const struct spa_pod none = SPA_POD_INIT_None();
struct spa_pod_builder b;
struct spa_pod_frame f;
memset (buffer, 0, size);
if (args == NULL)
args = &none;
spa_pod_builder_init (&b, buffer, size);
spa_pod_builder_push_struct (&b, &f);
spa_pod_builder_string (&b, name);
spa_pod_builder_primitive (&b, args);
spa_pod_builder_pop(&b, &f);
}
bool
wpipc_protocol_parse_request (const uint8_t *buffer,
size_t size,
const char **name,
const struct spa_pod **args)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
const char *parsed_name = NULL;
struct spa_pod *parsed_args = NULL;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_string (&p, &parsed_name);
spa_pod_parser_get_pod (&p, &parsed_args);
spa_pod_parser_pop(&p, &f);
/* check name and args */
if (name == NULL || args == NULL)
return false;
if (name != NULL)
*name = parsed_name;
if (args != NULL)
*args = parsed_args;
return true;
}
size_t
wpipc_protocol_calculate_reply_ok_size (const struct spa_pod *value)
{
return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING;
}
size_t
wpipc_protocol_calculate_reply_error_size (const char *msg)
{
assert (msg);
return strlen(msg) + SIZE_PADDING;
}
void
wpipc_protocol_build_reply_ok (uint8_t *buffer,
size_t size,
const struct spa_pod *value)
{
const struct spa_pod none = SPA_POD_INIT_None();
struct spa_pod_builder b;
struct spa_pod_frame f;
memset (buffer, 0, size);
if (value == NULL)
value = &none;
spa_pod_builder_init (&b, buffer, size);
spa_pod_builder_push_struct (&b, &f);
spa_pod_builder_int (&b, REPLY_CODE_OK);
spa_pod_builder_primitive (&b, value);
spa_pod_builder_pop(&b, &f);
}
void
wpipc_protocol_build_reply_error (uint8_t *buffer,
size_t size,
const char *msg)
{
struct spa_pod_builder b;
struct spa_pod_frame f;
memset (buffer, 0, size);
spa_pod_builder_init (&b, buffer, size);
spa_pod_builder_push_struct (&b, &f);
spa_pod_builder_int (&b, REPLY_CODE_ERROR);
spa_pod_builder_string (&b, msg);
spa_pod_builder_pop(&b, &f);
}
bool
wpipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size)
{
return is_reply (buffer, size, REPLY_CODE_OK);
}
bool
wpipc_protocol_is_reply_error (const uint8_t *buffer, size_t size)
{
return is_reply (buffer, size, REPLY_CODE_ERROR);
}
bool
wpipc_protocol_parse_reply_ok (const uint8_t *buffer,
size_t size,
const struct spa_pod **value)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
int parsed_code = 0;
struct spa_pod *parsed_value = NULL;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_int (&p, &parsed_code);
spa_pod_parser_get_pod (&p, &parsed_value);
spa_pod_parser_pop (&p, &f);
if (value != NULL)
*value = parsed_value;
return true;
}
bool
wpipc_protocol_parse_reply_error (const uint8_t *buffer,
size_t size,
const char **msg)
{
const struct spa_pod *pod = (const struct spa_pod *)buffer;
struct spa_pod_parser p;
struct spa_pod_frame f;
int parsed_code = 0;
const char *parsed_msg = NULL;
/* check if struct */
if (!spa_pod_is_struct (pod))
return false;
/* parse */
spa_pod_parser_pod (&p, pod);
spa_pod_parser_push_struct(&p, &f);
spa_pod_parser_get_int (&p, &parsed_code);
spa_pod_parser_get_string (&p, &parsed_msg);
spa_pod_parser_pop (&p, &f);
if (msg != NULL)
*msg = parsed_msg;
return true;
}

View file

@ -1,87 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_PROTOCOL_H__
#define __WPIPC_PROTOCOL_H__
#include <spa/pod/pod.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
/* request */
WPIPC_API
size_t
wpipc_protocol_calculate_request_size (const char *name,
const struct spa_pod *args);
WPIPC_API
void
wpipc_protocol_build_request (uint8_t *buffer,
size_t size,
const char *name,
const struct spa_pod *args);
WPIPC_API
bool
wpipc_protocol_parse_request (const uint8_t *buffer,
size_t size,
const char **name,
const struct spa_pod **args);
/* reply */
WPIPC_API
size_t
wpipc_protocol_calculate_reply_ok_size (const struct spa_pod *value);
WPIPC_API
size_t
wpipc_protocol_calculate_reply_error_size (const char *msg);
WPIPC_API
void
wpipc_protocol_build_reply_ok (uint8_t *buffer,
size_t size,
const struct spa_pod *value);
WPIPC_API
void
wpipc_protocol_build_reply_error (uint8_t *buffer,
size_t size,
const char *msg);
WPIPC_API
bool
wpipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size);
WPIPC_API
bool
wpipc_protocol_is_reply_error (const uint8_t *buffer, size_t size);
WPIPC_API
bool
wpipc_protocol_parse_reply_ok (const uint8_t *buffer,
size_t size,
const struct spa_pod **value);
WPIPC_API
bool
wpipc_protocol_parse_reply_error (const uint8_t *buffer,
size_t size,
const char **msg);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -1,210 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "private.h"
#include "receiver.h"
#include "wpipc.h"
#define MAX_SENDERS 128
struct wpipc_receiver {
struct sockaddr_un addr;
int socket_fd;
uint8_t *buffer_read;
size_t buffer_size;
struct epoll_thread epoll_thread;
bool thread_running;
const struct wpipc_receiver_events *events;
void *events_data;
/* for subclasses */
void *user_data;
};
static bool
reply_message (struct wpipc_receiver *self,
int sender_fd,
uint8_t *buffer,
size_t size)
{
return self->events && self->events->handle_message ?
self->events->handle_message (self, sender_fd, buffer, size, self->events_data) :
wpipc_socket_write (sender_fd, buffer, size) == (ssize_t)size;
}
static void
socket_event_received (struct epoll_thread *t, int fd, void *data)
{
/* sender wants to connect, accept connection */
struct wpipc_receiver *self = data;
socklen_t addr_size = sizeof(self->addr);
int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size,
SOCK_CLOEXEC | SOCK_NONBLOCK);
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sender_fd;
epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
if (self->events && self->events->sender_state)
self->events->sender_state (self, sender_fd,
WPIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data);
}
static void
other_event_received (struct epoll_thread *t, int fd, void *data)
{
struct wpipc_receiver *self = data;
/* sender sends a message, read it and reply */
ssize_t size = wpipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
if (size <= 0) {
if (size < 0)
wpipc_log_error ("receiver: could not read message: %s", strerror(errno));
/* client disconnected */
epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close (fd);
if (self->events && self->events->sender_state)
self->events->sender_state (self, fd,
WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data);
return;
}
/* reply */
if (!reply_message (self, fd, self->buffer_read, size))
wpipc_log_error ("receiver: could not reply message: %s", strerror(errno));
return;
}
/* API */
struct wpipc_receiver *
wpipc_receiver_new (const char *path,
size_t buffer_size,
const struct wpipc_receiver_events *events,
void *events_data,
size_t user_size)
{
struct wpipc_receiver *self;
int res;
/* check params */
if (path == NULL || buffer_size == 0)
return NULL;
self = calloc (1, sizeof (struct wpipc_receiver) + user_size);
if (self == NULL)
return NULL;
self->socket_fd = -1;
/* set address */
self->addr.sun_family = AF_LOCAL;
res = wpipc_construct_socket_path (path, self->addr.sun_path, sizeof(self->addr.sun_path));
if (res < 0)
goto error;
unlink (self->addr.sun_path);
/* create socket */
self->socket_fd =
socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
if (self->socket_fd < 0)
goto error;
/* bind socket */
if (bind (self->socket_fd, (struct sockaddr *)&self->addr,
sizeof(self->addr)) != 0)
goto error;
/* listen socket */
if (listen (self->socket_fd, MAX_SENDERS) != 0)
goto error;
/* alloc buffer read */
self->buffer_size = buffer_size;
self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
if (self->buffer_read == NULL)
goto error;
/* init epoll thread */
if (!wpipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
socket_event_received, other_event_received, self))
goto error;
self->events = events;
self->events_data = events_data;
if (user_size > 0)
self->user_data = (void *)((uint8_t *)self + sizeof (struct wpipc_receiver));
return self;
error:
if (self->buffer_read)
free (self->buffer_read);
if (self->socket_fd != -1)
close (self->socket_fd);
free (self);
return NULL;
}
void
wpipc_receiver_free (struct wpipc_receiver *self)
{
wpipc_receiver_stop (self);
wpipc_epoll_thread_destroy (&self->epoll_thread);
free (self->buffer_read);
close (self->socket_fd);
unlink (self->addr.sun_path);
free (self);
}
bool
wpipc_receiver_start (struct wpipc_receiver *self)
{
if (wpipc_receiver_is_running (self))
return true;
self->thread_running = wpipc_epoll_thread_start (&self->epoll_thread);
return self->thread_running;
}
void
wpipc_receiver_stop (struct wpipc_receiver *self)
{
if (wpipc_receiver_is_running (self)) {
wpipc_epoll_thread_stop (&self->epoll_thread);
self->thread_running = false;
}
}
bool
wpipc_receiver_is_running (struct wpipc_receiver *self)
{
return self->thread_running;
}
void *
wpipc_receiver_get_user_data (struct wpipc_receiver *self)
{
return self->user_data;
}

View file

@ -1,78 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_RECEIVER_H__
#define __WPIPC_RECEIVER_H__
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
struct wpipc_receiver;
enum wpipc_receiver_sender_state {
WPIPC_RECEIVER_SENDER_STATE_CONNECTED = 0,
WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED
};
struct wpipc_receiver_events {
/* emitted when a sender state changes */
void (*sender_state) (struct wpipc_receiver *self,
int sender_fd,
enum wpipc_receiver_sender_state state,
void *data);
/* emitted when message is received and needs to be handled */
bool (*handle_message) (struct wpipc_receiver *self,
int sender_fd,
const uint8_t *buffer,
size_t size,
void *data);
};
WPIPC_API
struct wpipc_receiver *
wpipc_receiver_new (const char *path,
size_t buffer_size,
const struct wpipc_receiver_events *events,
void *events_data,
size_t user_size);
WPIPC_API
void
wpipc_receiver_free (struct wpipc_receiver *self);
WPIPC_API
bool
wpipc_receiver_start (struct wpipc_receiver *self);
WPIPC_API
void
wpipc_receiver_stop (struct wpipc_receiver *self);
WPIPC_API
bool
wpipc_receiver_is_running (struct wpipc_receiver *self);
/* for subclasses only */
WPIPC_API
void *
wpipc_receiver_get_user_data (struct wpipc_receiver *self);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -1,276 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "private.h"
#include "sender.h"
#define MAX_ASYNC_TASKS 128
struct wpipc_sender_task {
wpipc_sender_reply_func_t func;
void *data;
};
struct wpipc_sender {
struct sockaddr_un addr;
int socket_fd;
uint8_t *buffer_read;
size_t buffer_size;
struct epoll_thread epoll_thread;
bool is_connected;
wpipc_sender_lost_conn_func_t lost_func;
void *lost_data;
bool lost_connection;
struct wpipc_sender_task async_tasks[MAX_ASYNC_TASKS];
/* for subclasses */
void *user_data;
};
static int
push_sync_task (struct wpipc_sender *self,
wpipc_sender_reply_func_t func,
void *data)
{
size_t i;
for (i = MAX_ASYNC_TASKS; i > 1; i--) {
struct wpipc_sender_task *curr = self->async_tasks + i - 1;
struct wpipc_sender_task *next = self->async_tasks + i - 2;
if (next->func != NULL && curr->func == NULL) {
curr->func = func;
curr->data = data;
return i - 1;
} else if (i - 2 == 0 && next->func == NULL) {
/* empty queue */
next->func = func;
next->data = data;
return 0;
}
}
return -1;
}
static void
pop_sync_task (struct wpipc_sender *self,
bool trigger,
bool all,
const uint8_t *buffer,
size_t size)
{
size_t i;
for (i = 0; i < MAX_ASYNC_TASKS; i++) {
struct wpipc_sender_task *task = self->async_tasks + i;
if (task->func != NULL) {
if (trigger)
task->func (self, buffer, size, task->data);
task->func = NULL;
if (!all)
return;
}
}
}
static void
socket_event_received (struct epoll_thread *t, int fd, void *data)
{
struct wpipc_sender *self = data;
/* receiver sends a reply, read it trigger corresponding task */
ssize_t size = wpipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
if (size <= 0) {
if (size < 0)
wpipc_log_error ("sender: could not read reply: %s", strerror(errno));
/* receiver disconnected */
epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
shutdown(self->socket_fd, SHUT_RDWR);
self->is_connected = false;
self->lost_connection = true;
if (self->lost_func)
self->lost_func (self, fd, self->lost_data);
/* clear queue */
pop_sync_task (self, true, true, NULL, 0);
return;
}
/* trigger async task */
pop_sync_task (self, true, false, self->buffer_read, size);
return;
}
/* API */
struct wpipc_sender *
wpipc_sender_new (const char *path,
size_t buffer_size,
wpipc_sender_lost_conn_func_t lost_func,
void *lost_data,
size_t user_size)
{
struct wpipc_sender *self;
int res;
if (path == NULL)
return NULL;
self = calloc (1, sizeof (struct wpipc_sender) + user_size);
if (self == NULL)
return NULL;
self->socket_fd = -1;
/* set address */
self->addr.sun_family = AF_LOCAL;
res = wpipc_construct_socket_path (path, self->addr.sun_path, sizeof(self->addr.sun_path));
if (res < 0)
goto error;
/* create socket */
self->socket_fd =
socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
if (self->socket_fd < 0)
goto error;
/* alloc buffer read */
self->buffer_size = buffer_size;
self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
if (self->buffer_read == NULL)
goto error;
/* init epoll thread */
if (!wpipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
socket_event_received, NULL, self))
goto error;
self->lost_func = lost_func;
self->lost_data = lost_data;
self->lost_connection = false;
if (user_size > 0)
self->user_data = (void *)((uint8_t *)self + sizeof (struct wpipc_sender));
return self;
error:
if (self->buffer_read)
free (self->buffer_read);
if (self->socket_fd != -1)
close (self->socket_fd);
free (self);
return NULL;
}
void
wpipc_sender_free (struct wpipc_sender *self)
{
wpipc_sender_disconnect (self);
wpipc_epoll_thread_destroy (&self->epoll_thread);
free (self->buffer_read);
close (self->socket_fd);
free (self);
}
bool
wpipc_sender_connect (struct wpipc_sender *self)
{
if (wpipc_sender_is_connected (self))
return true;
/* if connection was lost, re-init epoll thread with new socket */
if (self->lost_connection) {
wpipc_epoll_thread_stop (&self->epoll_thread);
wpipc_epoll_thread_destroy (&self->epoll_thread);
self->socket_fd =
socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
if (self->socket_fd < 0)
return false;
if (!wpipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
socket_event_received, NULL, self)) {
close (self->socket_fd);
return false;
}
self->lost_connection = false;
}
/* connect */
if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
sizeof(self->addr)) == 0 &&
wpipc_epoll_thread_start (&self->epoll_thread)) {
self->is_connected = true;
return true;
}
return false;
}
void
wpipc_sender_disconnect (struct wpipc_sender *self)
{
if (wpipc_sender_is_connected (self)) {
wpipc_epoll_thread_stop (&self->epoll_thread);
shutdown(self->socket_fd, SHUT_RDWR);
self->is_connected = false;
}
}
bool
wpipc_sender_is_connected (struct wpipc_sender *self)
{
return self->is_connected;
}
bool
wpipc_sender_send (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
wpipc_sender_reply_func_t func,
void *data)
{
int id = -1;
if (buffer == NULL || size == 0)
return false;
if (!wpipc_sender_is_connected (self))
return false;
/* add the task in the queue */
if (func) {
id = push_sync_task (self, func, data);
if (id == -1)
return false;
}
/* write buffer and remove task if it fails */
if (wpipc_socket_write (self->socket_fd, buffer, size) <= 0) {
if (id != -1)
self->async_tasks[id].func = NULL;
return false;
}
return true;
}
void *
wpipc_sender_get_user_data (struct wpipc_sender *self)
{
return self->user_data;
}

View file

@ -1,75 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_SENDER_H__
#define __WPIPC_SENDER_H__
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include "defs.h"
#ifdef __cplusplus
extern "C" {
#endif
struct wpipc_sender;
typedef void (*wpipc_sender_lost_conn_func_t) (struct wpipc_sender *self,
int receiver_fd,
void *data);
typedef void (*wpipc_sender_reply_func_t) (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
void *data);
WPIPC_API
struct wpipc_sender *
wpipc_sender_new (const char *path,
size_t buffer_size,
wpipc_sender_lost_conn_func_t lost_func,
void *lost_data,
size_t user_size);
WPIPC_API
void
wpipc_sender_free (struct wpipc_sender *self);
WPIPC_API
bool
wpipc_sender_connect (struct wpipc_sender *self);
WPIPC_API
void
wpipc_sender_disconnect (struct wpipc_sender *self);
WPIPC_API
bool
wpipc_sender_is_connected (struct wpipc_sender *self);
WPIPC_API
bool
wpipc_sender_send (struct wpipc_sender *self,
const uint8_t *buffer,
size_t size,
wpipc_sender_reply_func_t reply,
void *data);
/* for subclasses only */
WPIPC_API
void *
wpipc_sender_get_user_data (struct wpipc_sender *self);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -1,261 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <pthread.h>
#include "private.h"
#include "protocol.h"
#include "receiver.h"
#include "server.h"
#define BUFFER_SIZE 1024
#define MAX_REQUEST_HANDLERS 128
struct wpipc_server_client_handler
{
wpipc_server_client_handler_func_t handler;
void *data;
};
struct wpipc_server_request_handler
{
const char *name;
wpipc_server_request_handler_func_t handler;
void *data;
};
struct wpipc_server_priv {
pthread_mutex_t mutex;
struct wpipc_server_client_handler client_handler;
size_t n_request_handlers;
struct wpipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS];
};
static void
sender_state (struct wpipc_receiver *base,
int sender_fd,
enum wpipc_receiver_sender_state sender_state,
void *data)
{
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
wpipc_log_info ("server: new state %d on client %d", sender_state, sender_fd);
pthread_mutex_lock (&priv->mutex);
if (priv->client_handler.handler)
priv->client_handler.handler ((struct wpipc_server *)base, sender_fd,
sender_state, priv->client_handler.data);
pthread_mutex_unlock (&priv->mutex);
}
static bool
handle_message (struct wpipc_receiver *base,
int sender_fd,
const uint8_t *buffer,
size_t size,
void *data)
{
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
const char *name = NULL;
const struct spa_pod *args = NULL;
wpipc_log_info ("server: message from client %d received", sender_fd);
/* parse */
if (!wpipc_protocol_parse_request (buffer, size, &name, &args)) {
const char *msg = "could not parse request";
const size_t s = wpipc_protocol_calculate_reply_error_size (msg);
uint8_t b[s];
wpipc_protocol_build_reply_error (b, s, msg);
return wpipc_socket_write (sender_fd, b, s) == (ssize_t)s;
}
/* handle */
size_t i;
bool res = false;
pthread_mutex_lock (&priv->mutex);
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name != NULL && strcmp (rh->name, name) == 0 &&
rh->handler != NULL) {
res = rh->handler ((struct wpipc_server *)base, sender_fd, name, args,
rh->data);
pthread_mutex_unlock (&priv->mutex);
return res;
}
}
/* handler was not found, reply with error */
res = wpipc_server_reply_error ((struct wpipc_server *)base, sender_fd,
"request handler not found");
pthread_mutex_unlock (&priv->mutex);
return res;
}
static struct wpipc_receiver_events events = {
.sender_state = sender_state,
.handle_message = handle_message,
};
/* API */
struct wpipc_server *
wpipc_server_new (const char *path, bool start)
{
struct wpipc_server_priv * priv = NULL;
struct wpipc_receiver *base = NULL;
base = wpipc_receiver_new (path, BUFFER_SIZE, &events, NULL,
sizeof (struct wpipc_server_priv));
if (base == NULL)
return NULL;
priv = wpipc_receiver_get_user_data (base);
pthread_mutex_init (&priv->mutex, NULL);
priv->n_request_handlers = 0;
if (start && !wpipc_receiver_start (base)) {
wpipc_log_error ("failed to start receiver");
wpipc_server_free ((struct wpipc_server *)base);
return NULL;
}
return (struct wpipc_server *)base;
}
void
wpipc_server_free (struct wpipc_server *self)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
pthread_mutex_destroy (&priv->mutex);
wpipc_receiver_free (base);
}
void
wpipc_server_set_client_handler (struct wpipc_server *self,
wpipc_server_client_handler_func_t handler,
void *data)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
pthread_mutex_lock (&priv->mutex);
priv->client_handler.handler = handler;
priv->client_handler.data = data;
pthread_mutex_unlock (&priv->mutex);
}
void
wpipc_server_clear_client_handler (struct wpipc_server *self)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
pthread_mutex_lock (&priv->mutex);
priv->client_handler.handler = NULL;
priv->client_handler.data = NULL;
pthread_mutex_unlock (&priv->mutex);
}
bool
wpipc_server_set_request_handler (struct wpipc_server *self,
const char *name,
wpipc_server_request_handler_func_t handler,
void *data)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
size_t i;
/* check params */
if (name == NULL)
return false;
pthread_mutex_lock (&priv->mutex);
/* make sure handler does not exist */
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name != NULL && strcmp (rh->name, name) == 0) {
pthread_mutex_unlock (&priv->mutex);
return false;
}
}
/* set handler */
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name == NULL) {
rh->name = name;
rh->handler = handler;
rh->data = data;
pthread_mutex_unlock (&priv->mutex);
return true;
}
}
pthread_mutex_unlock (&priv->mutex);
return false;
}
void
wpipc_server_clear_request_handler (struct wpipc_server *self,
const char *name)
{
struct wpipc_receiver *base = wpipc_server_to_receiver (self);
struct wpipc_server_priv *priv = wpipc_receiver_get_user_data (base);
size_t i;
/* check params */
if (name == NULL)
return;
pthread_mutex_lock (&priv->mutex);
/* clear handler */
for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
struct wpipc_server_request_handler *rh = priv->request_handlers + i;
if (rh->name != NULL && strcmp (rh->name, name) == 0) {
rh->name = NULL;
break;
}
}
pthread_mutex_unlock (&priv->mutex);
}
bool
wpipc_server_reply_ok (struct wpipc_server *self,
int client_fd,
const struct spa_pod *value)
{
const size_t s = wpipc_protocol_calculate_reply_ok_size (value);
uint8_t b[s];
wpipc_protocol_build_reply_ok (b, s, value);
return wpipc_socket_write (client_fd, b, s) == (ssize_t)s;
}
bool
wpipc_server_reply_error (struct wpipc_server *self,
int client_fd,
const char *msg)
{
if (msg == NULL)
return false;
const size_t s = wpipc_protocol_calculate_reply_error_size (msg);
uint8_t b[s];
wpipc_protocol_build_reply_error (b, s, msg);
return wpipc_socket_write (client_fd, b, s) == (ssize_t)s;
}

View file

@ -1,85 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_SERVER_H__
#define __WPIPC_SERVER_H__
#include <spa/pod/pod.h>
#include "defs.h"
#include "receiver.h"
#ifdef __cplusplus
extern "C" {
#endif
#define wpipc_server_to_receiver(self) ((struct wpipc_receiver *)(self))
struct wpipc_server;
typedef void (*wpipc_server_client_handler_func_t) (struct wpipc_server *self,
int client_fd,
enum wpipc_receiver_sender_state client_state,
void *data);
typedef bool (*wpipc_server_request_handler_func_t) (struct wpipc_server *self,
int client_fd,
const char *name,
const struct spa_pod *args,
void *data);
WPIPC_API
struct wpipc_server *
wpipc_server_new (const char *path, bool start);
WPIPC_API
void
wpipc_server_free (struct wpipc_server *self);
WPIPC_API
void
wpipc_server_set_client_handler (struct wpipc_server *self,
wpipc_server_client_handler_func_t handler,
void *data);
WPIPC_API
void
wpipc_server_clear_client_handler (struct wpipc_server *self);
WPIPC_API
bool
wpipc_server_set_request_handler (struct wpipc_server *self,
const char *name,
wpipc_server_request_handler_func_t handler,
void *data);
WPIPC_API
void
wpipc_server_clear_request_handler (struct wpipc_server *self,
const char *name);
/* for request handlers only */
WPIPC_API
bool
wpipc_server_reply_ok (struct wpipc_server *self,
int client_fd,
const struct spa_pod *value);
WPIPC_API
bool
wpipc_server_reply_error (struct wpipc_server *self,
int client_fd,
const char *msg);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -1,311 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <assert.h>
#include <pwd.h>
#include "private.h"
#define MAX_POLL_EVENTS 128
#define MAX_LOG_MESSAGE 1024
/* log */
const char *wpipc_logger_level_text[] = {
[WPIPC_LOG_LEVEL_ERROR] = "E",
[WPIPC_LOG_LEVEL_WARN] = "W",
[WPIPC_LOG_LEVEL_INFO] = "I",
};
struct wpipc_logger {
enum wpipc_log_level level;
};
static const struct wpipc_logger *
wpipc_log_get_instance (void)
{
static struct wpipc_logger logger_ = { 0, };
static struct wpipc_logger* instance_ = NULL;
if (instance_ == NULL) {
char * val_str = NULL;
enum wpipc_log_level val = 0;
/* default to error */
logger_.level = WPIPC_LOG_LEVEL_WARN;
/* get level from env */
val_str = getenv ("WPIPC_DEBUG");
if (val_str && sscanf (val_str, "%u", &val) == 1 &&
val >= WPIPC_LOG_LEVEL_NONE)
logger_.level = val;
instance_ = &logger_;
}
return instance_;
}
void
wpipc_logv (enum wpipc_log_level level, const char *fmt, va_list args)
{
const struct wpipc_logger *logger = NULL;
logger = wpipc_log_get_instance ();
assert (logger);
if (logger->level >= level) {
assert (level > 0);
char msg[MAX_LOG_MESSAGE];
struct timespec time;
clock_gettime (CLOCK_REALTIME, &time);
vsnprintf (msg, MAX_LOG_MESSAGE, fmt, args);
fprintf (stderr, "[%s][%lu.%lu] %s\n", wpipc_logger_level_text[level],
time.tv_sec, time.tv_sec, msg);
}
}
void
wpipc_log (enum wpipc_log_level level, const char *fmt, ...)
{
va_list args;
va_start (args, fmt);
wpipc_logv (level, fmt, args);
va_end (args);
}
/* socket path */
int
wpipc_construct_socket_path (const char *name, char *buf, size_t buf_size)
{
bool path_is_absolute;
const char *runtime_dir = NULL;
struct passwd pwd, *result = NULL;
char buffer[4096];
int name_size;
path_is_absolute = name[0] == '/';
if (!path_is_absolute) {
runtime_dir = getenv("PIPEWIRE_RUNTIME_DIR");
if (runtime_dir == NULL)
runtime_dir = getenv("XDG_RUNTIME_DIR");
if (runtime_dir == NULL)
runtime_dir = getenv("HOME");
if (runtime_dir == NULL)
runtime_dir = getenv("USERPROFILE");
if (runtime_dir == NULL) {
if (getpwuid_r(getuid(), &pwd, buffer, sizeof(buffer), &result) == 0)
runtime_dir = result ? result->pw_dir : NULL;
}
}
if (runtime_dir == NULL && !path_is_absolute)
return -ENOENT;
if (path_is_absolute)
name_size = snprintf (buf, buf_size, "%s", name) + 1;
else
name_size = snprintf (buf, buf_size, "%s/%s", runtime_dir, name) + 1;
if (name_size > (int) buf_size)
return -ENAMETOOLONG;
return 0;
}
/* socket */
ssize_t
wpipc_socket_write (int fd, const uint8_t *buffer, size_t size)
{
size_t total_written = 0;
size_t n;
assert (fd >= 0);
assert (buffer != NULL);
assert (size > 0);
do {
n = write(fd, buffer, size);
if (n < size) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return total_written;
return -1;
}
total_written += n;
} while (total_written < size);
return total_written;
}
ssize_t
wpipc_socket_read (int fd, uint8_t **buffer, size_t *max_size)
{
ssize_t n;
ssize_t size;
size_t offset = 0;
assert (buffer);
assert (*buffer);
assert (max_size);
assert (*max_size > 0);
again:
size = *max_size - offset;
n = read (fd, *buffer + offset, size);
if (n == 0)
return 0;
/* check for errors */
if (n < 0) {
if (errno == EINTR)
goto again;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return offset;
return -1;
}
/* realloc if we need more space, and read again */
if (n >= size) {
*max_size += *max_size;
*buffer = reallocarray (*buffer, *max_size, sizeof (uint8_t));
offset += n;
goto again;
}
return offset + n;
}
/* epoll thread */
bool
wpipc_epoll_thread_init (struct epoll_thread *self,
int socket_fd,
wpipc_epoll_thread_event_func_t sock_func,
wpipc_epoll_thread_event_func_t other_func,
void *data)
{
struct epoll_event event;
self->socket_fd = socket_fd;
self->event_fd = -1;
self->epoll_fd = -1;
/* create event fd */
self->event_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
if (self->event_fd == -1)
goto error;
/* create epoll fd */
self->epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
if (self->epoll_fd == -1)
goto error;
/* poll socket fd */
event.events = EPOLLIN;
event.data.fd = self->socket_fd;
if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0)
goto error;
/* poll event fd */
event.events = EPOLLIN;
event.data.fd = self->event_fd;
if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0)
goto error;
self->socket_event_func = sock_func;
self->other_event_func = other_func;
self->event_data = data;
return true;
error:
if (self->epoll_fd != -1)
close (self->epoll_fd);
if (self->event_fd != -1)
close (self->event_fd);
return false;
}
static void *
epoll_thread_run (void *data)
{
struct epoll_thread *self = data;
bool exit = false;
while (!exit) {
/* wait for events */
struct epoll_event ep[MAX_POLL_EVENTS];
int n = epoll_wait (self->epoll_fd, ep, MAX_POLL_EVENTS, -1);
if (n < 0) {
wpipc_log_error ("epoll_thread: failed to wait for event: %s",
strerror(errno));
continue;
}
for (int i = 0; i < n; i++) {
/* socket fd */
if (ep[i].data.fd == self->socket_fd) {
if (self->socket_event_func)
self->socket_event_func (self, ep[i].data.fd, self->event_data);
}
/* event fd */
else if (ep[i].data.fd == self->event_fd) {
uint64_t stop = 0;
ssize_t res = read (ep[i].data.fd, &stop, sizeof(uint64_t));
if (res == sizeof(uint64_t) && stop == 1)
exit = true;
}
/* other */
else {
if (self->other_event_func)
self->other_event_func (self, ep[i].data.fd, self->event_data);
}
}
}
return NULL;
}
bool
wpipc_epoll_thread_start (struct epoll_thread *self)
{
return pthread_create (&self->thread, NULL, epoll_thread_run, self) == 0;
}
void
wpipc_epoll_thread_stop (struct epoll_thread *self)
{
uint64_t value = 1;
ssize_t res = write (self->event_fd, &value, sizeof(uint64_t));
if (res == sizeof(uint64_t))
pthread_join (self->thread, NULL);
}
void
wpipc_epoll_thread_destroy (struct epoll_thread *self)
{
close (self->epoll_fd);
close (self->event_fd);
}

View file

@ -1,18 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#ifndef __WPIPC_H__
#define __WPIPC_H__
#include "protocol.h"
#include "receiver.h"
#include "sender.h"
#include "server.h"
#include "client.h"
#endif

View file

@ -22,7 +22,5 @@ option('systemd-system-unit-dir',
option('systemd-user-unit-dir',
type : 'string',
description : 'Directory for user systemd units')
option('wpipc', type : 'feature', value : 'disabled',
description: 'Build the wpipc library and module-ipc')
option('glib-supp', type : 'string', value : '',
description: 'The glib.supp valgrind suppressions file to be used when running valgrind')

View file

@ -176,16 +176,3 @@ shared_library(
install_dir : wireplumber_module_dir,
dependencies : [wp_dep, pipewire_dep],
)
if wpipc_dep.found()
shared_library(
'wireplumber-module-ipc',
[
'module-ipc.c',
],
c_args : [common_c_args, '-DG_LOG_DOMAIN="m-ipc"'],
install : true,
install_dir : wireplumber_module_dir,
dependencies : [wp_dep, pipewire_dep, wpipc_dep],
)
endif

View file

@ -1,258 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <wpipc/wpipc.h>
#include <wp/wp.h>
#define SERVER_SUSPEND_REQUEST_NAME "SUSPEND"
#define SERVER_RESUME_REQUEST_NAME "RESUME"
#define METADATA_KEY "suspend.playback"
enum {
PROP_0,
PROP_PATH,
};
struct _WpIpcPlugin
{
WpPlugin parent;
gchar *path;
GHashTable *suspended_clients;
struct wpipc_server *server;
WpObjectManager *metadata_om;
};
G_DECLARE_FINAL_TYPE (WpIpcPlugin, wp_ipc_plugin,
WP, IPC_PLUGIN, WpPlugin)
G_DEFINE_TYPE (WpIpcPlugin, wp_ipc_plugin, WP_TYPE_PLUGIN)
struct idle_data {
WpIpcPlugin *self;
char *request_name;
int client_id;
};
static struct idle_data *
idle_data_new (WpIpcPlugin *self, const char *request_name, int client_id)
{
struct idle_data *data = NULL;
g_return_val_if_fail (self, NULL);
g_return_val_if_fail (request_name, NULL);
data = g_new0 (struct idle_data, 1);
data->self = g_object_ref (self);
data->request_name = g_strdup (request_name);
data->client_id = client_id;
return data;
}
static void
idle_data_free (gpointer p)
{
struct idle_data *data = p;
g_return_if_fail (data);
g_return_if_fail (data->self);
g_return_if_fail (data->request_name);
g_free (data->request_name);
g_object_unref (data->self);
g_free (data);
}
static void
wp_ipc_plugin_init (WpIpcPlugin * self)
{
}
static void
wp_ipc_plugin_set_metadata (WpIpcPlugin * self, gboolean suspend) {
g_autoptr (WpMetadata) metadata = NULL;
metadata = wp_object_manager_lookup (self->metadata_om, WP_TYPE_METADATA,
NULL);
if (!metadata) {
wp_warning_object (self, "could not find default metadata");
return;
}
wp_info_object (self, METADATA_KEY " metadata set to %d", suspend);
wp_metadata_set (metadata, 0, METADATA_KEY, "Spa:Bool", suspend ? "1" : "0");
}
static gboolean
idle_request_handler (struct idle_data *data)
{
WpIpcPlugin *self = data->self;
gpointer key = GINT_TO_POINTER (data->client_id);
/* Suspend */
if (g_strcmp0 (data->request_name, SERVER_SUSPEND_REQUEST_NAME) == 0) {
if (!g_hash_table_contains (self->suspended_clients, key))
g_hash_table_insert (self->suspended_clients, key, NULL);
if (g_hash_table_size (self->suspended_clients) == 1)
wp_ipc_plugin_set_metadata (self, TRUE);
}
/* Resume */
else if (g_strcmp0 (data->request_name, SERVER_RESUME_REQUEST_NAME) == 0) {
if (g_hash_table_contains (self->suspended_clients, key))
g_hash_table_remove (self->suspended_clients, key);
if (g_hash_table_size (self->suspended_clients) == 0)
wp_ipc_plugin_set_metadata (self, FALSE);
}
return G_SOURCE_REMOVE;
}
static bool
request_handler (struct wpipc_server *s, int client_fd,
const char *name, const struct spa_pod *args, void *data)
{
WpIpcPlugin * self = data;
g_autoptr (WpCore) core = wp_object_get_core (WP_OBJECT (self));
if (!core)
return wpipc_server_reply_error (s, client_fd, "core not valid");
wp_core_idle_add (core, NULL, (GSourceFunc) idle_request_handler,
idle_data_new (self, name, client_fd), idle_data_free);
return wpipc_server_reply_ok (s, client_fd, NULL);
}
static void
client_handler (struct wpipc_server *s, int client_fd,
enum wpipc_receiver_sender_state client_state, void *data)
{
WpIpcPlugin * self = data;
switch (client_state) {
case WPIPC_RECEIVER_SENDER_STATE_CONNECTED:
wp_info_object (self, "client connected %d", client_fd);
break;
case WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED: {
g_autoptr (WpCore) core = wp_object_get_core (WP_OBJECT (self));
if (core)
wp_core_idle_add (core, NULL, (GSourceFunc) idle_request_handler,
idle_data_new (self, SERVER_RESUME_REQUEST_NAME, client_fd),
idle_data_free);
wp_info_object (self, "client disconnected %d", client_fd);
break;
}
default:
break;
}
}
static void
wp_ipc_plugin_enable (WpPlugin * plugin, WpTransition * transition)
{
WpIpcPlugin * self = WP_IPC_PLUGIN (plugin);
g_autoptr (WpCore) core = wp_object_get_core (WP_OBJECT (plugin));
g_return_if_fail (core);
g_return_if_fail (self->path);
/* Init suspended clients table */
self->suspended_clients = g_hash_table_new_full (g_direct_hash,
g_direct_equal, NULL, NULL);
/* Create the IPC server, and handle PLAY and STOP requests */
self->server = wpipc_server_new (self->path, TRUE);
g_return_if_fail (self->server);
wpipc_server_set_client_handler (self->server, client_handler, self);
wpipc_server_set_request_handler (self->server, SERVER_SUSPEND_REQUEST_NAME,
request_handler, self);
wpipc_server_set_request_handler (self->server, SERVER_RESUME_REQUEST_NAME,
request_handler, self);
/* Create the metadata object manager */
self->metadata_om = wp_object_manager_new ();
wp_object_manager_add_interest (self->metadata_om, WP_TYPE_METADATA,
WP_CONSTRAINT_TYPE_PW_GLOBAL_PROPERTY, "metadata.name", "=s", "default",
NULL);
wp_object_manager_request_object_features (self->metadata_om,
WP_TYPE_METADATA, WP_OBJECT_FEATURES_ALL);
wp_core_install_object_manager (core, self->metadata_om);
wp_object_update_features (WP_OBJECT (self), WP_PLUGIN_FEATURE_ENABLED, 0);
}
static void
wp_ipc_plugin_disable (WpPlugin * plugin)
{
WpIpcPlugin * self = WP_IPC_PLUGIN (plugin);
g_clear_object (&self->metadata_om);
g_clear_pointer (&self->server, wpipc_server_free);
g_clear_pointer (&self->suspended_clients, g_hash_table_unref);
}
static void
wp_ipc_plugin_set_property (GObject * object, guint property_id,
const GValue * value, GParamSpec * pspec)
{
WpIpcPlugin * self = WP_IPC_PLUGIN (object);
switch (property_id) {
case PROP_PATH:
g_clear_pointer (&self->path, g_free);
self->path = g_value_dup_string (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
static void
wp_ipc_plugin_finalize (GObject * object)
{
WpIpcPlugin * self = WP_IPC_PLUGIN (object);
g_clear_pointer (&self->path, g_free);
G_OBJECT_CLASS (wp_ipc_plugin_parent_class)->finalize (object);
}
static void
wp_ipc_plugin_class_init (WpIpcPluginClass * klass)
{
GObjectClass * object_class = (GObjectClass *) klass;
WpPluginClass *plugin_class = (WpPluginClass *) klass;
plugin_class->enable = wp_ipc_plugin_enable;
plugin_class->disable = wp_ipc_plugin_disable;
object_class->finalize = wp_ipc_plugin_finalize;
object_class->set_property = wp_ipc_plugin_set_property;
g_object_class_install_property (object_class, PROP_PATH,
g_param_spec_string ("path", "path",
"The path of the IPC server", NULL,
G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
}
WP_PLUGIN_EXPORT gboolean
wireplumber__module_init (WpCore * core, GVariant * args, GError ** error)
{
const gchar *path = NULL;
if (!g_variant_lookup (args, "path", "s", &path)) {
wp_warning_object (core, "cannot load IPC module without path argument");
return FALSE;
}
wp_plugin_register (g_object_new (wp_ipc_plugin_get_type (),
"name", "ipc",
"core", core,
"path", path,
NULL));
return TRUE;
}

View file

@ -20,6 +20,3 @@ load_script("suspend-node.lua")
-- Automatically sets device profiles to 'On'
load_module("device-activation")
-- Listens for events comming from the wpipc library
--load_module("ipc", {["path"] = "wpipc"})

View file

@ -11,16 +11,3 @@ executable('audiotestsrc-play',
dependencies : [giounix_dep, wp_dep, pipewire_dep],
)
endif
if wpipc_dep.found()
executable('wpipc-client',
'wpipc-client.c',
c_args : [
'-D_GNU_SOURCE',
'-DG_LOG_USE_STRUCTURED',
'-DG_LOG_DOMAIN="wpipc-client"',
],
install: false,
dependencies : [wpipc_dep, threads_dep],
)
endif

View file

@ -1,109 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <pthread.h>
#include <assert.h>
#include <spa/pod/builder.h>
#include <wpipc/wpipc.h>
struct client_data {
struct wpipc_client *c;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool reply_received;
};
static void
reply_handler (struct wpipc_sender *self, const uint8_t *buffer, size_t size, void *p)
{
struct client_data *data = p;
const char *error = NULL;
if (buffer) {
const struct spa_pod *pod = wpipc_client_send_request_finish (self, buffer, size, &error);
if (!pod)
printf ("error: %s\n", error ? error : "unknown");
else
printf ("success!\n");
} else {
printf ("error: lost connection with server\n");
}
/* signal reply received */
pthread_mutex_lock (&data->mutex);
data->reply_received = true;
pthread_cond_signal (&data->cond);
pthread_mutex_unlock (&data->mutex);
}
int
main (int argc, char *argv[])
{
struct client_data data;
if (argc < 2) {
printf ("usage: <server-path>\n");
return -1;
}
/* init */
data.c = wpipc_client_new (argv[1], true);
pthread_mutex_init (&data.mutex, NULL);
pthread_cond_init (&data.cond, NULL);
data.reply_received = false;
while (true) {
char str[1024];
printf ("> ");
fgets (str, 1023, stdin);
if (strncmp (str, "help", 4) == 0) {
printf ("help\tprints this message\n");
printf ("quit\texits the client\n");
printf ("send\tsends a request, usage: send <request-name> [args]\n");
} else if (strncmp (str, "quit", 4) == 0) {
printf ("exiting...\n");
break;
} else if (strncmp (str, "send", 4) == 0) {
char request_name[128];
char request_args[1024];
int n = sscanf(str, "send %s %s", request_name, request_args);
if (n <= 0)
continue;
/* send request */
if (n >= 2) {
/* TODO: for now we always create a string pod for args */
struct {
struct spa_pod_string pod;
char str[1024];
} args;
args.pod = SPA_POD_INIT_String(1024);
strncpy (args.str, request_args, 1024);
wpipc_client_send_request (data.c, request_name, (const struct spa_pod *)&args,
reply_handler, &data);
} else {
wpipc_client_send_request (data.c, request_name, NULL, reply_handler, &data);
}
/* wait for reply */
pthread_mutex_lock (&data.mutex);
while (!data.reply_received)
pthread_cond_wait (&data.cond, &data.mutex);
pthread_mutex_unlock (&data.mutex);
}
}
/* clean up */
pthread_cond_destroy (&data.cond);
pthread_mutex_destroy (&data.mutex);
wpipc_client_free (data.c);
return 0;
}

View file

@ -61,6 +61,5 @@ endif
subdir('wp')
subdir('wplua')
subdir('wpipc', if_found: wpipc_dep)
subdir('modules')
subdir('examples')

View file

@ -1,125 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <glib.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include <wpipc/wpipc.h>
#include <unistd.h>
static bool
increment_request_handler (struct wpipc_server *self, int client_fd,
const char *name, const struct spa_pod *args, void *data)
{
int32_t val = 0;
g_assert_true (spa_pod_is_int (args));
g_assert_true (spa_pod_get_int (args, &val) == 0);
struct spa_pod_int res = SPA_POD_INIT_Int (val + 1);
return wpipc_server_reply_ok (self, client_fd, (struct spa_pod *)&res);
}
static bool
error_request_handler (struct wpipc_server *self, int client_fd,
const char *name, const struct spa_pod *args, void *data)
{
return wpipc_server_reply_error (self, client_fd, "error message");
}
struct reply_data {
int32_t incremented;
const char *error;
int n_replies;
GMutex mutex;
GCond cond;
};
static void
wait_for_reply (struct reply_data *data, int n_replies)
{
g_mutex_lock (&data->mutex);
while (data->n_replies < n_replies)
g_cond_wait (&data->cond, &data->mutex);
g_mutex_unlock (&data->mutex);
}
static void
reply_handler (struct wpipc_sender *self, const uint8_t *buffer, size_t size, void *p)
{
struct reply_data *data = p;
g_assert_nonnull (data);
g_mutex_lock (&data->mutex);
const struct spa_pod *pod = wpipc_client_send_request_finish (self, buffer, size, &data->error);
if (pod) {
g_assert_true (spa_pod_is_int (pod));
g_assert_true (spa_pod_get_int (pod, &data->incremented) == 0);
}
data->n_replies++;
g_cond_signal (&data->cond);
g_mutex_unlock (&data->mutex);
}
static void
test_wpipc_server_client ()
{
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_server *s = wpipc_server_new (address, true);
g_assert_nonnull (s);
struct wpipc_client *c = wpipc_client_new (address, true);
g_assert_nonnull (c);
struct reply_data data;
g_mutex_init (&data.mutex);
g_cond_init (&data.cond);
/* add request handlers */
g_assert_true (wpipc_server_set_request_handler (s, "INCREMENT", increment_request_handler, NULL));
g_assert_true (wpipc_server_set_request_handler (s, "ERROR", error_request_handler, NULL));
/* send an INCREMENT request of 3, and make sure the returned value is 4 */
data.incremented = -1;
data.error = NULL;
data.n_replies = 0;
struct spa_pod_int i = SPA_POD_INIT_Int (3);
g_assert_true (wpipc_client_send_request (c, "INCREMENT", (struct spa_pod *)&i, reply_handler, &data));
wait_for_reply (&data, 1);
g_assert_null (data.error);
g_assert_cmpint (data.incremented, ==, 4);
/* send an ERROR request, and make sure the returned value is an error */
data.error = NULL;
data.n_replies = 0;
g_assert_true (wpipc_client_send_request (c, "ERROR", NULL, reply_handler, &data));
wait_for_reply (&data, 1);
g_assert_cmpstr (data.error, ==, "error message");
/* send an unhandled request, and make sure the server replies with an error */
data.error = NULL;
data.n_replies = 0;
g_assert_true (wpipc_client_send_request (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data));
wait_for_reply (&data, 1);
g_assert_cmpstr (data.error, ==, "request handler not found");
/* clean up */
g_cond_clear (&data.cond);
g_mutex_clear (&data.mutex);
wpipc_client_free (c);
wpipc_server_free (s);
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/wpipc/wpipc-server-client", test_wpipc_server_client);
return g_test_run ();
}

View file

@ -1,26 +0,0 @@
common_deps = [wpipc_dep, glib_dep]
common_env = environment({
'G_TEST_SRCDIR': meson.current_source_dir(),
'G_TEST_BUILDDIR': meson.current_build_dir(),
})
test(
'test-wpipc-sender-receiver',
executable('test-sender-receiver', 'sender-receiver.c', dependencies: common_deps),
env: common_env,
workdir : meson.current_source_dir(),
)
test(
'test-wpipc-protocol',
executable('test-protocol', 'protocol.c', dependencies: common_deps),
env: common_env,
workdir : meson.current_source_dir(),
)
test(
'test-wpipc-client-server',
executable('test-client-server', 'client-server.c', dependencies: common_deps),
env: common_env,
workdir : meson.current_source_dir(),
)

View file

@ -1,78 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <glib.h>
#include <spa/pod/builder.h>
#include <spa/pod/parser.h>
#include <wpipc/wpipc.h>
static void
test_wpipc_protocol ()
{
uint8_t b[1024];
/* request null value */
{
wpipc_protocol_build_request (b, sizeof(b), "name", NULL);
const char *name = NULL;
const struct spa_pod *value = NULL;
g_assert_true (wpipc_protocol_parse_request (b, sizeof(b), &name, &value));
g_assert_cmpstr (name, ==, "name");
g_assert_true (spa_pod_is_none (value));
}
/* request */
{
struct spa_pod_int i = SPA_POD_INIT_Int (8);
wpipc_protocol_build_request (b, sizeof(b), "name", (struct spa_pod *)&i);
const char *name = NULL;
const struct spa_pod_int *value = NULL;
g_assert_true (wpipc_protocol_parse_request (b, sizeof(b), &name, (const struct spa_pod **)&value));
g_assert_cmpstr (name, ==, "name");
g_assert_cmpint (value->value, ==, 8);
}
/* reply error */
{
wpipc_protocol_build_reply_error (b, sizeof(b), "error message");
g_assert_true (wpipc_protocol_is_reply_error (b, sizeof(b)));
const char *msg = NULL;
g_assert_true (wpipc_protocol_parse_reply_error (b, sizeof(b), &msg));
g_assert_cmpstr (msg, ==, "error message");
}
/* reply ok null value */
{
wpipc_protocol_build_reply_ok (b, sizeof(b), NULL);
g_assert_true (wpipc_protocol_is_reply_ok (b, sizeof(b)));
const struct spa_pod *value = NULL;
g_assert_true (wpipc_protocol_parse_reply_ok (b, sizeof(b), &value));
g_assert_nonnull (value);
g_assert_true (spa_pod_is_none (value));
}
/* reply ok */
{
struct spa_pod_int i = SPA_POD_INIT_Int (3);
wpipc_protocol_build_reply_ok (b, sizeof(b), (struct spa_pod *)&i);
g_assert_true (wpipc_protocol_is_reply_ok (b, sizeof(b)));
const struct spa_pod_int *value = NULL;
g_assert_true (wpipc_protocol_parse_reply_ok (b, sizeof(b), (const struct spa_pod **)&value));
g_assert_cmpint (value->value, ==, 3);
}
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/wpipc/wpipc-protocol", test_wpipc_protocol);
return g_test_run ();
}

View file

@ -1,319 +0,0 @@
/* WirePlumber
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#include <glib.h>
#include <wpipc/wpipc.h>
#include <unistd.h>
struct event_data {
const uint8_t * expected_data;
size_t expected_size;
int connections;
int n_events;
GMutex mutex;
GCond cond;
};
static void
wait_for_event (struct event_data *data, int n_events)
{
g_mutex_lock (&data->mutex);
while (data->n_events < n_events)
g_cond_wait (&data->cond, &data->mutex);
g_mutex_unlock (&data->mutex);
}
static void
sender_state_callback (struct wpipc_receiver *self, int sender_fd,
enum wpipc_receiver_sender_state sender_state, void *p)
{
struct event_data *data = p;
g_assert_nonnull (data);
g_mutex_lock (&data->mutex);
switch (sender_state) {
case WPIPC_RECEIVER_SENDER_STATE_CONNECTED:
data->connections++;
break;
case WPIPC_RECEIVER_SENDER_STATE_DISCONNECTED:
data->connections--;
break;
default:
g_assert_not_reached ();
break;
}
data->n_events++;
g_cond_signal (&data->cond);
g_mutex_unlock (&data->mutex);
}
static void
reply_callback (struct wpipc_sender *self, const uint8_t *buffer, size_t size, void *p)
{
struct event_data *data = p;
g_assert_nonnull (data);
g_assert_nonnull (buffer);
g_mutex_lock (&data->mutex);
g_assert_cmpmem (buffer, size, data->expected_data, data->expected_size);
data->n_events++;
g_cond_signal (&data->cond);
g_mutex_unlock (&data->mutex);
}
static void
test_wpipc_receiver_basic ()
{
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_receiver *r = wpipc_receiver_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (r);
/* start and stop */
g_assert_false (wpipc_receiver_is_running (r));
g_assert_true (wpipc_receiver_start (r));
g_assert_true (wpipc_receiver_is_running (r));
wpipc_receiver_stop (r);
g_assert_false (wpipc_receiver_is_running (r));
/* clean up */
wpipc_receiver_free (r);
}
static void
test_wpipc_sender_basic ()
{
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_sender *s = wpipc_sender_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (s);
/* clean up */
wpipc_sender_free (s);
}
static void
test_wpipc_sender_connect ()
{
static struct wpipc_receiver_events events = {
.sender_state = sender_state_callback,
.handle_message = NULL,
};
struct event_data data;
g_mutex_init (&data.mutex);
g_cond_init (&data.cond);
data.n_events = 0;
data.connections = 0;
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_receiver *r = wpipc_receiver_new (address, 16, &events, &data, 0);
g_assert_nonnull (r);
struct wpipc_sender *s = wpipc_sender_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (s);
/* start receiver */
g_assert_true (wpipc_receiver_start (r));
/* connect sender */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
wait_for_event (&data, 1);
g_assert_cmpint (data.connections, ==, 1);
/* disconnect sender */
wpipc_sender_disconnect (s);
g_assert_false (wpipc_sender_is_connected (s));
wait_for_event (&data, 2);
g_assert_cmpint (data.connections, ==, 0);
/* stop receiver */
wpipc_receiver_stop (r);
/* clean up */
g_cond_clear (&data.cond);
g_mutex_clear (&data.mutex);
wpipc_sender_free (s);
wpipc_receiver_free (r);
}
static void
lost_connection_handler (struct wpipc_sender *self, int receiver_fd, void *p)
{
struct event_data *data = p;
g_assert_nonnull (data);
g_mutex_lock (&data->mutex);
data->n_events++;
g_cond_signal (&data->cond);
g_mutex_unlock (&data->mutex);
}
static void
test_wpipc_sender_lost_connection ()
{
struct event_data data;
g_mutex_init (&data.mutex);
g_cond_init (&data.cond);
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_receiver *r = wpipc_receiver_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (r);
struct wpipc_sender *s = wpipc_sender_new (address, 16, lost_connection_handler, &data, 0);
g_assert_nonnull (s);
/* connect sender */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
/* destroy receiver and make sure the lost connection handler is triggered */
data.n_events = 0;
wpipc_receiver_free (r);
wait_for_event (&data, 1);
/* make sure the connection was lost */
g_assert_false (wpipc_sender_is_connected (s));
/* create a new receiver */
struct wpipc_receiver *r2 = wpipc_receiver_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (r2);
/* re-connect sender with new receiver */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
/* clean up */
g_cond_clear (&data.cond);
g_mutex_clear (&data.mutex);
wpipc_sender_free (s);
wpipc_receiver_free (r2);
}
static void
test_wpipc_sender_send ()
{
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_receiver *r = wpipc_receiver_new (address, 2, NULL, NULL, 0);
g_assert_nonnull (r);
struct wpipc_sender *s = wpipc_sender_new (address, 2, NULL, NULL, 0);
g_assert_nonnull (s);
struct event_data data;
g_mutex_init (&data.mutex);
g_cond_init (&data.cond);
data.n_events = 0;
/* start receiver */
g_assert_true (wpipc_receiver_start (r));
/* connect */
g_assert_true (wpipc_sender_connect (s));
g_assert_true (wpipc_sender_is_connected (s));
/* send 1 byte message (should not realloc) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"h";
data.expected_size = 1;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"h", 1, reply_callback, &data));
wait_for_event (&data, 1);
/* send 2 bytes message (should realloc once to 4) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"hi";
data.expected_size = 2;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"hi", 2, reply_callback, &data));
wait_for_event (&data, 1);
/* send 3 bytes message (should not realloc) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"hii";
data.expected_size = 3;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"hii", 3, reply_callback, &data));
wait_for_event (&data, 1);
/* send 28 bytes message (should realloc 3 times: first to 8, then to 16 and finally to 32) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"bigger than 16 bytes message";
data.expected_size = 28;
g_assert_true (wpipc_sender_send (s, (const uint8_t *)"bigger than 16 bytes message", 28, reply_callback, &data));
wait_for_event (&data, 1);
/* don't allow empty messages */
data.n_events = 0;
g_assert_false (wpipc_sender_send (s, (const uint8_t *)"", 0, NULL, NULL));
/* stop receiver */
wpipc_receiver_stop (r);
/* clean up */
g_cond_clear (&data.cond);
g_mutex_clear (&data.mutex);
wpipc_sender_free (s);
wpipc_receiver_free (r);
}
static void
test_wpipc_multiple_senders_send ()
{
g_autofree gchar *address = g_strdup_printf ("%s/wpipc-test-%d-%d",
g_get_tmp_dir(), getpid(), g_random_int ());
struct wpipc_receiver *r = wpipc_receiver_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (r);
struct wpipc_sender *senders[50];
struct event_data data;
g_mutex_init (&data.mutex);
g_cond_init (&data.cond);
data.n_events = 0;
/* start receiver */
g_assert_true (wpipc_receiver_start (r));
/* create and connect 50 senders */
for (int i = 0; i < 50; i++) {
senders[i] = wpipc_sender_new (address, 16, NULL, NULL, 0);
g_assert_nonnull (senders[i]);
g_assert_true (wpipc_sender_connect (senders[i]));
g_assert_true (wpipc_sender_is_connected (senders[i]));
}
/* send 50 messages (1 per sender) */
data.n_events = 0;
data.expected_data = (const uint8_t *)"hello";
data.expected_size = 5;
for (int i = 0; i < 50; i++)
g_assert_true (wpipc_sender_send (senders[i], (const uint8_t *)"hello", 5, reply_callback, &data));
wait_for_event (&data, 50);
/* stop receiver */
wpipc_receiver_stop (r);
/* clean up */
g_cond_clear (&data.cond);
g_mutex_clear (&data.mutex);
for (int i = 0; i < 50; i++)
wpipc_sender_free (senders[i]);
wpipc_receiver_free (r);
}
gint
main (gint argc, gchar *argv[])
{
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/wpipc/receiver-basic", test_wpipc_receiver_basic);
g_test_add_func ("/wpipc/sender-basic", test_wpipc_sender_basic);
g_test_add_func ("/wpipc/sender-connect", test_wpipc_sender_connect);
g_test_add_func ("/wpipc/sender-lost-connection",
test_wpipc_sender_lost_connection);
g_test_add_func ("/wpipc/sender-send", test_wpipc_sender_send);
g_test_add_func ("/wpipc/multiple-senders-send",
test_wpipc_multiple_senders_send);
return g_test_run ();
}