1use std::{
2 collections::{VecDeque, hash_map::Entry},
3 hash::BuildHasherDefault,
4 mem::take,
5};
6
7use anyhow::{Context, Result, bail};
8use bincode::{Decode, Encode};
9use either::Either;
10use petgraph::graph::{DiGraph, EdgeIndex, NodeIndex};
11use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
12use serde::{Deserialize, Serialize};
13use tracing::Instrument;
14use turbo_prehash::BuildHasherExt;
15use turbo_tasks::{
16 FxIndexMap, FxIndexSet, NonLocalValue, ResolvedVc, TaskInput, TryJoinIterExt, ValueToString,
17 Vc, trace::TraceRawVcs,
18};
19
20use crate::{
21 chunk::{ChunkableModule, ChunkingType},
22 module::Module,
23 module_graph::{
24 GraphTraversalAction, ModuleGraph, ModuleGraphRef,
25 chunk_group_info::{ChunkGroupInfo, ChunkGroupKey, RoaringBitmapWrapper},
26 module_batch::{ModuleBatch, ModuleBatchGroup, ModuleOrBatch},
27 traced_di_graph::{TracedDiGraph, iter_neighbors_rev},
28 },
29};
30#[turbo_tasks::value]
31#[derive(Debug, Clone, Default, TaskInput, Hash)]
32pub struct BatchingConfig {
33 pub use_heuristic: bool,
36}
37
38#[turbo_tasks::value_impl]
39impl BatchingConfig {
40 #[turbo_tasks::function]
41 pub fn new(config: BatchingConfig) -> Vc<Self> {
42 config.cell()
43 }
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize, TraceRawVcs, NonLocalValue)]
47pub struct ModuleBatchesGraphEdge {
48 pub ty: ChunkingType,
49 pub module: Option<ResolvedVc<Box<dyn Module>>>,
50}
51
52#[derive(Debug, Clone, TraceRawVcs, NonLocalValue, Encode, Decode)]
53struct EntriesList(
54 #[bincode(with = "turbo_bincode::indexset")] pub FxIndexSet<ResolvedVc<Box<dyn Module>>>,
55);
56
57#[turbo_tasks::value(cell = "new", eq = "manual")]
58pub struct ModuleBatchesGraph {
59 graph: TracedDiGraph<ModuleOrBatch, ModuleBatchesGraphEdge>,
60
61 #[turbo_tasks(trace_ignore)]
68 #[bincode(with_serde)]
69 entries: FxHashMap<ResolvedVc<Box<dyn Module>>, NodeIndex>,
70 batch_groups: FxHashMap<ModuleOrBatch, ResolvedVc<ModuleBatchGroup>>,
71
72 ordered_entries: Vec<Option<EntriesList>>,
77}
78
79impl ModuleBatchesGraph {
80 pub async fn get_entry_index(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<NodeIndex> {
81 let Some(entry) = self.entries.get(&entry) else {
82 bail!(
83 "Entry {} is not in graph (possible entries: {:#?})",
84 entry.ident().to_string().await?,
85 self.entries
86 .keys()
87 .map(|e| e.ident().to_string())
88 .try_join()
89 .await?
90 );
91 };
92 Ok(*entry)
93 }
94
95 pub fn get_ordered_entries<'l>(
96 &'l self,
97 chunk_group_info: &'l ChunkGroupInfo,
98 idx: usize,
99 ) -> impl Iterator<Item = ResolvedVc<Box<dyn Module>>> + 'l {
100 if let Some(EntriesList(ordered_entries)) = self
101 .ordered_entries
102 .get(idx)
103 .as_ref()
104 .and_then(|o| o.as_ref())
105 {
106 if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
107 debug_assert_eq!(ordered_entries.len(), chunk_group.entries_count());
108 }
109 Either::Left(Either::Left(ordered_entries.iter().copied()))
110 } else if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
111 Either::Right(chunk_group.entries())
112 } else {
113 Either::Left(Either::Right(std::iter::empty()))
114 }
115 }
116
117 pub fn get_batch_group(
118 &self,
119 module_or_batch: &ModuleOrBatch,
120 ) -> Option<ResolvedVc<ModuleBatchGroup>> {
121 self.batch_groups.get(module_or_batch).copied()
122 }
123
124 pub async fn get_entry(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<ModuleOrBatch> {
125 let entry = self.get_entry_index(entry).await?;
126 Ok(*self.graph.node_weight(entry).unwrap())
127 }
128
129 #[allow(clippy::implied_bounds_in_impls)]
131 pub fn traverse_edges_from_entries_dfs<'a, S>(
149 &'a self,
150 entries: impl IntoIterator<
151 Item = NodeIndex,
152 IntoIter = impl Iterator<Item = NodeIndex> + DoubleEndedIterator,
153 >,
154 state: &mut S,
155 mut visit_preorder: impl FnMut(
156 Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
157 &'a ModuleOrBatch,
158 &mut S,
159 ) -> Result<GraphTraversalAction>,
160 mut visit_postorder: impl FnMut(
161 Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
162 &'a ModuleOrBatch,
163 &mut S,
164 ),
165 ) -> Result<()> {
166 let graph = &self.graph;
167
168 enum ReverseDFSPass {
169 Visit,
170 ExpandAndVisit,
171 }
172
173 let entries = entries.into_iter();
174 #[allow(clippy::type_complexity)] let mut stack: Vec<(ReverseDFSPass, Option<(NodeIndex, EdgeIndex)>, NodeIndex)> = entries
176 .rev()
177 .map(|e| (ReverseDFSPass::ExpandAndVisit, None, e))
178 .collect();
179 let mut expanded = FxHashSet::default();
180 while let Some((pass, parent, current)) = stack.pop() {
181 let parent_arg = parent.map(|(node, edge)| {
182 (
183 graph.node_weight(node).unwrap(),
184 graph.edge_weight(edge).unwrap(),
185 )
186 });
187 match pass {
188 ReverseDFSPass::Visit => {
189 let current_node = graph.node_weight(current).unwrap();
190 visit_postorder(parent_arg, current_node, state);
191 }
192 ReverseDFSPass::ExpandAndVisit => {
193 let current_node = graph.node_weight(current).unwrap();
194 let action = visit_preorder(parent_arg, current_node, state)?;
195 if action == GraphTraversalAction::Exclude {
196 continue;
197 }
198 stack.push((ReverseDFSPass::Visit, parent, current));
199 if action == GraphTraversalAction::Continue && expanded.insert(current) {
200 stack.extend(iter_neighbors_rev(graph, current).map(|(edge, child)| {
201 (ReverseDFSPass::ExpandAndVisit, Some((current, edge)), child)
202 }));
203 }
204 }
205 }
206 }
207
208 Ok(())
209 }
210}
211
212type PreBatchIndex = usize;
213
214#[derive(Hash, PartialEq, Eq, Clone, Debug)]
215enum PreBatchItem {
216 ParallelModule(ResolvedVc<Box<dyn Module>>),
217 ParallelReference(PreBatchIndex),
218 NonParallelEdge(ChunkingType, ResolvedVc<Box<dyn Module>>),
219}
220
221struct PreBatch {
222 items: FxIndexSet<PreBatchItem>,
223 chunk_groups: RoaringBitmapWrapper,
224}
225
226impl PreBatch {
227 fn new(chunk_groups: RoaringBitmapWrapper) -> Self {
228 Self {
229 items: FxIndexSet::default(),
230 chunk_groups,
231 }
232 }
233}
234
235struct TraversalState<'l> {
236 items: Vec<PreBatchItem>,
237 this: &'l mut PreBatches,
238}
239
240struct PreBatches {
241 boundary_modules: FxHashSet<ResolvedVc<Box<dyn Module>>>,
242 batches: Vec<PreBatch>,
243 entries: FxHashMap<ResolvedVc<Box<dyn Module>>, PreBatchIndex>,
244 single_module_entries: FxIndexSet<ResolvedVc<Box<dyn Module>>>,
245}
246
247impl PreBatches {
248 fn new() -> Self {
249 Self {
250 boundary_modules: FxHashSet::default(),
251 batches: Vec::new(),
252 entries: FxHashMap::default(),
253 single_module_entries: FxIndexSet::default(),
254 }
255 }
256
257 fn ensure_pre_batch_for_module(
258 &mut self,
259 module: ResolvedVc<Box<dyn Module>>,
260 chunk_group_info: &ChunkGroupInfo,
261 queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
262 ) -> Result<PreBatchIndex> {
263 Ok(match self.entries.entry(module) {
264 Entry::Vacant(e) => {
265 let index = self.batches.len();
266 queue.push_back((module, index));
267 let chunk_groups = chunk_group_info
268 .module_chunk_groups
269 .get(&module)
270 .context("all modules need to have chunk group info")?;
271 let batch = PreBatch::new(chunk_groups.clone());
272 self.batches.push(batch);
273 e.insert(index);
274 index
275 }
276 Entry::Occupied(e) => *e.get(),
277 })
278 }
279
280 async fn get_pre_batch_items(
281 &mut self,
282 entry: ResolvedVc<Box<dyn Module>>,
283 chunk_group_info: &ChunkGroupInfo,
284 module_graph: &ModuleGraphRef,
285 queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
286 ) -> Result<Vec<PreBatchItem>> {
287 let mut state = TraversalState {
288 items: Vec::new(),
289 this: self,
290 };
291 let mut visited = FxHashSet::default();
292 module_graph.traverse_edges_dfs(
293 std::iter::once(entry),
294 &mut state,
295 |parent_info, node, state| {
296 let ty = parent_info.map_or(
297 &ChunkingType::Parallel {
298 inherit_async: false,
299 hoisted: false,
300 },
301 |(_, ty)| &ty.chunking_type,
302 );
303 let module = node;
304 if !ty.is_parallel() {
305 state.items.push(PreBatchItem::NonParallelEdge(
306 ty.without_inherit_async(),
307 module,
308 ));
309 return Ok(GraphTraversalAction::Exclude);
310 }
311 if visited.insert(module) {
312 if parent_info.is_some() && state.this.boundary_modules.contains(&module) {
313 let idx = state.this.ensure_pre_batch_for_module(
314 module,
315 chunk_group_info,
316 queue,
317 )?;
318 state.items.push(PreBatchItem::ParallelReference(idx));
319 return Ok(GraphTraversalAction::Exclude);
320 }
321 Ok(GraphTraversalAction::Continue)
322 } else {
323 Ok(GraphTraversalAction::Exclude)
324 }
325 },
326 |_, node, state| {
327 let item = PreBatchItem::ParallelModule(node);
328 state.items.push(item);
329 Ok(())
330 },
331 )?;
332 Ok(state.items)
333 }
334}
335
336pub async fn compute_module_batches(
337 module_graph: Vc<ModuleGraph>,
338 _config: &BatchingConfig,
339) -> Result<Vc<ModuleBatchesGraph>> {
340 let outer_span = tracing::info_span!(
341 "compute module batches",
342 initial_pre_batch_items = tracing::field::Empty,
343 initial_pre_batches = tracing::field::Empty,
344 extracted_shared_items = tracing::field::Empty,
345 batches = tracing::field::Empty,
346 modules = tracing::field::Empty,
347 edges = tracing::field::Empty
348 );
349 let span = outer_span.clone();
350 async move {
351 let chunk_group_info = module_graph.chunk_group_info().await?;
352 let module_graph = module_graph.read_graphs().await?;
353
354 let mut pre_batches = PreBatches::new();
355
356 module_graph.traverse_edges_unordered(|parent, node| {
359 if let Some((parent, ty)) = parent {
360 let std::collections::hash_set::Entry::Vacant(entry) =
361 pre_batches.boundary_modules.entry(node)
362 else {
363 return Ok(());
365 };
366 if ty.chunking_type.is_parallel() {
367 let parent_chunk_groups = chunk_group_info
368 .module_chunk_groups
369 .get(&parent)
370 .context("all modules need to have chunk group info")?;
371 let chunk_groups = chunk_group_info
372 .module_chunk_groups
373 .get(&node)
374 .context("all modules need to have chunk group info")?;
375 if parent_chunk_groups != chunk_groups {
376 entry.insert();
378 }
379 } else {
380 entry.insert();
381 }
382 }
383 Ok(())
384 })?;
385
386 for chunk_group in &chunk_group_info.chunk_groups {
388 for entry in chunk_group.entries() {
389 pre_batches.boundary_modules.insert(entry);
390 }
391 }
392
393 module_graph.traverse_cycles(
396 |ref_data| ref_data.chunking_type.is_parallel(),
397 |cycle| {
398 if cycle
399 .iter()
400 .any(|node| pre_batches.boundary_modules.contains(node))
401 {
402 pre_batches
403 .boundary_modules
404 .extend(cycle.iter().map(|node| **node));
405 }
406 Ok(())
407 },
408 )?;
409
410 let mut queue: VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)> = VecDeque::new();
411
412 let mut chunk_group_indices_with_merged_children = FxHashSet::default();
413
414 for chunk_group in &chunk_group_info.chunk_groups {
416 for entry in chunk_group.entries() {
417 pre_batches.ensure_pre_batch_for_module(entry, &chunk_group_info, &mut queue)?;
418 }
419 if let Some(parent) = chunk_group.get_merged_parent() {
420 chunk_group_indices_with_merged_children.insert(parent);
421 }
422 }
423
424 let mut initial_pre_batch_items = 0;
425 while let Some((chunkable_module, idx)) = queue.pop_front() {
427 let items = pre_batches
428 .get_pre_batch_items(
429 chunkable_module,
430 &chunk_group_info,
431 &module_graph,
432 &mut queue,
433 )
434 .await?;
435 initial_pre_batch_items += items.len();
436 let batch = &mut pre_batches.batches[idx];
437 batch.items.extend(items);
438 }
439 span.record("initial_pre_batch_items", initial_pre_batch_items);
440 span.record("initial_pre_batches", pre_batches.batches.len());
441
442 let mut ordered_entries: Vec<Option<EntriesList>> =
444 vec![None; chunk_group_info.chunk_groups.len()];
445 for (i, chunk_group) in chunk_group_info.chunk_groups.iter().enumerate() {
446 if !chunk_group_indices_with_merged_children.contains(&i) {
447 continue;
448 }
449 let mut merged_modules: FxHashMap<ChunkingType, FxIndexSet<_>> = FxHashMap::default();
450 let mut stack = ordered_entries[i]
451 .as_ref()
452 .map_or_else(
453 || Either::Left(chunk_group.entries()),
454 |v| Either::Right(v.0.iter().copied()),
455 )
456 .map(|module| {
457 let idx = *pre_batches
458 .entries
459 .get(&module)
460 .context("could not prebatch for module")?;
461 Ok((idx, 0))
462 })
463 .collect::<Result<Vec<_>>>()?;
464 stack.reverse();
465 let mut visited = FxHashSet::default();
466 while let Some((idx, mut pos)) = stack.pop() {
467 let batch = &pre_batches.batches[idx];
468 while let Some(item) = batch.items.get_index(pos) {
469 match item {
470 PreBatchItem::ParallelModule(_) => {}
471 PreBatchItem::ParallelReference(other_idx) => {
472 if visited.insert(*other_idx) {
473 stack.push((idx, pos + 1));
474 stack.push((*other_idx, 0));
475 break;
476 }
477 }
478 PreBatchItem::NonParallelEdge(chunking_type, module) => {
479 if chunking_type.is_merged() {
480 merged_modules
481 .entry(chunking_type.clone())
482 .or_default()
483 .insert(*module);
484 }
485 }
486 }
487 pos += 1;
488 }
489 }
490 if !merged_modules.is_empty() {
491 for (ty, merged_modules) in merged_modules {
492 let chunk_group_key = match ty {
493 ChunkingType::Isolated {
494 merge_tag: Some(merge_tag),
495 ..
496 } => ChunkGroupKey::IsolatedMerged {
497 parent: i.into(),
498 merge_tag: merge_tag.clone(),
499 },
500 ChunkingType::Shared {
501 merge_tag: Some(merge_tag),
502 ..
503 } => ChunkGroupKey::SharedMerged {
504 parent: i.into(),
505 merge_tag: merge_tag.clone(),
506 },
507 _ => unreachable!(),
508 };
509 let idx = chunk_group_info
510 .chunk_group_keys
511 .get_index_of(&chunk_group_key)
512 .context("could not find chunk group key for merged chunk group")?;
513 ordered_entries[idx] = Some(EntriesList(merged_modules));
514 }
515 }
516 }
517
518 let mut parallel_module_to_pre_batch: FxIndexMap<_, Vec<PreBatchIndex>> =
520 FxIndexMap::default();
521
522 for (idx, pre_batch) in pre_batches.batches.iter().enumerate() {
524 for item in &pre_batch.items {
525 match item {
526 PreBatchItem::ParallelModule(module) => {
527 parallel_module_to_pre_batch
528 .entry(*module)
529 .or_default()
530 .push(idx);
531 }
532 PreBatchItem::NonParallelEdge(_, module) => {
533 if !pre_batches.entries.contains_key(module) {
534 pre_batches.single_module_entries.insert(*module);
535 }
536 }
537 PreBatchItem::ParallelReference(_) => {}
538 }
539 }
540 }
541
542 let mut extracted_shared_items = 0;
545 for i in 0..parallel_module_to_pre_batch.len() {
547 let (&module, batches) = parallel_module_to_pre_batch
548 .get_index(i)
549 .context("could not find parallel module to pre batch index")?;
550 if batches.len() > 1 {
551 let batches_with_item_index = batches
553 .iter()
554 .map(|&idx| {
555 let batch_items = &pre_batches.batches[idx].items;
556 let item_idx = batch_items
557 .get_index_of(&PreBatchItem::ParallelModule(module))
558 .context("could not find batch item index for parallel module")?;
559 Ok((idx, item_idx))
560 })
561 .collect::<Result<Vec<_>>>()?;
562 let mut selected_items = 1;
563 fn get_item_at(
564 pre_batches: &PreBatches,
565 batch_idx: PreBatchIndex,
566 item_idx: usize,
567 ) -> Option<&PreBatchItem> {
568 pre_batches.batches[batch_idx].items.get_index(item_idx)
569 }
570 loop {
573 if let Some(PreBatchItem::ParallelModule(next_module)) = get_item_at(
574 &pre_batches,
575 batches_with_item_index[0].0,
576 batches_with_item_index[0].1 + selected_items,
577 ) && parallel_module_to_pre_batch
578 .get(next_module)
579 .context("could not find pre batch for parallel module")?
580 .len()
581 == batches.len()
582 && batches_with_item_index[1..]
583 .iter()
584 .all(|&(batch_idx, item_idx)| {
585 get_item_at(&pre_batches, batch_idx, item_idx + selected_items)
586 == Some(&PreBatchItem::ParallelModule(*next_module))
587 })
588 {
589 selected_items += 1;
590 continue;
591 }
592 break;
593 }
594 extracted_shared_items += selected_items;
595
596 let exact_match = batches_with_item_index
599 .iter()
600 .find(|&&(batch_idx, item_idx)| {
601 item_idx == 0
602 && pre_batches.batches[batch_idx].items.len() == selected_items
603 });
604 if let Some(&(exact_match, _)) = exact_match {
605 for &(batch_index, item_start) in batches_with_item_index.iter() {
607 if batch_index != exact_match {
608 pre_batches.batches[batch_index].items.splice(
609 item_start..item_start + selected_items,
610 std::iter::once(PreBatchItem::ParallelReference(exact_match)),
611 );
612 }
613 }
614 for item in pre_batches.batches[exact_match].items.iter() {
615 if let PreBatchItem::ParallelModule(module) = item {
616 parallel_module_to_pre_batch
617 .get_mut(module)
618 .context("could not find pre batch for parallel module")?
619 .clear();
620 }
621 }
622 } else {
623 let first_batch_index = batches_with_item_index[0].0;
626 let first_batch_item_index = batches_with_item_index[0].1;
627 let new_batch_index = pre_batches.batches.len();
628 let mut new_batch =
629 PreBatch::new(pre_batches.batches[first_batch_index].chunk_groups.clone());
630 new_batch
631 .items
632 .extend(pre_batches.batches[first_batch_index].items.splice(
633 first_batch_item_index..first_batch_item_index + selected_items,
634 std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
635 ));
636 for item in new_batch.items.iter() {
637 if let PreBatchItem::ParallelModule(module) = item {
638 parallel_module_to_pre_batch
639 .get_mut(module)
640 .context("could not find pre batch for parallel module")?
641 .clear();
642 }
643 }
644 pre_batches.batches.push(new_batch);
645 for &(batch_index, item_start) in batches_with_item_index[1..].iter() {
646 pre_batches.batches[batch_index].items.splice(
647 item_start..item_start + selected_items,
648 std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
649 );
650 }
651 }
652 }
653 }
654 span.record("extracted_shared_items", extracted_shared_items);
655
656 let mut edges_count = 0;
659
660 for i in 0..pre_batches.batches.len() {
663 let items = take(&mut pre_batches.batches[i].items);
664 let mut new_items =
665 FxIndexSet::with_capacity_and_hasher(items.len(), Default::default());
666 enum Mode {
667 ParallelChunkableModule,
668 Other,
669 }
670 let mut mode = Mode::Other;
671 for item in items {
672 let chunkable_module = if let PreBatchItem::ParallelModule(module) = &item {
673 ResolvedVc::try_downcast::<Box<dyn ChunkableModule>>(*module)
674 } else {
675 None
676 };
677 let item = if let PreBatchItem::ParallelModule(module) = item {
678 if chunkable_module.is_some() {
679 PreBatchItem::ParallelModule(module)
680 } else {
681 pre_batches.single_module_entries.insert(module);
682 PreBatchItem::NonParallelEdge(
683 ChunkingType::Parallel {
684 inherit_async: false,
685 hoisted: false,
686 },
687 module,
688 )
689 }
690 } else {
691 item
692 };
693 match (&mode, chunkable_module) {
694 (_, Some(_)) => {
695 mode = Mode::ParallelChunkableModule;
696 new_items.insert(item);
697 }
698 (Mode::Other, _) => {
699 edges_count += 1;
700 new_items.insert(item);
701 }
702 (Mode::ParallelChunkableModule, _) => {
703 let idx = pre_batches.batches.len();
705 let mut new_batch =
706 PreBatch::new(pre_batches.batches[i].chunk_groups.clone());
707 new_batch.items.extend(new_items.drain(..));
708 pre_batches.batches.push(new_batch);
709 edges_count += 1;
710 new_items.insert(PreBatchItem::ParallelReference(idx));
711 if chunkable_module.is_some() {
712 new_items.insert(item);
713 } else {
714 edges_count += 1;
715 mode = Mode::Other;
716 new_items.insert(item);
717 }
718 }
719 }
720 }
721 pre_batches.batches[i].items = new_items;
722 }
723 span.record("pre_batches", pre_batches.batches.len());
724
725 let mut graph: DiGraph<ModuleOrBatch, ModuleBatchesGraphEdge, u32> =
729 petgraph::graph::DiGraph::with_capacity(
730 pre_batches.batches.len() + pre_batches.single_module_entries.len(),
731 edges_count,
732 );
733
734 let batches = pre_batches
736 .batches
737 .iter_mut()
738 .enumerate()
739 .map(async |(i, pre_batch)| {
740 let mut modules = pre_batch.items.iter().filter_map(|item| {
741 if let PreBatchItem::ParallelModule(module) = item {
742 ResolvedVc::try_downcast(*module)
743 } else {
744 None
745 }
746 });
747 let Some(first) = modules.next() else {
748 return Ok(ModuleOrBatch::None(i));
749 };
750 if let Some(second) = modules.next() {
751 let batch = ModuleBatch::new(
752 [first, second]
753 .into_iter()
754 .chain(modules)
755 .map(|m| *m)
756 .collect::<Vec<_>>(),
757 Some(pre_batch.chunk_groups.clone()),
758 );
759 Ok(ModuleOrBatch::Batch(batch.to_resolved().await?))
760 } else {
761 Ok(ModuleOrBatch::Module(ResolvedVc::upcast(first)))
762 }
763 })
764 .try_join()
765 .await?;
766
767 let mut batch_groups: FxHashMap<_, Vec<_>> = FxHashMap::default();
769 for (i, pre_batch) in pre_batches.batches.iter().enumerate() {
770 let key = BuildHasherDefault::<FxHasher>::default().prehash(&pre_batch.chunk_groups);
771 let batch = batches[i];
772 batch_groups.entry(key).or_default().push(batch);
773 }
774 for &module in &pre_batches.single_module_entries {
775 let chunk_groups = chunk_group_info
776 .module_chunk_groups
777 .get(&module)
778 .context("all modules need to have chunk group info")?;
779 let key = BuildHasherDefault::<FxHasher>::default().prehash(chunk_groups);
780 batch_groups
781 .entry(key)
782 .or_default()
783 .push(ModuleOrBatch::Module(module));
784 }
785
786 let batch_groups = batch_groups
788 .into_iter()
789 .map(async |(key, items)| {
790 if items.len() == 1 {
791 Ok(Either::Left(std::iter::empty()))
792 } else {
793 let batch_group = ModuleBatchGroup::new(items.clone(), (*key).clone())
794 .to_resolved()
795 .await?;
796 Ok(Either::Right(
797 items.into_iter().map(move |item| (item, batch_group)),
798 ))
799 }
800 })
801 .try_join()
802 .await?
803 .into_iter()
804 .flatten()
805 .collect::<FxHashMap<_, _>>();
806
807 let mut batches_count = 0;
809 let mut modules_count = 0;
810 let batch_indices = batches
811 .into_iter()
812 .map(|batch| {
813 match &batch {
814 ModuleOrBatch::Batch(_) => batches_count += 1,
815 ModuleOrBatch::Module(_) => modules_count += 1,
816 ModuleOrBatch::None(_) => {}
817 }
818 graph.add_node(batch)
819 })
820 .collect::<Vec<_>>();
821
822 let single_module_indices = pre_batches
824 .single_module_entries
825 .iter()
826 .map(|module| graph.add_node(ModuleOrBatch::Module(*module)))
827 .collect::<Vec<_>>();
828
829 span.record("batches", batches_count);
830 modules_count += pre_batches.single_module_entries.len();
831 span.record("modules", modules_count);
832 span.record("edges", edges_count);
833
834 for (i, pre_batch) in pre_batches.batches.into_iter().enumerate() {
836 let index = batch_indices[i];
837 let items = pre_batch.items;
838 for item in items {
839 match item {
840 PreBatchItem::ParallelReference(idx) => {
841 graph.add_edge(
842 index,
843 batch_indices[idx],
844 ModuleBatchesGraphEdge {
845 ty: ChunkingType::Parallel {
846 inherit_async: false,
847 hoisted: false,
848 },
849 module: None,
850 },
851 );
852 }
853 PreBatchItem::NonParallelEdge(ty, module) => {
854 if let Some(batch) = pre_batches.entries.get(&module).copied() {
855 graph.add_edge(
856 index,
857 batch_indices[batch],
858 ModuleBatchesGraphEdge {
859 ty,
860 module: Some(module),
861 },
862 );
863 continue;
864 }
865 let idx = pre_batches
866 .single_module_entries
867 .get_index_of(&module)
868 .context("could not find single module entry index")?;
869 let idx = single_module_indices[idx];
870 graph.add_edge(
871 index,
872 idx,
873 ModuleBatchesGraphEdge {
874 ty,
875 module: Some(module),
876 },
877 );
878 }
879 PreBatchItem::ParallelModule(_) => {}
880 }
881 }
882 }
883
884 debug_assert_eq!(graph.capacity().0, graph.node_count());
885 debug_assert_eq!(graph.capacity().1, graph.edge_count());
886
887 let mut entries = FxHashMap::default();
889 for chunk_group in &chunk_group_info.chunk_groups {
890 for module in chunk_group.entries() {
891 if let Some(batch) = pre_batches.entries.get(&module).copied() {
892 entries.insert(module, batch_indices[batch]);
893 continue;
894 }
895 let idx = pre_batches
896 .single_module_entries
897 .get_index_of(&module)
898 .context("could not find single module entry index")?;
899 let idx = single_module_indices[idx];
900 entries.insert(module, idx);
901 }
902 }
903
904 Ok(ModuleBatchesGraph {
905 graph: TracedDiGraph(graph),
906 entries,
907 batch_groups,
908 ordered_entries,
909 }
910 .cell())
911 }
912 .instrument(outer_span)
913 .await
914}