turbo_tasks_backend/kv_backing_storage.rs

use std::{
    borrow::Borrow,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result};
use turbo_bincode::{
    TurboBincodeBuffer, new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode,
    turbo_bincode_encode_into,
};
use turbo_tasks::{
    TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    parallel,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
    backing_storage::{BackingStorage, BackingStorageSealed},
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
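// Both meta keys above live in `KeySpace::Infra`, encoded via `IntKey` below as 4-byte
// little-endian values: `META_KEY_OPERATIONS` stores the serialized list of uncompleted
// operations, and `META_KEY_NEXT_FREE_TASK_ID` stores the next task id to hand out after a
// restart.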

struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}
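
// A minimal round-trip sketch for the `IntKey`/`as_u32` pair. This test module is an
// illustrative addition, not part of the original storage logic: keys are fixed 4-byte
// little-endian encodings of `u32` values.
#[cfg(test)]
mod int_key_tests {
    use super::{IntKey, as_u32};

    #[test]
    fn int_key_round_trips_via_as_u32() {
        for value in [0u32, 1, 42, u32::MAX] {
            let key = IntKey::new(value);
            // `as_ref` always yields exactly four little-endian bytes.
            assert_eq!(key.as_ref().len(), 4);
            assert_eq!(as_u32(key.as_ref()).unwrap(), value);
        }
    }
}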

// We want to invalidate the cache on panic for most users, but that is a band-aid for underlying
// problems in turbo-tasks.
//
// If we invalidate the cache upon panic and a restart "fixes" the issue, users typically won't
// report bugs to us, and we'll never find the root causes of these problems.
//
// These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
// feel these pain points and fix the root causes of bugs.
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
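
// For example (illustrative shell syntax; the command is a placeholder):
//
//     TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC=1 your-app
//
// disables invalidation-on-panic, while unset, "", "0", and "false" all count as falsy and
// leave the default invalidation behavior enabled.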

pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
    /// storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    _panic_hook_guard: Option<PanicHookGuard>,
}

/// A wrapper type used by [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level key-value database into a higher-level [`BackingStorage`] type.
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    // Wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Handles boilerplate logic for an on-disk persisted database with versioning.
    ///
    /// - Creates a directory per version, enforces a maximum number of old versions, and performs
    ///   automatic cleanup of old versions.
    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
    ///
    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
    /// [`StartupCacheState`], which can be used by the application for logging information to the
    /// user or telemetry about the cache.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // If a panic happened, something deep inside of turbopack or
                            // turbo-tasks failed, and it may be hard to recover. We don't want
                            // the cache to stick around, as that may persist bugs. Make a
                            // best-effort attempt to invalidate the database (ignoring failures).
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}
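
// A hypothetical call-site sketch for `open_versioned_on_disk` (the concrete database
// constructor `MyKvDb::open` is an assumption for illustration, not part of this crate):
//
//     let (storage, startup_cache_state) =
//         KeyValueDatabaseBackingStorage::open_versioned_on_disk(
//             base_path,
//             &version_info,
//             is_ci,
//             |versioned_path| MyKvDb::open(versioned_path),
//         )?;
//     // `startup_cache_state` can be reported to the user or to telemetry here.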

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    /// Runs `f` inside the given read transaction, or inside a fresh short-lived transaction if
    /// none was provided.
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

    fn invalidate(&self, reason_code: &str) -> Result<()> {
        // `base_path` can be `None` for a `NoopKvDb`.
        if let Some(base_path) = &self.base_path {
            // Invalidation could happen frequently if there's a bunch of panics. We only need to
            // invalidate once, so grab a lock.
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
            // partial writes.
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            // Avoid redundant invalidations from future panics.
            *invalidated_guard = true;
        }
        Ok(())
    }

    /// Used to read the next free task ID from the database.
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(self
            .inner
            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
            .context("Unable to read next free task id from database")?
            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = turbo_bincode_decode(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn save_snapshot<I>(
        &self,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<TurboBincodeBuffer>,
                    Option<TurboBincodeBuffer>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
        let mut batch = self.inner.database.write_batch()?;

        // These buffers should be large because they're temporary and re-used. From measuring a
        // large application, the largest TaskType was ~365 bytes, so this should be big enough to
        // avoid resizes in the loop.
        const INITIAL_ENCODE_BUFFER_CAPACITY: usize = 512;
        #[cfg(feature = "print_cache_item_size")]
        let all_stats: std::sync::Mutex<
            std::collections::HashMap<&'static str, TaskTypeCacheStats>,
        > = std::sync::Mutex::new(std::collections::HashMap::new());
        // Start organizing the updates in parallel
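        // `write_batch` returns either a concurrent batch (puts may be issued from many threads
        // and each key space flushed in parallel) or a serial batch (single-threaded puts). Both
        // arms below perform the same logical steps: write task data, determine the next free
        // task id, write the task cache entries, then persist the infra keys.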
        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    parallel::try_for_each(
                        &[KeySpace::TaskMeta, KeySpace::TaskData],
                        |&key_space| {
                            let _span = span.clone().entered();
                            // Safety: We already finished all processing of the task data and
                            // task meta.
                            unsafe { batch.flush(key_space) }
                        },
                    )?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
                        task_cache_updates,
                        |updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            // Re-use the same buffer across every `encode_task_type` call in
                            // this chunk. `ConcurrentWriteBatch::put` will copy the data out of
                            // this buffer into smaller exact-sized vecs.
                            let mut task_type_bytes =
                                TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
                            for (task_type, task_id) in updates {
                                task_type_bytes.clear();
                                encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                                let task_id: u32 = *task_id;

                                batch
                                    .put(
                                        KeySpace::TaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        format!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                #[cfg(feature = "print_cache_item_size")]
                                all_stats
                                    .lock()
                                    .unwrap()
                                    .entry(task_type.get_name())
                                    .or_default()
                                    .add(&task_type_bytes);
                                max_task_id = max_task_id.max(task_id);
                            }

                            Ok(max_task_id)
                        },
                    )?
                    .into_iter()
                    .max()
                    .unwrap_or(0);
                    next_task_id = next_task_id.max(max_task_id + 1);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    let task_items =
                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
                    for (task_id, meta, data) in task_items.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    format!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    format!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                    batch.flush(KeySpace::TaskMeta)?;
                    batch.flush(KeySpace::TaskData)?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::serial(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    // Re-use the same buffer across every `encode_task_type` call.
                    // `SerialWriteBatch::put` will copy the data out of this buffer into
                    // smaller exact-sized vecs.
                    let mut task_type_bytes =
                        TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                        task_type_bytes.clear();
                        encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                        let task_id = *task_id;

                        batch
                            .put(
                                KeySpace::TaskCache,
                                WriteBuffer::Borrowed(&task_type_bytes),
                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                            )
                            .with_context(|| {
                                format!("Unable to write task cache {task_type:?} => {task_id}")
                            })?;
                        #[cfg(feature = "print_cache_item_size")]
                        all_stats
                            .lock()
                            .unwrap()
                            .entry(task_type.get_name())
                            .or_default()
                            .add(&task_type_bytes);
                        next_task_id = next_task_id.max(task_id + 1);
                    }
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::serial(batch),
                    next_task_id,
                    operations,
                )?;
            }
        }
        #[cfg(feature = "print_cache_item_size")]
        print_task_type_cache_stats(all_stats.into_inner().unwrap());

        {
            let _span = tracing::trace_span!("commit").entered();
            batch.commit().context("Unable to commit operations")?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let mut task_type_bytes = TurboBincodeBuffer::new();
            encode_task_type(task_type, &mut task_type_bytes, None)?;
            let Some(bytes) = database.get(tx, KeySpace::TaskCache, &task_type_bytes)? else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
            // Checking if the database is empty is a performance optimization to avoid
            // serializing the task type.
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: SpecificTaskDataCategory,
        storage: &mut TaskStorage,
    ) -> Result<()> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: SpecificTaskDataCategory,
            storage: &mut TaskStorage,
        ) -> Result<()> {
            let Some(bytes) =
                database.get(tx, category.key_space(), IntKey::new(*task_id).as_ref())?
            else {
                return Ok(());
            };
            let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
            storage
                .decode(category, &mut decoder)
                .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
        }
        inner
            .with_tx(tx, |tx| {
                lookup(&inner.database, tx, task_id, category, storage)
            })
            .with_context(|| format!("Looking up task storage for {task_id} from database failed"))
    }

    unsafe fn batch_lookup_data(
        &self,
        tx: Option<&Self::ReadTransaction<'_>>,
        task_ids: &[TaskId],
        category: SpecificTaskDataCategory,
    ) -> Result<Vec<TaskStorage>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_ids: &[TaskId],
            category: SpecificTaskDataCategory,
        ) -> Result<Vec<TaskStorage>> {
            let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
            let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
            let bytes = database.batch_get(tx, category.key_space(), &keys)?;
            bytes
                .into_iter()
                .map(|opt_bytes| {
                    let mut storage = TaskStorage::new();
                    if let Some(bytes) = opt_bytes {
                        let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
                        storage
                            .decode(category, &mut decoder)
                            .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
                    }
                    Ok(storage)
                })
                .collect::<Result<Vec<_>>>()
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_ids, category))
            .with_context(|| {
                format!(
                    "Looking up typed data for {} tasks from database failed",
                    task_ids.len()
                )
            })
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            // Nothing persisted yet: task ids are handed out starting at 1 (`TaskId::MIN`).
            None => 1,
        },
    )
}

fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .context("Unable to write next free task id")?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations =
            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .context("Unable to write operations")?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

fn encode_task_type(
    task_type: &CachedTaskType,
    buffer: &mut TurboBincodeBuffer,
    task_id: Option<TaskId>,
) -> Result<()> {
    fn encode_once_into(
        task_type: &CachedTaskType,
        buffer: &mut TurboBincodeBuffer,
        task_id: Option<TaskId>,
    ) -> Result<()> {
        turbo_bincode_encode_into(task_type, buffer).with_context(|| {
            if let Some(task_id) = task_id {
                format!("Unable to serialize task {task_id} cache key {task_type:?}")
            } else {
                format!("Unable to serialize task cache key {task_type:?}")
            }
        })
    }

    debug_assert!(buffer.is_empty());
    encode_once_into(task_type, buffer, task_id)?;

    if cfg!(feature = "verify_serialization") {
        // Print before panicking so the message is also visible when panic output is captured
        // (e.g. by a panic hook or test harness).
        macro_rules! println_and_panic {
            ($($tt:tt)*) => {
                println!($($tt)*);
                panic!($($tt)*);
            };
        }
        let deserialize: Result<CachedTaskType, _> = turbo_bincode_decode(buffer);
        match deserialize {
            Err(err) => {
                println_and_panic!("Task type would not be deserializable:\n{err:?}");
            }
            Ok(task_type2) => {
                if &task_type2 != task_type {
                    println_and_panic!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                }
                let mut buffer2 = TurboBincodeBuffer::new();
                match encode_once_into(&task_type2, &mut buffer2, task_id) {
                    Err(err) => {
                        println_and_panic!(
                            "Task type would not be serializable the second time:\n{err:?}"
                        );
                    }
                    Ok(()) => {
                        if buffer2 != *buffer {
                            println_and_panic!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                buffer,
                                buffer2
                            );
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
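
// With the `verify_serialization` feature enabled, every encoded task type is immediately
// decoded, compared against the original, and re-encoded, to catch lossy or non-deterministic
// serialization during development. Deterministic bytes matter here because the encoded task
// type is itself the `TaskCache` lookup key: two different encodings of the same task type
// would miss the cache.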

type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;
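
// `SerializedTasks` is only populated by the serial write-batch path in `save_snapshot`: with a
// concurrent batch, `process_task_data` writes each task directly into the batch and returns
// empty inner vecs instead.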

#[cfg(feature = "print_cache_item_size")]
#[derive(Default)]
struct TaskTypeCacheStats {
    key_size: usize,
    key_size_compressed: usize,
    count: usize,
}

#[cfg(feature = "print_cache_item_size")]
impl TaskTypeCacheStats {
    fn compressed_size(data: &[u8]) -> Result<usize> {
        Ok(lzzzz::lz4::Compressor::new()?.next_to_vec(
            data,
            &mut Vec::new(),
            lzzzz::lz4::ACC_LEVEL_DEFAULT,
        )?)
    }

    fn add(&mut self, key_bytes: &[u8]) {
        self.key_size += key_bytes.len();
        self.key_size_compressed += Self::compressed_size(key_bytes).unwrap_or(0);
        self.count += 1;
    }
}

#[cfg(feature = "print_cache_item_size")]
fn print_task_type_cache_stats(stats: std::collections::HashMap<&'static str, TaskTypeCacheStats>) {
    use turbo_tasks::util::FormatBytes;

    let mut stats: Vec<_> = stats.into_iter().collect();
    if stats.is_empty() {
        return;
    }
    // Sort by compressed key size, descending, with the function name as tie-breaker.
    stats.sort_unstable_by(|(key_a, stats_a), (key_b, stats_b)| {
        (stats_b.key_size_compressed, *key_b).cmp(&(stats_a.key_size_compressed, *key_a))
    });
    println!(
        "Task type cache stats: {} ({})",
        FormatBytes(
            stats
                .iter()
                .map(|(_, s)| s.key_size_compressed)
                .sum::<usize>()
        ),
        FormatBytes(stats.iter().map(|(_, s)| s.key_size).sum::<usize>())
    );
    for (fn_name, stats) in stats {
        println!(
            "  {} ({}) {fn_name}  x {} avg {} ({})",
            FormatBytes(stats.key_size_compressed),
            FormatBytes(stats.key_size),
            stats.count,
            FormatBytes(
                stats
                    .key_size_compressed
                    .checked_div(stats.count)
                    .unwrap_or(0)
            ),
            FormatBytes(stats.key_size.checked_div(stats.count).unwrap_or(0)),
        );
    }
}

fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<TurboBincodeBuffer>,
                Option<TurboBincodeBuffer>,
            ),
        > + Send
        + Sync,
{
    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
        let mut result = Vec::new();
        for (task_id, meta, data) in tasks {
            if let Some(batch) = batch {
                // Concurrent batch: write directly from this worker.
                let key = IntKey::new(*task_id);
                let key = key.as_ref();
                if let Some(meta) = meta {
                    batch.put(
                        KeySpace::TaskMeta,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(meta),
                    )?;
                }
                if let Some(data) = data {
                    batch.put(
                        KeySpace::TaskData,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(data),
                    )?;
                }
            } else {
                // No concurrent batch available: buffer the new task data so the caller can
                // write it through the serial batch.
                result.push((
                    task_id,
                    meta.map(WriteBuffer::SmallVec),
                    data.map(WriteBuffer::SmallVec),
                ));
            }
        }

        Ok(result)
    })
}