diff --git a/src/gallium/frontends/rusticl/core/queue.rs b/src/gallium/frontends/rusticl/core/queue.rs index 0836ace2025..a73db4f2280 100644 --- a/src/gallium/frontends/rusticl/core/queue.rs +++ b/src/gallium/frontends/rusticl/core/queue.rs @@ -9,6 +9,7 @@ use crate::impl_cl_type_trait; use mesa_rust::compiler::nir::NirShader; use mesa_rust::pipe::context::PipeContext; use mesa_rust::pipe::context::PipeContextPrio; +use mesa_rust::pipe::fence::PipeFence; use mesa_rust_gen::*; use mesa_rust_util::properties::*; use rusticl_opencl_gen::*; @@ -203,7 +204,8 @@ pub struct Queue { pub props: cl_command_queue_properties, pub props_v2: Properties, state: Mutex, - _thrd: JoinHandle<()>, + _thread_worker: JoinHandle<()>, + _thrd_signal: JoinHandle<()>, } /// Wrapper around Event to set it to an error state on drop. This is useful for dealing with panics @@ -272,16 +274,22 @@ impl IntoIterator for QueueEvents { impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE); -fn flush_events(evs: &mut Vec>, pipe: &PipeContext) -> cl_int { +fn flush_events( + evs: &mut Vec>, + pipe: &PipeContext, + send: &mpsc::Sender<(PipeFence, Box<[Arc]>)>, +) -> cl_int { if !evs.is_empty() { - pipe.flush().wait(); + let fence = pipe.flush(); if pipe.device_reset_status() != pipe_reset_status::PIPE_NO_RESET { // if the context reset while executing, simply put all events into error state. evs.drain(..) .for_each(|e| e.set_user_status(CL_OUT_OF_RESOURCES)); return CL_OUT_OF_RESOURCES; } else { - evs.drain(..).for_each(|e| e.signal()); + // We drain the original vector so we don't start allocating from scratch. + let evs = evs.drain(..).collect(); + send.send((fence, evs)).unwrap(); } } @@ -311,6 +319,7 @@ impl Queue { // should be detected earlier (e.g.: checking for CAPs). let ctx = SendableQueueContext::new(device, prio)?; let (tx_q, rx_t) = mpsc::channel::>>(); + let (tx_q2, rx_t2) = mpsc::channel(); Ok(Arc::new(Self { base: CLObjectBase::new(RusticlTypes::Queue), context: context, @@ -322,9 +331,10 @@ impl Queue { last: Weak::new(), chan_in: tx_q, }), - _thrd: thread::Builder::new() - .name("rusticl queue thread".into()) + _thread_worker: thread::Builder::new() + .name("rusticl queue worker thread".into()) .spawn(move || { + let tx_q2 = tx_q2; // Track the error of all executed events. This is only needed for in-order // queues, so for out of order we'll need to update this. // Also, the OpenCL specification gives us enough freedom to do whatever we want @@ -353,7 +363,7 @@ impl Queue { // If we hit any deps from another queue, flush so we don't risk a dead // lock. if e.deps().iter().any(|ev| !e.has_same_queue_as(ev)) { - let dep_err = flush_events(&mut flushed, &ctx); + let dep_err = flush_events(&mut flushed, &ctx, &tx_q2); last_err = cmp::min(last_err, dep_err); } @@ -387,7 +397,7 @@ impl Queue { if e.is_user() { // On each user event we flush our events as application might // wait on them before signaling user events. - last_err = flush_events(&mut flushed, &ctx); + last_err = flush_events(&mut flushed, &ctx, &tx_q2); if last_err >= 0 { // Wait on user events as they are synchronization points in the @@ -396,17 +406,32 @@ impl Queue { } } else if Platform::dbg().sync_every_event { flushed.push(e); - last_err = flush_events(&mut flushed, &ctx); + last_err = flush_events(&mut flushed, &ctx, &tx_q2); } else { flushed.push(e); } } - let flush_err = flush_events(&mut flushed, &ctx); + let flush_err = flush_events(&mut flushed, &ctx, &tx_q2); last_err = cmp::min(last_err, flush_err); } }) .unwrap(), + _thrd_signal: thread::Builder::new() + .name("rusticl queue signal thread".into()) + .spawn(move || loop { + let Ok((fence, events)) = rx_t2.recv() else { + break; + }; + + let evs = events.iter(); + if fence.wait() { + evs.for_each(|e| e.signal()); + } else { + evs.for_each(|e| e.set_user_status(CL_OUT_OF_RESOURCES)); + } + }) + .unwrap(), })) } diff --git a/src/gallium/frontends/rusticl/mesa/pipe/fence.rs b/src/gallium/frontends/rusticl/mesa/pipe/fence.rs index 5c063986d34..745324a3ea7 100644 --- a/src/gallium/frontends/rusticl/mesa/pipe/fence.rs +++ b/src/gallium/frontends/rusticl/mesa/pipe/fence.rs @@ -22,6 +22,8 @@ pub struct PipeFence { screen: Arc, } +unsafe impl Send for PipeFence {} + impl PipeFence { pub fn new(fence: *mut pipe_fence_handle, screen: &Arc) -> Self { Self {