turbo_tasks/graph/
graph_traversal.rs

1use std::future::Future;
2
3use anyhow::Result;
4use futures::{StreamExt, stream::FuturesUnordered};
5use rustc_hash::FxHashSet;
6
7use super::{
8    SkipDuplicates, Visit, VisitControlFlow,
9    graph_store::{GraphNode, GraphStore},
10    with_future::With,
11};
12
13/// A list of modules that were already visited and should be skipped (including their subgraphs).
14#[derive(Clone, Default, Debug)]
15pub struct VisitedNodes<T>(pub FxHashSet<T>);
16
17/// [`GraphTraversal`] is a utility type that can be used to traverse a graph of
18/// nodes, where each node can have a variable number of outgoing edges.
19///
20/// The traversal is done in parallel, and the order of the nodes in the traversal
21/// result is determined by the [`GraphStore`] parameter.
22pub trait GraphTraversal: GraphStore + Sized {
23    fn visit<VisitImpl, Abort, Impl>(
24        self,
25        root_edges: impl IntoIterator<Item = VisitImpl::Edge>,
26        visit: VisitImpl,
27    ) -> impl Future<Output = GraphTraversalResult<Result<Self>, Abort>> + Send
28    where
29        VisitImpl: Visit<Self::Node, Abort, Impl> + Send,
30        Abort: Send,
31        Impl: Send;
32
33    fn skip_duplicates(self) -> SkipDuplicates<Self>;
34    fn skip_duplicates_with_visited_nodes(
35        self,
36        visited: VisitedNodes<Self::Node>,
37    ) -> SkipDuplicates<Self>;
38}
39
40impl<Store> GraphTraversal for Store
41where
42    Store: GraphStore,
43{
44    /// Visits the graph starting from the given `roots`, and returns a future
45    /// that will resolve to the traversal result.
46    fn visit<VisitImpl, Abort, Impl>(
47        mut self,
48        root_edges: impl IntoIterator<Item = VisitImpl::Edge>,
49        mut visit: VisitImpl,
50    ) -> impl Future<Output = GraphTraversalResult<Result<Self>, Abort>> + Send
51    where
52        VisitImpl: Visit<Self::Node, Abort, Impl> + Send,
53        Abort: Send,
54        Impl: Send,
55    {
56        let mut futures = FuturesUnordered::new();
57        let mut root_abort = None;
58
59        // Populate `futures` with all the roots, `root_edges` isn't required to be `Send`, so this
60        // has to happen outside of the future. We could require `root_edges` to be `Send` in the
61        // future.
62        for edge in root_edges {
63            match visit.visit(edge) {
64                VisitControlFlow::Continue(node) => {
65                    if let Some((parent_handle, node_ref)) = self.insert(None, GraphNode(node)) {
66                        let span = visit.span(node_ref);
67                        futures.push(With::new(visit.edges(node_ref), span, parent_handle));
68                    }
69                }
70                VisitControlFlow::Skip(node) => {
71                    self.insert(None, GraphNode(node));
72                }
73                VisitControlFlow::Abort(abort) => {
74                    // this must be returned inside the `async` block below so that it's part of the
75                    // returned future
76                    root_abort = Some(abort)
77                }
78            }
79        }
80
81        async move {
82            if let Some(abort) = root_abort {
83                return GraphTraversalResult::Aborted(abort);
84            }
85            loop {
86                match futures.next().await {
87                    Some((parent_handle, span, Ok(edges))) => {
88                        let _guard = span.enter();
89                        for edge in edges {
90                            match visit.visit(edge) {
91                                VisitControlFlow::Continue(node) => {
92                                    if let Some((node_handle, node_ref)) =
93                                        self.insert(Some(parent_handle.clone()), GraphNode(node))
94                                    {
95                                        let span = visit.span(node_ref);
96                                        futures.push(With::new(
97                                            visit.edges(node_ref),
98                                            span,
99                                            node_handle,
100                                        ));
101                                    }
102                                }
103                                VisitControlFlow::Skip(node) => {
104                                    self.insert(Some(parent_handle.clone()), GraphNode(node));
105                                }
106                                VisitControlFlow::Abort(abort) => {
107                                    return GraphTraversalResult::Aborted(abort);
108                                }
109                            }
110                        }
111                    }
112                    Some((_, _, Err(err))) => {
113                        return GraphTraversalResult::Completed(Err(err));
114                    }
115                    None => {
116                        return GraphTraversalResult::Completed(Ok(self));
117                    }
118                }
119            }
120        }
121    }
122
123    fn skip_duplicates(self) -> SkipDuplicates<Self> {
124        SkipDuplicates::new(self)
125    }
126
127    fn skip_duplicates_with_visited_nodes(
128        self,
129        visited: VisitedNodes<Store::Node>,
130    ) -> SkipDuplicates<Self> {
131        SkipDuplicates::new_with_visited_nodes(self, visited.0)
132    }
133}
134
/// The outcome of a graph traversal.
///
/// A traversal either runs to its end or is cut short by the visitor.
#[derive(Debug)]
pub enum GraphTraversalResult<Completed, Aborted> {
    /// The traversal ran to its end; carries the final value.
    Completed(Completed),
    /// The traversal was stopped early; carries the value it was aborted with.
    Aborted(Aborted),
}
139
140impl<Completed> GraphTraversalResult<Completed, !> {
141    pub fn completed(self) -> Completed {
142        match self {
143            GraphTraversalResult::Completed(completed) => completed,
144            GraphTraversalResult::Aborted(_) => unreachable!("the type parameter `Aborted` is `!`"),
145        }
146    }
147}