turbopack_core/module_graph/
module_batches.rs

1use std::{
2    collections::{VecDeque, hash_map::Entry},
3    hash::BuildHasherDefault,
4    mem::take,
5};
6
7use anyhow::{Context, Result, bail};
8use either::Either;
9use petgraph::graph::{DiGraph, EdgeIndex, NodeIndex};
10use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
11use serde::{Deserialize, Serialize};
12use tracing::Instrument;
13use turbo_prehash::BuildHasherExt;
14use turbo_tasks::{
15    FxIndexMap, FxIndexSet, NonLocalValue, ResolvedVc, TaskInput, TryJoinIterExt, ValueToString,
16    Vc, trace::TraceRawVcs,
17};
18
19use crate::{
20    chunk::{ChunkableModule, ChunkingType},
21    module::Module,
22    module_graph::{
23        GraphTraversalAction, ModuleGraph,
24        chunk_group_info::{ChunkGroupInfo, ChunkGroupKey, RoaringBitmapWrapper},
25        module_batch::{ModuleBatch, ModuleBatchGroup, ModuleOrBatch},
26        traced_di_graph::{TracedDiGraph, iter_neighbors_rev},
27    },
28};
29#[turbo_tasks::value]
30#[derive(Debug, Clone, Default, TaskInput, Hash)]
31pub struct BatchingConfig {
32    /// Use a heuristic based on the module path to create batches. It aims for batches of a good
33    /// size.
34    pub use_heuristic: bool,
35}
36
37#[turbo_tasks::value_impl]
38impl BatchingConfig {
39    #[turbo_tasks::function]
40    pub fn new(config: BatchingConfig) -> Vc<Self> {
41        config.cell()
42    }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, TraceRawVcs, NonLocalValue)]
46pub struct ModuleBatchesGraphEdge {
47    pub ty: ChunkingType,
48    pub module: Option<ResolvedVc<Box<dyn Module>>>,
49}
50
51type EntriesList = FxIndexSet<ResolvedVc<Box<dyn Module>>>;
52
53#[turbo_tasks::value(cell = "new", eq = "manual", into = "new")]
54pub struct ModuleBatchesGraph {
55    graph: TracedDiGraph<ModuleOrBatch, ModuleBatchesGraphEdge>,
56
57    // NodeIndex isn't necessarily stable (because of swap_remove), but we never remove nodes.
58    //
59    // HashMaps have nondeterministic order, but this map is only used for lookups and not
60    // iteration.
61    //
62    // This contains Vcs, but they are already contained in the graph, so no need to trace this.
63    #[turbo_tasks(trace_ignore)]
64    entries: FxHashMap<ResolvedVc<Box<dyn Module>>, NodeIndex>,
65    batch_groups: FxHashMap<ModuleOrBatch, ResolvedVc<ModuleBatchGroup>>,
66
67    /// For chunk groups where the postorder of entries is different than the order of the
68    /// `ChunkGroup::entries()` this contains Some with the postorder list of entries of that chunk
69    /// group. The index in this list corresponds to the index in the
70    /// chunk_group_info.chunk_groups.
71    ordered_entries: Vec<Option<EntriesList>>,
72}
73
74impl ModuleBatchesGraph {
75    pub async fn get_entry_index(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<NodeIndex> {
76        let Some(entry) = self.entries.get(&entry) else {
77            bail!(
78                "Entry {} is not in graph (possible entries: {:#?})",
79                entry.ident().to_string().await?,
80                self.entries
81                    .keys()
82                    .map(|e| e.ident().to_string())
83                    .try_join()
84                    .await?
85            );
86        };
87        Ok(*entry)
88    }
89
90    pub fn get_ordered_entries<'l>(
91        &'l self,
92        chunk_group_info: &'l ChunkGroupInfo,
93        idx: usize,
94    ) -> impl Iterator<Item = ResolvedVc<Box<dyn Module>>> + 'l {
95        if let Some(ordered_entries) = self
96            .ordered_entries
97            .get(idx)
98            .as_ref()
99            .and_then(|o| o.as_ref())
100        {
101            if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
102                debug_assert_eq!(ordered_entries.len(), chunk_group.entries_count());
103            }
104            Either::Left(Either::Left(ordered_entries.iter().copied()))
105        } else if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
106            Either::Right(chunk_group.entries())
107        } else {
108            Either::Left(Either::Right(std::iter::empty()))
109        }
110    }
111
112    pub fn get_batch_group(
113        &self,
114        module_or_batch: &ModuleOrBatch,
115    ) -> Option<ResolvedVc<ModuleBatchGroup>> {
116        self.batch_groups.get(module_or_batch).copied()
117    }
118
119    pub async fn get_entry(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<ModuleOrBatch> {
120        let entry = self.get_entry_index(entry).await?;
121        Ok(*self.graph.node_weight(entry).unwrap())
122    }
123
124    // Clippy complains but there's a type error without the bound
125    #[allow(clippy::implied_bounds_in_impls)]
126    /// Traverses all reachable edges in topological order. The preorder visitor can be used to
127    /// forward state down the graph, and to skip subgraphs
128    ///
129    /// Use this to collect batches/modules in evaluation order.
130    ///
131    /// Target nodes can be revisited (once per incoming edge).
132    /// Edges are traversed in normal order, so should correspond to reference order.
133    ///
134    /// * `entry` - The entry module to start the traversal from
135    /// * `state` - The state to be passed to the visitors
136    /// * `visit_preorder` - Called before visiting the children of a node.
137    ///    - Receives: (originating &ModuleBatchesGraphNode, edge &ChunkingType), target
138    ///      &ModuleBatchesGraphNode, state &S
139    ///    - Can return [GraphTraversalAction]s to control the traversal
140    /// * `visit_postorder` - Called after visiting the children of a node. Return
141    ///    - Receives: (originating &ModuleBatchesGraphNode, edge &ChunkingType), target
142    ///      &ModuleBatchesGraphNode, state &S
143    pub fn traverse_edges_from_entries_topological<'a, S>(
144        &'a self,
145        entries: impl IntoIterator<
146            Item = NodeIndex,
147            IntoIter = impl Iterator<Item = NodeIndex> + DoubleEndedIterator,
148        >,
149        state: &mut S,
150        mut visit_preorder: impl FnMut(
151            Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
152            &'a ModuleOrBatch,
153            &mut S,
154        ) -> Result<GraphTraversalAction>,
155        mut visit_postorder: impl FnMut(
156            Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
157            &'a ModuleOrBatch,
158            &mut S,
159        ),
160    ) -> Result<()> {
161        let graph = &self.graph;
162
163        enum ReverseTopologicalPass {
164            Visit,
165            ExpandAndVisit,
166        }
167
168        let entries = entries.into_iter();
169        #[allow(clippy::type_complexity)] // This is a temporary internal structure
170        let mut stack: Vec<(
171            ReverseTopologicalPass,
172            Option<(NodeIndex, EdgeIndex)>,
173            NodeIndex,
174        )> = entries
175            .rev()
176            .map(|e| (ReverseTopologicalPass::ExpandAndVisit, None, e))
177            .collect();
178        let mut expanded = FxHashSet::default();
179        while let Some((pass, parent, current)) = stack.pop() {
180            let parent_arg = parent.map(|(node, edge)| {
181                (
182                    graph.node_weight(node).unwrap(),
183                    graph.edge_weight(edge).unwrap(),
184                )
185            });
186            match pass {
187                ReverseTopologicalPass::Visit => {
188                    let current_node = graph.node_weight(current).unwrap();
189                    visit_postorder(parent_arg, current_node, state);
190                }
191                ReverseTopologicalPass::ExpandAndVisit => {
192                    let current_node = graph.node_weight(current).unwrap();
193                    let action = visit_preorder(parent_arg, current_node, state)?;
194                    if action == GraphTraversalAction::Exclude {
195                        continue;
196                    }
197                    stack.push((ReverseTopologicalPass::Visit, parent, current));
198                    if action == GraphTraversalAction::Continue && expanded.insert(current) {
199                        stack.extend(iter_neighbors_rev(graph, current).map(|(edge, child)| {
200                            (
201                                ReverseTopologicalPass::ExpandAndVisit,
202                                Some((current, edge)),
203                                child,
204                            )
205                        }));
206                    }
207                }
208            }
209        }
210
211        Ok(())
212    }
213}
214
/// Position of a pre batch inside `PreBatches::batches`.
type PreBatchIndex = usize;
216
217#[derive(Hash, PartialEq, Eq, Clone, Debug)]
218enum PreBatchItem {
219    ParallelModule(ResolvedVc<Box<dyn Module>>),
220    ParallelReference(PreBatchIndex),
221    NonParallelEdge(ChunkingType, ResolvedVc<Box<dyn Module>>),
222}
223
224struct PreBatch {
225    items: FxIndexSet<PreBatchItem>,
226    chunk_groups: RoaringBitmapWrapper,
227}
228
229impl PreBatch {
230    fn new(chunk_groups: RoaringBitmapWrapper) -> Self {
231        Self {
232            items: FxIndexSet::default(),
233            chunk_groups,
234        }
235    }
236}
237
238struct TraversalState<'l> {
239    items: Vec<PreBatchItem>,
240    this: &'l mut PreBatches,
241}
242
243struct PreBatches {
244    boundary_modules: FxHashSet<ResolvedVc<Box<dyn Module>>>,
245    batches: Vec<PreBatch>,
246    entries: FxHashMap<ResolvedVc<Box<dyn Module>>, PreBatchIndex>,
247    single_module_entries: FxIndexSet<ResolvedVc<Box<dyn Module>>>,
248}
249
250impl PreBatches {
251    fn new() -> Self {
252        Self {
253            boundary_modules: FxHashSet::default(),
254            batches: Vec::new(),
255            entries: FxHashMap::default(),
256            single_module_entries: FxIndexSet::default(),
257        }
258    }
259
260    fn ensure_pre_batch_for_module(
261        &mut self,
262        module: ResolvedVc<Box<dyn Module>>,
263        chunk_group_info: &ChunkGroupInfo,
264        queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
265    ) -> Result<PreBatchIndex> {
266        Ok(match self.entries.entry(module) {
267            Entry::Vacant(e) => {
268                let index = self.batches.len();
269                queue.push_back((module, index));
270                let chunk_groups = chunk_group_info
271                    .module_chunk_groups
272                    .get(&module)
273                    .context("all modules need to have chunk group info")?;
274                let batch = PreBatch::new(chunk_groups.clone());
275                self.batches.push(batch);
276                e.insert(index);
277                index
278            }
279            Entry::Occupied(e) => *e.get(),
280        })
281    }
282
283    async fn get_pre_batch_items(
284        &mut self,
285        entry: ResolvedVc<Box<dyn Module>>,
286        chunk_group_info: &ChunkGroupInfo,
287        module_graph: &ModuleGraph,
288        queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
289    ) -> Result<Vec<PreBatchItem>> {
290        let mut state = TraversalState {
291            items: Vec::new(),
292            this: self,
293        };
294        let mut visited = FxHashSet::default();
295        module_graph
296            .traverse_edges_from_entries_topological(
297                std::iter::once(ResolvedVc::upcast(entry)),
298                &mut state,
299                |parent_info, node, state| {
300                    let ty = parent_info.map_or(
301                        &ChunkingType::Parallel {
302                            inherit_async: false,
303                            hoisted: false,
304                        },
305                        |(_, ty)| ty,
306                    );
307                    let module = node.module;
308                    if !ty.is_parallel() {
309                        state.items.push(PreBatchItem::NonParallelEdge(
310                            ty.without_inherit_async(),
311                            module,
312                        ));
313                        return Ok(GraphTraversalAction::Exclude);
314                    }
315                    if visited.insert(module) {
316                        if parent_info.is_some() && state.this.boundary_modules.contains(&module) {
317                            let idx = state.this.ensure_pre_batch_for_module(
318                                module,
319                                chunk_group_info,
320                                queue,
321                            )?;
322                            state.items.push(PreBatchItem::ParallelReference(idx));
323                            return Ok(GraphTraversalAction::Exclude);
324                        }
325                        Ok(GraphTraversalAction::Continue)
326                    } else {
327                        Ok(GraphTraversalAction::Exclude)
328                    }
329                },
330                |_, node, state| {
331                    let item = PreBatchItem::ParallelModule(node.module);
332                    state.items.push(item);
333                },
334            )
335            .await?;
336        Ok(state.items)
337    }
338}
339
340pub async fn compute_module_batches(
341    module_graph: Vc<ModuleGraph>,
342    _config: &BatchingConfig,
343) -> Result<Vc<ModuleBatchesGraph>> {
344    let outer_span = tracing::info_span!(
345        "compute module batches",
346        initial_pre_batch_items = tracing::field::Empty,
347        initial_pre_batches = tracing::field::Empty,
348        extracted_shared_items = tracing::field::Empty,
349        batches = tracing::field::Empty,
350        modules = tracing::field::Empty,
351        edges = tracing::field::Empty
352    );
353    let span = outer_span.clone();
354    async move {
355        let chunk_group_info = module_graph.chunk_group_info().await?;
356        let module_graph = module_graph.await?;
357
358        let mut pre_batches = PreBatches::new();
359
360        // Walk the module graph and mark all modules that are boundary modules (referenced from a
361        // different chunk group bitmap)
362        module_graph
363            .traverse_all_edges_unordered(|(parent, ty), node| {
364                let std::collections::hash_set::Entry::Vacant(entry) =
365                    pre_batches.boundary_modules.entry(node.module)
366                else {
367                    // Already a boundary module, can skip check
368                    return Ok(());
369                };
370                if ty.is_parallel() {
371                    let parent_chunk_groups = chunk_group_info
372                        .module_chunk_groups
373                        .get(&parent.module)
374                        .context("all modules need to have chunk group info")?;
375                    let chunk_groups = chunk_group_info
376                        .module_chunk_groups
377                        .get(&node.module)
378                        .context("all modules need to have chunk group info")?;
379                    if parent_chunk_groups != chunk_groups {
380                        // This is a boundary module
381                        entry.insert();
382                    }
383                } else {
384                    entry.insert();
385                }
386                Ok(())
387            })
388            .await?;
389
390        // All entries are boundary modules too
391        for chunk_group in &chunk_group_info.chunk_groups {
392            for entry in chunk_group.entries() {
393                pre_batches.boundary_modules.insert(entry);
394            }
395        }
396
397        // Pre batches would be incorrect with cycles, so we need to opt-out of pre batches for
398        // cycles that include boundary modules
399        module_graph
400            .traverse_cycles(
401                |ty| ty.is_parallel(),
402                |cycle| {
403                    if cycle
404                        .iter()
405                        .any(|node| pre_batches.boundary_modules.contains(&node.module))
406                    {
407                        pre_batches
408                            .boundary_modules
409                            .extend(cycle.iter().map(|node| node.module));
410                    }
411                },
412            )
413            .await?;
414
415        let mut queue: VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)> = VecDeque::new();
416
417        let mut chunk_group_indicies_with_merged_children = FxHashSet::default();
418
419        // Start with the entries
420        for chunk_group in &chunk_group_info.chunk_groups {
421            for entry in chunk_group.entries() {
422                if let Some(chunkable_module) = ResolvedVc::try_downcast(entry) {
423                    pre_batches.ensure_pre_batch_for_module(
424                        chunkable_module,
425                        &chunk_group_info,
426                        &mut queue,
427                    )?;
428                } else {
429                    pre_batches.single_module_entries.insert(entry);
430                }
431            }
432            if let Some(parent) = chunk_group.get_merged_parent() {
433                chunk_group_indicies_with_merged_children.insert(parent);
434            }
435        }
436
437        let mut initial_pre_batch_items = 0;
438        // Fill all pre batches
439        while let Some((chunkable_module, idx)) = queue.pop_front() {
440            let items = pre_batches
441                .get_pre_batch_items(
442                    chunkable_module,
443                    &chunk_group_info,
444                    &module_graph,
445                    &mut queue,
446                )
447                .await?;
448            initial_pre_batch_items += items.len();
449            let batch = &mut pre_batches.batches[idx];
450            batch.items.extend(items);
451        }
452        span.record("initial_pre_batch_items", initial_pre_batch_items);
453        span.record("initial_pre_batches", pre_batches.batches.len());
454
455        // Figure out the order of all merged groups
456        let mut ordered_entries: Vec<Option<EntriesList>> =
457            vec![None; chunk_group_info.chunk_groups.len()];
458        for (i, chunk_group) in chunk_group_info.chunk_groups.iter().enumerate() {
459            if !chunk_group_indicies_with_merged_children.contains(&i) {
460                continue;
461            }
462            let mut merged_modules: FxHashMap<ChunkingType, FxIndexSet<_>> = FxHashMap::default();
463            let mut stack = ordered_entries[i]
464                .as_ref()
465                .map_or_else(
466                    || Either::Left(chunk_group.entries()),
467                    |v| Either::Right(v.iter().copied()),
468                )
469                .filter_map(|module| {
470                    if let Some(chunkable_module) = ResolvedVc::try_downcast(module) {
471                        let idx = *pre_batches.entries.get(&chunkable_module).unwrap();
472                        Some((idx, 0))
473                    } else {
474                        None
475                    }
476                })
477                .collect::<Vec<_>>();
478            stack.reverse();
479            let mut visited = FxHashSet::default();
480            while let Some((idx, mut pos)) = stack.pop() {
481                let batch = &pre_batches.batches[idx];
482                while let Some(item) = batch.items.get_index(pos) {
483                    match item {
484                        PreBatchItem::ParallelModule(_) => {}
485                        PreBatchItem::ParallelReference(other_idx) => {
486                            if visited.insert(*other_idx) {
487                                stack.push((idx, pos + 1));
488                                stack.push((*other_idx, 0));
489                                break;
490                            }
491                        }
492                        PreBatchItem::NonParallelEdge(chunking_type, module) => {
493                            if chunking_type.is_merged() {
494                                merged_modules
495                                    .entry(chunking_type.clone())
496                                    .or_default()
497                                    .insert(*module);
498                            }
499                        }
500                    }
501                    pos += 1;
502                }
503            }
504            if !merged_modules.is_empty() {
505                for (ty, merged_modules) in merged_modules {
506                    let chunk_group_key = match ty {
507                        ChunkingType::Isolated {
508                            merge_tag: Some(merge_tag),
509                            ..
510                        } => ChunkGroupKey::IsolatedMerged {
511                            parent: i.into(),
512                            merge_tag: merge_tag.clone(),
513                        },
514                        ChunkingType::Shared {
515                            merge_tag: Some(merge_tag),
516                            ..
517                        } => ChunkGroupKey::SharedMerged {
518                            parent: i.into(),
519                            merge_tag: merge_tag.clone(),
520                        },
521                        _ => unreachable!(),
522                    };
523                    let idx = chunk_group_info
524                        .chunk_group_keys
525                        .get_index_of(&chunk_group_key)
526                        .unwrap();
527                    ordered_entries[idx] = Some(merged_modules);
528                }
529            }
530        }
531
532        // Create a map of parallel module to the batches they are contained in.
533        let mut parallel_module_to_pre_batch: FxIndexMap<_, Vec<PreBatchIndex>> =
534            FxIndexMap::default();
535
536        // Fill the map and also fill up the single_module_entries
537        for (idx, pre_batch) in pre_batches.batches.iter().enumerate() {
538            for item in &pre_batch.items {
539                match item {
540                    PreBatchItem::ParallelModule(module) => {
541                        parallel_module_to_pre_batch
542                            .entry(*module)
543                            .or_default()
544                            .push(idx);
545                    }
546                    PreBatchItem::NonParallelEdge(_, module) => {
547                        if let Some(chunkable_module) = ResolvedVc::try_downcast(*module) {
548                            if !pre_batches.entries.contains_key(&chunkable_module) {
549                                pre_batches.single_module_entries.insert(*module);
550                            }
551                        } else {
552                            pre_batches.single_module_entries.insert(*module);
553                        }
554                    }
555                    PreBatchItem::ParallelReference(_) => {}
556                }
557            }
558        }
559
560        // We never want a module to occur in multiple batches.
561
562        let mut extracted_shared_items = 0;
563        // Extract shared modules into separate batches
564        for i in 0..parallel_module_to_pre_batch.len() {
565            let (&module, batches) = parallel_module_to_pre_batch.get_index(i).unwrap();
566            if batches.len() > 1 {
567                // Create a new batch for the shared modules
568                let batches_with_item_index = batches
569                    .iter()
570                    .map(|&idx| {
571                        let batch_items = &pre_batches.batches[idx].items;
572                        let item_idx = batch_items
573                            .get_index_of(&PreBatchItem::ParallelModule(module))
574                            .unwrap();
575                        (idx, item_idx)
576                    })
577                    .collect::<Vec<_>>();
578                let mut selected_items = 1;
579                fn get_item_at(
580                    pre_batches: &PreBatches,
581                    batch_idx: PreBatchIndex,
582                    item_idx: usize,
583                ) -> Option<&PreBatchItem> {
584                    pre_batches.batches[batch_idx].items.get_index(item_idx)
585                }
586                // Select more matching items that are equal in all batches that contain the shared
587                // module(s)
588                loop {
589                    if let Some(PreBatchItem::ParallelModule(next_module)) = get_item_at(
590                        &pre_batches,
591                        batches_with_item_index[0].0,
592                        batches_with_item_index[0].1 + selected_items,
593                    ) {
594                        if parallel_module_to_pre_batch.get(next_module).unwrap().len()
595                            == batches.len()
596                            && batches_with_item_index[1..]
597                                .iter()
598                                .all(|&(batch_idx, item_idx)| {
599                                    get_item_at(&pre_batches, batch_idx, item_idx + selected_items)
600                                        == Some(&PreBatchItem::ParallelModule(*next_module))
601                                })
602                        {
603                            selected_items += 1;
604                            continue;
605                        }
606                    }
607                    break;
608                }
609                extracted_shared_items += selected_items;
610
611                // Check if a batch is completely selected. In that case we can replace all other
612                // occurences with a reference to that batch
613                let exact_match = batches_with_item_index
614                    .iter()
615                    .find(|&&(batch_idx, item_idx)| {
616                        item_idx == 0
617                            && pre_batches.batches[batch_idx].items.len() == selected_items
618                    });
619                if let Some(&(exact_match, _)) = exact_match {
620                    // Replace all other occurences with a reference to the exact match
621                    for &(batch_index, item_start) in batches_with_item_index.iter() {
622                        if batch_index != exact_match {
623                            pre_batches.batches[batch_index].items.splice(
624                                item_start..item_start + selected_items,
625                                std::iter::once(PreBatchItem::ParallelReference(exact_match)),
626                            );
627                        }
628                    }
629                    for item in pre_batches.batches[exact_match].items.iter() {
630                        if let PreBatchItem::ParallelModule(module) = item {
631                            parallel_module_to_pre_batch
632                                .get_mut(module)
633                                .unwrap()
634                                .clear();
635                        }
636                    }
637                } else {
638                    // Create a new batch of the shared part and replace all occurences with a
639                    // reference to that batch
640                    let first_batch_index = batches_with_item_index[0].0;
641                    let first_batch_item_index = batches_with_item_index[0].1;
642                    let new_batch_index = pre_batches.batches.len();
643                    let mut new_batch =
644                        PreBatch::new(pre_batches.batches[first_batch_index].chunk_groups.clone());
645                    new_batch
646                        .items
647                        .extend(pre_batches.batches[first_batch_index].items.splice(
648                            first_batch_item_index..first_batch_item_index + selected_items,
649                            std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
650                        ));
651                    for item in new_batch.items.iter() {
652                        if let PreBatchItem::ParallelModule(module) = item {
653                            parallel_module_to_pre_batch
654                                .get_mut(module)
655                                .unwrap()
656                                .clear();
657                        }
658                    }
659                    pre_batches.batches.push(new_batch);
660                    for &(batch_index, item_start) in batches_with_item_index[1..].iter() {
661                        pre_batches.batches[batch_index].items.splice(
662                            item_start..item_start + selected_items,
663                            std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
664                        );
665                    }
666                }
667            }
668        }
669        span.record("extracted_shared_items", extracted_shared_items);
670
671        // Now every module is only in one batch
672
673        let mut edges_count = 0;
674
675        // Since batches can only have references followed by a list of parallel chunkable modules,
676        // we need to split batches that have modules before references.
677        for i in 0..pre_batches.batches.len() {
678            let items = take(&mut pre_batches.batches[i].items);
679            let mut new_items =
680                FxIndexSet::with_capacity_and_hasher(items.len(), Default::default());
681            enum Mode {
682                ParallelChunkableModule,
683                Other,
684            }
685            let mut mode = Mode::Other;
686            for item in items {
687                let chunkable_module = if let PreBatchItem::ParallelModule(module) = &item {
688                    ResolvedVc::try_downcast::<Box<dyn ChunkableModule>>(*module)
689                } else {
690                    None
691                };
692                let item = if let PreBatchItem::ParallelModule(module) = item {
693                    if chunkable_module.is_some() {
694                        PreBatchItem::ParallelModule(module)
695                    } else {
696                        pre_batches.single_module_entries.insert(module);
697                        PreBatchItem::NonParallelEdge(
698                            ChunkingType::Parallel {
699                                inherit_async: false,
700                                hoisted: false,
701                            },
702                            module,
703                        )
704                    }
705                } else {
706                    item
707                };
708                match (&mode, chunkable_module) {
709                    (_, Some(_)) => {
710                        mode = Mode::ParallelChunkableModule;
711                        new_items.insert(item);
712                    }
713                    (Mode::Other, _) => {
714                        edges_count += 1;
715                        new_items.insert(item);
716                    }
717                    (Mode::ParallelChunkableModule, _) => {
718                        // Split the batch
719                        let idx = pre_batches.batches.len();
720                        let mut new_batch =
721                            PreBatch::new(pre_batches.batches[i].chunk_groups.clone());
722                        new_batch.items.extend(new_items.drain(..));
723                        pre_batches.batches.push(new_batch);
724                        edges_count += 1;
725                        new_items.insert(PreBatchItem::ParallelReference(idx));
726                        if chunkable_module.is_some() {
727                            new_items.insert(item);
728                        } else {
729                            edges_count += 1;
730                            mode = Mode::Other;
731                            new_items.insert(item);
732                        }
733                    }
734                }
735            }
736            pre_batches.batches[i].items = new_items;
737        }
738        span.record("pre_batches", pre_batches.batches.len());
739
740        // Now batches are in the correct shape. We can create the real batches and the graph.
741
742        // Create the graph
743        let mut graph: DiGraph<ModuleOrBatch, ModuleBatchesGraphEdge, u32> =
744            petgraph::graph::DiGraph::with_capacity(
745                pre_batches.batches.len() + pre_batches.single_module_entries.len(),
746                edges_count,
747            );
748
749        // Create the Vc<ModuleBatch> instances
750        let batches = pre_batches
751            .batches
752            .iter_mut()
753            .enumerate()
754            .map(async |(i, pre_batch)| {
755                let mut modules = pre_batch.items.iter().filter_map(|item| {
756                    if let PreBatchItem::ParallelModule(module) = item {
757                        ResolvedVc::try_downcast(*module)
758                    } else {
759                        None
760                    }
761                });
762                let Some(first) = modules.next() else {
763                    return Ok(ModuleOrBatch::None(i));
764                };
765                if let Some(second) = modules.next() {
766                    let batch = ModuleBatch::new(
767                        [first, second]
768                            .into_iter()
769                            .chain(modules)
770                            .map(|m| *m)
771                            .collect::<Vec<_>>(),
772                        Some(pre_batch.chunk_groups.clone()),
773                    );
774                    Ok(ModuleOrBatch::Batch(batch.to_resolved().await?))
775                } else {
776                    Ok(ModuleOrBatch::Module(ResolvedVc::upcast(first)))
777                }
778            })
779            .try_join()
780            .await?;
781
782        // Create the batch groups by grouping batches with the same chunk groups
783        let mut batch_groups: FxHashMap<_, Vec<_>> = FxHashMap::default();
784        for (i, pre_batch) in pre_batches.batches.iter().enumerate() {
785            let key =
786                BuildHasherDefault::<FxHasher>::default().prehash(pre_batch.chunk_groups.clone());
787            let batch = batches[i];
788            batch_groups.entry(key).or_default().push(batch);
789        }
790        for &module in &pre_batches.single_module_entries {
791            let chunk_groups = chunk_group_info
792                .module_chunk_groups
793                .get(&module)
794                .context("all modules need to have chunk group info")?;
795            let key = BuildHasherDefault::<FxHasher>::default().prehash(chunk_groups.clone());
796            batch_groups
797                .entry(key)
798                .or_default()
799                .push(ModuleOrBatch::Module(module));
800        }
801
802        // Create the batch group instances
803        let batch_groups = batch_groups
804            .into_iter()
805            .map(async |(key, items)| {
806                if items.len() == 1 {
807                    Ok(Either::Left(std::iter::empty()))
808                } else {
809                    let batch_group = ModuleBatchGroup::new(items.clone(), (*key).clone())
810                        .to_resolved()
811                        .await?;
812                    Ok(Either::Right(
813                        items.into_iter().map(move |item| (item, batch_group)),
814                    ))
815                }
816            })
817            .try_join()
818            .await?
819            .into_iter()
820            .flatten()
821            .collect::<FxHashMap<_, _>>();
822
823        // Insert batches into the graph and store the NodeIndices
824        let mut batches_count = 0;
825        let mut modules_count = 0;
826        let batch_indicies = batches
827            .into_iter()
828            .map(|batch| {
829                match &batch {
830                    ModuleOrBatch::Batch(_) => batches_count += 1,
831                    ModuleOrBatch::Module(_) => modules_count += 1,
832                    ModuleOrBatch::None(_) => {}
833                }
834                graph.add_node(batch)
835            })
836            .collect::<Vec<_>>();
837
838        // Also insert single modules into the graph and store the NodeIndices
839        let single_module_indicies = pre_batches
840            .single_module_entries
841            .iter()
842            .map(|module| graph.add_node(ModuleOrBatch::Module(*module)))
843            .collect::<Vec<_>>();
844
845        span.record("batches", batches_count);
846        modules_count += pre_batches.single_module_entries.len();
847        span.record("modules", modules_count);
848        span.record("edges", edges_count);
849
850        // Add all the edges to the graph
851        for (i, pre_batch) in pre_batches.batches.into_iter().enumerate() {
852            let index = batch_indicies[i];
853            let items = pre_batch.items;
854            for item in items {
855                match item {
856                    PreBatchItem::ParallelReference(idx) => {
857                        graph.add_edge(
858                            index,
859                            batch_indicies[idx],
860                            ModuleBatchesGraphEdge {
861                                ty: ChunkingType::Parallel {
862                                    inherit_async: false,
863                                    hoisted: false,
864                                },
865                                module: None,
866                            },
867                        );
868                    }
869                    PreBatchItem::NonParallelEdge(ty, module) => {
870                        if let Some(chunkable_module) = ResolvedVc::try_downcast(module) {
871                            if let Some(batch) = pre_batches.entries.get(&chunkable_module).copied()
872                            {
873                                graph.add_edge(
874                                    index,
875                                    batch_indicies[batch],
876                                    ModuleBatchesGraphEdge {
877                                        ty,
878                                        module: Some(module),
879                                    },
880                                );
881                                continue;
882                            }
883                        }
884                        let idx = pre_batches
885                            .single_module_entries
886                            .get_index_of(&module)
887                            .unwrap();
888                        let idx = single_module_indicies[idx];
889                        graph.add_edge(
890                            index,
891                            idx,
892                            ModuleBatchesGraphEdge {
893                                ty,
894                                module: Some(module),
895                            },
896                        );
897                    }
898                    PreBatchItem::ParallelModule(_) => {}
899                }
900            }
901        }
902
903        debug_assert_eq!(graph.capacity().0, graph.node_count());
904        debug_assert_eq!(graph.capacity().1, graph.edge_count());
905
906        // Find the NodeIndices for our entries of the graph
907        let mut entries = FxHashMap::default();
908        for chunk_group in &chunk_group_info.chunk_groups {
909            for module in chunk_group.entries() {
910                if let Some(chunkable_module) = ResolvedVc::try_downcast(module) {
911                    if let Some(batch) = pre_batches.entries.get(&chunkable_module).copied() {
912                        entries.insert(module, batch_indicies[batch]);
913                        continue;
914                    }
915                }
916                let idx = pre_batches
917                    .single_module_entries
918                    .get_index_of(&module)
919                    .unwrap();
920                let idx = single_module_indicies[idx];
921                entries.insert(module, idx);
922            }
923        }
924
925        Ok(ModuleBatchesGraph {
926            graph: TracedDiGraph(graph),
927            entries,
928            batch_groups,
929            ordered_entries,
930        }
931        .cell())
932    }
933    .instrument(outer_span)
934    .await
935}