use std::collections::{HashMap, VecDeque}; use tokio::{ process::Command, sync::mpsc::{Receiver, Sender}, }; use crate::{ NixCiResult, NixCiResultType, copy::CopyLoopMessage, types::{ NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType, NixInternalLogLineResult, NixJob, NixJobEnum, }, util::{ChildOutput, WrappedChild}, }; pub enum BuildLoopMessage { Job(NixJob), Stop, } #[derive(Debug)] pub struct BuildResult { pub path: String, pub result: BuildResultType, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BuildResultType { Built, Failed, Unknown, } #[derive(Debug)] struct ActivityData { parent: u64, r#type: NixInternalLogLineActivityType, running: bool, data: Option, } #[derive(Debug, Clone)] enum ActivityDataPerType { Build { drv_path: String, state: BuildState }, Substitute { store_path: String }, } #[derive(Debug, Clone, Copy)] enum BuildState { Started, Running, Done, Failed, } #[derive(Debug, Clone)] struct BuildProgress { done: u64, running: u64, failed: u64, started_builds: VecDeque, reported_builds: VecDeque, unreported_builds: VecDeque, } pub async fn build_loop( mut rx: Receiver, copy_tx: Sender, result_tx: Sender, ) { let mut paths_built = Vec::new(); while let Some(msg) = rx.recv().await { let job = match msg { BuildLoopMessage::Job(job) => job, BuildLoopMessage::Stop => { tracing::debug!("stop signal received"); break; } }; tracing::info!("building {}", job.attr); let NixJobEnum::Success(eval_job) = job.job else { continue; }; if !paths_built.contains(&eval_job.drv_path) { paths_built.push(eval_job.drv_path.clone()); match run_build(job.attr, eval_job.drv_path.clone()).await { Err(e) => { let _ = result_tx .send(NixCiResult { r#type: NixCiResultType::Build, path: eval_job.drv_path.clone(), success: false, }) .await; tracing::error!("nix build process errored! {}", e); } Ok(results) => { for result in results { let _ = result_tx .send(NixCiResult { r#type: NixCiResultType::Build, path: result.path.clone(), success: result.result == BuildResultType::Built, }) .await; if let Err(e) = copy_tx.send(CopyLoopMessage::Build(result)).await { tracing::error!("failed to enqueue package copy: {}", e); break; } } } } } } } #[tracing::instrument(skip(drv_path))] pub async fn run_build(attr: String, drv_path: String) -> anyhow::Result> { let mut child = WrappedChild::new( Command::new("nix") .args(&[ "build", "--keep-going", "--verbose", "--log-format", "internal-json", ]) .arg(format!("{}^*", drv_path)), Some(format!("nix build {}", attr)), )?; let mut activities = HashMap::::new(); // build progress per parent as remote builds are run with a nested parent (scoped under the Realise activity id) let mut progress = HashMap::::new(); loop { let line = match child.next_line().await? { ChildOutput::Finished => break, ChildOutput::Stderr(line) => line, ChildOutput::Stdout(line) => { tracing::info!(line); continue; } }; if !line.starts_with("@nix ") { tracing::warn!("invalid nix build output: {}", line); continue; } let log: NixInternalLogLine = match serde_json::from_str(&line[4..]) { Ok(log) => log, Err(e) => { tracing::warn!("failed to parse nix log output {:?}: {}", line, e); continue; } }; tracing::trace!("log line: {:?}", log); match log { NixInternalLogLine::Msg { msg, .. } => { tracing::info!("{}", msg); } NixInternalLogLine::Result { id, result } => { let activity = activities.get_mut(&id); if activity.is_none() { tracing::warn!("received result for unknown activity {}!", id); } match result { NixInternalLogLineResult::BuildLogLine { log } | NixInternalLogLineResult::PostBuildLogLine { log } => { tracing::info!("{}", log) } // apparently only reported for the realise activity, with activity types of // FileTransfer and CopyPath. contains bytes downloaded and unpacked, respectively NixInternalLogLineResult::SetExpected { .. } => {} // progress reporting, depends on the activity type NixInternalLogLineResult::Progress { done, expected: _, running, failed, } => { if let Some(activity) = activity { match activity.r#type { // meta-progress counting the derivations being built as part of the build NixInternalLogLineActivityType::Builds => { let progress_entry = progress.get_mut(&activity.parent).unwrap(); while running > progress_entry.running { progress_entry.running += 1; if let Some(build_activity_id) = progress_entry.started_builds.pop_front() { let Some(a) = activities.get_mut(&build_activity_id) else { panic!("this can't happen (id without activity)"); }; let ActivityDataPerType::Build { state, .. } = a.data.as_mut().unwrap() else { panic!( "this can't happen (build activity without build data)" ); }; *state = BuildState::Running; } else { tracing::warn!( "no build started but we got a running report?" ); } } while failed > progress_entry.failed { progress_entry.failed += 1; if let Some(build_id) = progress_entry.unreported_builds.pop_front() { if let Some(entry) = activities.get_mut(&build_id) { match entry.r#type { NixInternalLogLineActivityType::Build => { let ActivityDataPerType::Build { drv_path, state, } = entry.data.as_mut().unwrap() else { panic!( "this can't happen (build activity without build data)" ); }; tracing::info!( "reported failed build for derivation {} after stop", drv_path ); *state = BuildState::Failed; } _ => {} } } } else { progress_entry .reported_builds .push_back(BuildState::Failed); } } while done > progress_entry.done { progress_entry.done += 1; if let Some(build_id) = progress_entry.unreported_builds.pop_front() { if let Some(entry) = activities.get_mut(&build_id) { match entry.r#type { NixInternalLogLineActivityType::Build => { let ActivityDataPerType::Build { drv_path, state, } = entry.data.as_mut().unwrap() else { panic!( "this can't happen (build activity without build data)" ); }; tracing::info!( "reported done build for derivation {} after stop", drv_path ); *state = BuildState::Done; } _ => {} } } } else { progress_entry .reported_builds .push_back(BuildState::Done); } } } // meta-progress counting the paths being downloaded as part of the build NixInternalLogLineActivityType::CopyPaths => {} // progress for a single download NixInternalLogLineActivityType::FileTransfer => {} // progress for a file unpack(?) downloaded nar -> store NixInternalLogLineActivityType::CopyPath => {} _ => { tracing::warn!( "progress reported for unhandled activity type {:?}", activity.r#type ); } } } } _ => { tracing::warn!("unhandled result: activity {}, {:?}", id, result); } } } NixInternalLogLine::Start { id, level: _, text, parent, activity, } => { tracing::debug!( "activity {} (parent {}) started: {:?}", id, parent, activity ); let mut entry = activities.entry(id).insert_entry(ActivityData { parent, r#type: activity.get_type(), running: true, data: None, }); if !text.is_empty() { tracing::info!("{}", text); } match activity { // these seem to be meta-activities for reporting progress NixInternalLogLineActivity::Builds => {} NixInternalLogLineActivity::CopyPaths => {} NixInternalLogLineActivity::Realise => { progress.entry(parent).or_insert_with(|| BuildProgress { done: 0, running: 0, failed: 0, started_builds: VecDeque::new(), reported_builds: VecDeque::new(), unreported_builds: VecDeque::new(), }); } // just ignore this, we don't really care about these NixInternalLogLineActivity::Unknown => {} NixInternalLogLineActivity::FileTransfer => {} NixInternalLogLineActivity::QueryPathInfo { .. } => { // has FileTransfer as child } NixInternalLogLineActivity::CopyPath => { // has FileTransfer as child (when downloading) } NixInternalLogLineActivity::Substitute { store_path, .. } => { // has CopyPath as child entry .get_mut() .data .replace(ActivityDataPerType::Substitute { store_path }); } // useful activities NixInternalLogLineActivity::Build { drv_path, .. } => { entry.get_mut().data.replace(ActivityDataPerType::Build { drv_path, state: BuildState::Started, }); progress .entry(parent) .and_modify(|progress| progress.started_builds.push_back(id)); } _ => { tracing::warn!( "unhandled start: activity {} (parent {}), {:?}", id, parent, activity ) } } } NixInternalLogLine::Stop { id } => { tracing::debug!("activity {} stopped", id); if let Some(entry) = activities.get_mut(&id) { entry.running = false; match entry.r#type { NixInternalLogLineActivityType::Build => { let ActivityDataPerType::Build { state, .. } = entry.data.as_mut().unwrap() else { panic!("this can't happen (build activity without build data)"); }; let progress_entry = progress.get_mut(&entry.parent).unwrap(); if let Some(report) = progress_entry.reported_builds.pop_front() { *state = report; } else { tracing::warn!( "build stopped but no report (don't know whether it failed)" ); progress_entry.unreported_builds.push_back(id); } } _ => {} } } } } } let exit_status = child .exit_status() .ok_or(anyhow::anyhow!("child has exited, but no exit status????"))?; if !exit_status.success() { tracing::warn!("nix build failed with {}!", exit_status); } let mut results: HashMap = HashMap::new(); for (_, a) in &activities { match a.data.as_ref() { Some(ActivityDataPerType::Build { drv_path, state }) => { let result = match state { BuildState::Done => { tracing::info!("derivation {} built successfully", drv_path); BuildResultType::Built } BuildState::Failed => { tracing::error!("derivation {} failed to build", drv_path); BuildResultType::Failed } _ => { tracing::warn!("derivation {} somehow still running??", drv_path); // unknown build status -> assume it was built successfully if nix build did not return an error if exit_status.success() { BuildResultType::Built } else { BuildResultType::Unknown } } }; results .entry(drv_path.clone()) .and_modify(|r| { if result == BuildResultType::Built { *r = result; } }) .or_insert(result); } Some(ActivityDataPerType::Substitute { store_path }) => { tracing::trace!("downloaded {}", store_path); } _ => {} } } Ok(results .into_iter() .map(|(path, result)| BuildResult { path, result }) .collect()) }