mirror of
https://gitlab.freedesktop.org/mesa/mesa.git
synced 2025-12-25 04:20:08 +01:00
rusticl/event: proper eventing support
Signed-off-by: Karol Herbst <kherbst@redhat.com> Acked-by: Alyssa Rosenzweig <alyssa.rosenzweig@collabora.com> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/15439>
This commit is contained in:
parent
50e981a050
commit
47a80d7ff4
4 changed files with 171 additions and 34 deletions
|
|
@ -1,6 +1,7 @@
|
|||
extern crate rusticl_opencl_gen;
|
||||
|
||||
use crate::api::icd::*;
|
||||
use crate::api::types::*;
|
||||
use crate::api::util::*;
|
||||
use crate::core::event::*;
|
||||
use crate::core::queue::*;
|
||||
|
|
@ -59,10 +60,10 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
|
|||
let mut err = false;
|
||||
for e in evs {
|
||||
if let Some(q) = &e.queue {
|
||||
q.flush(true)?;
|
||||
q.flush(false)?;
|
||||
}
|
||||
|
||||
err |= e.status() < 0;
|
||||
err |= e.wait() < 0;
|
||||
}
|
||||
|
||||
// CL_EXEC_STATUS_ERROR_FOR_EVENTS_IN_WAIT_LIST if the execution status of any of the events
|
||||
|
|
@ -74,6 +75,50 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn set_event_callback(
|
||||
event: cl_event,
|
||||
command_exec_callback_type: cl_int,
|
||||
pfn_event_notify: Option<EventCB>,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) -> CLResult<()> {
|
||||
let e = event.get_ref()?;
|
||||
|
||||
// CL_INVALID_VALUE if pfn_event_notify is NULL
|
||||
// or if command_exec_callback_type is not CL_SUBMITTED, CL_RUNNING, or CL_COMPLETE.
|
||||
if pfn_event_notify.is_none()
|
||||
|| ![CL_SUBMITTED, CL_RUNNING, CL_COMPLETE]
|
||||
.contains(&(command_exec_callback_type as cl_uint))
|
||||
{
|
||||
return Err(CL_INVALID_VALUE);
|
||||
}
|
||||
|
||||
e.add_cb(
|
||||
command_exec_callback_type,
|
||||
pfn_event_notify.unwrap(),
|
||||
user_data,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn set_user_event_status(event: cl_event, execution_status: cl_int) -> CLResult<()> {
|
||||
let e = event.get_ref()?;
|
||||
|
||||
// CL_INVALID_VALUE if the execution_status is not CL_COMPLETE or a negative integer value.
|
||||
if execution_status != CL_COMPLETE as cl_int && execution_status > 0 {
|
||||
return Err(CL_INVALID_VALUE);
|
||||
}
|
||||
|
||||
// CL_INVALID_OPERATION if the execution_status for event has already been changed by a
|
||||
// previous call to clSetUserEventStatus.
|
||||
if e.status() != CL_SUBMITTED as cl_int {
|
||||
return Err(CL_INVALID_OPERATION);
|
||||
}
|
||||
|
||||
e.set_user_status(execution_status);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn create_and_queue(
|
||||
q: Arc<Queue>,
|
||||
cmd_type: cl_command_type,
|
||||
|
|
|
|||
|
|
@ -1115,13 +1115,17 @@ extern "C" fn cl_get_extension_function_address(
|
|||
}
|
||||
|
||||
extern "C" fn cl_set_event_callback(
|
||||
_event: cl_event,
|
||||
_command_exec_callback_type: cl_int,
|
||||
_pfn_notify: Option<EventCB>,
|
||||
_user_data: *mut ::std::os::raw::c_void,
|
||||
event: cl_event,
|
||||
command_exec_callback_type: cl_int,
|
||||
pfn_notify: Option<EventCB>,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) -> cl_int {
|
||||
println!("cl_set_event_callback not implemented");
|
||||
CL_OUT_OF_HOST_MEMORY
|
||||
match_err!(set_event_callback(
|
||||
event,
|
||||
command_exec_callback_type,
|
||||
pfn_notify,
|
||||
user_data
|
||||
))
|
||||
}
|
||||
|
||||
extern "C" fn cl_create_sub_buffer(
|
||||
|
|
@ -1151,9 +1155,8 @@ extern "C" fn cl_create_user_event(context: cl_context, errcode_ret: *mut cl_int
|
|||
match_obj!(create_user_event(context), errcode_ret)
|
||||
}
|
||||
|
||||
extern "C" fn cl_set_user_event_status(_event: cl_event, _execution_status: cl_int) -> cl_int {
|
||||
println!("cl_set_user_event_status not implemented");
|
||||
CL_OUT_OF_HOST_MEMORY
|
||||
extern "C" fn cl_set_user_event_status(event: cl_event, execution_status: cl_int) -> cl_int {
|
||||
match_err!(set_user_event_status(event, execution_status))
|
||||
}
|
||||
|
||||
extern "C" fn cl_enqueue_read_buffer_rect(
|
||||
|
|
|
|||
|
|
@ -1,21 +1,39 @@
|
|||
extern crate mesa_rust;
|
||||
extern crate mesa_rust_util;
|
||||
extern crate rusticl_opencl_gen;
|
||||
|
||||
use crate::api::icd::*;
|
||||
use crate::api::types::*;
|
||||
use crate::core::context::*;
|
||||
use crate::core::queue::*;
|
||||
use crate::impl_cl_type_trait;
|
||||
|
||||
use self::mesa_rust::pipe::context::*;
|
||||
use self::mesa_rust::pipe::fence::*;
|
||||
use self::mesa_rust_util::static_assert;
|
||||
use self::rusticl_opencl_gen::*;
|
||||
|
||||
use std::os::raw::c_void;
|
||||
use std::slice;
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Condvar;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::MutexGuard;
|
||||
|
||||
// we assert that those are a continous range of numbers so we won't have to use HashMaps
|
||||
static_assert!(CL_COMPLETE == 0);
|
||||
static_assert!(CL_RUNNING == 1);
|
||||
static_assert!(CL_SUBMITTED == 2);
|
||||
static_assert!(CL_QUEUED == 3);
|
||||
|
||||
pub type EventSig = Box<dyn Fn(&Arc<Queue>, &Arc<PipeContext>) -> CLResult<()>>;
|
||||
|
||||
struct EventMutState {
|
||||
status: cl_int,
|
||||
cbs: [Vec<(EventCB, *mut c_void)>; 3],
|
||||
fence: Option<PipeFence>,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
pub struct Event {
|
||||
pub base: CLObjectBase<CL_INVALID_EVENT>,
|
||||
|
|
@ -23,9 +41,9 @@ pub struct Event {
|
|||
pub queue: Option<Arc<Queue>>,
|
||||
pub cmd_type: cl_command_type,
|
||||
pub deps: Vec<Arc<Event>>,
|
||||
// use AtomicI32 instead of cl_int so we can change it without a &mut reference
|
||||
status: AtomicI32,
|
||||
work: Option<EventSig>,
|
||||
state: Mutex<EventMutState>,
|
||||
cv: Condvar,
|
||||
}
|
||||
|
||||
impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);
|
||||
|
|
@ -47,8 +65,13 @@ impl Event {
|
|||
queue: Some(queue.clone()),
|
||||
cmd_type: cmd_type,
|
||||
deps: deps,
|
||||
status: AtomicI32::new(CL_QUEUED as cl_int),
|
||||
state: Mutex::new(EventMutState {
|
||||
status: CL_QUEUED as cl_int,
|
||||
cbs: [Vec::new(), Vec::new(), Vec::new()],
|
||||
fence: None,
|
||||
}),
|
||||
work: Some(work),
|
||||
cv: Condvar::new(),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -59,8 +82,13 @@ impl Event {
|
|||
queue: None,
|
||||
cmd_type: CL_COMMAND_USER,
|
||||
deps: Vec::new(),
|
||||
status: AtomicI32::new(CL_SUBMITTED as cl_int),
|
||||
state: Mutex::new(EventMutState {
|
||||
status: CL_SUBMITTED as cl_int,
|
||||
cbs: [Vec::new(), Vec::new(), Vec::new()],
|
||||
fence: None,
|
||||
}),
|
||||
work: None,
|
||||
cv: Condvar::new(),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -69,32 +97,83 @@ impl Event {
|
|||
s.iter().map(|e| e.get_arc()).collect()
|
||||
}
|
||||
|
||||
pub fn is_error(&self) -> bool {
|
||||
self.status.load(Ordering::Relaxed) < 0
|
||||
fn state(&self) -> MutexGuard<EventMutState> {
|
||||
self.state.lock().unwrap()
|
||||
}
|
||||
|
||||
pub fn status(&self) -> cl_int {
|
||||
self.status.load(Ordering::Relaxed)
|
||||
self.state().status
|
||||
}
|
||||
|
||||
fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
|
||||
lock.status = new;
|
||||
self.cv.notify_all();
|
||||
if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
|
||||
if let Some(cbs) = lock.cbs.get(new as usize) {
|
||||
cbs.iter()
|
||||
.for_each(|(cb, data)| unsafe { cb(cl_event::from_ptr(self), new, *data) });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_user_status(&self, status: cl_int) {
|
||||
let mut lock = self.state();
|
||||
self.set_status(&mut lock, status);
|
||||
}
|
||||
|
||||
pub fn is_error(&self) -> bool {
|
||||
self.status() < 0
|
||||
}
|
||||
|
||||
pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
|
||||
let mut lock = self.state();
|
||||
let status = lock.status;
|
||||
|
||||
// call cb if the status was already reached
|
||||
if state >= status {
|
||||
drop(lock);
|
||||
unsafe { cb(cl_event::from_ptr(self), status, data) };
|
||||
} else {
|
||||
lock.cbs.get_mut(state as usize).unwrap().push((cb, data));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(&self) -> cl_int {
|
||||
let mut lock = self.state();
|
||||
while lock.status >= CL_SUBMITTED as cl_int {
|
||||
if lock.fence.is_some() {
|
||||
lock.fence.as_ref().unwrap().wait();
|
||||
// so we trigger all cbs
|
||||
self.set_status(&mut lock, CL_RUNNING as cl_int);
|
||||
self.set_status(&mut lock, CL_COMPLETE as cl_int);
|
||||
} else {
|
||||
lock = self.cv.wait(lock).unwrap();
|
||||
}
|
||||
}
|
||||
lock.status
|
||||
}
|
||||
|
||||
// We always assume that work here simply submits stuff to the hardware even if it's just doing
|
||||
// sw emulation or nothing at all.
|
||||
// If anything requets waiting, we will update the status through fencing later.
|
||||
pub fn call(&self, ctx: &Arc<PipeContext>) -> cl_int {
|
||||
let status = self.status();
|
||||
let mut lock = self.state();
|
||||
let status = lock.status;
|
||||
if status == CL_QUEUED as cl_int {
|
||||
let new = self.work.as_ref().map_or(
|
||||
// if there is no work
|
||||
CL_SUBMITTED as cl_int,
|
||||
|w| {
|
||||
w(self.queue.as_ref().unwrap(), ctx).err().map_or(
|
||||
let res = w(self.queue.as_ref().unwrap(), ctx).err().map_or(
|
||||
// if there is an error, negate it
|
||||
CL_SUBMITTED as cl_int,
|
||||
|e| e,
|
||||
)
|
||||
);
|
||||
lock.fence = Some(ctx.flush());
|
||||
res
|
||||
},
|
||||
);
|
||||
self.status.store(new, Ordering::Relaxed);
|
||||
self.set_status(&mut lock, new);
|
||||
new
|
||||
} else {
|
||||
status
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ pub struct Queue {
|
|||
pending: Mutex<Vec<Arc<Event>>>,
|
||||
_thrd: Option<JoinHandle<()>>,
|
||||
chan_in: mpsc::Sender<Vec<Arc<Event>>>,
|
||||
chan_out: mpsc::Receiver<bool>,
|
||||
}
|
||||
|
||||
impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
|
||||
|
|
@ -39,7 +38,6 @@ impl Queue {
|
|||
// should be detected earlier (e.g.: checking for CAPs).
|
||||
let pipe = device.screen().create_context().unwrap();
|
||||
let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
|
||||
let (tx_t, rx_q) = mpsc::channel::<bool>();
|
||||
Ok(Arc::new(Self {
|
||||
base: CLObjectBase::new(),
|
||||
context: context,
|
||||
|
|
@ -54,17 +52,25 @@ impl Queue {
|
|||
if r.is_err() {
|
||||
break;
|
||||
}
|
||||
for e in r.unwrap() {
|
||||
e.call(&pipe);
|
||||
let new_events = r.unwrap();
|
||||
for e in &new_events {
|
||||
// all events should be processed, but we might have to wait on user
|
||||
// events to happen
|
||||
let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
|
||||
if let Some(err) = err {
|
||||
// if a dependency failed, fail this event as well
|
||||
e.set_user_status(err);
|
||||
} else {
|
||||
e.call(&pipe);
|
||||
}
|
||||
}
|
||||
if tx_t.send(true).is_err() {
|
||||
break;
|
||||
for e in new_events {
|
||||
e.wait();
|
||||
}
|
||||
})
|
||||
.unwrap(),
|
||||
),
|
||||
chan_in: tx_q,
|
||||
chan_out: rx_q,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
@ -72,14 +78,18 @@ impl Queue {
|
|||
self.pending.lock().unwrap().push(e.clone());
|
||||
}
|
||||
|
||||
// TODO: implement non blocking flush
|
||||
pub fn flush(&self, _wait: bool) -> CLResult<()> {
|
||||
pub fn flush(&self, wait: bool) -> CLResult<()> {
|
||||
let mut p = self.pending.lock().unwrap();
|
||||
let last = p.last().cloned();
|
||||
// This should never ever error, but if it does return an error
|
||||
self.chan_in
|
||||
.send((*p).drain(0..).collect())
|
||||
.map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
|
||||
self.chan_out.recv().unwrap();
|
||||
if wait {
|
||||
if let Some(last) = last {
|
||||
last.wait();
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue