turbo_tasks/graph/
graph_traversal.rs1use std::future::Future;
2
3use anyhow::Result;
4use futures::{StreamExt, stream::FuturesUnordered};
5use rustc_hash::FxHashSet;
6
7use super::{
8 SkipDuplicates, Visit, VisitControlFlow,
9 graph_store::{GraphNode, GraphStore},
10 with_future::With,
11};
12
13#[derive(Clone, Default, Debug)]
15pub struct VisitedNodes<T>(pub FxHashSet<T>);
16
17pub trait GraphTraversal: GraphStore + Sized {
23 fn visit<VisitImpl, Abort, Impl>(
24 self,
25 root_edges: impl IntoIterator<Item = VisitImpl::Edge>,
26 visit: VisitImpl,
27 ) -> impl Future<Output = GraphTraversalResult<Result<Self>, Abort>> + Send
28 where
29 VisitImpl: Visit<Self::Node, Abort, Impl> + Send,
30 Abort: Send,
31 Impl: Send;
32
33 fn skip_duplicates(self) -> SkipDuplicates<Self>;
34 fn skip_duplicates_with_visited_nodes(
35 self,
36 visited: VisitedNodes<Self::Node>,
37 ) -> SkipDuplicates<Self>;
38}
39
40impl<Store> GraphTraversal for Store
41where
42 Store: GraphStore,
43{
44 fn visit<VisitImpl, Abort, Impl>(
47 mut self,
48 root_edges: impl IntoIterator<Item = VisitImpl::Edge>,
49 mut visit: VisitImpl,
50 ) -> impl Future<Output = GraphTraversalResult<Result<Self>, Abort>> + Send
51 where
52 VisitImpl: Visit<Self::Node, Abort, Impl> + Send,
53 Abort: Send,
54 Impl: Send,
55 {
56 let mut futures = FuturesUnordered::new();
57 let mut root_abort = None;
58
59 for edge in root_edges {
63 match visit.visit(edge) {
64 VisitControlFlow::Continue(node) => {
65 if let Some((parent_handle, node_ref)) = self.insert(None, GraphNode(node)) {
66 let span = visit.span(node_ref);
67 futures.push(With::new(visit.edges(node_ref), span, parent_handle));
68 }
69 }
70 VisitControlFlow::Skip(node) => {
71 self.insert(None, GraphNode(node));
72 }
73 VisitControlFlow::Abort(abort) => {
74 root_abort = Some(abort)
77 }
78 }
79 }
80
81 async move {
82 if let Some(abort) = root_abort {
83 return GraphTraversalResult::Aborted(abort);
84 }
85 loop {
86 match futures.next().await {
87 Some((parent_handle, span, Ok(edges))) => {
88 let _guard = span.enter();
89 for edge in edges {
90 match visit.visit(edge) {
91 VisitControlFlow::Continue(node) => {
92 if let Some((node_handle, node_ref)) =
93 self.insert(Some(parent_handle.clone()), GraphNode(node))
94 {
95 let span = visit.span(node_ref);
96 futures.push(With::new(
97 visit.edges(node_ref),
98 span,
99 node_handle,
100 ));
101 }
102 }
103 VisitControlFlow::Skip(node) => {
104 self.insert(Some(parent_handle.clone()), GraphNode(node));
105 }
106 VisitControlFlow::Abort(abort) => {
107 return GraphTraversalResult::Aborted(abort);
108 }
109 }
110 }
111 }
112 Some((_, _, Err(err))) => {
113 return GraphTraversalResult::Completed(Err(err));
114 }
115 None => {
116 return GraphTraversalResult::Completed(Ok(self));
117 }
118 }
119 }
120 }
121 }
122
123 fn skip_duplicates(self) -> SkipDuplicates<Self> {
124 SkipDuplicates::new(self)
125 }
126
127 fn skip_duplicates_with_visited_nodes(
128 self,
129 visited: VisitedNodes<Store::Node>,
130 ) -> SkipDuplicates<Self> {
131 SkipDuplicates::new_with_visited_nodes(self, visited.0)
132 }
133}
134
/// Outcome of a graph traversal.
///
/// `Completed` and `Aborted` are independent type parameters, so each variant
/// can carry its own payload type.
pub enum GraphTraversalResult<Completed, Aborted> {
    /// The traversal ran to its end; holds the completion payload.
    Completed(Completed),
    /// The traversal was stopped early; holds the abort payload.
    Aborted(Aborted),
}
139
140impl<Completed> GraphTraversalResult<Completed, !> {
141 pub fn completed(self) -> Completed {
142 match self {
143 GraphTraversalResult::Completed(completed) => completed,
144 GraphTraversalResult::Aborted(_) => unreachable!("the type parameter `Aborted` is `!`"),
145 }
146 }
147}