use std::{
    borrow::Borrow,
    cmp::max,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result, anyhow};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tracing::Span;
use turbo_tasks::{
    SessionId, TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    turbo_tasks_scope,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

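/// `pot` serialization config for all persisted values, pinned to the V4
/// format so the on-disk representation stays stable.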
const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);

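/// Serializes `value` with [`POT_CONFIG`] directly into a `SmallVec`, letting
/// small payloads stay inline instead of forcing a heap allocation.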
fn pot_serialize_small_vec<T: Serialize>(value: &T) -> pot::Result<SmallVec<[u8; 16]>> {
    struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>);
    impl std::io::Write for SmallVecWrite<'_> {
        #[inline]
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.extend_from_slice(buf);
            Ok(buf.len())
        }

        #[inline]
        fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
            self.0.extend_from_slice(buf);
            Ok(())
        }

        #[inline]
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let mut output = SmallVec::new();
    POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?;
    Ok(output)
}

fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
    pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
    pot::de::SymbolList::new()
}

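// Keys of the singleton metadata entries stored in the Infra key space.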
const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
const META_KEY_SESSION_ID: u32 = 2;

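/// A `u32` database key encoded as little-endian bytes, e.g. `IntKey::new(1)`
/// yields the key `[1, 0, 0, 0]`.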
struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}

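/// Returns `true` unless disabled via `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC`
/// or `__NEXT_TEST_MODE` (any value other than empty, `0`, or `false` counts
/// as set). Computed once and cached for the lifetime of the process.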
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}

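/// State shared between [`KeyValueDatabaseBackingStorage`] and the panic hook
/// registered by `open_versioned_on_disk`.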
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    base_path: Option<PathBuf>,
    invalidated: Mutex<bool>,
    _panic_hook_guard: Option<PanicHookGuard>,
}

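/// A [`BackingStorage`] implementation on top of a [`KeyValueDatabase`]. The
/// state lives behind an `Arc` so the registered panic hook can hold a `Weak`
/// reference to it.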
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

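    /// Opens the database in a version-specific subdirectory of `base_path`,
    /// running invalidation cleanup and version housekeeping first. When
    /// invalidate-on-panic is enabled, registers a panic hook that invalidates
    /// the database, since a panic may leave the persisted cache inconsistent.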
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)?;
        let database = (database)(versioned_path)?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

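    /// Invalidates the on-disk database and prevents further writes. Idempotent
    /// and callable from the panic hook; a no-op for in-memory databases
    /// (`base_path` is `None`).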
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(TaskId::try_from(
            self.inner
                .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
                .context("Unable to read next free task id from database")?
                .unwrap_or(1),
        )?)
    }

    fn next_session_id(&self) -> Result<SessionId> {
        Ok(SessionId::try_from(
            self.inner
                .get_infra_u32(META_KEY_SESSION_ID)
                .context("Unable to read session id from database")?
                .unwrap_or(0)
                + 1,
        )?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = deserialize_with_good_error(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
        serialize(task, data)
    }

    fn save_snapshot<I>(
        &self,
        session_id: SessionId,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<SmallVec<[u8; 16]>>,
                    Option<SmallVec<[u8; 16]>>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::trace_span!(
            "save snapshot",
            session_id = ?session_id,
            operations = operations.len()
        )
        .entered();
        let mut batch = self.inner.database.write_batch()?;

        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    [KeySpace::TaskMeta, KeySpace::TaskData]
                        .into_par_iter()
                        .try_for_each(|key_space| {
                            let _span = span.clone().entered();
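                            // Safety: assumed `flush` contract: all writes to
                            // these two key spaces were issued by
                            // `process_task_data` above, which has completed,
                            // so no concurrent writers remain.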
                            unsafe { batch.flush(key_space) }
                        })?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let result = task_cache_updates
                        .into_par_iter()
                        .with_max_len(1)
                        .map(|updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            let mut task_type_bytes = Vec::new();
                            for (task_type, task_id) in updates {
                                let task_id: u32 = *task_id;
                                serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id + 1);
                            }

                            Ok(max_task_id)
                        })
                        .reduce(
                            || Ok(0),
                            |a, b| -> anyhow::Result<_> {
                                let a_max = a?;
                                let b_max = b?;
                                Ok(max(a_max, b_max))
                            },
                        )?;
                    next_task_id = next_task_id.max(result);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                let mut task_items_result = Ok(Vec::new());
                turbo_tasks::scope(|s| {
                    s.spawn(|_| {
                        task_items_result =
                            process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>);
                    });

                    let mut next_task_id =
                        get_next_free_task_id::<
                            T::SerialWriteBatch<'_>,
                            T::ConcurrentWriteBatch<'_>,
                        >(&mut WriteBatchRef::serial(batch))?;

                    {
                        let _span = tracing::trace_span!(
                            "update task cache",
                            items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                        )
                        .entered();
                        let mut task_type_bytes = Vec::new();
                        for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                            let task_id = *task_id;
                            serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                            batch
                                .put(
                                    KeySpace::ForwardTaskCache,
                                    WriteBuffer::Borrowed(&task_type_bytes),
                                    WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                )
                                .with_context(|| {
                                    anyhow!("Unable to write task cache {task_type:?} => {task_id}")
                                })?;
                            batch
                                .put(
                                    KeySpace::ReverseTaskCache,
                                    WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                    WriteBuffer::Borrowed(&task_type_bytes),
                                )
                                .with_context(|| {
                                    anyhow!("Unable to write task cache {task_id} => {task_type:?}")
                                })?;
                            next_task_id = next_task_id.max(task_id + 1);
                        }
                    }

                    save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                        &mut WriteBatchRef::serial(batch),
                        next_task_id,
                        session_id,
                        operations,
                    )?;
                    anyhow::Ok(())
                })?;

                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    for (task_id, meta, data) in task_items_result?.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    anyhow!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    anyhow!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                }
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch
                .commit()
                .with_context(|| anyhow!("Unable to commit operations"))?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let task_type = POT_CONFIG.serialize(task_type)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type)? else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(deserialize_with_good_error(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                match category {
                    TaskDataCategory::Meta => KeySpace::TaskMeta,
                    TaskDataCategory::Data => KeySpace::TaskData,
                    TaskDataCategory::All => unreachable!(),
                },
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

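/// Reads the persisted next free task id from [`KeySpace::Infra`], defaulting
/// to 1 for a fresh database.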
fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

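/// Persists the snapshot metadata (next free task id, session id, and the
/// uncompleted operations) and flushes [`KeySpace::Infra`].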
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    session_id: SessionId,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next free task id"))?;
    }
    {
        let _span = tracing::trace_span!("update session id", session_id = ?session_id).entered();
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()),
                WriteBuffer::Borrowed(&session_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next session id"))?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations = pot_serialize_small_vec(&operations)
            .with_context(|| anyhow!("Unable to serialize operations"))?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .with_context(|| anyhow!("Unable to write operations"))?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

fn serialize_task_type(
    task_type: &Arc<CachedTaskType>,
    mut task_type_bytes: &mut Vec<u8>,
    task_id: u32,
) -> Result<()> {
    task_type_bytes.clear();
    POT_CONFIG
        .serialize_into(&**task_type, &mut task_type_bytes)
        .with_context(|| anyhow!("Unable to serialize task {task_id} cache key {task_type:?}"))?;
    #[cfg(feature = "verify_serialization")]
    {
        let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
            &mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
        );
        if let Err(err) = deserialize {
            println!("Task type would not be deserializable {task_id}: {err:?}\n{task_type:#?}");
            panic!("Task type would not be deserializable {task_id}: {err:?}");
        }
    }
    Ok(())
}

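/// Serialized task rows grouped per input shard: `(task id, meta bytes, data
/// bytes)`, where either buffer may be absent.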
type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

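/// Serializes task snapshots on the rayon thread pool. With a concurrent
/// `batch` the rows are written directly and the returned `SerializedTasks` is
/// empty; with `batch: None` (serial write batches) the serialized buffers are
/// returned so the caller can write them single-threaded. The tracing span,
/// tokio handle, and turbo-tasks scope are re-entered on the worker threads so
/// serialization code can use them.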
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<SmallVec<[u8; 16]>>,
                Option<SmallVec<[u8; 16]>>,
            ),
        > + Send
        + Sync,
{
    let span = Span::current();
    let turbo_tasks = turbo_tasks::turbo_tasks();
    let handle = tokio::runtime::Handle::current();
    tasks
        .into_par_iter()
        .map(|tasks| {
            let _span = span.clone().entered();
            let _guard = handle.enter();
            turbo_tasks_scope(turbo_tasks.clone(), || {
                let mut result = Vec::new();
                for (task_id, meta, data) in tasks {
                    if let Some(batch) = batch {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch.put(
                                KeySpace::TaskMeta,
                                WriteBuffer::Borrowed(key),
                                WriteBuffer::SmallVec(meta),
                            )?;
                        }
                        if let Some(data) = data {
                            batch.put(
                                KeySpace::TaskData,
                                WriteBuffer::Borrowed(key),
                                WriteBuffer::SmallVec(data),
                            )?;
                        }
                    } else {
                        result.push((
                            task_id,
                            meta.map(WriteBuffer::SmallVec),
                            data.map(WriteBuffer::SmallVec),
                        ));
                    }
                }

                Ok(result)
            })
        })
        .collect::<Result<Vec<_>>>()
}

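/// Serializes a task's data items. On failure (or always with the
/// `verify_serialization` feature), retries item by item: non-serializable
/// optional items are dropped, non-serializable required items become errors.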
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
    Ok(match pot_serialize_small_vec(data) {
        #[cfg(not(feature = "verify_serialization"))]
        Ok(value) => value,
        _ => {
            let mut error = Ok(());
            let mut data = data.clone();
            data.retain(|item| {
                let mut buf = Vec::<u8>::new();
                let mut symbol_map = pot_ser_symbol_map();
                let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
                if let Err(err) = serde_path_to_error::serialize(&item, &mut serializer) {
                    if item.is_optional() {
                        #[cfg(feature = "verify_serialization")]
                        println!("Skipping non-serializable optional item for {task}: {item:?}");
                    } else {
                        error = Err(err).context(anyhow!(
                            "Unable to serialize data item for {task}: {item:?}"
                        ));
                    }
                    false
                } else {
                    #[cfg(feature = "verify_serialization")]
                    {
                        let deserialize: Result<CachedDataItem, _> =
                            serde_path_to_error::deserialize(
                                &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
                            );
                        if let Err(err) = deserialize {
                            println!(
                                "Data item would not be deserializable {task}: {err:?}\n{item:?}"
                            );
                            return false;
                        }
                    }
                    true
                }
            });
            error?;

            pot_serialize_small_vec(&data)
                .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
        }
    })
}

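/// Deserializes with [`POT_CONFIG`]; on failure, retries through
/// `serde_path_to_error` so the returned error names the path to the offending
/// field, falling back to the original error if the retry unexpectedly
/// succeeds.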
fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
    match POT_CONFIG.deserialize(data) {
        Ok(value) => Ok(value),
        Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
            &mut pot_de_symbol_list().deserializer_for_slice(data)?,
        )
        .map_err(anyhow::Error::from)
        .and(Err(error.into()))
        .context("Deserialization failed"),
    }
}