rusticl/queue: overhaul of the queue+event handling

This new approach handles things as follows (a condensed sketch of the
resulting scheme follows the list):
1. Fences won't be attached to events anymore; applications only wait on
   the cv attached to the event.
2. Only the queue is allowed to update the event status for non-user events.
   This eliminates all remaining status-update races between the queue and
   applications waiting on events.
3. The queue minimizes flushing by bundling events.
4. Increase the cv wait timeout, as there is really no point in waking up
   too often.
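
A condensed, self-contained sketch of the scheme above (the names and status
constants are hypothetical stand-ins for rusticl's actual Event/Queue types
and the CL_* values; this only illustrates the idea, it is not the driver
code):

use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

const QUEUED: i32 = 3; // stand-in for CL_QUEUED
const COMPLETE: i32 = 0; // stand-in for CL_COMPLETE; errors are negative

struct Event {
    status: Mutex<i32>,
    cv: Condvar,
}

impl Event {
    // Only the queue thread calls this, after the bundled flush has finished.
    fn signal(&self, new: i32) {
        let mut status = self.status.lock().unwrap();
        *status = new;
        // Wake waiters only once the event reached a final state.
        if new <= COMPLETE {
            self.cv.notify_all();
        }
    }

    // Application threads only wait on the cv; they never touch the status.
    fn wait(&self) -> i32 {
        let mut status = self.status.lock().unwrap();
        while *status > COMPLETE {
            // A long timeout is fine: the queue thread notifies on completion,
            // so the timeout only guards against missed wake-ups.
            status = self
                .cv
                .wait_timeout(status, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        *status
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<Arc<Event>>>();

    // "Queue thread": runs a whole bundle of events, flushes once for the
    // bundle (simulated here), then signals every event in it.
    let queue = thread::spawn(move || {
        while let Ok(bundle) = rx.recv() {
            // ... submit the work for all events, then a single flush ...
            bundle.iter().for_each(|e| e.signal(COMPLETE));
        }
    });

    let events: Vec<_> = (0..4)
        .map(|_| {
            Arc::new(Event {
                status: Mutex::new(QUEUED),
                cv: Condvar::new(),
            })
        })
        .collect();
    tx.send(events.clone()).unwrap();

    // "Application thread": just waits until the queue signaled completion.
    assert!(events.iter().all(|e| e.wait() == COMPLETE));

    drop(tx); // close the channel so the queue thread exits
    queue.join().unwrap();
}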

Reduces the number of emitted fences on radeonsi in luxmark 3.1 luxball by 90%

Signed-off-by: Karol Herbst <git@karolherbst.de>
Reviewed-by: Nora Allen <blackcatgames@protonmail.com>

Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/23612>
Author: Karol Herbst, 2023-06-13 02:07:02 +02:00 (committed by Marge Bot)
Commit: 5b3ff7e3f3 (parent: 6114e66124)
2 changed files with 92 additions and 43 deletions

@@ -5,7 +5,6 @@ use crate::core::queue::*;
use crate::impl_cl_type_trait;
use mesa_rust::pipe::context::*;
use mesa_rust::pipe::fence::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;
@@ -29,7 +28,6 @@ pub type EventSig = Box<dyn Fn(&Arc<Queue>, &PipeContext) -> CLResult<()>>;
struct EventMutState {
status: cl_int,
cbs: [Vec<(EventCB, *mut c_void)>; 3],
fence: Option<PipeFence>,
work: Option<EventSig>,
}
@@ -65,7 +63,6 @@ impl Event {
state: Mutex::new(EventMutState {
status: CL_QUEUED as cl_int,
cbs: [Vec::new(), Vec::new(), Vec::new()],
fence: None,
work: Some(work),
}),
cv: Condvar::new(),
@@ -82,7 +79,6 @@ impl Event {
state: Mutex::new(EventMutState {
status: CL_SUBMITTED as cl_int,
cbs: [Vec::new(), Vec::new(), Vec::new()],
fence: None,
work: None,
}),
cv: Condvar::new(),
@@ -104,7 +100,12 @@ impl Event {
fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
lock.status = new;
self.cv.notify_all();
// signal on completion or an error
if new <= CL_COMPLETE as cl_int {
self.cv.notify_all();
}
if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
if let Some(cbs) = lock.cbs.get(new as usize) {
cbs.iter()
@@ -122,6 +123,10 @@ impl Event {
self.status() < 0
}
pub fn is_user(&self) -> bool {
self.cmd_type == CL_COMMAND_USER
}
pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
let mut lock = self.state();
let status = lock.status;
@@ -135,21 +140,21 @@ impl Event {
}
}
pub(super) fn signal(&self) {
let mut lock = self.state();
self.set_status(&mut lock, CL_RUNNING as cl_int);
self.set_status(&mut lock, CL_COMPLETE as cl_int);
}
pub fn wait(&self) -> cl_int {
let mut lock = self.state();
while lock.status >= CL_SUBMITTED as cl_int {
if lock.fence.is_some() {
lock.fence.as_ref().unwrap().wait();
// so we trigger all cbs
self.set_status(&mut lock, CL_RUNNING as cl_int);
self.set_status(&mut lock, CL_COMPLETE as cl_int);
} else {
lock = self
.cv
.wait_timeout(lock, Duration::from_millis(50))
.unwrap()
.0;
}
while lock.status >= CL_RUNNING as cl_int {
lock = self
.cv
.wait_timeout(lock, Duration::from_secs(1))
.unwrap()
.0;
}
lock.status
}
@@ -157,7 +162,7 @@ impl Event {
// We always assume that work here simply submits stuff to the hardware even if it's just doing
// sw emulation or nothing at all.
// If anything requests waiting, we will update the status through fencing later.
pub fn call(&self, ctx: &PipeContext) -> cl_int {
pub fn call(&self, ctx: &PipeContext) {
let mut lock = self.state();
let status = lock.status;
if status == CL_QUEUED as cl_int {
@@ -171,7 +176,6 @@ impl Event {
CL_SUBMITTED as cl_int,
|e| e,
);
lock.fence = Some(ctx.flush());
res
},
);
@@ -180,9 +184,6 @@ impl Event {
// absolutely sure it happens before the status update.
drop(work);
self.set_status(&mut lock, new);
new
} else {
status
}
}

@@ -4,6 +4,7 @@ use crate::core::device::*;
use crate::core::event::*;
use crate::impl_cl_type_trait;
use mesa_rust::pipe::context::PipeContext;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;
@@ -14,19 +15,31 @@ use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
struct QueueState {
pending: Vec<Arc<Event>>,
last: Option<Arc<Event>>,
}
pub struct Queue {
pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
pub context: Arc<Context>,
pub device: Arc<Device>,
pub props: cl_command_queue_properties,
pub props_v2: Option<Properties<cl_queue_properties>>,
pending: Mutex<Vec<Arc<Event>>>,
state: Mutex<QueueState>,
_thrd: Option<JoinHandle<()>>,
chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}
impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
if !evs.is_empty() {
pipe.flush().wait();
evs.drain(..).for_each(|e| e.signal());
}
}
impl Queue {
pub fn new(
context: Arc<Context>,
@@ -44,7 +57,10 @@ impl Queue {
device: device,
props: props,
props_v2: props_v2,
pending: Mutex::new(Vec::new()),
state: Mutex::new(QueueState {
pending: Vec::new(),
last: None,
}),
_thrd: Some(
thread::Builder::new()
.name("rusticl queue thread".into())
@@ -53,21 +69,47 @@ impl Queue {
if r.is_err() {
break;
}
let new_events = r.unwrap();
for e in &new_events {
// all events should be processed, but we might have to wait on user
// events to happen
let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
let mut flushed = Vec::new();
for e in new_events {
// If we hit any deps from another queue, flush so we don't risk a dead
// lock.
if e.deps.iter().any(|ev| ev.queue != e.queue) {
flush_events(&mut flushed, &pipe);
}
// We have to wait on user events or events from other queues.
let err = e
.deps
.iter()
.filter(|ev| ev.is_user() || ev.queue != e.queue)
.map(|e| e.wait())
.find(|s| *s < 0);
if let Some(err) = err {
// if a dependency failed, fail this event as well
// If a dependency failed, fail this event as well.
e.set_user_status(err);
continue;
}
e.call(&pipe);
if !e.is_user() {
flushed.push(e);
} else {
e.call(&pipe);
// On each user event we flush our events as application might
// wait on them before signaling user events.
flush_events(&mut flushed, &pipe);
// Wait on user events as they are synchronization points in the
// application's control.
e.wait();
}
}
for e in new_events {
e.wait();
}
flush_events(&mut flushed, &pipe);
})
.unwrap(),
),
@@ -76,28 +118,34 @@ impl Queue {
}
pub fn queue(&self, e: Arc<Event>) {
self.pending.lock().unwrap().push(e);
self.state.lock().unwrap().pending.push(e);
}
pub fn flush(&self, wait: bool) -> CLResult<()> {
let mut p = self.pending.lock().unwrap();
let events = p.clone();
let mut state = self.state.lock().unwrap();
// Update last if and only if we get new events, this prevents breaking application code
// doing things like `clFlush(q); clFinish(q);`
if let Some(last) = state.pending.last() {
state.last = Some(last.clone());
}
// This should never ever error, but if it does return an error
self.chan_in
.send((*p).drain(0..).collect())
.send((state.pending).drain(0..).collect())
.map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
if wait {
for e in events {
e.wait();
}
// Waiting on the last event is good enough here as the queue will process it in order,
// also take the value so we can release the event once we are done
state.last.take().map(|e| e.wait());
}
Ok(())
}
pub fn dependencies_for_pending_events(&self) -> HashSet<Arc<Queue>> {
let p = self.pending.lock().unwrap();
let state = self.state.lock().unwrap();
let mut queues = Event::deep_unflushed_queues(&p);
let mut queues = Event::deep_unflushed_queues(&state.pending);
queues.remove(self);
queues
}