turbo_tasks_backend/
kv_backing_storage.rs

1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result, anyhow};
9use turbo_bincode::{
10    TurboBincodeBuffer, turbo_bincode_decode, turbo_bincode_encode, turbo_bincode_encode_into,
11};
12use turbo_tasks::{
13    TaskId,
14    backend::CachedTaskType,
15    panic_hooks::{PanicHookGuard, register_panic_hook},
16    parallel,
17};
18
19use crate::{
20    GitVersionInfo,
21    backend::{AnyOperation, TaskDataCategory},
22    backing_storage::{BackingStorage, BackingStorageSealed},
23    data::CachedDataItem,
24    database::{
25        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
26        db_versioning::handle_db_versioning,
27        key_value_database::{KeySpace, KeyValueDatabase},
28        write_batch::{
29            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
30            WriteBuffer,
31        },
32    },
33    db_invalidation::invalidation_reasons,
34    utils::chunked_vec::ChunkedVec,
35};
36
/// Key in [`KeySpace::Infra`] under which the encoded list of pending operations is stored.
const META_KEY_OPERATIONS: u32 = 0;
/// Key in [`KeySpace::Infra`] under which the next free task id (little-endian `u32`) is stored.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
39
/// A database key holding a `u32` encoded as four little-endian bytes.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as a little-endian key.
    fn new(value: u32) -> Self {
        let bytes = u32::to_le_bytes(value);
        IntKey(bytes)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key bytes.
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
53
54fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
55    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
56    Ok(n)
57}
58
// Invalidating the cache after a panic is the right default for most users, but it only papers
// over underlying problems in turbo-tasks.
//
// When invalidating on panic makes a problem go away after a restart, users rarely report it, and
// the root cause never gets found.
//
// The environment overrides below let us skip the cache invalidation / error suppression within
// Vercel, so that we feel these pain points ourselves and fix the root causes.
fn should_invalidate_on_panic() -> bool {
    /// An unset variable, or one set to `""`, `"0"`, or `"false"`, counts as falsy.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }

    // Evaluated once per process; later changes to the environment are not observed.
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        let opted_out = !env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC")
            || !env_is_falsy("__NEXT_TEST_MODE");
        !opted_out
    });
    *SHOULD_INVALIDATE
}
77
78pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
79    database: T,
80    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
81    /// storage case.
82    base_path: Option<PathBuf>,
83    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
84    invalidated: Mutex<bool>,
85    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
86    /// drop.
87    _panic_hook_guard: Option<PanicHookGuard>,
88}
89
90pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
91    // wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
92    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
93}
94
95/// A wrapper type used by [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
96///
97/// Wraps a low-level key-value database into a higher-level [`BackingStorage`] type.
98impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
99    pub(crate) fn new_in_memory(database: T) -> Self {
100        Self {
101            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
102                database,
103                base_path: None,
104                invalidated: Mutex::new(false),
105                _panic_hook_guard: None,
106            }),
107        }
108    }
109
110    /// Handles boilerplate logic for an on-disk persisted database with versioning.
111    ///
112    /// - Creates a directory per version, with a maximum number of old versions and performs
113    ///   automatic cleanup of old versions.
114    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
115    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
116    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
117    ///
118    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
119    /// [`StartupCacheState`], which can be used by the application for logging information to the
120    /// user or telemetry about the cache.
121    pub(crate) fn open_versioned_on_disk(
122        base_path: PathBuf,
123        version_info: &GitVersionInfo,
124        is_ci: bool,
125        database: impl FnOnce(PathBuf) -> Result<T>,
126    ) -> Result<(Self, StartupCacheState)>
127    where
128        T: Send + Sync + 'static,
129    {
130        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
131            .context("Failed to check database invalidation and cleanup")?;
132        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
133            .context("Failed to handle database versioning")?;
134        let database = (database)(versioned_path).context("Failed to open database")?;
135        let backing_storage = Self {
136            inner: Arc::new_cyclic(
137                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
138                    let panic_hook_guard = if should_invalidate_on_panic() {
139                        let weak_inner = weak_inner.clone();
140                        Some(register_panic_hook(Box::new(move |_| {
141                            let Some(inner) = weak_inner.upgrade() else {
142                                return;
143                            };
144                            // If a panic happened that must mean something deep inside of turbopack
145                            // or turbo-tasks failed, and it may be hard to recover. We don't want
146                            // the cache to stick around, as that may persist bugs. Make a
147                            // best-effort attempt to invalidate the database (ignoring failures).
148                            let _ = inner.invalidate(invalidation_reasons::PANIC);
149                        })))
150                    } else {
151                        None
152                    };
153                    KeyValueDatabaseBackingStorageInner {
154                        database,
155                        base_path: Some(base_path),
156                        invalidated: Mutex::new(false),
157                        _panic_hook_guard: panic_hook_guard,
158                    }
159                },
160            ),
161        };
162        Ok((backing_storage, startup_cache_state))
163    }
164}
165
166impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
167    fn with_tx<R>(
168        &self,
169        tx: Option<&T::ReadTransaction<'_>>,
170        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
171    ) -> Result<R> {
172        if let Some(tx) = tx {
173            f(tx)
174        } else {
175            let tx = self.database.begin_read_transaction()?;
176            let r = f(&tx)?;
177            drop(tx);
178            Ok(r)
179        }
180    }
181
182    fn invalidate(&self, reason_code: &str) -> Result<()> {
183        // `base_path` can be `None` for a `NoopKvDb`
184        if let Some(base_path) = &self.base_path {
185            // Invalidation could happen frequently if there's a bunch of panics. We only need to
186            // invalidate once, so grab a lock.
187            let mut invalidated_guard = self
188                .invalidated
189                .lock()
190                .unwrap_or_else(PoisonError::into_inner);
191            if *invalidated_guard {
192                return Ok(());
193            }
194            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
195            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
196            // partial writes.
197            invalidate_db(base_path, reason_code)?;
198            self.database.prevent_writes();
199            // Avoid redundant invalidations from future panics
200            *invalidated_guard = true;
201        }
202        Ok(())
203    }
204
205    /// Used to read the next free task ID from the database.
206    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
207        let tx = self.database.begin_read_transaction()?;
208        self.database
209            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
210            .map(as_u32)
211            .transpose()
212    }
213}
214
215impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
216    for KeyValueDatabaseBackingStorage<T>
217{
218    fn invalidate(&self, reason_code: &str) -> Result<()> {
219        self.inner.invalidate(reason_code)
220    }
221}
222
223impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
224    for KeyValueDatabaseBackingStorage<T>
225{
226    type ReadTransaction<'l> = T::ReadTransaction<'l>;
227
228    fn next_free_task_id(&self) -> Result<TaskId> {
229        Ok(self
230            .inner
231            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
232            .context("Unable to read next free task id from database")?
233            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
234    }
235
236    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
237        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
238            let tx = database.begin_read_transaction()?;
239            let Some(operations) = database.get(
240                &tx,
241                KeySpace::Infra,
242                IntKey::new(META_KEY_OPERATIONS).as_ref(),
243            )?
244            else {
245                return Ok(Vec::new());
246            };
247            let operations = turbo_bincode_decode(operations.borrow())?;
248            Ok(operations)
249        }
250        get(&self.inner.database).context("Unable to read uncompleted operations from database")
251    }
252
253    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
254        encode_task_data(task, data)
255    }
256
257    fn save_snapshot<I>(
258        &self,
259        operations: Vec<Arc<AnyOperation>>,
260        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
261        snapshots: Vec<I>,
262    ) -> Result<()>
263    where
264        I: Iterator<
265                Item = (
266                    TaskId,
267                    Option<TurboBincodeBuffer>,
268                    Option<TurboBincodeBuffer>,
269                ),
270            > + Send
271            + Sync,
272    {
273        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
274        let mut batch = self.inner.database.write_batch()?;
275
276        // these buffers should be large, because they're temporary and re-used.
277        const INITIAL_ENCODE_BUFFER_CAPACITY: usize = 1024;
278
279        // Start organizing the updates in parallel
280        match &mut batch {
281            &mut WriteBatch::Concurrent(ref batch, _) => {
282                {
283                    let _span = tracing::trace_span!("update task data").entered();
284                    process_task_data(snapshots, Some(batch))?;
285                    let span = tracing::trace_span!("flush task data").entered();
286                    parallel::try_for_each(
287                        &[KeySpace::TaskMeta, KeySpace::TaskData],
288                        |&key_space| {
289                            let _span = span.clone().entered();
290                            // Safety: We already finished all processing of the task data and task
291                            // meta
292                            unsafe { batch.flush(key_space) }
293                        },
294                    )?;
295                }
296
297                let mut next_task_id = get_next_free_task_id::<
298                    T::SerialWriteBatch<'_>,
299                    T::ConcurrentWriteBatch<'_>,
300                >(&mut WriteBatchRef::concurrent(batch))?;
301
302                {
303                    let _span = tracing::trace_span!(
304                        "update task cache",
305                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
306                    )
307                    .entered();
308                    let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
309                        task_cache_updates,
310                        |updates| {
311                            let _span = _span.clone().entered();
312                            let mut max_task_id = 0;
313
314                            // Re-use the same buffer across every `serialize_task_type` call in
315                            // this chunk. `ConcurrentWriteBatch::put` will copy the data out of
316                            // this buffer into smaller exact-sized vecs.
317                            let mut task_type_bytes =
318                                TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
319                            for (task_type, task_id) in updates {
320                                task_type_bytes.clear();
321                                encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
322                                let task_id: u32 = *task_id;
323
324                                batch
325                                    .put(
326                                        KeySpace::ForwardTaskCache,
327                                        WriteBuffer::Borrowed(&task_type_bytes),
328                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
329                                    )
330                                    .with_context(|| {
331                                        anyhow!(
332                                            "Unable to write task cache {task_type:?} => {task_id}"
333                                        )
334                                    })?;
335                                batch
336                                    .put(
337                                        KeySpace::ReverseTaskCache,
338                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
339                                        WriteBuffer::Borrowed(&task_type_bytes),
340                                    )
341                                    .with_context(|| {
342                                        anyhow!(
343                                            "Unable to write task cache {task_id} => {task_type:?}"
344                                        )
345                                    })?;
346                                max_task_id = max_task_id.max(task_id);
347                            }
348
349                            Ok(max_task_id)
350                        },
351                    )?
352                    .into_iter()
353                    .max()
354                    .unwrap_or(0);
355                    next_task_id = next_task_id.max(max_task_id + 1);
356                }
357
358                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
359                    &mut WriteBatchRef::concurrent(batch),
360                    next_task_id,
361                    operations,
362                )?;
363            }
364            WriteBatch::Serial(batch) => {
365                {
366                    let _span = tracing::trace_span!("update tasks").entered();
367                    let task_items =
368                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
369                    for (task_id, meta, data) in task_items.into_iter().flatten() {
370                        let key = IntKey::new(*task_id);
371                        let key = key.as_ref();
372                        if let Some(meta) = meta {
373                            batch
374                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
375                                .with_context(|| {
376                                    anyhow!("Unable to write meta items for {task_id}")
377                                })?;
378                        }
379                        if let Some(data) = data {
380                            batch
381                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
382                                .with_context(|| {
383                                    anyhow!("Unable to write data items for {task_id}")
384                                })?;
385                        }
386                    }
387                    batch.flush(KeySpace::TaskMeta)?;
388                    batch.flush(KeySpace::TaskData)?;
389                }
390
391                let mut next_task_id = get_next_free_task_id::<
392                    T::SerialWriteBatch<'_>,
393                    T::ConcurrentWriteBatch<'_>,
394                >(&mut WriteBatchRef::serial(batch))?;
395
396                {
397                    let _span = tracing::trace_span!(
398                        "update task cache",
399                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
400                    )
401                    .entered();
402                    // Re-use the same buffer across every `serialize_task_type` call.
403                    // `ConcurrentWriteBatch::put` will copy the data out of this buffer into
404                    // smaller exact-sized vecs.
405                    let mut task_type_bytes =
406                        TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
407                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
408                        encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
409                        let task_id = *task_id;
410
411                        batch
412                            .put(
413                                KeySpace::ForwardTaskCache,
414                                WriteBuffer::Borrowed(&task_type_bytes),
415                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
416                            )
417                            .with_context(|| {
418                                anyhow!("Unable to write task cache {task_type:?} => {task_id}")
419                            })?;
420                        batch
421                            .put(
422                                KeySpace::ReverseTaskCache,
423                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
424                                WriteBuffer::Borrowed(&task_type_bytes),
425                            )
426                            .with_context(|| {
427                                anyhow!("Unable to write task cache {task_id} => {task_type:?}")
428                            })?;
429                        next_task_id = next_task_id.max(task_id + 1);
430                    }
431                }
432
433                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
434                    &mut WriteBatchRef::serial(batch),
435                    next_task_id,
436                    operations,
437                )?;
438            }
439        }
440
441        {
442            let _span = tracing::trace_span!("commit").entered();
443            batch
444                .commit()
445                .with_context(|| anyhow!("Unable to commit operations"))?;
446        }
447        Ok(())
448    }
449
450    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
451        self.inner.database.begin_read_transaction().ok()
452    }
453
454    unsafe fn forward_lookup_task_cache(
455        &self,
456        tx: Option<&T::ReadTransaction<'_>>,
457        task_type: &CachedTaskType,
458    ) -> Result<Option<TaskId>> {
459        let inner = &*self.inner;
460        fn lookup<D: KeyValueDatabase>(
461            database: &D,
462            tx: &D::ReadTransaction<'_>,
463            task_type: &CachedTaskType,
464        ) -> Result<Option<TaskId>> {
465            let mut task_type_bytes = TurboBincodeBuffer::new();
466            encode_task_type(task_type, &mut task_type_bytes, None)?;
467            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type_bytes)?
468            else {
469                return Ok(None);
470            };
471            let bytes = bytes.borrow().try_into()?;
472            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
473            Ok(Some(id))
474        }
475        if inner.database.is_empty() {
476            // Checking if the database is empty is a performance optimization
477            // to avoid serializing the task type.
478            return Ok(None);
479        }
480        inner
481            .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
482            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
483    }
484
485    unsafe fn reverse_lookup_task_cache(
486        &self,
487        tx: Option<&T::ReadTransaction<'_>>,
488        task_id: TaskId,
489    ) -> Result<Option<Arc<CachedTaskType>>> {
490        let inner = &*self.inner;
491        fn lookup<D: KeyValueDatabase>(
492            database: &D,
493            tx: &D::ReadTransaction<'_>,
494            task_id: TaskId,
495        ) -> Result<Option<Arc<CachedTaskType>>> {
496            let Some(bytes) = database.get(
497                tx,
498                KeySpace::ReverseTaskCache,
499                IntKey::new(*task_id).as_ref(),
500            )?
501            else {
502                return Ok(None);
503            };
504            Ok(Some(turbo_bincode_decode(bytes.borrow())?))
505        }
506        inner
507            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
508            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
509    }
510
511    unsafe fn lookup_data(
512        &self,
513        tx: Option<&T::ReadTransaction<'_>>,
514        task_id: TaskId,
515        category: TaskDataCategory,
516    ) -> Result<Vec<CachedDataItem>> {
517        let inner = &*self.inner;
518        fn lookup<D: KeyValueDatabase>(
519            database: &D,
520            tx: &D::ReadTransaction<'_>,
521            task_id: TaskId,
522            category: TaskDataCategory,
523        ) -> Result<Vec<CachedDataItem>> {
524            let Some(bytes) = database.get(
525                tx,
526                match category {
527                    TaskDataCategory::Meta => KeySpace::TaskMeta,
528                    TaskDataCategory::Data => KeySpace::TaskData,
529                    TaskDataCategory::All => unreachable!(),
530                },
531                IntKey::new(*task_id).as_ref(),
532            )?
533            else {
534                return Ok(Vec::new());
535            };
536            let result: Vec<CachedDataItem> = turbo_bincode_decode(bytes.borrow())?;
537            Ok(result)
538        }
539        inner
540            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
541            .with_context(|| format!("Looking up data for {task_id} from database failed"))
542    }
543
544    fn shutdown(&self) -> Result<()> {
545        self.inner.database.shutdown()
546    }
547}
548
549fn get_next_free_task_id<'a, S, C>(
550    batch: &mut WriteBatchRef<'_, 'a, S, C>,
551) -> Result<u32, anyhow::Error>
552where
553    S: SerialWriteBatch<'a>,
554    C: ConcurrentWriteBatch<'a>,
555{
556    Ok(
557        match batch.get(
558            KeySpace::Infra,
559            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
560        )? {
561            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
562            None => 1,
563        },
564    )
565}
566
567fn save_infra<'a, S, C>(
568    batch: &mut WriteBatchRef<'_, 'a, S, C>,
569    next_task_id: u32,
570    operations: Vec<Arc<AnyOperation>>,
571) -> Result<(), anyhow::Error>
572where
573    S: SerialWriteBatch<'a>,
574    C: ConcurrentWriteBatch<'a>,
575{
576    {
577        batch
578            .put(
579                KeySpace::Infra,
580                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
581                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
582            )
583            .context("Unable to write next free task id")?;
584    }
585    {
586        let _span =
587            tracing::trace_span!("update operations", operations = operations.len()).entered();
588        let operations =
589            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
590        batch
591            .put(
592                KeySpace::Infra,
593                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
594                WriteBuffer::SmallVec(operations),
595            )
596            .context("Unable to write operations")?;
597    }
598    batch.flush(KeySpace::Infra)?;
599    Ok(())
600}
601
602fn encode_task_type(
603    task_type: &CachedTaskType,
604    buffer: &mut TurboBincodeBuffer,
605    task_id: Option<TaskId>,
606) -> Result<()> {
607    fn encode_once_into(
608        task_type: &CachedTaskType,
609        buffer: &mut TurboBincodeBuffer,
610        task_id: Option<TaskId>,
611    ) -> Result<()> {
612        turbo_bincode_encode_into(task_type, buffer).with_context(|| {
613            if let Some(task_id) = task_id {
614                format!("Unable to serialize task {task_id} cache key {task_type:?}")
615            } else {
616                format!("Unable to serialize task cache key {task_type:?}")
617            }
618        })
619    }
620
621    debug_assert!(buffer.is_empty());
622    encode_once_into(task_type, buffer, task_id)?;
623
624    if cfg!(feature = "verify_serialization") {
625        macro_rules! println_and_panic {
626            ($($tt:tt)*) => {
627                println!($($tt)*);
628                panic!($($tt)*);
629            };
630        }
631        let deserialize: Result<CachedTaskType, _> = turbo_bincode_decode(buffer);
632        match deserialize {
633            Err(err) => {
634                println_and_panic!("Task type would not be deserializable:\n{err:?}");
635            }
636            Ok(task_type2) => {
637                if &task_type2 != task_type {
638                    println_and_panic!(
639                        "Task type would not round-trip {task_id:?}:\noriginal: \
640                         {task_type:#?}\nround-tripped: {task_type2:#?}"
641                    );
642                }
643                let mut buffer2 = TurboBincodeBuffer::new();
644                match encode_once_into(&task_type2, &mut buffer2, task_id) {
645                    Err(err) => {
646                        println_and_panic!(
647                            "Task type would not be serializable the second time:\n{err:?}"
648                        );
649                    }
650                    Ok(()) => {
651                        if buffer2 != *buffer {
652                            println_and_panic!(
653                                "Task type would not serialize to the same bytes the second time \
654                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
655                                buffer,
656                                buffer2
657                            );
658                        }
659                    }
660                }
661            }
662        }
663    }
664
665    Ok(())
666}
667
668type SerializedTasks = Vec<
669    Vec<(
670        TaskId,
671        Option<WriteBuffer<'static>>,
672        Option<WriteBuffer<'static>>,
673    )>,
674>;
675
676fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
677    tasks: Vec<I>,
678    batch: Option<&B>,
679) -> Result<SerializedTasks>
680where
681    I: Iterator<
682            Item = (
683                TaskId,
684                Option<TurboBincodeBuffer>,
685                Option<TurboBincodeBuffer>,
686            ),
687        > + Send
688        + Sync,
689{
690    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
691        let mut result = Vec::new();
692        for (task_id, meta, data) in tasks {
693            if let Some(batch) = batch {
694                let key = IntKey::new(*task_id);
695                let key = key.as_ref();
696                if let Some(meta) = meta {
697                    batch.put(
698                        KeySpace::TaskMeta,
699                        WriteBuffer::Borrowed(key),
700                        WriteBuffer::SmallVec(meta),
701                    )?;
702                }
703                if let Some(data) = data {
704                    batch.put(
705                        KeySpace::TaskData,
706                        WriteBuffer::Borrowed(key),
707                        WriteBuffer::SmallVec(data),
708                    )?;
709                }
710            } else {
711                // Store the new task data
712                result.push((
713                    task_id,
714                    meta.map(WriteBuffer::SmallVec),
715                    data.map(WriteBuffer::SmallVec),
716                ));
717            }
718        }
719
720        Ok(result)
721    })
722}
723
724fn encode_task_data(task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
725    let orig_result = turbo_bincode_encode(data);
726    if !cfg!(feature = "verify_serialization")
727        && let Ok(value) = orig_result
728    {
729        return Ok(value);
730    }
731
732    let mut error = Ok(());
733    let mut filtered_data = data.clone();
734    filtered_data.retain(|item| match turbo_bincode_encode(&item) {
735        Ok(buf) => {
736            if cfg!(feature = "verify_serialization") {
737                let deserialized = turbo_bincode_decode::<CachedDataItem>(&buf);
738                if let Err(err) = deserialized {
739                    println!("Data item would not be deserializable {task}: {err:?}\n{item:?}");
740                    return false;
741                }
742            }
743            true
744        }
745        Err(err) => {
746            if item.is_optional() {
747                if cfg!(feature = "verify_serialization") {
748                    println!(
749                        "Skipping non-encodable optional item for {task}: {item:?} due to {err}"
750                    );
751                }
752            } else {
753                error =
754                    Err(err).context(format!("Unable to encode data item for {task}: {item:?}"));
755            }
756            false
757        }
758    });
759    error?;
760
761    (if filtered_data.len() == data.len() {
762        orig_result
763    } else {
764        turbo_bincode_encode(&filtered_data)
765    })
766    .with_context(|| format!("Unable to serialize data items for {task}: {filtered_data:#?}"))
767}