mirror of
https://gitlab.freedesktop.org/mesa/mesa.git
synced 2026-05-08 06:58:05 +02:00
rusticl/event: proper eventing support
Signed-off-by: Karol Herbst <kherbst@redhat.com> Acked-by: Alyssa Rosenzweig <alyssa.rosenzweig@collabora.com> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/15439>
This commit is contained in:
parent
50e981a050
commit
47a80d7ff4
4 changed files with 171 additions and 34 deletions
|
|
@ -1,6 +1,7 @@
|
||||||
extern crate rusticl_opencl_gen;
|
extern crate rusticl_opencl_gen;
|
||||||
|
|
||||||
use crate::api::icd::*;
|
use crate::api::icd::*;
|
||||||
|
use crate::api::types::*;
|
||||||
use crate::api::util::*;
|
use crate::api::util::*;
|
||||||
use crate::core::event::*;
|
use crate::core::event::*;
|
||||||
use crate::core::queue::*;
|
use crate::core::queue::*;
|
||||||
|
|
@ -59,10 +60,10 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
|
||||||
let mut err = false;
|
let mut err = false;
|
||||||
for e in evs {
|
for e in evs {
|
||||||
if let Some(q) = &e.queue {
|
if let Some(q) = &e.queue {
|
||||||
q.flush(true)?;
|
q.flush(false)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
err |= e.status() < 0;
|
err |= e.wait() < 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// CL_EXEC_STATUS_ERROR_FOR_EVENTS_IN_WAIT_LIST if the execution status of any of the events
|
// CL_EXEC_STATUS_ERROR_FOR_EVENTS_IN_WAIT_LIST if the execution status of any of the events
|
||||||
|
|
@ -74,6 +75,50 @@ pub fn wait_for_events(num_events: cl_uint, event_list: *const cl_event) -> CLRe
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_event_callback(
|
||||||
|
event: cl_event,
|
||||||
|
command_exec_callback_type: cl_int,
|
||||||
|
pfn_event_notify: Option<EventCB>,
|
||||||
|
user_data: *mut ::std::os::raw::c_void,
|
||||||
|
) -> CLResult<()> {
|
||||||
|
let e = event.get_ref()?;
|
||||||
|
|
||||||
|
// CL_INVALID_VALUE if pfn_event_notify is NULL
|
||||||
|
// or if command_exec_callback_type is not CL_SUBMITTED, CL_RUNNING, or CL_COMPLETE.
|
||||||
|
if pfn_event_notify.is_none()
|
||||||
|
|| ![CL_SUBMITTED, CL_RUNNING, CL_COMPLETE]
|
||||||
|
.contains(&(command_exec_callback_type as cl_uint))
|
||||||
|
{
|
||||||
|
return Err(CL_INVALID_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
e.add_cb(
|
||||||
|
command_exec_callback_type,
|
||||||
|
pfn_event_notify.unwrap(),
|
||||||
|
user_data,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_user_event_status(event: cl_event, execution_status: cl_int) -> CLResult<()> {
|
||||||
|
let e = event.get_ref()?;
|
||||||
|
|
||||||
|
// CL_INVALID_VALUE if the execution_status is not CL_COMPLETE or a negative integer value.
|
||||||
|
if execution_status != CL_COMPLETE as cl_int && execution_status > 0 {
|
||||||
|
return Err(CL_INVALID_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// CL_INVALID_OPERATION if the execution_status for event has already been changed by a
|
||||||
|
// previous call to clSetUserEventStatus.
|
||||||
|
if e.status() != CL_SUBMITTED as cl_int {
|
||||||
|
return Err(CL_INVALID_OPERATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
e.set_user_status(execution_status);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create_and_queue(
|
pub fn create_and_queue(
|
||||||
q: Arc<Queue>,
|
q: Arc<Queue>,
|
||||||
cmd_type: cl_command_type,
|
cmd_type: cl_command_type,
|
||||||
|
|
|
||||||
|
|
@ -1115,13 +1115,17 @@ extern "C" fn cl_get_extension_function_address(
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" fn cl_set_event_callback(
|
extern "C" fn cl_set_event_callback(
|
||||||
_event: cl_event,
|
event: cl_event,
|
||||||
_command_exec_callback_type: cl_int,
|
command_exec_callback_type: cl_int,
|
||||||
_pfn_notify: Option<EventCB>,
|
pfn_notify: Option<EventCB>,
|
||||||
_user_data: *mut ::std::os::raw::c_void,
|
user_data: *mut ::std::os::raw::c_void,
|
||||||
) -> cl_int {
|
) -> cl_int {
|
||||||
println!("cl_set_event_callback not implemented");
|
match_err!(set_event_callback(
|
||||||
CL_OUT_OF_HOST_MEMORY
|
event,
|
||||||
|
command_exec_callback_type,
|
||||||
|
pfn_notify,
|
||||||
|
user_data
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" fn cl_create_sub_buffer(
|
extern "C" fn cl_create_sub_buffer(
|
||||||
|
|
@ -1151,9 +1155,8 @@ extern "C" fn cl_create_user_event(context: cl_context, errcode_ret: *mut cl_int
|
||||||
match_obj!(create_user_event(context), errcode_ret)
|
match_obj!(create_user_event(context), errcode_ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" fn cl_set_user_event_status(_event: cl_event, _execution_status: cl_int) -> cl_int {
|
extern "C" fn cl_set_user_event_status(event: cl_event, execution_status: cl_int) -> cl_int {
|
||||||
println!("cl_set_user_event_status not implemented");
|
match_err!(set_user_event_status(event, execution_status))
|
||||||
CL_OUT_OF_HOST_MEMORY
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" fn cl_enqueue_read_buffer_rect(
|
extern "C" fn cl_enqueue_read_buffer_rect(
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,39 @@
|
||||||
extern crate mesa_rust;
|
extern crate mesa_rust;
|
||||||
|
extern crate mesa_rust_util;
|
||||||
extern crate rusticl_opencl_gen;
|
extern crate rusticl_opencl_gen;
|
||||||
|
|
||||||
use crate::api::icd::*;
|
use crate::api::icd::*;
|
||||||
|
use crate::api::types::*;
|
||||||
use crate::core::context::*;
|
use crate::core::context::*;
|
||||||
use crate::core::queue::*;
|
use crate::core::queue::*;
|
||||||
use crate::impl_cl_type_trait;
|
use crate::impl_cl_type_trait;
|
||||||
|
|
||||||
use self::mesa_rust::pipe::context::*;
|
use self::mesa_rust::pipe::context::*;
|
||||||
|
use self::mesa_rust::pipe::fence::*;
|
||||||
|
use self::mesa_rust_util::static_assert;
|
||||||
use self::rusticl_opencl_gen::*;
|
use self::rusticl_opencl_gen::*;
|
||||||
|
|
||||||
|
use std::os::raw::c_void;
|
||||||
use std::slice;
|
use std::slice;
|
||||||
use std::sync::atomic::AtomicI32;
|
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::Condvar;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::sync::MutexGuard;
|
||||||
|
|
||||||
|
// we assert that those are a continous range of numbers so we won't have to use HashMaps
|
||||||
|
static_assert!(CL_COMPLETE == 0);
|
||||||
|
static_assert!(CL_RUNNING == 1);
|
||||||
|
static_assert!(CL_SUBMITTED == 2);
|
||||||
|
static_assert!(CL_QUEUED == 3);
|
||||||
|
|
||||||
pub type EventSig = Box<dyn Fn(&Arc<Queue>, &Arc<PipeContext>) -> CLResult<()>>;
|
pub type EventSig = Box<dyn Fn(&Arc<Queue>, &Arc<PipeContext>) -> CLResult<()>>;
|
||||||
|
|
||||||
|
struct EventMutState {
|
||||||
|
status: cl_int,
|
||||||
|
cbs: [Vec<(EventCB, *mut c_void)>; 3],
|
||||||
|
fence: Option<PipeFence>,
|
||||||
|
}
|
||||||
|
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
pub base: CLObjectBase<CL_INVALID_EVENT>,
|
pub base: CLObjectBase<CL_INVALID_EVENT>,
|
||||||
|
|
@ -23,9 +41,9 @@ pub struct Event {
|
||||||
pub queue: Option<Arc<Queue>>,
|
pub queue: Option<Arc<Queue>>,
|
||||||
pub cmd_type: cl_command_type,
|
pub cmd_type: cl_command_type,
|
||||||
pub deps: Vec<Arc<Event>>,
|
pub deps: Vec<Arc<Event>>,
|
||||||
// use AtomicI32 instead of cl_int so we can change it without a &mut reference
|
|
||||||
status: AtomicI32,
|
|
||||||
work: Option<EventSig>,
|
work: Option<EventSig>,
|
||||||
|
state: Mutex<EventMutState>,
|
||||||
|
cv: Condvar,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);
|
impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);
|
||||||
|
|
@ -47,8 +65,13 @@ impl Event {
|
||||||
queue: Some(queue.clone()),
|
queue: Some(queue.clone()),
|
||||||
cmd_type: cmd_type,
|
cmd_type: cmd_type,
|
||||||
deps: deps,
|
deps: deps,
|
||||||
status: AtomicI32::new(CL_QUEUED as cl_int),
|
state: Mutex::new(EventMutState {
|
||||||
|
status: CL_QUEUED as cl_int,
|
||||||
|
cbs: [Vec::new(), Vec::new(), Vec::new()],
|
||||||
|
fence: None,
|
||||||
|
}),
|
||||||
work: Some(work),
|
work: Some(work),
|
||||||
|
cv: Condvar::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -59,8 +82,13 @@ impl Event {
|
||||||
queue: None,
|
queue: None,
|
||||||
cmd_type: CL_COMMAND_USER,
|
cmd_type: CL_COMMAND_USER,
|
||||||
deps: Vec::new(),
|
deps: Vec::new(),
|
||||||
status: AtomicI32::new(CL_SUBMITTED as cl_int),
|
state: Mutex::new(EventMutState {
|
||||||
|
status: CL_SUBMITTED as cl_int,
|
||||||
|
cbs: [Vec::new(), Vec::new(), Vec::new()],
|
||||||
|
fence: None,
|
||||||
|
}),
|
||||||
work: None,
|
work: None,
|
||||||
|
cv: Condvar::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -69,32 +97,83 @@ impl Event {
|
||||||
s.iter().map(|e| e.get_arc()).collect()
|
s.iter().map(|e| e.get_arc()).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_error(&self) -> bool {
|
fn state(&self) -> MutexGuard<EventMutState> {
|
||||||
self.status.load(Ordering::Relaxed) < 0
|
self.state.lock().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn status(&self) -> cl_int {
|
pub fn status(&self) -> cl_int {
|
||||||
self.status.load(Ordering::Relaxed)
|
self.state().status
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
|
||||||
|
lock.status = new;
|
||||||
|
self.cv.notify_all();
|
||||||
|
if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
|
||||||
|
if let Some(cbs) = lock.cbs.get(new as usize) {
|
||||||
|
cbs.iter()
|
||||||
|
.for_each(|(cb, data)| unsafe { cb(cl_event::from_ptr(self), new, *data) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_user_status(&self, status: cl_int) {
|
||||||
|
let mut lock = self.state();
|
||||||
|
self.set_status(&mut lock, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_error(&self) -> bool {
|
||||||
|
self.status() < 0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
|
||||||
|
let mut lock = self.state();
|
||||||
|
let status = lock.status;
|
||||||
|
|
||||||
|
// call cb if the status was already reached
|
||||||
|
if state >= status {
|
||||||
|
drop(lock);
|
||||||
|
unsafe { cb(cl_event::from_ptr(self), status, data) };
|
||||||
|
} else {
|
||||||
|
lock.cbs.get_mut(state as usize).unwrap().push((cb, data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wait(&self) -> cl_int {
|
||||||
|
let mut lock = self.state();
|
||||||
|
while lock.status >= CL_SUBMITTED as cl_int {
|
||||||
|
if lock.fence.is_some() {
|
||||||
|
lock.fence.as_ref().unwrap().wait();
|
||||||
|
// so we trigger all cbs
|
||||||
|
self.set_status(&mut lock, CL_RUNNING as cl_int);
|
||||||
|
self.set_status(&mut lock, CL_COMPLETE as cl_int);
|
||||||
|
} else {
|
||||||
|
lock = self.cv.wait(lock).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lock.status
|
||||||
}
|
}
|
||||||
|
|
||||||
// We always assume that work here simply submits stuff to the hardware even if it's just doing
|
// We always assume that work here simply submits stuff to the hardware even if it's just doing
|
||||||
// sw emulation or nothing at all.
|
// sw emulation or nothing at all.
|
||||||
// If anything requets waiting, we will update the status through fencing later.
|
// If anything requets waiting, we will update the status through fencing later.
|
||||||
pub fn call(&self, ctx: &Arc<PipeContext>) -> cl_int {
|
pub fn call(&self, ctx: &Arc<PipeContext>) -> cl_int {
|
||||||
let status = self.status();
|
let mut lock = self.state();
|
||||||
|
let status = lock.status;
|
||||||
if status == CL_QUEUED as cl_int {
|
if status == CL_QUEUED as cl_int {
|
||||||
let new = self.work.as_ref().map_or(
|
let new = self.work.as_ref().map_or(
|
||||||
// if there is no work
|
// if there is no work
|
||||||
CL_SUBMITTED as cl_int,
|
CL_SUBMITTED as cl_int,
|
||||||
|w| {
|
|w| {
|
||||||
w(self.queue.as_ref().unwrap(), ctx).err().map_or(
|
let res = w(self.queue.as_ref().unwrap(), ctx).err().map_or(
|
||||||
// if there is an error, negate it
|
// if there is an error, negate it
|
||||||
CL_SUBMITTED as cl_int,
|
CL_SUBMITTED as cl_int,
|
||||||
|e| e,
|
|e| e,
|
||||||
)
|
);
|
||||||
|
lock.fence = Some(ctx.flush());
|
||||||
|
res
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
self.status.store(new, Ordering::Relaxed);
|
self.set_status(&mut lock, new);
|
||||||
new
|
new
|
||||||
} else {
|
} else {
|
||||||
status
|
status
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ pub struct Queue {
|
||||||
pending: Mutex<Vec<Arc<Event>>>,
|
pending: Mutex<Vec<Arc<Event>>>,
|
||||||
_thrd: Option<JoinHandle<()>>,
|
_thrd: Option<JoinHandle<()>>,
|
||||||
chan_in: mpsc::Sender<Vec<Arc<Event>>>,
|
chan_in: mpsc::Sender<Vec<Arc<Event>>>,
|
||||||
chan_out: mpsc::Receiver<bool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
|
impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
|
||||||
|
|
@ -39,7 +38,6 @@ impl Queue {
|
||||||
// should be detected earlier (e.g.: checking for CAPs).
|
// should be detected earlier (e.g.: checking for CAPs).
|
||||||
let pipe = device.screen().create_context().unwrap();
|
let pipe = device.screen().create_context().unwrap();
|
||||||
let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
|
let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
|
||||||
let (tx_t, rx_q) = mpsc::channel::<bool>();
|
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
base: CLObjectBase::new(),
|
base: CLObjectBase::new(),
|
||||||
context: context,
|
context: context,
|
||||||
|
|
@ -54,17 +52,25 @@ impl Queue {
|
||||||
if r.is_err() {
|
if r.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
for e in r.unwrap() {
|
let new_events = r.unwrap();
|
||||||
|
for e in &new_events {
|
||||||
|
// all events should be processed, but we might have to wait on user
|
||||||
|
// events to happen
|
||||||
|
let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
|
||||||
|
if let Some(err) = err {
|
||||||
|
// if a dependency failed, fail this event as well
|
||||||
|
e.set_user_status(err);
|
||||||
|
} else {
|
||||||
e.call(&pipe);
|
e.call(&pipe);
|
||||||
}
|
}
|
||||||
if tx_t.send(true).is_err() {
|
}
|
||||||
break;
|
for e in new_events {
|
||||||
|
e.wait();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
),
|
),
|
||||||
chan_in: tx_q,
|
chan_in: tx_q,
|
||||||
chan_out: rx_q,
|
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -72,14 +78,18 @@ impl Queue {
|
||||||
self.pending.lock().unwrap().push(e.clone());
|
self.pending.lock().unwrap().push(e.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: implement non blocking flush
|
pub fn flush(&self, wait: bool) -> CLResult<()> {
|
||||||
pub fn flush(&self, _wait: bool) -> CLResult<()> {
|
|
||||||
let mut p = self.pending.lock().unwrap();
|
let mut p = self.pending.lock().unwrap();
|
||||||
|
let last = p.last().cloned();
|
||||||
// This should never ever error, but if it does return an error
|
// This should never ever error, but if it does return an error
|
||||||
self.chan_in
|
self.chan_in
|
||||||
.send((*p).drain(0..).collect())
|
.send((*p).drain(0..).collect())
|
||||||
.map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
|
.map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
|
||||||
self.chan_out.recv().unwrap();
|
if wait {
|
||||||
|
if let Some(last) = last {
|
||||||
|
last.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue