use std::{
    borrow::Borrow,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result, anyhow};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use turbo_tasks::{
    SessionId, TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    parallel,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

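// All persisted values are encoded with `pot`, pinned to its V4 compatibility mode so the
// on-disk format stays stable across `pot` upgrades.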
const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);

fn pot_serialize_small_vec<T: Serialize>(value: &T) -> pot::Result<SmallVec<[u8; 16]>> {
    struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>);
    impl std::io::Write for SmallVecWrite<'_> {
        #[inline]
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.extend_from_slice(buf);
            Ok(buf.len())
        }

        #[inline]
        fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
            self.0.extend_from_slice(buf);
            Ok(())
        }

        #[inline]
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let mut output = SmallVec::new();
    POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?;
    Ok(output)
}

fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
    pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
    pot::de::SymbolList::new()
}

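// Keys in the `KeySpace::Infra` key space, stored as little-endian `u32`s (see `IntKey`).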
const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
const META_KEY_SESSION_ID: u32 = 2;

struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}

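/// Whether a panic should invalidate the persistent cache, on the assumption that a panic may
/// have left it in an inconsistent state. True unless disabled via
/// `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or when running under the Next.js test suite
/// (`__NEXT_TEST_MODE`).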
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}

pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    /// `None` for in-memory databases, which cannot be invalidated.
    base_path: Option<PathBuf>,
    /// Tracks whether `invalidate` has already run, so repeated calls (e.g. from a panic loop)
    /// are no-ops.
    invalidated: Mutex<bool>,
    /// Unregisters the panic hook when the storage is dropped.
    _panic_hook_guard: Option<PanicHookGuard>,
}

pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
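    /// Creates backing storage around an in-memory database. No panic hook is registered, as
    /// there is nothing on disk to invalidate.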
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

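    /// Opens (or creates) an on-disk database under a versioned subdirectory of `base_path`,
    /// after running startup invalidation checks and cleanup of stale versions. If panics are
    /// configured to invalidate the cache (see `should_invalidate_on_panic`), a process-wide
    /// panic hook holding a weak reference to the storage is registered for its lifetime.
    ///
    /// A minimal usage sketch (`MyDatabase` is a hypothetical `KeyValueDatabase`
    /// implementation):
    ///
    /// ```ignore
    /// let (storage, cache_state) = KeyValueDatabaseBackingStorage::open_versioned_on_disk(
    ///     cache_dir,
    ///     &version_info,
    ///     /* is_ci */ false,
    ///     |versioned_path| MyDatabase::open(&versioned_path),
    /// )?;
    /// ```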
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
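                            // Invalidation failures are deliberately ignored: the process is
                            // already panicking and there is no caller to report them to.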
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
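    /// Runs `f` with the given read transaction, or with a fresh short-lived one when `tx` is
    /// `None`.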
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

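    /// Marks the on-disk database as invalid so that startup cleanup
    /// (`check_db_invalidation_and_cleanup`) removes it on the next run, and prevents further
    /// writes. Idempotent; a no-op for in-memory databases.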
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
            // A poisoned lock only means another thread panicked while invalidating; the
            // boolean it protects is still meaningful.
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            invalidate_db(base_path, reason_code)?;
            // Stop committing further data to the now-invalidated database.
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

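    /// Reads a `u32` value from the `Infra` key space.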
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(self
            .inner
            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
            .context("Unable to read next free task id from database")?
            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
    }

    fn next_session_id(&self) -> Result<SessionId> {
        Ok(SessionId::try_from(
            self.inner
                .get_infra_u32(META_KEY_SESSION_ID)
                .context("Unable to read session id from database")?
                .unwrap_or(0)
                + 1,
        )?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = deserialize_with_good_error(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
        serialize(task, data)
    }

    fn save_snapshot<I>(
        &self,
        session_id: SessionId,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<SmallVec<[u8; 16]>>,
                    Option<SmallVec<[u8; 16]>>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::trace_span!(
            "save snapshot",
            session_id = ?session_id,
            operations = operations.len()
        )
        .entered();
        let mut batch = self.inner.database.write_batch()?;

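        // Prefer the concurrent path when the database supports it: task rows and the task
        // cache are then written from many threads in parallel.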
        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    parallel::try_for_each(
                        &[KeySpace::TaskMeta, KeySpace::TaskData],
                        |&key_space| {
                            let _span = span.clone().entered();
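                            // SAFETY: all writes to these key spaces (issued by
                            // `process_task_data` above) have completed.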
                            unsafe { batch.flush(key_space) }
                        },
                    )?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
                        task_cache_updates,
                        |updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            let mut task_type_bytes = Vec::new();
                            for (task_type, task_id) in updates {
                                let task_id: u32 = *task_id;
                                serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id + 1);
                            }

                            Ok(max_task_id)
                        },
                    )?
                    .into_iter()
                    .max()
                    .unwrap_or(0);
                    next_task_id = next_task_id.max(result);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    let task_items =
                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
                    for (task_id, meta, data) in task_items.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    anyhow!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    anyhow!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                    batch.flush(KeySpace::TaskMeta)?;
                    batch.flush(KeySpace::TaskData)?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::serial(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let mut task_type_bytes = Vec::new();
                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                        let task_id = *task_id;
                        serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;

                        batch
                            .put(
                                KeySpace::ForwardTaskCache,
                                WriteBuffer::Borrowed(&task_type_bytes),
                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_type:?} => {task_id}")
                            })?;
                        batch
                            .put(
                                KeySpace::ReverseTaskCache,
                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                WriteBuffer::Borrowed(&task_type_bytes),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_id} => {task_type:?}")
                            })?;
                        next_task_id = next_task_id.max(task_id + 1);
                    }
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::serial(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch
                .commit()
                .with_context(|| anyhow!("Unable to commit operations"))?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let task_type = POT_CONFIG.serialize(task_type)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type)? else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            // Propagate instead of panicking: a corrupt entry should surface as an error.
            let id = TaskId::try_from(u32::from_le_bytes(bytes))?;
            Ok(Some(id))
        }
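        // A fresh (empty) database cannot contain the task type; skip serializing the key and
        // querying the store entirely.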
        if inner.database.is_empty() {
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(deserialize_with_good_error(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                match category {
                    TaskDataCategory::Meta => KeySpace::TaskMeta,
                    TaskDataCategory::Data => KeySpace::TaskData,
                    TaskDataCategory::All => unreachable!(),
                },
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

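/// Reads the persisted next-free task id from the `Infra` key space, defaulting to 1 for a
/// fresh database.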
fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

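/// Persists the infra metadata (next free task id, session id, uncompleted operations) and
/// flushes the `Infra` key space.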
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    session_id: SessionId,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next free task id"))?;
    }
    {
        let _span = tracing::trace_span!("update session id", session_id = ?session_id).entered();
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()),
                WriteBuffer::Borrowed(&session_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write session id"))?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations = pot_serialize_small_vec(&operations)
            .with_context(|| anyhow!("Unable to serialize operations"))?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .with_context(|| anyhow!("Unable to write operations"))?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

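/// Serializes a task's cache key into `task_type_bytes`, reusing the buffer across calls. With
/// the `verify_serialization` feature enabled, also round-trips the bytes to catch types that
/// serialize but cannot be deserialized.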
fn serialize_task_type(
    task_type: &Arc<CachedTaskType>,
    mut task_type_bytes: &mut Vec<u8>,
    task_id: u32,
) -> Result<()> {
    task_type_bytes.clear();
    POT_CONFIG
        .serialize_into(&**task_type, &mut task_type_bytes)
        .with_context(|| anyhow!("Unable to serialize task {task_id} cache key {task_type:?}"))?;
    #[cfg(feature = "verify_serialization")]
    {
        let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
            &mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
        );
        if let Err(err) = deserialize {
            println!("Task type would not be deserializable {task_id}: {err:?}\n{task_type:#?}");
            panic!("Task type would not be deserializable {task_id}: {err:?}");
        }
    }
    Ok(())
}

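/// Pre-serialized task rows (task id, meta bytes, data bytes), collected when no concurrent
/// write batch is available and written out later on the serial path.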
type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

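/// Processes the per-thread snapshot iterators in parallel. With a concurrent write batch the
/// rows are written directly (the returned collection contains no rows); without one they are
/// collected and returned for the serial path.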
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<SmallVec<[u8; 16]>>,
                Option<SmallVec<[u8; 16]>>,
            ),
        > + Send
        + Sync,
{
    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
        let mut result = Vec::new();
        for (task_id, meta, data) in tasks {
            if let Some(batch) = batch {
                let key = IntKey::new(*task_id);
                let key = key.as_ref();
                if let Some(meta) = meta {
                    batch.put(
                        KeySpace::TaskMeta,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(meta),
                    )?;
                }
                if let Some(data) = data {
                    batch.put(
                        KeySpace::TaskData,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(data),
                    )?;
                }
            } else {
                result.push((
                    task_id,
                    meta.map(WriteBuffer::SmallVec),
                    data.map(WriteBuffer::SmallVec),
                ));
            }
        }

        Ok(result)
    })
}

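/// Serializes a task's data items. If serialization fails (or always, under
/// `verify_serialization`), the items are retried one by one: optional items that fail to
/// serialize are dropped, while any other failure is reported with the offending item attached.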
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
    Ok(match pot_serialize_small_vec(data) {
        #[cfg(not(feature = "verify_serialization"))]
        Ok(value) => value,
        _ => {
            let mut error = Ok(());
            let mut data = data.clone();
            data.retain(|item| {
                let mut buf = Vec::<u8>::new();
                let mut symbol_map = pot_ser_symbol_map();
                let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
                if let Err(err) = serde_path_to_error::serialize(&item, &mut serializer) {
                    if item.is_optional() {
                        #[cfg(feature = "verify_serialization")]
                        println!("Skipping non-serializable optional item for {task}: {item:?}");
                    } else {
                        error = Err(err).context({
                            anyhow!("Unable to serialize data item for {task}: {item:?}")
                        });
                    }
                    false
                } else {
                    #[cfg(feature = "verify_serialization")]
                    {
                        let deserialize: Result<CachedDataItem, _> =
                            serde_path_to_error::deserialize(
                                &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
                            );
                        if let Err(err) = deserialize {
                            println!(
                                "Data item would not be deserializable {task}: {err:?}\n{item:?}"
                            );
                            return false;
                        }
                    }
                    true
                }
            });
            error?;

            pot_serialize_small_vec(&data)
                .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
        }
    })
}

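/// Deserializes `data` with the shared pot config. On failure, deserialization is re-run
/// through `serde_path_to_error` so the error names the path at which it failed; if that run
/// unexpectedly succeeds, the original error is returned.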
fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
    match POT_CONFIG.deserialize(data) {
        Ok(value) => Ok(value),
        Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
            &mut pot_de_symbol_list().deserializer_for_slice(data)?,
        )
        .map_err(anyhow::Error::from)
        .and(Err(error.into()))
        .context("Deserialization failed"),
    }
}