fix remote builds, add copying

This commit is contained in:
⛧-440729 [sophie] 2025-05-06 22:37:39 +02:00
parent 8e1813ca0d
commit 529c88f0fb
No known key found for this signature in database
GPG key ID: 8566000000440729
7 changed files with 342 additions and 103 deletions

View file

@ -1,8 +1,12 @@
use std::collections::{HashMap, VecDeque};
use tokio::{process::Command, sync::mpsc::Receiver};
use tokio::{
process::Command,
sync::mpsc::{Receiver, Sender},
};
use crate::{
copy::CopyLoopMessage,
types::{
NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType,
NixInternalLogLineResult, NixJob,
@ -15,10 +19,22 @@ pub enum BuildLoopMessage {
Stop,
}
#[derive(Debug)]
pub struct BuildResult {
pub path: String,
pub result: BuildResultType,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuildResultType {
Built,
Failed,
Unknown,
}
#[derive(Debug)]
struct ActivityData {
#[allow(dead_code)]
parent: Option<u64>,
parent: u64,
r#type: NixInternalLogLineActivityType,
running: bool,
data: Option<ActivityDataPerType>,
@ -38,14 +54,16 @@ enum BuildState {
Failed,
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
struct BuildProgress {
done: u64,
running: u64,
failed: u64,
started_builds: VecDeque<u64>,
reported_builds: VecDeque<BuildState>,
}
pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>) {
pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>, copy_tx: Sender<CopyLoopMessage>) {
let mut paths_built = Vec::new();
while let Some(msg) = rx.recv().await {
let job = match msg {
@ -58,15 +76,23 @@ pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>) {
tracing::info!("building {}", job.attr);
if !paths_built.contains(&job.drv_path) {
paths_built.push(job.drv_path.clone());
if let Err(e) = run_build(job).await {
tracing::error!("nix build process errored! {}", e);
match run_build(job).await {
Err(e) => tracing::error!("nix build process errored! {}", e),
Ok(results) => {
for result in results {
if let Err(e) = copy_tx.send(CopyLoopMessage::Build(result)).await {
tracing::error!("failed to enqueue package copy: {}", e);
break;
}
}
}
}
}
}
}
#[tracing::instrument(skip(job), fields(attr = job.attr))]
pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
pub async fn run_build(job: NixJob) -> anyhow::Result<Vec<BuildResult>> {
let mut child = WrappedChild::new(
Command::new("nix")
.args(&[
@ -80,13 +106,8 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
Some(format!("nix build {}", job.attr)),
)?;
let mut activities = HashMap::<u64, ActivityData>::new();
let mut progress = BuildProgress {
done: 0,
running: 0,
failed: 0,
};
let mut started_builds = VecDeque::new();
let mut reported_builds = VecDeque::new();
// build progress per parent as remote builds are run with a nested parent (scoped under the Realise activity id)
let mut progress = HashMap::<u64, BuildProgress>::new();
loop {
let line = match child.next_line().await? {
ChildOutput::Finished => break,
@ -136,9 +157,12 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
match activity.r#type {
// meta-progress counting the derivations being built as part of the build
NixInternalLogLineActivityType::Builds => {
while running > progress.running {
progress.running += 1;
if let Some(build_activity_id) = started_builds.pop_front()
let progress_entry =
progress.get_mut(&activity.parent).unwrap();
while running > progress_entry.running {
progress_entry.running += 1;
if let Some(build_activity_id) =
progress_entry.started_builds.pop_front()
{
let Some(a) = activities.get_mut(&build_activity_id)
else {
@ -158,13 +182,15 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
);
}
}
while failed > progress.failed {
progress.failed += 1;
reported_builds.push_back(BuildState::Failed);
while failed > progress_entry.failed {
progress_entry.failed += 1;
progress_entry
.reported_builds
.push_back(BuildState::Failed);
}
while done > progress.done {
progress.done += 1;
reported_builds.push_back(BuildState::Done);
while done > progress_entry.done {
progress_entry.done += 1;
progress_entry.reported_builds.push_back(BuildState::Done);
}
}
// meta-progress counting the paths being downloaded as part of the build
@ -200,15 +226,12 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
parent,
activity
);
let mut entry = {
let parent = if parent == 0 { None } else { Some(parent) };
activities.entry(id).insert_entry(ActivityData {
parent,
r#type: activity.get_type(),
running: true,
data: None,
})
};
let mut entry = activities.entry(id).insert_entry(ActivityData {
parent,
r#type: activity.get_type(),
running: true,
data: None,
});
if !text.is_empty() {
tracing::info!("{}", text);
}
@ -216,7 +239,15 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
// these seem to be meta-activities for reporting progress
NixInternalLogLineActivity::Builds => {}
NixInternalLogLineActivity::CopyPaths => {}
NixInternalLogLineActivity::Realise => {}
NixInternalLogLineActivity::Realise => {
progress.entry(parent).or_insert_with(|| BuildProgress {
done: 0,
running: 0,
failed: 0,
started_builds: VecDeque::new(),
reported_builds: VecDeque::new(),
});
}
// just ignore this, we don't really care about these
NixInternalLogLineActivity::Unknown => {}
NixInternalLogLineActivity::FileTransfer => {}
@ -239,7 +270,9 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
drv_path,
state: BuildState::Started,
});
started_builds.push_back(id);
progress
.entry(parent)
.and_modify(|progress| progress.started_builds.push_back(id));
}
_ => {
tracing::warn!(
@ -262,7 +295,8 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
else {
panic!("this can't happen (build activity without build data)");
};
if let Some(report) = reported_builds.pop_front() {
let progress_entry = progress.get_mut(&entry.parent).unwrap();
if let Some(report) = progress_entry.reported_builds.pop_front() {
*state = report;
} else {
tracing::warn!(
@ -282,19 +316,41 @@ pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
if !exit_status.success() {
tracing::warn!("nix build failed with {}!", exit_status);
}
let mut results: HashMap<String, BuildResultType> = HashMap::new();
for (_, a) in &activities {
match a.data.as_ref() {
Some(ActivityDataPerType::Build { drv_path, state }) => match state {
BuildState::Done => tracing::info!("derivation {} built successfully", drv_path),
BuildState::Failed => tracing::warn!("derivation {} failed to build", drv_path),
_ => tracing::warn!("derivation {} somehow still running??", drv_path),
},
Some(ActivityDataPerType::Build { drv_path, state }) => {
let result = match state {
BuildState::Done => {
tracing::info!("derivation {} built successfully", drv_path);
BuildResultType::Built
}
BuildState::Failed => {
tracing::warn!("derivation {} failed to build", drv_path);
BuildResultType::Failed
}
_ => {
tracing::warn!("derivation {} somehow still running??", drv_path);
BuildResultType::Unknown
}
};
results
.entry(drv_path.clone())
.and_modify(|r| {
if result == BuildResultType::Built {
*r = result;
}
})
.or_insert(result);
}
Some(ActivityDataPerType::Substitute { store_path }) => {
tracing::info!("downloaded {}", store_path);
}
_ => {}
}
}
// TODO: return the drv paths to upload!
Ok(())
Ok(results
.into_iter()
.map(|(path, result)| BuildResult { path, result })
.collect())
}