turbo_tasks_fuzz/
symlink_stress.rs1use std::{
2 path::{Path, PathBuf},
3 time::{Duration, Instant},
4};
5
/// Minimum time that must pass between two progress lines printed by the
/// stress loop in `run`.
const PROGRESS_INTERVAL: Duration = Duration::from_secs(1);
7
8use clap::Args;
9use rand::{RngExt, SeedableRng};
10use turbo_rcstr::{RcStr, rcstr};
11use turbo_tasks::{ResolvedVc, TryJoinIterExt, Vc, apply_effects};
12use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
13use turbo_tasks_fs::{DiskFileSystem, FileSystem, FileSystemPath, LinkContent, LinkType};
14
// Command-line arguments for the symlink stress test executed by `run`.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive turns
// `///` doc comments into help text, which would change the CLI output.
#[derive(Args)]
pub struct SymlinkStress {
    // Directory the test works in. `run` creates it with `create_dir` (so it
    // is expected not to exist yet) and deletes it again when it finishes.
    #[arg(long)]
    fs_root: PathBuf,
    // Number of numbered target directories created under `_targets`.
    #[arg(long, default_value_t = 20)]
    target_count: usize,
    // Number of symlinks created under `_symlinks` and then rewritten.
    #[arg(long, default_value_t = 50)]
    symlink_count: usize,
    // Number of symlink writes issued together in one batch.
    #[arg(long, default_value_t = 16)]
    parallelism: usize,
    // How long the stress loop keeps issuing batches, in seconds.
    #[arg(long, default_value_t = 5)]
    duration_secs: u64,
}
32
33pub async fn run(args: SymlinkStress) -> anyhow::Result<()> {
34 std::fs::create_dir(&args.fs_root)?;
35 let fs_root = args.fs_root.canonicalize()?;
36 let _guard = FsCleanup {
37 path: &fs_root.clone(),
38 };
39
40 let targets_dir = fs_root.join("_targets");
42 std::fs::create_dir(&targets_dir)?;
43 for i in 0..args.target_count {
44 std::fs::create_dir(targets_dir.join(i.to_string()))?;
45 }
46
47 let symlinks_dir = fs_root.join("_symlinks");
49 std::fs::create_dir(&symlinks_dir)?;
50
51 let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
52 BackendOptions::default(),
53 noop_backing_storage(),
54 ));
55
56 let target_count = args.target_count;
57 let symlink_count = args.symlink_count;
58 let parallelism = args.parallelism;
59 let duration = Duration::from_secs(args.duration_secs);
60
61 tt.run_once(async move {
62 let project_fs = disk_file_system_operation(RcStr::from(fs_root.to_str().unwrap()))
63 .resolve_strongly_consistent()
64 .await?;
65 let project_root = disk_file_system_root_operation(project_fs)
66 .resolve_strongly_consistent()
67 .await?
68 .owned()
69 .await?;
70
71 let symlinks_path = project_root.join("_symlinks")?;
73 let initial_target = RcStr::from("../_targets/0");
74
75 println!("creating {symlink_count} initial symlinks...");
76
77 let initial_op =
78 create_initial_symlinks_operation(symlinks_path.clone(), symlink_count, initial_target);
79 initial_op.read_strongly_consistent().await?;
80 apply_effects(initial_op).await?;
81
82 println!(
83 "starting stress test with parallelism={} for {}s...",
84 parallelism,
85 duration.as_secs()
86 );
87
88 let mut rng = rand::rngs::SmallRng::from_rng(&mut rand::rng());
89 let mut total_writes: u64 = 0;
90 let mut last_progress_writes: u64 = 0;
91 let start_time = Instant::now();
92 let mut last_progress_time = start_time;
93
94 loop {
95 if start_time.elapsed() >= duration {
97 break;
98 }
99
100 let updates: Vec<(usize, usize)> = (0..parallelism)
102 .map(|_| {
103 let symlink_idx = rng.random_range(0..symlink_count);
104 let target_idx = rng.random_range(0..target_count);
105 (symlink_idx, target_idx)
106 })
107 .collect();
108
109 let write_op = write_symlinks_batch_operation(symlinks_path.clone(), updates);
111 write_op.read_strongly_consistent().await?;
112 apply_effects(write_op).await?;
113
114 total_writes += parallelism as u64;
115
116 let now = Instant::now();
118 if now.duration_since(last_progress_time) >= PROGRESS_INTERVAL {
119 let interval_writes = total_writes - last_progress_writes;
120 let interval_duration = now.duration_since(last_progress_time);
121 let writes_per_sec = interval_writes as f64 / interval_duration.as_secs_f64();
122 println!(
123 "{:.1}s: {} writes, {:.0} writes/sec",
124 start_time.elapsed().as_secs_f64(),
125 total_writes,
126 writes_per_sec
127 );
128 last_progress_time = now;
129 last_progress_writes = total_writes;
130 }
131 }
132
133 let elapsed = start_time.elapsed();
135 let writes_per_sec = total_writes as f64 / elapsed.as_secs_f64();
136 println!(
137 "completed {} symlink writes in {:.2}s ({:.0} writes/sec)",
138 total_writes,
139 elapsed.as_secs_f64(),
140 writes_per_sec
141 );
142
143 Ok(())
144 })
145 .await?;
146
147 tt.stop_and_wait().await;
148 Ok(())
149}
150
151#[turbo_tasks::function(operation)]
152fn disk_file_system_operation(fs_root: RcStr) -> Vc<DiskFileSystem> {
153 DiskFileSystem::new(rcstr!("project"), fs_root)
154}
155
/// Operation wrapper around `fs.root()`, yielding the [`FileSystemPath`]
/// that call returns for the given filesystem.
#[turbo_tasks::function(operation)]
fn disk_file_system_root_operation(fs: ResolvedVc<DiskFileSystem>) -> Vc<FileSystemPath> {
    fs.root()
}
160
161#[turbo_tasks::function(operation)]
162async fn create_initial_symlinks_operation(
163 symlinks_dir: FileSystemPath,
164 count: usize,
165 target: RcStr,
166) -> anyhow::Result<()> {
167 (0..count)
168 .map(|i| write_symlink(symlinks_dir.clone(), i, target.clone()))
169 .try_join()
170 .await?;
171 Ok(())
172}
173
174#[turbo_tasks::function(operation)]
175async fn write_symlinks_batch_operation(
176 symlinks_dir: FileSystemPath,
177 updates: Vec<(usize, usize)>,
178) -> anyhow::Result<()> {
179 updates
180 .into_iter()
181 .map(|(symlink_idx, target_idx)| {
182 let target = RcStr::from(format!("../_targets/{}", target_idx));
183 write_symlink(symlinks_dir.clone(), symlink_idx, target)
184 })
185 .try_join()
186 .await?;
187 Ok(())
188}
189
190#[turbo_tasks::function]
191async fn write_symlink(
192 symlinks_dir: FileSystemPath,
193 symlink_idx: usize,
194 target: RcStr,
195) -> anyhow::Result<()> {
196 let symlink_path = symlinks_dir.join(&symlink_idx.to_string())?;
197 let link_content = LinkContent::Link {
198 target,
199 link_type: LinkType::DIRECTORY,
200 };
201 symlink_path
202 .fs()
203 .write_link(symlink_path.clone(), link_content.cell())
204 .await?;
205 Ok(())
206}
207
/// Guard that recursively deletes the directory at `path` when dropped.
struct FsCleanup<'a> {
    /// Directory tree to remove on drop.
    path: &'a Path,
}

impl Drop for FsCleanup<'_> {
    fn drop(&mut self) {
        // Cleanup is best-effort: panicking in `drop` while another panic is
        // already unwinding aborts the process and hides the original error,
        // so a failed removal is only reported.
        if let Err(err) = std::fs::remove_dir_all(self.path) {
            eprintln!("failed to remove {}: {err}", self.path.display());
        }
    }
}