add xcb_take_socket2 exported function

That provides XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK flag allowing the
caller to fully manage concurrent access in return_socket callback.
Otherwise, with XCB taking care of synchronizing the call to
return_socket necessary waits in return_socket implementation may
deadlock without the possibility to solve that (like that currently
happens with libx11 when it has to wait for user display lock).

With the flag return_socket will be called by every thread which
reached get_socket_back while the socket return process is not complete
yet, allowing a thread which blocks the others waiting in return_socket to
pass get_socket_back through and later unlock the other threads.
This commit is contained in:
Paul Gofman 2025-12-02 10:51:54 -06:00 committed by Paul Gofman
parent 93ee2ac73c
commit 14817a6ddf
4 changed files with 119 additions and 25 deletions

View file

@ -511,7 +511,7 @@ static void *wait_for_reply(xcb_connection_t *c, uint64_t request, xcb_generic_e
void *ret = 0;
/* If this request has not been written yet, write it. */
if(c->out.return_socket || _xcb_out_flush_to(c, request))
if(c->out.return_socket.return_socket || _xcb_out_flush_to(c, request))
{
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
reader_list reader;

View file

@ -31,6 +31,7 @@
#include <assert.h>
#include <stdlib.h>
#include <errno.h>
#ifdef _WIN32
#include <io.h>
#else
@ -90,21 +91,48 @@ static void send_sync(xcb_connection_t *c)
static void get_socket_back(xcb_connection_t *c)
{
while(c->out.return_socket && c->out.socket_moving)
pthread_cond_wait(&c->out.socket_cond, &c->iolock);
if(!c->out.return_socket)
return;
while(c->out.return_socket.return_socket) {
if(c->out.take_socket_flags & XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK) {
xcb_return_socket_callback_t return_socket = c->out.return_socket;
void *socket_closure = c->out.socket_closure;
int socket_seq = c->out.socket_seq;
c->out.socket_moving = 1;
pthread_mutex_unlock(&c->iolock);
c->out.return_socket(c->out.socket_closure);
pthread_mutex_lock(&c->iolock);
c->out.socket_moving = 0;
pthread_mutex_unlock(&c->iolock);
return_socket.return_socket(socket_closure, socket_seq);
pthread_mutex_lock(&c->iolock);
pthread_cond_broadcast(&c->out.socket_cond);
c->out.return_socket = 0;
c->out.socket_closure = 0;
_xcb_in_replies_done(c);
if(socket_seq == c->out.socket_seq && c->out.return_socket.return_socket) {
/* one thread is guaranteed to hit this condition: taking socket for the next time should pass
* the wait in get_socket_back() before increasing socket_seq. */
c->out.return_socket.return_socket = 0;
c->out.socket_closure = 0;
_xcb_in_replies_done(c);
return;
}
continue;
}
/* Without XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK make sure return_socket is called just once. */
while(c->out.return_socket.return_socket && c->out.socket_moving)
pthread_cond_wait(&c->out.socket_cond, &c->iolock);
if(!c->out.return_socket.return_socket)
return;
if (c->out.take_socket_flags & XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK) {
/* The socket was returned and taken again with a different type of callback. */
continue;
}
c->out.socket_moving = 1;
pthread_mutex_unlock(&c->iolock);
c->out.return_socket.return_socket_legacy(c->out.socket_closure);
pthread_mutex_lock(&c->iolock);
c->out.socket_moving = 0;
pthread_cond_broadcast(&c->out.socket_cond);
c->out.return_socket.return_socket = 0;
c->out.socket_closure = 0;
_xcb_in_replies_done(c);
}
}
static void prepare_socket_request(xcb_connection_t *c)
@ -373,11 +401,18 @@ xcb_send_fd(xcb_connection_t *c, int fd)
pthread_mutex_unlock(&c->iolock);
}
int xcb_take_socket(xcb_connection_t *c, void (*return_socket)(void *closure), void *closure, int flags, uint64_t *sent)
int xcb_take_socket2(xcb_connection_t *c, int request_flags, uint64_t *sent, xcb_take_socket_params_t *p,
uint32_t *take_socket_seq)
{
int ret;
if(c->has_error)
return 0;
if(p->flags & ~XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK)
{
errno = ENOTSUP;
return 0;
}
pthread_mutex_lock(&c->iolock);
get_socket_back(c);
@ -389,15 +424,18 @@ int xcb_take_socket(xcb_connection_t *c, void (*return_socket)(void *closure), v
while (ret && c->out.request != c->out.request_written);
if(ret)
{
c->out.return_socket = return_socket;
c->out.socket_closure = closure;
if(flags) {
c->out.socket_seq++;
c->out.return_socket.return_socket = p->return_socket_callback.return_socket;
c->out.socket_closure = p->closure;
c->out.take_socket_flags = p->flags;
*take_socket_seq = c->out.socket_seq;
if(request_flags) {
/* c->out.request + 1 will be the first request sent by the external
* socket owner. If the socket is returned before this request is sent
* it will be detected in _xcb_in_replies_done and this pending_reply
* will be discarded.
*/
_xcb_in_expect_reply(c, c->out.request + 1, WORKAROUND_EXTERNAL_SOCKET_OWNER, flags);
_xcb_in_expect_reply(c, c->out.request + 1, WORKAROUND_EXTERNAL_SOCKET_OWNER, request_flags);
}
assert(c->out.request == c->out.request_written);
*sent = c->out.request;
@ -406,6 +444,17 @@ int xcb_take_socket(xcb_connection_t *c, void (*return_socket)(void *closure), v
return ret;
}
int xcb_take_socket(xcb_connection_t *c, void (*return_socket)(void *closure), void *closure, int request_flags, uint64_t *sent)
{
xcb_take_socket_params_t p;
uint32_t socket_seq;
p.flags = 0;
p.return_socket_callback.return_socket_legacy = return_socket;
p.closure = closure;
return xcb_take_socket2(c, request_flags, sent, &p, &socket_seq);
}
int xcb_writev(xcb_connection_t *c, struct iovec *vector, int count, uint64_t requests)
{
int ret;
@ -435,9 +484,10 @@ int _xcb_out_init(_xcb_out *out)
{
if(pthread_cond_init(&out->socket_cond, 0))
return 0;
out->return_socket = 0;
out->return_socket.return_socket = 0;
out->socket_closure = 0;
out->socket_moving = 0;
out->socket_seq = 0;
if(pthread_cond_init(&out->cond, 0))
return 0;

View file

@ -183,15 +183,46 @@ uint64_t xcb_send_request_with_fds64(xcb_connection_t *c, int flags, struct iove
*/
void xcb_send_fd(xcb_connection_t *c, int fd);
enum xcb_take_socket_flags_t {
XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK = 1 << 1,
/**< return_socket callback can be called concurrently as well as multiple time for the same
socket acquire instance, relying on the callback implementation to synchronize that itself and avoid certain
deadlock scenarious. Cannot be used with XCB_TAKE_SOCKET_LEGACY_CALLBACK. */
};
typedef struct {
unsigned int flags;
/**< A combination of flags from the xcb_take_socket_flags_t enumeration.
If there is an unknown flag specified the call fails and errno is set to ENOTSUP. */
union {
void (*return_socket_legacy)(void *closure);
void (*return_socket)(void *closure, uint32_t take_socket_seq);
} return_socket_callback;
/**< Callback function that will be called when xcb wants to use the socket again. return_socket_legacy is used if
XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK flag is not specified.
If XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK flag is specified the callback should account for the case of possible
concurrent invocation by multiple threads trying to return the socket. The call may also be performed once again
after the socket was already returned by previous call to return_socket, before or in process of taking the socket again.
If take_socket_seq does not match the currently acquired sequence number previously returned in take_socket_seq field
the callback implementation should ignore the call. */
void *closure; /**< Argument to the callback function. */
} xcb_take_socket_params_t;
/**
* @brief Take over the write side of the socket
* @param c The connection to the X server.
* @param return_socket Callback function that will be called when xcb wants
* to use the socket again.
* @param closure Argument to the callback function.
* @param flags A combination of flags from the xcb_send_request_flags_t enumeration.
* to use the socket again. If XCB_TAKE_SOCKET_THREAD_SAFE_CALLBACK flag is specified,
* the callback should account for the case of possible concurrent invocation by multiple
* threads trying to return the socket. Return value should be zero.
* @param request_flags A combination of flags from the xcb_send_request_flags_t enumeration.
* @param sent Location to the sequence number of the last sequence request.
* Must not be NULL.
* @param flags A combination of flags from the xcb_take_socket_flags_t enumeration. If there is an unknown flag
* specified the call fails and errno is set to ENOTSUP.
* @param p Specifies xcb_take_socket_params_t structure with additional parameters.
* @param take_socket_seq On success return, filled with a sequence number associated with taking the socket.
* The same value will be passed to return_socket callback invocation(s).
* @return 1 on success, else 0.
*
* xcb_take_socket allows external code to ask XCB for permission to
@ -210,7 +241,13 @@ void xcb_send_fd(xcb_connection_t *c, int fd);
* picked up via xcb_wait_for_reply(), xcb_poll_for_reply() or
* xcb_request_check().
*/
int xcb_take_socket(xcb_connection_t *c, void (*return_socket)(void *closure), void *closure, int flags, uint64_t *sent);
int xcb_take_socket2(xcb_connection_t *c, int request_flags, uint64_t *sent, xcb_take_socket_params_t *p,
uint32_t *take_socket_seq);
/**
* @brief The call is equivalent to xcb_take_socket2 called with zero flags parameter.
*/
int xcb_take_socket(xcb_connection_t *c, void (*return_socket)(void *closure), void *closure, int request_flags, uint64_t *sent);
/**
* @brief Send raw data to the X server.

View file

@ -97,14 +97,21 @@ typedef struct _xcb_fd {
} _xcb_fd;
#endif
typedef union {
void (*return_socket_legacy)(void *closure);
void (*return_socket)(void *closure, uint32_t take_socket_seq);
} xcb_return_socket_callback_t;
typedef struct _xcb_out {
pthread_cond_t cond;
int writing;
pthread_cond_t socket_cond;
void (*return_socket)(void *closure);
xcb_return_socket_callback_t return_socket;
void *socket_closure;
int take_socket_flags;
int socket_moving;
uint32_t socket_seq;
char queue[XCB_QUEUE_BUFFER_SIZE];
int queue_len;