1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use risingwave_pb::hummock::compact_task;
32use thiserror_ext::AsReport;
33use tracing::{error, warn};
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
38use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
39use crate::hummock::event_handler::uploader::UploadTaskOutput;
40use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
41use crate::hummock::shared_buffer::shared_buffer_batch::{
42 SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
43 VersionedSharedBufferValue,
44};
45use crate::hummock::utils::MemoryTracker;
46use crate::hummock::{
47 BlockedXor16FilterBuilder, CachePolicy, GetObjectId, HummockError, HummockResult,
48 ObjectIdManagerRef, SstableBuilderOptions,
49};
50use crate::mem_table::ImmutableMemtable;
51use crate::opts::StorageOpts;
52
/// Value passed as `gc_delete_keys` in the `TaskConfig` built by
/// `SharedBufferCompactRunner::new`. It is `false`, so that flag is off for every
/// shared-buffer flush task created in this file.
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
54
55pub async fn compact(
57 context: CompactorContext,
58 object_id_manager: ObjectIdManagerRef,
59 payload: Vec<ImmutableMemtable>,
60 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
61) -> HummockResult<UploadTaskOutput> {
62 let table_ids_with_old_value: HashSet<TableId> = payload
63 .iter()
64 .filter(|imm| imm.has_old_value())
65 .map(|imm| imm.table_id)
66 .collect();
67 let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
68 let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
69 let mut old_value_payload = Vec::with_capacity(payload.len());
70 for imm in payload {
71 if table_ids_with_old_value.contains(&imm.table_id) {
72 if imm.has_old_value() {
73 old_value_payload.push(imm.clone());
74 }
75 log_store_new_value_payload.push(imm);
76 } else {
77 assert!(!imm.has_old_value());
78 non_log_store_new_value_payload.push(imm);
79 }
80 }
81 let non_log_store_new_value_future = async {
82 if non_log_store_new_value_payload.is_empty() {
83 Ok(vec![])
84 } else {
85 compact_shared_buffer::<true>(
86 context.clone(),
87 object_id_manager.clone(),
88 compaction_catalog_manager_ref.clone(),
89 non_log_store_new_value_payload,
90 )
91 .instrument_await("shared_buffer_compact_non_log_store_new_value")
92 .await
93 }
94 };
95
96 let log_store_new_value_future = async {
97 if log_store_new_value_payload.is_empty() {
98 Ok(vec![])
99 } else {
100 compact_shared_buffer::<true>(
101 context.clone(),
102 object_id_manager.clone(),
103 compaction_catalog_manager_ref.clone(),
104 log_store_new_value_payload,
105 )
106 .instrument_await("shared_buffer_compact_log_store_new_value")
107 .await
108 }
109 };
110
111 let old_value_future = async {
112 if old_value_payload.is_empty() {
113 Ok(vec![])
114 } else {
115 compact_shared_buffer::<false>(
116 context.clone(),
117 object_id_manager.clone(),
118 compaction_catalog_manager_ref.clone(),
119 old_value_payload,
120 )
121 .instrument_await("shared_buffer_compact_log_store_old_value")
122 .await
123 }
124 };
125
126 let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
128 try_join(non_log_store_new_value_future, log_store_new_value_future),
129 old_value_future,
130 )
131 .await?;
132
133 let mut new_value_ssts = non_log_store_new_value_ssts;
134 new_value_ssts.extend(log_store_new_value_ssts);
135
136 Ok(UploadTaskOutput {
137 new_value_ssts,
138 old_value_ssts,
139 wait_poll_timer: None,
140 })
141}
142
143async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
148 context: CompactorContext,
149 object_id_manager: ObjectIdManagerRef,
150 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
151 mut payload: Vec<ImmutableMemtable>,
152) -> HummockResult<Vec<LocalSstableInfo>> {
153 if !IS_NEW_VALUE {
154 assert!(payload.iter().all(|imm| imm.has_old_value()));
155 }
156 let existing_table_ids: HashSet<TableId> =
158 payload.iter().map(|imm| imm.table_id).dedup().collect();
159 assert!(!existing_table_ids.is_empty());
160
161 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
162 .acquire(existing_table_ids.iter().copied().collect())
163 .await?;
164 let existing_table_ids = compaction_catalog_agent_ref
165 .table_ids()
166 .collect::<HashSet<_>>();
167 payload.retain(|imm| {
168 let ret = existing_table_ids.contains(&imm.table_id);
169 if !ret {
170 error!(
171 "can not find table {:?}, it may be removed by meta-service",
172 imm.table_id
173 );
174 }
175 ret
176 });
177
178 let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
179 let (splits, sub_compaction_sstable_size, table_vnode_partition) =
180 generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
181 let parallelism = splits.len();
182 let mut compact_success = true;
183 let mut output_ssts = Vec::with_capacity(parallelism);
184 let mut compaction_futures = vec![];
185 let use_block_based_filter = BlockedXor16FilterBuilder::is_kv_count_too_large(total_key_count);
186
187 for (split_index, key_range) in splits.into_iter().enumerate() {
188 let compactor = SharedBufferCompactRunner::new(
189 split_index,
190 key_range,
191 context.clone(),
192 sub_compaction_sstable_size as usize,
193 table_vnode_partition.clone(),
194 use_block_based_filter,
195 object_id_manager.clone(),
196 );
197 let mut forward_iters = Vec::with_capacity(payload.len());
198 for imm in &payload {
199 forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
200 }
201 let compaction_executor = context.compaction_executor.clone();
202 let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
203 let handle = compaction_executor.spawn({
204 static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
205 LazyLock::new(|| AtomicUsize::new(0));
206 let tree_root = context.await_tree_reg.as_ref().map(|reg| {
207 let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
208 reg.register(
209 await_tree_key::CompactSharedBuffer { id },
210 format!(
211 "Compact Shared Buffer: {:?}",
212 payload
213 .iter()
214 .flat_map(|imm| imm.epochs().iter())
215 .copied()
216 .collect::<BTreeSet<_>>()
217 ),
218 )
219 });
220 let future = compactor.run(
221 MergeIterator::new(forward_iters),
222 compaction_catalog_agent_ref,
223 );
224 if let Some(root) = tree_root {
225 root.instrument(future).left_future()
226 } else {
227 future.right_future()
228 }
229 });
230 compaction_futures.push(handle);
231 }
232
233 let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
234 let mut err = None;
235 while let Some(future_result) = buffered.next().await {
236 match future_result {
237 Ok(Ok((split_index, ssts, table_stats_map))) => {
238 output_ssts.push((split_index, ssts, table_stats_map));
239 }
240 Ok(Err(e)) => {
241 compact_success = false;
242 tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
243 err = Some(e);
244 }
245 Err(e) => {
246 compact_success = false;
247 tracing::warn!(
248 error = %e.as_report(),
249 "Shared Buffer Compaction failed with future error",
250 );
251 err = Some(HummockError::compaction_executor(
252 "failed while execute in tokio",
253 ));
254 }
255 }
256 }
257
258 output_ssts.sort_by_key(|(split_index, ..)| *split_index);
260
261 if compact_success {
262 let mut level0 = Vec::with_capacity(parallelism);
263 let mut sst_infos = vec![];
264 for (_, ssts, _) in output_ssts {
265 for sst_info in &ssts {
266 context
267 .compactor_metrics
268 .write_build_l0_bytes
269 .inc_by(sst_info.file_size());
270
271 sst_infos.push(sst_info.sst_info.clone());
272 }
273 level0.extend(ssts);
274 }
275 if context.storage_opts.check_compaction_result {
276 let compaction_executor = context.compaction_executor.clone();
277 let mut forward_iters = Vec::with_capacity(payload.len());
278 for imm in &payload {
279 if !existing_table_ids.contains(&imm.table_id) {
280 continue;
281 }
282 forward_iters.push(imm.clone().into_forward_iter());
283 }
284 let iter = MergeIterator::new(forward_iters);
285 let left_iter = UserIterator::new(
286 iter,
287 (Bound::Unbounded, Bound::Unbounded),
288 u64::MAX,
289 0,
290 None,
291 );
292 compaction_executor.spawn(async move {
293 match check_flush_result(
294 left_iter,
295 Vec::from_iter(existing_table_ids.iter().cloned()),
296 sst_infos,
297 context,
298 )
299 .await
300 {
301 Err(e) => {
302 tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
303 }
304 Ok(true) => (),
305 Ok(false) => {
306 panic!(
307 "failed to check flush result consistency of state-table {:?}",
308 existing_table_ids
309 );
310 }
311 }
312 });
313 }
314 Ok(level0)
315 } else {
316 Err(err.unwrap())
317 }
318}
319
320pub async fn merge_imms_in_memory(
322 table_id: TableId,
323 imms: Vec<ImmutableMemtable>,
324 memory_tracker: Option<MemoryTracker>,
325) -> ImmutableMemtable {
326 let mut epochs = vec![];
327 let mut merged_size = 0;
328 assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
329 let max_imm_id = imms[0].batch_id();
330
331 let has_old_value = imms[0].has_old_value();
332 assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));
335
336 let (old_value_size, global_old_value_size) = if has_old_value {
337 (
338 imms.iter()
339 .map(|imm| imm.old_values().expect("has old value").size)
340 .sum(),
341 Some(
342 imms[0]
343 .old_values()
344 .expect("has old value")
345 .global_old_value_size
346 .clone(),
347 ),
348 )
349 } else {
350 (0, None)
351 };
352
353 let mut imm_iters = Vec::with_capacity(imms.len());
354 let key_count = imms.iter().map(|imm| imm.key_count()).sum();
355 let value_count = imms.iter().map(|imm| imm.value_count()).sum();
356 for imm in imms {
357 assert!(imm.key_count() > 0, "imm should not be empty");
358 assert_eq!(
359 table_id,
360 imm.table_id(),
361 "should only merge data belonging to the same table"
362 );
363
364 epochs.push(imm.min_epoch());
365 merged_size += imm.size();
366
367 imm_iters.push(imm.into_forward_iter());
368 }
369 epochs.sort();
370
371 let mut mi = MergeIterator::new(imm_iters);
373 mi.rewind_no_await();
374 assert!(mi.is_valid());
375
376 let first_item_key = mi.current_key_entry().key.clone();
377
378 let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
379 let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
380 let mut old_values: Option<Vec<Bytes>> = if has_old_value {
381 Some(Vec::with_capacity(value_count))
382 } else {
383 None
384 };
385
386 merged_entries.push(SharedBufferKeyEntry {
387 key: first_item_key.clone(),
388 value_offset: 0,
389 });
390
391 let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
393 table_id,
394 first_item_key,
395 EpochWithGap::new_max_epoch(),
396 ));
397
398 while mi.is_valid() {
399 let key_entry = mi.current_key_entry();
400 let user_key = UserKey {
401 table_id,
402 table_key: key_entry.key.clone(),
403 };
404 if full_key_tracker.observe_multi_version(
405 user_key,
406 key_entry
407 .new_values
408 .iter()
409 .map(|(epoch_with_gap, _)| *epoch_with_gap),
410 ) {
411 let last_entry = merged_entries.last_mut().expect("non-empty");
412 if last_entry.value_offset == values.len() {
413 warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
414 last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
415 } else {
416 merged_entries.push(SharedBufferKeyEntry {
418 key: full_key_tracker.latest_user_key().table_key.clone(),
419 value_offset: values.len(),
420 });
421 }
422 }
423 values.extend(
424 key_entry
425 .new_values
426 .iter()
427 .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
428 );
429 if let Some(old_values) = &mut old_values {
430 old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
431 }
432 mi.advance_peek_to_next_key();
433 tokio::task::consume_budget().await;
436 }
437
438 let old_values = old_values.map(|old_values| {
439 SharedBufferBatchOldValues::new(
440 old_values,
441 old_value_size,
442 global_old_value_size.expect("should exist when has old value"),
443 )
444 });
445
446 SharedBufferBatch {
447 inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
448 epochs,
449 merged_entries,
450 values,
451 old_values,
452 merged_size,
453 max_imm_id,
454 memory_tracker,
455 )),
456 table_id,
457 }
458}
459
460fn generate_splits(
462 payload: &Vec<ImmutableMemtable>,
463 existing_table_ids: &HashSet<TableId>,
464 storage_opts: &StorageOpts,
465) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
466 let mut size_and_start_user_keys = vec![];
467 let mut compact_data_size = 0;
468 let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
469 let mut table_vnode_partition = BTreeMap::default();
470 for imm in payload {
471 let data_size = {
472 (imm.value_count() * EPOCH_LEN + imm.size()) as u64
474 };
475 compact_data_size += data_size;
476 size_and_start_user_keys.push((
477 data_size,
478 FullKey {
479 user_key: imm.start_user_key(),
480 epoch_with_gap: EpochWithGap::new_max_epoch(),
481 }
482 .encode(),
483 ));
484 let v = table_size_infos.entry(imm.table_id).or_insert(0);
485 *v += data_size;
486 }
487
488 size_and_start_user_keys
489 .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
490 let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
491 splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
492 let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
493 let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
494 let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
495 let parallelism = std::cmp::min(
496 storage_opts.share_buffers_sync_parallelism as u64,
497 size_and_start_user_keys.len() as u64,
498 );
499 let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
500 compact_data_size / parallelism
501 } else {
502 compact_data_size
503 };
504
505 if parallelism > 1 && compact_data_size > sstable_size {
506 let mut last_buffer_size = 0;
507 let mut last_key: Vec<u8> = vec![];
508 for (data_size, key) in size_and_start_user_keys {
509 if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
510 splits.last_mut().unwrap().right = Bytes::from(key.clone());
511 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
512 last_buffer_size = data_size;
513 } else {
514 last_buffer_size += data_size;
515 }
516
517 last_key = key;
518 }
519 }
520
521 if compact_data_size > sstable_size {
522 for table_id in existing_table_ids {
525 if let Some(table_size) = table_size_infos.get(table_id)
526 && *table_size > min_sstable_size
527 {
528 table_vnode_partition.insert(*table_id, 1);
529 }
530 }
531 }
532
533 let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
536 (splits, sub_compaction_sstable_size, table_vnode_partition)
537}
538
539pub struct SharedBufferCompactRunner {
540 compactor: Compactor,
541 split_index: usize,
542}
543
544impl SharedBufferCompactRunner {
545 pub fn new(
546 split_index: usize,
547 key_range: KeyRange,
548 context: CompactorContext,
549 sub_compaction_sstable_size: usize,
550 table_vnode_partition: BTreeMap<TableId, u32>,
551 use_block_based_filter: bool,
552 object_id_getter: Arc<dyn GetObjectId>,
553 ) -> Self {
554 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
555 options.capacity = sub_compaction_sstable_size;
556 let compactor = Compactor::new(
557 context,
558 options,
559 super::TaskConfig {
560 key_range,
561 cache_policy: CachePolicy::Fill(Hint::Normal),
562 gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
563 retain_multiple_version: true,
564 stats_target_table_ids: None,
565 task_type: compact_task::TaskType::SharedBuffer,
566 table_vnode_partition,
567 use_block_based_filter,
568 table_schemas: Default::default(),
569 disable_drop_column_optimization: false,
570 },
571 object_id_getter,
572 );
573 Self {
574 compactor,
575 split_index,
576 }
577 }
578
579 pub async fn run(
580 self,
581 iter: impl HummockIterator<Direction = Forward>,
582 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
583 ) -> HummockResult<CompactOutput> {
584 let dummy_compaction_filter = DummyCompactionFilter {};
585 let (ssts, table_stats_map) = self
586 .compactor
587 .compact_key_range(
588 iter,
589 dummy_compaction_filter,
590 compaction_catalog_agent_ref,
591 None,
592 None,
593 None,
594 )
595 .await?;
596 Ok((self.split_index, ssts, table_stats_map))
597 }
598}
599
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    /// Builds a table key for `key`, prefixed with the virtual node of index 1.
    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    /// Splits must be contiguous and strictly increasing even when the imms are supplied in
    /// an arbitrary key order and span two tables.
    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );
        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        for pair in splits.windows(2) {
            let (prev, cur) = (&pair[0], &pair[1]);
            // Neighbouring ranges share their boundary and move strictly forward.
            assert_eq!(cur.left, prev.right);
            assert!(cur.left > prev.left);
            // An empty right bound means "unbounded" (last range).
            assert!(cur.right.is_empty() || cur.left < cur.right);
        }
    }

    /// Splits produced from distinct start keys must be pairwise disjoint and sorted.
    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"),
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024,
            TableId::new(1),
        );
        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"),
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024,
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 2,
            sstable_size_mb: 1,
            ..Default::default()
        };

        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        for (i, split_i) in splits.iter().enumerate() {
            for (j, split_j) in splits.iter().enumerate().skip(i + 1) {
                // Empty bounds are unbounded; such pairs are covered by the ordering check below.
                if split_i.right.is_empty() || split_j.left.is_empty() {
                    continue;
                }
                let i_before_j = split_i.right <= split_j.left;
                // An empty `right` means unbounded, so it can never lie before another range.
                // Without this guard the comparison is trivially true for the last split.
                let j_before_i = !split_j.right.is_empty() && split_j.right <= split_i.left;
                assert!(
                    i_before_j || j_before_i,
                    "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                    i,
                    j,
                    split_i.left,
                    split_i.right,
                    split_j.left,
                    split_j.right
                );
            }
        }

        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}