turbopack_node/worker_pool/
mod.rs1use std::{
2 path::PathBuf,
3 sync::{
4 Arc,
5 atomic::{AtomicU32, Ordering},
6 },
7};
8
9use anyhow::{Context, Result};
10use rustc_hash::FxHashMap;
11use tokio::{
12 select,
13 sync::{Semaphore, oneshot},
14 time::sleep,
15};
16use turbo_rcstr::{RcStr, rcstr};
17use turbo_tasks::{ResolvedVc, duration_span};
18use turbo_tasks_fs::FileSystemPath;
19
20use crate::{
21 AssetsForSourceMapping,
22 backend::{CreatePoolFuture, CreatePoolOptions, NodeBackend},
23 evaluate::{EvaluateOperation, EvaluatePool, Operation},
24 pool_stats::{AcquiredPermits, PoolStatsSnapshot},
25 worker_pool::{
26 operation::{
27 PoolState, TaskChannels, WORKER_POOL_OPERATION, WorkerOperation, WorkerOptions,
28 get_pool_state,
29 },
30 worker_thread::create_worker,
31 },
32};
33
34mod operation;
35mod worker_thread;
36
37static OPERATION_TASK_ID: AtomicU32 = AtomicU32::new(1);
38
39#[turbo_tasks::value(
40 cell = "new",
41 serialization = "skip",
42 evict = "last",
43 eq = "manual",
44 shared
45)]
46pub(crate) struct WorkerThreadPool {
47 worker_options: Arc<WorkerOptions>,
48 concurrency: usize,
49 pub(crate) assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
50 pub(crate) assets_root: FileSystemPath,
51 pub(crate) project_dir: FileSystemPath,
52 #[turbo_tasks(trace_ignore, debug_ignore)]
53 state: Arc<PoolState>,
54 #[turbo_tasks(trace_ignore, debug_ignore)]
55 concurrency_semaphore: Arc<Semaphore>,
56 #[turbo_tasks(trace_ignore, debug_ignore)]
57 bootup_semaphore: Arc<Semaphore>,
58}
59
60impl WorkerThreadPool {
61 pub(crate) async fn create(
62 cwd: PathBuf,
63 entrypoint: PathBuf,
64 _env: FxHashMap<RcStr, RcStr>,
66 assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
67 assets_root: FileSystemPath,
68 project_dir: FileSystemPath,
69 concurrency: usize,
70 debug: bool,
71 ) -> EvaluatePool {
72 let cwd: RcStr = cwd.to_string_lossy().into();
73 let filename: RcStr = entrypoint.to_string_lossy().into();
74 let worker_options = Arc::new(WorkerOptions { cwd, filename });
75 let state = get_pool_state(worker_options.clone()).await;
76 EvaluatePool::new(
77 Box::new(Self {
78 worker_options,
79 concurrency: (if debug { 1 } else { concurrency }),
80 assets_for_source_mapping,
81 assets_root: assets_root.clone(),
82 project_dir: project_dir.clone(),
83 state,
84 concurrency_semaphore: Arc::new(Semaphore::new(if debug {
85 1
86 } else {
87 concurrency
88 })),
89 bootup_semaphore: Arc::new(Semaphore::new(1)),
90 }) as Box<dyn EvaluateOperation>,
91 assets_for_source_mapping,
92 assets_root,
93 project_dir,
94 )
95 }
96
97 async fn acquire_worker(&self) -> Result<(u32, AcquiredPermits)> {
98 let concurrency_permit = self.concurrency_semaphore.clone().acquire_owned().await?;
99
100 {
101 let mut idle = self.state.idle_workers.lock();
102 if let Some(worker_id) = idle.pop() {
103 return Ok((
104 worker_id,
105 AcquiredPermits::Idle {
106 _concurrency_permit: concurrency_permit,
107 },
108 ));
109 }
110 }
111
112 let (tx, rx) = oneshot::channel();
113 {
114 let mut waiters = self.state.waiters.lock();
115 let mut idle = self.state.idle_workers.lock();
116 if let Some(worker_id) = idle.pop() {
117 return Ok((
118 worker_id,
119 AcquiredPermits::Idle {
120 _concurrency_permit: concurrency_permit,
121 },
122 ));
123 }
124 waiters.push(tx);
125 }
126
127 let bootup = async {
128 let permit = self.bootup_semaphore.clone().acquire_owned().await;
129 let wait_time = self.state.stats.lock().wait_time_before_bootup();
130 sleep(wait_time).await;
131 permit
132 };
133
134 select! {
135 worker_id = rx => {
136 let worker_id = worker_id?;
137 Ok((worker_id, AcquiredPermits::Idle { _concurrency_permit: concurrency_permit }))
138 }
139 bootup_permit = bootup => {
140 let bootup_permit = bootup_permit.context("acquiring bootup permit")?;
141 {
142 self.state.stats.lock().add_booting_worker();
143 }
144 let worker_id = create_worker(self.worker_options.clone()).await?;
145
146 {
147 let mut stats = self.state.stats.lock();
148 stats.finished_booting_worker();
149 }
150
151 self.bootup_semaphore.add_permits(1);
152 Ok((worker_id, AcquiredPermits::Fresh { _concurrency_permit: concurrency_permit, _bootup_permit: bootup_permit }))
153 }
154 }
155 }
156}
157
158#[turbo_tasks::value(shared)]
159pub(crate) struct WorkerThreadsBackend;
160
161#[turbo_tasks::value_impl]
162impl NodeBackend for WorkerThreadsBackend {
163 fn runtime_module_path(&self) -> RcStr {
164 rcstr!("worker_thread/evaluate.ts")
165 }
166
167 fn globals_module_path(&self) -> RcStr {
168 rcstr!("worker_thread/globals.ts")
169 }
170
171 fn create_pool(&self, options: CreatePoolOptions) -> CreatePoolFuture {
172 Box::pin(async move {
173 let CreatePoolOptions {
174 cwd,
175 entrypoint,
176 env,
177 assets_for_source_mapping,
178 assets_root,
179 project_dir,
180 concurrency,
181 debug,
182 } = options;
183
184 Ok(WorkerThreadPool::create(
185 cwd,
186 entrypoint,
187 env,
188 assets_for_source_mapping,
189 assets_root,
190 project_dir,
191 concurrency,
192 debug,
193 )
194 .await)
195 })
196 }
197
198 fn scale_down(&self) -> Result<()> {
199 WorkerThreadPool::scale_down();
200 Ok(())
201 }
202
203 fn scale_zero(&self) -> Result<()> {
204 WorkerThreadPool::scale_zero();
205 Ok(())
206 }
207}
208
209impl WorkerThreadPool {
210 pub fn scale_down() {
211 let _ = WORKER_POOL_OPERATION.scale_down();
212 }
213
214 pub fn scale_zero() {
215 let _ = WORKER_POOL_OPERATION.scale_zero();
216 }
217}
218
219#[async_trait::async_trait]
220impl EvaluateOperation for WorkerThreadPool {
221 async fn operation(&self) -> Result<Box<dyn Operation>> {
222 let operation = {
223 let _guard = duration_span!("Node.js operation");
224 let worker_options = self.worker_options.clone();
225
226 let task_id = OPERATION_TASK_ID.fetch_add(1, Ordering::Release);
227
228 if task_id == 0 {
229 panic!("Node.js operation task id overflow")
230 }
231
232 let (worker_id, permits) = self.acquire_worker().await?;
233
234 let state = self.state.clone();
235
236 let channels = TaskChannels::new(task_id, worker_id);
238
239 WorkerOperation {
240 worker_options,
241 worker_id,
242 state: state.clone(),
243 on_drop: Some(Box::new(move |worker_id| {
244 let mut waiters = state.waiters.lock();
245 loop {
246 if let Some(tx) = waiters.pop() {
247 if tx.send(worker_id).is_ok() {
248 break;
249 }
250 } else {
251 state.idle_workers.lock().push(worker_id);
252 break;
253 }
254 }
255 })),
256 _permits: permits,
257 channels,
258 }
259 };
260
261 Ok(Box::new(operation))
262 }
263
264 fn stats(&self) -> PoolStatsSnapshot {
266 self.state.stats.lock().snapshot()
267 }
268
269 fn pre_warm(&self) {
270 }
272}