Skip to main content

turbopack_node/worker_pool/
mod.rs

1use std::{
2    path::PathBuf,
3    sync::{
4        Arc,
5        atomic::{AtomicU32, Ordering},
6    },
7};
8
9use anyhow::{Context, Result};
10use rustc_hash::FxHashMap;
11use tokio::{
12    select,
13    sync::{Semaphore, oneshot},
14    time::sleep,
15};
16use turbo_rcstr::{RcStr, rcstr};
17use turbo_tasks::{ResolvedVc, duration_span};
18use turbo_tasks_fs::FileSystemPath;
19
20use crate::{
21    AssetsForSourceMapping,
22    backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
23    evaluate::{EvaluateOperation, EvaluatePool, Operation},
24    pool_stats::{AcquiredPermits, PoolStatsSnapshot},
25    worker_pool::{
26        operation::{
27            PoolState, TaskChannels, WORKER_POOL_OPERATION, WorkerOperation, WorkerOptions,
28            get_pool_state,
29        },
30        worker_thread::create_worker,
31    },
32};
33
34mod operation;
35mod worker_thread;
36
/// Process-wide counter that hands out the task id for each operation created
/// by `WorkerThreadPool::operation`. It starts at 1 and is advanced by one per
/// operation; `operation` panics once the id space has been used up.
37static OPERATION_TASK_ID: AtomicU32 = AtomicU32::new(1);
38
/// Evaluation backend state for one worker-thread pool.
///
/// Instances are built by `WorkerThreadPool::create`, which boxes the value as
/// a `dyn EvaluateOperation` and hands it to `EvaluatePool::new`.
39#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)]
40pub(crate) struct WorkerThreadPool {
    /// Working directory and entry file name the pool was created with; passed
    /// to `create_worker` whenever a new worker has to be booted.
41    worker_options: Arc<WorkerOptions>,
    /// Effective concurrency limit: 1 when the pool was created with
    /// `debug = true`, otherwise the configured value.
42    concurrency: usize,
    /// Stored as given to `create`; the same value is also passed to
    /// `EvaluatePool::new`.
43    pub(crate) assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Stored as given to `create`; the same value is also passed to
    /// `EvaluatePool::new`.
44    pub(crate) assets_root: FileSystemPath,
    /// Stored as given to `create`; the same value is also passed to
    /// `EvaluatePool::new`.
45    pub(crate) project_dir: FileSystemPath,
    /// Pool state returned by `get_pool_state` for `worker_options`. This file
    /// uses its `idle_workers`, `waiters` and `stats` members.
46    #[turbo_tasks(trace_ignore, debug_ignore)]
47    state: Arc<PoolState>,
    /// One permit is taken per acquired worker and travels with the returned
    /// `AcquiredPermits`; sized to the effective concurrency.
48    #[turbo_tasks(trace_ignore, debug_ignore)]
49    concurrency_semaphore: Arc<Semaphore>,
    /// Gates booting of new workers. Starts with a single permit; one extra
    /// permit is added after every successful boot.
50    #[turbo_tasks(trace_ignore, debug_ignore)]
51    bootup_semaphore: Arc<Semaphore>,
52}
53
54impl WorkerThreadPool {
55    pub(crate) async fn create(
56        cwd: PathBuf,
57        entrypoint: PathBuf,
58        // The worker thread will inherit env from parent process, so it's not needed
59        _env: FxHashMap<RcStr, RcStr>,
60        assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
61        assets_root: FileSystemPath,
62        project_dir: FileSystemPath,
63        concurrency: usize,
64        debug: bool,
65    ) -> EvaluatePool {
66        let cwd: RcStr = cwd.to_string_lossy().into();
67        let filename: RcStr = entrypoint.to_string_lossy().into();
68        let worker_options = Arc::new(WorkerOptions { cwd, filename });
69        let state = get_pool_state(worker_options.clone()).await;
70        EvaluatePool::new(
71            Box::new(Self {
72                worker_options,
73                concurrency: (if debug { 1 } else { concurrency }),
74                assets_for_source_mapping,
75                assets_root: assets_root.clone(),
76                project_dir: project_dir.clone(),
77                state,
78                concurrency_semaphore: Arc::new(Semaphore::new(if debug {
79                    1
80                } else {
81                    concurrency
82                })),
83                bootup_semaphore: Arc::new(Semaphore::new(1)),
84            }) as Box<dyn EvaluateOperation>,
85            assets_for_source_mapping,
86            assets_root,
87            project_dir,
88        )
89    }
90
91    async fn acquire_worker(&self) -> Result<(u32, AcquiredPermits)> {
92        let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
93
94        {
95            let mut idle = self.state.idle_workers.lock();
96            if let Some(worker_id) = idle.pop() {
97                return Ok((
98                    worker_id,
99                    AcquiredPermits::Idle {
100                        _concurrency_permit: concurrency_permit,
101                    },
102                ));
103            }
104        }
105
106        let (tx, rx) = oneshot::channel();
107        {
108            let mut waiters = self.state.waiters.lock();
109            let mut idle = self.state.idle_workers.lock();
110            if let Some(worker_id) = idle.pop() {
111                return Ok((
112                    worker_id,
113                    AcquiredPermits::Idle {
114                        _concurrency_permit: concurrency_permit,
115                    },
116                ));
117            }
118            waiters.push(tx);
119        }
120
121        let bootup = async {
122            let permit = self.bootup_semaphore.clone().acquire_owned().await;
123            let wait_time = self.state.stats.lock().wait_time_before_bootup();
124            sleep(wait_time).await;
125            permit
126        };
127
128        select! {
129            worker_id = rx => {
130                let worker_id = worker_id?;
131                Ok((worker_id, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
132            }
133            bootup_permit = bootup => {
134                let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
135                {
136                    self.state.stats.lock().add_booting_worker();
137                }
138                let worker_id = create_worker(self.worker_options.clone()).await?;
139
140                {
141                    let mut stats = self.state.stats.lock();
142                    stats.finished_booting_worker();
143                }
144
145                self.bootup_semaphore.add_permits(1);
146                Ok((worker_id, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
147            }
148        }
149    }
150}
151
/// Field-less marker type; its `NodeBackend` implementation creates
/// `WorkerThreadPool`-backed evaluation pools.
152#[turbo_tasks::value(shared)]
153pub(crate) struct WorkerThreadsBackend;
154
155#[turbo_tasks::value_impl]
156impl NodeBackend for WorkerThreadsBackend {
157    fn runtime_module_path(&self) -> RcStr {
158        rcstr!("worker_thread/evaluate.ts")
159    }
160
161    fn globals_module_path(&self) -> RcStr {
162        rcstr!("worker_thread/globals.ts")
163    }
164
165    fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
166        Box::pin(async move {
167            let CreatePoolOptions {
168                cwd,
169                entrypoint,
170                env,
171                assets_for_source_mapping,
172                assets_root,
173                project_dir,
174                concurrency,
175                debug,
176            } = options;
177
178            Ok(WorkerThreadPool::create(
179                cwd,
180                entrypoint,
181                env,
182                assets_for_source_mapping,
183                assets_root,
184                project_dir,
185                concurrency,
186                debug,
187            )
188            .await)
189        })
190    }
191
192    fn scale_down(&self) -> Result<()> {
193        WorkerThreadPool::scale_down();
194        Ok(())
195    }
196
197    fn scale_zero(&self) -> Result<()> {
198        WorkerThreadPool::scale_zero();
199        Ok(())
200    }
201}
202
203impl WorkerThreadPool {
204    pub fn scale_down() {
205        let _ = WORKER_POOL_OPERATION.scale_down();
206    }
207
208    pub fn scale_zero() {
209        let _ = WORKER_POOL_OPERATION.scale_zero();
210    }
211}
212
213#[async_trait::async_trait]
214impl EvaluateOperation for WorkerThreadPool {
215    async fn operation(&self) -> Result<Box<dyn Operation>> {
216        let operation = {
217            let _guard = duration_span!("Node.js operation");
218            let worker_options = self.worker_options.clone();
219
220            let task_id = OPERATION_TASK_ID.fetch_add(1, Ordering::Release);
221
222            if task_id == 0 {
223                panic!("Node.js operation task id overflow")
224            }
225
226            let (worker_id, permits) = self.acquire_worker().await?;
227
228            let state = self.state.clone();
229
230            // Pre-allocate channels for this task to avoid HashMap lookups during communication
231            let channels = TaskChannels::new(task_id, worker_id);
232
233            WorkerOperation {
234                worker_options,
235                worker_id,
236                state: state.clone(),
237                on_drop: Some(Box::new(move |worker_id| {
238                    let mut waiters = state.waiters.lock();
239                    loop {
240                        if let Some(tx) = waiters.pop() {
241                            if tx.send(worker_id).is_ok() {
242                                break;
243                            }
244                        } else {
245                            state.idle_workers.lock().push(worker_id);
246                            break;
247                        }
248                    }
249                })),
250                _permits: permits,
251                channels,
252            }
253        };
254
255        Ok(Box::new(operation))
256    }
257
258    /// Returns a snapshot of the pool's internal statistics.
259    fn stats(&self) -> PoolStatsSnapshot {
260        self.state.stats.lock().snapshot()
261    }
262
263    fn pre_warm(&self) {
264        // TODO: This is a no-op for worker_pool right now, only process_pool implements it
265    }
266}