turbopack_node/worker_pool/
mod.rs1use std::{
2 path::PathBuf,
3 sync::{
4 Arc,
5 atomic::{AtomicU32, Ordering},
6 },
7};
8
9use anyhow::{Context, Result};
10use rustc_hash::FxHashMap;
11use tokio::{
12 select,
13 sync::{Semaphore, oneshot},
14 time::sleep,
15};
16use turbo_rcstr::{RcStr, rcstr};
17use turbo_tasks::{ResolvedVc, duration_span};
18use turbo_tasks_fs::FileSystemPath;
19
20use crate::{
21 AssetsForSourceMapping,
22 backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
23 evaluate::{EvaluateOperation, EvaluatePool, Operation},
24 pool_stats::{AcquiredPermits, PoolStatsSnapshot},
25 worker_pool::{
26 operation::{
27 PoolState, TaskChannels, WORKER_POOL_OPERATION, WorkerOperation, WorkerOptions,
28 get_pool_state,
29 },
30 worker_thread::create_worker,
31 },
32};
33
34mod operation;
35mod worker_thread;
36
/// Source of task ids for `WorkerThreadPool::operation`: every call takes the current value and
/// increments it by one.
///
/// The counter starts at 1; `operation` panics if it ever reads 0, which it treats as overflow.
static OPERATION_TASK_ID: AtomicU32 = AtomicU32::new(1);
38
/// Pool handle that hands out workers for evaluating operations.
///
/// Instances are built by `WorkerThreadPool::create`, which boxes the pool as an
/// `EvaluateOperation` and wraps it in an `EvaluatePool`.
#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)]
pub(crate) struct WorkerThreadPool {
    /// Working directory and entrypoint filename; passed to `create_worker` whenever a new
    /// worker has to be booted.
    worker_options: Arc<WorkerOptions>,
    /// Effective concurrency limit (`create` stores 1 here when `debug` is set).
    concurrency: usize,
    /// Same value that `create` forwards to `EvaluatePool::new`.
    pub(crate) assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
    /// Same value that `create` forwards to `EvaluatePool::new`.
    pub(crate) assets_root: FileSystemPath,
    /// Same value that `create` forwards to `EvaluatePool::new`.
    pub(crate) project_dir: FileSystemPath,
    /// Pool state obtained from `get_pool_state`: idle worker ids, waiters and stats.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    state: Arc<PoolState>,
    /// Bounds how many workers are handed out at once; created with `concurrency` permits.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    concurrency_semaphore: Arc<Semaphore>,
    /// Bounds how many workers boot at the same time; starts with a single permit and gains one
    /// more each time `acquire_worker` finishes booting a worker.
    #[turbo_tasks(trace_ignore, debug_ignore)]
    bootup_semaphore: Arc<Semaphore>,
}
53
54impl WorkerThreadPool {
55 pub(crate) async fn create(
56 cwd: PathBuf,
57 entrypoint: PathBuf,
58 _env: FxHashMap<RcStr, RcStr>,
60 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
61 assets_root: FileSystemPath,
62 project_dir: FileSystemPath,
63 concurrency: usize,
64 debug: bool,
65 ) -> EvaluatePool {
66 let cwd: RcStr = cwd.to_string_lossy().into();
67 let filename: RcStr = entrypoint.to_string_lossy().into();
68 let worker_options = Arc::new(WorkerOptions { cwd, filename });
69 let state = get_pool_state(worker_options.clone()).await;
70 EvaluatePool::new(
71 Box::new(Self {
72 worker_options,
73 concurrency: (if debug { 1 } else { concurrency }),
74 assets_for_source_mapping,
75 assets_root: assets_root.clone(),
76 project_dir: project_dir.clone(),
77 state,
78 concurrency_semaphore: Arc::new(Semaphore::new(if debug {
79 1
80 } else {
81 concurrency
82 })),
83 bootup_semaphore: Arc::new(Semaphore::new(1)),
84 }) as Box<dyn EvaluateOperation>,
85 assets_for_source_mapping,
86 assets_root,
87 project_dir,
88 )
89 }
90
91 async fn acquire_worker(&self) -> Result<(u32, AcquiredPermits)> {
92 let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
93
94 {
95 let mut idle = self.state.idle_workers.lock();
96 if let Some(worker_id) = idle.pop() {
97 return Ok((
98 worker_id,
99 AcquiredPermits::Idle {
100 _concurrency_permit: concurrency_permit,
101 },
102 ));
103 }
104 }
105
106 let (tx, rx) = oneshot::channel();
107 {
108 let mut waiters = self.state.waiters.lock();
109 let mut idle = self.state.idle_workers.lock();
110 if let Some(worker_id) = idle.pop() {
111 return Ok((
112 worker_id,
113 AcquiredPermits::Idle {
114 _concurrency_permit: concurrency_permit,
115 },
116 ));
117 }
118 waiters.push(tx);
119 }
120
121 let bootup = async {
122 let permit = self.bootup_semaphore.clone().acquire_owned().await;
123 let wait_time = self.state.stats.lock().wait_time_before_bootup();
124 sleep(wait_time).await;
125 permit
126 };
127
128 select! {
129 worker_id = rx => {
130 let worker_id = worker_id?;
131 Ok((worker_id, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
132 }
133 bootup_permit = bootup => {
134 let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
135 {
136 self.state.stats.lock().add_booting_worker();
137 }
138 let worker_id = create_worker(self.worker_options.clone()).await?;
139
140 {
141 let mut stats = self.state.stats.lock();
142 stats.finished_booting_worker();
143 }
144
145 self.bootup_semaphore.add_permits(1);
146 Ok((worker_id, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
147 }
148 }
149 }
150}
151
/// `NodeBackend` implementation whose `create_pool` builds pools via `WorkerThreadPool::create`.
#[turbo_tasks::value(shared)]
pub(crate) struct WorkerThreadsBackend;
154
155#[turbo_tasks::value_impl]
156impl NodeBackend for WorkerThreadsBackend {
157 fn runtime_module_path(&self) -> RcStr {
158 rcstr!("worker_thread/evaluate.ts")
159 }
160
161 fn globals_module_path(&self) -> RcStr {
162 rcstr!("worker_thread/globals.ts")
163 }
164
165 fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
166 Box::pin(async move {
167 let CreatePoolOptions {
168 cwd,
169 entrypoint,
170 env,
171 assets_for_source_mapping,
172 assets_root,
173 project_dir,
174 concurrency,
175 debug,
176 } = options;
177
178 Ok(WorkerThreadPool::create(
179 cwd,
180 entrypoint,
181 env,
182 assets_for_source_mapping,
183 assets_root,
184 project_dir,
185 concurrency,
186 debug,
187 )
188 .await)
189 })
190 }
191
192 fn scale_down(&self) -> Result<()> {
193 WorkerThreadPool::scale_down();
194 Ok(())
195 }
196
197 fn scale_zero(&self) -> Result<()> {
198 WorkerThreadPool::scale_zero();
199 Ok(())
200 }
201}
202
203impl WorkerThreadPool {
204 pub fn scale_down() {
205 let _ = WORKER_POOL_OPERATION.scale_down();
206 }
207
208 pub fn scale_zero() {
209 let _ = WORKER_POOL_OPERATION.scale_zero();
210 }
211}
212
213#[async_trait::async_trait]
214impl EvaluateOperation for WorkerThreadPool {
215 async fn operation(&self) -> Result<Box<dyn Operation>> {
216 let operation = {
217 let _guard = duration_span!("Node.js operation");
218 let worker_options = self.worker_options.clone();
219
220 let task_id = OPERATION_TASK_ID.fetch_add(1, Ordering::Release);
221
222 if task_id == 0 {
223 panic!("Node.js operation task id overflow")
224 }
225
226 let (worker_id, permits) = self.acquire_worker().await?;
227
228 let state = self.state.clone();
229
230 let channels = TaskChannels::new(task_id, worker_id);
232
233 WorkerOperation {
234 worker_options,
235 worker_id,
236 state: state.clone(),
237 on_drop: Some(Box::new(move |worker_id| {
238 let mut waiters = state.waiters.lock();
239 loop {
240 if let Some(tx) = waiters.pop() {
241 if tx.send(worker_id).is_ok() {
242 break;
243 }
244 } else {
245 state.idle_workers.lock().push(worker_id);
246 break;
247 }
248 }
249 })),
250 _permits: permits,
251 channels,
252 }
253 };
254
255 Ok(Box::new(operation))
256 }
257
258 fn stats(&self) -> PoolStatsSnapshot {
260 self.state.stats.lock().snapshot()
261 }
262
263 fn pre_warm(&self) {
264 }
266}