use std::{
    borrow::Borrow,
    env,
    path::PathBuf,
    sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
};

use anyhow::{Context, Result, anyhow};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use turbo_tasks::{
    SessionId, TaskId,
    backend::CachedTaskType,
    panic_hooks::{PanicHookGuard, register_panic_hook},
    parallel,
};

use crate::{
    GitVersionInfo,
    backend::{AnyOperation, TaskDataCategory},
    backing_storage::{BackingStorage, BackingStorageSealed},
    data::CachedDataItem,
    database::{
        db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
        db_versioning::handle_db_versioning,
        key_value_database::{KeySpace, KeyValueDatabase},
        write_batch::{
            BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
            WriteBuffer,
        },
    },
    db_invalidation::invalidation_reasons,
    utils::chunked_vec::ChunkedVec,
};

const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4);

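/// Serializes `value` with [`POT_CONFIG`] directly into a `SmallVec`, so small
/// payloads (up to 16 bytes) avoid a heap allocation.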
fn pot_serialize_small_vec<T: Serialize>(value: &T) -> pot::Result<SmallVec<[u8; 16]>> {
    struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>);
    impl std::io::Write for SmallVecWrite<'_> {
        #[inline]
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.extend_from_slice(buf);
            Ok(buf.len())
        }

        #[inline]
        fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
            self.0.extend_from_slice(buf);
            Ok(())
        }

        #[inline]
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let mut output = SmallVec::new();
    POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?;
    Ok(output)
}

fn pot_ser_symbol_map() -> pot::ser::SymbolMap {
    pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4)
}

fn pot_de_symbol_list<'l>() -> pot::de::SymbolList<'l> {
    pot::de::SymbolList::new()
}

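// Keys in the `Infra` key space, encoded as little-endian `u32`s via `IntKey`.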
const META_KEY_OPERATIONS: u32 = 0;
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
const META_KEY_SESSION_ID: u32 = 2;

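/// A `u32` key encoded as little-endian bytes so it can be used wherever a
/// `&[u8]` key is expected without allocating, e.g. `IntKey::new(1).as_ref()`
/// yields `[1, 0, 0, 0]`.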
struct IntKey([u8; 4]);

impl IntKey {
    fn new(value: u32) -> Self {
        Self(value.to_le_bytes())
    }
}

impl AsRef<[u8]> for IntKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
    let n = u32::from_le_bytes(bytes.borrow().try_into()?);
    Ok(n)
}

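/// Whether a panic should invalidate the persistent cache. Enabled unless
/// `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or `__NEXT_TEST_MODE` is set to a
/// truthy value; the result is computed once and cached.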
fn should_invalidate_on_panic() -> bool {
    fn env_is_falsy(key: &str) -> bool {
        env::var_os(key)
            .is_none_or(|value| ["".as_ref(), "0".as_ref(), "false".as_ref()].contains(&&*value))
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        env_is_falsy("TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC") && env_is_falsy("__NEXT_TEST_MODE")
    });
    *SHOULD_INVALIDATE
}

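/// State shared between [`KeyValueDatabaseBackingStorage`] and the panic hook,
/// which holds only a `Weak` reference to it. `base_path` is `None` for
/// in-memory databases, which cannot be invalidated.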
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    database: T,
    base_path: Option<PathBuf>,
    invalidated: Mutex<bool>,
    _panic_hook_guard: Option<PanicHookGuard>,
}

pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
    pub(crate) fn new_in_memory(database: T) -> Self {
        Self {
            inner: Arc::new(KeyValueDatabaseBackingStorageInner {
                database,
                base_path: None,
                invalidated: Mutex::new(false),
                _panic_hook_guard: None,
            }),
        }
    }

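    /// Opens a versioned database at `base_path`, after checking for a pending
    /// invalidation and cleaning up stale versions. When
    /// [`should_invalidate_on_panic`] returns true, registers a panic hook that
    /// invalidates the database on any panic in the process.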
    pub(crate) fn open_versioned_on_disk(
        base_path: PathBuf,
        version_info: &GitVersionInfo,
        is_ci: bool,
        database: impl FnOnce(PathBuf) -> Result<T>,
    ) -> Result<(Self, StartupCacheState)>
    where
        T: Send + Sync + 'static,
    {
        let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
            .context("Failed to check database invalidation and cleanup")?;
        let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
            .context("Failed to handle database versioning")?;
        let database = (database)(versioned_path).context("Failed to open database")?;
        let backing_storage = Self {
            inner: Arc::new_cyclic(
                move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
                    let panic_hook_guard = if should_invalidate_on_panic() {
                        let weak_inner = weak_inner.clone();
                        Some(register_panic_hook(Box::new(move |_| {
                            let Some(inner) = weak_inner.upgrade() else {
                                return;
                            };
                            let _ = inner.invalidate(invalidation_reasons::PANIC);
                        })))
                    } else {
                        None
                    };
                    KeyValueDatabaseBackingStorageInner {
                        database,
                        base_path: Some(base_path),
                        invalidated: Mutex::new(false),
                        _panic_hook_guard: panic_hook_guard,
                    }
                },
            ),
        };
        Ok((backing_storage, startup_cache_state))
    }
}

impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
    fn with_tx<R>(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
    ) -> Result<R> {
        if let Some(tx) = tx {
            f(tx)
        } else {
            let tx = self.database.begin_read_transaction()?;
            let r = f(&tx)?;
            drop(tx);
            Ok(r)
        }
    }

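    /// Marks the on-disk database as invalid and prevents further writes. This
    /// is idempotent: the `invalidated` flag ensures the invalidation marker is
    /// written at most once, even with concurrent callers or a poisoned lock.
    /// No-op for in-memory databases (`base_path` is `None`).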
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        if let Some(base_path) = &self.base_path {
            let mut invalidated_guard = self
                .invalidated
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if *invalidated_guard {
                return Ok(());
            }
            invalidate_db(base_path, reason_code)?;
            self.database.prevent_writes();
            *invalidated_guard = true;
        }
        Ok(())
    }

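    /// Reads a `u32` stored under the given `Infra` key, if present.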
    fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
        let tx = self.database.begin_read_transaction()?;
        self.database
            .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
            .map(as_u32)
            .transpose()
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
    for KeyValueDatabaseBackingStorage<T>
{
    fn invalidate(&self, reason_code: &str) -> Result<()> {
        self.inner.invalidate(reason_code)
    }
}

impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
    for KeyValueDatabaseBackingStorage<T>
{
    type ReadTransaction<'l> = T::ReadTransaction<'l>;

    fn next_free_task_id(&self) -> Result<TaskId> {
        Ok(self
            .inner
            .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
            .context("Unable to read next free task id from database")?
            .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
    }

    fn next_session_id(&self) -> Result<SessionId> {
        Ok(SessionId::try_from(
            self.inner
                .get_infra_u32(META_KEY_SESSION_ID)
                .context("Unable to read session id from database")?
                .unwrap_or(0)
                + 1,
        )?)
    }

    fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
        fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
            let tx = database.begin_read_transaction()?;
            let Some(operations) = database.get(
                &tx,
                KeySpace::Infra,
                IntKey::new(META_KEY_OPERATIONS).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let operations = deserialize_with_good_error(operations.borrow())?;
            Ok(operations)
        }
        get(&self.inner.database).context("Unable to read uncompleted operations from database")
    }

    fn serialize(&self, task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
        serialize(task, data)
    }

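    /// Persists one snapshot in a single write batch: per-task meta/data blobs,
    /// both directions of the task cache, and finally the infra metadata (next
    /// free task id, session id, uncompleted operations). Concurrent write
    /// batches are flushed in parallel; serial batches are written on a single
    /// thread.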
    fn save_snapshot<I>(
        &self,
        session_id: SessionId,
        operations: Vec<Arc<AnyOperation>>,
        task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
        snapshots: Vec<I>,
    ) -> Result<()>
    where
        I: Iterator<
                Item = (
                    TaskId,
                    Option<SmallVec<[u8; 16]>>,
                    Option<SmallVec<[u8; 16]>>,
                ),
            > + Send
            + Sync,
    {
        let _span = tracing::trace_span!(
            "save snapshot",
            session_id = ?session_id,
            operations = operations.len()
        )
        .entered();
        let mut batch = self.inner.database.write_batch()?;

        match &mut batch {
            &mut WriteBatch::Concurrent(ref batch, _) => {
                {
                    let _span = tracing::trace_span!("update task data").entered();
                    process_task_data(snapshots, Some(batch))?;
                    let span = tracing::trace_span!("flush task data").entered();
                    parallel::try_for_each(
                        &[KeySpace::TaskMeta, KeySpace::TaskData],
                        |&key_space| {
                            let _span = span.clone().entered();
                            unsafe { batch.flush(key_space) }
                        },
                    )?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::concurrent(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
                        task_cache_updates,
                        |updates| {
                            let _span = _span.clone().entered();
                            let mut max_task_id = 0;

                            let mut task_type_bytes = Vec::new();
                            for (task_type, task_id) in updates {
                                serialize_task_type(
                                    &task_type,
                                    &mut task_type_bytes,
                                    Some(task_id),
                                )?;
                                let task_id: u32 = *task_id;

                                batch
                                    .put(
                                        KeySpace::ForwardTaskCache,
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                        WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_type:?} => {task_id}"
                                        )
                                    })?;
                                batch
                                    .put(
                                        KeySpace::ReverseTaskCache,
                                        WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                        WriteBuffer::Borrowed(&task_type_bytes),
                                    )
                                    .with_context(|| {
                                        anyhow!(
                                            "Unable to write task cache {task_id} => {task_type:?}"
                                        )
                                    })?;
                                max_task_id = max_task_id.max(task_id + 1);
                            }

                            Ok(max_task_id)
                        },
                    )?
                    .into_iter()
                    .max()
                    .unwrap_or(0);
                    next_task_id = next_task_id.max(result);
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::concurrent(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
            WriteBatch::Serial(batch) => {
                {
                    let _span = tracing::trace_span!("update tasks").entered();
                    let task_items =
                        process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
                    for (task_id, meta, data) in task_items.into_iter().flatten() {
                        let key = IntKey::new(*task_id);
                        let key = key.as_ref();
                        if let Some(meta) = meta {
                            batch
                                .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
                                .with_context(|| {
                                    anyhow!("Unable to write meta items for {task_id}")
                                })?;
                        }
                        if let Some(data) = data {
                            batch
                                .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
                                .with_context(|| {
                                    anyhow!("Unable to write data items for {task_id}")
                                })?;
                        }
                    }
                    batch.flush(KeySpace::TaskMeta)?;
                    batch.flush(KeySpace::TaskData)?;
                }

                let mut next_task_id = get_next_free_task_id::<
                    T::SerialWriteBatch<'_>,
                    T::ConcurrentWriteBatch<'_>,
                >(&mut WriteBatchRef::serial(batch))?;

                {
                    let _span = tracing::trace_span!(
                        "update task cache",
                        items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
                    )
                    .entered();
                    let mut task_type_bytes = Vec::new();
                    for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
                        serialize_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
                        let task_id = *task_id;

                        batch
                            .put(
                                KeySpace::ForwardTaskCache,
                                WriteBuffer::Borrowed(&task_type_bytes),
                                WriteBuffer::Borrowed(&task_id.to_le_bytes()),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_type:?} => {task_id}")
                            })?;
                        batch
                            .put(
                                KeySpace::ReverseTaskCache,
                                WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
                                WriteBuffer::Borrowed(&task_type_bytes),
                            )
                            .with_context(|| {
                                anyhow!("Unable to write task cache {task_id} => {task_type:?}")
                            })?;
                        next_task_id = next_task_id.max(task_id + 1);
                    }
                }

                save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
                    &mut WriteBatchRef::serial(batch),
                    next_task_id,
                    session_id,
                    operations,
                )?;
            }
        }

        {
            let _span = tracing::trace_span!("commit").entered();
            batch
                .commit()
                .with_context(|| anyhow!("Unable to commit operations"))?;
        }
        Ok(())
    }

    fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
        self.inner.database.begin_read_transaction().ok()
    }

    unsafe fn forward_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_type: &CachedTaskType,
    ) -> Result<Option<TaskId>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_type: &CachedTaskType,
        ) -> Result<Option<TaskId>> {
            let mut task_type_bytes = Vec::new();
            serialize_task_type(task_type, &mut task_type_bytes, None)?;
            let Some(bytes) = database.get(tx, KeySpace::ForwardTaskCache, &task_type_bytes)?
            else {
                return Ok(None);
            };
            let bytes = bytes.borrow().try_into()?;
            let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
            Ok(Some(id))
        }
        if inner.database.is_empty() {
            return Ok(None);
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_type))
            .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
    }

    unsafe fn reverse_lookup_task_cache(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
    ) -> Result<Option<Arc<CachedTaskType>>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
        ) -> Result<Option<Arc<CachedTaskType>>> {
            let Some(bytes) = database.get(
                tx,
                KeySpace::ReverseTaskCache,
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(None);
            };
            Ok(Some(deserialize_with_good_error(bytes.borrow())?))
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id))
            .with_context(|| format!("Looking up task type for {task_id} from database failed"))
    }

    unsafe fn lookup_data(
        &self,
        tx: Option<&T::ReadTransaction<'_>>,
        task_id: TaskId,
        category: TaskDataCategory,
    ) -> Result<Vec<CachedDataItem>> {
        let inner = &*self.inner;
        fn lookup<D: KeyValueDatabase>(
            database: &D,
            tx: &D::ReadTransaction<'_>,
            task_id: TaskId,
            category: TaskDataCategory,
        ) -> Result<Vec<CachedDataItem>> {
            let Some(bytes) = database.get(
                tx,
                match category {
                    TaskDataCategory::Meta => KeySpace::TaskMeta,
                    TaskDataCategory::Data => KeySpace::TaskData,
                    TaskDataCategory::All => unreachable!(),
                },
                IntKey::new(*task_id).as_ref(),
            )?
            else {
                return Ok(Vec::new());
            };
            let result: Vec<CachedDataItem> = deserialize_with_good_error(bytes.borrow())?;
            Ok(result)
        }
        inner
            .with_tx(tx, |tx| lookup(&inner.database, tx, task_id, category))
            .with_context(|| format!("Looking up data for {task_id} from database failed"))
    }

    fn shutdown(&self) -> Result<()> {
        self.inner.database.shutdown()
    }
}

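/// Reads the persisted next free task id from the `Infra` key space, defaulting
/// to 1 for a fresh database.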
fn get_next_free_task_id<'a, S, C>(batch: &mut WriteBatchRef<'_, 'a, S, C>) -> Result<u32>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    Ok(
        match batch.get(
            KeySpace::Infra,
            IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
        )? {
            Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
            None => 1,
        },
    )
}

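/// Writes the infra metadata (next free task id, session id, and the serialized
/// uncompleted operations) and flushes the `Infra` key space.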
fn save_infra<'a, S, C>(
    batch: &mut WriteBatchRef<'_, 'a, S, C>,
    next_task_id: u32,
    session_id: SessionId,
    operations: Vec<Arc<AnyOperation>>,
) -> Result<()>
where
    S: SerialWriteBatch<'a>,
    C: ConcurrentWriteBatch<'a>,
{
    {
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
                WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next free task id"))?;
    }
    {
        let _span = tracing::trace_span!("update session id", session_id = ?session_id).entered();
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()),
                WriteBuffer::Borrowed(&session_id.to_le_bytes()),
            )
            .with_context(|| anyhow!("Unable to write next session id"))?;
    }
    {
        let _span =
            tracing::trace_span!("update operations", operations = operations.len()).entered();
        let operations = pot_serialize_small_vec(&operations)
            .with_context(|| anyhow!("Unable to serialize operations"))?;
        batch
            .put(
                KeySpace::Infra,
                WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
                WriteBuffer::SmallVec(operations),
            )
            .with_context(|| anyhow!("Unable to write operations"))?;
    }
    batch.flush(KeySpace::Infra)?;
    Ok(())
}

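/// Serializes a task's cache key into `task_type_bytes`, clearing and reusing
/// the buffer across calls. With the `verify_serialization` feature enabled,
/// also verifies that the bytes round-trip to an identical `CachedTaskType`.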
#[inline(never)]
fn serialize_task_type(
    task_type: &CachedTaskType,
    mut task_type_bytes: &mut Vec<u8>,
    task_id: Option<TaskId>,
) -> Result<()> {
    task_type_bytes.clear();
    POT_CONFIG
        .serialize_into(task_type, &mut task_type_bytes)
        .with_context(|| {
            if let Some(task_id) = task_id {
                anyhow!("Unable to serialize task {task_id} cache key {task_type:?}")
            } else {
                anyhow!("Unable to serialize task cache key {task_type:?}")
            }
        })?;
    #[cfg(feature = "verify_serialization")]
    {
        let deserialize: Result<CachedTaskType, _> = serde_path_to_error::deserialize(
            &mut pot_de_symbol_list().deserializer_for_slice(&*task_type_bytes)?,
        );
        match deserialize {
            Err(err) => {
                println!(
                    "Task type would not be deserializable {task_id:?}: {err:?}\n{task_type:#?}"
                );
                panic!("Task type would not be deserializable {task_id:?}: {err:?}");
            }
            Ok(task_type2) => {
                if &task_type2 != task_type {
                    println!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                    panic!(
                        "Task type would not round-trip {task_id:?}:\noriginal: \
                         {task_type:#?}\nround-tripped: {task_type2:#?}"
                    );
                }
                let mut bytes2 = Vec::new();
                let result2 = POT_CONFIG.serialize_into(&task_type2, &mut bytes2);
                match result2 {
                    Err(err) => {
                        println!(
                            "Task type would not be serializable the second time {task_id:?}: \
                             {err:?}\n{task_type2:#?}"
                        );
                        panic!(
                            "Task type would not be serializable the second time {task_id:?}: \
                             {err:?}\n{task_type2:#?}"
                        );
                    }
                    Ok(()) => {
                        if bytes2 != *task_type_bytes {
                            println!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                task_type_bytes, bytes2
                            );
                            panic!(
                                "Task type would not serialize to the same bytes the second time \
                                 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
                                task_type_bytes, bytes2
                            );
                        }
                    }
                }
            }
        }
    }
    Ok(())
}

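/// Task rows serialized by [`process_task_data`] but not yet written, used when
/// the write batch is serial and must be written from a single thread.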
type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;

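/// Processes task snapshots in parallel. When a concurrent write batch is
/// given, rows are written directly and the returned buffers are empty;
/// otherwise the serialized rows are collected for the caller to write
/// serially.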
fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
    tasks: Vec<I>,
    batch: Option<&B>,
) -> Result<SerializedTasks>
where
    I: Iterator<
            Item = (
                TaskId,
                Option<SmallVec<[u8; 16]>>,
                Option<SmallVec<[u8; 16]>>,
            ),
        > + Send
        + Sync,
{
    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
        let mut result = Vec::new();
        for (task_id, meta, data) in tasks {
            if let Some(batch) = batch {
                let key = IntKey::new(*task_id);
                let key = key.as_ref();
                if let Some(meta) = meta {
                    batch.put(
                        KeySpace::TaskMeta,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(meta),
                    )?;
                }
                if let Some(data) = data {
                    batch.put(
                        KeySpace::TaskData,
                        WriteBuffer::Borrowed(key),
                        WriteBuffer::SmallVec(data),
                    )?;
                }
            } else {
                result.push((
                    task_id,
                    meta.map(WriteBuffer::SmallVec),
                    data.map(WriteBuffer::SmallVec),
                ));
            }
        }

        Ok(result)
    })
}

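/// Serializes a task's data items. If serialization of the full list fails (or
/// always, under `verify_serialization`), it retries item by item: optional
/// items that fail to serialize are dropped, while a required item failing is
/// an error.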
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
    Ok(match pot_serialize_small_vec(data) {
        #[cfg(not(feature = "verify_serialization"))]
        Ok(value) => value,
        _ => {
            let mut error = Ok(());
            let mut data = data.clone();
            data.retain(|item| {
                let mut buf = Vec::<u8>::new();
                let mut symbol_map = pot_ser_symbol_map();
                let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
                if let Err(err) = serde_path_to_error::serialize(&item, &mut serializer) {
                    if item.is_optional() {
                        #[cfg(feature = "verify_serialization")]
                        println!(
                            "Skipping non-serializable optional item for {task}: {item:?} due to \
                             {err}"
                        );
                    } else {
                        error = Err(err).context({
                            anyhow!("Unable to serialize data item for {task}: {item:?}")
                        });
                    }
                    false
                } else {
                    #[cfg(feature = "verify_serialization")]
                    {
                        let deserialize: Result<CachedDataItem, _> =
                            serde_path_to_error::deserialize(
                                &mut pot_de_symbol_list().deserializer_for_slice(&buf).unwrap(),
                            );
                        if let Err(err) = deserialize {
                            println!(
                                "Data item would not be deserializable {task}: {err:?}\n{item:?}"
                            );
                            return false;
                        }
                    }
                    true
                }
            });
            error?;

            pot_serialize_small_vec(&data)
                .with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
        }
    })
}

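/// Deserializes `data`, and on failure re-runs deserialization through
/// `serde_path_to_error` so the returned error names the failing field where
/// possible, falling back to the original `pot` error.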
fn deserialize_with_good_error<'de, T: Deserialize<'de>>(data: &'de [u8]) -> Result<T> {
    match POT_CONFIG.deserialize(data) {
        Ok(value) => Ok(value),
        Err(error) => serde_path_to_error::deserialize::<'_, _, T>(
            &mut pot_de_symbol_list().deserializer_for_slice(data)?,
        )
        .map_err(anyhow::Error::from)
        .and(Err(error.into()))
        .context("Deserialization failed"),
    }
}