turbo_tasks_backend/kv_backing_storage.rs

use std::{
    borrow::Borrow,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result, anyhow};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use turbo_tasks::{
    SessionId, TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    parallel,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);

fn pot_serialize_small_vec<T: Serialize>(value: &T) -> pot::Result<SmallVec<[u8; 16]>> {
    struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>);
    impl std::io::Write for SmallVecWrite<'_> {
        #[inline]
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.extend_from_slice(buf);
            Ok(buf.len())
        }

        #[inline]
        fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
            self.0.extend_from_slice(buf);
            Ok(())
        }

        #[inline]
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let mut output = SmallVec::new();
    POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?;
    Ok(output)
}
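
// Illustrative round trip (the `Vec<u32>` payload is just an example; real callers serialize
// task data and operations):
//
//     let bytes = pot_serialize_small_vec(&vec![1u32, 2, 3])?;
//     let back: Vec<u32> = deserialize_with_good_error(&bytes)?;
//     assert_eq!(back, vec![1, 2, 3]);
//
// Writing into a `SmallVec<[u8; 16]>` keeps payloads of up to 16 bytes inline, avoiding a heap
// allocation for small values.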

fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
    pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
    pot::de::SymbolList::new()
}

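// Keys in the `Infra` keyspace, encoded as little-endian `u32`s via `IntKey`.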
const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
const META_KEY_SESSION_ID: u32 = 2;

struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}
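
// Sketch of the round trip these helpers provide (illustrative only):
//
//     let key = IntKey::new(42);
//     assert_eq!(key.as_ref(), &42u32.to_le_bytes());
//     assert_eq!(as_u32(key.as_ref()).unwrap(), 42);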

// We want to invalidate the cache on panic for most users, but this is a band-aid over underlying
// problems in turbo-tasks.
//
// If we invalidate the cache upon panic and it "fixes" the issue upon restart, users typically
// won't report bugs to us, and we'll never find root causes for these problems.
//
// These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
// feel these pain points and fix the root causes of bugs.
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}
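
// For example (illustrative): running with `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC=1` makes
// `should_invalidate_on_panic` return `false`, so the panic hook below is never registered and a
// panicking process leaves the on-disk cache intact for reproducing the bug.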

pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    /// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
    /// storage case.
    base_path: Option<PathBuf>,
    /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
    invalidated: Mutex<bool>,
    /// We configure a panic hook to invalidate the cache. This guard cleans up our panic hook upon
    /// drop.
    _panic_hook_guard: Option<PanicHookGuard>,
}

/// A wrapper type used by [`crate::turbo_backing_storage`] and [`crate::noop_backing_storage`].
///
/// Wraps a low-level key-value database into a higher-level [`BackingStorage`] type.
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    // Wrapped so that `register_panic_hook` can hold a weak reference to `inner`.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

    /// Handles boilerplate logic for an on-disk persisted database with versioning.
    ///
    /// - Creates a directory per version, enforces a maximum number of retained old versions, and
    ///   automatically cleans up versions beyond that limit.
    /// - Checks for a database invalidation marker file, and cleans up the database as needed.
    /// - [Registers a dynamic panic hook][turbo_tasks::panic_hooks] to invalidate the database upon
    ///   a panic. This invalidates the database using [`invalidation_reasons::PANIC`].
    ///
    /// Along with returning a [`KeyValueDatabaseBackingStorage`], this returns a
    /// [`StartupCacheState`], which the application can use to log information to the user or to
    /// send telemetry about the cache.
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            // A panic means something deep inside of turbopack or turbo-tasks
                            // failed, and it may be hard to recover. We don't want the cache to
                            // stick around, as that may persist bugs. Make a best-effort attempt
                            // to invalidate the database (ignoring failures).
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}
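
// A minimal usage sketch; `MyKvDb` and the cache path are hypothetical stand-ins:
//
//     let (storage, cache_state) = KeyValueDatabaseBackingStorage::open_versioned_on_disk(
//         PathBuf::from(".turbo/cache"),
//         &version_info,
//         /* is_ci */ false,
//         |versioned_path| MyKvDb::open(versioned_path),
//     )?;
//     // Inspect `cache_state` here for user-facing logging or telemetry.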

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
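    /// Runs `f` against the provided read transaction, or opens a short-lived read transaction
    /// just for this call when none was provided.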
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

    fn invalidate(&self, reason_code: &str) -> Result<()> {
        // `base_path` can be `None` for a `NoopKvDb`
        if let Some(base_path) = &self.base_path {
            // Invalidation could happen frequently if there are repeated panics. We only need to
            // invalidate once, so grab a lock.
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
            // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
            // partial writes.
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            // Avoid redundant invalidations from future panics
            *invalidated_guard = true;
        }
        Ok(())
    }

    /// Reads the previous session id or the next free task id from the database.
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(self
            .inner
            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
            .context("Unable to read next free task id from database")?
            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
    }

    fn next_session_id(&self) -> Result<SessionId> {
        Ok(SessionId::try_from(
            self.inner
                .get_infra_u32(META_KEY_SESSION_ID)
                .context("Unable to read session id from database")?
                .unwrap_or(0)
                + 1,
        )?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = deserialize_with_good_error(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
        serialize(task, data)
    }

    fn save_snapshot<I>(
        &self,
        session_id: SessionId,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<SmallVec<[u8; 16]>>,
                    Option<SmallVec<[u8; 16]>>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::trace_span!(
            "save snapshot",
            session_id = ?session_id,
            operations = operations.len()
        )
        .entered();
        let mut batch = self.inner.database.write_batch()?;

        // Start organizing the updates in parallel
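        // A concurrent batch may be written from multiple threads at once, so the task data, the
        // task cache, and the infra keys below are written in parallel; the serial fallback
        // performs the same writes in a single-threaded pass.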
        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    parallel::try_for_each(
                        &[KeySpace::TaskMeta, KeySpace::TaskData],
                        |&key_space| {
                            let _span = span.clone().entered();
                            // Safety: We already finished all processing of the task data and task
                            // meta
                            unsafe { batch.flush(key_space) }
                        },
                    )?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
                        task_cache_updates,
                        |updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            let mut task_type_bytes = Vec::new();
                            for (task_type, task_id) in updates {
                                let task_id: u32 = *task_id;
                                serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id + 1);
                            }

                            Ok(max_task_id)
                        },
                    )?
                    .into_iter()
                    .max()
                    .unwrap_or(0);
                    next_task_id = next_task_id.max(result);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    let task_items =
                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
                    for (task_id, meta, data) in task_items.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    anyhow!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    anyhow!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                    batch.flush(KeySpace::TaskMeta)?;
                    batch.flush(KeySpace::TaskData)?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::serial(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let mut task_type_bytes = Vec::new();
                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                        let task_id = *task_id;
                        serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                        batch
                            .put(
                                KeySpace::ForwardTaskCache,
                                WriteBuffer::Borrowed(&task_type_bytes),
                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_type:?} => {task_id}")
                            })?;
                        batch
                            .put(
                                KeySpace::ReverseTaskCache,
                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                WriteBuffer::Borrowed(&task_type_bytes),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_id} => {task_type:?}")
                            })?;
                        next_task_id = next_task_id.max(task_id + 1);
                    }
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::serial(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch
                .commit()
                .with_context(|| anyhow!("Unable to commit operations"))?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let task_type = POT_CONFIG.serialize(task_type)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type)? else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
            // Checking if the database is empty is a performance optimization
            // to avoid serializing the task type.
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(deserialize_with_good_error(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                match category {
                    TaskDataCategory::Meta => KeySpace::TaskMeta,
                    TaskDataCategory::Data => KeySpace::TaskData,
                    TaskDataCategory::All => unreachable!(),
                },
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

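/// Reads the persisted next free task id from the `Infra` keyspace, defaulting to 1 when the
/// database does not have one yet.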
fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

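/// Persists the snapshot's bookkeeping values (next free task id, session id, and the set of
/// uncompleted operations) to the `Infra` keyspace and flushes it.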
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    session_id: SessionId,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next free task id"))?;
    }
    {
        let _span = tracing::trace_span!("update session id", session_id = ?session_id).entered();
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()),
                WriteBuffer::Borrowed(&session_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next session id"))?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations = pot_serialize_small_vec(&operations)
            .with_context(|| anyhow!("Unable to serialize operations"))?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .with_context(|| anyhow!("Unable to write operations"))?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

fn serialize_task_type(
    task_type: &Arc<CachedTaskType>,
    mut task_type_bytes: &mut Vec<u8>,
    task_id: u32,
) -> Result<()> {
    task_type_bytes.clear();
    POT_CONFIG
        .serialize_into(&**task_type, &mut task_type_bytes)
        .with_context(|| anyhow!("Unable to serialize task {task_id} cache key {task_type:?}"))?;
    #[cfg(feature = "verify_serialization")]
    {
        let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
            &mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
        );
        if let Err(err) = deserialize {
            println!("Task type would not be deserializable {task_id}: {err:?}\n{task_type:#?}");
            panic!("Task type would not be deserializable {task_id}: {err:?}");
        }
    }
    Ok(())
}

type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

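/// Serializes task snapshots in parallel. When a concurrent `batch` is supplied, the buffers are
/// written directly into it and the returned `SerializedTasks` carries no entries; otherwise the
/// buffers are collected and returned so a serial batch can write them afterwards.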
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<SmallVec<[u8; 16]>>,
                Option<SmallVec<[u8; 16]>>,
            ),
        > + Send
        + Sync,
{
    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
        let mut result = Vec::new();
        for (task_id, meta, data) in tasks {
            if let Some(batch) = batch {
                let key = IntKey::new(*task_id);
                let key = key.as_ref();
                if let Some(meta) = meta {
                    batch.put(
                        KeySpace::TaskMeta,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(meta),
                    )?;
                }
                if let Some(data) = data {
                    batch.put(
                        KeySpace::TaskData,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(data),
                    )?;
                }
            } else {
                // Store the new task data
                result.push((
                    task_id,
                    meta.map(WriteBuffer::SmallVec),
                    data.map(WriteBuffer::SmallVec),
                ));
            }
        }

        Ok(result)
    })
}

fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
    Ok(match pot_serialize_small_vec(data) {
        #[cfg(not(feature = "verify_serialization"))]
        Ok(value) => value,
        _ => {
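            // Either serialization failed, or the `verify_serialization` feature is enabled (in
            // which case the `Ok` fast path above is compiled out): retry item by item, dropping
            // optional items that fail to serialize and surfacing an error for required ones.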
            let mut error = Ok(());
            let mut data = data.clone();
            data.retain(|item| {
                let mut buf = Vec::<u8>::new();
                let mut symbol_map = pot_ser_symbol_map();
                let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
                if let Err(err) = serde_path_to_error::serialize(&item, &mut serializer) {
                    if item.is_optional() {
                        #[cfg(feature = "verify_serialization")]
                        println!("Skipping non-serializable optional item for {task}: {item:?}");
                    } else {
                        error = Err(err).context({
                            anyhow!("Unable to serialize data item for {task}: {item:?}")
                        });
                    }
                    false
                } else {
                    #[cfg(feature = "verify_serialization")]
                    {
                        let deserialize: Result<CachedDataItem, _> =
                            serde_path_to_error::deserialize(
                                &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
                            );
                        if let Err(err) = deserialize {
                            println!(
                                "Data item would not be deserializable {task}: {err:?}\n{item:?}"
                            );
                            return false;
                        }
                    }
                    true
                }
            });
            error?;

            pot_serialize_small_vec(&data)
                .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
        }
    })
}

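/// Deserializes `data` with pot. On failure, the input is deserialized a second time through
/// `serde_path_to_error` so that the returned error names the path of the field that failed
/// (falling back to the original pot error if the second pass unexpectedly succeeds).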
fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
    match POT_CONFIG.deserialize(data) {
        Ok(value) => Ok(value),
        Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
            &mut pot_de_symbol_list().deserializer_for_slice(data)?,
        )
        .map_err(anyhow::Error::from)
        .and(Err(error.into()))
        .context("Deserialization failed"),
    }
}