diff --git a/src/gallium/frontends/rusticl/api/event.rs b/src/gallium/frontends/rusticl/api/event.rs
index fe6c72eef7e..9411f0ad315 100644
--- a/src/gallium/frontends/rusticl/api/event.rs
+++ b/src/gallium/frontends/rusticl/api/event.rs
@@ -1,6 +1,7 @@
 extern crate rusticl_opencl_gen;
 
 use crate::api::icd::*;
+use crate::api::types::*;
 use crate::api::util::*;
 use crate::core::event::*;
 use crate::core::queue::*;
@@ -59,10 +60,10 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
     let mut err = false;
     for e in evs {
         if let Some(q) = &e.queue {
-            q.flush(true)?;
+            q.flush(false)?;
         }
 
-        err |= e.status() < 0;
+        err |= e.wait() < 0;
     }
 
     // CL_EXEC_STATUS_ERROR_FOR_EVENTS_IN_WAIT_LIST if the execution status of any of the events
@@ -74,6 +75,50 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
     Ok(())
 }
 
+pub fn set_event_callback(
+    event: cl_event,
+    command_exec_callback_type: cl_int,
+    pfn_event_notify: Option<EventCB>,
+    user_data: *mut ::std::os::raw::c_void,
+) -> CLResult<()> {
+    let e = event.get_ref()?;
+
+    // CL_INVALID_VALUE if pfn_event_notify is NULL
+    // or if command_exec_callback_type is not CL_SUBMITTED, CL_RUNNING, or CL_COMPLETE.
+    if pfn_event_notify.is_none()
+        || ![CL_SUBMITTED, CL_RUNNING, CL_COMPLETE]
+            .contains(&(command_exec_callback_type as cl_uint))
+    {
+        return Err(CL_INVALID_VALUE);
+    }
+
+    e.add_cb(
+        command_exec_callback_type,
+        pfn_event_notify.unwrap(),
+        user_data,
+    );
+
+    Ok(())
+}
+
+pub fn set_user_event_status(event: cl_event, execution_status: cl_int) -> CLResult<()> {
+    let e = event.get_ref()?;
+
+    // CL_INVALID_VALUE if the execution_status is not CL_COMPLETE or a negative integer value.
+    if execution_status != CL_COMPLETE as cl_int && execution_status > 0 {
+        return Err(CL_INVALID_VALUE);
+    }
+
+    // CL_INVALID_OPERATION if the execution_status for event has already been changed by a
+    // previous call to clSetUserEventStatus.
+    if e.status() != CL_SUBMITTED as cl_int {
+        return Err(CL_INVALID_OPERATION);
+    }
+
+    e.set_user_status(execution_status);
+    Ok(())
+}
+
 pub fn create_and_queue(
     q: Arc<Queue>,
     cmd_type: cl_command_type,
diff --git a/src/gallium/frontends/rusticl/api/icd.rs b/src/gallium/frontends/rusticl/api/icd.rs
index 58a481ed2bb..70d10f5e646 100644
--- a/src/gallium/frontends/rusticl/api/icd.rs
+++ b/src/gallium/frontends/rusticl/api/icd.rs
@@ -1115,13 +1115,17 @@ extern "C" fn cl_get_extension_function_address(
 }
 
 extern "C" fn cl_set_event_callback(
-    _event: cl_event,
-    _command_exec_callback_type: cl_int,
-    _pfn_notify: Option<EventCB>,
-    _user_data: *mut ::std::os::raw::c_void,
+    event: cl_event,
+    command_exec_callback_type: cl_int,
+    pfn_notify: Option<EventCB>,
+    user_data: *mut ::std::os::raw::c_void,
 ) -> cl_int {
-    println!("cl_set_event_callback not implemented");
-    CL_OUT_OF_HOST_MEMORY
+    match_err!(set_event_callback(
+        event,
+        command_exec_callback_type,
+        pfn_notify,
+        user_data
+    ))
 }
 
 extern "C" fn cl_create_sub_buffer(
@@ -1151,9 +1155,8 @@ extern "C" fn cl_create_user_event(context: cl_context, errcode_ret: *mut cl_int
     match_obj!(create_user_event(context), errcode_ret)
 }
 
-extern "C" fn cl_set_user_event_status(_event: cl_event, _execution_status: cl_int) -> cl_int {
-    println!("cl_set_user_event_status not implemented");
-    CL_OUT_OF_HOST_MEMORY
+extern "C" fn cl_set_user_event_status(event: cl_event, execution_status: cl_int) -> cl_int {
+    match_err!(set_user_event_status(event, execution_status))
 }
 
 extern "C" fn cl_enqueue_read_buffer_rect(
diff --git a/src/gallium/frontends/rusticl/core/event.rs b/src/gallium/frontends/rusticl/core/event.rs
index 441972d6e62..cfa09c0c83c 100644
--- a/src/gallium/frontends/rusticl/core/event.rs
+++ b/src/gallium/frontends/rusticl/core/event.rs
@@ -1,21 +1,39 @@
 extern crate mesa_rust;
+extern crate mesa_rust_util;
 extern crate rusticl_opencl_gen;
 
 use crate::api::icd::*;
+use crate::api::types::*;
 use crate::core::context::*;
 use crate::core::queue::*;
 use crate::impl_cl_type_trait;
 
 use self::mesa_rust::pipe::context::*;
+use self::mesa_rust::pipe::fence::*;
+use self::mesa_rust_util::static_assert;
 use self::rusticl_opencl_gen::*;
 
+use std::os::raw::c_void;
 use std::slice;
-use std::sync::atomic::AtomicI32;
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
+use std::sync::Condvar;
+use std::sync::Mutex;
+use std::sync::MutexGuard;
+
+// we assert that those are a continuous range of numbers so we won't have to use HashMaps
+static_assert!(CL_COMPLETE == 0);
+static_assert!(CL_RUNNING == 1);
+static_assert!(CL_SUBMITTED == 2);
+static_assert!(CL_QUEUED == 3);
 
 pub type EventSig = Box<dyn Fn(&Arc<Queue>, &Arc<PipeContext>) -> CLResult<()>>;
 
+struct EventMutState {
+    status: cl_int,
+    cbs: [Vec<(EventCB, *mut c_void)>; 3],
+    fence: Option<PipeFence>,
+}
+
 #[repr(C)]
 pub struct Event {
     pub base: CLObjectBase<CL_INVALID_EVENT>,
@@ -23,9 +41,9 @@ pub struct Event {
     pub queue: Option<Arc<Queue>>,
     pub cmd_type: cl_command_type,
     pub deps: Vec<Arc<Event>>,
-    // use AtomicI32 instead of cl_int so we can change it without a &mut reference
-    status: AtomicI32,
     work: Option<EventSig>,
+    state: Mutex<EventMutState>,
+    cv: Condvar,
 }
 
 impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);
@@ -47,8 +65,13 @@ impl Event {
             queue: Some(queue.clone()),
             cmd_type: cmd_type,
             deps: deps,
-            status: AtomicI32::new(CL_QUEUED as cl_int),
+            state: Mutex::new(EventMutState {
+                status: CL_QUEUED as cl_int,
+                cbs: [Vec::new(), Vec::new(), Vec::new()],
+                fence: None,
+            }),
             work: Some(work),
+            cv: Condvar::new(),
         })
     }
 
@@ -59,8 +82,13 @@ impl Event {
             queue: None,
             cmd_type: CL_COMMAND_USER,
             deps: Vec::new(),
-            status: AtomicI32::new(CL_SUBMITTED as cl_int),
+            state: Mutex::new(EventMutState {
+                status: CL_SUBMITTED as cl_int,
+                cbs: [Vec::new(), Vec::new(), Vec::new()],
+                fence: None,
+            }),
             work: None,
+            cv: Condvar::new(),
         })
     }
 
@@ -69,32 +97,83 @@ impl Event {
         s.iter().map(|e| e.get_arc()).collect()
     }
 
-    pub fn is_error(&self) -> bool {
-        self.status.load(Ordering::Relaxed) < 0
+    fn state(&self) -> MutexGuard<EventMutState> {
+        self.state.lock().unwrap()
     }
 
     pub fn status(&self) -> cl_int {
-        self.status.load(Ordering::Relaxed)
+        self.state().status
+    }
+
+    fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
+        lock.status = new;
+        self.cv.notify_all();
+        if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
+            if let Some(cbs) = lock.cbs.get(new as usize) {
+                cbs.iter()
+                    .for_each(|(cb, data)| unsafe { cb(cl_event::from_ptr(self), new, *data) });
+            }
+        }
+    }
+
+    pub fn set_user_status(&self, status: cl_int) {
+        let mut lock = self.state();
+        self.set_status(&mut lock, status);
+    }
+
+    pub fn is_error(&self) -> bool {
+        self.status() < 0
+    }
+
+    pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
+        let mut lock = self.state();
+        let status = lock.status;
+
+        // call cb if the status was already reached
+        if state >= status {
+            drop(lock);
+            unsafe { cb(cl_event::from_ptr(self), status, data) };
+        } else {
+            lock.cbs.get_mut(state as usize).unwrap().push((cb, data));
+        }
+    }
+
+    pub fn wait(&self) -> cl_int {
+        let mut lock = self.state();
+        while lock.status >= CL_SUBMITTED as cl_int {
+            if lock.fence.is_some() {
+                lock.fence.as_ref().unwrap().wait();
+                // so we trigger all cbs
+                self.set_status(&mut lock, CL_RUNNING as cl_int);
+                self.set_status(&mut lock, CL_COMPLETE as cl_int);
+            } else {
+                lock = self.cv.wait(lock).unwrap();
+            }
+        }
+        lock.status
     }
 
     // We always assume that work here simply submits stuff to the hardware even if it's just doing
     // sw emulation or nothing at all.
     // If anything requests waiting, we will update the status through fencing later.
     pub fn call(&self, ctx: &Arc<PipeContext>) -> cl_int {
-        let status = self.status();
+        let mut lock = self.state();
+        let status = lock.status;
         if status == CL_QUEUED as cl_int {
             let new = self.work.as_ref().map_or(
                 // if there is no work
                 CL_SUBMITTED as cl_int,
                 |w| {
-                    w(self.queue.as_ref().unwrap(), ctx).err().map_or(
+                    let res = w(self.queue.as_ref().unwrap(), ctx).err().map_or(
                         // if there is an error, negate it
                         CL_SUBMITTED as cl_int,
                         |e| e,
-                    )
+                    );
+                    lock.fence = Some(ctx.flush());
+                    res
                 },
             );
-            self.status.store(new, Ordering::Relaxed);
+            self.set_status(&mut lock, new);
             new
         } else {
             status
diff --git a/src/gallium/frontends/rusticl/core/queue.rs b/src/gallium/frontends/rusticl/core/queue.rs
index 1751962497e..9611c028845 100644
--- a/src/gallium/frontends/rusticl/core/queue.rs
+++ b/src/gallium/frontends/rusticl/core/queue.rs
@@ -24,7 +24,6 @@ pub struct Queue {
     pending: Mutex<Vec<Arc<Event>>>,
     _thrd: Option<JoinHandle<()>>,
     chan_in: mpsc::Sender<Vec<Arc<Event>>>,
-    chan_out: mpsc::Receiver<bool>,
 }
 
 impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
@@ -39,7 +38,6 @@ impl Queue {
         // should be detected earlier (e.g.: checking for CAPs).
         let pipe = device.screen().create_context().unwrap();
         let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
-        let (tx_t, rx_q) = mpsc::channel::<bool>();
         Ok(Arc::new(Self {
             base: CLObjectBase::new(),
             context: context,
@@ -54,17 +52,25 @@ impl Queue {
                         if r.is_err() {
                             break;
                         }
-                        for e in r.unwrap() {
-                            e.call(&pipe);
+                        let new_events = r.unwrap();
+                        for e in &new_events {
+                            // all events should be processed, but we might have to wait on user
+                            // events to happen
+                            let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
+                            if let Some(err) = err {
+                                // if a dependency failed, fail this event as well
+                                e.set_user_status(err);
+                            } else {
+                                e.call(&pipe);
+                            }
                         }
-                        if tx_t.send(true).is_err() {
-                            break;
+                        for e in new_events {
+                            e.wait();
                         }
                     })
                     .unwrap(),
             ),
             chan_in: tx_q,
-            chan_out: rx_q,
         }))
     }
 
@@ -72,14 +78,18 @@ impl Queue {
         self.pending.lock().unwrap().push(e.clone());
     }
 
-    // TODO: implement non blocking flush
-    pub fn flush(&self, _wait: bool) -> CLResult<()> {
+    pub fn flush(&self, wait: bool) -> CLResult<()> {
         let mut p = self.pending.lock().unwrap();
+        let last = p.last().cloned();
         // This should never ever error, but if it does return an error
         self.chan_in
            .send((*p).drain(0..).collect())
            .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
-        self.chan_out.recv().unwrap();
+        if wait {
+            if let Some(last) = last {
+                last.wait();
+            }
+        }
         Ok(())
     }
 }
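
Note on the synchronization pattern: the rewritten core/event.rs drops the AtomicI32 status in favour of a Mutex-protected EventMutState plus a Condvar, so Event::wait() can block until another thread (clSetUserEventStatus, or the queue worker after flushing work) moves the status below CL_SUBMITTED, and per-state callback lists can fire on each transition. Below is a minimal, self-contained sketch of just the wait/notify part; it is illustrative only, and all names in it (ToyEvent, COMPLETE, SUBMITTED) are invented and do not exist in rusticl.

// Standalone sketch (not rusticl code) of the Mutex + Condvar status/wait
// pattern the patch introduces in core/event.rs.
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const COMPLETE: i32 = 0;
const SUBMITTED: i32 = 2;

struct ToyEvent {
    status: Mutex<i32>,
    cv: Condvar,
}

impl ToyEvent {
    fn new() -> Self {
        ToyEvent {
            status: Mutex::new(SUBMITTED),
            cv: Condvar::new(),
        }
    }

    // Analogue of Event::set_user_status: update the status under the lock,
    // then wake every waiter so it can re-check the predicate.
    fn set_status(&self, new: i32) {
        *self.status.lock().unwrap() = new;
        self.cv.notify_all();
    }

    // Analogue of Event::wait: block until the event completed (0) or failed (< 0).
    fn wait(&self) -> i32 {
        let mut status = self.status.lock().unwrap();
        while *status >= SUBMITTED {
            status = self.cv.wait(status).unwrap();
        }
        *status
    }
}

fn main() {
    let ev = Arc::new(ToyEvent::new());
    let waiter = {
        let ev = Arc::clone(&ev);
        thread::spawn(move || ev.wait())
    };
    // The "user" thread completes the event, which releases the blocked waiter.
    ev.set_status(COMPLETE);
    assert_eq!(waiter.join().unwrap(), COMPLETE);
}

The patch's Event::wait() additionally covers the hardware path: when call() stored a PipeFence, wait() waits on that fence and then drives the status through CL_RUNNING and CL_COMPLETE so any callbacks registered with clSetEventCallback still fire.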