1use std::{future::Future, ops::Deref, path::PathBuf, sync::Arc, time::Duration};
2
3use anyhow::{Context, Result, anyhow};
4use napi::{
5 JsFunction, JsObject, JsUnknown, NapiRaw, NapiValue, Status,
6 bindgen_prelude::{External, ToNapiValue},
7 threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode},
8};
9use rustc_hash::FxHashMap;
10use serde::Serialize;
11use tokio::sync::mpsc::Receiver;
12use turbo_tasks::{
13 Effects, OperationVc, ReadRef, TaskId, TryJoinIterExt, TurboTasks, TurboTasksApi, UpdateInfo,
14 Vc, VcValueType, get_effects, message_queue::CompilationEvent,
15 task_statistics::TaskStatisticsApi, trace::TraceRawVcs,
16};
17use turbo_tasks_backend::{
18 DefaultBackingStorage, GitVersionInfo, NoopBackingStorage, default_backing_storage,
19 noop_backing_storage,
20};
21use turbo_tasks_fs::FileContent;
22use turbopack_core::{
23 diagnostics::{Diagnostic, DiagnosticContextExt, PlainDiagnostic},
24 error::PrettyPrintError,
25 issue::{
26 IssueDescriptionExt, IssueSeverity, PlainIssue, PlainIssueSource, PlainSource, StyledString,
27 },
28 source_pos::SourcePos,
29};
30
31use crate::util::log_internal_error_and_inform;
32
33#[derive(Clone)]
34pub enum NextTurboTasks {
35 Memory(Arc<TurboTasks<turbo_tasks_backend::TurboTasksBackend<NoopBackingStorage>>>),
36 PersistentCaching(
37 Arc<TurboTasks<turbo_tasks_backend::TurboTasksBackend<DefaultBackingStorage>>>,
38 ),
39}
40
41impl NextTurboTasks {
42 pub fn dispose_root_task(&self, task: TaskId) {
43 match self {
44 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.dispose_root_task(task),
45 NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.dispose_root_task(task),
46 }
47 }
48
49 pub fn spawn_root_task<T, F, Fut>(&self, functor: F) -> TaskId
50 where
51 T: Send,
52 F: Fn() -> Fut + Send + Sync + Clone + 'static,
53 Fut: Future<Output = Result<Vc<T>>> + Send,
54 {
55 match self {
56 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.spawn_root_task(functor),
57 NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.spawn_root_task(functor),
58 }
59 }
60
61 pub async fn run_once<T: TraceRawVcs + Send + 'static>(
62 &self,
63 future: impl Future<Output = Result<T>> + Send + 'static,
64 ) -> Result<T> {
65 match self {
66 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.run_once(future).await,
67 NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.run_once(future).await,
68 }
69 }
70
71 pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
72 where
73 T: Send,
74 Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
75 {
76 match self {
77 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.spawn_once_task(future),
78 NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.spawn_once_task(future),
79 }
80 }
81
82 pub async fn aggregated_update_info(
83 &self,
84 aggregation: Duration,
85 timeout: Duration,
86 ) -> Option<UpdateInfo> {
87 match self {
88 NextTurboTasks::Memory(turbo_tasks) => {
89 turbo_tasks
90 .aggregated_update_info(aggregation, timeout)
91 .await
92 }
93 NextTurboTasks::PersistentCaching(turbo_tasks) => {
94 turbo_tasks
95 .aggregated_update_info(aggregation, timeout)
96 .await
97 }
98 }
99 }
100
101 pub async fn get_or_wait_aggregated_update_info(&self, aggregation: Duration) -> UpdateInfo {
102 match self {
103 NextTurboTasks::Memory(turbo_tasks) => {
104 turbo_tasks
105 .get_or_wait_aggregated_update_info(aggregation)
106 .await
107 }
108 NextTurboTasks::PersistentCaching(turbo_tasks) => {
109 turbo_tasks
110 .get_or_wait_aggregated_update_info(aggregation)
111 .await
112 }
113 }
114 }
115
116 pub async fn stop_and_wait(&self) {
117 match self {
118 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.stop_and_wait().await,
119 NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.stop_and_wait().await,
120 }
121 }
122
123 pub fn task_statistics(&self) -> &TaskStatisticsApi {
124 match self {
125 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.task_statistics(),
126 NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.task_statistics(),
127 }
128 }
129
130 pub fn get_compilation_events_stream(
131 &self,
132 event_types: Option<Vec<String>>,
133 ) -> Receiver<Arc<dyn CompilationEvent>> {
134 match self {
135 NextTurboTasks::Memory(turbo_tasks) => {
136 turbo_tasks.subscribe_to_compilation_events(event_types)
137 }
138 NextTurboTasks::PersistentCaching(turbo_tasks) => {
139 turbo_tasks.subscribe_to_compilation_events(event_types)
140 }
141 }
142 }
143
144 pub fn send_compilation_event(&self, event: Arc<dyn CompilationEvent>) {
145 match self {
146 NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.send_compilation_event(event),
147 NextTurboTasks::PersistentCaching(turbo_tasks) => {
148 turbo_tasks.send_compilation_event(event)
149 }
150 }
151 }
152}
153
154pub fn create_turbo_tasks(
155 output_path: PathBuf,
156 persistent_caching: bool,
157 _memory_limit: usize,
158 dependency_tracking: bool,
159) -> Result<NextTurboTasks> {
160 Ok(if persistent_caching {
161 let version_info = GitVersionInfo {
162 describe: env!("VERGEN_GIT_DESCRIBE"),
163 dirty: option_env!("CI").is_none_or(|value| value.is_empty())
164 && env!("VERGEN_GIT_DIRTY") == "true",
165 };
166 NextTurboTasks::PersistentCaching(TurboTasks::new(
167 turbo_tasks_backend::TurboTasksBackend::new(
168 turbo_tasks_backend::BackendOptions {
169 storage_mode: Some(if std::env::var("TURBO_ENGINE_READ_ONLY").is_ok() {
170 turbo_tasks_backend::StorageMode::ReadOnly
171 } else {
172 turbo_tasks_backend::StorageMode::ReadWrite
173 }),
174 dependency_tracking,
175 ..Default::default()
176 },
177 default_backing_storage(&output_path.join("cache/turbopack"), &version_info)?,
178 ),
179 ))
180 } else {
181 NextTurboTasks::Memory(TurboTasks::new(
182 turbo_tasks_backend::TurboTasksBackend::new(
183 turbo_tasks_backend::BackendOptions {
184 storage_mode: None,
185 dependency_tracking,
186 ..Default::default()
187 },
188 noop_backing_storage(),
189 ),
190 ))
191 })
192}
193
194#[derive(Clone)]
197pub struct VcArc<T> {
198 turbo_tasks: NextTurboTasks,
199 vc: OperationVc<T>,
201}
202
203impl<T> VcArc<T> {
204 pub fn new(turbo_tasks: NextTurboTasks, vc: OperationVc<T>) -> Self {
205 Self { turbo_tasks, vc }
206 }
207
208 pub fn turbo_tasks(&self) -> &NextTurboTasks {
209 &self.turbo_tasks
210 }
211}
212
213impl<T> Deref for VcArc<T> {
214 type Target = OperationVc<T>;
215
216 fn deref(&self) -> &Self::Target {
217 &self.vc
218 }
219}
220
221pub fn serde_enum_to_string<T: Serialize>(value: &T) -> Result<String> {
222 Ok(serde_json::to_value(value)?
223 .as_str()
224 .context("value must serialize to a string")?
225 .to_string())
226}
227
228pub struct RootTask {
230 #[allow(dead_code)]
231 turbo_tasks: NextTurboTasks,
232 #[allow(dead_code)]
233 task_id: Option<TaskId>,
234}
235
236impl Drop for RootTask {
237 fn drop(&mut self) {
238 }
240}
241
242#[napi]
243pub fn root_task_dispose(
244 #[napi(ts_arg_type = "{ __napiType: \"RootTask\" }")] mut root_task: External<RootTask>,
245) -> napi::Result<()> {
246 if let Some(task) = root_task.task_id.take() {
247 root_task.turbo_tasks.dispose_root_task(task);
248 }
249 Ok(())
250}
251
252pub async fn get_issues<T: Send>(source: OperationVc<T>) -> Result<Arc<Vec<ReadRef<PlainIssue>>>> {
253 let issues = source.peek_issues_with_path().await?;
254 Ok(Arc::new(issues.get_plain_issues().await?))
255}
256
257pub async fn get_diagnostics<T: Send>(
262 source: OperationVc<T>,
263) -> Result<Arc<Vec<ReadRef<PlainDiagnostic>>>> {
264 let captured_diags = source.peek_diagnostics().await?;
265 let mut diags = captured_diags
266 .diagnostics
267 .iter()
268 .map(|d| d.into_plain())
269 .try_join()
270 .await?;
271
272 diags.sort();
273
274 Ok(Arc::new(diags))
275}
276
277#[napi(object)]
278pub struct NapiIssue {
279 pub severity: String,
280 pub stage: String,
281 pub file_path: String,
282 pub title: serde_json::Value,
283 pub description: Option<serde_json::Value>,
284 pub detail: Option<serde_json::Value>,
285 pub source: Option<NapiIssueSource>,
286 pub documentation_link: String,
287 pub sub_issues: Vec<NapiIssue>,
288}
289
290impl From<&PlainIssue> for NapiIssue {
291 fn from(issue: &PlainIssue) -> Self {
292 Self {
293 description: issue
294 .description
295 .as_ref()
296 .map(|styled| serde_json::to_value(StyledStringSerialize::from(styled)).unwrap()),
297 stage: issue.stage.to_string(),
298 file_path: issue.file_path.to_string(),
299 detail: issue
300 .detail
301 .as_ref()
302 .map(|styled| serde_json::to_value(StyledStringSerialize::from(styled)).unwrap()),
303 documentation_link: issue.documentation_link.to_string(),
304 severity: issue.severity.as_str().to_string(),
305 source: issue.source.as_ref().map(|source| source.into()),
306 title: serde_json::to_value(StyledStringSerialize::from(&issue.title)).unwrap(),
307 sub_issues: issue
308 .sub_issues
309 .iter()
310 .map(|issue| (&**issue).into())
311 .collect(),
312 }
313 }
314}
315
316#[derive(Serialize)]
317#[serde(tag = "type", rename_all = "camelCase")]
318pub enum StyledStringSerialize<'a> {
319 Line {
320 value: Vec<StyledStringSerialize<'a>>,
321 },
322 Stack {
323 value: Vec<StyledStringSerialize<'a>>,
324 },
325 Text {
326 value: &'a str,
327 },
328 Code {
329 value: &'a str,
330 },
331 Strong {
332 value: &'a str,
333 },
334}
335
336impl<'a> From<&'a StyledString> for StyledStringSerialize<'a> {
337 fn from(value: &'a StyledString) -> Self {
338 match value {
339 StyledString::Line(parts) => StyledStringSerialize::Line {
340 value: parts.iter().map(|p| p.into()).collect(),
341 },
342 StyledString::Stack(parts) => StyledStringSerialize::Stack {
343 value: parts.iter().map(|p| p.into()).collect(),
344 },
345 StyledString::Text(string) => StyledStringSerialize::Text { value: string },
346 StyledString::Code(string) => StyledStringSerialize::Code { value: string },
347 StyledString::Strong(string) => StyledStringSerialize::Strong { value: string },
348 }
349 }
350}
351
352#[napi(object)]
353pub struct NapiIssueSource {
354 pub source: NapiSource,
355 pub range: Option<NapiIssueSourceRange>,
356}
357
358impl From<&PlainIssueSource> for NapiIssueSource {
359 fn from(
360 PlainIssueSource {
361 asset: source,
362 range,
363 }: &PlainIssueSource,
364 ) -> Self {
365 Self {
366 source: (&**source).into(),
367 range: range.as_ref().map(|range| range.into()),
368 }
369 }
370}
371
372#[napi(object)]
373pub struct NapiIssueSourceRange {
374 pub start: NapiSourcePos,
375 pub end: NapiSourcePos,
376}
377
378impl From<&(SourcePos, SourcePos)> for NapiIssueSourceRange {
379 fn from((start, end): &(SourcePos, SourcePos)) -> Self {
380 Self {
381 start: (*start).into(),
382 end: (*end).into(),
383 }
384 }
385}
386
387#[napi(object)]
388pub struct NapiSource {
389 pub ident: String,
390 pub content: Option<String>,
391}
392
393impl From<&PlainSource> for NapiSource {
394 fn from(source: &PlainSource) -> Self {
395 Self {
396 ident: source.ident.to_string(),
397 content: match &*source.content {
398 FileContent::Content(content) => match content.content().to_str() {
399 Ok(str) => Some(str.into_owned()),
400 Err(_) => None,
401 },
402 FileContent::NotFound => None,
403 },
404 }
405 }
406}
407
408#[napi(object)]
409pub struct NapiSourcePos {
410 pub line: u32,
411 pub column: u32,
412}
413
414impl From<SourcePos> for NapiSourcePos {
415 fn from(pos: SourcePos) -> Self {
416 Self {
417 line: pos.line,
418 column: pos.column,
419 }
420 }
421}
422
423#[napi(object)]
424pub struct NapiDiagnostic {
425 pub category: String,
426 pub name: String,
427 #[napi(ts_type = "Record<string, string>")]
428 pub payload: FxHashMap<String, String>,
429}
430
431impl NapiDiagnostic {
432 pub fn from(diagnostic: &PlainDiagnostic) -> Self {
433 Self {
434 category: diagnostic.category.to_string(),
435 name: diagnostic.name.to_string(),
436 payload: diagnostic
437 .payload
438 .iter()
439 .map(|(k, v)| (k.to_string(), v.to_string()))
440 .collect(),
441 }
442 }
443}
444
445pub struct TurbopackResult<T: ToNapiValue> {
446 pub result: T,
447 pub issues: Vec<NapiIssue>,
448 pub diagnostics: Vec<NapiDiagnostic>,
449}
450
451impl<T: ToNapiValue> ToNapiValue for TurbopackResult<T> {
452 unsafe fn to_napi_value(
453 env: napi::sys::napi_env,
454 val: Self,
455 ) -> napi::Result<napi::sys::napi_value> {
456 let mut obj = unsafe { napi::Env::from_raw(env).create_object()? };
457
458 let result = unsafe {
459 let result = T::to_napi_value(env, val.result)?;
460 JsUnknown::from_raw(env, result)?
461 };
462 if matches!(result.get_type()?, napi::ValueType::Object) {
463 let result = unsafe { result.cast::<JsObject>() };
465
466 for key in JsObject::keys(&result)? {
467 let value: JsUnknown = result.get_named_property(&key)?;
468 obj.set_named_property(&key, value)?;
469 }
470 }
471
472 obj.set_named_property("issues", val.issues)?;
473 obj.set_named_property("diagnostics", val.diagnostics)?;
474
475 Ok(unsafe { obj.raw() })
476 }
477}
478
479pub fn subscribe<T: 'static + Send + Sync, F: Future<Output = Result<T>> + Send, V: ToNapiValue>(
480 turbo_tasks: NextTurboTasks,
481 func: JsFunction,
482 handler: impl 'static + Sync + Send + Clone + Fn() -> F,
483 mapper: impl 'static + Sync + Send + FnMut(ThreadSafeCallContext<T>) -> napi::Result<Vec<V>>,
484) -> napi::Result<External<RootTask>> {
485 let func: ThreadsafeFunction<T> = func.create_threadsafe_function(0, mapper)?;
486 let task_id = turbo_tasks.spawn_root_task(move || {
487 let handler = handler.clone();
488 let func = func.clone();
489 Box::pin(async move {
490 let result = handler().await;
491
492 let status = func.call(
493 result.map_err(|e| {
494 log_internal_error_and_inform(&e);
495 napi::Error::from_reason(PrettyPrintError(&e).to_string())
496 }),
497 ThreadsafeFunctionCallMode::NonBlocking,
498 );
499 if !matches!(status, Status::Ok) {
500 let error = anyhow!("Error calling JS function: {}", status);
501 eprintln!("{error}");
502 return Err::<Vc<()>, _>(error);
503 }
504 Ok(Default::default())
505 })
506 });
507 Ok(External::new(RootTask {
508 turbo_tasks,
509 task_id: Some(task_id),
510 }))
511}
512
513pub async fn strongly_consistent_catch_collectables<R: VcValueType + Send>(
516 source_op: OperationVc<R>,
517) -> Result<(
518 Option<ReadRef<R>>,
519 Arc<Vec<ReadRef<PlainIssue>>>,
520 Arc<Vec<ReadRef<PlainDiagnostic>>>,
521 Arc<Effects>,
522)> {
523 let result = source_op.read_strongly_consistent().await;
524 let issues = get_issues(source_op).await?;
525 let diagnostics = get_diagnostics(source_op).await?;
526 let effects = Arc::new(get_effects(source_op).await?);
527
528 let result = if result.is_err() && issues.iter().any(|i| i.severity <= IssueSeverity::Error) {
529 None
530 } else {
531 Some(result?)
532 };
533
534 Ok((result, issues, diagnostics, effects))
535}