diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 20a35bd..91eb9a9 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -18,4 +18,4 @@ build: - nix --version script: - cargo build --release - - target/release/nix-ci --check-cached + - target/release/nix-ci --check-cached --copy-to "file://$(pwd)/cache" diff --git a/src/build.rs b/src/build.rs index 6e0db4f..ae916a2 100644 --- a/src/build.rs +++ b/src/build.rs @@ -1,8 +1,12 @@ use std::collections::{HashMap, VecDeque}; -use tokio::{process::Command, sync::mpsc::Receiver}; +use tokio::{ + process::Command, + sync::mpsc::{Receiver, Sender}, +}; use crate::{ + copy::CopyLoopMessage, types::{ NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType, NixInternalLogLineResult, NixJob, @@ -15,10 +19,22 @@ pub enum BuildLoopMessage { Stop, } +#[derive(Debug)] +pub struct BuildResult { + pub path: String, + pub result: BuildResultType, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BuildResultType { + Built, + Failed, + Unknown, +} + #[derive(Debug)] struct ActivityData { - #[allow(dead_code)] - parent: Option, + parent: u64, r#type: NixInternalLogLineActivityType, running: bool, data: Option, @@ -38,14 +54,16 @@ enum BuildState { Failed, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] struct BuildProgress { done: u64, running: u64, failed: u64, + started_builds: VecDeque, + reported_builds: VecDeque, } -pub async fn build_loop(mut rx: Receiver) { +pub async fn build_loop(mut rx: Receiver, copy_tx: Sender) { let mut paths_built = Vec::new(); while let Some(msg) = rx.recv().await { let job = match msg { @@ -58,15 +76,23 @@ pub async fn build_loop(mut rx: Receiver) { tracing::info!("building {}", job.attr); if !paths_built.contains(&job.drv_path) { paths_built.push(job.drv_path.clone()); - if let Err(e) = run_build(job).await { - tracing::error!("nix build process errored! {}", e); + match run_build(job).await { + Err(e) => tracing::error!("nix build process errored! {}", e), + Ok(results) => { + for result in results { + if let Err(e) = copy_tx.send(CopyLoopMessage::Build(result)).await { + tracing::error!("failed to enqueue package copy: {}", e); + break; + } + } + } } } } } #[tracing::instrument(skip(job), fields(attr = job.attr))] -pub async fn run_build(job: NixJob) -> anyhow::Result<()> { +pub async fn run_build(job: NixJob) -> anyhow::Result> { let mut child = WrappedChild::new( Command::new("nix") .args(&[ @@ -80,13 +106,8 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { Some(format!("nix build {}", job.attr)), )?; let mut activities = HashMap::::new(); - let mut progress = BuildProgress { - done: 0, - running: 0, - failed: 0, - }; - let mut started_builds = VecDeque::new(); - let mut reported_builds = VecDeque::new(); + // build progress per parent as remote builds are run with a nested parent (scoped under the Realise activity id) + let mut progress = HashMap::::new(); loop { let line = match child.next_line().await? { ChildOutput::Finished => break, @@ -136,9 +157,12 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { match activity.r#type { // meta-progress counting the derivations being built as part of the build NixInternalLogLineActivityType::Builds => { - while running > progress.running { - progress.running += 1; - if let Some(build_activity_id) = started_builds.pop_front() + let progress_entry = + progress.get_mut(&activity.parent).unwrap(); + while running > progress_entry.running { + progress_entry.running += 1; + if let Some(build_activity_id) = + progress_entry.started_builds.pop_front() { let Some(a) = activities.get_mut(&build_activity_id) else { @@ -158,13 +182,15 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { ); } } - while failed > progress.failed { - progress.failed += 1; - reported_builds.push_back(BuildState::Failed); + while failed > progress_entry.failed { + progress_entry.failed += 1; + progress_entry + .reported_builds + .push_back(BuildState::Failed); } - while done > progress.done { - progress.done += 1; - reported_builds.push_back(BuildState::Done); + while done > progress_entry.done { + progress_entry.done += 1; + progress_entry.reported_builds.push_back(BuildState::Done); } } // meta-progress counting the paths being downloaded as part of the build @@ -200,15 +226,12 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { parent, activity ); - let mut entry = { - let parent = if parent == 0 { None } else { Some(parent) }; - activities.entry(id).insert_entry(ActivityData { - parent, - r#type: activity.get_type(), - running: true, - data: None, - }) - }; + let mut entry = activities.entry(id).insert_entry(ActivityData { + parent, + r#type: activity.get_type(), + running: true, + data: None, + }); if !text.is_empty() { tracing::info!("{}", text); } @@ -216,7 +239,15 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { // these seem to be meta-activities for reporting progress NixInternalLogLineActivity::Builds => {} NixInternalLogLineActivity::CopyPaths => {} - NixInternalLogLineActivity::Realise => {} + NixInternalLogLineActivity::Realise => { + progress.entry(parent).or_insert_with(|| BuildProgress { + done: 0, + running: 0, + failed: 0, + started_builds: VecDeque::new(), + reported_builds: VecDeque::new(), + }); + } // just ignore this, we don't really care about these NixInternalLogLineActivity::Unknown => {} NixInternalLogLineActivity::FileTransfer => {} @@ -239,7 +270,9 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { drv_path, state: BuildState::Started, }); - started_builds.push_back(id); + progress + .entry(parent) + .and_modify(|progress| progress.started_builds.push_back(id)); } _ => { tracing::warn!( @@ -262,7 +295,8 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { else { panic!("this can't happen (build activity without build data)"); }; - if let Some(report) = reported_builds.pop_front() { + let progress_entry = progress.get_mut(&entry.parent).unwrap(); + if let Some(report) = progress_entry.reported_builds.pop_front() { *state = report; } else { tracing::warn!( @@ -282,19 +316,41 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> { if !exit_status.success() { tracing::warn!("nix build failed with {}!", exit_status); } + let mut results: HashMap = HashMap::new(); for (_, a) in &activities { match a.data.as_ref() { - Some(ActivityDataPerType::Build { drv_path, state }) => match state { - BuildState::Done => tracing::info!("derivation {} built successfully", drv_path), - BuildState::Failed => tracing::warn!("derivation {} failed to build", drv_path), - _ => tracing::warn!("derivation {} somehow still running??", drv_path), - }, + Some(ActivityDataPerType::Build { drv_path, state }) => { + let result = match state { + BuildState::Done => { + tracing::info!("derivation {} built successfully", drv_path); + BuildResultType::Built + } + BuildState::Failed => { + tracing::warn!("derivation {} failed to build", drv_path); + BuildResultType::Failed + } + _ => { + tracing::warn!("derivation {} somehow still running??", drv_path); + BuildResultType::Unknown + } + }; + results + .entry(drv_path.clone()) + .and_modify(|r| { + if result == BuildResultType::Built { + *r = result; + } + }) + .or_insert(result); + } Some(ActivityDataPerType::Substitute { store_path }) => { tracing::info!("downloaded {}", store_path); } _ => {} } } - // TODO: return the drv paths to upload! - Ok(()) + Ok(results + .into_iter() + .map(|(path, result)| BuildResult { path, result }) + .collect()) } diff --git a/src/config.rs b/src/config.rs index ba7518d..53d2f70 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,4 +4,6 @@ pub struct Options { pub systems: Vec, #[arg(long)] pub check_cached: bool, + #[arg(long)] + pub copy_to: Option, } diff --git a/src/copy.rs b/src/copy.rs new file mode 100644 index 0000000..1852dad --- /dev/null +++ b/src/copy.rs @@ -0,0 +1,133 @@ +use std::{collections::HashMap, sync::Arc}; + +use tokio::{process::Command, sync::mpsc::Receiver}; + +use crate::{ + build::{BuildResult, BuildResultType}, + config::Options, + types::NixDerivationInfo, + util::{ChildOutput, WrappedChild}, +}; + +pub enum CopyLoopMessage { + Build(BuildResult), + Stop, +} + +pub async fn copy_loop(opts: Arc, mut rx: Receiver) { + let mut paths_copied = Vec::new(); + while let Some(msg) = rx.recv().await { + let result = match msg { + CopyLoopMessage::Build(result) => result, + CopyLoopMessage::Stop => { + tracing::debug!("stop signal received"); + break; + } + }; + match &result.result { + BuildResultType::Failed => { + tracing::debug!("not uploading path that failed to build: {}", result.path); + } + BuildResultType::Built | BuildResultType::Unknown => { + let valid_paths = match get_valid_outputs(&result.path).await { + Ok(valid_paths) => valid_paths, + Err(e) => { + tracing::warn!("failed to get valid paths for drv {}: {}", result.path, e); + continue; + } + }; + let Some(copy_to) = &opts.copy_to else { + continue; + }; + for store_path in &valid_paths { + if paths_copied.contains(store_path) { + continue; + } + paths_copied.push(store_path.clone()); + match copy_path(store_path, copy_to).await { + Ok(()) => tracing::info!("copied path {}", store_path), + Err(e) => tracing::warn!("failed to copy path {}: {}", store_path, e), + } + } + } + } + } +} + +async fn get_valid_outputs(drv_path: &str) -> anyhow::Result> { + let cmd = Command::new("nix") + .args(&["derivation", "show", drv_path]) + .output() + .await?; + if !cmd.status.success() { + tracing::warn!( + "nix derivation show: {}\n{}", + String::from_utf8_lossy(&cmd.stdout), + String::from_utf8_lossy(&cmd.stderr), + ); + return Err(anyhow::anyhow!( + "nix derivation show exited with non-success exit status!" + )); + } + + tracing::trace!( + "nix derivation show output: {}", + String::from_utf8_lossy(&cmd.stdout) + ); + let drv_info: HashMap = serde_json::from_slice(&cmd.stdout)?; + let drv_info = drv_info + .get(drv_path) + .ok_or_else(|| anyhow::anyhow!("derivation missing?"))?; + let mut valid_paths = Vec::new(); + for (_, output) in &drv_info.outputs { + let Some(store_path) = &output.path else { + continue; + }; + match check_path_validity(&store_path).await { + Ok(true) => valid_paths.push(store_path.clone()), + Ok(false) => (), + Err(e) => { + tracing::warn!("failed to check path validity for {}: {}", store_path, e); + } + } + } + + Ok(valid_paths) +} + +async fn check_path_validity(store_path: &str) -> anyhow::Result { + let cmd = Command::new("nix") + .args(&["path-info", store_path]) + .output() + .await?; + tracing::trace!( + "nix path-info output: {}\n{}", + String::from_utf8_lossy(&cmd.stdout), + String::from_utf8_lossy(&cmd.stderr), + ); + Ok(cmd.status.success()) +} + +async fn copy_path(store_path: &str, copy_to: &str) -> anyhow::Result<()> { + let mut cmd = WrappedChild::new( + Command::new("nix").args(&["copy", "--to", copy_to]), + Some(format!("nix copy {}", store_path)), + )?; + loop { + match cmd.next_line().await? { + ChildOutput::Finished => break, + ChildOutput::Stderr(line) | ChildOutput::Stdout(line) => { + tracing::info!("{}", line); + } + } + } + let exit_status = cmd.exit_status().unwrap(); + if exit_status.success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "nix copy exited with non-success {}", + exit_status + )) + } +} diff --git a/src/eval.rs b/src/eval.rs new file mode 100644 index 0000000..21ed883 --- /dev/null +++ b/src/eval.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use tokio::{process::Command, sync::mpsc::Sender}; + +use crate::{ + build::BuildLoopMessage, + config::Options, + types::{NixJob, NixJobCacheStatus}, + util::{ChildOutput, WrappedChild}, +}; + +pub async fn nix_eval_jobs( + opts: Arc, + build_tx: Sender, +) -> anyhow::Result<()> { + let mut command = Command::new("nix-eval-jobs"); + // TODO: make this configurable + command.args(&[ + "--force-recurse", + "--max-memory-size", + "3072", + "--workers", + "4", + "--flake", + ".#checks", + ]); + if opts.check_cached { + command.arg("--check-cache-status"); + } + let mut jobs = WrappedChild::new(&mut command, None)?; + loop { + let line = match jobs.next_line().await? { + ChildOutput::Finished => break, + ChildOutput::Stderr(line) => { + tracing::info!("nix-eval-jobs: {}", line); + continue; + } + ChildOutput::Stdout(line) => line, + }; + tracing::trace!("nix-eval-jobs line: {}", line); + let job: NixJob = serde_json::from_str(&line)?; + tracing::debug!("got new job: {:?}", job); + if !opts.systems.contains(&job.system) { + tracing::info!("skipping unwanted system build for {}", job.attr); + continue; + } + match (job.cache_status, job.is_cached) { + (Some(NixJobCacheStatus::Cached), _) | (None, Some(true)) => { + tracing::info!("skipping cached build for {}", job.attr); + continue; + } + (Some(NixJobCacheStatus::Local), _) => { + // TODO: local paths should still be uploaded but needn't be built + // probably when lix-eval-jobs implements `cacheStatus` (in addition to `isCached`) + // they also have output paths already set in the nix-eval-jobs output + tracing::info!("skipping cached build for {}", job.attr); + continue; + } + _ => {} + } + build_tx.send(BuildLoopMessage::Job(job)).await?; + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 5c303ca..94f48c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,16 @@ +use std::sync::Arc; + use clap::Parser; use tokio::{process::Command, sync::mpsc}; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; -use crate::{ - build::BuildLoopMessage, - config::Options, - types::{NixJob, NixJobCacheStatus}, - util::{ChildOutput, WrappedChild}, -}; +use crate::{build::BuildLoopMessage, config::Options, copy::CopyLoopMessage}; mod build; mod config; +mod copy; +mod eval; mod types; mod util; @@ -36,64 +35,38 @@ async fn main() -> anyhow::Result<()> { tracing::info!("no system specified, defaulting to current {}", system); opts.systems.push(system); } - - let (build_tx, build_rx) = mpsc::channel(4); - let build_loop = tokio::spawn(self::build::build_loop(build_rx)); - - let mut jobs = nix_eval_jobs(&opts)?; - loop { - let line = match jobs.next_line().await? { - ChildOutput::Finished => break, - ChildOutput::Stderr(line) => { - tracing::info!("nix-eval-jobs: {}", line); - continue; - } - ChildOutput::Stdout(line) => line, - }; - tracing::trace!("nix-eval-jobs line: {}", line); - let job: NixJob = serde_json::from_str(&line)?; - tracing::debug!("got new job: {:?}", job); - if !opts.systems.contains(&job.system) { - tracing::info!("skipping unwanted system build for {}", job.attr); - continue; + if let Some(store) = &opts.copy_to { + let cmd = Command::new("nix") + .args(&["store", "ping", "--store", store]) + .output() + .await?; + if !cmd.status.success() { + tracing::error!( + "{:?} is not a valid nix store: {}", + store, + String::from_utf8_lossy(&cmd.stderr), + ); + std::process::exit(1); } - match (job.cache_status, job.is_cached) { - (Some(NixJobCacheStatus::Cached), _) | (None, Some(true)) => { - tracing::info!("skipping cached build for {}", job.attr); - continue; - } - (Some(NixJobCacheStatus::Local), _) => { - // TODO: local paths should still be uploaded but needn't be built - // probably when lix-eval-jobs implements `cacheStatus` (in addition to `isCached`) - // they also have output paths already set in the nix-eval-jobs output - tracing::info!("skipping cached build for {}", job.attr); - continue; - } - _ => {} - } - build_tx.send(BuildLoopMessage::Job(job)).await?; } + tracing::debug!("running with options {:?}", opts); + let opts = Arc::new(opts); + + let (build_tx, build_rx) = mpsc::channel(16); + let (copy_tx, copy_rx) = mpsc::channel(16); + let eval_loop = tokio::spawn(crate::eval::nix_eval_jobs( + Arc::clone(&opts), + build_tx.clone(), + )); + let build_loop = tokio::spawn(crate::build::build_loop(build_rx, copy_tx.clone())); + let copy_loop = tokio::spawn(crate::copy::copy_loop(Arc::clone(&opts), copy_rx)); + + eval_loop.await??; build_tx.send(BuildLoopMessage::Stop).await?; build_loop.await?; + copy_tx.send(CopyLoopMessage::Stop).await?; + copy_loop.await?; Ok(()) } - -fn nix_eval_jobs(opts: &Options) -> anyhow::Result { - let mut command = Command::new("nix-eval-jobs"); - // TODO: make this configurable - command.args(&[ - "--force-recurse", - "--max-memory-size", - "3072", - "--workers", - "4", - "--flake", - ".#checks", - ]); - if opts.check_cached { - command.arg("--check-cache-status"); - } - Ok(WrappedChild::new(&mut command, None)?) -} diff --git a/src/types.rs b/src/types.rs index ff5f42b..4f86131 100644 --- a/src/types.rs +++ b/src/types.rs @@ -157,3 +157,13 @@ pub enum NixInternalLogLineResult { log: String, }, } + +#[derive(Deserialize, Debug)] +pub struct NixDerivationInfo { + pub outputs: HashMap, +} + +#[derive(Deserialize, Debug)] +pub struct NixDerivationInfoOutput { + pub path: Option, +}