turbo_tasks_backend/kv_backing_storage.rs

use std::{
    borrow::Borrow,
    cmp::max,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result, anyhow};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tracing::Span;
use turbo_tasks::{
    SessionId, TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    turbo_tasks_scope,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

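/// Pot serializer/deserializer configuration used throughout this file. The compatibility setting
/// pins the encoding to pot's V4 format, presumably so existing on-disk data stays readable
/// across pot upgrades.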
const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);

fn pot_serialize_small_vec<T: Serialize>(value: &T) -> pot::Result<SmallVec<[u8; 16]>> {
    struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>);
    impl std::io::Write for SmallVecWrite<'_> {
        #[inline]
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.extend_from_slice(buf);
            Ok(buf.len())
        }

        #[inline]
        fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
            self.0.extend_from_slice(buf);
            Ok(())
        }

        #[inline]
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let mut output = SmallVec::new();
    POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?;
    Ok(output)
}

fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
    pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
    pot::de::SymbolList::new()
}

const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
const META_KEY_SESSION_ID: u32 = 2;

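/// A 4-byte little-endian `u32` key, used for the fixed `META_KEY_*` entries in the infra key
/// space and for task ids in the task key spaces.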
struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}

// We want to invalidate the cache on panic for most users, but this is a band-aid to underlying
// problems in turbo-tasks.
//
// If we invalidate the cache upon panic and it "fixes" the issue upon restart, users typically
// won't report bugs to us, and we'll never find the root causes of these problems.
//
// These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
// feel these pain points and fix the root causes of bugs.
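//
// For example, setting `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC=1` keeps the cache on panic; any
// value other than the empty string, "0", or "false" counts as truthy here.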
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}

pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
    /// storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    _panic_hook_guard: Option<PanicHookGuard>,
}

/// A wrapper type used by [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level key-value database into a higher-level [`BackingStorage`] type.
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    // Wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Handles boilerplate logic for an on-disk persisted database with versioning.
    ///
    /// - Creates a directory per version and automatically cleans up old versions, keeping at
    ///   most a fixed number of them.
    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
    ///
    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
    /// [`StartupCacheState`], which can be used by the application for logging information to the
    /// user or telemetry about the cache.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)?;
        let database = (database)(versioned_path)?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // If a panic happened, that must mean something deep inside of
                            // turbopack or turbo-tasks failed, and it may be hard to recover. We
                            // don't want the cache to stick around, as that may persist bugs. Make
                            // a best-effort attempt to invalidate the database (ignoring failures).
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

    fn invalidate(&self, reason_code: &str) -> Result<()> {
        // `base_path` can be `None` for a `NoopKvDb`
        if let Some(base_path) = &self.base_path {
            // Invalidation could happen frequently if there's a bunch of panics. We only need to
            // invalidate once, so grab a lock.
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
            // partial writes.
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            // Avoid redundant invalidations from future panics
            *invalidated_guard = true;
        }
        Ok(())
    }

    /// Used to read the previous session id and the next free task id from the database.
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(TaskId::try_from(
            self.inner
                .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
                .context("Unable to read next free task id from database")?
                .unwrap_or(1),
        )?)
    }

    fn next_session_id(&self) -> Result<SessionId> {
        Ok(SessionId::try_from(
            self.inner
                .get_infra_u32(META_KEY_SESSION_ID)
                .context("Unable to read session id from database")?
                .unwrap_or(0)
                + 1,
        )?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = deserialize_with_good_error(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
        serialize(task, data)
    }

    fn save_snapshot<I>(
        &self,
        session_id: SessionId,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<SmallVec<[u8; 16]>>,
                    Option<SmallVec<[u8; 16]>>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::trace_span!(
            "save snapshot",
            session_id = ?session_id,
            operations = operations.len()
        )
        .entered();
        let mut batch = self.inner.database.write_batch()?;

        // Start organizing the updates in parallel
        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    [KeySpace::TaskMeta, KeySpace::TaskData]
                        .into_par_iter()
                        .try_for_each(|key_space| {
                            let _span = span.clone().entered();
                            // Safety: We already finished all processing of the task data and task
                            // meta
                            unsafe { batch.flush(key_space) }
                        })?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let result = task_cache_updates
                        .into_par_iter()
                        .with_max_len(1)
                        .map(|updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            let mut task_type_bytes = Vec::new();
                            for (task_type, task_id) in updates {
                                let task_id: u32 = *task_id;
                                serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id + 1);
                            }

                            Ok(max_task_id)
                        })
                        .reduce(
                            || Ok(0),
                            |a, b| -> anyhow::Result<_> {
                                let a_max = a?;
                                let b_max = b?;
                                Ok(max(a_max, b_max))
                            },
                        )?;
                    next_task_id = next_task_id.max(result);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
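            // A serial batch must be written from a single thread: task data is serialized into
            // buffers in parallel (inside `turbo_tasks::scope`), then written out below on this
            // thread.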
            WriteBatch::Serial(batch) => {
                let mut task_items_result = Ok(Vec::new());
                turbo_tasks::scope(|s| {
                    s.spawn(|_| {
                        task_items_result =
                            process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>);
                    });

                    let mut next_task_id =
                        get_next_free_task_id::<
                            T::SerialWriteBatch<'_>,
                            T::ConcurrentWriteBatch<'_>,
                        >(&mut WriteBatchRef::serial(batch))?;

                    {
                        let _span = tracing::trace_span!(
                            "update task cache",
                            items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                        )
                        .entered();
                        let mut task_type_bytes = Vec::new();
                        for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                            let task_id = *task_id;
                            serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                            batch
                                .put(
                                    KeySpace::ForwardTaskCache,
                                    WriteBuffer::Borrowed(&task_type_bytes),
                                    WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                )
                                .with_context(|| {
                                    anyhow!("Unable to write task cache {task_type:?} => {task_id}")
                                })?;
                            batch
                                .put(
                                    KeySpace::ReverseTaskCache,
                                    WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                    WriteBuffer::Borrowed(&task_type_bytes),
                                )
                                .with_context(|| {
                                    anyhow!("Unable to write task cache {task_id} => {task_type:?}")
                                })?;
                            next_task_id = next_task_id.max(task_id + 1);
                        }
                    }

                    save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                        &mut WriteBatchRef::serial(batch),
                        next_task_id,
                        session_id,
                        operations,
                    )?;
                    anyhow::Ok(())
                })?;

                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    for (task_id, meta, data) in task_items_result?.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    anyhow!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    anyhow!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                }
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch
                .commit()
                .with_context(|| anyhow!("Unable to commit operations"))?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let task_type = POT_CONFIG.serialize(task_type)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type)? else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
            // Checking if the database is empty is a performance optimization
            // to avoid serializing the task type.
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(deserialize_with_good_error(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                match category {
                    TaskDataCategory::Meta => KeySpace::TaskMeta,
                    TaskDataCategory::Data => KeySpace::TaskData,
                    TaskDataCategory::All => unreachable!(),
                },
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

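/// Reads the persisted next-free-task-id counter from the infra key space, defaulting to 1 when
/// the database is empty.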
fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

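/// Persists the infra-level metadata (next free task id, session id, and uncompleted operations)
/// and flushes the infra key space.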
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    session_id: SessionId,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next free task id"))?;
    }
    {
        let _span = tracing::trace_span!("update session id", session_id = ?session_id).entered();
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()),
                WriteBuffer::Borrowed(&session_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next session id"))?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations = pot_serialize_small_vec(&operations)
            .with_context(|| anyhow!("Unable to serialize operations"))?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .with_context(|| anyhow!("Unable to write operations"))?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

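/// Serializes a task's cache key into the reusable `task_type_bytes` buffer, clearing it first.
/// With the `verify_serialization` feature enabled, also checks that the bytes deserialize back.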
fn serialize_task_type(
    task_type: &Arc<CachedTaskType>,
    mut task_type_bytes: &mut Vec<u8>,
    task_id: u32,
) -> Result<()> {
    task_type_bytes.clear();
    POT_CONFIG
        .serialize_into(&**task_type, &mut task_type_bytes)
        .with_context(|| anyhow!("Unable to serialize task {task_id} cache key {task_type:?}"))?;
    #[cfg(feature = "verify_serialization")]
    {
        let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
            &mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
        );
        if let Err(err) = deserialize {
            println!("Task type would not be deserializable {task_id}: {err:?}\n{task_type:#?}");
            panic!("Task type would not be deserializable {task_id}: {err:?}");
        }
    }
    Ok(())
}

type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

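/// Writes the serialized task meta/data buffers in parallel. When a concurrent write batch is
/// provided, entries are written directly to it and the returned buffers are empty; otherwise the
/// buffers are collected and returned for a later serial write.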
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<SmallVec<[u8; 16]>>,
                Option<SmallVec<[u8; 16]>>,
            ),
        > + Send
        + Sync,
{
    let span = Span::current();
    let turbo_tasks = turbo_tasks::turbo_tasks();
    let handle = tokio::runtime::Handle::current();
    tasks
        .into_par_iter()
        .map(|tasks| {
            let _span = span.clone().entered();
            let _guard = handle.clone().enter();
            turbo_tasks_scope(turbo_tasks.clone(), || {
                let mut result = Vec::new();
                for (task_id, meta, data) in tasks {
                    if let Some(batch) = batch {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch.put(
                                KeySpace::TaskMeta,
                                WriteBuffer::Borrowed(key),
                                WriteBuffer::SmallVec(meta),
                            )?;
                        }
                        if let Some(data) = data {
                            batch.put(
                                KeySpace::TaskData,
                                WriteBuffer::Borrowed(key),
                                WriteBuffer::SmallVec(data),
                            )?;
                        }
                    } else {
                        // Store the new task data
                        result.push((
                            task_id,
                            meta.map(WriteBuffer::SmallVec),
                            data.map(WriteBuffer::SmallVec),
                        ));
                    }
                }

                Ok(result)
            })
        })
        .collect::<Result<Vec<_>>>()
}

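/// Serializes a task's data items with pot. If serialization fails (or always, when the
/// `verify_serialization` feature is enabled), falls back to serializing item-by-item so that
/// non-serializable optional items can be dropped and required failures reported with context.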
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
    Ok(match pot_serialize_small_vec(data) {
        #[cfg(not(feature = "verify_serialization"))]
        Ok(value) => value,
        _ => {
            let mut error = Ok(());
            let mut data = data.clone();
            data.retain(|item| {
                let mut buf = Vec::<u8>::new();
                let mut symbol_map = pot_ser_symbol_map();
                let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
                if let Err(err) = serde_path_to_error::serialize(&item, &mut serializer) {
                    if item.is_optional() {
                        #[cfg(feature = "verify_serialization")]
                        println!("Skipping non-serializable optional item for {task}: {item:?}");
                    } else {
                        error = Err(err).context({
                            anyhow!("Unable to serialize data item for {task}: {item:?}")
                        });
                    }
                    false
                } else {
                    #[cfg(feature = "verify_serialization")]
                    {
                        let deserialize: Result<CachedDataItem, _> =
                            serde_path_to_error::deserialize(
                                &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
                            );
                        if let Err(err) = deserialize {
                            println!(
                                "Data item would not be deserializable {task}: {err:?}\n{item:?}"
                            );
                            return false;
                        }
                    }
                    true
                }
            });
            error?;

            pot_serialize_small_vec(&data)
                .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
        }
    })
}

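/// Deserializes `data` with pot. On failure, re-runs deserialization through
/// `serde_path_to_error` so the returned error includes the path to the failing field; if that
/// second attempt unexpectedly succeeds, the original pot error is returned instead.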
fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
    match POT_CONFIG.deserialize(data) {
        Ok(value) => Ok(value),
        Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
            &mut pot_de_symbol_list().deserializer_for_slice(data)?,
        )
        .map_err(anyhow::Error::from)
        .and(Err(error.into()))
        .context("Deserialization failed"),
    }
}
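
// A minimal smoke-test sketch for the small helpers above (not part of the original file): it
// exercises `IntKey`/`as_u32` and the pot round-trip through `pot_serialize_small_vec` and
// `deserialize_with_good_error`, assuming the standard test harness.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn int_key_round_trips_through_as_u32() {
        // `IntKey` stores the value little-endian; `as_u32` reads it back the same way.
        let key = IntKey::new(0xDEAD_BEEF);
        assert_eq!(as_u32(key.as_ref()).unwrap(), 0xDEAD_BEEF);
    }

    #[test]
    fn as_u32_rejects_wrong_lengths() {
        // The `try_into` to `[u8; 4]` fails for slices of any other length.
        assert!(as_u32(&[1u8, 2, 3][..]).is_err());
    }

    #[test]
    fn pot_round_trip() {
        let value = vec![1u32, 2, 3];
        let bytes = pot_serialize_small_vec(&value).unwrap();
        let restored: Vec<u32> = deserialize_with_good_error(&bytes).unwrap();
        assert_eq!(restored, value);
    }

    #[test]
    fn deserialize_reports_errors_instead_of_panicking() {
        // Garbage input should surface as an `Err` with context, not a panic.
        assert!(deserialize_with_good_error::<Vec<u32>>(&[0xFF, 0x00, 0x13]).is_err());
    }
}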