use std::{
    borrow::Borrow,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result};
use turbo_bincode::{
    TurboBincodeBuffer, turbo_bincode_decode, turbo_bincode_encode, turbo_bincode_encode_into,
};
use turbo_tasks::{
    TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    parallel,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;

struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}

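/// Whether a panic should invalidate the persistent cache. Returns `true`
/// unless disabled via the `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or
/// `__NEXT_TEST_MODE` environment variables; the result is computed once and
/// cached.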
fn should_invalidate_on_panic() -> bool {
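    // Treats an unset variable as well as "", "0", and "false" as falsy.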
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}

pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    base_path: Option<PathBuf>,
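    /// Tracks whether this database has already been invalidated, so that
    /// invalidation runs at most once.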
    invalidated: Mutex<bool>,
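    /// Keeps the invalidate-on-panic hook registered for as long as this
    /// storage is alive.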
    _panic_hook_guard: Option<PanicHookGuard>,
}

pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
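    /// Wraps a purely in-memory database. No base path, invalidation marker,
    /// or panic hook is needed.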
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

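    /// Opens a database inside a version-specific subdirectory of `base_path`,
    /// after first handling any invalidation left behind by a previous run.
    /// When invalidation-on-panic is enabled, a panic hook is registered that
    /// invalidates the cache.
    ///
    /// A minimal usage sketch (`MyDb::open` stands in for a concrete
    /// [`KeyValueDatabase`] constructor and is not part of this module):
    ///
    /// ```ignore
    /// let (storage, startup_cache_state) =
    ///     KeyValueDatabaseBackingStorage::open_versioned_on_disk(
    ///         base_path,
    ///         &version_info,
    ///         /* is_ci */ false,
    ///         |versioned_path| MyDb::open(versioned_path),
    ///     )?;
    /// ```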
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
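                            // We are already panicking; an error while
                            // invalidating is ignored.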
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
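    /// Runs `f` with the provided read transaction, or with a fresh
    /// short-lived one when `tx` is `None`.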
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

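    /// Marks the on-disk database as invalidated and prevents further writes
    /// to it. A no-op for in-memory databases (no `base_path`).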
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
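            // The mutex ensures that invalidation runs at most once, even if
            // called concurrently (e.g. from the panic hook).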
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            invalidate_db(base_path, reason_code)?;
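            // Nothing should be written to the database anymore; its contents
            // will be discarded on the next startup.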
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

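    /// Reads a little-endian `u32` value for `key` from the `Infra` key space.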
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

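    /// Returns the persisted next free task id, or `TaskId::MIN` if none has
    /// been written yet.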
    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(self
            .inner
            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
            .context("Unable to read next free task id from database")?
            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
    }

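    /// Reads back the operations that were persisted with the last snapshot
    /// but had not completed at that point.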
    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = turbo_bincode_decode(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
        encode_task_data(task, data)
    }

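    /// Persists a snapshot in a single write batch: serialized task data, the
    /// forward and reverse task cache entries, the next free task id, and the
    /// still-uncompleted operations.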
    fn save_snapshot<I>(
        &self,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<TurboBincodeBuffer>,
                    Option<TurboBincodeBuffer>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
        let mut batch = self.inner.database.write_batch()?;

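        // Initial capacity of the reused task type encode buffers; they grow
        // on demand.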
        const INITIAL_ENCODE_BUFFER_CAPACITY: usize = 1024;

        match &mut batch {
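            // A concurrent batch accepts writes from multiple threads, so task
            // data and the task cache are written in parallel.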
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    parallel::try_for_each(
                        &[KeySpace::TaskMeta, KeySpace::TaskData],
                        |&key_space| {
                            let _span = span.clone().entered();
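                            // Safety: all writes to these key spaces happened
                            // above, so nothing writes concurrently with this
                            // flush (assumed contract of `flush`).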
                            unsafe { batch.flush(key_space) }
                        },
                    )?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
                        task_cache_updates,
                        |updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

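                            // Reuse a single encode buffer for all task types
                            // in this chunk to avoid repeated allocations.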
                            let mut task_type_bytes =
                                TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
                            for (task_type, task_id) in updates {
                                task_type_bytes.clear();
                                encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                                let task_id: u32 = *task_id;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        format!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        format!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id);
                            }

                            Ok(max_task_id)
                        },
                    )?
                    .into_iter()
                    .max()
                    .unwrap_or(0);
                    next_task_id = next_task_id.max(max_task_id + 1);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    let task_items =
                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
                    for (task_id, meta, data) in task_items.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    format!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    format!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                    batch.flush(KeySpace::TaskMeta)?;
                    batch.flush(KeySpace::TaskData)?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::serial(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
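                    // As in the concurrent branch, reuse a single encode
                    // buffer for all task types.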
                    let mut task_type_bytes =
                        TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                        // The buffer must be cleared between iterations, as
                        // `encode_task_type` expects an empty buffer (the
                        // concurrent branch already does this).
                        task_type_bytes.clear();
                        encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                        let task_id = *task_id;

                        batch
                            .put(
                                KeySpace::ForwardTaskCache,
                                WriteBuffer::Borrowed(&task_type_bytes),
                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                            )
                            .with_context(|| {
                                format!("Unable to write task cache {task_type:?} => {task_id}")
                            })?;
                        batch
                            .put(
                                KeySpace::ReverseTaskCache,
                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                WriteBuffer::Borrowed(&task_type_bytes),
                            )
                            .with_context(|| {
                                format!("Unable to write task cache {task_id} => {task_type:?}")
                            })?;
                        next_task_id = next_task_id.max(task_id + 1);
                    }
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::serial(batch),
                    next_task_id,
                    operations,
                )?;
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch.commit().context("Unable to commit operations")?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

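    // The unsafe lookup methods below assume that `tx`, when provided, was
    // returned by `start_read_transaction` on this same instance.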
    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let mut task_type_bytes = TurboBincodeBuffer::new();
            encode_task_type(task_type, &mut task_type_bytes, None)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type_bytes)?
            else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
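            // Fast path: an empty database cannot contain the entry, so avoid
            // creating a read transaction.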
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(turbo_bincode_decode(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                category_to_key_space(category),
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = turbo_bincode_decode(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    unsafe fn batch_lookup_data(
        &self,
        tx: Option<&Self::ReadTransaction<'_>>,
        task_ids: &[TaskId],
        category: TaskDataCategory,
    ) -> Result<Vec<Vec<CachedDataItem>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_ids: &[TaskId],
            category: TaskDataCategory,
        ) -> Result<Vec<Vec<CachedDataItem>>> {
            let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
            let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
            let bytes = database.batch_get(tx, category_to_key_space(category), &keys)?;
            bytes
                .into_iter()
                .map(|opt_bytes| {
                    if let Some(bytes) = opt_bytes {
                        let result: Vec<CachedDataItem> = turbo_bincode_decode(bytes.borrow())?;
                        Ok(result)
                    } else {
                        Ok(Vec::new())
                    }
                })
                .collect::<Result<Vec<_>>>()
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_ids, category))
            .with_context(|| {
                format!(
                    "Looking up data for {} tasks from database failed",
                    task_ids.len()
                )
            })
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

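/// Reads the persisted next free task id from the write batch, defaulting to
/// `1` when none has been written yet.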
fn get_next_free_task_id<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
) -> Result<u32, anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

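/// Persists the next free task id and the uncompleted operations into the
/// `Infra` key space and flushes it.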
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<(), anyhow::Error>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    batch
        .put(
            KeySpace::Infra,
            WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
            WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
        )
        .context("Unable to write next free task id")?;
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations =
            turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .context("Unable to write operations")?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

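/// Encodes a task type into `buffer`. With the `verify_serialization` feature
/// enabled, additionally verifies that the encoding round-trips to an equal
/// value and re-encodes to identical bytes.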
fn encode_task_type(
    task_type: &CachedTaskType,
    buffer: &mut TurboBincodeBuffer,
    task_id: Option<TaskId>,
) -> Result<()> {
    fn encode_once_into(
        task_type: &CachedTaskType,
        buffer: &mut TurboBincodeBuffer,
        task_id: Option<TaskId>,
    ) -> Result<()> {
        turbo_bincode_encode_into(task_type, buffer).with_context(|| {
            if let Some(task_id) = task_id {
                format!("Unable to serialize task {task_id} cache key {task_type:?}")
            } else {
                format!("Unable to serialize task cache key {task_type:?}")
            }
        })
    }

    debug_assert!(buffer.is_empty());
    encode_once_into(task_type, buffer, task_id)?;

    if cfg!(feature = "verify_serialization") {
        macro_rules! println_and_panic {
            ($($tt:tt)*) => {
                println!($($tt)*);
                panic!($($tt)*);
            };
        }
        let deserialize: Result<CachedTaskType, _> = turbo_bincode_decode(buffer);
        match deserialize {
            Err(err) => {
                println_and_panic!("Task type would not be deserializable:\n{err:?}");
            }
            Ok(task_type2) => {
                if &task_type2 != task_type {
                    println_and_panic!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                }
                let mut buffer2 = TurboBincodeBuffer::new();
                match encode_once_into(&task_type2, &mut buffer2, task_id) {
                    Err(err) => {
                        println_and_panic!(
                            "Task type would not be serializable the second time:\n{err:?}"
                        );
                    }
                    Ok(()) => {
                        if buffer2 != *buffer {
                            println_and_panic!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                buffer,
                                buffer2
                            );
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

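/// Serializes task snapshots in parallel. With a concurrent write batch the
/// rows are written directly and the returned `SerializedTasks` is empty;
/// without one they are collected for a serial writer.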
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<TurboBincodeBuffer>,
                Option<TurboBincodeBuffer>,
            ),
        > + Send
        + Sync,
{
    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
        let mut result = Vec::new();
        for (task_id, meta, data) in tasks {
            if let Some(batch) = batch {
                let key = IntKey::new(*task_id);
                let key = key.as_ref();
                if let Some(meta) = meta {
                    batch.put(
                        KeySpace::TaskMeta,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(meta),
                    )?;
                }
                if let Some(data) = data {
                    batch.put(
                        KeySpace::TaskData,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(data),
                    )?;
                }
            } else {
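                // No concurrent batch available: collect the serialized rows
                // so the caller can write them through the serial batch.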
                result.push((
                    task_id,
                    meta.map(WriteBuffer::SmallVec),
                    data.map(WriteBuffer::SmallVec),
                ));
            }
        }

        Ok(result)
    })
}

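/// Encodes the data items of a task. If encoding the full list fails (or the
/// `verify_serialization` feature is enabled), items are checked individually:
/// non-serializable optional items are dropped, while a non-serializable
/// required item is an error.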
fn encode_task_data(task: TaskId, data: &Vec<CachedDataItem>) -> Result<TurboBincodeBuffer> {
    let orig_result = turbo_bincode_encode(data);
    if !cfg!(feature = "verify_serialization")
        && let Ok(value) = orig_result
    {
        return Ok(value);
    }

    let mut error = Ok(());
    let mut filtered_data = data.clone();
    filtered_data.retain(|item| match turbo_bincode_encode(&item) {
        Ok(buf) => {
            if cfg!(feature = "verify_serialization") {
                let deserialized = turbo_bincode_decode::<CachedDataItem>(&buf);
                if let Err(err) = deserialized {
                    println!("Data item would not be deserializable {task}: {err:?}\n{item:?}");
                    return false;
                }
            }
            true
        }
        Err(err) => {
            if item.is_optional() {
                if cfg!(feature = "verify_serialization") {
                    println!(
                        "Skipping non-encodable optional item for {task}: {item:?} due to {err}"
                    );
                }
            } else {
                error =
                    Err(err).context(format!("Unable to encode data item for {task}: {item:?}"));
            }
            false
        }
    });
    error?;

    (if filtered_data.len() == data.len() {
        orig_result
    } else {
        turbo_bincode_encode(&filtered_data)
    })
    .with_context(|| format!("Unable to serialize data items for {task}: {filtered_data:#?}"))
}

fn category_to_key_space(category: TaskDataCategory) -> KeySpace {
    match category {
        TaskDataCategory::Meta => KeySpace::TaskMeta,
        TaskDataCategory::Data => KeySpace::TaskData,
        TaskDataCategory::All => unreachable!(),
    }
}