diff --git a/src/gallium/frontends/rusticl/core/event.rs b/src/gallium/frontends/rusticl/core/event.rs
index 8c87a2c397f..879e99a8373 100644
--- a/src/gallium/frontends/rusticl/core/event.rs
+++ b/src/gallium/frontends/rusticl/core/event.rs
@@ -5,7 +5,6 @@ use crate::core::queue::*;
 use crate::impl_cl_type_trait;
 
 use mesa_rust::pipe::context::*;
-use mesa_rust::pipe::fence::*;
 use mesa_rust_util::static_assert;
 use rusticl_opencl_gen::*;
 
@@ -29,7 +28,6 @@ pub type EventSig = Box<dyn Fn(&Arc<Queue>, &PipeContext) -> CLResult<()>>;
 struct EventMutState {
     status: cl_int,
     cbs: [Vec<(EventCB, *mut c_void)>; 3],
-    fence: Option<PipeFence>,
     work: Option<EventSig>,
 }
 
@@ -65,7 +63,6 @@ impl Event {
             state: Mutex::new(EventMutState {
                 status: CL_QUEUED as cl_int,
                 cbs: [Vec::new(), Vec::new(), Vec::new()],
-                fence: None,
                 work: Some(work),
             }),
             cv: Condvar::new(),
@@ -82,7 +79,6 @@ impl Event {
             state: Mutex::new(EventMutState {
                 status: CL_SUBMITTED as cl_int,
                 cbs: [Vec::new(), Vec::new(), Vec::new()],
-                fence: None,
                 work: None,
             }),
             cv: Condvar::new(),
@@ -104,7 +100,12 @@ impl Event {
 
     fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
         lock.status = new;
-        self.cv.notify_all();
+
+        // signal on completion or an error
+        if new <= CL_COMPLETE as cl_int {
+            self.cv.notify_all();
+        }
+
         if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
             if let Some(cbs) = lock.cbs.get(new as usize) {
                 cbs.iter()
@@ -122,6 +123,10 @@ impl Event {
         self.status() < 0
     }
 
+    pub fn is_user(&self) -> bool {
+        self.cmd_type == CL_COMMAND_USER
+    }
+
     pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
         let mut lock = self.state();
         let status = lock.status;
@@ -135,21 +140,21 @@ impl Event {
         }
     }
 
+    pub(super) fn signal(&self) {
+        let mut lock = self.state();
+
+        self.set_status(&mut lock, CL_RUNNING as cl_int);
+        self.set_status(&mut lock, CL_COMPLETE as cl_int);
+    }
+
     pub fn wait(&self) -> cl_int {
         let mut lock = self.state();
-        while lock.status >= CL_SUBMITTED as cl_int {
-            if lock.fence.is_some() {
-                lock.fence.as_ref().unwrap().wait();
-                // so we trigger all cbs
-                self.set_status(&mut lock, CL_RUNNING as cl_int);
-                self.set_status(&mut lock, CL_COMPLETE as cl_int);
-            } else {
-                lock = self
-                    .cv
-                    .wait_timeout(lock, Duration::from_millis(50))
-                    .unwrap()
-                    .0;
-            }
+        while lock.status >= CL_RUNNING as cl_int {
+            lock = self
+                .cv
+                .wait_timeout(lock, Duration::from_secs(1))
+                .unwrap()
+                .0;
         }
         lock.status
     }
@@ -157,7 +162,7 @@ impl Event {
     // We always assume that work here simply submits stuff to the hardware even if it's just doing
    // sw emulation or nothing at all.
     // If anything requests waiting, we will update the status through fencing later.
-    pub fn call(&self, ctx: &PipeContext) -> cl_int {
+    pub fn call(&self, ctx: &PipeContext) {
         let mut lock = self.state();
         let status = lock.status;
         if status == CL_QUEUED as cl_int {
@@ -171,7 +176,6 @@ impl Event {
                     CL_SUBMITTED as cl_int,
                     |e| e,
                 );
-                lock.fence = Some(ctx.flush());
                 res
             },
         );
@@ -180,9 +184,6 @@ impl Event {
             // absolutely sure it happens before the status update.
             drop(work);
             self.set_status(&mut lock, new);
-            new
-        } else {
-            status
         }
     }
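
The event.rs side of the change removes the per-event `PipeFence` and narrows the condvar protocol: `set_status()` now only wakes waiters once the status reaches `CL_COMPLETE` (0) or an error (negative), `wait()` blocks until then, and completion is driven externally through the new `signal()`. A minimal std-only sketch of that status ladder, with `ToyEvent` and the constants as illustrative stand-ins rather than the rusticl types:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

// Toy status ladder mirroring the OpenCL numbering used in the diff:
// CL_QUEUED(3) > CL_SUBMITTED(2) > CL_RUNNING(1) > CL_COMPLETE(0),
// with negative values meaning an error.
const COMPLETE: i32 = 0;
const RUNNING: i32 = 1;
const QUEUED: i32 = 3;

struct ToyEvent {
    status: Mutex<i32>,
    cv: Condvar,
}

impl ToyEvent {
    fn new() -> Self {
        ToyEvent {
            status: Mutex::new(QUEUED),
            cv: Condvar::new(),
        }
    }

    fn set_status(&self, new: i32) {
        let mut status = self.status.lock().unwrap();
        *status = new;
        // Waiters only care about completion or failure, so intermediate
        // transitions (SUBMITTED, RUNNING) don't notify anybody.
        if new <= COMPLETE {
            self.cv.notify_all();
        }
    }

    fn wait(&self) -> i32 {
        let mut status = self.status.lock().unwrap();
        while *status >= RUNNING {
            // The timeout is only a safety net against missed wakeups.
            status = self
                .cv
                .wait_timeout(status, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        *status
    }
}

fn main() {
    let ev = Arc::new(ToyEvent::new());
    let ev2 = Arc::clone(&ev);
    std::thread::spawn(move || {
        ev2.set_status(RUNNING); // recorded, but wakes nobody
        ev2.set_status(COMPLETE); // this is what unblocks wait()
    });
    assert_eq!(ev.wait(), COMPLETE);
}
```

The one-second `wait_timeout` plays the same role as in the real `wait()`: correctness comes from the notify in `set_status()`, and the timeout merely guards against a missed wakeup.
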
diff --git a/src/gallium/frontends/rusticl/core/queue.rs b/src/gallium/frontends/rusticl/core/queue.rs
index a8e5209b4d1..1777cdef192 100644
--- a/src/gallium/frontends/rusticl/core/queue.rs
+++ b/src/gallium/frontends/rusticl/core/queue.rs
@@ -4,6 +4,7 @@
 use crate::core::device::*;
 use crate::core::event::*;
 use crate::impl_cl_type_trait;
 
+use mesa_rust::pipe::context::PipeContext;
 use mesa_rust_util::properties::*;
 use rusticl_opencl_gen::*;
@@ -14,19 +15,31 @@
 use std::sync::Mutex;
 use std::thread;
 use std::thread::JoinHandle;
 
+struct QueueState {
+    pending: Vec<Arc<Event>>,
+    last: Option<Arc<Event>>,
+}
+
 pub struct Queue {
     pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
     pub context: Arc<Context>,
     pub device: Arc<Device>,
     pub props: cl_command_queue_properties,
     pub props_v2: Option<Properties<cl_queue_properties>>,
-    pending: Mutex<Vec<Arc<Event>>>,
+    state: Mutex<QueueState>,
     _thrd: Option<JoinHandle<()>>,
     chan_in: mpsc::Sender<Vec<Arc<Event>>>,
 }
 
 impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
 
+fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
+    if !evs.is_empty() {
+        pipe.flush().wait();
+        evs.drain(..).for_each(|e| e.signal());
+    }
+}
+
 impl Queue {
     pub fn new(
         context: Arc<Context>,
@@ -44,7 +57,10 @@ impl Queue {
             device: device,
             props: props,
             props_v2: props_v2,
-            pending: Mutex::new(Vec::new()),
+            state: Mutex::new(QueueState {
+                pending: Vec::new(),
+                last: None,
+            }),
             _thrd: Some(
                 thread::Builder::new()
                     .name("rusticl queue thread".into())
@@ -53,21 +69,47 @@
                     if r.is_err() {
                         break;
                     }
+
                     let new_events = r.unwrap();
-                    for e in &new_events {
-                        // all events should be processed, but we might have to wait on user
-                        // events to happen
-                        let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
+                    let mut flushed = Vec::new();
+
+                    for e in new_events {
+                        // If we hit any deps from another queue, flush so we don't risk a
+                        // deadlock.
+                        if e.deps.iter().any(|ev| ev.queue != e.queue) {
+                            flush_events(&mut flushed, &pipe);
+                        }
+
+                        // We have to wait on user events or events from other queues.
+                        let err = e
+                            .deps
+                            .iter()
+                            .filter(|ev| ev.is_user() || ev.queue != e.queue)
+                            .map(|e| e.wait())
+                            .find(|s| *s < 0);
+
                         if let Some(err) = err {
-                            // if a dependency failed, fail this event as well
+                            // If a dependency failed, fail this event as well.
                             e.set_user_status(err);
+                            continue;
+                        }
+
+                        e.call(&pipe);
+
+                        if !e.is_user() {
+                            flushed.push(e);
                         } else {
-                            e.call(&pipe);
+                            // On each user event we flush our events, as the application might
+                            // wait on them before signaling user events.
+                            flush_events(&mut flushed, &pipe);
+
+                            // Wait on user events, as they are synchronization points under the
+                            // application's control.
+                            e.wait();
                         }
                     }
-                    for e in new_events {
-                        e.wait();
-                    }
+
+                    flush_events(&mut flushed, &pipe);
                 })
                 .unwrap(),
         ),
@@ -76,28 +118,34 @@
     }
 
     pub fn queue(&self, e: Arc<Event>) {
-        self.pending.lock().unwrap().push(e);
+        self.state.lock().unwrap().pending.push(e);
     }
 
     pub fn flush(&self, wait: bool) -> CLResult<()> {
-        let mut p = self.pending.lock().unwrap();
-        let events = p.clone();
+        let mut state = self.state.lock().unwrap();
+
+        // Update `last` if and only if we get new events; this prevents breaking
+        // application code doing things like `clFlush(q); clFinish(q);`.
+        if let Some(last) = state.pending.last() {
+            state.last = Some(last.clone());
+        }
+
         // This should never ever error, but if it does, return an error
         self.chan_in
-            .send((*p).drain(0..).collect())
+            .send((state.pending).drain(0..).collect())
             .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
         if wait {
-            for e in events {
-                e.wait();
-            }
+            // Waiting on the last event is good enough, as the queue processes events
+            // in order; also take the value so we release the event once we are done.
+            state.last.take().map(|e| e.wait());
         }
         Ok(())
     }
 
     pub fn dependencies_for_pending_events(&self) -> HashSet<Arc<Queue>> {
-        let p = self.pending.lock().unwrap();
+        let state = self.state.lock().unwrap();
 
-        let mut queues = Event::deep_unflushed_queues(&p);
+        let mut queues = Event::deep_unflushed_queues(&state.pending);
         queues.remove(self);
         queues
     }
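
On the queue.rs side, the heart of the rework is `flush_events()`: instead of creating and waiting on one fence per event, the worker thread accumulates submitted events and retires the whole batch with a single flush-and-wait, then signals every event in it. A sketch of that batching, with `ToyContext`, `ToyFence`, and `ToyEvent` as hypothetical stand-ins for `PipeContext`, its fence, and `Event`:

```rust
use std::sync::{Arc, Mutex};

struct ToyFence;

impl ToyFence {
    // A real fence would block until the GPU has caught up.
    fn wait(&self) {}
}

struct ToyContext;

impl ToyContext {
    // A real context would submit its command stream and return a fence.
    fn flush(&self) -> ToyFence {
        ToyFence
    }
}

#[derive(Default)]
struct ToyEvent {
    complete: Mutex<bool>,
}

impl ToyEvent {
    fn signal(&self) {
        *self.complete.lock().unwrap() = true;
    }
}

// Mirrors flush_events() above: one flush plus fence wait for the whole
// batch, and no fence at all if nothing was submitted since the last flush.
fn flush_batch(flushed: &mut Vec<Arc<ToyEvent>>, ctx: &ToyContext) {
    if !flushed.is_empty() {
        ctx.flush().wait();
        flushed.drain(..).for_each(|e| e.signal());
    }
}

fn main() {
    let ctx = ToyContext;
    let mut batch: Vec<Arc<ToyEvent>> =
        (0..3).map(|_| Arc::new(ToyEvent::default())).collect();
    let first = Arc::clone(&batch[0]);
    flush_batch(&mut batch, &ctx);
    assert!(*first.complete.lock().unwrap());
    assert!(batch.is_empty());
}
```

The same helper doubles as the deadlock-avoidance hook: the worker loop calls it before blocking on a cross-queue dependency or a user event, so nothing outside this queue ever ends up waiting on work that is still sitting unflushed in our context.
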
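
`Queue::flush(wait: true)` (the `clFinish` path) likewise no longer keeps a clone of every pending event; it remembers only the last event handed to the worker, because in-order processing guarantees that once that event has completed, everything sent before it has too. A runnable toy model of that invariant (all names are illustrative, not rusticl API):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

#[derive(Default)]
struct ToyEvent {
    done: Mutex<bool>,
    cv: Condvar,
}

impl ToyEvent {
    fn signal(&self) {
        *self.done.lock().unwrap() = true;
        self.cv.notify_all();
    }

    fn wait(&self) {
        let mut done = self.done.lock().unwrap();
        while !*done {
            done = self.cv.wait(done).unwrap();
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<Arc<ToyEvent>>>();

    // The worker drains batches strictly in order, signaling each event;
    // this ordering is what the "wait on the last event only" trick relies on.
    let worker = thread::spawn(move || {
        for batch in rx {
            for e in batch {
                e.signal();
            }
        }
    });

    let pending: Vec<Arc<ToyEvent>> =
        (0..4).map(|_| Arc::new(ToyEvent::default())).collect();

    // Remember the last event before handing the batch over ...
    let last = pending.last().cloned();
    tx.send(pending).unwrap();

    // ... and "finish" by waiting on it alone: once it has completed, every
    // event queued before it has been signaled as well.
    if let Some(e) = last {
        e.wait();
    }

    drop(tx);
    worker.join().unwrap();
}
```
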