1use std::{
2 fs::OpenOptions,
3 io::Write,
4 iter,
5 path::{Path, PathBuf},
6 sync::{Arc, Mutex},
7 time::Duration,
8};
9
10use clap::{Args, Parser, Subcommand};
11use rand::{Rng, RngCore, SeedableRng};
12use rustc_hash::FxHashSet;
13use tokio::time::sleep;
14use turbo_rcstr::RcStr;
15use turbo_tasks::{NonLocalValue, ResolvedVc, TransientInstance, Vc, trace::TraceRawVcs};
16use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
17use turbo_tasks_fs::{DiskFileSystem, FileSystem, FileSystemPath};
18
// Top-level command line of the fuzzer binary; the fuzzer to run is selected with a subcommand.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///` doc comments
// into `--help` text, so doc comments here would change the program's output.
#[derive(Parser)]
#[command()]
struct Cli {
    // The subcommand chosen by the user.
    #[command(subcommand)]
    command: Commands,
}
32
// Fuzzers this binary can run, one clap subcommand per variant.
//
// NOTE: `//` rather than `///` so that clap does not pick the comments up as help text.
#[derive(Subcommand)]
enum Commands {
    // Dispatched to `fuzz_fs_watcher` by `main`.
    FsWatcher(FsWatcher),
}
38
// Arguments of the `fs-watcher` fuzzer (see `fuzz_fs_watcher`).
//
// NOTE: `//` rather than `///` so that clap does not pick the comments up as help text.
#[derive(Args)]
struct FsWatcher {
    // Directory the fuzzer creates for the run and removes again when it stops.
    #[arg(long)]
    fs_root: PathBuf,
    // Number of levels in the generated tree; the entries on the deepest level are files.
    #[arg(long, default_value_t = 4)]
    depth: usize,
    // Number of entries created in every directory of the tree.
    #[arg(long, default_value_t = 6)]
    width: usize,
    // Milliseconds to sleep after each round of modifications before everything is re-read.
    #[arg(long, default_value_t = 100)]
    notify_timeout_ms: u64,
    // Number of randomly chosen files that are rewritten in each round.
    #[arg(long, default_value_t = 200)]
    file_modifications: u32,
    // Number of randomly chosen directories that are deleted and recreated in each round.
    #[arg(long, default_value_t = 2)]
    directory_modifications: u32,
}
54
55#[tokio::main]
56async fn main() -> anyhow::Result<()> {
57 register();
58 let cli = Cli::parse();
59
60 match cli.command {
61 Commands::FsWatcher(args) => fuzz_fs_watcher(args).await,
62 }
63}
64
/// Shared set of path strings: `read_path` inserts its path every time it executes, and
/// `fuzz_fs_watcher` reports the size of the set and clears it after every round.
///
/// The inner `Arc<Mutex<..>>` is marked `trace_ignore`, so the `TraceRawVcs` derive skips it.
#[derive(Default, NonLocalValue, TraceRawVcs)]
struct PathInvalidations(#[turbo_tasks(trace_ignore)] Arc<Mutex<FxHashSet<RcStr>>>);
67
68async fn fuzz_fs_watcher(args: FsWatcher) -> anyhow::Result<()> {
69 std::fs::create_dir(&args.fs_root)?;
70 let fs_root = args.fs_root.canonicalize()?;
71 let _guard = FsCleanup {
72 path: &fs_root.clone(),
73 };
74
75 let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
76 BackendOptions::default(),
77 noop_backing_storage(),
78 ));
79 tt.run_once(async move {
80 let invalidations = TransientInstance::new(PathInvalidations::default());
81 let fs_root_rcstr = RcStr::from(fs_root.to_str().unwrap());
82 let project_fs = disk_file_system_operation(fs_root_rcstr.clone())
83 .resolve_strongly_consistent()
84 .await?;
85 let project_root = disk_file_system_root_operation(project_fs)
86 .resolve_strongly_consistent()
87 .await?;
88 create_directory_tree(&mut FxHashSet::default(), &fs_root, args.depth, args.width)?;
89
90 project_fs.await?.start_watching(None).await?;
91
92 let read_all_paths_op =
93 read_all_paths_operation(invalidations.clone(), project_root, args.depth, args.width);
94 read_all_paths_op.read_strongly_consistent().await?;
95 {
96 let mut invalidations = invalidations.0.lock().unwrap();
97 println!("read all {} files", invalidations.len());
98 invalidations.clear();
99 }
100
101 let mut rand_buf = [0; 16];
102 let mut rng = rand::rngs::SmallRng::from_rng(&mut rand::rng());
103 loop {
104 let mut modified_file_paths = FxHashSet::default();
105 for _ in 0..args.file_modifications {
106 let path = fs_root.join(pick_random_file(args.depth, args.width));
107 let mut f = OpenOptions::new().write(true).truncate(true).open(&path)?;
108 rng.fill_bytes(&mut rand_buf);
109 f.write_all(&rand_buf)?;
110 f.flush()?;
111 modified_file_paths.insert(path);
112 }
113 for _ in 0..args.directory_modifications {
114 let dir = pick_random_directory(args.depth, args.width);
115 let path = fs_root.join(dir.path);
116 std::fs::remove_dir_all(&path)?;
117 std::fs::create_dir(&path)?;
118 create_directory_tree(
119 &mut modified_file_paths,
120 &path,
121 args.depth - dir.depth,
122 args.width,
123 )?;
124 }
125 sleep(Duration::from_millis(args.notify_timeout_ms)).await;
128 read_all_paths_op.read_strongly_consistent().await?;
129 {
130 let mut invalidations = invalidations.0.lock().unwrap();
131 println!(
132 "modified {} files and found {} invalidations",
133 modified_file_paths.len(),
134 invalidations.len()
135 );
136 invalidations.clear();
137 }
138 }
139 })
140 .await
141}
142
/// Operation that builds the [`DiskFileSystem`] for `fs_root`, passing `"project"` as the first
/// argument (presumably the file system's name) and an empty `Vec` as the third.
#[turbo_tasks::function(operation)]
fn disk_file_system_operation(fs_root: RcStr) -> Vc<DiskFileSystem> {
    DiskFileSystem::new("project".into(), fs_root, Vec::new())
}
147
/// Operation that returns the root [`FileSystemPath`] of `fs`.
#[turbo_tasks::function(operation)]
fn disk_file_system_root_operation(fs: ResolvedVc<DiskFileSystem>) -> Vc<FileSystemPath> {
    fs.root()
}
152
153#[turbo_tasks::function]
154async fn read_path(
155 invalidations: TransientInstance<PathInvalidations>,
156 path: ResolvedVc<FileSystemPath>,
157) -> anyhow::Result<()> {
158 let path_str = path.await?.path.clone();
159 invalidations.0.lock().unwrap().insert(path_str);
160 let _ = path.read().await?;
161 Ok(())
162}
163
164#[turbo_tasks::function(operation)]
165async fn read_all_paths_operation(
166 invalidations: TransientInstance<PathInvalidations>,
167 root: ResolvedVc<FileSystemPath>,
168 depth: usize,
169 width: usize,
170) -> anyhow::Result<()> {
171 async fn read_all_paths_inner(
172 invalidations: TransientInstance<PathInvalidations>,
173 parent: ResolvedVc<FileSystemPath>,
174 depth: usize,
175 width: usize,
176 ) -> anyhow::Result<()> {
177 for child_id in 0..width {
178 let child_name = RcStr::from(child_id.to_string());
179 let child_path = parent.join(child_name).to_resolved().await?;
180 if depth == 1 {
181 read_path(invalidations.clone(), *child_path).await?;
182 } else {
183 Box::pin(read_all_paths_inner(
184 invalidations.clone(),
185 child_path,
186 depth - 1,
187 width,
188 ))
189 .await?;
190 }
191 }
192 Ok(())
193 }
194 read_all_paths_inner(invalidations, root, depth, width).await
195}
196
197fn create_directory_tree(
198 modified_file_paths: &mut FxHashSet<PathBuf>,
199 parent: &Path,
200 depth: usize,
201 width: usize,
202) -> anyhow::Result<()> {
203 let mut rng = rand::rng();
204 let mut rand_buf = [0; 16];
205 for child_id in 0..width {
206 let child_name = child_id.to_string();
207 let child_path = parent.join(&child_name);
208 if depth == 1 {
209 let mut f = std::fs::File::create(&child_path)?;
210 rng.fill_bytes(&mut rand_buf);
211 f.write_all(&rand_buf)?;
212 f.flush()?;
213 modified_file_paths.insert(child_path);
214 } else {
215 std::fs::create_dir(&child_path)?;
216 create_directory_tree(modified_file_paths, &child_path, depth - 1, width)?;
217 }
218 }
219 Ok(())
220}
221
222fn pick_random_file(depth: usize, width: usize) -> PathBuf {
223 let mut rng = rand::rng();
224 iter::repeat_with(|| rng.random_range(0..width).to_string())
225 .take(depth)
226 .collect()
227}
228
/// A directory chosen by `pick_random_directory`.
struct RandomDirectory {
    /// Number of components in `path`, i.e. how many levels below the root the directory sits.
    depth: usize,
    /// Path of the directory relative to the root of the tree.
    path: PathBuf,
}
233
234fn pick_random_directory(max_depth: usize, width: usize) -> RandomDirectory {
235 let mut rng = rand::rng();
236 let depth = rng.random_range(1..(max_depth - 1));
238 let path = iter::repeat_with(|| rng.random_range(0..width).to_string())
239 .take(depth)
240 .collect();
241 RandomDirectory { depth, path }
242}
243
/// Guard that deletes the directory tree at `path` when it goes out of scope.
struct FsCleanup<'a> {
    /// Root of the tree to remove.
    path: &'a Path,
}

impl Drop for FsCleanup<'_> {
    /// Removes the tree on a best-effort basis.
    ///
    /// A failure is reported on stderr instead of panicking: a panic inside `drop` aborts the
    /// process when the guard is dropped during unwinding, and it would otherwise hide the error
    /// that ended the run in the first place.
    fn drop(&mut self) {
        if let Err(err) = std::fs::remove_dir_all(self.path) {
            eprintln!("failed to remove {}: {err}", self.path.display());
        }
    }
}
253
/// Runs the registration functions of `turbo_tasks` and `turbo_tasks_fs`, then includes
/// `register.rs` from `OUT_DIR` (presumably generated by this crate's build script to register
/// the items defined in this file). `main` calls this before doing anything else.
fn register() {
    turbo_tasks::register();
    turbo_tasks_fs::register();
    include!(concat!(env!("OUT_DIR"), "/register.rs"));
}