rusticl/queue: overhaul of the queue+event handling
This new approach handles things as follows:

1. Fences won't be attached to events anymore; applications only wait on the cv attached to the event.
2. Only the queue is allowed to update the event status for non-user events. This eliminates all remaining status-update races between the queue and applications waiting on events.
3. The queue minimizes flushing by bundling events.
4. The cv wait timeout is increased, as there is really no point in waking up too often.

Reduces the number of emitted fences on radeonsi in luxmark 3.1 luxball by 90%.

Signed-off-by: Karol Herbst <git@karolherbst.de>
Reviewed-by: Nora Allen <blackcatgames@protonmail.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/23612>
parent 6114e66124
commit 5b3ff7e3f3

2 changed files with 92 additions and 43 deletions
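Before the diff itself, here is a minimal, self-contained sketch of the condvar-only wait pattern described in points 1, 2 and 4 above. Event, RUNNING and COMPLETE are illustrative stand-ins, not the rusticl API; the real code keeps the status and callbacks in an EventMutState behind a Mutex, as the diff below shows.

// Minimal sketch (not the rusticl API): an event is a status guarded by a
// Mutex plus a Condvar. Waiters sleep on the condvar with a coarse timeout;
// only the owner of the event (the queue thread) moves the status forward
// and notifies once the event is complete or failed.
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

const COMPLETE: i32 = 0; // illustrative stand-ins for CL_COMPLETE / CL_RUNNING
const RUNNING: i32 = 1;

struct Event {
    status: Mutex<i32>,
    cv: Condvar,
}

impl Event {
    fn wait(&self) -> i32 {
        let mut status = self.status.lock().unwrap();
        // Negative values are errors and COMPLETE ends the wait; anything at
        // or above RUNNING means the work is still in flight.
        while *status >= RUNNING {
            // A long timeout is fine: the owning thread notifies the condvar
            // as soon as the status actually changes.
            status = self
                .cv
                .wait_timeout(status, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        *status
    }

    fn set_status(&self, new: i32) {
        *self.status.lock().unwrap() = new;
        // Signal on completion or an error, mirroring the event.rs change.
        if new <= COMPLETE {
            self.cv.notify_all();
        }
    }
}

fn main() {
    let ev = Arc::new(Event {
        status: Mutex::new(RUNNING),
        cv: Condvar::new(),
    });
    let worker = Arc::clone(&ev);
    let t = std::thread::spawn(move || worker.set_status(COMPLETE));
    assert_eq!(ev.wait(), COMPLETE);
    t.join().unwrap();
}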
@@ -5,7 +5,6 @@ use crate::core::queue::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::*;
use mesa_rust::pipe::fence::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;

@@ -29,7 +28,6 @@ pub type EventSig = Box<dyn Fn(&Arc<Queue>, &PipeContext) -> CLResult<()>>;
struct EventMutState {
    status: cl_int,
    cbs: [Vec<(EventCB, *mut c_void)>; 3],
    fence: Option<PipeFence>,
    work: Option<EventSig>,
}

@@ -65,7 +63,6 @@ impl Event {
            state: Mutex::new(EventMutState {
                status: CL_QUEUED as cl_int,
                cbs: [Vec::new(), Vec::new(), Vec::new()],
                fence: None,
                work: Some(work),
            }),
            cv: Condvar::new(),

@@ -82,7 +79,6 @@ impl Event {
            state: Mutex::new(EventMutState {
                status: CL_SUBMITTED as cl_int,
                cbs: [Vec::new(), Vec::new(), Vec::new()],
                fence: None,
                work: None,
            }),
            cv: Condvar::new(),

@@ -104,7 +100,12 @@ impl Event {
    fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
        lock.status = new;
        self.cv.notify_all();

        // signal on completion or an error
        if new <= CL_COMPLETE as cl_int {
            self.cv.notify_all();
        }

        if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
            if let Some(cbs) = lock.cbs.get(new as usize) {
                cbs.iter()

@@ -122,6 +123,10 @@ impl Event {
        self.status() < 0
    }

    pub fn is_user(&self) -> bool {
        self.cmd_type == CL_COMMAND_USER
    }

    pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
        let mut lock = self.state();
        let status = lock.status;

@@ -135,21 +140,21 @@ impl Event {
        }
    }

    pub(super) fn signal(&self) {
        let mut lock = self.state();

        self.set_status(&mut lock, CL_RUNNING as cl_int);
        self.set_status(&mut lock, CL_COMPLETE as cl_int);
    }

    pub fn wait(&self) -> cl_int {
        let mut lock = self.state();
        while lock.status >= CL_SUBMITTED as cl_int {
            if lock.fence.is_some() {
                lock.fence.as_ref().unwrap().wait();
                // so we trigger all cbs
                self.set_status(&mut lock, CL_RUNNING as cl_int);
                self.set_status(&mut lock, CL_COMPLETE as cl_int);
            } else {
                lock = self
                    .cv
                    .wait_timeout(lock, Duration::from_millis(50))
                    .unwrap()
                    .0;
            }
        while lock.status >= CL_RUNNING as cl_int {
            lock = self
                .cv
                .wait_timeout(lock, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        lock.status
    }

@@ -157,7 +162,7 @@ impl Event {
    // We always assume that work here simply submits stuff to the hardware even if it's just doing
    // sw emulation or nothing at all.
    // If anything requets waiting, we will update the status through fencing later.
    pub fn call(&self, ctx: &PipeContext) -> cl_int {
    pub fn call(&self, ctx: &PipeContext) {
        let mut lock = self.state();
        let status = lock.status;
        if status == CL_QUEUED as cl_int {

@@ -171,7 +176,6 @@ impl Event {
                        CL_SUBMITTED as cl_int,
                        |e| e,
                    );
                    lock.fence = Some(ctx.flush());
                    res
                },
            );

@@ -180,9 +184,6 @@ impl Event {
            // absolutely sure it happens before the status update.
            drop(work);
            self.set_status(&mut lock, new);
            new
        } else {
            status
        }
    }

@@ -4,6 +4,7 @@ use crate::core::device::*;
use crate::core::event::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

@@ -14,19 +15,31 @@ use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;

struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Option<Arc<Event>>,
}

pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: Arc<Device>,
    pub props: cl_command_queue_properties,
    pub props_v2: Option<Properties<cl_queue_properties>>,
    pending: Mutex<Vec<Arc<Event>>>,
    state: Mutex<QueueState>,
    _thrd: Option<JoinHandle<()>>,
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
    if !evs.is_empty() {
        pipe.flush().wait();
        evs.drain(..).for_each(|e| e.signal());
    }
}

impl Queue {
    pub fn new(
        context: Arc<Context>,

@@ -44,7 +57,10 @@ impl Queue {
            device: device,
            props: props,
            props_v2: props_v2,
            pending: Mutex::new(Vec::new()),
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: None,
            }),
            _thrd: Some(
                thread::Builder::new()
                    .name("rusticl queue thread".into())

@@ -53,21 +69,47 @@ impl Queue {
                        if r.is_err() {
                            break;
                        }

                        let new_events = r.unwrap();
                        for e in &new_events {
                            // all events should be processed, but we might have to wait on user
                            // events to happen
                            let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
                        let mut flushed = Vec::new();

                        for e in new_events {
                            // If we hit any deps from another queue, flush so we don't risk a dead
                            // lock.
                            if e.deps.iter().any(|ev| ev.queue != e.queue) {
                                flush_events(&mut flushed, &pipe);
                            }

                            // We have to wait on user events or events from other queues.
                            let err = e
                                .deps
                                .iter()
                                .filter(|ev| ev.is_user() || ev.queue != e.queue)
                                .map(|e| e.wait())
                                .find(|s| *s < 0);

                            if let Some(err) = err {
                                // if a dependency failed, fail this event as well
                                // If a dependency failed, fail this event as well.
                                e.set_user_status(err);
                                continue;
                            }

                            e.call(&pipe);

                            if !e.is_user() {
                                flushed.push(e);
                            } else {
                                e.call(&pipe);
                                // On each user event we flush our events as application might
                                // wait on them before signaling user events.
                                flush_events(&mut flushed, &pipe);

                                // Wait on user events as they are synchronization points in the
                                // application's control.
                                e.wait();
                            }
                        }
                        for e in new_events {
                            e.wait();
                        }

                        flush_events(&mut flushed, &pipe);
                    })
                    .unwrap(),
            ),

@@ -76,28 +118,34 @@ impl Queue {
    }

    pub fn queue(&self, e: Arc<Event>) {
        self.pending.lock().unwrap().push(e);
        self.state.lock().unwrap().pending.push(e);
    }

    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut p = self.pending.lock().unwrap();
        let events = p.clone();
        let mut state = self.state.lock().unwrap();

        // Update last if and only if we get new events, this prevents breaking application code
        // doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = state.pending.last() {
            state.last = Some(last.clone());
        }

        // This should never ever error, but if it does return an error
        self.chan_in
            .send((*p).drain(0..).collect())
            .send((state.pending).drain(0..).collect())
            .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        if wait {
            for e in events {
                e.wait();
            }
            // Waiting on the last event is good enough here as the queue will process it in order,
            // also take the value so we can release the event once we are done
            state.last.take().map(|e| e.wait());
        }
        Ok(())
    }

    pub fn dependencies_for_pending_events(&self) -> HashSet<Arc<Queue>> {
        let p = self.pending.lock().unwrap();
        let state = self.state.lock().unwrap();

        let mut queues = Event::deep_unflushed_queues(&p);
        let mut queues = Event::deep_unflushed_queues(&state.pending);
        queues.remove(self);
        queues
    }
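
To illustrate the event bundling from point 3 of the commit message, here is a hedged sketch of the batching idea behind flush_events above: rather than emitting one fence per event, the queue thread collects submitted events and flushes the context once for the whole bundle, then signals every event in it. Pipe and Fence below are hypothetical stand-ins, not the mesa_rust bindings.

// Hypothetical stand-ins for the gallium context and fence; only the
// batching pattern itself mirrors the queue.rs change above.
use std::cell::Cell;

struct Fence;
impl Fence {
    fn wait(&self) {}
}

struct Pipe {
    fences_emitted: Cell<u32>,
}
impl Pipe {
    fn flush(&self) -> Fence {
        self.fences_emitted.set(self.fences_emitted.get() + 1);
        Fence
    }
}

struct Event(&'static str);
impl Event {
    fn signal(&self) {
        println!("{} complete", self.0);
    }
}

// One flush (and therefore one fence) covers every event queued since the
// last flush, instead of one fence per event.
fn flush_events(evs: &mut Vec<Event>, pipe: &Pipe) {
    if !evs.is_empty() {
        pipe.flush().wait();
        evs.drain(..).for_each(|e| e.signal());
    }
}

fn main() {
    let pipe = Pipe { fences_emitted: Cell::new(0) };
    let mut bundle = vec![Event("write"), Event("kernel"), Event("read")];
    flush_events(&mut bundle, &pipe);
    assert_eq!(pipe.fences_emitted.get(), 1); // three events, one fence
}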