From 808484041cdc275a10dab2b939834f114e3a09d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20de=20B=C3=BArca?= Date: Thu, 7 Aug 2025 07:20:47 -0700 Subject: [PATCH] rusticl: execute program builds as jobs on a worker thread v2: rework to use Rust closure for worker job function v3: split preparatory restructuring into separate commit v4: parallelize link and compile, adjust thread/job count v5: split out naming changes to later commit, move validation to api/ Reviewed-by: Karol Herbst Part-of: --- src/gallium/frontends/rusticl/api/program.rs | 17 +- src/gallium/frontends/rusticl/core/context.rs | 6 + src/gallium/frontends/rusticl/core/program.rs | 335 +++++++++++++----- 3 files changed, 269 insertions(+), 89 deletions(-) diff --git a/src/gallium/frontends/rusticl/api/program.rs b/src/gallium/frontends/rusticl/api/program.rs index c4fbc71f717..7c5904023f0 100644 --- a/src/gallium/frontends/rusticl/api/program.rs +++ b/src/gallium/frontends/rusticl/api/program.rs @@ -321,12 +321,18 @@ fn build_program( return Err(CL_INVALID_OPERATION); } + if p.any_device_in_progress(&devs) { + // CL_INVALID_OPERATION if the build of a program executable for any of + // the devices listed in device_list by a previous call to + // clBuildProgram for program has not completed. + return Err(CL_INVALID_OPERATION); + } + let options = c_string_to_string(options); let res = p.build(devs, options, cb_opt); //• CL_INVALID_BINARY if program is created with clCreateProgramWithBinary and devices listed in device_list do not have a valid program binary loaded. //• CL_INVALID_BUILD_OPTIONS if the build options specified by options are invalid. - //• CL_INVALID_OPERATION if the build of a program executable for any of the devices listed in device_list by a previous call to clBuildProgram for program has not completed. //• CL_INVALID_OPERATION if program was not created with clCreateProgramWithSource, clCreateProgramWithIL or clCreateProgramWithBinary. res @@ -398,11 +404,18 @@ fn compile_program( return Err(CL_INVALID_OPERATION); } + if p.any_device_in_progress(&devs) { + // CL_INVALID_OPERATION if the compilation or build of a program + // executable for any of the devices listed in device_list by a previous + // call to clCompileProgram or clBuildProgram for program has not + // completed. + return Err(CL_INVALID_OPERATION); + } + let options = c_string_to_string(options); let res = p.compile(devs, options, headers, cb_opt); // • CL_INVALID_COMPILER_OPTIONS if the compiler options specified by options are invalid. - // • CL_INVALID_OPERATION if the compilation or build of a program executable for any of the devices listed in device_list by a previous call to clCompileProgram or clBuildProgram for program has not completed. res } diff --git a/src/gallium/frontends/rusticl/core/context.rs b/src/gallium/frontends/rusticl/core/context.rs index 1ed37472235..e8060fb5e8d 100644 --- a/src/gallium/frontends/rusticl/core/context.rs +++ b/src/gallium/frontends/rusticl/core/context.rs @@ -13,6 +13,7 @@ use mesa_rust::pipe::context::RWFlags; use mesa_rust::pipe::fence::FenceFd; use mesa_rust::pipe::resource::*; use mesa_rust::pipe::screen::ResourceType; +use mesa_rust::util; use mesa_rust_gen::*; use mesa_rust_util::conversion::*; use mesa_rust_util::properties::Properties; @@ -113,6 +114,7 @@ pub struct Context { >, svm: Mutex, pub gl_ctx_manager: Option, + pub worker_queue: util::queue::Queue, } impl_cl_type_trait!(cl_context, Context, CL_INVALID_CONTEXT); @@ -123,6 +125,9 @@ impl Context { properties: Properties, gl_ctx_manager: Option, ) -> Arc { + let worker_count = u32::max(util::cpu_count() / 2, 1); + let max_job_count = worker_count * 8; + Arc::new(Self { base: CLObjectBase::new(RusticlTypes::Context), devs: devs, @@ -133,6 +138,7 @@ impl Context { svm_ptrs: TrackedPointers::new(), }), gl_ctx_manager: gl_ctx_manager, + worker_queue: util::queue::Queue::new(c"clctxworker", max_job_count, worker_count), }) } diff --git a/src/gallium/frontends/rusticl/core/program.rs b/src/gallium/frontends/rusticl/core/program.rs index f95cbe74e18..c8fd3306bb7 100644 --- a/src/gallium/frontends/rusticl/core/program.rs +++ b/src/gallium/frontends/rusticl/core/program.rs @@ -574,50 +574,34 @@ impl Program { options: String, callback: Option, ) -> CLResult<()> { - let lib = options.contains("-create-library"); - let mut info = self.build_info(); + self.set_builds_in_progress(&devs)?; - for &dev in &devs { - if !self.do_compile(dev, &options, &[], &mut info) { - continue; - } + // If the caller did not provide a callback, block until build finishes. + if callback.is_none() { + self.context + .worker_queue + .add_job_sync(create_build_closure( + Arc::clone(&self), + devs.clone(), + options, + callback, + )) + .wait(); - // skip compilation if we already have the right thing. - let d = info.dev_build_mut(dev); - if self.is_bin() { - if d.bin_type == CL_PROGRAM_BINARY_TYPE_EXECUTABLE && !lib - || d.bin_type == CL_PROGRAM_BINARY_TYPE_LIBRARY && lib - { - continue; - } - } - - let spirv = d.spirv.take().unwrap(); - let spirvs = [&spirv]; - - // Don't request validation of the SPIR-V, as we've just done that - // as part of compilation. - Self::do_link(d, &spirvs, lib, None); - } - - info.rebuild_kernels(&devs, self.is_src()); - - // The callback must be called after we've dropped any mutex locks we're - // holding. - drop(info); - - if let Some(callback) = callback { - callback.call(&self); - } - - debug_logging(&self, &devs); - - if !self.all_devices_succeeded(&devs) { // clBuildProgram returns CL_BUILD_PROGRAM_FAILURE if there is a // failure to build the program executable. This error will be - // returned if clBuildProgram does not return until the build - // has completed. - return Err(CL_BUILD_PROGRAM_FAILURE); + // returned if clBuildProgram does not return until the build has + // completed. + if !self.all_devices_succeeded(&devs) { + return Err(CL_BUILD_PROGRAM_FAILURE); + } + } else { + self.context.worker_queue.add_job(create_build_closure( + Arc::clone(&self), + devs, + options, + callback, + )); } Ok(()) @@ -712,27 +696,42 @@ impl Program { pub fn compile( self: Arc, - devs: Vec<&Device>, + devs: Vec<&'static Device>, options: String, headers: Vec, callback: Option, ) -> CLResult<()> { - for &dev in &devs { - self.do_compile(dev, &options, &headers, &mut self.build_info()); - } + self.set_builds_in_progress(&devs)?; - if let Some(callback) = callback { - callback.call(&self); - } + // If the caller did not provide a callback, block until compile + // finishes. + if callback.is_none() { + self.context + .worker_queue + .add_job_sync(create_compile_closure( + Arc::clone(&self), + devs.clone(), + options, + headers, + callback, + )) + .wait(); - debug_logging(&self, &devs); - - if !self.all_devices_succeeded(&devs) { // clCompileProgram returns CL_COMPILE_PROGRAM_FAILURE if there is a // failure to compile the program source. This error will be // returned if clCompileProgram does not return until the compile // has completed. - return Err(CL_COMPILE_PROGRAM_FAILURE); + if !self.all_devices_succeeded(&devs) { + return Err(CL_COMPILE_PROGRAM_FAILURE); + } + } else { + self.context.worker_queue.add_job(create_compile_closure( + Arc::clone(&self), + devs, + options, + headers, + callback, + )); } Ok(()) @@ -741,42 +740,32 @@ impl Program { pub fn link( context: Arc, devs: Vec<&'static Device>, - progs: Vec>, + progs: Vec>, options: String, callback: Option, - ) -> CLResult<(Arc, cl_int)> { - let mut builds = HashMap::new(); - let mut locks: Vec<_> = progs.iter().map(|p| p.build_info()).collect(); - let lib = options.contains("-create-library"); + ) -> CLResult<(Arc, cl_int)> { + // Link can begin, so we must return a valid program object. + let builds = devs + .iter() + .map(|&device| { + ( + device, + DeviceProgramBuild { + status: CL_BUILD_IN_PROGRESS, + bin_type: CL_PROGRAM_BINARY_TYPE_NONE, + ..Default::default() + }, + ) + }) + .collect(); - for &d in &devs { - let bins: Vec<_> = locks - .iter_mut() - .map(|l| l.dev_build(d).spirv.as_ref().unwrap()) - .collect(); - - let mut build = DeviceProgramBuild { - status: CL_BUILD_IN_PROGRESS, - bin_type: CL_PROGRAM_BINARY_TYPE_NONE, - ..Default::default() - }; - - let device_for_validation = Platform::dbg().validate_spirv.then_some(d); - Self::do_link(&mut build, &bins, lib, device_for_validation); - - builds.insert(d, build); - } - - let mut build = ProgramBuild { + let build = ProgramBuild { builds_by_device: builds, spec_constants: HashMap::new(), kernels: Vec::new(), kernel_info: HashMap::new(), }; - // Pre build nir kernels - build.rebuild_kernels(&devs, false); - let res = Arc::new(Self { base: CLObjectBase::new(RusticlTypes::Program), context: context, @@ -785,18 +774,39 @@ impl Program { build: Mutex::new(build), }); - if let Some(callback) = callback { - callback.call(&res); - } + // If the caller did not provide a callback, block until compile + // finishes. + let status = if callback.is_none() { + res.context + .worker_queue + .add_job_sync(create_link_closure( + Arc::clone(&res), + devs.clone(), + progs, + options, + callback, + )) + .wait(); - debug_logging(&res, &devs); - - let status = if res.all_devices_succeeded(&devs) { - CL_SUCCESS as cl_int - } else { // clLinkProgram returns CL_LINK_PROGRAM_FAILURE if there is a // failure to link the compiled binaries and/or libraries. - CL_LINK_PROGRAM_FAILURE + if res.all_devices_succeeded(&devs) { + CL_SUCCESS as cl_int + } else { + CL_LINK_PROGRAM_FAILURE + } + } else { + res.context.worker_queue.add_job(create_link_closure( + Arc::clone(&res), + devs, + progs, + options, + callback, + )); + + // clLinkProgram always returns success if there is a callback and + // link can begin. + CL_SUCCESS as cl_int }; Ok((res, status)) @@ -841,6 +851,29 @@ impl Program { }; } + /// Sets the status to "in progress" for the device-specific builds for each + /// of the provided builds. + fn set_builds_in_progress(&self, devs: &[&Device]) -> CLResult<()> { + let mut build_info = self.build_info(); + for &device in devs { + // Iterate separately to set these so we don't leave any permanently + // set to in progress in the event of encountering a build still in + // progress. + let device_build = build_info.dev_build_mut(device); + device_build.status = CL_BUILD_IN_PROGRESS; + } + + Ok(()) + } + + /// Returns `true` if build is in progress for any of the provided devices, + /// false otherwise. + pub fn any_device_in_progress(&self, devices: &[&Device]) -> bool { + devices + .iter() + .any(|&device| self.status(device) == CL_BUILD_IN_PROGRESS) + } + /// Returns `true` if build succeeded for each of the provided devices, /// false otherwise. pub fn all_devices_succeeded(&self, devices: &[&Device]) -> bool { @@ -897,3 +930,131 @@ fn debug_logging(p: &Program, devs: &[&Device]) { } } } + +/// Returns a closure which, when called, compiles and links SPIR-V for the +/// provided program and devices. +/// +/// The returned closure is suitable for adding to an async queue. +fn create_build_closure( + program: Arc, + devs: Vec<&'static Device>, + options: String, + mut callback: Option, +) -> impl FnMut() + Send + Sync + 'static { + move || { + let lib = options.contains("-create-library"); + let mut info = program.build_info(); + + for &dev in &devs { + if !program.do_compile(dev, &options, &[], &mut info) { + continue; + } + + let d = info.dev_build_mut(dev); + // skip compilation if we already have the right thing. + if program.is_bin() { + if d.bin_type == CL_PROGRAM_BINARY_TYPE_EXECUTABLE && !lib + || d.bin_type == CL_PROGRAM_BINARY_TYPE_LIBRARY && lib + { + d.status = CL_BUILD_SUCCESS as cl_build_status; + continue; + } + } + + let spirv = d.spirv.take().unwrap(); + let spirvs = [&spirv]; + + // Don't request validation of the SPIR-V, as we've just done that + // as part of compilation. + Program::do_link(d, &spirvs, lib, None); + } + + info.rebuild_kernels(&devs, program.is_src()); + + // The callback must be called after we've dropped any mutex locks we're + // holding. + drop(info); + + if let Some(callback) = callback.take() { + callback.call(program.as_ref()); + } + + debug_logging(&program, &devs); + } +} + +/// Returns a closure which, when called, compiles SPIR-V for the provided +/// program and devices. +/// +/// The returned closure is suitable for adding to an async queue. +fn create_compile_closure( + program: Arc, + devs: Vec<&'static Device>, + options: String, + headers: Vec, + mut callback: Option, +) -> impl FnMut() + Send + Sync + 'static { + move || { + let mut info = program.build_info(); + + for &dev in &devs { + program.do_compile(dev, &options, &headers, &mut info); + } + + // The callback must be called after we've dropped any mutex locks we're + // holding. + drop(info); + + if let Some(callback) = callback.take() { + callback.call(&program); + } + + debug_logging(&program, &devs); + } +} + +/// Returns a closure which, when called, links SPIR-V for the provided input +/// programs and devices. +/// +/// `program` is populated with the resulting device-specific build info. +/// +/// The returned closure is suitable for adding to an async queue. +fn create_link_closure( + program: Arc, + devs: Vec<&'static Device>, + progs: Vec>, + options: String, + mut callback: Option, +) -> impl FnMut() + Send + Sync + 'static { + move || { + let mut locks: Vec<_> = progs.iter().map(|p| p.build_info()).collect(); + let lib = options.contains("-create-library"); + + let mut info = program.build_info(); + + for &d in &devs { + let bins: Vec<_> = locks + .iter_mut() + .map(|l| l.dev_build(d).spirv.as_ref().unwrap()) + .collect(); + + let mut build = info.dev_build_mut(d); + + let device_for_validation = Platform::dbg().validate_spirv.then_some(d); + Program::do_link(&mut build, &bins, lib, device_for_validation); + } + + // Pre build nir kernels + info.rebuild_kernels(&devs, false); + + // The callback must be called after we've dropped any mutex locks we're + // holding. + drop(info); + + if let Some(callback) = callback.take() { + callback.call(&program); + } + + debug_logging(&program, &devs); + } +}