1use std::{
2 collections::{VecDeque, hash_map::Entry},
3 hash::BuildHasherDefault,
4 mem::take,
5};
6
7use anyhow::{Context, Result, bail};
8use either::Either;
9use petgraph::graph::{DiGraph, EdgeIndex, NodeIndex};
10use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
11use serde::{Deserialize, Serialize};
12use tracing::Instrument;
13use turbo_prehash::BuildHasherExt;
14use turbo_tasks::{
15 FxIndexMap, FxIndexSet, NonLocalValue, ResolvedVc, TaskInput, TryJoinIterExt, ValueToString,
16 Vc, trace::TraceRawVcs,
17};
18
19use crate::{
20 chunk::{ChunkableModule, ChunkingType},
21 module::Module,
22 module_graph::{
23 GraphTraversalAction, ModuleGraph,
24 chunk_group_info::{ChunkGroupInfo, ChunkGroupKey, RoaringBitmapWrapper},
25 module_batch::{ModuleBatch, ModuleBatchGroup, ModuleOrBatch},
26 traced_di_graph::{TracedDiGraph, iter_neighbors_rev},
27 },
28};
/// Options for computing module batches.
///
/// NOTE(review): within this file the config is only accepted by `compute_module_batches` (as
/// `_config`) and is not read there.
#[turbo_tasks::value]
#[derive(Debug, Clone, Default, TaskInput, Hash)]
pub struct BatchingConfig {
    /// Presumably enables a heuristic for forming batches; nothing in this file reads the flag, so
    /// the exact semantics need to be confirmed against its consumer (TODO confirm).
    pub use_heuristic: bool,
}
36
#[turbo_tasks::value_impl]
impl BatchingConfig {
    /// Stores the given `config` in a cell and returns the resulting [`Vc`].
    #[turbo_tasks::function]
    pub fn new(config: BatchingConfig) -> Vc<Self> {
        config.cell()
    }
}
44
/// Edge weight of the [`ModuleBatchesGraph`].
#[derive(Debug, Clone, Serialize, Deserialize, TraceRawVcs, NonLocalValue)]
pub struct ModuleBatchesGraphEdge {
    /// The chunking type of the reference this edge represents.
    pub ty: ChunkingType,
    /// The module the edge points at. `compute_module_batches` sets this to `None` for edges that
    /// reference another batch as a whole and to `Some(module)` for edges created from a reference
    /// to one specific module.
    pub module: Option<ResolvedVc<Box<dyn Module>>>,
}
50
/// Index-addressable set of entry modules (used for the per chunk group entry order).
type EntriesList = FxIndexSet<ResolvedVc<Box<dyn Module>>>;
52
/// Graph whose nodes are single modules or batches of modules ([`ModuleOrBatch`]) and whose edges
/// are the references between them. Built by `compute_module_batches`.
#[turbo_tasks::value(cell = "new", eq = "manual", into = "new")]
pub struct ModuleBatchesGraph {
    /// The graph itself.
    graph: TracedDiGraph<ModuleOrBatch, ModuleBatchesGraphEdge>,

    // Maps every chunk group entry module to the index of the graph node that contains it. The
    // map is only used for lookups (and for listing the keys in an error message).
    //
    // NOTE(review): excluded from tracing; presumably the contained Vcs are already reachable
    // through `graph` — confirm.
    #[turbo_tasks(trace_ignore)]
    entries: FxHashMap<ResolvedVc<Box<dyn Module>>, NodeIndex>,
    /// Maps a node to the batch group it belongs to. Nodes whose group would contain only that
    /// single node have no entry here.
    batch_groups: FxHashMap<ModuleOrBatch, ResolvedVc<ModuleBatchGroup>>,

    /// Indexed by chunk group index (same indexing as `chunk_group_info.chunk_groups`). `Some`
    /// holds an explicitly computed order of the entries of that chunk group; for `None` (or a
    /// missing index) `get_ordered_entries` falls back to the entries of the chunk group itself.
    ordered_entries: Vec<Option<EntriesList>>,
}
73
74impl ModuleBatchesGraph {
75 pub async fn get_entry_index(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<NodeIndex> {
76 let Some(entry) = self.entries.get(&entry) else {
77 bail!(
78 "Entry {} is not in graph (possible entries: {:#?})",
79 entry.ident().to_string().await?,
80 self.entries
81 .keys()
82 .map(|e| e.ident().to_string())
83 .try_join()
84 .await?
85 );
86 };
87 Ok(*entry)
88 }
89
90 pub fn get_ordered_entries<'l>(
91 &'l self,
92 chunk_group_info: &'l ChunkGroupInfo,
93 idx: usize,
94 ) -> impl Iterator<Item = ResolvedVc<Box<dyn Module>>> + 'l {
95 if let Some(ordered_entries) = self
96 .ordered_entries
97 .get(idx)
98 .as_ref()
99 .and_then(|o| o.as_ref())
100 {
101 if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
102 debug_assert_eq!(ordered_entries.len(), chunk_group.entries_count());
103 }
104 Either::Left(Either::Left(ordered_entries.iter().copied()))
105 } else if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
106 Either::Right(chunk_group.entries())
107 } else {
108 Either::Left(Either::Right(std::iter::empty()))
109 }
110 }
111
112 pub fn get_batch_group(
113 &self,
114 module_or_batch: &ModuleOrBatch,
115 ) -> Option<ResolvedVc<ModuleBatchGroup>> {
116 self.batch_groups.get(module_or_batch).copied()
117 }
118
119 pub async fn get_entry(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<ModuleOrBatch> {
120 let entry = self.get_entry_index(entry).await?;
121 Ok(*self.graph.node_weight(entry).unwrap())
122 }
123
124 #[allow(clippy::implied_bounds_in_impls)]
126 pub fn traverse_edges_from_entries_topological<'a, S>(
144 &'a self,
145 entries: impl IntoIterator<
146 Item = NodeIndex,
147 IntoIter = impl Iterator<Item = NodeIndex> + DoubleEndedIterator,
148 >,
149 state: &mut S,
150 mut visit_preorder: impl FnMut(
151 Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
152 &'a ModuleOrBatch,
153 &mut S,
154 ) -> Result<GraphTraversalAction>,
155 mut visit_postorder: impl FnMut(
156 Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
157 &'a ModuleOrBatch,
158 &mut S,
159 ),
160 ) -> Result<()> {
161 let graph = &self.graph;
162
163 enum ReverseTopologicalPass {
164 Visit,
165 ExpandAndVisit,
166 }
167
168 let entries = entries.into_iter();
169 #[allow(clippy::type_complexity)] let mut stack: Vec<(
171 ReverseTopologicalPass,
172 Option<(NodeIndex, EdgeIndex)>,
173 NodeIndex,
174 )> = entries
175 .rev()
176 .map(|e| (ReverseTopologicalPass::ExpandAndVisit, None, e))
177 .collect();
178 let mut expanded = FxHashSet::default();
179 while let Some((pass, parent, current)) = stack.pop() {
180 let parent_arg = parent.map(|(node, edge)| {
181 (
182 graph.node_weight(node).unwrap(),
183 graph.edge_weight(edge).unwrap(),
184 )
185 });
186 match pass {
187 ReverseTopologicalPass::Visit => {
188 let current_node = graph.node_weight(current).unwrap();
189 visit_postorder(parent_arg, current_node, state);
190 }
191 ReverseTopologicalPass::ExpandAndVisit => {
192 let current_node = graph.node_weight(current).unwrap();
193 let action = visit_preorder(parent_arg, current_node, state)?;
194 if action == GraphTraversalAction::Exclude {
195 continue;
196 }
197 stack.push((ReverseTopologicalPass::Visit, parent, current));
198 if action == GraphTraversalAction::Continue && expanded.insert(current) {
199 stack.extend(iter_neighbors_rev(graph, current).map(|(edge, child)| {
200 (
201 ReverseTopologicalPass::ExpandAndVisit,
202 Some((current, edge)),
203 child,
204 )
205 }));
206 }
207 }
208 }
209 }
210
211 Ok(())
212 }
213}
214
/// Index of a [`PreBatch`] within `PreBatches::batches`.
type PreBatchIndex = usize;
216
/// A single item of a [`PreBatch`].
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
enum PreBatchItem {
    /// A module that was reached via parallel references only and is part of the pre batch.
    ParallelModule(ResolvedVc<Box<dyn Module>>),
    /// A parallel reference to another pre batch, identified by its index.
    ParallelReference(PreBatchIndex),
    /// A reference to a module that is not part of the pre batch, together with its chunking
    /// type. `compute_module_batches` also uses this (with a parallel chunking type) for parallel
    /// modules that cannot be downcast to a `ChunkableModule`.
    NonParallelEdge(ChunkingType, ResolvedVc<Box<dyn Module>>),
}
223
/// Intermediate representation of a batch while `compute_module_batches` is running.
struct PreBatch {
    /// The ordered, de-duplicated items of the pre batch.
    items: FxIndexSet<PreBatchItem>,
    /// The chunk groups bitmap this pre batch was created for.
    chunk_groups: RoaringBitmapWrapper,
}
228
229impl PreBatch {
230 fn new(chunk_groups: RoaringBitmapWrapper) -> Self {
231 Self {
232 items: FxIndexSet::default(),
233 chunk_groups,
234 }
235 }
236}
237
/// State handed to the visitors while `PreBatches::get_pre_batch_items` walks the module graph.
struct TraversalState<'l> {
    /// Items collected so far, in the order the visitors produced them.
    items: Vec<PreBatchItem>,
    /// The pre batches being built; used to look up boundary modules and to create pre batches
    /// for them.
    this: &'l mut PreBatches,
}
242
/// Working set of `compute_module_batches`.
struct PreBatches {
    /// Modules at which a pre batch ends: when reached via a parallel reference they are not
    /// pulled into the referencing pre batch but get a pre batch of their own.
    boundary_modules: FxHashSet<ResolvedVc<Box<dyn Module>>>,
    /// All pre batches created so far, addressed by [`PreBatchIndex`].
    batches: Vec<PreBatch>,
    /// Maps the start module of a pre batch to the index of that pre batch.
    entries: FxHashMap<ResolvedVc<Box<dyn Module>>, PreBatchIndex>,
    /// Modules that become a graph node of their own instead of being part of a pre batch.
    single_module_entries: FxIndexSet<ResolvedVc<Box<dyn Module>>>,
}
249
250impl PreBatches {
251 fn new() -> Self {
252 Self {
253 boundary_modules: FxHashSet::default(),
254 batches: Vec::new(),
255 entries: FxHashMap::default(),
256 single_module_entries: FxIndexSet::default(),
257 }
258 }
259
260 fn ensure_pre_batch_for_module(
261 &mut self,
262 module: ResolvedVc<Box<dyn Module>>,
263 chunk_group_info: &ChunkGroupInfo,
264 queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
265 ) -> Result<PreBatchIndex> {
266 Ok(match self.entries.entry(module) {
267 Entry::Vacant(e) => {
268 let index = self.batches.len();
269 queue.push_back((module, index));
270 let chunk_groups = chunk_group_info
271 .module_chunk_groups
272 .get(&module)
273 .context("all modules need to have chunk group info")?;
274 let batch = PreBatch::new(chunk_groups.clone());
275 self.batches.push(batch);
276 e.insert(index);
277 index
278 }
279 Entry::Occupied(e) => *e.get(),
280 })
281 }
282
283 async fn get_pre_batch_items(
284 &mut self,
285 entry: ResolvedVc<Box<dyn Module>>,
286 chunk_group_info: &ChunkGroupInfo,
287 module_graph: &ModuleGraph,
288 queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
289 ) -> Result<Vec<PreBatchItem>> {
290 let mut state = TraversalState {
291 items: Vec::new(),
292 this: self,
293 };
294 let mut visited = FxHashSet::default();
295 module_graph
296 .traverse_edges_from_entries_topological(
297 std::iter::once(ResolvedVc::upcast(entry)),
298 &mut state,
299 |parent_info, node, state| {
300 let ty = parent_info.map_or(
301 &ChunkingType::Parallel {
302 inherit_async: false,
303 hoisted: false,
304 },
305 |(_, ty)| ty,
306 );
307 let module = node.module;
308 if !ty.is_parallel() {
309 state.items.push(PreBatchItem::NonParallelEdge(
310 ty.without_inherit_async(),
311 module,
312 ));
313 return Ok(GraphTraversalAction::Exclude);
314 }
315 if visited.insert(module) {
316 if parent_info.is_some() && state.this.boundary_modules.contains(&module) {
317 let idx = state.this.ensure_pre_batch_for_module(
318 module,
319 chunk_group_info,
320 queue,
321 )?;
322 state.items.push(PreBatchItem::ParallelReference(idx));
323 return Ok(GraphTraversalAction::Exclude);
324 }
325 Ok(GraphTraversalAction::Continue)
326 } else {
327 Ok(GraphTraversalAction::Exclude)
328 }
329 },
330 |_, node, state| {
331 let item = PreBatchItem::ParallelModule(node.module);
332 state.items.push(item);
333 },
334 )
335 .await?;
336 Ok(state.items)
337 }
338}
339
340pub async fn compute_module_batches(
341 module_graph: Vc<ModuleGraph>,
342 _config: &BatchingConfig,
343) -> Result<Vc<ModuleBatchesGraph>> {
344 let outer_span = tracing::info_span!(
345 "compute module batches",
346 initial_pre_batch_items = tracing::field::Empty,
347 initial_pre_batches = tracing::field::Empty,
348 extracted_shared_items = tracing::field::Empty,
349 batches = tracing::field::Empty,
350 modules = tracing::field::Empty,
351 edges = tracing::field::Empty
352 );
353 let span = outer_span.clone();
354 async move {
355 let chunk_group_info = module_graph.chunk_group_info().await?;
356 let module_graph = module_graph.await?;
357
358 let mut pre_batches = PreBatches::new();
359
360 module_graph
363 .traverse_all_edges_unordered(|(parent, ty), node| {
364 let std::collections::hash_set::Entry::Vacant(entry) =
365 pre_batches.boundary_modules.entry(node.module)
366 else {
367 return Ok(());
369 };
370 if ty.is_parallel() {
371 let parent_chunk_groups = chunk_group_info
372 .module_chunk_groups
373 .get(&parent.module)
374 .context("all modules need to have chunk group info")?;
375 let chunk_groups = chunk_group_info
376 .module_chunk_groups
377 .get(&node.module)
378 .context("all modules need to have chunk group info")?;
379 if parent_chunk_groups != chunk_groups {
380 entry.insert();
382 }
383 } else {
384 entry.insert();
385 }
386 Ok(())
387 })
388 .await?;
389
390 for chunk_group in &chunk_group_info.chunk_groups {
392 for entry in chunk_group.entries() {
393 pre_batches.boundary_modules.insert(entry);
394 }
395 }
396
397 module_graph
400 .traverse_cycles(
401 |ty| ty.is_parallel(),
402 |cycle| {
403 if cycle
404 .iter()
405 .any(|node| pre_batches.boundary_modules.contains(&node.module))
406 {
407 pre_batches
408 .boundary_modules
409 .extend(cycle.iter().map(|node| node.module));
410 }
411 },
412 )
413 .await?;
414
415 let mut queue: VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)> = VecDeque::new();
416
417 let mut chunk_group_indicies_with_merged_children = FxHashSet::default();
418
419 for chunk_group in &chunk_group_info.chunk_groups {
421 for entry in chunk_group.entries() {
422 if let Some(chunkable_module) = ResolvedVc::try_downcast(entry) {
423 pre_batches.ensure_pre_batch_for_module(
424 chunkable_module,
425 &chunk_group_info,
426 &mut queue,
427 )?;
428 } else {
429 pre_batches.single_module_entries.insert(entry);
430 }
431 }
432 if let Some(parent) = chunk_group.get_merged_parent() {
433 chunk_group_indicies_with_merged_children.insert(parent);
434 }
435 }
436
437 let mut initial_pre_batch_items = 0;
438 while let Some((chunkable_module, idx)) = queue.pop_front() {
440 let items = pre_batches
441 .get_pre_batch_items(
442 chunkable_module,
443 &chunk_group_info,
444 &module_graph,
445 &mut queue,
446 )
447 .await?;
448 initial_pre_batch_items += items.len();
449 let batch = &mut pre_batches.batches[idx];
450 batch.items.extend(items);
451 }
452 span.record("initial_pre_batch_items", initial_pre_batch_items);
453 span.record("initial_pre_batches", pre_batches.batches.len());
454
455 let mut ordered_entries: Vec<Option<EntriesList>> =
457 vec![None; chunk_group_info.chunk_groups.len()];
458 for (i, chunk_group) in chunk_group_info.chunk_groups.iter().enumerate() {
459 if !chunk_group_indicies_with_merged_children.contains(&i) {
460 continue;
461 }
462 let mut merged_modules: FxHashMap<ChunkingType, FxIndexSet<_>> = FxHashMap::default();
463 let mut stack = ordered_entries[i]
464 .as_ref()
465 .map_or_else(
466 || Either::Left(chunk_group.entries()),
467 |v| Either::Right(v.iter().copied()),
468 )
469 .filter_map(|module| {
470 if let Some(chunkable_module) = ResolvedVc::try_downcast(module) {
471 let idx = *pre_batches.entries.get(&chunkable_module).unwrap();
472 Some((idx, 0))
473 } else {
474 None
475 }
476 })
477 .collect::<Vec<_>>();
478 stack.reverse();
479 let mut visited = FxHashSet::default();
480 while let Some((idx, mut pos)) = stack.pop() {
481 let batch = &pre_batches.batches[idx];
482 while let Some(item) = batch.items.get_index(pos) {
483 match item {
484 PreBatchItem::ParallelModule(_) => {}
485 PreBatchItem::ParallelReference(other_idx) => {
486 if visited.insert(*other_idx) {
487 stack.push((idx, pos + 1));
488 stack.push((*other_idx, 0));
489 break;
490 }
491 }
492 PreBatchItem::NonParallelEdge(chunking_type, module) => {
493 if chunking_type.is_merged() {
494 merged_modules
495 .entry(chunking_type.clone())
496 .or_default()
497 .insert(*module);
498 }
499 }
500 }
501 pos += 1;
502 }
503 }
504 if !merged_modules.is_empty() {
505 for (ty, merged_modules) in merged_modules {
506 let chunk_group_key = match ty {
507 ChunkingType::Isolated {
508 merge_tag: Some(merge_tag),
509 ..
510 } => ChunkGroupKey::IsolatedMerged {
511 parent: i.into(),
512 merge_tag: merge_tag.clone(),
513 },
514 ChunkingType::Shared {
515 merge_tag: Some(merge_tag),
516 ..
517 } => ChunkGroupKey::SharedMerged {
518 parent: i.into(),
519 merge_tag: merge_tag.clone(),
520 },
521 _ => unreachable!(),
522 };
523 let idx = chunk_group_info
524 .chunk_group_keys
525 .get_index_of(&chunk_group_key)
526 .unwrap();
527 ordered_entries[idx] = Some(merged_modules);
528 }
529 }
530 }
531
532 let mut parallel_module_to_pre_batch: FxIndexMap<_, Vec<PreBatchIndex>> =
534 FxIndexMap::default();
535
536 for (idx, pre_batch) in pre_batches.batches.iter().enumerate() {
538 for item in &pre_batch.items {
539 match item {
540 PreBatchItem::ParallelModule(module) => {
541 parallel_module_to_pre_batch
542 .entry(*module)
543 .or_default()
544 .push(idx);
545 }
546 PreBatchItem::NonParallelEdge(_, module) => {
547 if let Some(chunkable_module) = ResolvedVc::try_downcast(*module) {
548 if !pre_batches.entries.contains_key(&chunkable_module) {
549 pre_batches.single_module_entries.insert(*module);
550 }
551 } else {
552 pre_batches.single_module_entries.insert(*module);
553 }
554 }
555 PreBatchItem::ParallelReference(_) => {}
556 }
557 }
558 }
559
560 let mut extracted_shared_items = 0;
563 for i in 0..parallel_module_to_pre_batch.len() {
565 let (&module, batches) = parallel_module_to_pre_batch.get_index(i).unwrap();
566 if batches.len() > 1 {
567 let batches_with_item_index = batches
569 .iter()
570 .map(|&idx| {
571 let batch_items = &pre_batches.batches[idx].items;
572 let item_idx = batch_items
573 .get_index_of(&PreBatchItem::ParallelModule(module))
574 .unwrap();
575 (idx, item_idx)
576 })
577 .collect::<Vec<_>>();
578 let mut selected_items = 1;
579 fn get_item_at(
580 pre_batches: &PreBatches,
581 batch_idx: PreBatchIndex,
582 item_idx: usize,
583 ) -> Option<&PreBatchItem> {
584 pre_batches.batches[batch_idx].items.get_index(item_idx)
585 }
586 loop {
589 if let Some(PreBatchItem::ParallelModule(next_module)) = get_item_at(
590 &pre_batches,
591 batches_with_item_index[0].0,
592 batches_with_item_index[0].1 + selected_items,
593 ) {
594 if parallel_module_to_pre_batch.get(next_module).unwrap().len()
595 == batches.len()
596 && batches_with_item_index[1..]
597 .iter()
598 .all(|&(batch_idx, item_idx)| {
599 get_item_at(&pre_batches, batch_idx, item_idx + selected_items)
600 == Some(&PreBatchItem::ParallelModule(*next_module))
601 })
602 {
603 selected_items += 1;
604 continue;
605 }
606 }
607 break;
608 }
609 extracted_shared_items += selected_items;
610
611 let exact_match = batches_with_item_index
614 .iter()
615 .find(|&&(batch_idx, item_idx)| {
616 item_idx == 0
617 && pre_batches.batches[batch_idx].items.len() == selected_items
618 });
619 if let Some(&(exact_match, _)) = exact_match {
620 for &(batch_index, item_start) in batches_with_item_index.iter() {
622 if batch_index != exact_match {
623 pre_batches.batches[batch_index].items.splice(
624 item_start..item_start + selected_items,
625 std::iter::once(PreBatchItem::ParallelReference(exact_match)),
626 );
627 }
628 }
629 for item in pre_batches.batches[exact_match].items.iter() {
630 if let PreBatchItem::ParallelModule(module) = item {
631 parallel_module_to_pre_batch
632 .get_mut(module)
633 .unwrap()
634 .clear();
635 }
636 }
637 } else {
638 let first_batch_index = batches_with_item_index[0].0;
641 let first_batch_item_index = batches_with_item_index[0].1;
642 let new_batch_index = pre_batches.batches.len();
643 let mut new_batch =
644 PreBatch::new(pre_batches.batches[first_batch_index].chunk_groups.clone());
645 new_batch
646 .items
647 .extend(pre_batches.batches[first_batch_index].items.splice(
648 first_batch_item_index..first_batch_item_index + selected_items,
649 std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
650 ));
651 for item in new_batch.items.iter() {
652 if let PreBatchItem::ParallelModule(module) = item {
653 parallel_module_to_pre_batch
654 .get_mut(module)
655 .unwrap()
656 .clear();
657 }
658 }
659 pre_batches.batches.push(new_batch);
660 for &(batch_index, item_start) in batches_with_item_index[1..].iter() {
661 pre_batches.batches[batch_index].items.splice(
662 item_start..item_start + selected_items,
663 std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
664 );
665 }
666 }
667 }
668 }
669 span.record("extracted_shared_items", extracted_shared_items);
670
671 let mut edges_count = 0;
674
675 for i in 0..pre_batches.batches.len() {
678 let items = take(&mut pre_batches.batches[i].items);
679 let mut new_items =
680 FxIndexSet::with_capacity_and_hasher(items.len(), Default::default());
681 enum Mode {
682 ParallelChunkableModule,
683 Other,
684 }
685 let mut mode = Mode::Other;
686 for item in items {
687 let chunkable_module = if let PreBatchItem::ParallelModule(module) = &item {
688 ResolvedVc::try_downcast::<Box<dyn ChunkableModule>>(*module)
689 } else {
690 None
691 };
692 let item = if let PreBatchItem::ParallelModule(module) = item {
693 if chunkable_module.is_some() {
694 PreBatchItem::ParallelModule(module)
695 } else {
696 pre_batches.single_module_entries.insert(module);
697 PreBatchItem::NonParallelEdge(
698 ChunkingType::Parallel {
699 inherit_async: false,
700 hoisted: false,
701 },
702 module,
703 )
704 }
705 } else {
706 item
707 };
708 match (&mode, chunkable_module) {
709 (_, Some(_)) => {
710 mode = Mode::ParallelChunkableModule;
711 new_items.insert(item);
712 }
713 (Mode::Other, _) => {
714 edges_count += 1;
715 new_items.insert(item);
716 }
717 (Mode::ParallelChunkableModule, _) => {
718 let idx = pre_batches.batches.len();
720 let mut new_batch =
721 PreBatch::new(pre_batches.batches[i].chunk_groups.clone());
722 new_batch.items.extend(new_items.drain(..));
723 pre_batches.batches.push(new_batch);
724 edges_count += 1;
725 new_items.insert(PreBatchItem::ParallelReference(idx));
726 if chunkable_module.is_some() {
727 new_items.insert(item);
728 } else {
729 edges_count += 1;
730 mode = Mode::Other;
731 new_items.insert(item);
732 }
733 }
734 }
735 }
736 pre_batches.batches[i].items = new_items;
737 }
738 span.record("pre_batches", pre_batches.batches.len());
739
740 let mut graph: DiGraph<ModuleOrBatch, ModuleBatchesGraphEdge, u32> =
744 petgraph::graph::DiGraph::with_capacity(
745 pre_batches.batches.len() + pre_batches.single_module_entries.len(),
746 edges_count,
747 );
748
749 let batches = pre_batches
751 .batches
752 .iter_mut()
753 .enumerate()
754 .map(async |(i, pre_batch)| {
755 let mut modules = pre_batch.items.iter().filter_map(|item| {
756 if let PreBatchItem::ParallelModule(module) = item {
757 ResolvedVc::try_downcast(*module)
758 } else {
759 None
760 }
761 });
762 let Some(first) = modules.next() else {
763 return Ok(ModuleOrBatch::None(i));
764 };
765 if let Some(second) = modules.next() {
766 let batch = ModuleBatch::new(
767 [first, second]
768 .into_iter()
769 .chain(modules)
770 .map(|m| *m)
771 .collect::<Vec<_>>(),
772 Some(pre_batch.chunk_groups.clone()),
773 );
774 Ok(ModuleOrBatch::Batch(batch.to_resolved().await?))
775 } else {
776 Ok(ModuleOrBatch::Module(ResolvedVc::upcast(first)))
777 }
778 })
779 .try_join()
780 .await?;
781
782 let mut batch_groups: FxHashMap<_, Vec<_>> = FxHashMap::default();
784 for (i, pre_batch) in pre_batches.batches.iter().enumerate() {
785 let key =
786 BuildHasherDefault::<FxHasher>::default().prehash(pre_batch.chunk_groups.clone());
787 let batch = batches[i];
788 batch_groups.entry(key).or_default().push(batch);
789 }
790 for &module in &pre_batches.single_module_entries {
791 let chunk_groups = chunk_group_info
792 .module_chunk_groups
793 .get(&module)
794 .context("all modules need to have chunk group info")?;
795 let key = BuildHasherDefault::<FxHasher>::default().prehash(chunk_groups.clone());
796 batch_groups
797 .entry(key)
798 .or_default()
799 .push(ModuleOrBatch::Module(module));
800 }
801
802 let batch_groups = batch_groups
804 .into_iter()
805 .map(async |(key, items)| {
806 if items.len() == 1 {
807 Ok(Either::Left(std::iter::empty()))
808 } else {
809 let batch_group = ModuleBatchGroup::new(items.clone(), (*key).clone())
810 .to_resolved()
811 .await?;
812 Ok(Either::Right(
813 items.into_iter().map(move |item| (item, batch_group)),
814 ))
815 }
816 })
817 .try_join()
818 .await?
819 .into_iter()
820 .flatten()
821 .collect::<FxHashMap<_, _>>();
822
823 let mut batches_count = 0;
825 let mut modules_count = 0;
826 let batch_indicies = batches
827 .into_iter()
828 .map(|batch| {
829 match &batch {
830 ModuleOrBatch::Batch(_) => batches_count += 1,
831 ModuleOrBatch::Module(_) => modules_count += 1,
832 ModuleOrBatch::None(_) => {}
833 }
834 graph.add_node(batch)
835 })
836 .collect::<Vec<_>>();
837
838 let single_module_indicies = pre_batches
840 .single_module_entries
841 .iter()
842 .map(|module| graph.add_node(ModuleOrBatch::Module(*module)))
843 .collect::<Vec<_>>();
844
845 span.record("batches", batches_count);
846 modules_count += pre_batches.single_module_entries.len();
847 span.record("modules", modules_count);
848 span.record("edges", edges_count);
849
850 for (i, pre_batch) in pre_batches.batches.into_iter().enumerate() {
852 let index = batch_indicies[i];
853 let items = pre_batch.items;
854 for item in items {
855 match item {
856 PreBatchItem::ParallelReference(idx) => {
857 graph.add_edge(
858 index,
859 batch_indicies[idx],
860 ModuleBatchesGraphEdge {
861 ty: ChunkingType::Parallel {
862 inherit_async: false,
863 hoisted: false,
864 },
865 module: None,
866 },
867 );
868 }
869 PreBatchItem::NonParallelEdge(ty, module) => {
870 if let Some(chunkable_module) = ResolvedVc::try_downcast(module) {
871 if let Some(batch) = pre_batches.entries.get(&chunkable_module).copied()
872 {
873 graph.add_edge(
874 index,
875 batch_indicies[batch],
876 ModuleBatchesGraphEdge {
877 ty,
878 module: Some(module),
879 },
880 );
881 continue;
882 }
883 }
884 let idx = pre_batches
885 .single_module_entries
886 .get_index_of(&module)
887 .unwrap();
888 let idx = single_module_indicies[idx];
889 graph.add_edge(
890 index,
891 idx,
892 ModuleBatchesGraphEdge {
893 ty,
894 module: Some(module),
895 },
896 );
897 }
898 PreBatchItem::ParallelModule(_) => {}
899 }
900 }
901 }
902
903 debug_assert_eq!(graph.capacity().0, graph.node_count());
904 debug_assert_eq!(graph.capacity().1, graph.edge_count());
905
906 let mut entries = FxHashMap::default();
908 for chunk_group in &chunk_group_info.chunk_groups {
909 for module in chunk_group.entries() {
910 if let Some(chunkable_module) = ResolvedVc::try_downcast(module) {
911 if let Some(batch) = pre_batches.entries.get(&chunkable_module).copied() {
912 entries.insert(module, batch_indicies[batch]);
913 continue;
914 }
915 }
916 let idx = pre_batches
917 .single_module_entries
918 .get_index_of(&module)
919 .unwrap();
920 let idx = single_module_indicies[idx];
921 entries.insert(module, idx);
922 }
923 }
924
925 Ok(ModuleBatchesGraph {
926 graph: TracedDiGraph(graph),
927 entries,
928 batch_groups,
929 ordered_entries,
930 }
931 .cell())
932 }
933 .instrument(outer_span)
934 .await
935}