1use std::{
2 borrow::Borrow,
3 env,
4 path::PathBuf,
5 sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
6};
7
8use anyhow::{Context, Result};
9use turbo_bincode::{
10 TurboBincodeBuffer, new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode,
11 turbo_bincode_encode_into,
12};
13use turbo_tasks::{
14 TaskId,
15 backend::CachedTaskType,
16 panic_hooks::{PanicHookGuard, register_panic_hook},
17 parallel,
18};
19
20use crate::{
21 GitVersionInfo,
22 backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
23 backing_storage::{BackingStorage, BackingStorageSealed},
24 database::{
25 db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db},
26 db_versioning::handle_db_versioning,
27 key_value_database::{KeySpace, KeyValueDatabase},
28 write_batch::{
29 BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef,
30 WriteBuffer,
31 },
32 },
33 db_invalidation::invalidation_reasons,
34 utils::chunked_vec::ChunkedVec,
35};
36
/// Key in `KeySpace::Infra` under which the bincode-encoded list of operations is stored.
const META_KEY_OPERATIONS: u32 = 0;
/// Key in `KeySpace::Infra` under which the next free task id is stored as a little-endian
/// `u32`.
const META_KEY_NEXT_FREE_TASK_ID: u32 = 1;
39
/// Fixed-width database key: a `u32` stored as its four little-endian bytes.
struct IntKey([u8; 4]);

impl IntKey {
    /// Builds the key for `value`.
    fn new(value: u32) -> Self {
        let bytes = u32::to_le_bytes(value);
        IntKey(bytes)
    }
}

impl AsRef<[u8]> for IntKey {
    /// Exposes the four key bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        let IntKey(bytes) = self;
        bytes.as_slice()
    }
}
53
54fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
55 let n = u32::from_le_bytes(bytes.borrow().try_into()?);
56 Ok(n)
57}
58
/// Reports whether a panic should invalidate the database.
///
/// The answer is computed on the first call and cached for the rest of the process. It is
/// `true` unless `TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC` or `__NEXT_TEST_MODE` is set to a
/// value other than `""`, `"0"` or `"false"`.
fn should_invalidate_on_panic() -> bool {
    /// An unset variable, or one equal to `""`, `"0"` or `"false"`, counts as falsy.
    fn env_is_falsy(key: &str) -> bool {
        match env::var_os(key) {
            None => true,
            Some(value) => value.is_empty() || value == "0" || value == "false",
        }
    }
    static SHOULD_INVALIDATE: LazyLock<bool> = LazyLock::new(|| {
        // `all` stops at the first truthy variable, so the second one is only read when needed.
        ["TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC", "__NEXT_TEST_MODE"]
            .iter()
            .all(|key| env_is_falsy(key))
    });
    *SHOULD_INVALIDATE
}
77
/// State shared between a [`KeyValueDatabaseBackingStorage`] handle and the panic hook that
/// `open_versioned_on_disk` may register (the hook only holds a `Weak` reference to it).
pub struct KeyValueDatabaseBackingStorageInner<T: KeyValueDatabase> {
    /// The underlying key-value database all reads and writes go through.
    database: T,
    /// Base directory given to `open_versioned_on_disk`; `None` for storages created with
    /// `new_in_memory`, in which case `invalidate` does nothing.
    base_path: Option<PathBuf>,
    /// Set to `true` once `invalidate` has succeeded, so that later calls return early.
    invalidated: Mutex<bool>,
    /// Guard returned by `register_panic_hook`; `None` when no hook was registered.
    /// NOTE(review): presumably dropping the guard unregisters the hook — confirm against
    /// `turbo_tasks::panic_hooks`.
    _panic_hook_guard: Option<PanicHookGuard>,
}
89
/// Backing storage implemented on top of a [`KeyValueDatabase`].
pub struct KeyValueDatabaseBackingStorage<T: KeyValueDatabase> {
    /// Shared state; reference-counted so the panic hook can hold a `Weak` handle to it.
    inner: Arc<KeyValueDatabaseBackingStorageInner<T>>,
}
94
95impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
99 pub(crate) fn new_in_memory(database: T) -> Self {
100 Self {
101 inner: Arc::new(KeyValueDatabaseBackingStorageInner {
102 database,
103 base_path: None,
104 invalidated: Mutex::new(false),
105 _panic_hook_guard: None,
106 }),
107 }
108 }
109
110 pub(crate) fn open_versioned_on_disk(
122 base_path: PathBuf,
123 version_info: &GitVersionInfo,
124 is_ci: bool,
125 database: impl FnOnce(PathBuf) -> Result<T>,
126 ) -> Result<(Self, StartupCacheState)>
127 where
128 T: Send + Sync + 'static,
129 {
130 let startup_cache_state = check_db_invalidation_and_cleanup(&base_path)
131 .context("Failed to check database invalidation and cleanup")?;
132 let versioned_path = handle_db_versioning(&base_path, version_info, is_ci)
133 .context("Failed to handle database versioning")?;
134 let database = (database)(versioned_path).context("Failed to open database")?;
135 let backing_storage = Self {
136 inner: Arc::new_cyclic(
137 move |weak_inner: &Weak<KeyValueDatabaseBackingStorageInner<T>>| {
138 let panic_hook_guard = if should_invalidate_on_panic() {
139 let weak_inner = weak_inner.clone();
140 Some(register_panic_hook(Box::new(move |_| {
141 let Some(inner) = weak_inner.upgrade() else {
142 return;
143 };
144 let _ = inner.invalidate(invalidation_reasons::PANIC);
149 })))
150 } else {
151 None
152 };
153 KeyValueDatabaseBackingStorageInner {
154 database,
155 base_path: Some(base_path),
156 invalidated: Mutex::new(false),
157 _panic_hook_guard: panic_hook_guard,
158 }
159 },
160 ),
161 };
162 Ok((backing_storage, startup_cache_state))
163 }
164}
165
166impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorageInner<T> {
167 fn with_tx<R>(
168 &self,
169 tx: Option<&T::ReadTransaction<'_>>,
170 f: impl FnOnce(&T::ReadTransaction<'_>) -> Result<R>,
171 ) -> Result<R> {
172 if let Some(tx) = tx {
173 f(tx)
174 } else {
175 let tx = self.database.begin_read_transaction()?;
176 let r = f(&tx)?;
177 drop(tx);
178 Ok(r)
179 }
180 }
181
182 fn invalidate(&self, reason_code: &str) -> Result<()> {
183 if let Some(base_path) = &self.base_path {
185 let mut invalidated_guard = self
188 .invalidated
189 .lock()
190 .unwrap_or_else(PoisonError::into_inner);
191 if *invalidated_guard {
192 return Ok(());
193 }
194 invalidate_db(base_path, reason_code)?;
198 self.database.prevent_writes();
199 *invalidated_guard = true;
201 }
202 Ok(())
203 }
204
205 fn get_infra_u32(&self, key: u32) -> Result<Option<u32>> {
207 let tx = self.database.begin_read_transaction()?;
208 self.database
209 .get(&tx, KeySpace::Infra, IntKey::new(key).as_ref())?
210 .map(as_u32)
211 .transpose()
212 }
213}
214
215impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
216 for KeyValueDatabaseBackingStorage<T>
217{
218 fn invalidate(&self, reason_code: &str) -> Result<()> {
219 self.inner.invalidate(reason_code)
220 }
221}
222
223impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
224 for KeyValueDatabaseBackingStorage<T>
225{
    /// Read transactions are the underlying database's own read transaction type.
    type ReadTransaction<'l> = T::ReadTransaction<'l>;
227
228 fn next_free_task_id(&self) -> Result<TaskId> {
229 Ok(self
230 .inner
231 .get_infra_u32(META_KEY_NEXT_FREE_TASK_ID)
232 .context("Unable to read next free task id from database")?
233 .map_or(Ok(TaskId::MIN), TaskId::try_from)?)
234 }
235
236 fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
237 fn get(database: &impl KeyValueDatabase) -> Result<Vec<AnyOperation>> {
238 let tx = database.begin_read_transaction()?;
239 let Some(operations) = database.get(
240 &tx,
241 KeySpace::Infra,
242 IntKey::new(META_KEY_OPERATIONS).as_ref(),
243 )?
244 else {
245 return Ok(Vec::new());
246 };
247 let operations = turbo_bincode_decode(operations.borrow())?;
248 Ok(operations)
249 }
250 get(&self.inner.database).context("Unable to read uncompleted operations from database")
251 }
252
253 fn save_snapshot<I>(
254 &self,
255 operations: Vec<Arc<AnyOperation>>,
256 task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
257 snapshots: Vec<I>,
258 ) -> Result<()>
259 where
260 I: Iterator<
261 Item = (
262 TaskId,
263 Option<TurboBincodeBuffer>,
264 Option<TurboBincodeBuffer>,
265 ),
266 > + Send
267 + Sync,
268 {
269 let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
270 let mut batch = self.inner.database.write_batch()?;
271
272 const INITIAL_ENCODE_BUFFER_CAPACITY: usize = 512;
276 #[cfg(feature = "print_cache_item_size")]
277 let all_stats: std::sync::Mutex<
278 std::collections::HashMap<&'static str, TaskTypeCacheStats>,
279 > = std::sync::Mutex::new(std::collections::HashMap::new());
280 match &mut batch {
282 &mut WriteBatch::Concurrent(ref batch, _) => {
283 {
284 let _span = tracing::trace_span!("update task data").entered();
285 process_task_data(snapshots, Some(batch))?;
286 let span = tracing::trace_span!("flush task data").entered();
287 parallel::try_for_each(
288 &[KeySpace::TaskMeta, KeySpace::TaskData],
289 |&key_space| {
290 let _span = span.clone().entered();
291 unsafe { batch.flush(key_space) }
294 },
295 )?;
296 }
297
298 let mut next_task_id = get_next_free_task_id::<
299 T::SerialWriteBatch<'_>,
300 T::ConcurrentWriteBatch<'_>,
301 >(&mut WriteBatchRef::concurrent(batch))?;
302
303 {
304 let _span = tracing::trace_span!(
305 "update task cache",
306 items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
307 )
308 .entered();
309 let max_task_id = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
310 task_cache_updates,
311 |updates| {
312 let _span = _span.clone().entered();
313 let mut max_task_id = 0;
314
315 let mut task_type_bytes =
319 TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
320 for (task_type, task_id) in updates {
321 task_type_bytes.clear();
322 encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
323 let task_id: u32 = *task_id;
324
325 batch
326 .put(
327 KeySpace::TaskCache,
328 WriteBuffer::Borrowed(&task_type_bytes),
329 WriteBuffer::Borrowed(&task_id.to_le_bytes()),
330 )
331 .with_context(|| {
332 format!(
333 "Unable to write task cache {task_type:?} => {task_id}"
334 )
335 })?;
336 #[cfg(feature = "print_cache_item_size")]
337 all_stats
338 .lock()
339 .unwrap()
340 .entry(task_type.get_name())
341 .or_default()
342 .add(&task_type_bytes);
343 max_task_id = max_task_id.max(task_id);
344 }
345
346 Ok(max_task_id)
347 },
348 )?
349 .into_iter()
350 .max()
351 .unwrap_or(0);
352 next_task_id = next_task_id.max(max_task_id + 1);
353 }
354
355 save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
356 &mut WriteBatchRef::concurrent(batch),
357 next_task_id,
358 operations,
359 )?;
360 }
361 WriteBatch::Serial(batch) => {
362 {
363 let _span = tracing::trace_span!("update tasks").entered();
364 let task_items =
365 process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
366 for (task_id, meta, data) in task_items.into_iter().flatten() {
367 let key = IntKey::new(*task_id);
368 let key = key.as_ref();
369 if let Some(meta) = meta {
370 batch
371 .put(KeySpace::TaskMeta, WriteBuffer::Borrowed(key), meta)
372 .with_context(|| {
373 format!("Unable to write meta items for {task_id}")
374 })?;
375 }
376 if let Some(data) = data {
377 batch
378 .put(KeySpace::TaskData, WriteBuffer::Borrowed(key), data)
379 .with_context(|| {
380 format!("Unable to write data items for {task_id}")
381 })?;
382 }
383 }
384 batch.flush(KeySpace::TaskMeta)?;
385 batch.flush(KeySpace::TaskData)?;
386 }
387
388 let mut next_task_id = get_next_free_task_id::<
389 T::SerialWriteBatch<'_>,
390 T::ConcurrentWriteBatch<'_>,
391 >(&mut WriteBatchRef::serial(batch))?;
392
393 {
394 let _span = tracing::trace_span!(
395 "update task cache",
396 items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
397 )
398 .entered();
399 let mut task_type_bytes =
403 TurboBincodeBuffer::with_capacity(INITIAL_ENCODE_BUFFER_CAPACITY);
404 for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
405 encode_task_type(&task_type, &mut task_type_bytes, Some(task_id))?;
406 let task_id = *task_id;
407
408 batch
409 .put(
410 KeySpace::TaskCache,
411 WriteBuffer::Borrowed(&task_type_bytes),
412 WriteBuffer::Borrowed(&task_id.to_le_bytes()),
413 )
414 .with_context(|| {
415 format!("Unable to write task cache {task_type:?} => {task_id}")
416 })?;
417 #[cfg(feature = "print_cache_item_size")]
418 all_stats
419 .lock()
420 .unwrap()
421 .entry(task_type.get_name())
422 .or_default()
423 .add(&task_type_bytes);
424 next_task_id = next_task_id.max(task_id + 1);
425 }
426 }
427
428 save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
429 &mut WriteBatchRef::serial(batch),
430 next_task_id,
431 operations,
432 )?;
433 }
434 }
435 #[cfg(feature = "print_cache_item_size")]
436 print_task_type_cache_stats(all_stats.into_inner().unwrap());
437
438 {
439 let _span = tracing::trace_span!("commit").entered();
440 batch.commit().context("Unable to commit operations")?;
441 }
442 Ok(())
443 }
444
445 fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>> {
446 self.inner.database.begin_read_transaction().ok()
447 }
448
449 unsafe fn forward_lookup_task_cache(
450 &self,
451 tx: Option<&T::ReadTransaction<'_>>,
452 task_type: &CachedTaskType,
453 ) -> Result<Option<TaskId>> {
454 let inner = &*self.inner;
455 fn lookup<D: KeyValueDatabase>(
456 database: &D,
457 tx: &D::ReadTransaction<'_>,
458 task_type: &CachedTaskType,
459 ) -> Result<Option<TaskId>> {
460 let mut task_type_bytes = TurboBincodeBuffer::new();
461 encode_task_type(task_type, &mut task_type_bytes, None)?;
462 let Some(bytes) = database.get(tx, KeySpace::TaskCache, &task_type_bytes)? else {
463 return Ok(None);
464 };
465 let bytes = bytes.borrow().try_into()?;
466 let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
467 Ok(Some(id))
468 }
469 if inner.database.is_empty() {
470 return Ok(None);
473 }
474 inner
475 .with_tx(tx, |tx| lookup(&self.inner.database, tx, task_type))
476 .with_context(|| format!("Looking up task id for {task_type:?} from database failed"))
477 }
478
479 unsafe fn lookup_data(
480 &self,
481 tx: Option<&T::ReadTransaction<'_>>,
482 task_id: TaskId,
483 category: SpecificTaskDataCategory,
484 storage: &mut TaskStorage,
485 ) -> Result<()> {
486 let inner = &*self.inner;
487 fn lookup<D: KeyValueDatabase>(
488 database: &D,
489 tx: &D::ReadTransaction<'_>,
490 task_id: TaskId,
491 category: SpecificTaskDataCategory,
492 storage: &mut TaskStorage,
493 ) -> Result<()> {
494 let Some(bytes) =
495 database.get(tx, category.key_space(), IntKey::new(*task_id).as_ref())?
496 else {
497 return Ok(());
498 };
499 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
500 storage
501 .decode(category, &mut decoder)
502 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))
503 }
504 inner
505 .with_tx(tx, |tx| {
506 lookup(&inner.database, tx, task_id, category, storage)
507 })
508 .with_context(|| format!("Looking up task storage for {task_id} from database failed"))
509 }
510
511 unsafe fn batch_lookup_data(
512 &self,
513 tx: Option<&Self::ReadTransaction<'_>>,
514 task_ids: &[TaskId],
515 category: SpecificTaskDataCategory,
516 ) -> Result<Vec<TaskStorage>> {
517 let inner = &*self.inner;
518 fn lookup<D: KeyValueDatabase>(
519 database: &D,
520 tx: &D::ReadTransaction<'_>,
521 task_ids: &[TaskId],
522 category: SpecificTaskDataCategory,
523 ) -> Result<Vec<TaskStorage>> {
524 let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect();
525 let keys = int_keys.iter().map(|k| k.as_ref()).collect::<Vec<_>>();
526 let bytes = database.batch_get(tx, category.key_space(), &keys)?;
527 bytes
528 .into_iter()
529 .map(|opt_bytes| {
530 let mut storage = TaskStorage::new();
531 if let Some(bytes) = opt_bytes {
532 let mut decoder = new_turbo_bincode_decoder(bytes.borrow());
533 storage
534 .decode(category, &mut decoder)
535 .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?;
536 }
537 Ok(storage)
538 })
539 .collect::<Result<Vec<_>>>()
540 }
541 inner
542 .with_tx(tx, |tx| lookup(&inner.database, tx, task_ids, category))
543 .with_context(|| {
544 format!(
545 "Looking up typed data for {} tasks from database failed",
546 task_ids.len()
547 )
548 })
549 }
550
551 fn shutdown(&self) -> Result<()> {
552 self.inner.database.shutdown()
553 }
554}
555
556fn get_next_free_task_id<'a, S, C>(
557 batch: &mut WriteBatchRef<'_, 'a, S, C>,
558) -> Result<u32, anyhow::Error>
559where
560 S: SerialWriteBatch<'a>,
561 C: ConcurrentWriteBatch<'a>,
562{
563 Ok(
564 match batch.get(
565 KeySpace::Infra,
566 IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref(),
567 )? {
568 Some(bytes) => u32::from_le_bytes(Borrow::<[u8]>::borrow(&bytes).try_into()?),
569 None => 1,
570 },
571 )
572}
573
574fn save_infra<'a, S, C>(
575 batch: &mut WriteBatchRef<'_, 'a, S, C>,
576 next_task_id: u32,
577 operations: Vec<Arc<AnyOperation>>,
578) -> Result<(), anyhow::Error>
579where
580 S: SerialWriteBatch<'a>,
581 C: ConcurrentWriteBatch<'a>,
582{
583 {
584 batch
585 .put(
586 KeySpace::Infra,
587 WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()),
588 WriteBuffer::Borrowed(&next_task_id.to_le_bytes()),
589 )
590 .context("Unable to write next free task id")?;
591 }
592 {
593 let _span =
594 tracing::trace_span!("update operations", operations = operations.len()).entered();
595 let operations =
596 turbo_bincode_encode(&operations).context("Unable to serialize operations")?;
597 batch
598 .put(
599 KeySpace::Infra,
600 WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()),
601 WriteBuffer::SmallVec(operations),
602 )
603 .context("Unable to write operations")?;
604 }
605 batch.flush(KeySpace::Infra)?;
606 Ok(())
607}
608
609fn encode_task_type(
610 task_type: &CachedTaskType,
611 buffer: &mut TurboBincodeBuffer,
612 task_id: Option<TaskId>,
613) -> Result<()> {
614 fn encode_once_into(
615 task_type: &CachedTaskType,
616 buffer: &mut TurboBincodeBuffer,
617 task_id: Option<TaskId>,
618 ) -> Result<()> {
619 turbo_bincode_encode_into(task_type, buffer).with_context(|| {
620 if let Some(task_id) = task_id {
621 format!("Unable to serialize task {task_id} cache key {task_type:?}")
622 } else {
623 format!("Unable to serialize task cache key {task_type:?}")
624 }
625 })
626 }
627
628 debug_assert!(buffer.is_empty());
629 encode_once_into(task_type, buffer, task_id)?;
630
631 if cfg!(feature = "verify_serialization") {
632 macro_rules! println_and_panic {
633 ($($tt:tt)*) => {
634 println!($($tt)*);
635 panic!($($tt)*);
636 };
637 }
638 let deserialize: Result<CachedTaskType, _> = turbo_bincode_decode(buffer);
639 match deserialize {
640 Err(err) => {
641 println_and_panic!("Task type would not be deserializable:\n{err:?}");
642 }
643 Ok(task_type2) => {
644 if &task_type2 != task_type {
645 println_and_panic!(
646 "Task type would not round-trip {task_id:?}:\noriginal: \
647 {task_type:#?}\nround-tripped: {task_type2:#?}"
648 );
649 }
650 let mut buffer2 = TurboBincodeBuffer::new();
651 match encode_once_into(&task_type2, &mut buffer2, task_id) {
652 Err(err) => {
653 println_and_panic!(
654 "Task type would not be serializable the second time:\n{err:?}"
655 );
656 }
657 Ok(()) => {
658 if buffer2 != *buffer {
659 println_and_panic!(
660 "Task type would not serialize to the same bytes the second time \
661 {task_id:?}:\noriginal: {:x?}\nsecond: {:x?}\n{task_type2:#?}",
662 buffer,
663 buffer2
664 );
665 }
666 }
667 }
668 }
669 }
670 }
671
672 Ok(())
673}
674
/// Result of `process_task_data`: one inner list per input iterator, each entry holding a
/// task id with its optional meta and data buffers (in that order).
type SerializedTasks = Vec<
    Vec<(
        TaskId,
        Option<WriteBuffer<'static>>,
        Option<WriteBuffer<'static>>,
    )>,
>;
682
/// Accumulated key-size statistics for the task-cache entries of one task type.
#[cfg(feature = "print_cache_item_size")]
#[derive(Default)]
struct TaskTypeCacheStats {
    /// Sum of the encoded key lengths in bytes.
    key_size: usize,
    /// Sum of the LZ4-compressed key lengths in bytes.
    key_size_compressed: usize,
    /// Number of keys added.
    count: usize,
}

#[cfg(feature = "print_cache_item_size")]
impl TaskTypeCacheStats {
    /// Returns the size of `data` after LZ4 compression at the default acceleration level.
    fn compressed_size(data: &[u8]) -> Result<usize> {
        let mut compressor = lzzzz::lz4::Compressor::new()?;
        let mut scratch = Vec::new();
        let written =
            compressor.next_to_vec(data, &mut scratch, lzzzz::lz4::ACC_LEVEL_DEFAULT)?;
        Ok(written)
    }

    /// Records one key; a key that fails to compress counts as zero compressed bytes.
    fn add(&mut self, key_bytes: &[u8]) {
        let compressed = Self::compressed_size(key_bytes).unwrap_or(0);
        self.key_size += key_bytes.len();
        self.key_size_compressed += compressed;
        self.count += 1;
    }
}
706
/// Prints the task-cache key statistics to stdout: one totals line, then one line per task
/// type, ordered by compressed size (largest first, ties broken by name in descending order).
///
/// Prints nothing when `stats` is empty.
#[cfg(feature = "print_cache_item_size")]
fn print_task_type_cache_stats(stats: std::collections::HashMap<&'static str, TaskTypeCacheStats>) {
    use turbo_tasks::util::FormatBytes;

    if stats.is_empty() {
        return;
    }
    let mut entries: Vec<_> = stats.into_iter().collect();
    entries.sort_unstable_by(|(name_a, stats_a), (name_b, stats_b)| {
        stats_b
            .key_size_compressed
            .cmp(&stats_a.key_size_compressed)
            .then_with(|| name_b.cmp(name_a))
    });

    let total_compressed: usize = entries.iter().map(|(_, s)| s.key_size_compressed).sum();
    let total_raw: usize = entries.iter().map(|(_, s)| s.key_size).sum();
    println!(
        "Task type cache stats: {} ({})",
        FormatBytes(total_compressed),
        FormatBytes(total_raw)
    );

    for (fn_name, stats) in entries {
        // `checked_div` guards against a zero count.
        let avg_compressed = stats
            .key_size_compressed
            .checked_div(stats.count)
            .unwrap_or(0);
        let avg_raw = stats.key_size.checked_div(stats.count).unwrap_or(0);
        println!(
            " {} ({}) {fn_name} x {} avg {} ({})",
            FormatBytes(stats.key_size_compressed),
            FormatBytes(stats.key_size),
            stats.count,
            FormatBytes(avg_compressed),
            FormatBytes(avg_raw),
        );
    }
}
744fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
745 tasks: Vec<I>,
746 batch: Option<&B>,
747) -> Result<SerializedTasks>
748where
749 I: Iterator<
750 Item = (
751 TaskId,
752 Option<TurboBincodeBuffer>,
753 Option<TurboBincodeBuffer>,
754 ),
755 > + Send
756 + Sync,
757{
758 parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
759 let mut result = Vec::new();
760 for (task_id, meta, data) in tasks {
761 if let Some(batch) = batch {
762 let key = IntKey::new(*task_id);
763 let key = key.as_ref();
764 if let Some(meta) = meta {
765 batch.put(
766 KeySpace::TaskMeta,
767 WriteBuffer::Borrowed(key),
768 WriteBuffer::SmallVec(meta),
769 )?;
770 }
771 if let Some(data) = data {
772 batch.put(
773 KeySpace::TaskData,
774 WriteBuffer::Borrowed(key),
775 WriteBuffer::SmallVec(data),
776 )?;
777 }
778 } else {
779 result.push((
781 task_id,
782 meta.map(WriteBuffer::SmallVec),
783 data.map(WriteBuffer::SmallVec),
784 ));
785 }
786 }
787
788 Ok(result)
789 })
790}