1use std::collections::HashMap;
2use std::time::Duration;
3
4use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
5use miette::{IntoDiagnostic, Result};
6use tokio::sync::{mpsc, watch};
7use tokio::task::JoinSet;
8use tracing::{info, warn};
9
10use crate::builders::{ArtifactPath, BuilderEntry, LogEvent};
11use crate::cli::{COLOURS, GREEN, RED, RESET, YELLOW};
12use crate::site::spinner_style;
13
14pub(crate) async fn run_builders(
19 builders: &[BuilderEntry],
20 version: &str,
21) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
22 let id_to_idx: HashMap<&str, usize> = builders
23 .iter()
24 .enumerate()
25 .filter_map(|(i, e)| e.id.as_deref().map(|id| (id, i)))
26 .collect();
27
28 for (i, entry) in builders.iter().enumerate() {
29 for dep in &entry.depends_on {
30 if !id_to_idx.contains_key(dep.as_str()) {
31 return Err(miette::miette!(
32 "builder #{i} ({}) lists '{}' in depends_on, \
33 but no builder has that id",
34 entry.label(),
35 dep
36 ));
37 }
38 }
39 }
40
41 {
42 let n = builders.len();
43 let mut state = vec![0u8; n];
44
45 fn dfs(
46 idx: usize,
47 id_to_idx: &HashMap<&str, usize>,
48 builders: &[BuilderEntry],
49 state: &mut Vec<u8>,
50 ) -> Result<()> {
51 if state[idx] == 1 {
52 return Err(miette::miette!(
53 "dependency cycle detected involving builder #{idx} ({})",
54 builders[idx].id.as_deref().unwrap_or(builders[idx].label())
55 ));
56 }
57 if state[idx] == 2 {
58 return Ok(());
59 }
60 state[idx] = 1;
61 for dep in &builders[idx].depends_on {
62 if let Some(&dep_idx) = id_to_idx.get(dep.as_str()) {
63 dfs(dep_idx, id_to_idx, builders, state)?;
64 }
65 }
66 state[idx] = 2;
67 Ok(())
68 }
69
70 for i in 0..n {
71 dfs(i, &id_to_idx, builders, &mut state)?;
72 }
73 }
74
75 let mut completion_txs: HashMap<String, watch::Sender<Option<bool>>> = HashMap::new();
76 let mut completion_rxs: HashMap<String, watch::Receiver<Option<bool>>> = HashMap::new();
77
78 for entry in builders {
79 if let Some(id) = &entry.id {
80 let (tx, rx) = watch::channel(None::<bool>);
81 completion_txs.insert(id.clone(), tx);
82 completion_rxs.insert(id.clone(), rx);
83 }
84 }
85
86 let total = builders.len();
87
88 if crate::utils::is_interactive() {
89 run_with_bars(builders, version, completion_txs, completion_rxs, total).await
90 } else {
91 run_with_logs(builders, version, completion_txs, completion_rxs, total).await
92 }
93}
94
/// Lifecycle notifications that `run_builder_task` passes to its `on_event`
/// callback so each runner can render progress in its own way.
enum BuildEvent {
    /// The task is about to wait on the dependency with this id.
    DepWait(String),
    /// The build is not run because the dependency with this id did not
    /// report success.
    Skip(String),
    /// Every dependency reported success; the build is about to start.
    Run,
    /// The build returned `Ok`; carries the number of artifacts it produced.
    Success(usize),
    /// The build returned `Err`; carries the error rendered via `to_string()`.
    Failure(String),
}
105
106async fn run_builder_task(
109 entry: BuilderEntry,
110 version: String,
111 dep_receivers: Vec<(String, watch::Receiver<Option<bool>>)>,
112 my_tx: Option<watch::Sender<Option<bool>>>,
113 log_tx: mpsc::UnboundedSender<LogEvent>,
114 on_event: impl Fn(BuildEvent) + Send + 'static,
115) -> Result<Vec<ArtifactPath>> {
116 for (dep_id, mut rx) in dep_receivers {
117 on_event(BuildEvent::DepWait(dep_id.clone()));
118
119 let resolved = rx.wait_for(|v| v.is_some()).await;
120
121 let succeeded = match resolved {
122 Err(_) => false,
123 Ok(r) => r.unwrap_or(false),
124 };
125
126 if !succeeded {
127 on_event(BuildEvent::Skip(dep_id.clone()));
128 if let Some(tx) = &my_tx {
129 let _ = tx.send(Some(false));
130 }
131 return Ok(vec![]);
132 }
133 }
134
135 on_event(BuildEvent::Run);
136 let result = entry.build(&version, log_tx).await;
137 let succeeded = result.is_ok();
138
139 if let Some(tx) = my_tx {
140 let _ = tx.send(Some(succeeded));
141 }
142
143 match &result {
144 Ok(artifacts) => on_event(BuildEvent::Success(artifacts.len())),
145 Err(e) => on_event(BuildEvent::Failure(e.to_string())),
146 }
147 result
148}
149
150async fn collect_results(
152 mut join_set: JoinSet<Result<Vec<ArtifactPath>>>,
153 on_done: impl FnOnce(bool),
154) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
155 let mut errors: Vec<miette::Report> = Vec::new();
156 let mut dist_artifacts = Vec::new();
157 let mut doc_artifacts = Vec::new();
158
159 while let Some(res) = join_set.join_next().await {
160 match res.into_diagnostic()? {
161 Ok(artifacts) => {
162 for artifact in artifacts {
163 if artifact.path.is_dir() {
164 doc_artifacts.push(artifact);
165 } else {
166 dist_artifacts.push(artifact);
167 }
168 }
169 }
170 Err(e) => errors.push(e),
171 }
172 }
173
174 on_done(errors.is_empty());
175
176 if let Some(first_err) = errors.into_iter().next() {
177 return Err(first_err);
178 }
179
180 Ok((dist_artifacts, doc_artifacts))
181}
182
183async fn run_with_bars(
186 builders: &[BuilderEntry],
187 version: &str,
188 mut completion_txs: HashMap<String, watch::Sender<Option<bool>>>,
189 completion_rxs: HashMap<String, watch::Receiver<Option<bool>>>,
190 total: usize,
191) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
192 let multi = MultiProgress::new();
193
194 let summary = multi.add(ProgressBar::new(total as u64));
195 summary.set_style(
196 ProgressStyle::with_template("{pos}/{len} builders {bar:20.green/white} {msg}")
197 .expect("valid template"),
198 );
199 summary.set_message("building…");
200
201 let mut join_set: JoinSet<Result<Vec<ArtifactPath>>> = JoinSet::new();
202
203 for (i, entry) in builders.iter().enumerate() {
204 let color = COLOURS[i % COLOURS.len()];
205 let label = entry.id.as_deref().unwrap_or(entry.label());
206 let colored_prefix = format!("{color}[{label}]{RESET}");
207
208 let pb = multi.insert_before(&summary, ProgressBar::new_spinner());
209 pb.set_style(spinner_style(false));
210 pb.set_prefix(colored_prefix);
211 pb.set_message("starting…");
212 pb.enable_steady_tick(Duration::from_millis(100));
213
214 let (log_tx, mut log_rx) = mpsc::unbounded_channel::<LogEvent>();
215
216 let pb_log = pb.clone();
217 let multi_log = multi.clone();
218 let parent_color_idx = i;
219 tokio::spawn(async move {
220 let mut child_pbs: HashMap<String, ProgressBar> = HashMap::new();
221 let mut last_child_pb = pb_log.clone();
222 let mut child_color_idx = parent_color_idx + 1;
223
224 while let Some(event) = log_rx.recv().await {
225 match event {
226 LogEvent::Line(line) => {
227 pb_log.set_message(line);
228 }
229 LogEvent::ChildStart { id, label } => {
230 let child_color = COLOURS[child_color_idx % COLOURS.len()];
231 child_color_idx += 1;
232 let child_pb =
233 multi_log.insert_after(&last_child_pb, ProgressBar::new_spinner());
234 child_pb.set_style(spinner_style(true));
235 child_pb.set_prefix(format!("{child_color}[{label}]{RESET}"));
236 child_pb.set_message("starting…");
237 child_pb.enable_steady_tick(Duration::from_millis(100));
238 last_child_pb = child_pb.clone();
239 child_pbs.insert(id, child_pb);
240 }
241 LogEvent::ChildLine { id, line } => {
242 if let Some(child_pb) = child_pbs.get(&id) {
243 child_pb.set_message(line);
244 }
245 }
246 LogEvent::ChildFinish {
247 id,
248 success,
249 summary,
250 } => {
251 if let Some(child_pb) = child_pbs.remove(&id) {
252 if success {
253 child_pb.finish_with_message(format!(
254 "{GREEN}\u{2713}{RESET} {summary}"
255 ));
256 } else {
257 child_pb
258 .finish_with_message(format!("{RED}\u{2717}{RESET} {summary}"));
259 }
260 }
261 }
262 }
263 }
264 });
265
266 let dep_receivers: Vec<(String, watch::Receiver<Option<bool>>)> = entry
267 .depends_on
268 .iter()
269 .filter_map(|dep_id| {
270 completion_rxs
271 .get(dep_id)
272 .map(|rx| (dep_id.clone(), rx.clone()))
273 })
274 .collect();
275
276 let my_tx: Option<watch::Sender<Option<bool>>> =
277 entry.id.as_ref().and_then(|id| completion_txs.remove(id));
278
279 let entry = entry.clone();
280 let version = version.to_owned();
281 let pb_task = pb.clone();
282 let summary_task = summary.clone();
283
284 join_set.spawn(async move {
285 run_builder_task(
286 entry,
287 version,
288 dep_receivers,
289 my_tx,
290 log_tx,
291 move |event| match event {
292 BuildEvent::DepWait(dep_id) => {
293 pb_task.set_message(format!("waiting for '{dep_id}'…"));
294 }
295 BuildEvent::Skip(dep_id) => {
296 summary_task.inc(1);
297 pb_task.finish_with_message(format!(
298 "{YELLOW}\u{29B8} skipped{RESET} (dependency '{dep_id}' failed)"
299 ));
300 }
301 BuildEvent::Run => {
302 pb_task.set_message("running…");
303 }
304 BuildEvent::Success(count) => {
305 summary_task.inc(1);
306 pb_task.finish_with_message(format!(
307 "{GREEN}\u{2713} done{RESET} ({count} artifact(s))"
308 ));
309 }
310 BuildEvent::Failure(error) => {
311 summary_task.inc(1);
312 pb_task
313 .finish_with_message(format!("{RED}\u{2717} failed:{RESET} {error}"));
314 }
315 },
316 )
317 .await
318 });
319 }
320
321 let summary_bar = summary.clone();
322 collect_results(join_set, move |ok| {
323 let msg = if ok {
324 format!("{GREEN}\u{2713} all done{RESET}")
325 } else {
326 format!("{RED}\u{2717} some builders failed{RESET}")
327 };
328 summary_bar.finish_with_message(msg);
329 })
330 .await
331}
332
333async fn run_with_logs(
336 builders: &[BuilderEntry],
337 version: &str,
338 mut completion_txs: HashMap<String, watch::Sender<Option<bool>>>,
339 completion_rxs: HashMap<String, watch::Receiver<Option<bool>>>,
340 total: usize,
341) -> Result<(Vec<ArtifactPath>, Vec<ArtifactPath>)> {
342 info!("Running {total} builder(s) …");
343
344 let mut join_set: JoinSet<Result<Vec<ArtifactPath>>> = JoinSet::new();
345
346 for entry in builders.iter() {
347 let label = entry.id.as_deref().unwrap_or(entry.label()).to_owned();
348
349 let (log_tx, mut log_rx) = mpsc::unbounded_channel::<LogEvent>();
350
351 let log_label = label.clone();
352 tokio::spawn(async move {
353 while let Some(event) = log_rx.recv().await {
354 match event {
355 LogEvent::Line(line) => info!("[{log_label}] {line}"),
356 LogEvent::ChildStart { label: child, .. } => {
357 info!("[{log_label}] [{child}] starting …");
358 }
359 LogEvent::ChildLine { id, line } => {
360 info!("[{log_label}] [{id}] {line}");
361 }
362 LogEvent::ChildFinish {
363 id,
364 success,
365 summary,
366 } => {
367 if success {
368 info!("[{log_label}] [{id}] {summary}");
369 } else {
370 warn!("[{log_label}] [{id}] failed: {summary}");
371 }
372 }
373 }
374 }
375 });
376
377 let dep_receivers: Vec<(String, watch::Receiver<Option<bool>>)> = entry
378 .depends_on
379 .iter()
380 .filter_map(|dep_id| {
381 completion_rxs
382 .get(dep_id)
383 .map(|rx| (dep_id.clone(), rx.clone()))
384 })
385 .collect();
386
387 let my_tx: Option<watch::Sender<Option<bool>>> =
388 entry.id.as_ref().and_then(|id| completion_txs.remove(id));
389
390 let entry = entry.clone();
391 let version = version.to_owned();
392 let task_label = label.clone();
393
394 join_set.spawn(async move {
395 run_builder_task(
396 entry,
397 version,
398 dep_receivers,
399 my_tx,
400 log_tx,
401 move |event| match event {
402 BuildEvent::DepWait(dep_id) => {
403 info!("[{task_label}] waiting for '{dep_id}' …");
404 }
405 BuildEvent::Skip(dep_id) => {
406 info!("[{task_label}] skipped (dependency '{dep_id}' failed)");
407 }
408 BuildEvent::Run => {
409 info!("[{task_label}] running …");
410 }
411 BuildEvent::Success(count) => {
412 info!("[{task_label}] done ({count} artifact(s))");
413 }
414 BuildEvent::Failure(error) => {
415 warn!("[{task_label}] failed: {error}");
416 }
417 },
418 )
419 .await
420 });
421 }
422
423 collect_results(join_set, |ok| {
424 if ok {
425 info!("All builders completed successfully.");
426 } else {
427 warn!("Some builders failed.");
428 }
429 })
430 .await
431}