rusticl/event: proper eventing support

Signed-off-by: Karol Herbst <kherbst@redhat.com>
Acked-by: Alyssa Rosenzweig <alyssa.rosenzweig@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/15439>
Authored by Karol Herbst on 2022-03-16 11:14:53 +01:00; committed by Marge Bot
parent 50e981a050
commit 47a80d7ff4
4 changed files with 171 additions and 34 deletions


@@ -1,6 +1,7 @@
extern crate rusticl_opencl_gen;
use crate::api::icd::*;
use crate::api::types::*;
use crate::api::util::*;
use crate::core::event::*;
use crate::core::queue::*;
@@ -59,10 +60,10 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
let mut err = false;
for e in evs {
if let Some(q) = &e.queue {
q.flush(true)?;
q.flush(false)?;
}
err |= e.status() < 0;
err |= e.wait() < 0;
}
// CL_EXEC_STATUS_ERROR_FOR_EVENTS_IN_WAIT_LIST if the execution status of any of the events
@@ -74,6 +75,50 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
Ok(())
}
pub fn set_event_callback(
event: cl_event,
command_exec_callback_type: cl_int,
pfn_event_notify: Option<EventCB>,
user_data: *mut ::std::os::raw::c_void,
) -> CLResult<()> {
let e = event.get_ref()?;
// CL_INVALID_VALUE if pfn_event_notify is NULL
// or if command_exec_callback_type is not CL_SUBMITTED, CL_RUNNING, or CL_COMPLETE.
if pfn_event_notify.is_none()
|| ![CL_SUBMITTED, CL_RUNNING, CL_COMPLETE]
.contains(&(command_exec_callback_type as cl_uint))
{
return Err(CL_INVALID_VALUE);
}
e.add_cb(
command_exec_callback_type,
pfn_event_notify.unwrap(),
user_data,
);
Ok(())
}
pub fn set_user_event_status(event: cl_event, execution_status: cl_int) -> CLResult<()> {
let e = event.get_ref()?;
// CL_INVALID_VALUE if the execution_status is not CL_COMPLETE or a negative integer value.
if execution_status != CL_COMPLETE as cl_int && execution_status > 0 {
return Err(CL_INVALID_VALUE);
}
// CL_INVALID_OPERATION if the execution_status for event has already been changed by a
// previous call to clSetUserEventStatus.
if e.status() != CL_SUBMITTED as cl_int {
return Err(CL_INVALID_OPERATION);
}
e.set_user_status(execution_status);
Ok(())
}
pub fn create_and_queue(
q: Arc<Queue>,
cmd_type: cl_command_type,
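
Illustrative sketch (not part of the commit): how the two new entry points above combine with the
existing create_user_event()/wait_for_events() paths. Error handling is simplified, and the snippet
assumes it runs inside a function returning CLResult<()> with a valid `context: cl_context`:

    // user events start out as CL_SUBMITTED (see core/event.rs further down)
    let ev: cl_event = create_user_event(context)?;

    // only one status change is allowed; a second call returns CL_INVALID_OPERATION
    set_user_event_status(ev, CL_COMPLETE as cl_int)?;

    // wait_for_events() now only flushes the queues without blocking (flush(false))
    // and then blocks in Event::wait(); here it returns right away because the
    // status is already CL_COMPLETE
    wait_for_events(1, &ev)?;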


@@ -1115,13 +1115,17 @@ extern "C" fn cl_get_extension_function_address(
}
extern "C" fn cl_set_event_callback(
_event: cl_event,
_command_exec_callback_type: cl_int,
_pfn_notify: Option<EventCB>,
_user_data: *mut ::std::os::raw::c_void,
event: cl_event,
command_exec_callback_type: cl_int,
pfn_notify: Option<EventCB>,
user_data: *mut ::std::os::raw::c_void,
) -> cl_int {
println!("cl_set_event_callback not implemented");
CL_OUT_OF_HOST_MEMORY
match_err!(set_event_callback(
event,
command_exec_callback_type,
pfn_notify,
user_data
))
}
extern "C" fn cl_create_sub_buffer(
@@ -1151,9 +1155,8 @@ extern "C" fn cl_create_user_event(context: cl_context, errcode_ret: *mut cl_int
match_obj!(create_user_event(context), errcode_ret)
}
extern "C" fn cl_set_user_event_status(_event: cl_event, _execution_status: cl_int) -> cl_int {
println!("cl_set_user_event_status not implemented");
CL_OUT_OF_HOST_MEMORY
extern "C" fn cl_set_user_event_status(event: cl_event, execution_status: cl_int) -> cl_int {
match_err!(set_user_event_status(event, execution_status))
}
extern "C" fn cl_enqueue_read_buffer_rect(


@@ -1,21 +1,39 @@
extern crate mesa_rust;
extern crate mesa_rust_util;
extern crate rusticl_opencl_gen;
use crate::api::icd::*;
use crate::api::types::*;
use crate::core::context::*;
use crate::core::queue::*;
use crate::impl_cl_type_trait;
use self::mesa_rust::pipe::context::*;
use self::mesa_rust::pipe::fence::*;
use self::mesa_rust_util::static_assert;
use self::rusticl_opencl_gen::*;
use std::os::raw::c_void;
use std::slice;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::MutexGuard;
// we assert that those are a continuous range of numbers so we won't have to use HashMaps
static_assert!(CL_COMPLETE == 0);
static_assert!(CL_RUNNING == 1);
static_assert!(CL_SUBMITTED == 2);
static_assert!(CL_QUEUED == 3);
pub type EventSig = Box<dyn Fn(&Arc<Queue>, &Arc<PipeContext>) -> CLResult<()>>;
struct EventMutState {
status: cl_int,
cbs: [Vec<(EventCB, *mut c_void)>; 3],
fence: Option<PipeFence>,
}
#[repr(C)]
pub struct Event {
pub base: CLObjectBase<CL_INVALID_EVENT>,
@@ -23,9 +41,9 @@ pub struct Event {
pub queue: Option<Arc<Queue>>,
pub cmd_type: cl_command_type,
pub deps: Vec<Arc<Event>>,
// use AtomicI32 instead of cl_int so we can change it without a &mut reference
status: AtomicI32,
work: Option<EventSig>,
state: Mutex<EventMutState>,
cv: Condvar,
}
impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);
@@ -47,8 +65,13 @@ impl Event {
queue: Some(queue.clone()),
cmd_type: cmd_type,
deps: deps,
status: AtomicI32::new(CL_QUEUED as cl_int),
state: Mutex::new(EventMutState {
status: CL_QUEUED as cl_int,
cbs: [Vec::new(), Vec::new(), Vec::new()],
fence: None,
}),
work: Some(work),
cv: Condvar::new(),
})
}
@@ -59,8 +82,13 @@ impl Event {
queue: None,
cmd_type: CL_COMMAND_USER,
deps: Vec::new(),
status: AtomicI32::new(CL_SUBMITTED as cl_int),
state: Mutex::new(EventMutState {
status: CL_SUBMITTED as cl_int,
cbs: [Vec::new(), Vec::new(), Vec::new()],
fence: None,
}),
work: None,
cv: Condvar::new(),
})
}
@@ -69,32 +97,83 @@ impl Event {
s.iter().map(|e| e.get_arc()).collect()
}
pub fn is_error(&self) -> bool {
self.status.load(Ordering::Relaxed) < 0
fn state(&self) -> MutexGuard<EventMutState> {
self.state.lock().unwrap()
}
pub fn status(&self) -> cl_int {
self.status.load(Ordering::Relaxed)
self.state().status
}
fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
lock.status = new;
self.cv.notify_all();
if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
if let Some(cbs) = lock.cbs.get(new as usize) {
cbs.iter()
.for_each(|(cb, data)| unsafe { cb(cl_event::from_ptr(self), new, *data) });
}
}
}
pub fn set_user_status(&self, status: cl_int) {
let mut lock = self.state();
self.set_status(&mut lock, status);
}
pub fn is_error(&self) -> bool {
self.status() < 0
}
pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
let mut lock = self.state();
let status = lock.status;
// call cb if the status was already reached
if state >= status {
drop(lock);
unsafe { cb(cl_event::from_ptr(self), status, data) };
} else {
lock.cbs.get_mut(state as usize).unwrap().push((cb, data));
}
}
pub fn wait(&self) -> cl_int {
let mut lock = self.state();
while lock.status >= CL_SUBMITTED as cl_int {
if lock.fence.is_some() {
lock.fence.as_ref().unwrap().wait();
// so we trigger all cbs
self.set_status(&mut lock, CL_RUNNING as cl_int);
self.set_status(&mut lock, CL_COMPLETE as cl_int);
} else {
lock = self.cv.wait(lock).unwrap();
}
}
lock.status
}
// We always assume that work here simply submits stuff to the hardware even if it's just doing
// sw emulation or nothing at all.
// If anything requests waiting, we will update the status through fencing later.
pub fn call(&self, ctx: &Arc<PipeContext>) -> cl_int {
let status = self.status();
let mut lock = self.state();
let status = lock.status;
if status == CL_QUEUED as cl_int {
let new = self.work.as_ref().map_or(
// if there is no work
CL_SUBMITTED as cl_int,
|w| {
w(self.queue.as_ref().unwrap(), ctx).err().map_or(
let res = w(self.queue.as_ref().unwrap(), ctx).err().map_or(
// if there is an error, negate it
CL_SUBMITTED as cl_int,
|e| e,
)
);
lock.fence = Some(ctx.flush());
res
},
);
self.status.store(new, Ordering::Relaxed);
self.set_status(&mut lock, new);
new
} else {
status
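
A self-contained toy (not rusticl code; all names here are made up for illustration) of the pattern
the static_assert block near the top of core/event.rs relies on: because CL_COMPLETE, CL_RUNNING,
CL_SUBMITTED and CL_QUEUED are the contiguous values 0..=3, the per-status callback lists can live
in a plain array indexed by the status value itself, which is what EventMutState::cbs and
set_status() above do instead of using a HashMap:

    const CL_COMPLETE: u32 = 0;
    const CL_RUNNING: u32 = 1;
    const CL_SUBMITTED: u32 = 2;

    type Cb = fn(status: i32);

    #[derive(Default)]
    struct CallbackTable {
        // index 0 = COMPLETE, 1 = RUNNING, 2 = SUBMITTED
        cbs: [Vec<Cb>; 3],
    }

    impl CallbackTable {
        fn add(&mut self, state: u32, cb: Cb) {
            self.cbs[state as usize].push(cb);
        }

        fn fire(&self, new_status: u32) {
            // mirrors Event::set_status(): look the list up directly by value
            for cb in &self.cbs[new_status as usize] {
                cb(new_status as i32);
            }
        }
    }

    fn main() {
        let mut t = CallbackTable::default();
        t.add(CL_COMPLETE, |s| println!("event reached status {}", s));
        t.fire(CL_SUBMITTED); // nothing registered for CL_SUBMITTED
        t.fire(CL_RUNNING);   // nothing registered for CL_RUNNING
        t.fire(CL_COMPLETE);  // prints "event reached status 0"
    }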


@@ -24,7 +24,6 @@ pub struct Queue {
pending: Mutex<Vec<Arc<Event>>>,
_thrd: Option<JoinHandle<()>>,
chan_in: mpsc::Sender<Vec<Arc<Event>>>,
chan_out: mpsc::Receiver<bool>,
}
impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
@@ -39,7 +38,6 @@ impl Queue {
// should be detected earlier (e.g.: checking for CAPs).
let pipe = device.screen().create_context().unwrap();
let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
let (tx_t, rx_q) = mpsc::channel::<bool>();
Ok(Arc::new(Self {
base: CLObjectBase::new(),
context: context,
@@ -54,17 +52,25 @@ impl Queue {
if r.is_err() {
break;
}
for e in r.unwrap() {
e.call(&pipe);
let new_events = r.unwrap();
for e in &new_events {
// all events should be processed, but we might have to wait on user
// events to happen
let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
if let Some(err) = err {
// if a dependency failed, fail this event as well
e.set_user_status(err);
} else {
e.call(&pipe);
}
}
if tx_t.send(true).is_err() {
break;
for e in new_events {
e.wait();
}
})
.unwrap(),
),
chan_in: tx_q,
chan_out: rx_q,
}))
}
@@ -72,14 +78,18 @@
self.pending.lock().unwrap().push(e.clone());
}
// TODO: implement non blocking flush
pub fn flush(&self, _wait: bool) -> CLResult<()> {
pub fn flush(&self, wait: bool) -> CLResult<()> {
let mut p = self.pending.lock().unwrap();
let last = p.last().cloned();
// This should never ever error, but if it does, return an error
self.chan_in
.send((*p).drain(0..).collect())
.map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
self.chan_out.recv().unwrap();
if wait {
if let Some(last) = last {
last.wait();
}
}
Ok(())
}
}
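
A hedged usage sketch of the reworked flush(), assuming a `queue: Arc<Queue>` that already has
events queued via Queue::queue() and a caller returning CLResult<()>:

    // hand the pending events to the worker thread and return immediately
    queue.flush(false)?;

    // hand them off and additionally block until the last pending event completes
    // via Event::wait(); waiting on just the last event is how the new code
    // implements the blocking case, since events reach the worker thread in
    // submission order
    queue.flush(true)?;

This is also what lets wait_for_events() above switch to flush(false): the caller blocks in
Event::wait() on the specific events it cares about instead of stalling the whole queue.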