turbo_tasks/graph/
graph_traversal.rs1use std::future::Future;
2
3use anyhow::Result;
4use futures::{StreamExt, stream::FuturesUnordered};
5use rustc_hash::FxHashSet;
6
7use super::{Visit, VisitControlFlow, graph_store::GraphStore, with_future::With};
8
9#[derive(Clone, Default, Debug)]
11pub struct VisitedNodes<T>(pub FxHashSet<T>);
12
13pub trait GraphTraversal: GraphStore + Sized {
19 fn visit<VisitImpl, Impl>(
20 self,
21 root_nodes: impl IntoIterator<Item = Self::Node>,
22 visit: VisitImpl,
23 ) -> impl Future<Output = GraphTraversalResult<Result<Self>>> + Send
24 where
25 VisitImpl: Visit<Self::Node, Self::Edge, Impl> + Send,
26 Impl: Send;
27}
28
29impl<Store> GraphTraversal for Store
30where
31 Store: GraphStore,
32{
33 fn visit<VisitImpl, Impl>(
36 mut self,
37 root_nodes: impl IntoIterator<Item = Self::Node>,
38 mut visit: VisitImpl,
39 ) -> impl Future<Output = GraphTraversalResult<Result<Self>>> + Send
40 where
41 VisitImpl: Visit<Self::Node, Self::Edge, Impl> + Send,
42 Impl: Send,
43 {
44 let mut futures = FuturesUnordered::new();
45 let mut is_abort = false;
46
47 for node in root_nodes {
51 match visit.visit(&node, None) {
52 VisitControlFlow::Continue => {
53 if let Some(handle) = self.try_enter(&node) {
54 let span = visit.span(&node, None);
55 futures.push(With::new(visit.edges(&node), span, handle));
56 }
57 self.insert(None, node);
58 }
59 VisitControlFlow::Skip => {
60 self.insert(None, node);
61 }
62 VisitControlFlow::Exclude => {
63 }
65 VisitControlFlow::Abort => {
66 is_abort = true;
69 }
70 }
71 }
72
73 async move {
74 if is_abort {
75 return GraphTraversalResult::Aborted;
76 }
77 loop {
78 match futures.next().await {
79 Some((parent_node, span, Ok(edges))) => {
80 let _guard = span.enter();
81 for (node, edge) in edges {
82 match visit.visit(&node, Some(&edge)) {
83 VisitControlFlow::Continue => {
84 if let Some(handle) = self.try_enter(&node) {
85 let span = visit.span(&node, Some(&edge));
86 let edges_future = visit.edges(&node);
87 futures.push(With::new(edges_future, span, handle));
88 }
89 self.insert(Some((&parent_node, edge)), node);
90 }
91 VisitControlFlow::Skip => {
92 self.insert(Some((&parent_node, edge)), node);
93 }
94 VisitControlFlow::Exclude => {
95 }
97 VisitControlFlow::Abort => {
98 return GraphTraversalResult::Aborted;
99 }
100 }
101 }
102 }
103 Some((_, _, Err(err))) => {
104 return GraphTraversalResult::Completed(Err(err));
105 }
106 None => {
107 return GraphTraversalResult::Completed(Ok(self));
108 }
109 }
110 }
111 }
112 }
113}
114
/// Outcome of a graph traversal.
pub enum GraphTraversalResult<Completed> {
    /// The traversal ran to its end; carries the value it produced.
    Completed(Completed),
    /// The traversal was aborted before it finished; there is no value.
    Aborted,
}

impl<Completed> GraphTraversalResult<Completed> {
    /// Extracts the value of a [`GraphTraversalResult::Completed`], consuming
    /// `self`.
    ///
    /// # Panics
    ///
    /// Panics with the message `"Graph traversal was aborted"` if `self` is
    /// [`GraphTraversalResult::Aborted`].
    pub fn completed(self) -> Completed {
        let Self::Completed(value) = self else {
            panic!("Graph traversal was aborted");
        };
        value
    }
}