Skip to main content

abbaye/builders/
orchestrator.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
5use miette::{IntoDiagnostic, Result};
6use tokio::sync::{mpsc, watch};
7use tokio::task::JoinSet;
8use tracing::{info, warn};
9
10use crate::builders::{ArtifactPath, BuilderEntry, LogEvent};
11use crate::cli::{COLOURS, GREEN, RED, RESET, YELLOW};
12use crate::site::spinner_style;
13
14/// Run all builders in parallel, respecting `depends_on` ordering.
15///
16/// Returns `(dist_artifacts, doc_artifacts)` – file-type artifacts go into
17/// the dist/ directory while directory-type artifacts go into docs/.
18pub(crate) async fn run_builders(
19    builders: &[BuilderEntry],
20    version: &str,
21) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
22    let id_to_idx: HashMap<&str, usize> = builders
23        .iter()
24        .enumerate()
25        .filter_map(|(i, e)| e.id.as_deref().map(|id| (id, i)))
26        .collect();
27
28    for (i, entry) in builders.iter().enumerate() {
29        for dep in &entry.depends_on {
30            if !id_to_idx.contains_key(dep.as_str()) {
31                return Err(miette::miette!(
32                    "builder #{i} ({}) lists '{}' in depends_on, \
33                     but no builder has that id",
34                    entry.label(),
35                    dep
36                ));
37            }
38        }
39    }
40
41    {
42        let n = builders.len();
43        let mut state = vec![0u8; n];
44
45        fn dfs(
46            idx: usize,
47            id_to_idx: &HashMap<&str, usize>,
48            builders: &[BuilderEntry],
49            state: &mut Vec<u8>,
50        ) -> Result<()> {
51            if state[idx] == 1 {
52                return Err(miette::miette!(
53                    "dependency cycle detected involving builder #{idx} ({})",
54                    builders[idx].id.as_deref().unwrap_or(builders[idx].label())
55                ));
56            }
57            if state[idx] == 2 {
58                return Ok(());
59            }
60            state[idx] = 1;
61            for dep in &builders[idx].depends_on {
62                if let Some(&dep_idx) = id_to_idx.get(dep.as_str()) {
63                    dfs(dep_idx, id_to_idx, builders, state)?;
64                }
65            }
66            state[idx] = 2;
67            Ok(())
68        }
69
70        for i in 0..n {
71            dfs(i, &id_to_idx, builders, &mut state)?;
72        }
73    }
74
75    let mut completion_txs: HashMap<String, watch::Sender<Option<bool>>> = HashMap::new();
76    let mut completion_rxs: HashMap<String, watch::Receiver<Option<bool>>> = HashMap::new();
77
78    for entry in builders {
79        if let Some(id) = &entry.id {
80            let (tx, rx) = watch::channel(None::<bool>);
81            completion_txs.insert(id.clone(), tx);
82            completion_rxs.insert(id.clone(), rx);
83        }
84    }
85
86    let total = builders.len();
87
88    if crate::utils::is_interactive() {
89        run_with_bars(builders, version, completion_txs, completion_rxs, total).await
90    } else {
91        run_with_logs(builders, version, completion_txs, completion_rxs, total).await
92    }
93}
94
95// ── Shared builder-task body ────────────────────────────────────────────────
96
/// Events reported by the shared builder loop to the UI layer.
///
/// `Skip`, `Success` and `Failure` are terminal: at most one of them is
/// emitted per builder, and nothing follows it.
#[derive(Debug, Clone, PartialEq, Eq)]
enum BuildEvent {
    /// The builder is about to wait on the dependency with this id.
    DepWait(String),
    /// The builder will not run because the dependency with this id did not succeed.
    Skip(String),
    /// All dependencies succeeded and the build itself is starting.
    Run,
    /// The build succeeded and produced this many artifacts.
    Success(usize),
    /// The build failed; carries the error rendered as text.
    Failure(String),
}
105
106/// Run a single builder inside a `JoinSet` task, handling dependency waiting,
107/// completion signalling, and reporting events back to the UI layer.
108async fn run_builder_task(
109    entry: BuilderEntry,
110    version: String,
111    dep_receivers: Vec<(String, watch::Receiver<Option<bool>>)>,
112    my_tx: Option<watch::Sender<Option<bool>>>,
113    log_tx: mpsc::UnboundedSender<LogEvent>,
114    on_event: impl Fn(BuildEvent) + Send + 'static,
115) -> Result<Vec<ArtifactPath>> {
116    for (dep_id, mut rx) in dep_receivers {
117        on_event(BuildEvent::DepWait(dep_id.clone()));
118
119        let resolved = rx.wait_for(|v| v.is_some()).await;
120
121        let succeeded = match resolved {
122            Err(_) => false,
123            Ok(r) => r.unwrap_or(false),
124        };
125
126        if !succeeded {
127            on_event(BuildEvent::Skip(dep_id.clone()));
128            if let Some(tx) = &my_tx {
129                let _ = tx.send(Some(false));
130            }
131            return Ok(vec![]);
132        }
133    }
134
135    on_event(BuildEvent::Run);
136    let result = entry.build(&version, log_tx).await;
137    let succeeded = result.is_ok();
138
139    if let Some(tx) = my_tx {
140        let _ = tx.send(Some(succeeded));
141    }
142
143    match &result {
144        Ok(artifacts) => on_event(BuildEvent::Success(artifacts.len())),
145        Err(e) => on_event(BuildEvent::Failure(e.to_string())),
146    }
147    result
148}
149
150/// Collect finished `JoinSet` tasks and split results into dist / doc artifacts.
151async fn collect_results(
152    mut join_set: JoinSet<Result<Vec<ArtifactPath>>>,
153    on_done: impl FnOnce(bool),
154) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
155    let mut errors: Vec<miette::Report> = Vec::new();
156    let mut dist_artifacts = Vec::new();
157    let mut doc_artifacts = Vec::new();
158
159    while let Some(res) = join_set.join_next().await {
160        match res.into_diagnostic()? {
161            Ok(artifacts) => {
162                for artifact in artifacts {
163                    if artifact.path.is_dir() {
164                        doc_artifacts.push(artifact);
165                    } else {
166                        dist_artifacts.push(artifact);
167                    }
168                }
169            }
170            Err(e) => errors.push(e),
171        }
172    }
173
174    on_done(errors.is_empty());
175
176    if let Some(first_err) = errors.into_iter().next() {
177        return Err(first_err);
178    }
179
180    Ok((dist_artifacts, doc_artifacts))
181}
182
183// ── Interactive (progress bar) UI ───────────────────────────────────────────
184
185async fn run_with_bars(
186    builders: &[BuilderEntry],
187    version: &str,
188    mut completion_txs: HashMap<String, watch::Sender<Option<bool>>>,
189    completion_rxs: HashMap<String, watch::Receiver<Option<bool>>>,
190    total: usize,
191) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
192    let multi = MultiProgress::new();
193
194    let summary = multi.add(ProgressBar::new(total as u64));
195    summary.set_style(
196        ProgressStyle::with_template("{pos}/{len} builders  {bar:20.green/white}  {msg}")
197            .expect("valid template"),
198    );
199    summary.set_message("building…");
200
201    let mut join_set: JoinSet<Result<Vec<ArtifactPath>>> = JoinSet::new();
202
203    for (i, entry) in builders.iter().enumerate() {
204        let color = COLOURS[i % COLOURS.len()];
205        let label = entry.id.as_deref().unwrap_or(entry.label());
206        let colored_prefix = format!("{color}[{label}]{RESET}");
207
208        let pb = multi.insert_before(&summary, ProgressBar::new_spinner());
209        pb.set_style(spinner_style(false));
210        pb.set_prefix(colored_prefix);
211        pb.set_message("starting…");
212        pb.enable_steady_tick(Duration::from_millis(100));
213
214        let (log_tx, mut log_rx) = mpsc::unbounded_channel::<LogEvent>();
215
216        let pb_log = pb.clone();
217        let multi_log = multi.clone();
218        let parent_color_idx = i;
219        tokio::spawn(async move {
220            let mut child_pbs: HashMap<String, ProgressBar> = HashMap::new();
221            let mut last_child_pb = pb_log.clone();
222            let mut child_color_idx = parent_color_idx + 1;
223
224            while let Some(event) = log_rx.recv().await {
225                match event {
226                    LogEvent::Line(line) => {
227                        pb_log.set_message(line);
228                    }
229                    LogEvent::ChildStart { id, label } => {
230                        let child_color = COLOURS[child_color_idx % COLOURS.len()];
231                        child_color_idx += 1;
232                        let child_pb =
233                            multi_log.insert_after(&last_child_pb, ProgressBar::new_spinner());
234                        child_pb.set_style(spinner_style(true));
235                        child_pb.set_prefix(format!("{child_color}[{label}]{RESET}"));
236                        child_pb.set_message("starting…");
237                        child_pb.enable_steady_tick(Duration::from_millis(100));
238                        last_child_pb = child_pb.clone();
239                        child_pbs.insert(id, child_pb);
240                    }
241                    LogEvent::ChildLine { id, line } => {
242                        if let Some(child_pb) = child_pbs.get(&id) {
243                            child_pb.set_message(line);
244                        }
245                    }
246                    LogEvent::ChildFinish {
247                        id,
248                        success,
249                        summary,
250                    } => {
251                        if let Some(child_pb) = child_pbs.remove(&id) {
252                            if success {
253                                child_pb.finish_with_message(format!(
254                                    "{GREEN}\u{2713}{RESET} {summary}"
255                                ));
256                            } else {
257                                child_pb
258                                    .finish_with_message(format!("{RED}\u{2717}{RESET} {summary}"));
259                            }
260                        }
261                    }
262                }
263            }
264        });
265
266        let dep_receivers: Vec<(String, watch::Receiver<Option<bool>>)> = entry
267            .depends_on
268            .iter()
269            .filter_map(|dep_id| {
270                completion_rxs
271                    .get(dep_id)
272                    .map(|rx| (dep_id.clone(), rx.clone()))
273            })
274            .collect();
275
276        let my_tx: Option<watch::Sender<Option<bool>>> =
277            entry.id.as_ref().and_then(|id| completion_txs.remove(id));
278
279        let entry = entry.clone();
280        let version = version.to_owned();
281        let pb_task = pb.clone();
282        let summary_task = summary.clone();
283
284        join_set.spawn(async move {
285            run_builder_task(
286                entry,
287                version,
288                dep_receivers,
289                my_tx,
290                log_tx,
291                move |event| match event {
292                    BuildEvent::DepWait(dep_id) => {
293                        pb_task.set_message(format!("waiting for '{dep_id}'…"));
294                    }
295                    BuildEvent::Skip(dep_id) => {
296                        summary_task.inc(1);
297                        pb_task.finish_with_message(format!(
298                            "{YELLOW}\u{29B8} skipped{RESET} (dependency '{dep_id}' failed)"
299                        ));
300                    }
301                    BuildEvent::Run => {
302                        pb_task.set_message("running…");
303                    }
304                    BuildEvent::Success(count) => {
305                        summary_task.inc(1);
306                        pb_task.finish_with_message(format!(
307                            "{GREEN}\u{2713} done{RESET} ({count} artifact(s))"
308                        ));
309                    }
310                    BuildEvent::Failure(error) => {
311                        summary_task.inc(1);
312                        pb_task
313                            .finish_with_message(format!("{RED}\u{2717} failed:{RESET} {error}"));
314                    }
315                },
316            )
317            .await
318        });
319    }
320
321    let summary_bar = summary.clone();
322    collect_results(join_set, move |ok| {
323        let msg = if ok {
324            format!("{GREEN}\u{2713} all done{RESET}")
325        } else {
326            format!("{RED}\u{2717} some builders failed{RESET}")
327        };
328        summary_bar.finish_with_message(msg);
329    })
330    .await
331}
332
333// ── Non-interactive (tracing) UI ────────────────────────────────────────────
334
335async fn run_with_logs(
336    builders: &[BuilderEntry],
337    version: &str,
338    mut completion_txs: HashMap<String, watch::Sender<Option<bool>>>,
339    completion_rxs: HashMap<String, watch::Receiver<Option<bool>>>,
340    total: usize,
341) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
342    info!("Running {total} builder(s) …");
343
344    let mut join_set: JoinSet<Result<Vec<ArtifactPath>>> = JoinSet::new();
345
346    for entry in builders.iter() {
347        let label = entry.id.as_deref().unwrap_or(entry.label()).to_owned();
348
349        let (log_tx, mut log_rx) = mpsc::unbounded_channel::<LogEvent>();
350
351        let log_label = label.clone();
352        tokio::spawn(async move {
353            while let Some(event) = log_rx.recv().await {
354                match event {
355                    LogEvent::Line(line) => info!("[{log_label}] {line}"),
356                    LogEvent::ChildStart { label: child, .. } => {
357                        info!("[{log_label}] [{child}] starting …");
358                    }
359                    LogEvent::ChildLine { id, line } => {
360                        info!("[{log_label}] [{id}] {line}");
361                    }
362                    LogEvent::ChildFinish {
363                        id,
364                        success,
365                        summary,
366                    } => {
367                        if success {
368                            info!("[{log_label}] [{id}] {summary}");
369                        } else {
370                            warn!("[{log_label}] [{id}] failed: {summary}");
371                        }
372                    }
373                }
374            }
375        });
376
377        let dep_receivers: Vec<(String, watch::Receiver<Option<bool>>)> = entry
378            .depends_on
379            .iter()
380            .filter_map(|dep_id| {
381                completion_rxs
382                    .get(dep_id)
383                    .map(|rx| (dep_id.clone(), rx.clone()))
384            })
385            .collect();
386
387        let my_tx: Option<watch::Sender<Option<bool>>> =
388            entry.id.as_ref().and_then(|id| completion_txs.remove(id));
389
390        let entry = entry.clone();
391        let version = version.to_owned();
392        let task_label = label.clone();
393
394        join_set.spawn(async move {
395            run_builder_task(
396                entry,
397                version,
398                dep_receivers,
399                my_tx,
400                log_tx,
401                move |event| match event {
402                    BuildEvent::DepWait(dep_id) => {
403                        info!("[{task_label}] waiting for '{dep_id}' …");
404                    }
405                    BuildEvent::Skip(dep_id) => {
406                        info!("[{task_label}] skipped (dependency '{dep_id}' failed)");
407                    }
408                    BuildEvent::Run => {
409                        info!("[{task_label}] running …");
410                    }
411                    BuildEvent::Success(count) => {
412                        info!("[{task_label}] done ({count} artifact(s))");
413                    }
414                    BuildEvent::Failure(error) => {
415                        warn!("[{task_label}] failed: {error}");
416                    }
417                },
418            )
419            .await
420        });
421    }
422
423    collect_results(join_set, |ok| {
424        if ok {
425            info!("All builders completed successfully.");
426        } else {
427            warn!("Some builders failed.");
428        }
429    })
430    .await
431}