Skip to main content

turbo_tasks_fuzz/
symlink_stress.rs

1use std::{
2    path::{Path, PathBuf},
3    time::{Duration, Instant},
4};
5
6const PROGRESS_INTERVAL: Duration = Duration::from_secs(1);
7
8use clap::Args;
9use rand::{RngExt, SeedableRng};
10use turbo_rcstr::{RcStr, rcstr};
11use turbo_tasks::{ResolvedVc, TryJoinIterExt, Vc, apply_effects};
12use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
13use turbo_tasks_fs::{DiskFileSystem, FileSystem, FileSystemPath, LinkContent, LinkType};
14
15#[derive(Args)]
16pub struct SymlinkStress {
17    #[arg(long)]
18    fs_root: PathBuf,
19    /// Number of target directories symlinks can point to.
20    #[arg(long, default_value_t = 20)]
21    target_count: usize,
22    /// Number of symlinks to create and update.
23    #[arg(long, default_value_t = 50)]
24    symlink_count: usize,
25    /// Number of symlink writes to perform in parallel.
26    #[arg(long, default_value_t = 16)]
27    parallelism: usize,
28    /// How long to run the stress test for.
29    #[arg(long, default_value_t = 5)]
30    duration_secs: u64,
31}
32
33pub async fn run(args: SymlinkStress) -> anyhow::Result<()> {
34    std::fs::create_dir(&args.fs_root)?;
35    let fs_root = args.fs_root.canonicalize()?;
36    let _guard = FsCleanup {
37        path: &fs_root.clone(),
38    };
39
40    // Create target directories that symlinks will point to
41    let targets_dir = fs_root.join("_targets");
42    std::fs::create_dir(&targets_dir)?;
43    for i in 0..args.target_count {
44        std::fs::create_dir(targets_dir.join(i.to_string()))?;
45    }
46
47    // Create symlinks directory
48    let symlinks_dir = fs_root.join("_symlinks");
49    std::fs::create_dir(&symlinks_dir)?;
50
51    let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
52        BackendOptions::default(),
53        noop_backing_storage(),
54    ));
55
56    let target_count = args.target_count;
57    let symlink_count = args.symlink_count;
58    let parallelism = args.parallelism;
59    let duration = Duration::from_secs(args.duration_secs);
60
61    tt.run_once(async move {
62        let project_fs = disk_file_system_operation(RcStr::from(fs_root.to_str().unwrap()))
63            .resolve_strongly_consistent()
64            .await?;
65        let project_root = disk_file_system_root_operation(project_fs)
66            .resolve_strongly_consistent()
67            .await?
68            .owned()
69            .await?;
70
71        // Create initial symlinks via turbo-tasks, all pointing to target 0
72        let symlinks_path = project_root.join("_symlinks")?;
73        let initial_target = RcStr::from("../_targets/0");
74
75        println!("creating {symlink_count} initial symlinks...");
76
77        let initial_op =
78            create_initial_symlinks_operation(symlinks_path.clone(), symlink_count, initial_target);
79        initial_op.read_strongly_consistent().await?;
80        apply_effects(initial_op).await?;
81
82        println!(
83            "starting stress test with parallelism={} for {}s...",
84            parallelism,
85            duration.as_secs()
86        );
87
88        let mut rng = rand::rngs::SmallRng::from_rng(&mut rand::rng());
89        let mut total_writes: u64 = 0;
90        let mut last_progress_writes: u64 = 0;
91        let start_time = Instant::now();
92        let mut last_progress_time = start_time;
93
94        loop {
95            // Check if we've reached the duration limit
96            if start_time.elapsed() >= duration {
97                break;
98            }
99
100            // Generate random symlink updates for this batch
101            let updates: Vec<(usize, usize)> = (0..parallelism)
102                .map(|_| {
103                    let symlink_idx = rng.random_range(0..symlink_count);
104                    let target_idx = rng.random_range(0..target_count);
105                    (symlink_idx, target_idx)
106                })
107                .collect();
108
109            // Execute writes in parallel via turbo-tasks
110            let write_op = write_symlinks_batch_operation(symlinks_path.clone(), updates);
111            write_op.read_strongly_consistent().await?;
112            apply_effects(write_op).await?;
113
114            total_writes += parallelism as u64;
115
116            // Print progress every PROGRESS_INTERVAL
117            let now = Instant::now();
118            if now.duration_since(last_progress_time) >= PROGRESS_INTERVAL {
119                let interval_writes = total_writes - last_progress_writes;
120                let interval_duration = now.duration_since(last_progress_time);
121                let writes_per_sec = interval_writes as f64 / interval_duration.as_secs_f64();
122                println!(
123                    "{:.1}s: {} writes, {:.0} writes/sec",
124                    start_time.elapsed().as_secs_f64(),
125                    total_writes,
126                    writes_per_sec
127                );
128                last_progress_time = now;
129                last_progress_writes = total_writes;
130            }
131        }
132
133        // Final summary
134        let elapsed = start_time.elapsed();
135        let writes_per_sec = total_writes as f64 / elapsed.as_secs_f64();
136        println!(
137            "completed {} symlink writes in {:.2}s ({:.0} writes/sec)",
138            total_writes,
139            elapsed.as_secs_f64(),
140            writes_per_sec
141        );
142
143        Ok(())
144    })
145    .await?;
146
147    tt.stop_and_wait().await;
148    Ok(())
149}
150
151#[turbo_tasks::function(operation)]
152fn disk_file_system_operation(fs_root: RcStr) -> Vc<DiskFileSystem> {
153    DiskFileSystem::new(rcstr!("project"), fs_root)
154}
155
156#[turbo_tasks::function(operation)]
157fn disk_file_system_root_operation(fs: ResolvedVc<DiskFileSystem>) -> Vc<FileSystemPath> {
158    fs.root()
159}
160
161#[turbo_tasks::function(operation)]
162async fn create_initial_symlinks_operation(
163    symlinks_dir: FileSystemPath,
164    count: usize,
165    target: RcStr,
166) -> anyhow::Result<()> {
167    (0..count)
168        .map(|i| write_symlink(symlinks_dir.clone(), i, target.clone()))
169        .try_join()
170        .await?;
171    Ok(())
172}
173
174#[turbo_tasks::function(operation)]
175async fn write_symlinks_batch_operation(
176    symlinks_dir: FileSystemPath,
177    updates: Vec<(usize, usize)>,
178) -> anyhow::Result<()> {
179    updates
180        .into_iter()
181        .map(|(symlink_idx, target_idx)| {
182            let target = RcStr::from(format!("../_targets/{}", target_idx));
183            write_symlink(symlinks_dir.clone(), symlink_idx, target)
184        })
185        .try_join()
186        .await?;
187    Ok(())
188}
189
190#[turbo_tasks::function]
191async fn write_symlink(
192    symlinks_dir: FileSystemPath,
193    symlink_idx: usize,
194    target: RcStr,
195) -> anyhow::Result<()> {
196    let symlink_path = symlinks_dir.join(&symlink_idx.to_string())?;
197    let link_content = LinkContent::Link {
198        target,
199        link_type: LinkType::DIRECTORY,
200    };
201    symlink_path
202        .fs()
203        .write_link(symlink_path.clone(), link_content.cell())
204        .await?;
205    Ok(())
206}
207
208struct FsCleanup<'a> {
209    path: &'a Path,
210}
211
212impl Drop for FsCleanup<'_> {
213    fn drop(&mut self) {
214        std::fs::remove_dir_all(self.path).unwrap();
215    }
216}