initial commit
This commit is contained in:
commit
8e1813ca0d
14 changed files with 2005 additions and 0 deletions
300
src/build.rs
Normal file
300
src/build.rs
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
use std::collections::{HashMap, VecDeque};
|
||||
|
||||
use tokio::{process::Command, sync::mpsc::Receiver};
|
||||
|
||||
use crate::{
|
||||
types::{
|
||||
NixInternalLogLine, NixInternalLogLineActivity, NixInternalLogLineActivityType,
|
||||
NixInternalLogLineResult, NixJob,
|
||||
},
|
||||
util::{ChildOutput, WrappedChild},
|
||||
};
|
||||
|
||||
pub enum BuildLoopMessage {
|
||||
Job(NixJob),
|
||||
Stop,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ActivityData {
|
||||
#[allow(dead_code)]
|
||||
parent: Option<u64>,
|
||||
r#type: NixInternalLogLineActivityType,
|
||||
running: bool,
|
||||
data: Option<ActivityDataPerType>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum ActivityDataPerType {
|
||||
Build { drv_path: String, state: BuildState },
|
||||
Substitute { store_path: String },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum BuildState {
|
||||
Started,
|
||||
Running,
|
||||
Done,
|
||||
Failed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct BuildProgress {
|
||||
done: u64,
|
||||
running: u64,
|
||||
failed: u64,
|
||||
}
|
||||
|
||||
pub async fn build_loop(mut rx: Receiver<BuildLoopMessage>) {
|
||||
let mut paths_built = Vec::new();
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let job = match msg {
|
||||
BuildLoopMessage::Job(job) => job,
|
||||
BuildLoopMessage::Stop => {
|
||||
tracing::debug!("stop signal received");
|
||||
break;
|
||||
}
|
||||
};
|
||||
tracing::info!("building {}", job.attr);
|
||||
if !paths_built.contains(&job.drv_path) {
|
||||
paths_built.push(job.drv_path.clone());
|
||||
if let Err(e) = run_build(job).await {
|
||||
tracing::error!("nix build process errored! {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(job), fields(attr = job.attr))]
|
||||
pub async fn run_build(job: NixJob) -> anyhow::Result<()> {
|
||||
let mut child = WrappedChild::new(
|
||||
Command::new("nix")
|
||||
.args(&[
|
||||
"build",
|
||||
"--keep-going",
|
||||
"--verbose",
|
||||
"--log-format",
|
||||
"internal-json",
|
||||
])
|
||||
.arg(format!("{}^*", job.drv_path)),
|
||||
Some(format!("nix build {}", job.attr)),
|
||||
)?;
|
||||
let mut activities = HashMap::<u64, ActivityData>::new();
|
||||
let mut progress = BuildProgress {
|
||||
done: 0,
|
||||
running: 0,
|
||||
failed: 0,
|
||||
};
|
||||
let mut started_builds = VecDeque::new();
|
||||
let mut reported_builds = VecDeque::new();
|
||||
loop {
|
||||
let line = match child.next_line().await? {
|
||||
ChildOutput::Finished => break,
|
||||
ChildOutput::Stderr(line) => line,
|
||||
ChildOutput::Stdout(line) => {
|
||||
tracing::info!(line);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if !line.starts_with("@nix ") {
|
||||
tracing::warn!("invalid nix build output: {}", line);
|
||||
continue;
|
||||
}
|
||||
let log: NixInternalLogLine = match serde_json::from_str(&line[4..]) {
|
||||
Ok(log) => log,
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to parse nix log output {:?}: {}", line, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
tracing::trace!("log line: {:?}", log);
|
||||
match log {
|
||||
NixInternalLogLine::Msg { msg, .. } => {
|
||||
tracing::info!("{}", msg);
|
||||
}
|
||||
NixInternalLogLine::Result { id, result } => {
|
||||
let activity = activities.get_mut(&id);
|
||||
if activity.is_none() {
|
||||
tracing::warn!("received result for unknown activity {}!", id);
|
||||
}
|
||||
match result {
|
||||
NixInternalLogLineResult::BuildLogLine { log }
|
||||
| NixInternalLogLineResult::PostBuildLogLine { log } => {
|
||||
tracing::info!("{}", log)
|
||||
}
|
||||
// apparently only reported for the realise activity, with activity types of
|
||||
// FileTransfer and CopyPath. contains bytes downloaded and unpacked, respectively
|
||||
NixInternalLogLineResult::SetExpected { .. } => {}
|
||||
// progress reporting, depends on the activity type
|
||||
NixInternalLogLineResult::Progress {
|
||||
done,
|
||||
expected: _,
|
||||
running,
|
||||
failed,
|
||||
} => {
|
||||
if let Some(activity) = activity {
|
||||
match activity.r#type {
|
||||
// meta-progress counting the derivations being built as part of the build
|
||||
NixInternalLogLineActivityType::Builds => {
|
||||
while running > progress.running {
|
||||
progress.running += 1;
|
||||
if let Some(build_activity_id) = started_builds.pop_front()
|
||||
{
|
||||
let Some(a) = activities.get_mut(&build_activity_id)
|
||||
else {
|
||||
panic!("this can't happen (id without activity)");
|
||||
};
|
||||
let ActivityDataPerType::Build { state, .. } =
|
||||
a.data.as_mut().unwrap()
|
||||
else {
|
||||
panic!(
|
||||
"this can't happen (build activity without build data)"
|
||||
);
|
||||
};
|
||||
*state = BuildState::Running;
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"no build started but we got a running report?"
|
||||
);
|
||||
}
|
||||
}
|
||||
while failed > progress.failed {
|
||||
progress.failed += 1;
|
||||
reported_builds.push_back(BuildState::Failed);
|
||||
}
|
||||
while done > progress.done {
|
||||
progress.done += 1;
|
||||
reported_builds.push_back(BuildState::Done);
|
||||
}
|
||||
}
|
||||
// meta-progress counting the paths being downloaded as part of the build
|
||||
NixInternalLogLineActivityType::CopyPaths => {}
|
||||
// progress for a single download
|
||||
NixInternalLogLineActivityType::FileTransfer => {}
|
||||
// progress for a file unpack(?) downloaded nar -> store
|
||||
NixInternalLogLineActivityType::CopyPath => {}
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
"progress reported for unhandled activity type {:?}",
|
||||
activity.r#type
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!("unhandled result: activity {}, {:?}", id, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
NixInternalLogLine::Start {
|
||||
id,
|
||||
level: _,
|
||||
text,
|
||||
parent,
|
||||
activity,
|
||||
} => {
|
||||
tracing::debug!(
|
||||
"activity {} (parent {}) started: {:?}",
|
||||
id,
|
||||
parent,
|
||||
activity
|
||||
);
|
||||
let mut entry = {
|
||||
let parent = if parent == 0 { None } else { Some(parent) };
|
||||
activities.entry(id).insert_entry(ActivityData {
|
||||
parent,
|
||||
r#type: activity.get_type(),
|
||||
running: true,
|
||||
data: None,
|
||||
})
|
||||
};
|
||||
if !text.is_empty() {
|
||||
tracing::info!("{}", text);
|
||||
}
|
||||
match activity {
|
||||
// these seem to be meta-activities for reporting progress
|
||||
NixInternalLogLineActivity::Builds => {}
|
||||
NixInternalLogLineActivity::CopyPaths => {}
|
||||
NixInternalLogLineActivity::Realise => {}
|
||||
// just ignore this, we don't really care about these
|
||||
NixInternalLogLineActivity::Unknown => {}
|
||||
NixInternalLogLineActivity::FileTransfer => {}
|
||||
NixInternalLogLineActivity::QueryPathInfo { .. } => {
|
||||
// has FileTransfer as child
|
||||
}
|
||||
NixInternalLogLineActivity::CopyPath => {
|
||||
// has FileTransfer as child (when downloading)
|
||||
}
|
||||
NixInternalLogLineActivity::Substitute { store_path, .. } => {
|
||||
// has CopyPath as child
|
||||
entry
|
||||
.get_mut()
|
||||
.data
|
||||
.replace(ActivityDataPerType::Substitute { store_path });
|
||||
}
|
||||
// useful activities
|
||||
NixInternalLogLineActivity::Build { drv_path, .. } => {
|
||||
entry.get_mut().data.replace(ActivityDataPerType::Build {
|
||||
drv_path,
|
||||
state: BuildState::Started,
|
||||
});
|
||||
started_builds.push_back(id);
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
"unhandled start: activity {} (parent {}), {:?}",
|
||||
id,
|
||||
parent,
|
||||
activity
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
NixInternalLogLine::Stop { id } => {
|
||||
tracing::debug!("activity {} stopped", id);
|
||||
if let Some(entry) = activities.get_mut(&id) {
|
||||
entry.running = false;
|
||||
match entry.r#type {
|
||||
NixInternalLogLineActivityType::Build => {
|
||||
let ActivityDataPerType::Build { state, .. } =
|
||||
entry.data.as_mut().unwrap()
|
||||
else {
|
||||
panic!("this can't happen (build activity without build data)");
|
||||
};
|
||||
if let Some(report) = reported_builds.pop_front() {
|
||||
*state = report;
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"build stopped but no report (don't know whether it failed)"
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let exit_status = child
|
||||
.exit_status()
|
||||
.ok_or(anyhow::anyhow!("child has exited, but no exit status????"))?;
|
||||
if !exit_status.success() {
|
||||
tracing::warn!("nix build failed with {}!", exit_status);
|
||||
}
|
||||
for (_, a) in &activities {
|
||||
match a.data.as_ref() {
|
||||
Some(ActivityDataPerType::Build { drv_path, state }) => match state {
|
||||
BuildState::Done => tracing::info!("derivation {} built successfully", drv_path),
|
||||
BuildState::Failed => tracing::warn!("derivation {} failed to build", drv_path),
|
||||
_ => tracing::warn!("derivation {} somehow still running??", drv_path),
|
||||
},
|
||||
Some(ActivityDataPerType::Substitute { store_path }) => {
|
||||
tracing::info!("downloaded {}", store_path);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// TODO: return the drv paths to upload!
|
||||
Ok(())
|
||||
}
|
||||
7
src/config.rs
Normal file
7
src/config.rs
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
#[derive(clap::Parser, Debug)]
|
||||
pub struct Options {
|
||||
#[arg(long = "system", short)]
|
||||
pub systems: Vec<String>,
|
||||
#[arg(long)]
|
||||
pub check_cached: bool,
|
||||
}
|
||||
99
src/main.rs
Normal file
99
src/main.rs
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
use clap::Parser;
|
||||
use tokio::{process::Command, sync::mpsc};
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
use crate::{
|
||||
build::BuildLoopMessage,
|
||||
config::Options,
|
||||
types::{NixJob, NixJobCacheStatus},
|
||||
util::{ChildOutput, WrappedChild},
|
||||
};
|
||||
|
||||
mod build;
|
||||
mod config;
|
||||
mod types;
|
||||
mod util;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let mut opts = Options::parse();
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy(),
|
||||
)
|
||||
.init();
|
||||
|
||||
if opts.systems.is_empty() {
|
||||
let cmd = Command::new("nix")
|
||||
.args(&["config", "show", "system"])
|
||||
.output()
|
||||
.await?;
|
||||
let system = std::str::from_utf8(&cmd.stdout)?.trim().into();
|
||||
tracing::info!("no system specified, defaulting to current {}", system);
|
||||
opts.systems.push(system);
|
||||
}
|
||||
|
||||
let (build_tx, build_rx) = mpsc::channel(4);
|
||||
let build_loop = tokio::spawn(self::build::build_loop(build_rx));
|
||||
|
||||
let mut jobs = nix_eval_jobs(&opts)?;
|
||||
loop {
|
||||
let line = match jobs.next_line().await? {
|
||||
ChildOutput::Finished => break,
|
||||
ChildOutput::Stderr(line) => {
|
||||
tracing::info!("nix-eval-jobs: {}", line);
|
||||
continue;
|
||||
}
|
||||
ChildOutput::Stdout(line) => line,
|
||||
};
|
||||
tracing::trace!("nix-eval-jobs line: {}", line);
|
||||
let job: NixJob = serde_json::from_str(&line)?;
|
||||
tracing::debug!("got new job: {:?}", job);
|
||||
if !opts.systems.contains(&job.system) {
|
||||
tracing::info!("skipping unwanted system build for {}", job.attr);
|
||||
continue;
|
||||
}
|
||||
match (job.cache_status, job.is_cached) {
|
||||
(Some(NixJobCacheStatus::Cached), _) | (None, Some(true)) => {
|
||||
tracing::info!("skipping cached build for {}", job.attr);
|
||||
continue;
|
||||
}
|
||||
(Some(NixJobCacheStatus::Local), _) => {
|
||||
// TODO: local paths should still be uploaded but needn't be built
|
||||
// probably when lix-eval-jobs implements `cacheStatus` (in addition to `isCached`)
|
||||
// they also have output paths already set in the nix-eval-jobs output
|
||||
tracing::info!("skipping cached build for {}", job.attr);
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
build_tx.send(BuildLoopMessage::Job(job)).await?;
|
||||
}
|
||||
|
||||
build_tx.send(BuildLoopMessage::Stop).await?;
|
||||
build_loop.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn nix_eval_jobs(opts: &Options) -> anyhow::Result<WrappedChild> {
|
||||
let mut command = Command::new("nix-eval-jobs");
|
||||
// TODO: make this configurable
|
||||
command.args(&[
|
||||
"--force-recurse",
|
||||
"--max-memory-size",
|
||||
"3072",
|
||||
"--workers",
|
||||
"4",
|
||||
"--flake",
|
||||
".#checks",
|
||||
]);
|
||||
if opts.check_cached {
|
||||
command.arg("--check-cache-status");
|
||||
}
|
||||
Ok(WrappedChild::new(&mut command, None)?)
|
||||
}
|
||||
159
src/types.rs
Normal file
159
src/types.rs
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
#![allow(dead_code)]
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::Deserialize;
|
||||
use strum::FromRepr;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum NixJobCacheStatus {
|
||||
Cached,
|
||||
Local,
|
||||
NotBuilt,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NixJob {
|
||||
pub attr: String,
|
||||
pub name: String,
|
||||
pub drv_path: String,
|
||||
pub system: String,
|
||||
pub cache_status: Option<NixJobCacheStatus>,
|
||||
pub is_cached: Option<bool>,
|
||||
pub outputs: HashMap<String, Option<String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[serde(tag = "action", rename_all = "camelCase")]
|
||||
pub enum NixInternalLogLine {
|
||||
Start {
|
||||
id: u64,
|
||||
level: u8,
|
||||
text: String,
|
||||
parent: u64,
|
||||
#[serde(flatten)]
|
||||
activity: NixInternalLogLineActivity,
|
||||
},
|
||||
Stop {
|
||||
id: u64,
|
||||
},
|
||||
Msg {
|
||||
level: u8,
|
||||
msg: String,
|
||||
},
|
||||
Result {
|
||||
id: u64,
|
||||
#[serde(flatten)]
|
||||
result: NixInternalLogLineResult,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(FromRepr, Debug, Clone, Copy)]
|
||||
#[repr(u64)]
|
||||
pub enum NixInternalLogLineActivityType {
|
||||
Unknown = 0,
|
||||
CopyPath = 100,
|
||||
FileTransfer = 101,
|
||||
Realise = 102,
|
||||
CopyPaths = 103,
|
||||
Builds = 104,
|
||||
Build = 105,
|
||||
OptimiseStore = 106,
|
||||
VerifyPaths = 107,
|
||||
Substitute = 108,
|
||||
QueryPathInfo = 109,
|
||||
PostBuildHook = 110,
|
||||
BuildWaiting = 111,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum NixInternalLogLineActivity {
|
||||
Unknown,
|
||||
CopyPath,
|
||||
FileTransfer,
|
||||
Realise,
|
||||
CopyPaths,
|
||||
Builds,
|
||||
Build {
|
||||
drv_path: String,
|
||||
machine: String,
|
||||
cur_round: u64,
|
||||
nr_rounds: u64,
|
||||
},
|
||||
OptimiseStore,
|
||||
VerifyPaths,
|
||||
Substitute {
|
||||
store_path: String,
|
||||
substituter: String,
|
||||
},
|
||||
QueryPathInfo {
|
||||
store_path: String,
|
||||
substituter: String,
|
||||
},
|
||||
PostBuildHook {
|
||||
store_path: String,
|
||||
},
|
||||
BuildWaiting,
|
||||
}
|
||||
|
||||
impl NixInternalLogLineActivity {
|
||||
pub fn get_type(&self) -> NixInternalLogLineActivityType {
|
||||
match self {
|
||||
NixInternalLogLineActivity::Unknown => NixInternalLogLineActivityType::Unknown,
|
||||
NixInternalLogLineActivity::CopyPath => NixInternalLogLineActivityType::CopyPath,
|
||||
NixInternalLogLineActivity::FileTransfer => {
|
||||
NixInternalLogLineActivityType::FileTransfer
|
||||
}
|
||||
NixInternalLogLineActivity::Realise => NixInternalLogLineActivityType::Realise,
|
||||
NixInternalLogLineActivity::CopyPaths => NixInternalLogLineActivityType::CopyPaths,
|
||||
NixInternalLogLineActivity::Builds => NixInternalLogLineActivityType::Builds,
|
||||
NixInternalLogLineActivity::Build { .. } => NixInternalLogLineActivityType::Build,
|
||||
NixInternalLogLineActivity::OptimiseStore => {
|
||||
NixInternalLogLineActivityType::OptimiseStore
|
||||
}
|
||||
NixInternalLogLineActivity::VerifyPaths => NixInternalLogLineActivityType::VerifyPaths,
|
||||
NixInternalLogLineActivity::Substitute { .. } => {
|
||||
NixInternalLogLineActivityType::Substitute
|
||||
}
|
||||
NixInternalLogLineActivity::QueryPathInfo { .. } => {
|
||||
NixInternalLogLineActivityType::QueryPathInfo
|
||||
}
|
||||
NixInternalLogLineActivity::PostBuildHook { .. } => {
|
||||
NixInternalLogLineActivityType::PostBuildHook
|
||||
}
|
||||
NixInternalLogLineActivity::BuildWaiting => {
|
||||
NixInternalLogLineActivityType::BuildWaiting
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum NixInternalLogLineResult {
|
||||
FileLinked {
|
||||
bytes_linked: i64,
|
||||
},
|
||||
BuildLogLine {
|
||||
log: String,
|
||||
},
|
||||
UntrustedPath,
|
||||
CorruptedPath,
|
||||
SetPhase {
|
||||
phase_name: String,
|
||||
},
|
||||
Progress {
|
||||
done: u64,
|
||||
expected: u64,
|
||||
running: u64,
|
||||
failed: u64,
|
||||
},
|
||||
SetExpected {
|
||||
activity_type: NixInternalLogLineActivityType,
|
||||
expected: u64,
|
||||
},
|
||||
PostBuildLogLine {
|
||||
log: String,
|
||||
},
|
||||
}
|
||||
212
src/util.rs
Normal file
212
src/util.rs
Normal file
|
|
@ -0,0 +1,212 @@
|
|||
use std::{
|
||||
pin::Pin,
|
||||
process::{ExitStatus, Stdio},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use futures::{
|
||||
FutureExt,
|
||||
future::{BoxFuture, poll_fn},
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader, Lines},
|
||||
process::{Child, ChildStderr, ChildStdout, Command},
|
||||
};
|
||||
|
||||
pub mod deserializer;
|
||||
|
||||
pub struct WrappedChild {
|
||||
// this ensures the `!Unpin`` internal struct is not moved at all
|
||||
inner: Pin<Box<WrappedChildInternal>>,
|
||||
}
|
||||
|
||||
impl WrappedChild {
|
||||
pub fn new(command: &mut Command, name: Option<String>) -> anyhow::Result<Self> {
|
||||
let name = name.unwrap_or_else(|| command.as_std().get_program().to_string_lossy().into());
|
||||
let mut child = command
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
tracing::info!("spawned {}", name);
|
||||
let stdout = BufReader::new(child.stdout.take().unwrap()).lines();
|
||||
let stderr = BufReader::new(child.stderr.take().unwrap()).lines();
|
||||
|
||||
let inner = WrappedChildInternal {
|
||||
name,
|
||||
exit_status: None,
|
||||
child,
|
||||
child_wait_fut: None,
|
||||
stdout: Some(stdout),
|
||||
stderr: Some(stderr),
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
inner: Box::pin(inner),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn next_line(&mut self) -> anyhow::Result<ChildOutput> {
|
||||
poll_fn(|cx| self.inner.as_mut().poll(cx)).await
|
||||
}
|
||||
|
||||
/// return the exit status if the child process has already exited, `None` otherwise
|
||||
pub fn exit_status(&self) -> Option<ExitStatus> {
|
||||
self.inner.exit_status
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project(!Unpin)]
|
||||
struct WrappedChildInternal {
|
||||
name: String,
|
||||
exit_status: Option<ExitStatus>,
|
||||
#[pin]
|
||||
child: Child,
|
||||
#[pin]
|
||||
stdout: Option<Lines<BufReader<ChildStdout>>>,
|
||||
#[pin]
|
||||
stderr: Option<Lines<BufReader<ChildStderr>>>,
|
||||
#[pin]
|
||||
// this future keeps a mutable reference to `child`
|
||||
// this struct must thus not be moved!! (thus the `!Unpin`)
|
||||
child_wait_fut: Option<BoxFuture<'static, std::io::Result<ExitStatus>>>,
|
||||
}
|
||||
|
||||
impl WrappedChildInternal {
|
||||
fn do_lifetime_extension<'a>(
|
||||
mut pin: Pin<&mut Option<BoxFuture<'static, std::io::Result<ExitStatus>>>>,
|
||||
future: BoxFuture<'a, std::io::Result<ExitStatus>>,
|
||||
) {
|
||||
if pin.is_some() {
|
||||
return;
|
||||
}
|
||||
// SAFETY: this transmute just does a lifetime extension from Pin<Box<'a>> to Pin<Box<'static>>
|
||||
// the types must be the same except for the lifetime parameter
|
||||
pin.replace(unsafe { std::mem::transmute(future) });
|
||||
}
|
||||
|
||||
fn map_stdio_poll_result(
|
||||
result: std::io::Result<Option<String>>,
|
||||
) -> Result<String, StdioState> {
|
||||
match result {
|
||||
Ok(Some(line)) => Ok(line),
|
||||
Ok(None) => Err(StdioState::Closed),
|
||||
Err(e) => Err(StdioState::Errored(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum StdioState {
|
||||
Closed,
|
||||
Errored(std::io::Error),
|
||||
}
|
||||
|
||||
pub enum ChildOutput {
|
||||
Stdout(String),
|
||||
Stderr(String),
|
||||
Finished,
|
||||
}
|
||||
|
||||
impl Future for WrappedChildInternal {
|
||||
type Output = anyhow::Result<ChildOutput>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut().project();
|
||||
tracing::trace!("[child process {}] poll called", this.name);
|
||||
if this.exit_status.is_some() {
|
||||
// process has exited already, so just return asap
|
||||
return Poll::Ready(Ok(ChildOutput::Finished));
|
||||
}
|
||||
|
||||
let state = match (
|
||||
this.stdout.as_mut().as_pin_mut(),
|
||||
this.stderr.as_mut().as_pin_mut(),
|
||||
) {
|
||||
(Some(stdout), Some(stderr)) => match stdout
|
||||
.poll_next_line(cx)
|
||||
.map(Self::map_stdio_poll_result)
|
||||
{
|
||||
Poll::Pending => match stderr.poll_next_line(cx).map(Self::map_stdio_poll_result) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Ok(line)) => {
|
||||
return Poll::Ready(Ok(ChildOutput::Stderr(line)));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::debug!("[child process {}] stderr closed", this.name);
|
||||
drop(this.stderr.as_mut().take());
|
||||
e
|
||||
}
|
||||
},
|
||||
Poll::Ready(Ok(line)) => {
|
||||
return Poll::Ready(Ok(ChildOutput::Stdout(line)));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::debug!("[child process {}] stdout closed", this.name);
|
||||
drop(this.stdout.as_mut().take());
|
||||
e
|
||||
}
|
||||
},
|
||||
(Some(stdout), None) => {
|
||||
match stdout.poll_next_line(cx).map(Self::map_stdio_poll_result) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Ok(line)) => {
|
||||
return Poll::Ready(Ok(ChildOutput::Stdout(line)));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::debug!("[child process {}] stdout closed", this.name);
|
||||
drop(this.stdout.as_mut().take());
|
||||
e
|
||||
}
|
||||
}
|
||||
}
|
||||
(None, Some(stderr)) => {
|
||||
match stderr.poll_next_line(cx).map(Self::map_stdio_poll_result) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Ok(line)) => {
|
||||
return Poll::Ready(Ok(ChildOutput::Stderr(line)));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::debug!("[child process {}] stderr closed", this.name);
|
||||
drop(this.stderr.as_mut().take());
|
||||
e
|
||||
}
|
||||
}
|
||||
}
|
||||
(None, None) => StdioState::Closed,
|
||||
};
|
||||
if let StdioState::Errored(e) = state {
|
||||
tracing::debug!("[child process {}] io error: {}", this.name, e);
|
||||
}
|
||||
// pending if at least some i/o still exists
|
||||
if this.stderr.is_some() || this.stdout.is_some() {
|
||||
// make sure we immediately poll the non-closed stream again, otherwise this future might not be awoken again
|
||||
return self.poll(cx);
|
||||
}
|
||||
|
||||
// now do the process kill/wait/exit stuff as both stdios are closed
|
||||
|
||||
if this.child_wait_fut.is_none() {
|
||||
// send kill signal to child
|
||||
if let Err(e) = this.child.start_kill() {
|
||||
return Poll::Ready(Err(e.into()));
|
||||
}
|
||||
// start waiting for the child to exit
|
||||
Self::do_lifetime_extension(this.child_wait_fut.as_mut(), this.child.wait().boxed());
|
||||
}
|
||||
|
||||
// we can unwrap here because the future has been set if it didn't exist already above
|
||||
let child_wait = this.child_wait_fut.as_mut().as_pin_mut().unwrap();
|
||||
match child_wait.poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => {
|
||||
this.child_wait_fut.as_mut().take();
|
||||
Poll::Ready(Err(e.into()))
|
||||
}
|
||||
Poll::Ready(Ok(status)) => {
|
||||
this.child_wait_fut.as_mut().take();
|
||||
this.exit_status.replace(status);
|
||||
tracing::info!("[child process {}] exited with {}", this.name, status);
|
||||
Poll::Ready(Ok(ChildOutput::Finished))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
275
src/util/deserializer.rs
Normal file
275
src/util/deserializer.rs
Normal file
|
|
@ -0,0 +1,275 @@
|
|||
use serde::{
|
||||
__private::de::{Content, ContentDeserializer},
|
||||
Deserialize,
|
||||
de::Visitor,
|
||||
};
|
||||
|
||||
use crate::types::{
|
||||
NixInternalLogLineActivity, NixInternalLogLineActivityType, NixInternalLogLineResult,
|
||||
};
|
||||
|
||||
struct TypeFieldsVisitor;
|
||||
impl<'de> Visitor<'de> for TypeFieldsVisitor {
|
||||
type Value = (u64, Content<'de>);
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("type + fields")
|
||||
}
|
||||
|
||||
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::MapAccess<'de>,
|
||||
{
|
||||
let mut tag = None;
|
||||
let mut fields = None;
|
||||
|
||||
while let Some(key) = map.next_key::<String>()? {
|
||||
match key.as_str() {
|
||||
"type" => {
|
||||
if tag.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("type"));
|
||||
}
|
||||
tag.replace(map.next_value::<u64>()?);
|
||||
}
|
||||
"fields" => {
|
||||
if fields.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("fields"));
|
||||
}
|
||||
fields.replace(map.next_value::<Content>()?);
|
||||
}
|
||||
_ => return Err(serde::de::Error::unknown_field(&key, &[])),
|
||||
}
|
||||
}
|
||||
|
||||
let Some(tag) = tag else {
|
||||
return Err(serde::de::Error::missing_field("type"));
|
||||
};
|
||||
let fields = fields.unwrap_or(Content::None);
|
||||
Ok((tag, fields))
|
||||
}
|
||||
}
|
||||
|
||||
struct TypedTupleVisitor<T> {
|
||||
_marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
impl<T> TypedTupleVisitor<T> {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de, T0> Visitor<'de> for TypedTupleVisitor<(T0,)>
|
||||
where
|
||||
T0: Deserialize<'de>,
|
||||
{
|
||||
type Value = (T0,);
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("tuple")
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::SeqAccess<'de>,
|
||||
{
|
||||
let Some(v0) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(0, &"1"));
|
||||
};
|
||||
Ok((v0,))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de, T0, T1> Visitor<'de> for TypedTupleVisitor<(T0, T1)>
|
||||
where
|
||||
T0: Deserialize<'de>,
|
||||
T1: Deserialize<'de>,
|
||||
{
|
||||
type Value = (T0, T1);
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("tuple")
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::SeqAccess<'de>,
|
||||
{
|
||||
let Some(v0) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(0, &"2"));
|
||||
};
|
||||
let Some(v1) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(1, &"2"));
|
||||
};
|
||||
Ok((v0, v1))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de, T0, T1, T2, T3> Visitor<'de> for TypedTupleVisitor<(T0, T1, T2, T3)>
|
||||
where
|
||||
T0: Deserialize<'de>,
|
||||
T1: Deserialize<'de>,
|
||||
T2: Deserialize<'de>,
|
||||
T3: Deserialize<'de>,
|
||||
{
|
||||
type Value = (T0, T1, T2, T3);
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("tuple")
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::SeqAccess<'de>,
|
||||
{
|
||||
let Some(v0) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(0, &"4"));
|
||||
};
|
||||
let Some(v1) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(1, &"4"));
|
||||
};
|
||||
let Some(v2) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(2, &"4"));
|
||||
};
|
||||
let Some(v3) = seq.next_element()? else {
|
||||
return Err(serde::de::Error::invalid_length(3, &"4"));
|
||||
};
|
||||
Ok((v0, v1, v2, v3))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for NixInternalLogLineResult {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let (r#type, fields) = deserializer.deserialize_any(TypeFieldsVisitor)?;
|
||||
let de = ContentDeserializer::<D::Error>::new(fields);
|
||||
match r#type {
|
||||
100 => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_,)>::new())?;
|
||||
Ok(NixInternalLogLineResult::FileLinked {
|
||||
bytes_linked: fields.0,
|
||||
})
|
||||
}
|
||||
101 => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_,)>::new())?;
|
||||
Ok(NixInternalLogLineResult::BuildLogLine { log: fields.0 })
|
||||
}
|
||||
102 => Ok(NixInternalLogLineResult::UntrustedPath),
|
||||
103 => Ok(NixInternalLogLineResult::CorruptedPath),
|
||||
104 => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_,)>::new())?;
|
||||
Ok(NixInternalLogLineResult::BuildLogLine { log: fields.0 })
|
||||
}
|
||||
105 => {
|
||||
let fields = serde::Deserializer::deserialize_seq(
|
||||
de,
|
||||
TypedTupleVisitor::<(_, _, _, _)>::new(),
|
||||
)?;
|
||||
Ok(NixInternalLogLineResult::Progress {
|
||||
done: fields.0,
|
||||
expected: fields.1,
|
||||
running: fields.2,
|
||||
failed: fields.3,
|
||||
})
|
||||
}
|
||||
106 => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_, _)>::new())?;
|
||||
Ok(NixInternalLogLineResult::SetExpected {
|
||||
activity_type: NixInternalLogLineActivityType::from_repr(fields.0).ok_or(
|
||||
serde::de::Error::invalid_value(
|
||||
serde::de::Unexpected::Unsigned(fields.0),
|
||||
&"valid activity type",
|
||||
),
|
||||
)?,
|
||||
expected: fields.1,
|
||||
})
|
||||
}
|
||||
107 => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_,)>::new())?;
|
||||
Ok(NixInternalLogLineResult::PostBuildLogLine { log: fields.0 })
|
||||
}
|
||||
_ => Err(serde::de::Error::invalid_value(
|
||||
serde::de::Unexpected::Unsigned(r#type),
|
||||
&"valid result type tag",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for NixInternalLogLineActivity {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let (r#type, fields) = deserializer.deserialize_any(TypeFieldsVisitor)?;
|
||||
let de = ContentDeserializer::<D::Error>::new(fields);
|
||||
let r#type = NixInternalLogLineActivityType::from_repr(r#type).ok_or(
|
||||
serde::de::Error::invalid_value(
|
||||
serde::de::Unexpected::Unsigned(r#type),
|
||||
&"valid activity type",
|
||||
),
|
||||
)?;
|
||||
match r#type {
|
||||
NixInternalLogLineActivityType::Unknown => Ok(NixInternalLogLineActivity::Unknown),
|
||||
NixInternalLogLineActivityType::CopyPath => Ok(NixInternalLogLineActivity::CopyPath),
|
||||
NixInternalLogLineActivityType::FileTransfer => {
|
||||
Ok(NixInternalLogLineActivity::FileTransfer)
|
||||
}
|
||||
NixInternalLogLineActivityType::Realise => Ok(NixInternalLogLineActivity::Realise),
|
||||
NixInternalLogLineActivityType::CopyPaths => Ok(NixInternalLogLineActivity::CopyPaths),
|
||||
NixInternalLogLineActivityType::Builds => Ok(NixInternalLogLineActivity::Builds),
|
||||
NixInternalLogLineActivityType::Build => {
|
||||
let fields = serde::Deserializer::deserialize_seq(
|
||||
de,
|
||||
TypedTupleVisitor::<(_, _, _, _)>::new(),
|
||||
)?;
|
||||
Ok(NixInternalLogLineActivity::Build {
|
||||
drv_path: fields.0,
|
||||
machine: fields.1,
|
||||
cur_round: fields.2,
|
||||
nr_rounds: fields.3,
|
||||
})
|
||||
}
|
||||
NixInternalLogLineActivityType::OptimiseStore => {
|
||||
Ok(NixInternalLogLineActivity::OptimiseStore)
|
||||
}
|
||||
NixInternalLogLineActivityType::VerifyPaths => {
|
||||
Ok(NixInternalLogLineActivity::VerifyPaths)
|
||||
}
|
||||
NixInternalLogLineActivityType::Substitute => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_, _)>::new())?;
|
||||
Ok(NixInternalLogLineActivity::Substitute {
|
||||
store_path: fields.0,
|
||||
substituter: fields.1,
|
||||
})
|
||||
}
|
||||
NixInternalLogLineActivityType::QueryPathInfo => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_, _)>::new())?;
|
||||
Ok(NixInternalLogLineActivity::QueryPathInfo {
|
||||
store_path: fields.0,
|
||||
substituter: fields.1,
|
||||
})
|
||||
}
|
||||
NixInternalLogLineActivityType::PostBuildHook => {
|
||||
let fields =
|
||||
serde::Deserializer::deserialize_seq(de, TypedTupleVisitor::<(_,)>::new())?;
|
||||
Ok(NixInternalLogLineActivity::PostBuildHook {
|
||||
store_path: fields.0,
|
||||
})
|
||||
}
|
||||
NixInternalLogLineActivityType::BuildWaiting => {
|
||||
Ok(NixInternalLogLineActivity::BuildWaiting)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue