From 657f32486576895a26ac38b188bb3ca1bb424f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9B=A7-440729=20=5Bsophie=5D?= Date: Fri, 16 May 2025 21:46:27 +0200 Subject: [PATCH] collect results, handle failing evals --- src/build.rs | 46 +++++++++++++++++++++++++++++++----------- src/copy.rs | 41 ++++++++++++++++++++++++++++++++++---- src/eval.rs | 29 ++++++++++++++++++++++++--- src/main.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++-- src/types.rs | 14 +++++++++++++ 5 files changed, 166 insertions(+), 20 deletions(-) diff --git a/src/build.rs b/src/build.rs index ae916a2..2f97e18 100644 --- a/src/build.rs +++ b/src/build.rs @@ -6,10 +6,11 @@ use tokio::{ }; use crate::{ + NixCiResult, NixCiResultType, copy::CopyLoopMessage, types::{ NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType, - NixInternalLogLineResult, NixJob, + NixInternalLogLineResult, NixJob, NixJobEnum, }, util::{ChildOutput, WrappedChild}, }; @@ -63,7 +64,11 @@ struct BuildProgress { reported_builds: VecDeque, } -pub async fn build_loop(mut rx: Receiver, copy_tx: Sender) { +pub async fn build_loop( + mut rx: Receiver, + copy_tx: Sender, + result_tx: Sender, +) { let mut paths_built = Vec::new(); while let Some(msg) = rx.recv().await { let job = match msg { @@ -74,12 +79,31 @@ pub async fn build_loop(mut rx: Receiver, copy_tx: Sender tracing::error!("nix build process errored! {}", e), + let NixJobEnum::Success(eval_job) = job.job else { + continue; + }; + if !paths_built.contains(&eval_job.drv_path) { + paths_built.push(eval_job.drv_path.clone()); + match run_build(job.attr, eval_job.drv_path.clone()).await { + Err(e) => { + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Build, + path: eval_job.drv_path.clone(), + success: false, + }) + .await; + tracing::error!("nix build process errored! {}", e); + } Ok(results) => { for result in results { + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Build, + path: result.path.clone(), + success: result.result == BuildResultType::Built, + }) + .await; if let Err(e) = copy_tx.send(CopyLoopMessage::Build(result)).await { tracing::error!("failed to enqueue package copy: {}", e); break; @@ -91,8 +115,8 @@ pub async fn build_loop(mut rx: Receiver, copy_tx: Sender anyhow::Result> { +#[tracing::instrument(skip(drv_path))] +pub async fn run_build(attr: String, drv_path: String) -> anyhow::Result> { let mut child = WrappedChild::new( Command::new("nix") .args(&[ @@ -102,8 +126,8 @@ pub async fn run_build(job: NixJob) -> anyhow::Result> { "--log-format", "internal-json", ]) - .arg(format!("{}^*", job.drv_path)), - Some(format!("nix build {}", job.attr)), + .arg(format!("{}^*", drv_path)), + Some(format!("nix build {}", attr)), )?; let mut activities = HashMap::::new(); // build progress per parent as remote builds are run with a nested parent (scoped under the Realise activity id) @@ -326,7 +350,7 @@ pub async fn run_build(job: NixJob) -> anyhow::Result> { BuildResultType::Built } BuildState::Failed => { - tracing::warn!("derivation {} failed to build", drv_path); + tracing::error!("derivation {} failed to build", drv_path); BuildResultType::Failed } _ => { diff --git a/src/copy.rs b/src/copy.rs index 1852dad..9c360c1 100644 --- a/src/copy.rs +++ b/src/copy.rs @@ -1,8 +1,12 @@ use std::{collections::HashMap, sync::Arc}; -use tokio::{process::Command, sync::mpsc::Receiver}; +use tokio::{ + process::Command, + sync::mpsc::{Receiver, Sender}, +}; use crate::{ + NixCiResult, NixCiResultType, build::{BuildResult, BuildResultType}, config::Options, types::NixDerivationInfo, @@ -14,7 +18,11 @@ pub enum CopyLoopMessage { Stop, } -pub async fn copy_loop(opts: Arc, mut rx: Receiver) { +pub async fn copy_loop( + opts: Arc, + mut rx: Receiver, + result_tx: Sender, +) { let mut paths_copied = Vec::new(); while let Some(msg) = rx.recv().await { let result = match msg { @@ -32,6 +40,13 @@ pub async fn copy_loop(opts: Arc, mut rx: Receiver) { let valid_paths = match get_valid_outputs(&result.path).await { Ok(valid_paths) => valid_paths, Err(e) => { + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Copy, + path: result.path.clone(), + success: false, + }) + .await; tracing::warn!("failed to get valid paths for drv {}: {}", result.path, e); continue; } @@ -45,8 +60,26 @@ pub async fn copy_loop(opts: Arc, mut rx: Receiver) { } paths_copied.push(store_path.clone()); match copy_path(store_path, copy_to).await { - Ok(()) => tracing::info!("copied path {}", store_path), - Err(e) => tracing::warn!("failed to copy path {}: {}", store_path, e), + Ok(()) => { + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Copy, + path: store_path.clone(), + success: true, + }) + .await; + tracing::info!("copied path {}", store_path); + } + Err(e) => { + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Copy, + path: store_path.clone(), + success: false, + }) + .await; + tracing::error!("failed to copy path {}: {}", store_path, e); + } } } } diff --git a/src/eval.rs b/src/eval.rs index 21ed883..6cc5f38 100644 --- a/src/eval.rs +++ b/src/eval.rs @@ -3,15 +3,17 @@ use std::sync::Arc; use tokio::{process::Command, sync::mpsc::Sender}; use crate::{ + NixCiResult, NixCiResultType, build::BuildLoopMessage, config::Options, - types::{NixJob, NixJobCacheStatus}, + types::{NixJob, NixJobCacheStatus, NixJobEnum}, util::{ChildOutput, WrappedChild}, }; pub async fn nix_eval_jobs( opts: Arc, build_tx: Sender, + result_tx: Sender, ) -> anyhow::Result<()> { let mut command = Command::new("nix-eval-jobs"); // TODO: make this configurable @@ -40,11 +42,32 @@ pub async fn nix_eval_jobs( tracing::trace!("nix-eval-jobs line: {}", line); let job: NixJob = serde_json::from_str(&line)?; tracing::debug!("got new job: {:?}", job); - if !opts.systems.contains(&job.system) { + let eval_job = match &job.job { + NixJobEnum::Error { error } => { + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Eval, + path: job.attr.clone(), + success: false, + }) + .await; + tracing::error!("error evaluating {}: {}", job.attr, error); + continue; + } + NixJobEnum::Success(eval_job) => eval_job, + }; + let _ = result_tx + .send(NixCiResult { + r#type: NixCiResultType::Eval, + path: job.attr.clone(), + success: true, + }) + .await; + if !opts.systems.contains(&eval_job.system) { tracing::info!("skipping unwanted system build for {}", job.attr); continue; } - match (job.cache_status, job.is_cached) { + match (eval_job.cache_status, eval_job.is_cached) { (Some(NixJobCacheStatus::Cached), _) | (None, Some(true)) => { tracing::info!("skipping cached build for {}", job.attr); continue; diff --git a/src/main.rs b/src/main.rs index 94f48c7..af7a05b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,20 @@ mod eval; mod types; mod util; +#[derive(Debug, Clone)] +struct NixCiResult { + r#type: NixCiResultType, + path: String, + success: bool, +} + +#[derive(Debug, Clone, Copy)] +enum NixCiResultType { + Eval, + Build, + Copy, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let mut opts = Options::parse(); @@ -53,14 +67,34 @@ async fn main() -> anyhow::Result<()> { tracing::debug!("running with options {:?}", opts); let opts = Arc::new(opts); + let (result_tx, mut result_rx) = mpsc::channel(16); + let (build_tx, build_rx) = mpsc::channel(16); let (copy_tx, copy_rx) = mpsc::channel(16); let eval_loop = tokio::spawn(crate::eval::nix_eval_jobs( Arc::clone(&opts), build_tx.clone(), + result_tx.clone(), )); - let build_loop = tokio::spawn(crate::build::build_loop(build_rx, copy_tx.clone())); - let copy_loop = tokio::spawn(crate::copy::copy_loop(Arc::clone(&opts), copy_rx)); + let build_loop = tokio::spawn(crate::build::build_loop( + build_rx, + copy_tx.clone(), + result_tx.clone(), + )); + let copy_loop = tokio::spawn(crate::copy::copy_loop( + Arc::clone(&opts), + copy_rx, + result_tx.clone(), + )); + drop(result_tx); + + let results_collector = tokio::spawn(async move { + let mut results = Vec::new(); + while let Some(r) = result_rx.recv().await { + results.push(r); + } + results + }); eval_loop.await??; build_tx.send(BuildLoopMessage::Stop).await?; @@ -68,5 +102,23 @@ async fn main() -> anyhow::Result<()> { copy_tx.send(CopyLoopMessage::Stop).await?; copy_loop.await?; + let results = results_collector.await?; + let mut failed = false; + for result in results { + if result.success { + continue; + } + failed = true; + let job_text = match result.r#type { + NixCiResultType::Eval => "eval", + NixCiResultType::Build => "build", + NixCiResultType::Copy => "copy", + }; + tracing::error!("{} for \"{}\" failed", job_text, result.path); + } + if failed { + return Err(anyhow::anyhow!("some builds failed")); + } + Ok(()) } diff --git a/src/types.rs b/src/types.rs index 4f86131..d9038bc 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,6 +17,20 @@ pub enum NixJobCacheStatus { #[serde(rename_all = "camelCase")] pub struct NixJob { pub attr: String, + #[serde(flatten)] + pub job: NixJobEnum, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(untagged, rename_all = "camelCase")] +pub enum NixJobEnum { + Success(NixEvalJob), + Error { error: String }, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct NixEvalJob { pub name: String, pub drv_path: String, pub system: String,