turbopack_core/module_graph/
module_batches.rs

1use std::{
2    collections::{VecDeque, hash_map::Entry},
3    hash::BuildHasherDefault,
4    mem::take,
5};
6
7use anyhow::{Context, Result, bail};
8use bincode::{Decode, Encode};
9use either::Either;
10use petgraph::graph::{DiGraph, EdgeIndex, NodeIndex};
11use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
12use serde::{Deserialize, Serialize};
13use tracing::Instrument;
14use turbo_prehash::BuildHasherExt;
15use turbo_tasks::{
16    FxIndexMap, FxIndexSet, NonLocalValue, ResolvedVc, TaskInput, TryJoinIterExt, ValueToString,
17    Vc, trace::TraceRawVcs,
18};
19
20use crate::{
21    chunk::{ChunkableModule, ChunkingType},
22    module::Module,
23    module_graph::{
24        GraphTraversalAction, ModuleGraph, ModuleGraphRef,
25        chunk_group_info::{ChunkGroupInfo, ChunkGroupKey, RoaringBitmapWrapper},
26        module_batch::{ModuleBatch, ModuleBatchGroup, ModuleOrBatch},
27        traced_di_graph::{TracedDiGraph, iter_neighbors_rev},
28    },
29};
30#[turbo_tasks::value]
31#[derive(Debug, Clone, Default, TaskInput, Hash)]
32pub struct BatchingConfig {
33    /// Use a heuristic based on the module path to create batches. It aims for batches of a good
34    /// size.
35    pub use_heuristic: bool,
36}
37
38#[turbo_tasks::value_impl]
39impl BatchingConfig {
40    #[turbo_tasks::function]
41    pub fn new(config: BatchingConfig) -> Vc<Self> {
42        config.cell()
43    }
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize, TraceRawVcs, NonLocalValue)]
47pub struct ModuleBatchesGraphEdge {
48    pub ty: ChunkingType,
49    pub module: Option<ResolvedVc<Box<dyn Module>>>,
50}
51
52#[derive(Debug, Clone, TraceRawVcs, NonLocalValue, Encode, Decode)]
53struct EntriesList(
54    #[bincode(with = "turbo_bincode::indexset")] pub FxIndexSet<ResolvedVc<Box<dyn Module>>>,
55);
56
57#[turbo_tasks::value(cell = "new", eq = "manual")]
58pub struct ModuleBatchesGraph {
59    graph: TracedDiGraph<ModuleOrBatch, ModuleBatchesGraphEdge>,
60
61    // NodeIndex isn't necessarily stable (because of swap_remove), but we never remove nodes.
62    //
63    // HashMaps have nondeterministic order, but this map is only used for lookups and not
64    // iteration.
65    //
66    // This contains Vcs, but they are already contained in the graph, so no need to trace this.
67    #[turbo_tasks(trace_ignore)]
68    #[bincode(with_serde)]
69    entries: FxHashMap<ResolvedVc<Box<dyn Module>>, NodeIndex>,
70    batch_groups: FxHashMap<ModuleOrBatch, ResolvedVc<ModuleBatchGroup>>,
71
72    /// For chunk groups where the postorder of entries is different than the order of the
73    /// `ChunkGroup::entries()` this contains Some with the postorder list of entries of that chunk
74    /// group. The index in this list corresponds to the index in the
75    /// chunk_group_info.chunk_groups.
76    ordered_entries: Vec<Option<EntriesList>>,
77}
78
79impl ModuleBatchesGraph {
80    pub async fn get_entry_index(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<NodeIndex> {
81        let Some(entry) = self.entries.get(&entry) else {
82            bail!(
83                "Entry {} is not in graph (possible entries: {:#?})",
84                entry.ident().to_string().await?,
85                self.entries
86                    .keys()
87                    .map(|e| e.ident().to_string())
88                    .try_join()
89                    .await?
90            );
91        };
92        Ok(*entry)
93    }
94
95    pub fn get_ordered_entries<'l>(
96        &'l self,
97        chunk_group_info: &'l ChunkGroupInfo,
98        idx: usize,
99    ) -> impl Iterator<Item = ResolvedVc<Box<dyn Module>>> + 'l {
100        if let Some(EntriesList(ordered_entries)) = self
101            .ordered_entries
102            .get(idx)
103            .as_ref()
104            .and_then(|o| o.as_ref())
105        {
106            if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
107                debug_assert_eq!(ordered_entries.len(), chunk_group.entries_count());
108            }
109            Either::Left(Either::Left(ordered_entries.iter().copied()))
110        } else if let Some(chunk_group) = chunk_group_info.chunk_groups.get_index(idx) {
111            Either::Right(chunk_group.entries())
112        } else {
113            Either::Left(Either::Right(std::iter::empty()))
114        }
115    }
116
117    pub fn get_batch_group(
118        &self,
119        module_or_batch: &ModuleOrBatch,
120    ) -> Option<ResolvedVc<ModuleBatchGroup>> {
121        self.batch_groups.get(module_or_batch).copied()
122    }
123
124    pub async fn get_entry(&self, entry: ResolvedVc<Box<dyn Module>>) -> Result<ModuleOrBatch> {
125        let entry = self.get_entry_index(entry).await?;
126        Ok(*self.graph.node_weight(entry).unwrap())
127    }
128
129    // Clippy complains but there's a type error without the bound
130    #[allow(clippy::implied_bounds_in_impls)]
131    /// Traverses all reachable edges in dfs order. The preorder visitor can be used to
132    /// forward state down the graph, and to skip subgraphs
133    ///
134    /// Use this to collect batches/modules in evaluation order.
135    ///
136    /// Target nodes can be revisited (once per incoming edge).
137    /// Edges are traversed in normal order, so should correspond to reference order.
138    ///
139    /// * `entries` - The entry modules to start the traversal from
140    /// * `state` - The state to be passed to the visitors
141    /// * `visit_preorder` - Called before visiting the children of a node.
142    ///    - Receives: (originating &ModuleBatchesGraphNode, edge &ChunkingType), target
143    ///      &ModuleBatchesGraphNode, state &S
144    ///    - Can return [GraphTraversalAction]s to control the traversal
145    /// * `visit_postorder` - Called after visiting the children of a node. Return
146    ///    - Receives: (originating &ModuleBatchesGraphNode, edge &ChunkingType), target
147    ///      &ModuleBatchesGraphNode, state &S
148    pub fn traverse_edges_from_entries_dfs<'a, S>(
149        &'a self,
150        entries: impl IntoIterator<
151            Item = NodeIndex,
152            IntoIter = impl Iterator<Item = NodeIndex> + DoubleEndedIterator,
153        >,
154        state: &mut S,
155        mut visit_preorder: impl FnMut(
156            Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
157            &'a ModuleOrBatch,
158            &mut S,
159        ) -> Result<GraphTraversalAction>,
160        mut visit_postorder: impl FnMut(
161            Option<(&'a ModuleOrBatch, &'a ModuleBatchesGraphEdge)>,
162            &'a ModuleOrBatch,
163            &mut S,
164        ),
165    ) -> Result<()> {
166        let graph = &self.graph;
167
168        enum ReverseDFSPass {
169            Visit,
170            ExpandAndVisit,
171        }
172
173        let entries = entries.into_iter();
174        #[allow(clippy::type_complexity)] // This is a temporary internal structure
175        let mut stack: Vec<(ReverseDFSPass, Option<(NodeIndex, EdgeIndex)>, NodeIndex)> = entries
176            .rev()
177            .map(|e| (ReverseDFSPass::ExpandAndVisit, None, e))
178            .collect();
179        let mut expanded = FxHashSet::default();
180        while let Some((pass, parent, current)) = stack.pop() {
181            let parent_arg = parent.map(|(node, edge)| {
182                (
183                    graph.node_weight(node).unwrap(),
184                    graph.edge_weight(edge).unwrap(),
185                )
186            });
187            match pass {
188                ReverseDFSPass::Visit => {
189                    let current_node = graph.node_weight(current).unwrap();
190                    visit_postorder(parent_arg, current_node, state);
191                }
192                ReverseDFSPass::ExpandAndVisit => {
193                    let current_node = graph.node_weight(current).unwrap();
194                    let action = visit_preorder(parent_arg, current_node, state)?;
195                    if action == GraphTraversalAction::Exclude {
196                        continue;
197                    }
198                    stack.push((ReverseDFSPass::Visit, parent, current));
199                    if action == GraphTraversalAction::Continue && expanded.insert(current) {
200                        stack.extend(iter_neighbors_rev(graph, current).map(|(edge, child)| {
201                            (ReverseDFSPass::ExpandAndVisit, Some((current, edge)), child)
202                        }));
203                    }
204                }
205            }
206        }
207
208        Ok(())
209    }
210}
211
/// Position of a [`PreBatch`] inside `PreBatches::batches`.
type PreBatchIndex = usize;
213
214#[derive(Hash, PartialEq, Eq, Clone, Debug)]
215enum PreBatchItem {
216    ParallelModule(ResolvedVc<Box<dyn Module>>),
217    ParallelReference(PreBatchIndex),
218    NonParallelEdge(ChunkingType, ResolvedVc<Box<dyn Module>>),
219}
220
221struct PreBatch {
222    items: FxIndexSet<PreBatchItem>,
223    chunk_groups: RoaringBitmapWrapper,
224}
225
226impl PreBatch {
227    fn new(chunk_groups: RoaringBitmapWrapper) -> Self {
228        Self {
229            items: FxIndexSet::default(),
230            chunk_groups,
231        }
232    }
233}
234
235struct TraversalState<'l> {
236    items: Vec<PreBatchItem>,
237    this: &'l mut PreBatches,
238}
239
240struct PreBatches {
241    boundary_modules: FxHashSet<ResolvedVc<Box<dyn Module>>>,
242    batches: Vec<PreBatch>,
243    entries: FxHashMap<ResolvedVc<Box<dyn Module>>, PreBatchIndex>,
244    single_module_entries: FxIndexSet<ResolvedVc<Box<dyn Module>>>,
245}
246
247impl PreBatches {
248    fn new() -> Self {
249        Self {
250            boundary_modules: FxHashSet::default(),
251            batches: Vec::new(),
252            entries: FxHashMap::default(),
253            single_module_entries: FxIndexSet::default(),
254        }
255    }
256
257    fn ensure_pre_batch_for_module(
258        &mut self,
259        module: ResolvedVc<Box<dyn Module>>,
260        chunk_group_info: &ChunkGroupInfo,
261        queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
262    ) -> Result<PreBatchIndex> {
263        Ok(match self.entries.entry(module) {
264            Entry::Vacant(e) => {
265                let index = self.batches.len();
266                queue.push_back((module, index));
267                let chunk_groups = chunk_group_info
268                    .module_chunk_groups
269                    .get(&module)
270                    .context("all modules need to have chunk group info")?;
271                let batch = PreBatch::new(chunk_groups.clone());
272                self.batches.push(batch);
273                e.insert(index);
274                index
275            }
276            Entry::Occupied(e) => *e.get(),
277        })
278    }
279
280    async fn get_pre_batch_items(
281        &mut self,
282        entry: ResolvedVc<Box<dyn Module>>,
283        chunk_group_info: &ChunkGroupInfo,
284        module_graph: &ModuleGraphRef,
285        queue: &mut VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)>,
286    ) -> Result<Vec<PreBatchItem>> {
287        let mut state = TraversalState {
288            items: Vec::new(),
289            this: self,
290        };
291        let mut visited = FxHashSet::default();
292        module_graph.traverse_edges_dfs(
293            std::iter::once(entry),
294            &mut state,
295            |parent_info, node, state| {
296                let ty = parent_info.map_or(
297                    &ChunkingType::Parallel {
298                        inherit_async: false,
299                        hoisted: false,
300                    },
301                    |(_, ty)| &ty.chunking_type,
302                );
303                let module = node;
304                if !ty.is_parallel() {
305                    state.items.push(PreBatchItem::NonParallelEdge(
306                        ty.without_inherit_async(),
307                        module,
308                    ));
309                    return Ok(GraphTraversalAction::Exclude);
310                }
311                if visited.insert(module) {
312                    if parent_info.is_some() && state.this.boundary_modules.contains(&module) {
313                        let idx = state.this.ensure_pre_batch_for_module(
314                            module,
315                            chunk_group_info,
316                            queue,
317                        )?;
318                        state.items.push(PreBatchItem::ParallelReference(idx));
319                        return Ok(GraphTraversalAction::Exclude);
320                    }
321                    Ok(GraphTraversalAction::Continue)
322                } else {
323                    Ok(GraphTraversalAction::Exclude)
324                }
325            },
326            |_, node, state| {
327                let item = PreBatchItem::ParallelModule(node);
328                state.items.push(item);
329                Ok(())
330            },
331        )?;
332        Ok(state.items)
333    }
334}
335
336pub async fn compute_module_batches(
337    module_graph: Vc<ModuleGraph>,
338    _config: &BatchingConfig,
339) -> Result<Vc<ModuleBatchesGraph>> {
340    let outer_span = tracing::info_span!(
341        "compute module batches",
342        initial_pre_batch_items = tracing::field::Empty,
343        initial_pre_batches = tracing::field::Empty,
344        extracted_shared_items = tracing::field::Empty,
345        batches = tracing::field::Empty,
346        modules = tracing::field::Empty,
347        edges = tracing::field::Empty
348    );
349    let span = outer_span.clone();
350    async move {
351        let chunk_group_info = module_graph.chunk_group_info().await?;
352        let module_graph = module_graph.read_graphs().await?;
353
354        let mut pre_batches = PreBatches::new();
355
356        // Walk the module graph and mark all modules that are boundary modules (referenced from a
357        // different chunk group bitmap)
358        module_graph.traverse_edges_unordered(|parent, node| {
359            if let Some((parent, ty)) = parent {
360                let std::collections::hash_set::Entry::Vacant(entry) =
361                    pre_batches.boundary_modules.entry(node)
362                else {
363                    // Already a boundary module, can skip check
364                    return Ok(());
365                };
366                if ty.chunking_type.is_parallel() {
367                    let parent_chunk_groups = chunk_group_info
368                        .module_chunk_groups
369                        .get(&parent)
370                        .context("all modules need to have chunk group info")?;
371                    let chunk_groups = chunk_group_info
372                        .module_chunk_groups
373                        .get(&node)
374                        .context("all modules need to have chunk group info")?;
375                    if parent_chunk_groups != chunk_groups {
376                        // This is a boundary module
377                        entry.insert();
378                    }
379                } else {
380                    entry.insert();
381                }
382            }
383            Ok(())
384        })?;
385
386        // All entries are boundary modules too
387        for chunk_group in &chunk_group_info.chunk_groups {
388            for entry in chunk_group.entries() {
389                pre_batches.boundary_modules.insert(entry);
390            }
391        }
392
393        // Pre batches would be incorrect with cycles, so we need to opt-out of pre batches for
394        // cycles that include boundary modules
395        module_graph.traverse_cycles(
396            |ref_data| ref_data.chunking_type.is_parallel(),
397            |cycle| {
398                if cycle
399                    .iter()
400                    .any(|node| pre_batches.boundary_modules.contains(node))
401                {
402                    pre_batches
403                        .boundary_modules
404                        .extend(cycle.iter().map(|node| **node));
405                }
406                Ok(())
407            },
408        )?;
409
410        let mut queue: VecDeque<(ResolvedVc<Box<dyn Module>>, PreBatchIndex)> = VecDeque::new();
411
412        let mut chunk_group_indices_with_merged_children = FxHashSet::default();
413
414        // Start with the entries
415        for chunk_group in &chunk_group_info.chunk_groups {
416            for entry in chunk_group.entries() {
417                pre_batches.ensure_pre_batch_for_module(entry, &chunk_group_info, &mut queue)?;
418            }
419            if let Some(parent) = chunk_group.get_merged_parent() {
420                chunk_group_indices_with_merged_children.insert(parent);
421            }
422        }
423
424        let mut initial_pre_batch_items = 0;
425        // Fill all pre batches
426        while let Some((chunkable_module, idx)) = queue.pop_front() {
427            let items = pre_batches
428                .get_pre_batch_items(
429                    chunkable_module,
430                    &chunk_group_info,
431                    &module_graph,
432                    &mut queue,
433                )
434                .await?;
435            initial_pre_batch_items += items.len();
436            let batch = &mut pre_batches.batches[idx];
437            batch.items.extend(items);
438        }
439        span.record("initial_pre_batch_items", initial_pre_batch_items);
440        span.record("initial_pre_batches", pre_batches.batches.len());
441
442        // Figure out the order of all merged groups
443        let mut ordered_entries: Vec<Option<EntriesList>> =
444            vec![None; chunk_group_info.chunk_groups.len()];
445        for (i, chunk_group) in chunk_group_info.chunk_groups.iter().enumerate() {
446            if !chunk_group_indices_with_merged_children.contains(&i) {
447                continue;
448            }
449            let mut merged_modules: FxHashMap<ChunkingType, FxIndexSet<_>> = FxHashMap::default();
450            let mut stack = ordered_entries[i]
451                .as_ref()
452                .map_or_else(
453                    || Either::Left(chunk_group.entries()),
454                    |v| Either::Right(v.0.iter().copied()),
455                )
456                .map(|module| {
457                    let idx = *pre_batches
458                        .entries
459                        .get(&module)
460                        .context("could not prebatch for module")?;
461                    Ok((idx, 0))
462                })
463                .collect::<Result<Vec<_>>>()?;
464            stack.reverse();
465            let mut visited = FxHashSet::default();
466            while let Some((idx, mut pos)) = stack.pop() {
467                let batch = &pre_batches.batches[idx];
468                while let Some(item) = batch.items.get_index(pos) {
469                    match item {
470                        PreBatchItem::ParallelModule(_) => {}
471                        PreBatchItem::ParallelReference(other_idx) => {
472                            if visited.insert(*other_idx) {
473                                stack.push((idx, pos + 1));
474                                stack.push((*other_idx, 0));
475                                break;
476                            }
477                        }
478                        PreBatchItem::NonParallelEdge(chunking_type, module) => {
479                            if chunking_type.is_merged() {
480                                merged_modules
481                                    .entry(chunking_type.clone())
482                                    .or_default()
483                                    .insert(*module);
484                            }
485                        }
486                    }
487                    pos += 1;
488                }
489            }
490            if !merged_modules.is_empty() {
491                for (ty, merged_modules) in merged_modules {
492                    let chunk_group_key = match ty {
493                        ChunkingType::Isolated {
494                            merge_tag: Some(merge_tag),
495                            ..
496                        } => ChunkGroupKey::IsolatedMerged {
497                            parent: i.into(),
498                            merge_tag: merge_tag.clone(),
499                        },
500                        ChunkingType::Shared {
501                            merge_tag: Some(merge_tag),
502                            ..
503                        } => ChunkGroupKey::SharedMerged {
504                            parent: i.into(),
505                            merge_tag: merge_tag.clone(),
506                        },
507                        _ => unreachable!(),
508                    };
509                    let idx = chunk_group_info
510                        .chunk_group_keys
511                        .get_index_of(&chunk_group_key)
512                        .context("could not find chunk group key for merged chunk group")?;
513                    ordered_entries[idx] = Some(EntriesList(merged_modules));
514                }
515            }
516        }
517
518        // Create a map of parallel module to the batches they are contained in.
519        let mut parallel_module_to_pre_batch: FxIndexMap<_, Vec<PreBatchIndex>> =
520            FxIndexMap::default();
521
522        // Fill the map and also fill up the single_module_entries
523        for (idx, pre_batch) in pre_batches.batches.iter().enumerate() {
524            for item in &pre_batch.items {
525                match item {
526                    PreBatchItem::ParallelModule(module) => {
527                        parallel_module_to_pre_batch
528                            .entry(*module)
529                            .or_default()
530                            .push(idx);
531                    }
532                    PreBatchItem::NonParallelEdge(_, module) => {
533                        if !pre_batches.entries.contains_key(module) {
534                            pre_batches.single_module_entries.insert(*module);
535                        }
536                    }
537                    PreBatchItem::ParallelReference(_) => {}
538                }
539            }
540        }
541
542        // We never want a module to occur in multiple batches.
543
544        let mut extracted_shared_items = 0;
545        // Extract shared modules into separate batches
546        for i in 0..parallel_module_to_pre_batch.len() {
547            let (&module, batches) = parallel_module_to_pre_batch
548                .get_index(i)
549                .context("could not find parallel module to pre batch index")?;
550            if batches.len() > 1 {
551                // Create a new batch for the shared modules
552                let batches_with_item_index = batches
553                    .iter()
554                    .map(|&idx| {
555                        let batch_items = &pre_batches.batches[idx].items;
556                        let item_idx = batch_items
557                            .get_index_of(&PreBatchItem::ParallelModule(module))
558                            .context("could not find batch item index for parallel module")?;
559                        Ok((idx, item_idx))
560                    })
561                    .collect::<Result<Vec<_>>>()?;
562                let mut selected_items = 1;
563                fn get_item_at(
564                    pre_batches: &PreBatches,
565                    batch_idx: PreBatchIndex,
566                    item_idx: usize,
567                ) -> Option<&PreBatchItem> {
568                    pre_batches.batches[batch_idx].items.get_index(item_idx)
569                }
570                // Select more matching items that are equal in all batches that contain the shared
571                // module(s)
572                loop {
573                    if let Some(PreBatchItem::ParallelModule(next_module)) = get_item_at(
574                        &pre_batches,
575                        batches_with_item_index[0].0,
576                        batches_with_item_index[0].1 + selected_items,
577                    ) && parallel_module_to_pre_batch
578                        .get(next_module)
579                        .context("could not find pre batch for parallel module")?
580                        .len()
581                        == batches.len()
582                        && batches_with_item_index[1..]
583                            .iter()
584                            .all(|&(batch_idx, item_idx)| {
585                                get_item_at(&pre_batches, batch_idx, item_idx + selected_items)
586                                    == Some(&PreBatchItem::ParallelModule(*next_module))
587                            })
588                    {
589                        selected_items += 1;
590                        continue;
591                    }
592                    break;
593                }
594                extracted_shared_items += selected_items;
595
596                // Check if a batch is completely selected. In that case we can replace all other
597                // occurrences with a reference to that batch
598                let exact_match = batches_with_item_index
599                    .iter()
600                    .find(|&&(batch_idx, item_idx)| {
601                        item_idx == 0
602                            && pre_batches.batches[batch_idx].items.len() == selected_items
603                    });
604                if let Some(&(exact_match, _)) = exact_match {
605                    // Replace all other occurrences with a reference to the exact match
606                    for &(batch_index, item_start) in batches_with_item_index.iter() {
607                        if batch_index != exact_match {
608                            pre_batches.batches[batch_index].items.splice(
609                                item_start..item_start + selected_items,
610                                std::iter::once(PreBatchItem::ParallelReference(exact_match)),
611                            );
612                        }
613                    }
614                    for item in pre_batches.batches[exact_match].items.iter() {
615                        if let PreBatchItem::ParallelModule(module) = item {
616                            parallel_module_to_pre_batch
617                                .get_mut(module)
618                                .context("could not find pre batch for parallel module")?
619                                .clear();
620                        }
621                    }
622                } else {
623                    // Create a new batch of the shared part and replace all occurrences with a
624                    // reference to that batch
625                    let first_batch_index = batches_with_item_index[0].0;
626                    let first_batch_item_index = batches_with_item_index[0].1;
627                    let new_batch_index = pre_batches.batches.len();
628                    let mut new_batch =
629                        PreBatch::new(pre_batches.batches[first_batch_index].chunk_groups.clone());
630                    new_batch
631                        .items
632                        .extend(pre_batches.batches[first_batch_index].items.splice(
633                            first_batch_item_index..first_batch_item_index + selected_items,
634                            std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
635                        ));
636                    for item in new_batch.items.iter() {
637                        if let PreBatchItem::ParallelModule(module) = item {
638                            parallel_module_to_pre_batch
639                                .get_mut(module)
640                                .context("could not find pre batch for parallel module")?
641                                .clear();
642                        }
643                    }
644                    pre_batches.batches.push(new_batch);
645                    for &(batch_index, item_start) in batches_with_item_index[1..].iter() {
646                        pre_batches.batches[batch_index].items.splice(
647                            item_start..item_start + selected_items,
648                            std::iter::once(PreBatchItem::ParallelReference(new_batch_index)),
649                        );
650                    }
651                }
652            }
653        }
654        span.record("extracted_shared_items", extracted_shared_items);
655
656        // Now every module is only in one batch
657
658        let mut edges_count = 0;
659
660        // Since batches can only have references followed by a list of parallel chunkable modules,
661        // we need to split batches that have modules before references.
662        for i in 0..pre_batches.batches.len() {
663            let items = take(&mut pre_batches.batches[i].items);
664            let mut new_items =
665                FxIndexSet::with_capacity_and_hasher(items.len(), Default::default());
666            enum Mode {
667                ParallelChunkableModule,
668                Other,
669            }
670            let mut mode = Mode::Other;
671            for item in items {
672                let chunkable_module = if let PreBatchItem::ParallelModule(module) = &item {
673                    ResolvedVc::try_downcast::<Box<dyn ChunkableModule>>(*module)
674                } else {
675                    None
676                };
677                let item = if let PreBatchItem::ParallelModule(module) = item {
678                    if chunkable_module.is_some() {
679                        PreBatchItem::ParallelModule(module)
680                    } else {
681                        pre_batches.single_module_entries.insert(module);
682                        PreBatchItem::NonParallelEdge(
683                            ChunkingType::Parallel {
684                                inherit_async: false,
685                                hoisted: false,
686                            },
687                            module,
688                        )
689                    }
690                } else {
691                    item
692                };
693                match (&mode, chunkable_module) {
694                    (_, Some(_)) => {
695                        mode = Mode::ParallelChunkableModule;
696                        new_items.insert(item);
697                    }
698                    (Mode::Other, _) => {
699                        edges_count += 1;
700                        new_items.insert(item);
701                    }
702                    (Mode::ParallelChunkableModule, _) => {
703                        // Split the batch
704                        let idx = pre_batches.batches.len();
705                        let mut new_batch =
706                            PreBatch::new(pre_batches.batches[i].chunk_groups.clone());
707                        new_batch.items.extend(new_items.drain(..));
708                        pre_batches.batches.push(new_batch);
709                        edges_count += 1;
710                        new_items.insert(PreBatchItem::ParallelReference(idx));
711                        if chunkable_module.is_some() {
712                            new_items.insert(item);
713                        } else {
714                            edges_count += 1;
715                            mode = Mode::Other;
716                            new_items.insert(item);
717                        }
718                    }
719                }
720            }
721            pre_batches.batches[i].items = new_items;
722        }
723        span.record("pre_batches", pre_batches.batches.len());
724
725        // Now batches are in the correct shape. We can create the real batches and the graph.
726
727        // Create the graph
728        let mut graph: DiGraph<ModuleOrBatch, ModuleBatchesGraphEdge, u32> =
729            petgraph::graph::DiGraph::with_capacity(
730                pre_batches.batches.len() + pre_batches.single_module_entries.len(),
731                edges_count,
732            );
733
734        // Create the Vc<ModuleBatch> instances
735        let batches = pre_batches
736            .batches
737            .iter_mut()
738            .enumerate()
739            .map(async |(i, pre_batch)| {
740                let mut modules = pre_batch.items.iter().filter_map(|item| {
741                    if let PreBatchItem::ParallelModule(module) = item {
742                        ResolvedVc::try_downcast(*module)
743                    } else {
744                        None
745                    }
746                });
747                let Some(first) = modules.next() else {
748                    return Ok(ModuleOrBatch::None(i));
749                };
750                if let Some(second) = modules.next() {
751                    let batch = ModuleBatch::new(
752                        [first, second]
753                            .into_iter()
754                            .chain(modules)
755                            .map(|m| *m)
756                            .collect::<Vec<_>>(),
757                        Some(pre_batch.chunk_groups.clone()),
758                    );
759                    Ok(ModuleOrBatch::Batch(batch.to_resolved().await?))
760                } else {
761                    Ok(ModuleOrBatch::Module(ResolvedVc::upcast(first)))
762                }
763            })
764            .try_join()
765            .await?;
766
767        // Create the batch groups by grouping batches with the same chunk groups
768        let mut batch_groups: FxHashMap<_, Vec<_>> = FxHashMap::default();
769        for (i, pre_batch) in pre_batches.batches.iter().enumerate() {
770            let key = BuildHasherDefault::<FxHasher>::default().prehash(&pre_batch.chunk_groups);
771            let batch = batches[i];
772            batch_groups.entry(key).or_default().push(batch);
773        }
774        for &module in &pre_batches.single_module_entries {
775            let chunk_groups = chunk_group_info
776                .module_chunk_groups
777                .get(&module)
778                .context("all modules need to have chunk group info")?;
779            let key = BuildHasherDefault::<FxHasher>::default().prehash(chunk_groups);
780            batch_groups
781                .entry(key)
782                .or_default()
783                .push(ModuleOrBatch::Module(module));
784        }
785
786        // Create the batch group instances
787        let batch_groups = batch_groups
788            .into_iter()
789            .map(async |(key, items)| {
790                if items.len() == 1 {
791                    Ok(Either::Left(std::iter::empty()))
792                } else {
793                    let batch_group = ModuleBatchGroup::new(items.clone(), (*key).clone())
794                        .to_resolved()
795                        .await?;
796                    Ok(Either::Right(
797                        items.into_iter().map(move |item| (item, batch_group)),
798                    ))
799                }
800            })
801            .try_join()
802            .await?
803            .into_iter()
804            .flatten()
805            .collect::<FxHashMap<_, _>>();
806
807        // Insert batches into the graph and store the NodeIndices
808        let mut batches_count = 0;
809        let mut modules_count = 0;
810        let batch_indices = batches
811            .into_iter()
812            .map(|batch| {
813                match &batch {
814                    ModuleOrBatch::Batch(_) => batches_count += 1,
815                    ModuleOrBatch::Module(_) => modules_count += 1,
816                    ModuleOrBatch::None(_) => {}
817                }
818                graph.add_node(batch)
819            })
820            .collect::<Vec<_>>();
821
822        // Also insert single modules into the graph and store the NodeIndices
823        let single_module_indices = pre_batches
824            .single_module_entries
825            .iter()
826            .map(|module| graph.add_node(ModuleOrBatch::Module(*module)))
827            .collect::<Vec<_>>();
828
829        span.record("batches", batches_count);
830        modules_count += pre_batches.single_module_entries.len();
831        span.record("modules", modules_count);
832        span.record("edges", edges_count);
833
834        // Add all the edges to the graph
835        for (i, pre_batch) in pre_batches.batches.into_iter().enumerate() {
836            let index = batch_indices[i];
837            let items = pre_batch.items;
838            for item in items {
839                match item {
840                    PreBatchItem::ParallelReference(idx) => {
841                        graph.add_edge(
842                            index,
843                            batch_indices[idx],
844                            ModuleBatchesGraphEdge {
845                                ty: ChunkingType::Parallel {
846                                    inherit_async: false,
847                                    hoisted: false,
848                                },
849                                module: None,
850                            },
851                        );
852                    }
853                    PreBatchItem::NonParallelEdge(ty, module) => {
854                        if let Some(batch) = pre_batches.entries.get(&module).copied() {
855                            graph.add_edge(
856                                index,
857                                batch_indices[batch],
858                                ModuleBatchesGraphEdge {
859                                    ty,
860                                    module: Some(module),
861                                },
862                            );
863                            continue;
864                        }
865                        let idx = pre_batches
866                            .single_module_entries
867                            .get_index_of(&module)
868                            .context("could not find single module entry index")?;
869                        let idx = single_module_indices[idx];
870                        graph.add_edge(
871                            index,
872                            idx,
873                            ModuleBatchesGraphEdge {
874                                ty,
875                                module: Some(module),
876                            },
877                        );
878                    }
879                    PreBatchItem::ParallelModule(_) => {}
880                }
881            }
882        }
883
884        debug_assert_eq!(graph.capacity().0, graph.node_count());
885        debug_assert_eq!(graph.capacity().1, graph.edge_count());
886
887        // Find the NodeIndices for our entries of the graph
888        let mut entries = FxHashMap::default();
889        for chunk_group in &chunk_group_info.chunk_groups {
890            for module in chunk_group.entries() {
891                if let Some(batch) = pre_batches.entries.get(&module).copied() {
892                    entries.insert(module, batch_indices[batch]);
893                    continue;
894                }
895                let idx = pre_batches
896                    .single_module_entries
897                    .get_index_of(&module)
898                    .context("could not find single module entry index")?;
899                let idx = single_module_indices[idx];
900                entries.insert(module, idx);
901            }
902        }
903
904        Ok(ModuleBatchesGraph {
905            graph: TracedDiGraph(graph),
906            entries,
907            batch_groups,
908            ordered_entries,
909        }
910        .cell())
911    }
912    .instrument(outer_span)
913    .await
914}