turbo_tasks_backend/
kv_backing_storage.rs

1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use turbo_bincode::{
10    TurboBincodeBuffer, turbo_bincode_decode, turbo_bincode_encode, turbo_bincode_encode_into,
11};
12use turbo_tasks::{
13    TaskId,
14    backend::CachedTaskType,
15    panic_hooks::{PanicHookGuard, register_panic_hook},
16    parallel,
17};
18
19use crate::{
20    GitVersionInfo,
21    backend::{AnyOperation, TaskDataCategory},
22    backing_storage::{BackingStorage, BackingStorageSealed},
23    data::CachedDataItem,
24    database::{
25        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
26        db_versioning::handle_db_versioning,
27        key_value_database::{KeySpace, KeyValueDatabase},
28        write_batch::{
29            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
30            WriteBuffer,
31        },
32    },
33    db_invalidation::invalidation_reasons,
34    utils::chunked_vec::ChunkedVec,
35};
36
/// Key (in [`KeySpace::Infra`]) under which the uncompleted operations are stored.
const META_KEY_OPERATIONS: u32 = 0;
/// Key (in [`KeySpace::Infra`]) under which the next free task id is stored.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;

/// A `u32` database key, stored as its four little-endian bytes.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as a little-endian key.
    fn new(value: u32) -> Self {
        IntKey(u32::to_le_bytes(value))
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        let IntKey(bytes) = self;
        bytes
    }
}
53
54fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
55    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
56    Ok(n)
57}
58
// We want to invalidate the cache on panic for most users, but this is a band-aid to underlying
// problems in turbo-tasks.
//
// If we invalidate the cache upon panic and it "fixes" the issue upon restart, users typically
// won't report bugs to us, and we'll never find root-causes for these problems.
//
// These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
// feel these pain points and fix the root causes of bugs.
/// Returns whether a panic should invalidate the persistent cache.
///
/// This is `true` unless `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or `__NEXT_TEST_MODE` is set to
/// something other than `""`, `"0"` or `"false"`. The environment is consulted once per process
/// and the answer is cached.
fn should_invalidate_on_panic() -> bool {
    /// `true` when `key` is unset or set to exactly `""`, `"0"` or `"false"`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            // Non-UTF-8 values can never equal one of the falsy spellings.
            Some(value) => matches!(value.to_str(), Some("" | "0" | "false")),
        }
    }

    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        let skip_requested = !env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC");
        let in_next_test_mode = !env_is_falsy("__NEXT_TEST_MODE");
        !(skip_requested || in_next_test_mode)
    });

    *SHOULD_INVALIDATE
}
77
/// Shared state behind a [`KeyValueDatabaseBackingStorage`].
///
/// Kept behind an `Arc` so that the panic hook registered by `open_versioned_on_disk` can hold a
/// `Weak` reference to it.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    /// The key-value database that all reads and writes go through.
    database: T,
    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
    /// storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    // Struct fields are dropped in declaration order, so this guard is dropped after `database`.
    _panic_hook_guard: Option<PanicHookGuard>,
}
89
/// A wrapper type used by [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level key-value database into a higher-level [`BackingStorage`] type.
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    // wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    /// Wraps `database` without any on-disk bookkeeping.
    ///
    /// No base path is recorded, so [`BackingStorage::invalidate`] has nothing to invalidate and
    /// returns `Ok(())`. No panic hook is registered.
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Handles boilerplate logic for an on-disk persisted database with versioning.
    ///
    /// - Creates a directory per version, with a maximum number of old versions and performs
    ///   automatic cleanup of old versions.
    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
    ///
    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
    /// [`StartupCacheState`], which can be used by the application for logging information to the
    /// user or telemetry about the cache.
    ///
    /// # Errors
    ///
    /// Fails if the invalidation check/cleanup fails, if the versioned directory cannot be
    /// prepared, or if the `database` callback (which receives the versioned path) fails.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            // `new_cyclic` provides a `Weak` to the allocation being built, so the panic hook can
            // refer back to the storage without keeping it alive.
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            // `upgrade` fails once the storage is gone; then there is nothing to
                            // invalidate.
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // If a panic happened that must mean something deep inside of turbopack
                            // or turbo-tasks failed, and it may be hard to recover. We don't want
                            // the cache to stick around, as that may persist bugs. Make a
                            // best-effort attempt to invalidate the database (ignoring failures).
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}
165
166impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
167    fn with_tx<R>(
168        &self,
169        tx: Option<&T::ReadTransaction<'_>>,
170        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
171    ) -> Result<R> {
172        if let Some(tx) = tx {
173            f(tx)
174        } else {
175            let tx = self.database.begin_read_transaction()?;
176            let r = f(&tx)?;
177            drop(tx);
178            Ok(r)
179        }
180    }
181
182    fn invalidate(&self, reason_code: &str) -> Result<()> {
183        // `base_path` can be `None` for a `NoopKvDb`
184        if let Some(base_path) = &self.base_path {
185            // Invalidation could happen frequently if there's a bunch of panics. We only need to
186            // invalidate once, so grab a lock.
187            let mut invalidated_guard = self
188                .invalidated
189                .lock()
190                .unwrap_or_else(PoisonError::into_inner);
191            if *invalidated_guard {
192                return Ok(());
193            }
194            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
195            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
196            // partial writes.
197            invalidate_db(base_path, reason_code)?;
198            self.database.prevent_writes();
199            // Avoid redundant invalidations from future panics
200            *invalidated_guard = true;
201        }
202        Ok(())
203    }
204
205    /// Used to read the next free task ID from the database.
206    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
207        let tx = self.database.begin_read_transaction()?;
208        self.database
209            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
210            .map(as_u32)
211            .transpose()
212    }
213}
214
215impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
216    for KeyValueDatabaseBackingStorage<T>
217{
218    fn invalidate(&self, reason_code: &str) -> Result<()> {
219        self.inner.invalidate(reason_code)
220    }
221}
222
223impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
224    for KeyValueDatabaseBackingStorage<T>
225{
226    type ReadTransaction<'l> = T::ReadTransaction<'l>;
227
228    fn next_free_task_id(&self) -> Result<TaskId> {
229        Ok(self
230            .inner
231            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
232            .context("Unable to read next free task id from database")?
233            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
234    }
235
236    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
237        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
238            let tx = database.begin_read_transaction()?;
239            let Some(operations) = database.get(
240                &tx,
241                KeySpace::Infra,
242                IntKey::new(META_KEY_OPERATIONS).as_ref(),
243            )?
244            else {
245                return Ok(Vec::new());
246            };
247            let operations = turbo_bincode_decode(operations.borrow())?;
248            Ok(operations)
249        }
250        get(&self.inner.database).context("Unable to read uncompleted operations from database")
251    }
252
253    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
254        encode_task_data(task, data)
255    }
256
257    fn save_snapshot<I>(
258        &self,
259        operations: Vec<Arc<AnyOperation>>,
260        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
261        snapshots: Vec<I>,
262    ) -> Result<()>
263    where
264        I: Iterator<
265                Item = (
266                    TaskId,
267                    Option<TurboBincodeBuffer>,
268                    Option<TurboBincodeBuffer>,
269                ),
270            > + Send
271            + Sync,
272    {
273        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
274        let mut batch = self.inner.database.write_batch()?;
275
276        // these buffers should be large, because they're temporary and re-used.
277        const INITIAL_ENCODE_BUFFER_CAPACITY: usize = 1024;
278
279        // Start organizing the updates in parallel
280        match &mut batch {
281            &mut WriteBatch::Concurrent(ref batch, _) => {
282                {
283                    let _span = tracing::trace_span!("update task data").entered();
284                    process_task_data(snapshots, Some(batch))?;
285                    let span = tracing::trace_span!("flush task data").entered();
286                    parallel::try_for_each(
287                        &[KeySpace::TaskMeta, KeySpace::TaskData],
288                        |&key_space| {
289                            let _span = span.clone().entered();
290                            // Safety: We already finished all processing of the task data and task
291                            // meta
292                            unsafe { batch.flush(key_space) }
293                        },
294                    )?;
295                }
296
297                let mut next_task_id = get_next_free_task_id::<
298                    T::SerialWriteBatch<'_>,
299                    T::ConcurrentWriteBatch<'_>,
300                >(&mut WriteBatchRef::concurrent(batch))?;
301
302                {
303                    let _span = tracing::trace_span!(
304                        "update task cache",
305                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
306                    )
307                    .entered();
308                    let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
309                        task_cache_updates,
310                        |updates| {
311                            let _span = _span.clone().entered();
312                            let mut max_task_id = 0;
313
314                            // Re-use the same buffer across every `serialize_task_type` call in
315                            // this chunk. `ConcurrentWriteBatch::put` will copy the data out of
316                            // this buffer into smaller exact-sized vecs.
317                            let mut task_type_bytes =
318                                TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
319                            for (task_type, task_id) in updates {
320                                task_type_bytes.clear();
321                                encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
322                                let task_id: u32 = *task_id;
323
324                                batch
325                                    .put(
326                                        KeySpace::ForwardTaskCache,
327                                        WriteBuffer::Borrowed(&task_type_bytes),
328                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
329                                    )
330                                    .with_context(|| {
331                                        format!(
332                                            "Unable to write task cache {task_type:?} => {task_id}"
333                                        )
334                                    })?;
335                                batch
336                                    .put(
337                                        KeySpace::ReverseTaskCache,
338                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
339                                        WriteBuffer::Borrowed(&task_type_bytes),
340                                    )
341                                    .with_context(|| {
342                                        format!(
343                                            "Unable to write task cache {task_id} => {task_type:?}"
344                                        )
345                                    })?;
346                                max_task_id = max_task_id.max(task_id);
347                            }
348
349                            Ok(max_task_id)
350                        },
351                    )?
352                    .into_iter()
353                    .max()
354                    .unwrap_or(0);
355                    next_task_id = next_task_id.max(max_task_id + 1);
356                }
357
358                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
359                    &mut WriteBatchRef::concurrent(batch),
360                    next_task_id,
361                    operations,
362                )?;
363            }
364            WriteBatch::Serial(batch) => {
365                {
366                    let _span = tracing::trace_span!("update tasks").entered();
367                    let task_items =
368                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
369                    for (task_id, meta, data) in task_items.into_iter().flatten() {
370                        let key = IntKey::new(*task_id);
371                        let key = key.as_ref();
372                        if let Some(meta) = meta {
373                            batch
374                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
375                                .with_context(|| {
376                                    format!("Unable to write meta items for {task_id}")
377                                })?;
378                        }
379                        if let Some(data) = data {
380                            batch
381                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
382                                .with_context(|| {
383                                    format!("Unable to write data items for {task_id}")
384                                })?;
385                        }
386                    }
387                    batch.flush(KeySpace::TaskMeta)?;
388                    batch.flush(KeySpace::TaskData)?;
389                }
390
391                let mut next_task_id = get_next_free_task_id::<
392                    T::SerialWriteBatch<'_>,
393                    T::ConcurrentWriteBatch<'_>,
394                >(&mut WriteBatchRef::serial(batch))?;
395
396                {
397                    let _span = tracing::trace_span!(
398                        "update task cache",
399                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
400                    )
401                    .entered();
402                    // Re-use the same buffer across every `serialize_task_type` call.
403                    // `ConcurrentWriteBatch::put` will copy the data out of this buffer into
404                    // smaller exact-sized vecs.
405                    let mut task_type_bytes =
406                        TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
407                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
408                        encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
409                        let task_id = *task_id;
410
411                        batch
412                            .put(
413                                KeySpace::ForwardTaskCache,
414                                WriteBuffer::Borrowed(&task_type_bytes),
415                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
416                            )
417                            .with_context(|| {
418                                format!("Unable to write task cache {task_type:?} => {task_id}")
419                            })?;
420                        batch
421                            .put(
422                                KeySpace::ReverseTaskCache,
423                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
424                                WriteBuffer::Borrowed(&task_type_bytes),
425                            )
426                            .with_context(|| {
427                                format!("Unable to write task cache {task_id} => {task_type:?}")
428                            })?;
429                        next_task_id = next_task_id.max(task_id + 1);
430                    }
431                }
432
433                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
434                    &mut WriteBatchRef::serial(batch),
435                    next_task_id,
436                    operations,
437                )?;
438            }
439        }
440
441        {
442            let _span = tracing::trace_span!("commit").entered();
443            batch.commit().context("Unable to commit operations")?;
444        }
445        Ok(())
446    }
447
448    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
449        self.inner.database.begin_read_transaction().ok()
450    }
451
452    unsafe fn forward_lookup_task_cache(
453        &self,
454        tx: Option<&T::ReadTransaction<'_>>,
455        task_type: &CachedTaskType,
456    ) -> Result<Option<TaskId>> {
457        let inner = &*self.inner;
458        fn lookup<D: KeyValueDatabase>(
459            database: &D,
460            tx: &D::ReadTransaction<'_>,
461            task_type: &CachedTaskType,
462        ) -> Result<Option<TaskId>> {
463            let mut task_type_bytes = TurboBincodeBuffer::new();
464            encode_task_type(task_type, &mut task_type_bytes, None)?;
465            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type_bytes)?
466            else {
467                return Ok(None);
468            };
469            let bytes = bytes.borrow().try_into()?;
470            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
471            Ok(Some(id))
472        }
473        if inner.database.is_empty() {
474            // Checking if the database is empty is a performance optimization
475            // to avoid serializing the task type.
476            return Ok(None);
477        }
478        inner
479            .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
480            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
481    }
482
483    unsafe fn reverse_lookup_task_cache(
484        &self,
485        tx: Option<&T::ReadTransaction<'_>>,
486        task_id: TaskId,
487    ) -> Result<Option<Arc<CachedTaskType>>> {
488        let inner = &*self.inner;
489        fn lookup<D: KeyValueDatabase>(
490            database: &D,
491            tx: &D::ReadTransaction<'_>,
492            task_id: TaskId,
493        ) -> Result<Option<Arc<CachedTaskType>>> {
494            let Some(bytes) = database.get(
495                tx,
496                KeySpace::ReverseTaskCache,
497                IntKey::new(*task_id).as_ref(),
498            )?
499            else {
500                return Ok(None);
501            };
502            Ok(Some(turbo_bincode_decode(bytes.borrow())?))
503        }
504        inner
505            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
506            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
507    }
508
509    unsafe fn lookup_data(
510        &self,
511        tx: Option<&T::ReadTransaction<'_>>,
512        task_id: TaskId,
513        category: TaskDataCategory,
514    ) -> Result<Vec<CachedDataItem>> {
515        let inner = &*self.inner;
516        fn lookup<D: KeyValueDatabase>(
517            database: &D,
518            tx: &D::ReadTransaction<'_>,
519            task_id: TaskId,
520            category: TaskDataCategory,
521        ) -> Result<Vec<CachedDataItem>> {
522            let Some(bytes) = database.get(
523                tx,
524                category_to_key_space(category),
525                IntKey::new(*task_id).as_ref(),
526            )?
527            else {
528                return Ok(Vec::new());
529            };
530            let result: Vec<CachedDataItem> = turbo_bincode_decode(bytes.borrow())?;
531            Ok(result)
532        }
533        inner
534            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
535            .with_context(|| format!("Looking up data for {task_id} from database failed"))
536    }
537
538    unsafe fn batch_lookup_data(
539        &self,
540        tx: Option<&Self::ReadTransaction<'_>>,
541        task_ids: &[TaskId],
542        category: TaskDataCategory,
543    ) -> Result<Vec<Vec<CachedDataItem>>> {
544        let inner = &*self.inner;
545        fn lookup<D: KeyValueDatabase>(
546            database: &D,
547            tx: &D::ReadTransaction<'_>,
548            task_ids: &[TaskId],
549            category: TaskDataCategory,
550        ) -> Result<Vec<Vec<CachedDataItem>>> {
551            let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
552            let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
553            let bytes = database.batch_get(
554                tx,
555                match category {
556                    TaskDataCategory::Meta => KeySpace::TaskMeta,
557                    TaskDataCategory::Data => KeySpace::TaskData,
558                    TaskDataCategory::All => unreachable!(),
559                },
560                &keys,
561            )?;
562            bytes
563                .into_iter()
564                .map(|opt_bytes| {
565                    if let Some(bytes) = opt_bytes {
566                        let result: Vec<CachedDataItem> = turbo_bincode_decode(bytes.borrow())?;
567                        Ok(result)
568                    } else {
569                        Ok(Vec::new())
570                    }
571                })
572                .collect::<Result<Vec<_>>>()
573        }
574        inner
575            .with_tx(tx, |tx| lookup(&inner.database, tx, task_ids, category))
576            .with_context(|| {
577                format!(
578                    "Looking up data for {} tasks from database failed",
579                    task_ids.len()
580                )
581            })
582    }
583
584    fn shutdown(&self) -> Result<()> {
585        self.inner.database.shutdown()
586    }
587}
588
589fn get_next_free_task_id<'a, S, C>(
590    batch: &mut WriteBatchRef<'_, 'a, S, C>,
591) -> Result<u32, anyhow::Error>
592where
593    S: SerialWriteBatch<'a>,
594    C: ConcurrentWriteBatch<'a>,
595{
596    Ok(
597        match batch.get(
598            KeySpace::Infra,
599            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
600        )? {
601            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
602            None => 1,
603        },
604    )
605}
606
607fn save_infra<'a, S, C>(
608    batch: &mut WriteBatchRef<'_, 'a, S, C>,
609    next_task_id: u32,
610    operations: Vec<Arc<AnyOperation>>,
611) -> Result<(), anyhow::Error>
612where
613    S: SerialWriteBatch<'a>,
614    C: ConcurrentWriteBatch<'a>,
615{
616    {
617        batch
618            .put(
619                KeySpace::Infra,
620                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
621                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
622            )
623            .context("Unable to write next free task id")?;
624    }
625    {
626        let _span =
627            tracing::trace_span!("update operations", operations = operations.len()).entered();
628        let operations =
629            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
630        batch
631            .put(
632                KeySpace::Infra,
633                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
634                WriteBuffer::SmallVec(operations),
635            )
636            .context("Unable to write operations")?;
637    }
638    batch.flush(KeySpace::Infra)?;
639    Ok(())
640}
641
/// Encodes `task_type` into `buffer`.
///
/// Callers must pass an empty buffer (checked by a `debug_assert!`). `task_id` is used only to
/// enrich error and panic messages. When the `verify_serialization` feature is enabled, the
/// encoding is additionally checked to decode back to an equal value and to re-encode to
/// identical bytes. Any mismatch is printed to stdout and then panics.
fn encode_task_type(
    task_type: &CachedTaskType,
    buffer: &mut TurboBincodeBuffer,
    task_id: Option<TaskId>,
) -> Result<()> {
    /// One encoding pass, with an error context that names `task_id` when it is known.
    fn encode_once_into(
        task_type: &CachedTaskType,
        buffer: &mut TurboBincodeBuffer,
        task_id: Option<TaskId>,
    ) -> Result<()> {
        turbo_bincode_encode_into(task_type, buffer).with_context(|| {
            if let Some(task_id) = task_id {
                format!("Unable to serialize task {task_id} cache key {task_type:?}")
            } else {
                format!("Unable to serialize task cache key {task_type:?}")
            }
        })
    }

    debug_assert!(buffer.is_empty());
    encode_once_into(task_type, buffer, task_id)?;

    if cfg!(feature = "verify_serialization") {
        // Print before panicking so the message is visible even if the panic output is
        // swallowed or redirected.
        macro_rules! println_and_panic {
            ($($tt:tt)*) => {
                println!($($tt)*);
                panic!($($tt)*);
            };
        }
        // Round trip 1: the bytes must decode back into an equal task type.
        let deserialize: Result<CachedTaskType, _> = turbo_bincode_decode(buffer);
        match deserialize {
            Err(err) => {
                println_and_panic!("Task type would not be deserializable:\n{err:?}");
            }
            Ok(task_type2) => {
                if &task_type2 != task_type {
                    println_and_panic!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                }
                // Round trip 2: re-encoding the decoded value must produce identical bytes,
                // since the bytes are used as the forward task cache key.
                let mut buffer2 = TurboBincodeBuffer::new();
                match encode_once_into(&task_type2, &mut buffer2, task_id) {
                    Err(err) => {
                        println_and_panic!(
                            "Task type would not be serializable the second time:\n{err:?}"
                        );
                    }
                    Ok(()) => {
                        if buffer2 != *buffer {
                            println_and_panic!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                buffer,
                                buffer2
                            );
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
707
708type SerializedTasks = Vec<
709    Vec<(
710        TaskId,
711        Option<WriteBuffer<'static>>,
712        Option<WriteBuffer<'static>>,
713    )>,
714>;
715
716fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
717    tasks: Vec<I>,
718    batch: Option<&B>,
719) -> Result<SerializedTasks>
720where
721    I: Iterator<
722            Item = (
723                TaskId,
724                Option<TurboBincodeBuffer>,
725                Option<TurboBincodeBuffer>,
726            ),
727        > + Send
728        + Sync,
729{
730    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
731        let mut result = Vec::new();
732        for (task_id, meta, data) in tasks {
733            if let Some(batch) = batch {
734                let key = IntKey::new(*task_id);
735                let key = key.as_ref();
736                if let Some(meta) = meta {
737                    batch.put(
738                        KeySpace::TaskMeta,
739                        WriteBuffer::Borrowed(key),
740                        WriteBuffer::SmallVec(meta),
741                    )?;
742                }
743                if let Some(data) = data {
744                    batch.put(
745                        KeySpace::TaskData,
746                        WriteBuffer::Borrowed(key),
747                        WriteBuffer::SmallVec(data),
748                    )?;
749                }
750            } else {
751                // Store the new task data
752                result.push((
753                    task_id,
754                    meta.map(WriteBuffer::SmallVec),
755                    data.map(WriteBuffer::SmallVec),
756                ));
757            }
758        }
759
760        Ok(result)
761    })
762}
763
764fn encode_task_data(task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
765    let orig_result = turbo_bincode_encode(data);
766    if !cfg!(feature = "verify_serialization")
767        && let Ok(value) = orig_result
768    {
769        return Ok(value);
770    }
771
772    let mut error = Ok(());
773    let mut filtered_data = data.clone();
774    filtered_data.retain(|item| match turbo_bincode_encode(&item) {
775        Ok(buf) => {
776            if cfg!(feature = "verify_serialization") {
777                let deserialized = turbo_bincode_decode::<CachedDataItem>(&buf);
778                if let Err(err) = deserialized {
779                    println!("Data item would not be deserializable {task}: {err:?}\n{item:?}");
780                    return false;
781                }
782            }
783            true
784        }
785        Err(err) => {
786            if item.is_optional() {
787                if cfg!(feature = "verify_serialization") {
788                    println!(
789                        "Skipping non-encodable optional item for {task}: {item:?} due to {err}"
790                    );
791                }
792            } else {
793                error =
794                    Err(err).context(format!("Unable to encode data item for {task}: {item:?}"));
795            }
796            false
797        }
798    });
799    error?;
800
801    (if filtered_data.len() == data.len() {
802        orig_result
803    } else {
804        turbo_bincode_encode(&filtered_data)
805    })
806    .with_context(|| format!("Unable to serialize data items for {task}: {filtered_data:#?}"))
807}
808
809fn category_to_key_space(category: TaskDataCategory) -> KeySpace {
810    match category {
811        TaskDataCategory::Meta => KeySpace::TaskMeta,
812        TaskDataCategory::Data => KeySpace::TaskData,
813        TaskDataCategory::All => unreachable!(),
814    }
815}