1use std::{
2    borrow::Borrow,
3    env,
4    path::PathBuf,
5    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result, anyhow};
9use serde::{Deserialize, Serialize};
10use smallvec::SmallVec;
11use turbo_tasks::{
12    SessionId, TaskId,
13    backend::CachedTaskType,
14    panic_hooks::{PanicHookGuard, register_panic_hook},
15    parallel,
16};
17
18use crate::{
19    GitVersionInfo,
20    backend::{AnyOperation, TaskDataCategory},
21    backing_storage::{BackingStorage, BackingStorageSealed},
22    data::CachedDataItem,
23    database::{
24        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
25        db_versioning::handle_db_versioning,
26        key_value_database::{KeySpace, KeyValueDatabase},
27        write_batch::{
28            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
29            WriteBuffer,
30        },
31    },
32    db_invalidation::invalidation_reasons,
33    utils::chunked_vec::ChunkedVec,
34};
35
/// `pot` configuration shared by the serialization and deserialization helpers in this file,
/// pinned to the `V4` compatibility level.
const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);
37
38fn pot_serialize_small_vec<T: Serialize>(value: &T) -> pot::Result<SmallVec<[u8; 16]>> {
39    struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>);
40    impl std::io::Write for SmallVecWrite<'_> {
41        #[inline]
42        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
43            self.0.extend_from_slice(buf);
44            Ok(buf.len())
45        }
46
47        #[inline]
48        fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
49            self.0.extend_from_slice(buf);
50            Ok(())
51        }
52
53        #[inline]
54        fn flush(&mut self) -> std::io::Result<()> {
55            Ok(())
56        }
57    }
58
59    let mut output = SmallVec::new();
60    POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?;
61    Ok(output)
62}
63
64fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
65    pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
66}
67
68fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
69    pot::de::SymbolList::new()
70}
71
// Keys of the entries stored in `KeySpace::Infra`. Each is encoded with `IntKey` before use.

/// Key of the serialized list of uncompleted operations.
const META_KEY_OPERATIONS: u32 = 0;
/// Key of the next free task id (a little-endian `u32`).
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
/// Key of the most recently saved session id (a little-endian `u32`).
const META_KEY_SESSION_ID: u32 = 2;
75
/// A `u32` database key held in its little-endian byte representation.
struct IntKey([u8; 4]);

impl IntKey {
    /// Encodes `value` as its four little-endian bytes.
    fn new(value: u32) -> Self {
        let encoded = u32::to_le_bytes(value);
        IntKey(encoded)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the encoded key bytes.
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
89
90fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
91    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
92    Ok(n)
93}
94
/// Reports whether a panic should invalidate the on-disk database.
///
/// This is `true` only when both `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` and
/// `__NEXT_TEST_MODE` are "falsy": unset, empty, `0`, or `false`. The environment is read
/// once, on the first call, and the answer is cached for the rest of the process.
fn should_invalidate_on_panic() -> bool {
    /// `true` when the variable is unset or holds exactly ``, `0` or `false`.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }

    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        let skip_requested = !env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC");
        let test_mode = !env_is_falsy("__NEXT_TEST_MODE");
        !(skip_requested || test_mode)
    });

    *SHOULD_INVALIDATE
}
113
/// Shared state of a [`KeyValueDatabaseBackingStorage`], held behind an `Arc` so that the
/// panic hook registered in `open_versioned_on_disk` can reach it through a `Weak` reference.
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    /// The underlying key-value database.
    database: T,
    /// Base directory given to `open_versioned_on_disk`; `None` for storages created with
    /// `new_in_memory`. `invalidate` does nothing when this is `None`.
    base_path: Option<PathBuf>,
    /// Becomes `true` after the first successful `invalidate`, making later calls no-ops.
    invalidated: Mutex<bool>,
    /// Guard returned by `register_panic_hook`; `Some` only when
    /// `should_invalidate_on_panic()` was `true` at construction time.
    /// NOTE(review): presumably dropping the guard unregisters the hook — confirm.
    _panic_hook_guard: Option<PanicHookGuard>,
}
125
/// Backing storage implemented on top of a [`KeyValueDatabase`].
///
/// Implements `BackingStorage` and `BackingStorageSealed` by delegating to the shared
/// [`KeyValueDatabaseBackingStorageInner`].
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    /// Shared state; also referenced weakly by the panic hook, when one is registered.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
130
131impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
135    pub(crate) fn new_in_memory(database: T) -> Self {
136        Self {
137            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
138                database,
139                base_path: None,
140                invalidated: Mutex::new(false),
141                _panic_hook_guard: None,
142            }),
143        }
144    }
145
146    pub(crate) fn open_versioned_on_disk(
158        base_path: PathBuf,
159        version_info: &GitVersionInfo,
160        is_ci: bool,
161        database: impl FnOnce(PathBuf) -> Result<T>,
162    ) -> Result<(Self, StartupCacheState)>
163    where
164        T: Send + Sync + 'static,
165    {
166        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
167            .context("Failed to check database invalidation and cleanup")?;
168        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
169            .context("Failed to handle database versioning")?;
170        let database = (database)(versioned_path).context("Failed to open database")?;
171        let backing_storage = Self {
172            inner: Arc::new_cyclic(
173                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
174                    let panic_hook_guard = if should_invalidate_on_panic() {
175                        let weak_inner = weak_inner.clone();
176                        Some(register_panic_hook(Box::new(move |_| {
177                            let Some(inner) = weak_inner.upgrade() else {
178                                return;
179                            };
180                            let _ = inner.invalidate(invalidation_reasons::PANIC);
185                        })))
186                    } else {
187                        None
188                    };
189                    KeyValueDatabaseBackingStorageInner {
190                        database,
191                        base_path: Some(base_path),
192                        invalidated: Mutex::new(false),
193                        _panic_hook_guard: panic_hook_guard,
194                    }
195                },
196            ),
197        };
198        Ok((backing_storage, startup_cache_state))
199    }
200}
201
202impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
203    fn with_tx<R>(
204        &self,
205        tx: Option<&T::ReadTransaction<'_>>,
206        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
207    ) -> Result<R> {
208        if let Some(tx) = tx {
209            f(tx)
210        } else {
211            let tx = self.database.begin_read_transaction()?;
212            let r = f(&tx)?;
213            drop(tx);
214            Ok(r)
215        }
216    }
217
218    fn invalidate(&self, reason_code: &str) -> Result<()> {
219        if let Some(base_path) = &self.base_path {
221            let mut invalidated_guard = self
224                .invalidated
225                .lock()
226                .unwrap_or_else(PoisonError::into_inner);
227            if *invalidated_guard {
228                return Ok(());
229            }
230            invalidate_db(base_path, reason_code)?;
234            self.database.prevent_writes();
235            *invalidated_guard = true;
237        }
238        Ok(())
239    }
240
241    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
243        let tx = self.database.begin_read_transaction()?;
244        self.database
245            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
246            .map(as_u32)
247            .transpose()
248    }
249}
250
251impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
252    for KeyValueDatabaseBackingStorage<T>
253{
254    fn invalidate(&self, reason_code: &str) -> Result<()> {
255        self.inner.invalidate(reason_code)
256    }
257}
258
259impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
260    for KeyValueDatabaseBackingStorage<T>
261{
    /// Read transactions are the underlying database's own read transaction type.
    type ReadTransaction<'l> = T::ReadTransaction<'l>;
263
264    fn next_free_task_id(&self) -> Result<TaskId> {
265        Ok(self
266            .inner
267            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
268            .context("Unable to read next free task id from database")?
269            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
270    }
271
272    fn next_session_id(&self) -> Result<SessionId> {
273        Ok(SessionId::try_from(
274            self.inner
275                .get_infra_u32(META_KEY_SESSION_ID)
276                .context("Unable to read session id from database")?
277                .unwrap_or(0)
278                + 1,
279        )?)
280    }
281
282    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
283        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
284            let tx = database.begin_read_transaction()?;
285            let Some(operations) = database.get(
286                &tx,
287                KeySpace::Infra,
288                IntKey::new(META_KEY_OPERATIONS).as_ref(),
289            )?
290            else {
291                return Ok(Vec::new());
292            };
293            let operations = deserialize_with_good_error(operations.borrow())?;
294            Ok(operations)
295        }
296        get(&self.inner.database).context("Unable to read uncompleted operations from database")
297    }
298
299    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
300        serialize(task, data)
301    }
302
303    fn save_snapshot<I>(
304        &self,
305        session_id: SessionId,
306        operations: Vec<Arc<AnyOperation>>,
307        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
308        snapshots: Vec<I>,
309    ) -> Result<()>
310    where
311        I: Iterator<
312                Item = (
313                    TaskId,
314                    Option<SmallVec<[u8; 16]>>,
315                    Option<SmallVec<[u8; 16]>>,
316                ),
317            > + Send
318            + Sync,
319    {
320        let _span = tracing::trace_span!("save snapshot", session_id = ?session_id, operations = operations.len());
321        let mut batch = self.inner.database.write_batch()?;
322
323        match &mut batch {
325            &mut WriteBatch::Concurrent(ref batch, _) => {
326                {
327                    let _span = tracing::trace_span!("update task data").entered();
328                    process_task_data(snapshots, Some(batch))?;
329                    let span = tracing::trace_span!("flush task data").entered();
330                    parallel::try_for_each(
331                        &[KeySpace::TaskMeta, KeySpace::TaskData],
332                        |&key_space| {
333                            let _span = span.clone().entered();
334                            unsafe { batch.flush(key_space) }
337                        },
338                    )?;
339                }
340
341                let mut next_task_id = get_next_free_task_id::<
342                    T::SerialWriteBatch<'_>,
343                    T::ConcurrentWriteBatch<'_>,
344                >(&mut WriteBatchRef::concurrent(batch))?;
345
346                {
347                    let _span = tracing::trace_span!(
348                        "update task cache",
349                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
350                    )
351                    .entered();
352                    let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
353                        task_cache_updates,
354                        |updates| {
355                            let _span = _span.clone().entered();
356                            let mut max_task_id = 0;
357
358                            let mut task_type_bytes = Vec::new();
359                            for (task_type, task_id) in updates {
360                                serialize_task_type(
361                                    &task_type,
362                                    &mut task_type_bytes,
363                                    Some(task_id),
364                                )?;
365                                let task_id: u32 = *task_id;
366
367                                batch
368                                    .put(
369                                        KeySpace::ForwardTaskCache,
370                                        WriteBuffer::Borrowed(&task_type_bytes),
371                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
372                                    )
373                                    .with_context(|| {
374                                        anyhow!(
375                                            "Unable to write task cache {task_type:?} => {task_id}"
376                                        )
377                                    })?;
378                                batch
379                                    .put(
380                                        KeySpace::ReverseTaskCache,
381                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
382                                        WriteBuffer::Borrowed(&task_type_bytes),
383                                    )
384                                    .with_context(|| {
385                                        anyhow!(
386                                            "Unable to write task cache {task_id} => {task_type:?}"
387                                        )
388                                    })?;
389                                max_task_id = max_task_id.max(task_id + 1);
390                            }
391
392                            Ok(max_task_id)
393                        },
394                    )?
395                    .into_iter()
396                    .max()
397                    .unwrap_or(0);
398                    next_task_id = next_task_id.max(result);
399                }
400
401                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
402                    &mut WriteBatchRef::concurrent(batch),
403                    next_task_id,
404                    session_id,
405                    operations,
406                )?;
407            }
408            WriteBatch::Serial(batch) => {
409                {
410                    let _span = tracing::trace_span!("update tasks").entered();
411                    let task_items =
412                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
413                    for (task_id, meta, data) in task_items.into_iter().flatten() {
414                        let key = IntKey::new(*task_id);
415                        let key = key.as_ref();
416                        if let Some(meta) = meta {
417                            batch
418                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
419                                .with_context(|| {
420                                    anyhow!("Unable to write meta items for {task_id}")
421                                })?;
422                        }
423                        if let Some(data) = data {
424                            batch
425                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
426                                .with_context(|| {
427                                    anyhow!("Unable to write data items for {task_id}")
428                                })?;
429                        }
430                    }
431                    batch.flush(KeySpace::TaskMeta)?;
432                    batch.flush(KeySpace::TaskData)?;
433                }
434
435                let mut next_task_id = get_next_free_task_id::<
436                    T::SerialWriteBatch<'_>,
437                    T::ConcurrentWriteBatch<'_>,
438                >(&mut WriteBatchRef::serial(batch))?;
439
440                {
441                    let _span = tracing::trace_span!(
442                        "update task cache",
443                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
444                    )
445                    .entered();
446                    let mut task_type_bytes = Vec::new();
447                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
448                        serialize_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
449                        let task_id = *task_id;
450
451                        batch
452                            .put(
453                                KeySpace::ForwardTaskCache,
454                                WriteBuffer::Borrowed(&task_type_bytes),
455                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
456                            )
457                            .with_context(|| {
458                                anyhow!("Unable to write task cache {task_type:?} => {task_id}")
459                            })?;
460                        batch
461                            .put(
462                                KeySpace::ReverseTaskCache,
463                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
464                                WriteBuffer::Borrowed(&task_type_bytes),
465                            )
466                            .with_context(|| {
467                                anyhow!("Unable to write task cache {task_id} => {task_type:?}")
468                            })?;
469                        next_task_id = next_task_id.max(task_id + 1);
470                    }
471                }
472
473                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
474                    &mut WriteBatchRef::serial(batch),
475                    next_task_id,
476                    session_id,
477                    operations,
478                )?;
479            }
480        }
481
482        {
483            let _span = tracing::trace_span!("commit").entered();
484            batch
485                .commit()
486                .with_context(|| anyhow!("Unable to commit operations"))?;
487        }
488        Ok(())
489    }
490
491    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
492        self.inner.database.begin_read_transaction().ok()
493    }
494
495    unsafe fn forward_lookup_task_cache(
496        &self,
497        tx: Option<&T::ReadTransaction<'_>>,
498        task_type: &CachedTaskType,
499    ) -> Result<Option<TaskId>> {
500        let inner = &*self.inner;
501        fn lookup<D: KeyValueDatabase>(
502            database: &D,
503            tx: &D::ReadTransaction<'_>,
504            task_type: &CachedTaskType,
505        ) -> Result<Option<TaskId>> {
506            let mut task_type_bytes = Vec::new();
507            serialize_task_type(task_type, &mut task_type_bytes, None)?;
508            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type_bytes)?
509            else {
510                return Ok(None);
511            };
512            let bytes = bytes.borrow().try_into()?;
513            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
514            Ok(Some(id))
515        }
516        if inner.database.is_empty() {
517            return Ok(None);
520        }
521        inner
522            .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
523            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
524    }
525
526    unsafe fn reverse_lookup_task_cache(
527        &self,
528        tx: Option<&T::ReadTransaction<'_>>,
529        task_id: TaskId,
530    ) -> Result<Option<Arc<CachedTaskType>>> {
531        let inner = &*self.inner;
532        fn lookup<D: KeyValueDatabase>(
533            database: &D,
534            tx: &D::ReadTransaction<'_>,
535            task_id: TaskId,
536        ) -> Result<Option<Arc<CachedTaskType>>> {
537            let Some(bytes) = database.get(
538                tx,
539                KeySpace::ReverseTaskCache,
540                IntKey::new(*task_id).as_ref(),
541            )?
542            else {
543                return Ok(None);
544            };
545            Ok(Some(deserialize_with_good_error(bytes.borrow())?))
546        }
547        inner
548            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
549            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
550    }
551
552    unsafe fn lookup_data(
553        &self,
554        tx: Option<&T::ReadTransaction<'_>>,
555        task_id: TaskId,
556        category: TaskDataCategory,
557    ) -> Result<Vec<CachedDataItem>> {
558        let inner = &*self.inner;
559        fn lookup<D: KeyValueDatabase>(
560            database: &D,
561            tx: &D::ReadTransaction<'_>,
562            task_id: TaskId,
563            category: TaskDataCategory,
564        ) -> Result<Vec<CachedDataItem>> {
565            let Some(bytes) = database.get(
566                tx,
567                match category {
568                    TaskDataCategory::Meta => KeySpace::TaskMeta,
569                    TaskDataCategory::Data => KeySpace::TaskData,
570                    TaskDataCategory::All => unreachable!(),
571                },
572                IntKey::new(*task_id).as_ref(),
573            )?
574            else {
575                return Ok(Vec::new());
576            };
577            let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
578            Ok(result)
579        }
580        inner
581            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
582            .with_context(|| format!("Looking up data for {task_id} from database failed"))
583    }
584
585    fn shutdown(&self) -> Result<()> {
586        self.inner.database.shutdown()
587    }
588}
589
590fn get_next_free_task_id<'a, S, C>(
591    batch: &mut WriteBatchRef<'_, 'a, S, C>,
592) -> Result<u32, anyhow::Error>
593where
594    S: SerialWriteBatch<'a>,
595    C: ConcurrentWriteBatch<'a>,
596{
597    Ok(
598        match batch.get(
599            KeySpace::Infra,
600            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
601        )? {
602            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
603            None => 1,
604        },
605    )
606}
607
608fn save_infra<'a, S, C>(
609    batch: &mut WriteBatchRef<'_, 'a, S, C>,
610    next_task_id: u32,
611    session_id: SessionId,
612    operations: Vec<Arc<AnyOperation>>,
613) -> Result<(), anyhow::Error>
614where
615    S: SerialWriteBatch<'a>,
616    C: ConcurrentWriteBatch<'a>,
617{
618    {
619        batch
620            .put(
621                KeySpace::Infra,
622                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
623                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
624            )
625            .with_context(|| anyhow!("Unable to write next free task id"))?;
626    }
627    {
628        let _span = tracing::trace_span!("update session id", session_id = ?session_id).entered();
629        batch
630            .put(
631                KeySpace::Infra,
632                WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()),
633                WriteBuffer::Borrowed(&session_id.to_le_bytes()),
634            )
635            .with_context(|| anyhow!("Unable to write next session id"))?;
636    }
637    {
638        let _span =
639            tracing::trace_span!("update operations", operations = operations.len()).entered();
640        let operations = pot_serialize_small_vec(&operations)
641            .with_context(|| anyhow!("Unable to serialize operations"))?;
642        batch
643            .put(
644                KeySpace::Infra,
645                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
646                WriteBuffer::SmallVec(operations),
647            )
648            .with_context(|| anyhow!("Unable to write operations"))?;
649    }
650    batch.flush(KeySpace::Infra)?;
651    Ok(())
652}
653
/// Serializes `task_type` with [`POT_CONFIG`] into `task_type_bytes`.
///
/// The buffer is cleared first, so callers can reuse one allocation across calls. `task_id`
/// is only used to make error and diagnostic messages more specific.
///
/// With the `verify_serialization` feature enabled, the result is additionally checked:
/// the bytes must deserialize, the deserialized value must equal `task_type`, and
/// serializing that value again must produce identical bytes.
///
/// # Errors
///
/// Fails when `task_type` cannot be serialized.
///
/// # Panics
///
/// Only with the `verify_serialization` feature: panics when any of the checks above fails.
#[inline(never)]
fn serialize_task_type(
    task_type: &CachedTaskType,
    mut task_type_bytes: &mut Vec<u8>,
    task_id: Option<TaskId>,
) -> Result<()> {
    task_type_bytes.clear();
    POT_CONFIG
        .serialize_into(task_type, &mut task_type_bytes)
        .with_context(|| {
            if let Some(task_id) = task_id {
                anyhow!("Unable to serialize task {task_id} cache key {task_type:?}")
            } else {
                anyhow!("Unable to serialize task cache key {task_type:?}")
            }
        })?;
    #[cfg(feature = "verify_serialization")]
    {
        // Check 1: the bytes just written must deserialize at all.
        let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
            &mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
        );
        match deserialize {
            Err(err) => {
                println!(
                    "Task type would not be deserializable {task_id:?}: {err:?}\n{task_type:#?}"
                );
                panic!("Task type would not be deserializable {task_id:?}: {err:?}");
            }
            Ok(task_type2) => {
                // Check 2: the round-tripped value must equal the original.
                if &task_type2 != task_type {
                    println!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                    panic!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                }
                // Check 3: serializing the round-tripped value must give the same bytes.
                let mut bytes2 = Vec::new();
                let result2 = POT_CONFIG.serialize_into(&task_type2, &mut bytes2);
                match result2 {
                    Err(err) => {
                        println!(
                            "Task type would not be serializable the second time {task_id:?}: \
                             {err:?}\n{task_type2:#?}"
                        );
                        panic!(
                            "Task type would not be serializable the second time {task_id:?}: \
                             {err:?}\n{task_type2:#?}"
                        );
                    }
                    Ok(()) => {
                        if bytes2 != *task_type_bytes {
                            println!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                task_type_bytes, bytes2
                            );
                            panic!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                task_type_bytes, bytes2
                            );
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
733
/// Result of `process_task_data`: one inner list per input iterator, each entry holding a
/// task id with its optional meta buffer and optional data buffer (in that order).
type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;
741
742fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
743    tasks: Vec<I>,
744    batch: Option<&B>,
745) -> Result<SerializedTasks>
746where
747    I: Iterator<
748            Item = (
749                TaskId,
750                Option<SmallVec<[u8; 16]>>,
751                Option<SmallVec<[u8; 16]>>,
752            ),
753        > + Send
754        + Sync,
755{
756    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
757        let mut result = Vec::new();
758        for (task_id, meta, data) in tasks {
759            if let Some(batch) = batch {
760                let key = IntKey::new(*task_id);
761                let key = key.as_ref();
762                if let Some(meta) = meta {
763                    batch.put(
764                        KeySpace::TaskMeta,
765                        WriteBuffer::Borrowed(key),
766                        WriteBuffer::SmallVec(meta),
767                    )?;
768                }
769                if let Some(data) = data {
770                    batch.put(
771                        KeySpace::TaskData,
772                        WriteBuffer::Borrowed(key),
773                        WriteBuffer::SmallVec(data),
774                    )?;
775                }
776            } else {
777                result.push((
779                    task_id,
780                    meta.map(WriteBuffer::SmallVec),
781                    data.map(WriteBuffer::SmallVec),
782                ));
783            }
784        }
785
786        Ok(result)
787    })
788}
789
/// Serializes the data items of `task`.
///
/// Fast path: the whole list is serialized in one go. When that fails — or always, when the
/// `verify_serialization` feature is enabled, because the fast-path arm is compiled out —
/// every item is serialized individually on a copy of the list:
/// - items that fail to serialize are removed; for non-optional items an error is recorded
///   (a later failure replaces an earlier one), optional items are skipped silently unless
///   `verify_serialization` prints a note;
/// - with `verify_serialization`, items that serialize but do not deserialize are removed
///   as well.
///
/// The remaining items are then serialized again as a whole.
///
/// # Errors
///
/// Fails when a non-optional item cannot be serialized, or when the final serialization of
/// the filtered list fails.
///
/// # Panics
///
/// Panics when a per-item serializer cannot be created, or (with `verify_serialization`)
/// when a per-item deserializer cannot be created.
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
    Ok(match pot_serialize_small_vec(data) {
        #[cfg(not(feature = "verify_serialization"))]
        Ok(value) => value,
        _ => {
            // Holds the error of the most recent non-optional item that failed.
            let mut error = Ok(());
            let mut data = data.clone();
            data.retain(|item| {
                // Each item is probed with a fresh buffer and symbol map.
                let mut buf = Vec::<u8>::new();
                let mut symbol_map = pot_ser_symbol_map();
                let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
                if let Err(err) = serde_path_to_error::serialize(&item, &mut serializer) {
                    if item.is_optional() {
                        #[cfg(feature = "verify_serialization")]
                        println!(
                            "Skipping non-serializable optional item for {task}: {item:?} due to \
                             {err}"
                        );
                    } else {
                        error = Err(err).context({
                            anyhow!("Unable to serialize data item for {task}: {item:?}")
                        });
                    }
                    false
                } else {
                    #[cfg(feature = "verify_serialization")]
                    {
                        let deserialize: Result<CachedDataItem, _> =
                            serde_path_to_error::deserialize(
                                &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
                            );
                        if let Err(err) = deserialize {
                            println!(
                                "Data item would not be deserializable {task}: {err:?}\n{item:?}"
                            );
                            return false;
                        }
                    }
                    true
                }
            });
            error?;

            pot_serialize_small_vec(&data)
                .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
        }
    })
}
838
839fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
840    match POT_CONFIG.deserialize(data) {
841        Ok(value) => Ok(value),
842        Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
843            &mut pot_de_symbol_list().deserializer_for_slice(data)?,
844        )
845        .map_err(anyhow::Error::from)
846        .and(Err(error.into()))
847        .context("Deserialization failed"),
848    }
849}