Skip to main content

turbopack_node/worker_pool/
mod.rs

1use std::{
2    path::PathBuf,
3    sync::{
4        Arc,
5        atomic::{AtomicU32, Ordering},
6    },
7};
8
9use anyhow::{Context, Result};
10use rustc_hash::FxHashMap;
11use tokio::{
12    select,
13    sync::{Semaphore, oneshot},
14    time::sleep,
15};
16use turbo_rcstr::{RcStr, rcstr};
17use turbo_tasks::{ResolvedVc, duration_span};
18use turbo_tasks_fs::FileSystemPath;
19
20use crate::{
21    AssetsForSourceMapping,
22    backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
23    evaluate::{EvaluateOperation, EvaluatePool, Operation},
24    pool_stats::{AcquiredPermits, PoolStatsSnapshot},
25    worker_pool::{
26        operation::{
27            PoolState, TaskChannels, WORKER_POOL_OPERATION, WorkerOperation, WorkerOptions,
28            get_pool_state,
29        },
30        worker_thread::create_worker,
31    },
32};
33
34mod operation;
35mod worker_thread;
36
/// Counter from which `WorkerThreadPool::operation` draws the task id of each operation.
///
/// It starts at 1. `operation` reads it with `fetch_add`, so a fetched value of 0 can only be
/// observed after the `u32` counter has wrapped around; `operation` panics in that case.
static OPERATION_TASK_ID: AtomicU32 = AtomicU32::new(1);
38
/// Pool that hands out worker threads (identified by `u32` worker ids) to evaluate operations.
///
/// Instances are built by `WorkerThreadPool::create`, which boxes the pool as an
/// `EvaluateOperation` and wraps it in an `EvaluatePool`.
#[turbo_tasks::value(
    cell = "new",
    serialization = "skip",
    evict = "last",
    eq = "manual",
    shared
)]
pub(crate) struct WorkerThreadPool {
    /// Working directory and entrypoint file name (both stored as strings) that are passed to
    /// `get_pool_state` and `create_worker`.
    worker_options: Arc<WorkerOptions>,
    /// Effective concurrency limit: 1 when the pool was created with `debug`, otherwise the
    /// requested concurrency. NOTE(review): not read anywhere in this part of the file.
    concurrency: usize,
    /// Passed through unchanged from `create`; the same value is also given to
    /// `EvaluatePool::new`.
    pub(crate) assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Passed through unchanged from `create`; the same value is also given to
    /// `EvaluatePool::new`.
    pub(crate) assets_root: FileSystemPath,
    /// Passed through unchanged from `create`; the same value is also given to
    /// `EvaluatePool::new`.
    pub(crate) project_dir: FileSystemPath,
    /// State returned by `get_pool_state` for `worker_options`. It holds the list of idle
    /// worker ids, the list of waiters (one-shot senders) and the pool statistics.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    state: Arc<PoolState>,
    /// One permit is acquired per `acquire_worker` call; it is created with as many permits as
    /// the effective concurrency limit.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    concurrency_semaphore: Arc<Semaphore>,
    /// Acquired before a new worker is booted. Starts with a single permit; `acquire_worker`
    /// adds one more permit after every successful boot.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    bootup_semaphore: Arc<Semaphore>,
}
59
60impl WorkerThreadPool {
61    pub(crate) async fn create(
62        cwd: PathBuf,
63        entrypoint: PathBuf,
64        // The worker thread will inherit env from parent process, so it's not needed
65        _env: FxHashMap<RcStr, RcStr>,
66        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
67        assets_root: FileSystemPath,
68        project_dir: FileSystemPath,
69        concurrency: usize,
70        debug: bool,
71    ) -> EvaluatePool {
72        let cwd: RcStr = cwd.to_string_lossy().into();
73        let filename: RcStr = entrypoint.to_string_lossy().into();
74        let worker_options = Arc::new(WorkerOptions { cwd, filename });
75        let state = get_pool_state(worker_options.clone()).await;
76        EvaluatePool::new(
77            Box::new(Self {
78                worker_options,
79                concurrency: (if debug { 1 } else { concurrency }),
80                assets_for_source_mapping,
81                assets_root: assets_root.clone(),
82                project_dir: project_dir.clone(),
83                state,
84                concurrency_semaphore: Arc::new(Semaphore::new(if debug {
85                    1
86                } else {
87                    concurrency
88                })),
89                bootup_semaphore: Arc::new(Semaphore::new(1)),
90            }) as Box<dyn EvaluateOperation>,
91            assets_for_source_mapping,
92            assets_root,
93            project_dir,
94        )
95    }
96
97    async fn acquire_worker(&self) -> Result<(u32, AcquiredPermits)> {
98        let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
99
100        {
101            let mut idle = self.state.idle_workers.lock();
102            if let Some(worker_id) = idle.pop() {
103                return Ok((
104                    worker_id,
105                    AcquiredPermits::Idle {
106                        _concurrency_permit: concurrency_permit,
107                    },
108                ));
109            }
110        }
111
112        let (tx, rx) = oneshot::channel();
113        {
114            let mut waiters = self.state.waiters.lock();
115            let mut idle = self.state.idle_workers.lock();
116            if let Some(worker_id) = idle.pop() {
117                return Ok((
118                    worker_id,
119                    AcquiredPermits::Idle {
120                        _concurrency_permit: concurrency_permit,
121                    },
122                ));
123            }
124            waiters.push(tx);
125        }
126
127        let bootup = async {
128            let permit = self.bootup_semaphore.clone().acquire_owned().await;
129            let wait_time = self.state.stats.lock().wait_time_before_bootup();
130            sleep(wait_time).await;
131            permit
132        };
133
134        select! {
135            worker_id = rx => {
136                let worker_id = worker_id?;
137                Ok((worker_id, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
138            }
139            bootup_permit = bootup => {
140                let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
141                {
142                    self.state.stats.lock().add_booting_worker();
143                }
144                let worker_id = create_worker(self.worker_options.clone()).await?;
145
146                {
147                    let mut stats = self.state.stats.lock();
148                    stats.finished_booting_worker();
149                }
150
151                self.bootup_semaphore.add_permits(1);
152                Ok((worker_id, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
153            }
154        }
155    }
156}
157
/// `NodeBackend` implementation whose pools are `WorkerThreadPool`s.
///
/// The type carries no data of its own.
#[turbo_tasks::value(shared)]
pub(crate) struct WorkerThreadsBackend;
160
161#[turbo_tasks::value_impl]
162impl NodeBackend for WorkerThreadsBackend {
163    fn runtime_module_path(&self) -> RcStr {
164        rcstr!("worker_thread/evaluate.ts")
165    }
166
167    fn globals_module_path(&self) -> RcStr {
168        rcstr!("worker_thread/globals.ts")
169    }
170
171    fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
172        Box::pin(async move {
173            let CreatePoolOptions {
174                cwd,
175                entrypoint,
176                env,
177                assets_for_source_mapping,
178                assets_root,
179                project_dir,
180                concurrency,
181                debug,
182            } = options;
183
184            Ok(WorkerThreadPool::create(
185                cwd,
186                entrypoint,
187                env,
188                assets_for_source_mapping,
189                assets_root,
190                project_dir,
191                concurrency,
192                debug,
193            )
194            .await)
195        })
196    }
197
198    fn scale_down(&self) -> Result<()> {
199        WorkerThreadPool::scale_down();
200        Ok(())
201    }
202
203    fn scale_zero(&self) -> Result<()> {
204        WorkerThreadPool::scale_zero();
205        Ok(())
206    }
207}
208
209impl WorkerThreadPool {
210    pub fn scale_down() {
211        let _ = WORKER_POOL_OPERATION.scale_down();
212    }
213
214    pub fn scale_zero() {
215        let _ = WORKER_POOL_OPERATION.scale_zero();
216    }
217}
218
219#[async_trait::async_trait]
220impl EvaluateOperation for WorkerThreadPool {
221    async fn operation(&self) -> Result<Box<dyn Operation>> {
222        let operation = {
223            let _guard = duration_span!("Node.js operation");
224            let worker_options = self.worker_options.clone();
225
226            let task_id = OPERATION_TASK_ID.fetch_add(1, Ordering::Release);
227
228            if task_id == 0 {
229                panic!("Node.js operation task id overflow")
230            }
231
232            let (worker_id, permits) = self.acquire_worker().await?;
233
234            let state = self.state.clone();
235
236            // Pre-allocate channels for this task to avoid HashMap lookups during communication
237            let channels = TaskChannels::new(task_id, worker_id);
238
239            WorkerOperation {
240                worker_options,
241                worker_id,
242                state: state.clone(),
243                on_drop: Some(Box::new(move |worker_id| {
244                    let mut waiters = state.waiters.lock();
245                    loop {
246                        if let Some(tx) = waiters.pop() {
247                            if tx.send(worker_id).is_ok() {
248                                break;
249                            }
250                        } else {
251                            state.idle_workers.lock().push(worker_id);
252                            break;
253                        }
254                    }
255                })),
256                _permits: permits,
257                channels,
258            }
259        };
260
261        Ok(Box::new(operation))
262    }
263
264    /// Returns a snapshot of the pool's internal statistics.
265    fn stats(&self) -> PoolStatsSnapshot {
266        self.state.stats.lock().snapshot()
267    }
268
269    fn pre_warm(&self) {
270        // TODO: This is a no-op for worker_pool right now, only process_pool implements it
271    }
272}