collect results, handle failing evals
This commit is contained in:
parent
529c88f0fb
commit
657f324865
5 changed files with 166 additions and 20 deletions
46
src/build.rs
46
src/build.rs
|
|
@ -6,10 +6,11 @@ use tokio::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
NixCiResult, NixCiResultType,
|
||||||
copy::CopyLoopMessage,
|
copy::CopyLoopMessage,
|
||||||
types::{
|
types::{
|
||||||
NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType,
|
NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType,
|
||||||
NixInternalLogLineResult, NixJob,
|
NixInternalLogLineResult, NixJob, NixJobEnum,
|
||||||
},
|
},
|
||||||
util::{ChildOutput, WrappedChild},
|
util::{ChildOutput, WrappedChild},
|
||||||
};
|
};
|
||||||
|
|
@ -63,7 +64,11 @@ struct BuildProgress {
|
||||||
reported_builds: VecDeque<BuildState>,
|
reported_builds: VecDeque<BuildState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>, copy_tx: Sender<CopyLoopMessage>) {
|
pub async fn build_loop(
|
||||||
|
mut rx: Receiver<BuildLoopMessage>,
|
||||||
|
copy_tx: Sender<CopyLoopMessage>,
|
||||||
|
result_tx: Sender<NixCiResult>,
|
||||||
|
) {
|
||||||
let mut paths_built = Vec::new();
|
let mut paths_built = Vec::new();
|
||||||
while let Some(msg) = rx.recv().await {
|
while let Some(msg) = rx.recv().await {
|
||||||
let job = match msg {
|
let job = match msg {
|
||||||
|
|
@ -74,12 +79,31 @@ pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>, copy_tx: Sender<Copy
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
tracing::info!("building {}", job.attr);
|
tracing::info!("building {}", job.attr);
|
||||||
if !paths_built.contains(&job.drv_path) {
|
let NixJobEnum::Success(eval_job) = job.job else {
|
||||||
paths_built.push(job.drv_path.clone());
|
continue;
|
||||||
match run_build(job).await {
|
};
|
||||||
Err(e) => tracing::error!("nix build process errored! {}", e),
|
if !paths_built.contains(&eval_job.drv_path) {
|
||||||
|
paths_built.push(eval_job.drv_path.clone());
|
||||||
|
match run_build(job.attr, eval_job.drv_path.clone()).await {
|
||||||
|
Err(e) => {
|
||||||
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Build,
|
||||||
|
path: eval_job.drv_path.clone(),
|
||||||
|
success: false,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
tracing::error!("nix build process errored! {}", e);
|
||||||
|
}
|
||||||
Ok(results) => {
|
Ok(results) => {
|
||||||
for result in results {
|
for result in results {
|
||||||
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Build,
|
||||||
|
path: result.path.clone(),
|
||||||
|
success: result.result == BuildResultType::Built,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
if let Err(e) = copy_tx.send(CopyLoopMessage::Build(result)).await {
|
if let Err(e) = copy_tx.send(CopyLoopMessage::Build(result)).await {
|
||||||
tracing::error!("failed to enqueue package copy: {}", e);
|
tracing::error!("failed to enqueue package copy: {}", e);
|
||||||
break;
|
break;
|
||||||
|
|
@ -91,8 +115,8 @@ pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>, copy_tx: Sender<Copy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(job), fields(attr = job.attr))]
|
#[tracing::instrument(skip(drv_path))]
|
||||||
pub async fn run_build(job: NixJob) -> anyhow::Result<Vec<BuildResult>> {
|
pub async fn run_build(attr: String, drv_path: String) -> anyhow::Result<Vec<BuildResult>> {
|
||||||
let mut child = WrappedChild::new(
|
let mut child = WrappedChild::new(
|
||||||
Command::new("nix")
|
Command::new("nix")
|
||||||
.args(&[
|
.args(&[
|
||||||
|
|
@ -102,8 +126,8 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<Vec<BuildResult>> {
|
||||||
"--log-format",
|
"--log-format",
|
||||||
"internal-json",
|
"internal-json",
|
||||||
])
|
])
|
||||||
.arg(format!("{}^*", job.drv_path)),
|
.arg(format!("{}^*", drv_path)),
|
||||||
Some(format!("nix build {}", job.attr)),
|
Some(format!("nix build {}", attr)),
|
||||||
)?;
|
)?;
|
||||||
let mut activities = HashMap::<u64, ActivityData>::new();
|
let mut activities = HashMap::<u64, ActivityData>::new();
|
||||||
// build progress per parent as remote builds are run with a nested parent (scoped under the Realise activity id)
|
// build progress per parent as remote builds are run with a nested parent (scoped under the Realise activity id)
|
||||||
|
|
@ -326,7 +350,7 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<Vec<BuildResult>> {
|
||||||
BuildResultType::Built
|
BuildResultType::Built
|
||||||
}
|
}
|
||||||
BuildState::Failed => {
|
BuildState::Failed => {
|
||||||
tracing::warn!("derivation {} failed to build", drv_path);
|
tracing::error!("derivation {} failed to build", drv_path);
|
||||||
BuildResultType::Failed
|
BuildResultType::Failed
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
|
|
||||||
41
src/copy.rs
41
src/copy.rs
|
|
@ -1,8 +1,12 @@
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
use tokio::{process::Command, sync::mpsc::Receiver};
|
use tokio::{
|
||||||
|
process::Command,
|
||||||
|
sync::mpsc::{Receiver, Sender},
|
||||||
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
NixCiResult, NixCiResultType,
|
||||||
build::{BuildResult, BuildResultType},
|
build::{BuildResult, BuildResultType},
|
||||||
config::Options,
|
config::Options,
|
||||||
types::NixDerivationInfo,
|
types::NixDerivationInfo,
|
||||||
|
|
@ -14,7 +18,11 @@ pub enum CopyLoopMessage {
|
||||||
Stop,
|
Stop,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn copy_loop(opts: Arc<Options>, mut rx: Receiver<CopyLoopMessage>) {
|
pub async fn copy_loop(
|
||||||
|
opts: Arc<Options>,
|
||||||
|
mut rx: Receiver<CopyLoopMessage>,
|
||||||
|
result_tx: Sender<NixCiResult>,
|
||||||
|
) {
|
||||||
let mut paths_copied = Vec::new();
|
let mut paths_copied = Vec::new();
|
||||||
while let Some(msg) = rx.recv().await {
|
while let Some(msg) = rx.recv().await {
|
||||||
let result = match msg {
|
let result = match msg {
|
||||||
|
|
@ -32,6 +40,13 @@ pub async fn copy_loop(opts: Arc<Options>, mut rx: Receiver<CopyLoopMessage>) {
|
||||||
let valid_paths = match get_valid_outputs(&result.path).await {
|
let valid_paths = match get_valid_outputs(&result.path).await {
|
||||||
Ok(valid_paths) => valid_paths,
|
Ok(valid_paths) => valid_paths,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Copy,
|
||||||
|
path: result.path.clone(),
|
||||||
|
success: false,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
tracing::warn!("failed to get valid paths for drv {}: {}", result.path, e);
|
tracing::warn!("failed to get valid paths for drv {}: {}", result.path, e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -45,8 +60,26 @@ pub async fn copy_loop(opts: Arc<Options>, mut rx: Receiver<CopyLoopMessage>) {
|
||||||
}
|
}
|
||||||
paths_copied.push(store_path.clone());
|
paths_copied.push(store_path.clone());
|
||||||
match copy_path(store_path, copy_to).await {
|
match copy_path(store_path, copy_to).await {
|
||||||
Ok(()) => tracing::info!("copied path {}", store_path),
|
Ok(()) => {
|
||||||
Err(e) => tracing::warn!("failed to copy path {}: {}", store_path, e),
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Copy,
|
||||||
|
path: store_path.clone(),
|
||||||
|
success: true,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
tracing::info!("copied path {}", store_path);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Copy,
|
||||||
|
path: store_path.clone(),
|
||||||
|
success: false,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
tracing::error!("failed to copy path {}: {}", store_path, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
29
src/eval.rs
29
src/eval.rs
|
|
@ -3,15 +3,17 @@ use std::sync::Arc;
|
||||||
use tokio::{process::Command, sync::mpsc::Sender};
|
use tokio::{process::Command, sync::mpsc::Sender};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
NixCiResult, NixCiResultType,
|
||||||
build::BuildLoopMessage,
|
build::BuildLoopMessage,
|
||||||
config::Options,
|
config::Options,
|
||||||
types::{NixJob, NixJobCacheStatus},
|
types::{NixJob, NixJobCacheStatus, NixJobEnum},
|
||||||
util::{ChildOutput, WrappedChild},
|
util::{ChildOutput, WrappedChild},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn nix_eval_jobs(
|
pub async fn nix_eval_jobs(
|
||||||
opts: Arc<Options>,
|
opts: Arc<Options>,
|
||||||
build_tx: Sender<BuildLoopMessage>,
|
build_tx: Sender<BuildLoopMessage>,
|
||||||
|
result_tx: Sender<NixCiResult>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut command = Command::new("nix-eval-jobs");
|
let mut command = Command::new("nix-eval-jobs");
|
||||||
// TODO: make this configurable
|
// TODO: make this configurable
|
||||||
|
|
@ -40,11 +42,32 @@ pub async fn nix_eval_jobs(
|
||||||
tracing::trace!("nix-eval-jobs line: {}", line);
|
tracing::trace!("nix-eval-jobs line: {}", line);
|
||||||
let job: NixJob = serde_json::from_str(&line)?;
|
let job: NixJob = serde_json::from_str(&line)?;
|
||||||
tracing::debug!("got new job: {:?}", job);
|
tracing::debug!("got new job: {:?}", job);
|
||||||
if !opts.systems.contains(&job.system) {
|
let eval_job = match &job.job {
|
||||||
|
NixJobEnum::Error { error } => {
|
||||||
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Eval,
|
||||||
|
path: job.attr.clone(),
|
||||||
|
success: false,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
tracing::error!("error evaluating {}: {}", job.attr, error);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
NixJobEnum::Success(eval_job) => eval_job,
|
||||||
|
};
|
||||||
|
let _ = result_tx
|
||||||
|
.send(NixCiResult {
|
||||||
|
r#type: NixCiResultType::Eval,
|
||||||
|
path: job.attr.clone(),
|
||||||
|
success: true,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
if !opts.systems.contains(&eval_job.system) {
|
||||||
tracing::info!("skipping unwanted system build for {}", job.attr);
|
tracing::info!("skipping unwanted system build for {}", job.attr);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
match (job.cache_status, job.is_cached) {
|
match (eval_job.cache_status, eval_job.is_cached) {
|
||||||
(Some(NixJobCacheStatus::Cached), _) | (None, Some(true)) => {
|
(Some(NixJobCacheStatus::Cached), _) | (None, Some(true)) => {
|
||||||
tracing::info!("skipping cached build for {}", job.attr);
|
tracing::info!("skipping cached build for {}", job.attr);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
||||||
56
src/main.rs
56
src/main.rs
|
|
@ -14,6 +14,20 @@ mod eval;
|
||||||
mod types;
|
mod types;
|
||||||
mod util;
|
mod util;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct NixCiResult {
|
||||||
|
r#type: NixCiResultType,
|
||||||
|
path: String,
|
||||||
|
success: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
enum NixCiResultType {
|
||||||
|
Eval,
|
||||||
|
Build,
|
||||||
|
Copy,
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let mut opts = Options::parse();
|
let mut opts = Options::parse();
|
||||||
|
|
@ -53,14 +67,34 @@ async fn main() -> anyhow::Result<()> {
|
||||||
tracing::debug!("running with options {:?}", opts);
|
tracing::debug!("running with options {:?}", opts);
|
||||||
let opts = Arc::new(opts);
|
let opts = Arc::new(opts);
|
||||||
|
|
||||||
|
let (result_tx, mut result_rx) = mpsc::channel(16);
|
||||||
|
|
||||||
let (build_tx, build_rx) = mpsc::channel(16);
|
let (build_tx, build_rx) = mpsc::channel(16);
|
||||||
let (copy_tx, copy_rx) = mpsc::channel(16);
|
let (copy_tx, copy_rx) = mpsc::channel(16);
|
||||||
let eval_loop = tokio::spawn(crate::eval::nix_eval_jobs(
|
let eval_loop = tokio::spawn(crate::eval::nix_eval_jobs(
|
||||||
Arc::clone(&opts),
|
Arc::clone(&opts),
|
||||||
build_tx.clone(),
|
build_tx.clone(),
|
||||||
|
result_tx.clone(),
|
||||||
));
|
));
|
||||||
let build_loop = tokio::spawn(crate::build::build_loop(build_rx, copy_tx.clone()));
|
let build_loop = tokio::spawn(crate::build::build_loop(
|
||||||
let copy_loop = tokio::spawn(crate::copy::copy_loop(Arc::clone(&opts), copy_rx));
|
build_rx,
|
||||||
|
copy_tx.clone(),
|
||||||
|
result_tx.clone(),
|
||||||
|
));
|
||||||
|
let copy_loop = tokio::spawn(crate::copy::copy_loop(
|
||||||
|
Arc::clone(&opts),
|
||||||
|
copy_rx,
|
||||||
|
result_tx.clone(),
|
||||||
|
));
|
||||||
|
drop(result_tx);
|
||||||
|
|
||||||
|
let results_collector = tokio::spawn(async move {
|
||||||
|
let mut results = Vec::new();
|
||||||
|
while let Some(r) = result_rx.recv().await {
|
||||||
|
results.push(r);
|
||||||
|
}
|
||||||
|
results
|
||||||
|
});
|
||||||
|
|
||||||
eval_loop.await??;
|
eval_loop.await??;
|
||||||
build_tx.send(BuildLoopMessage::Stop).await?;
|
build_tx.send(BuildLoopMessage::Stop).await?;
|
||||||
|
|
@ -68,5 +102,23 @@ async fn main() -> anyhow::Result<()> {
|
||||||
copy_tx.send(CopyLoopMessage::Stop).await?;
|
copy_tx.send(CopyLoopMessage::Stop).await?;
|
||||||
copy_loop.await?;
|
copy_loop.await?;
|
||||||
|
|
||||||
|
let results = results_collector.await?;
|
||||||
|
let mut failed = false;
|
||||||
|
for result in results {
|
||||||
|
if result.success {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
failed = true;
|
||||||
|
let job_text = match result.r#type {
|
||||||
|
NixCiResultType::Eval => "eval",
|
||||||
|
NixCiResultType::Build => "build",
|
||||||
|
NixCiResultType::Copy => "copy",
|
||||||
|
};
|
||||||
|
tracing::error!("{} for \"{}\" failed", job_text, result.path);
|
||||||
|
}
|
||||||
|
if failed {
|
||||||
|
return Err(anyhow::anyhow!("some builds failed"));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
14
src/types.rs
14
src/types.rs
|
|
@ -17,6 +17,20 @@ pub enum NixJobCacheStatus {
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct NixJob {
|
pub struct NixJob {
|
||||||
pub attr: String,
|
pub attr: String,
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub job: NixJobEnum,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
#[serde(untagged, rename_all = "camelCase")]
|
||||||
|
pub enum NixJobEnum {
|
||||||
|
Success(NixEvalJob),
|
||||||
|
Error { error: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct NixEvalJob {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub drv_path: String,
|
pub drv_path: String,
|
||||||
pub system: String,
|
pub system: String,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue