turbo_tasks/graph/
graph_traversal.rs

1use std::future::Future;
2
3use anyhow::Result;
4use futures::{StreamExt, stream::FuturesUnordered};
5use rustc_hash::FxHashSet;
6
7use super::{Visit, VisitControlFlow, graph_store::GraphStore, with_future::With};
8
9/// A list of modules that were already visited and should be skipped (including their subgraphs).
10#[derive(Clone, Default, Debug)]
11pub struct VisitedNodes<T>(pub FxHashSet<T>);
12
13/// [`GraphTraversal`] is a utility type that can be used to traverse a graph of
14/// nodes, where each node can have a variable number of outgoing edges.
15///
16/// The traversal is done in parallel, and the order of the nodes in the traversal
17/// result is determined by the [`GraphStore`] parameter.
18pub trait GraphTraversal: GraphStore + Sized {
19    fn visit<VisitImpl, Impl>(
20        self,
21        root_nodes: impl IntoIterator<Item = Self::Node>,
22        visit: VisitImpl,
23    ) -> impl Future<Output = GraphTraversalResult<Result<Self>>> + Send
24    where
25        VisitImpl: Visit<Self::Node, Self::Edge, Impl> + Send,
26        Impl: Send;
27}
28
29impl<Store> GraphTraversal for Store
30where
31    Store: GraphStore,
32{
33    /// Visits the graph starting from the given `roots`, and returns a future
34    /// that will resolve to the traversal result.
35    fn visit<VisitImpl, Impl>(
36        mut self,
37        root_nodes: impl IntoIterator<Item = Self::Node>,
38        mut visit: VisitImpl,
39    ) -> impl Future<Output = GraphTraversalResult<Result<Self>>> + Send
40    where
41        VisitImpl: Visit<Self::Node, Self::Edge, Impl> + Send,
42        Impl: Send,
43    {
44        let mut futures = FuturesUnordered::new();
45        let mut is_abort = false;
46
47        // Populate `futures` with all the roots, `root_nodes` isn't required to be `Send`, so this
48        // has to happen outside of the future. We could require `root_nodes` to be `Send` in the
49        // future.
50        for node in root_nodes {
51            match visit.visit(&node, None) {
52                VisitControlFlow::Continue => {
53                    if let Some(handle) = self.try_enter(&node) {
54                        let span = visit.span(&node, None);
55                        futures.push(With::new(visit.edges(&node), span, handle));
56                    }
57                    self.insert(None, node);
58                }
59                VisitControlFlow::Skip => {
60                    self.insert(None, node);
61                }
62                VisitControlFlow::Exclude => {
63                    // do nothing
64                }
65                VisitControlFlow::Abort => {
66                    // this must be returned inside the `async` block below so that it's part of the
67                    // returned future
68                    is_abort = true;
69                }
70            }
71        }
72
73        async move {
74            if is_abort {
75                return GraphTraversalResult::Aborted;
76            }
77            loop {
78                match futures.next().await {
79                    Some((parent_node, span, Ok(edges))) => {
80                        let _guard = span.enter();
81                        for (node, edge) in edges {
82                            match visit.visit(&node, Some(&edge)) {
83                                VisitControlFlow::Continue => {
84                                    if let Some(handle) = self.try_enter(&node) {
85                                        let span = visit.span(&node, Some(&edge));
86                                        let edges_future = visit.edges(&node);
87                                        futures.push(With::new(edges_future, span, handle));
88                                    }
89                                    self.insert(Some((&parent_node, edge)), node);
90                                }
91                                VisitControlFlow::Skip => {
92                                    self.insert(Some((&parent_node, edge)), node);
93                                }
94                                VisitControlFlow::Exclude => {
95                                    // do nothing
96                                }
97                                VisitControlFlow::Abort => {
98                                    return GraphTraversalResult::Aborted;
99                                }
100                            }
101                        }
102                    }
103                    Some((_, _, Err(err))) => {
104                        return GraphTraversalResult::Completed(Err(err));
105                    }
106                    None => {
107                        return GraphTraversalResult::Completed(Ok(self));
108                    }
109                }
110            }
111        }
112    }
113}
114
115pub enum GraphTraversalResult<Completed> {
116    Completed(Completed),
117    Aborted,
118}
119
120impl<Completed> GraphTraversalResult<Completed> {
121    pub fn completed(self) -> Completed {
122        match self {
123            GraphTraversalResult::Completed(completed) => completed,
124            GraphTraversalResult::Aborted => panic!("Graph traversal was aborted"),
125        }
126    }
127}