rusticl: execute program builds as jobs on a worker thread

v2: rework to use Rust closure for worker job function
v3: split preparatory restructuring into separate commit
v4: parallelize link and compile, adjust thread/job count
v5: split out naming changes to later commit, move validation to api/

Reviewed-by: Karol Herbst <kherbst@redhat.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/36497>
This commit is contained in:
Seán de Búrca 2025-08-07 07:20:47 -07:00 committed by Marge Bot
parent 56cc1b1c96
commit 808484041c
3 changed files with 269 additions and 89 deletions

View file

@ -321,12 +321,18 @@ fn build_program(
return Err(CL_INVALID_OPERATION);
}
if p.any_device_in_progress(&devs) {
// CL_INVALID_OPERATION if the build of a program executable for any of
// the devices listed in device_list by a previous call to
// clBuildProgram for program has not completed.
return Err(CL_INVALID_OPERATION);
}
let options = c_string_to_string(options);
let res = p.build(devs, options, cb_opt);
//• CL_INVALID_BINARY if program is created with clCreateProgramWithBinary and devices listed in device_list do not have a valid program binary loaded.
//• CL_INVALID_BUILD_OPTIONS if the build options specified by options are invalid.
//• CL_INVALID_OPERATION if the build of a program executable for any of the devices listed in device_list by a previous call to clBuildProgram for program has not completed.
//• CL_INVALID_OPERATION if program was not created with clCreateProgramWithSource, clCreateProgramWithIL or clCreateProgramWithBinary.
res
@ -398,11 +404,18 @@ fn compile_program(
return Err(CL_INVALID_OPERATION);
}
if p.any_device_in_progress(&devs) {
// CL_INVALID_OPERATION if the compilation or build of a program
// executable for any of the devices listed in device_list by a previous
// call to clCompileProgram or clBuildProgram for program has not
// completed.
return Err(CL_INVALID_OPERATION);
}
let options = c_string_to_string(options);
let res = p.compile(devs, options, headers, cb_opt);
// • CL_INVALID_COMPILER_OPTIONS if the compiler options specified by options are invalid.
// • CL_INVALID_OPERATION if the compilation or build of a program executable for any of the devices listed in device_list by a previous call to clCompileProgram or clBuildProgram for program has not completed.
res
}

View file

@ -13,6 +13,7 @@ use mesa_rust::pipe::context::RWFlags;
use mesa_rust::pipe::fence::FenceFd;
use mesa_rust::pipe::resource::*;
use mesa_rust::pipe::screen::ResourceType;
use mesa_rust::util;
use mesa_rust_gen::*;
use mesa_rust_util::conversion::*;
use mesa_rust_util::properties::Properties;
@ -113,6 +114,7 @@ pub struct Context {
>,
svm: Mutex<SVMContext>,
pub gl_ctx_manager: Option<GLCtxManager>,
pub worker_queue: util::queue::Queue,
}
impl_cl_type_trait!(cl_context, Context, CL_INVALID_CONTEXT);
@ -123,6 +125,9 @@ impl Context {
properties: Properties<cl_context_properties>,
gl_ctx_manager: Option<GLCtxManager>,
) -> Arc<Context> {
let worker_count = u32::max(util::cpu_count() / 2, 1);
let max_job_count = worker_count * 8;
Arc::new(Self {
base: CLObjectBase::new(RusticlTypes::Context),
devs: devs,
@ -133,6 +138,7 @@ impl Context {
svm_ptrs: TrackedPointers::new(),
}),
gl_ctx_manager: gl_ctx_manager,
worker_queue: util::queue::Queue::new(c"clctxworker", max_job_count, worker_count),
})
}

View file

@ -574,50 +574,34 @@ impl Program {
options: String,
callback: Option<ProgramCB>,
) -> CLResult<()> {
let lib = options.contains("-create-library");
let mut info = self.build_info();
self.set_builds_in_progress(&devs)?;
for &dev in &devs {
if !self.do_compile(dev, &options, &[], &mut info) {
continue;
}
// If the caller did not provide a callback, block until build finishes.
if callback.is_none() {
self.context
.worker_queue
.add_job_sync(create_build_closure(
Arc::clone(&self),
devs.clone(),
options,
callback,
))
.wait();
// skip compilation if we already have the right thing.
let d = info.dev_build_mut(dev);
if self.is_bin() {
if d.bin_type == CL_PROGRAM_BINARY_TYPE_EXECUTABLE && !lib
|| d.bin_type == CL_PROGRAM_BINARY_TYPE_LIBRARY && lib
{
continue;
}
}
let spirv = d.spirv.take().unwrap();
let spirvs = [&spirv];
// Don't request validation of the SPIR-V, as we've just done that
// as part of compilation.
Self::do_link(d, &spirvs, lib, None);
}
info.rebuild_kernels(&devs, self.is_src());
// The callback must be called after we've dropped any mutex locks we're
// holding.
drop(info);
if let Some(callback) = callback {
callback.call(&self);
}
debug_logging(&self, &devs);
if !self.all_devices_succeeded(&devs) {
// clBuildProgram returns CL_BUILD_PROGRAM_FAILURE if there is a
// failure to build the program executable. This error will be
// returned if clBuildProgram does not return until the build
// has completed.
return Err(CL_BUILD_PROGRAM_FAILURE);
// returned if clBuildProgram does not return until the build has
// completed.
if !self.all_devices_succeeded(&devs) {
return Err(CL_BUILD_PROGRAM_FAILURE);
}
} else {
self.context.worker_queue.add_job(create_build_closure(
Arc::clone(&self),
devs,
options,
callback,
));
}
Ok(())
@ -712,27 +696,42 @@ impl Program {
pub fn compile(
self: Arc<Self>,
devs: Vec<&Device>,
devs: Vec<&'static Device>,
options: String,
headers: Vec<HeaderProgram>,
callback: Option<ProgramCB>,
) -> CLResult<()> {
for &dev in &devs {
self.do_compile(dev, &options, &headers, &mut self.build_info());
}
self.set_builds_in_progress(&devs)?;
if let Some(callback) = callback {
callback.call(&self);
}
// If the caller did not provide a callback, block until compile
// finishes.
if callback.is_none() {
self.context
.worker_queue
.add_job_sync(create_compile_closure(
Arc::clone(&self),
devs.clone(),
options,
headers,
callback,
))
.wait();
debug_logging(&self, &devs);
if !self.all_devices_succeeded(&devs) {
// clCompileProgram returns CL_COMPILE_PROGRAM_FAILURE if there is a
// failure to compile the program source. This error will be
// returned if clCompileProgram does not return until the compile
// has completed.
return Err(CL_COMPILE_PROGRAM_FAILURE);
if !self.all_devices_succeeded(&devs) {
return Err(CL_COMPILE_PROGRAM_FAILURE);
}
} else {
self.context.worker_queue.add_job(create_compile_closure(
Arc::clone(&self),
devs,
options,
headers,
callback,
));
}
Ok(())
@ -741,42 +740,32 @@ impl Program {
pub fn link(
context: Arc<Context>,
devs: Vec<&'static Device>,
progs: Vec<Arc<Program>>,
progs: Vec<Arc<Self>>,
options: String,
callback: Option<ProgramCB>,
) -> CLResult<(Arc<Program>, cl_int)> {
let mut builds = HashMap::new();
let mut locks: Vec<_> = progs.iter().map(|p| p.build_info()).collect();
let lib = options.contains("-create-library");
) -> CLResult<(Arc<Self>, cl_int)> {
// Link can begin, so we must return a valid program object.
let builds = devs
.iter()
.map(|&device| {
(
device,
DeviceProgramBuild {
status: CL_BUILD_IN_PROGRESS,
bin_type: CL_PROGRAM_BINARY_TYPE_NONE,
..Default::default()
},
)
})
.collect();
for &d in &devs {
let bins: Vec<_> = locks
.iter_mut()
.map(|l| l.dev_build(d).spirv.as_ref().unwrap())
.collect();
let mut build = DeviceProgramBuild {
status: CL_BUILD_IN_PROGRESS,
bin_type: CL_PROGRAM_BINARY_TYPE_NONE,
..Default::default()
};
let device_for_validation = Platform::dbg().validate_spirv.then_some(d);
Self::do_link(&mut build, &bins, lib, device_for_validation);
builds.insert(d, build);
}
let mut build = ProgramBuild {
let build = ProgramBuild {
builds_by_device: builds,
spec_constants: HashMap::new(),
kernels: Vec::new(),
kernel_info: HashMap::new(),
};
// Pre build nir kernels
build.rebuild_kernels(&devs, false);
let res = Arc::new(Self {
base: CLObjectBase::new(RusticlTypes::Program),
context: context,
@ -785,18 +774,39 @@ impl Program {
build: Mutex::new(build),
});
if let Some(callback) = callback {
callback.call(&res);
}
// If the caller did not provide a callback, block until compile
// finishes.
let status = if callback.is_none() {
res.context
.worker_queue
.add_job_sync(create_link_closure(
Arc::clone(&res),
devs.clone(),
progs,
options,
callback,
))
.wait();
debug_logging(&res, &devs);
let status = if res.all_devices_succeeded(&devs) {
CL_SUCCESS as cl_int
} else {
// clLinkProgram returns CL_LINK_PROGRAM_FAILURE if there is a
// failure to link the compiled binaries and/or libraries.
CL_LINK_PROGRAM_FAILURE
if res.all_devices_succeeded(&devs) {
CL_SUCCESS as cl_int
} else {
CL_LINK_PROGRAM_FAILURE
}
} else {
res.context.worker_queue.add_job(create_link_closure(
Arc::clone(&res),
devs,
progs,
options,
callback,
));
// clLinkProgram always returns success if there is a callback and
// link can begin.
CL_SUCCESS as cl_int
};
Ok((res, status))
@ -841,6 +851,29 @@ impl Program {
};
}
/// Sets the status to "in progress" for the device-specific builds for each
/// of the provided builds.
fn set_builds_in_progress(&self, devs: &[&Device]) -> CLResult<()> {
let mut build_info = self.build_info();
for &device in devs {
// Iterate separately to set these so we don't leave any permanently
// set to in progress in the event of encountering a build still in
// progress.
let device_build = build_info.dev_build_mut(device);
device_build.status = CL_BUILD_IN_PROGRESS;
}
Ok(())
}
/// Returns `true` if build is in progress for any of the provided devices,
/// false otherwise.
pub fn any_device_in_progress(&self, devices: &[&Device]) -> bool {
devices
.iter()
.any(|&device| self.status(device) == CL_BUILD_IN_PROGRESS)
}
/// Returns `true` if build succeeded for each of the provided devices,
/// false otherwise.
pub fn all_devices_succeeded(&self, devices: &[&Device]) -> bool {
@ -897,3 +930,131 @@ fn debug_logging(p: &Program, devs: &[&Device]) {
}
}
}
/// Returns a closure which, when called, compiles and links SPIR-V for the
/// provided program and devices.
///
/// The returned closure is suitable for adding to an async queue.
fn create_build_closure(
program: Arc<Program>,
devs: Vec<&'static Device>,
options: String,
mut callback: Option<ProgramCB>,
) -> impl FnMut() + Send + Sync + 'static {
move || {
let lib = options.contains("-create-library");
let mut info = program.build_info();
for &dev in &devs {
if !program.do_compile(dev, &options, &[], &mut info) {
continue;
}
let d = info.dev_build_mut(dev);
// skip compilation if we already have the right thing.
if program.is_bin() {
if d.bin_type == CL_PROGRAM_BINARY_TYPE_EXECUTABLE && !lib
|| d.bin_type == CL_PROGRAM_BINARY_TYPE_LIBRARY && lib
{
d.status = CL_BUILD_SUCCESS as cl_build_status;
continue;
}
}
let spirv = d.spirv.take().unwrap();
let spirvs = [&spirv];
// Don't request validation of the SPIR-V, as we've just done that
// as part of compilation.
Program::do_link(d, &spirvs, lib, None);
}
info.rebuild_kernels(&devs, program.is_src());
// The callback must be called after we've dropped any mutex locks we're
// holding.
drop(info);
if let Some(callback) = callback.take() {
callback.call(program.as_ref());
}
debug_logging(&program, &devs);
}
}
/// Returns a closure which, when called, compiles SPIR-V for the provided
/// program and devices.
///
/// The returned closure is suitable for adding to an async queue.
fn create_compile_closure(
program: Arc<Program>,
devs: Vec<&'static Device>,
options: String,
headers: Vec<HeaderProgram>,
mut callback: Option<ProgramCB>,
) -> impl FnMut() + Send + Sync + 'static {
move || {
let mut info = program.build_info();
for &dev in &devs {
program.do_compile(dev, &options, &headers, &mut info);
}
// The callback must be called after we've dropped any mutex locks we're
// holding.
drop(info);
if let Some(callback) = callback.take() {
callback.call(&program);
}
debug_logging(&program, &devs);
}
}
/// Returns a closure which, when called, links SPIR-V for the provided input
/// programs and devices.
///
/// `program` is populated with the resulting device-specific build info.
///
/// The returned closure is suitable for adding to an async queue.
fn create_link_closure(
program: Arc<Program>,
devs: Vec<&'static Device>,
progs: Vec<Arc<Program>>,
options: String,
mut callback: Option<ProgramCB>,
) -> impl FnMut() + Send + Sync + 'static {
move || {
let mut locks: Vec<_> = progs.iter().map(|p| p.build_info()).collect();
let lib = options.contains("-create-library");
let mut info = program.build_info();
for &d in &devs {
let bins: Vec<_> = locks
.iter_mut()
.map(|l| l.dev_build(d).spirv.as_ref().unwrap())
.collect();
let mut build = info.dev_build_mut(d);
let device_for_validation = Platform::dbg().validate_spirv.then_some(d);
Program::do_link(&mut build, &bins, lib, device_for_validation);
}
// Pre build nir kernels
info.rebuild_kernels(&devs, false);
// The callback must be called after we've dropped any mutex locks we're
// holding.
drop(info);
if let Some(callback) = callback.take() {
callback.call(&program);
}
debug_logging(&program, &devs);
}
}