1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use risingwave_pb::hummock::compact_task;
32use thiserror_ext::AsReport;
33use tracing::{error, warn};
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
38use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
39use crate::hummock::event_handler::uploader::UploadTaskOutput;
40use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
41use crate::hummock::shared_buffer::shared_buffer_batch::{
42 SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
43 VersionedSharedBufferValue,
44};
45use crate::hummock::utils::MemoryTracker;
46use crate::hummock::{
47 CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
48 SstableBuilderOptions,
49};
50use crate::mem_table::ImmutableMemtable;
51use crate::opts::StorageOpts;
52
53const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
54
55pub async fn compact(
57 context: CompactorContext,
58 object_id_manager: ObjectIdManagerRef,
59 payload: Vec<ImmutableMemtable>,
60 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
61) -> HummockResult<UploadTaskOutput> {
62 let table_ids_with_old_value: HashSet<TableId> = payload
63 .iter()
64 .filter(|imm| imm.has_old_value())
65 .map(|imm| imm.table_id)
66 .collect();
67 let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
68 let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
69 let mut old_value_payload = Vec::with_capacity(payload.len());
70 for imm in payload {
71 if table_ids_with_old_value.contains(&imm.table_id) {
72 if imm.has_old_value() {
73 old_value_payload.push(imm.clone());
74 }
75 log_store_new_value_payload.push(imm);
76 } else {
77 assert!(!imm.has_old_value());
78 non_log_store_new_value_payload.push(imm);
79 }
80 }
81 let non_log_store_new_value_future = async {
82 if non_log_store_new_value_payload.is_empty() {
83 Ok(vec![])
84 } else {
85 compact_shared_buffer::<true>(
86 context.clone(),
87 object_id_manager.clone(),
88 compaction_catalog_manager_ref.clone(),
89 non_log_store_new_value_payload,
90 )
91 .instrument_await("shared_buffer_compact_non_log_store_new_value")
92 .await
93 }
94 };
95
96 let log_store_new_value_future = async {
97 if log_store_new_value_payload.is_empty() {
98 Ok(vec![])
99 } else {
100 compact_shared_buffer::<true>(
101 context.clone(),
102 object_id_manager.clone(),
103 compaction_catalog_manager_ref.clone(),
104 log_store_new_value_payload,
105 )
106 .instrument_await("shared_buffer_compact_log_store_new_value")
107 .await
108 }
109 };
110
111 let old_value_future = async {
112 if old_value_payload.is_empty() {
113 Ok(vec![])
114 } else {
115 compact_shared_buffer::<false>(
116 context.clone(),
117 object_id_manager.clone(),
118 compaction_catalog_manager_ref.clone(),
119 old_value_payload,
120 )
121 .instrument_await("shared_buffer_compact_log_store_old_value")
122 .await
123 }
124 };
125
126 let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
128 try_join(non_log_store_new_value_future, log_store_new_value_future),
129 old_value_future,
130 )
131 .await?;
132
133 let mut new_value_ssts = non_log_store_new_value_ssts;
134 new_value_ssts.extend(log_store_new_value_ssts);
135
136 Ok(UploadTaskOutput {
137 new_value_ssts,
138 old_value_ssts,
139 wait_poll_timer: None,
140 })
141}
142
143async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
148 context: CompactorContext,
149 object_id_manager: ObjectIdManagerRef,
150 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
151 mut payload: Vec<ImmutableMemtable>,
152) -> HummockResult<Vec<LocalSstableInfo>> {
153 if !IS_NEW_VALUE {
154 assert!(payload.iter().all(|imm| imm.has_old_value()));
155 }
156 let existing_table_ids: HashSet<TableId> =
158 payload.iter().map(|imm| imm.table_id).dedup().collect();
159 assert!(!existing_table_ids.is_empty());
160
161 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
162 .acquire(existing_table_ids.iter().copied().collect())
163 .await?;
164 let existing_table_ids = compaction_catalog_agent_ref
165 .table_ids()
166 .collect::<HashSet<_>>();
167 payload.retain(|imm| {
168 let ret = existing_table_ids.contains(&imm.table_id);
169 if !ret {
170 error!(
171 "can not find table {:?}, it may be removed by meta-service",
172 imm.table_id
173 );
174 }
175 ret
176 });
177
178 let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
179 let (splits, sub_compaction_sstable_size, table_vnode_partition) =
180 generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
181 let parallelism = splits.len();
182 let mut compact_success = true;
183 let mut output_ssts = Vec::with_capacity(parallelism);
184 let mut compaction_futures = vec![];
185 let use_block_based_filter =
188 risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
189 total_key_count as u64,
190 None,
191 );
192
193 for (split_index, key_range) in splits.into_iter().enumerate() {
194 let compactor = SharedBufferCompactRunner::new(
195 split_index,
196 key_range,
197 context.clone(),
198 sub_compaction_sstable_size as usize,
199 table_vnode_partition.clone(),
200 use_block_based_filter,
201 object_id_manager.clone(),
202 );
203 let mut forward_iters = Vec::with_capacity(payload.len());
204 for imm in &payload {
205 forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
206 }
207 let compaction_executor = context.compaction_executor.clone();
208 let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
209 let handle = compaction_executor.spawn({
210 static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
211 LazyLock::new(|| AtomicUsize::new(0));
212 let tree_root = context.await_tree_reg.as_ref().map(|reg| {
213 let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
214 reg.register(
215 await_tree_key::CompactSharedBuffer { id },
216 format!(
217 "Compact Shared Buffer: {:?}",
218 payload
219 .iter()
220 .flat_map(|imm| imm.epochs().iter())
221 .copied()
222 .collect::<BTreeSet<_>>()
223 ),
224 )
225 });
226 let future = compactor.run(
227 MergeIterator::new(forward_iters),
228 compaction_catalog_agent_ref,
229 );
230 if let Some(root) = tree_root {
231 root.instrument(future).left_future()
232 } else {
233 future.right_future()
234 }
235 });
236 compaction_futures.push(handle);
237 }
238
239 let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
240 let mut err = None;
241 while let Some(future_result) = buffered.next().await {
242 match future_result {
243 Ok(Ok((split_index, ssts, table_stats_map))) => {
244 output_ssts.push((split_index, ssts, table_stats_map));
245 }
246 Ok(Err(e)) => {
247 compact_success = false;
248 tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
249 err = Some(e);
250 }
251 Err(e) => {
252 compact_success = false;
253 tracing::warn!(
254 error = %e.as_report(),
255 "Shared Buffer Compaction failed with future error",
256 );
257 err = Some(HummockError::compaction_executor(
258 "failed while execute in tokio",
259 ));
260 }
261 }
262 }
263
264 output_ssts.sort_by_key(|(split_index, ..)| *split_index);
266
267 if compact_success {
268 let mut level0 = Vec::with_capacity(parallelism);
269 let mut sst_infos = vec![];
270 for (_, ssts, _) in output_ssts {
271 for sst_info in &ssts {
272 context
273 .compactor_metrics
274 .write_build_l0_bytes
275 .inc_by(sst_info.file_size());
276
277 sst_infos.push(sst_info.sst_info.clone());
278 }
279 level0.extend(ssts);
280 }
281 if context.storage_opts.check_compaction_result {
282 let compaction_executor = context.compaction_executor.clone();
283 let mut forward_iters = Vec::with_capacity(payload.len());
284 for imm in &payload {
285 if !existing_table_ids.contains(&imm.table_id) {
286 continue;
287 }
288 forward_iters.push(imm.clone().into_forward_iter());
289 }
290 let iter = MergeIterator::new(forward_iters);
291 let left_iter = UserIterator::new(
292 iter,
293 (Bound::Unbounded, Bound::Unbounded),
294 u64::MAX,
295 0,
296 None,
297 );
298 compaction_executor.spawn(async move {
299 match check_flush_result(
300 left_iter,
301 Vec::from_iter(existing_table_ids.iter().cloned()),
302 sst_infos,
303 context,
304 )
305 .await
306 {
307 Err(e) => {
308 tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
309 }
310 Ok(true) => (),
311 Ok(false) => {
312 panic!(
313 "failed to check flush result consistency of state-table {:?}",
314 existing_table_ids
315 );
316 }
317 }
318 });
319 }
320 Ok(level0)
321 } else {
322 Err(err.unwrap())
323 }
324}
325
326pub async fn merge_imms_in_memory(
328 table_id: TableId,
329 imms: Vec<ImmutableMemtable>,
330 memory_tracker: Option<MemoryTracker>,
331) -> ImmutableMemtable {
332 let mut epochs = vec![];
333 let mut merged_size = 0;
334 assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
335 let max_imm_id = imms[0].batch_id();
336
337 let has_old_value = imms[0].has_old_value();
338 assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));
341
342 let (old_value_size, global_old_value_size) = if has_old_value {
343 (
344 imms.iter()
345 .map(|imm| imm.old_values().expect("has old value").size)
346 .sum(),
347 Some(
348 imms[0]
349 .old_values()
350 .expect("has old value")
351 .global_old_value_size
352 .clone(),
353 ),
354 )
355 } else {
356 (0, None)
357 };
358
359 let mut imm_iters = Vec::with_capacity(imms.len());
360 let key_count = imms.iter().map(|imm| imm.key_count()).sum();
361 let value_count = imms.iter().map(|imm| imm.value_count()).sum();
362 for imm in imms {
363 assert!(imm.key_count() > 0, "imm should not be empty");
364 assert_eq!(
365 table_id,
366 imm.table_id(),
367 "should only merge data belonging to the same table"
368 );
369
370 epochs.push(imm.min_epoch());
371 merged_size += imm.size();
372
373 imm_iters.push(imm.into_forward_iter());
374 }
375 epochs.sort();
376
377 let mut mi = MergeIterator::new(imm_iters);
379 mi.rewind_no_await();
380 assert!(mi.is_valid());
381
382 let first_item_key = mi.current_key_entry().key.clone();
383
384 let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
385 let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
386 let mut old_values: Option<Vec<Bytes>> = if has_old_value {
387 Some(Vec::with_capacity(value_count))
388 } else {
389 None
390 };
391
392 merged_entries.push(SharedBufferKeyEntry {
393 key: first_item_key.clone(),
394 value_offset: 0,
395 });
396
397 let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
399 table_id,
400 first_item_key,
401 EpochWithGap::new_max_epoch(),
402 ));
403
404 while mi.is_valid() {
405 let key_entry = mi.current_key_entry();
406 let user_key = UserKey {
407 table_id,
408 table_key: key_entry.key.clone(),
409 };
410 if full_key_tracker.observe_multi_version(
411 user_key,
412 key_entry
413 .new_values
414 .iter()
415 .map(|(epoch_with_gap, _)| *epoch_with_gap),
416 ) {
417 let last_entry = merged_entries.last_mut().expect("non-empty");
418 if last_entry.value_offset == values.len() {
419 warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
420 last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
421 } else {
422 merged_entries.push(SharedBufferKeyEntry {
424 key: full_key_tracker.latest_user_key().table_key.clone(),
425 value_offset: values.len(),
426 });
427 }
428 }
429 values.extend(
430 key_entry
431 .new_values
432 .iter()
433 .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
434 );
435 if let Some(old_values) = &mut old_values {
436 old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
437 }
438 mi.advance_peek_to_next_key();
439 tokio::task::consume_budget().await;
442 }
443
444 let old_values = old_values.map(|old_values| {
445 SharedBufferBatchOldValues::new(
446 old_values,
447 old_value_size,
448 global_old_value_size.expect("should exist when has old value"),
449 )
450 });
451
452 SharedBufferBatch {
453 inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
454 epochs,
455 merged_entries,
456 values,
457 old_values,
458 merged_size,
459 max_imm_id,
460 memory_tracker,
461 )),
462 table_id,
463 }
464}
465
466fn generate_splits(
468 payload: &Vec<ImmutableMemtable>,
469 existing_table_ids: &HashSet<TableId>,
470 storage_opts: &StorageOpts,
471) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
472 let mut size_and_start_user_keys = vec![];
473 let mut compact_data_size = 0;
474 let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
475 let mut table_vnode_partition = BTreeMap::default();
476 for imm in payload {
477 let data_size = {
478 (imm.value_count() * EPOCH_LEN + imm.size()) as u64
480 };
481 compact_data_size += data_size;
482 size_and_start_user_keys.push((
483 data_size,
484 FullKey {
485 user_key: imm.start_user_key(),
486 epoch_with_gap: EpochWithGap::new_max_epoch(),
487 }
488 .encode(),
489 ));
490 let v = table_size_infos.entry(imm.table_id).or_insert(0);
491 *v += data_size;
492 }
493
494 size_and_start_user_keys
495 .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
496 let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
497 splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
498 let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
499 let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
500 let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
501 let parallelism = std::cmp::min(
502 storage_opts.share_buffers_sync_parallelism as u64,
503 size_and_start_user_keys.len() as u64,
504 );
505 let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
506 compact_data_size / parallelism
507 } else {
508 compact_data_size
509 };
510
511 if parallelism > 1 && compact_data_size > sstable_size {
512 let mut last_buffer_size = 0;
513 let mut last_key: Vec<u8> = vec![];
514 for (data_size, key) in size_and_start_user_keys {
515 if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
516 splits.last_mut().unwrap().right = Bytes::from(key.clone());
517 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
518 last_buffer_size = data_size;
519 } else {
520 last_buffer_size += data_size;
521 }
522
523 last_key = key;
524 }
525 }
526
527 if compact_data_size > sstable_size {
528 for table_id in existing_table_ids {
531 if let Some(table_size) = table_size_infos.get(table_id)
532 && *table_size > min_sstable_size
533 {
534 table_vnode_partition.insert(*table_id, 1);
535 }
536 }
537 }
538
539 let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
542 (splits, sub_compaction_sstable_size, table_vnode_partition)
543}
544
545pub struct SharedBufferCompactRunner {
546 compactor: Compactor,
547 split_index: usize,
548}
549
550impl SharedBufferCompactRunner {
551 pub fn new(
552 split_index: usize,
553 key_range: KeyRange,
554 context: CompactorContext,
555 sub_compaction_sstable_size: usize,
556 table_vnode_partition: BTreeMap<TableId, u32>,
557 use_block_based_filter: bool,
558 object_id_getter: Arc<dyn GetObjectId>,
559 ) -> Self {
560 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
561 options.capacity = sub_compaction_sstable_size;
562 let compactor = Compactor::new(
563 context,
564 options,
565 super::TaskConfig {
566 key_range,
567 cache_policy: CachePolicy::Fill(Hint::Normal),
568 gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
569 retain_multiple_version: true,
570 stats_target_table_ids: None,
571 task_type: compact_task::TaskType::SharedBuffer,
572 table_vnode_partition,
573 use_block_based_filter,
574 table_schemas: Default::default(),
575 disable_drop_column_optimization: false,
576 },
577 object_id_getter,
578 );
579 Self {
580 compactor,
581 split_index,
582 }
583 }
584
585 pub async fn run(
586 self,
587 iter: impl HummockIterator<Direction = Forward>,
588 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
589 ) -> HummockResult<CompactOutput> {
590 let dummy_compaction_filter = DummyCompactionFilter {};
591 let (ssts, table_stats_map) = self
592 .compactor
593 .compact_key_range(
594 iter,
595 dummy_compaction_filter,
596 compaction_catalog_agent_ref,
597 None,
598 None,
599 None,
600 )
601 .await?;
602 Ok((self.split_index, ssts, table_stats_map))
603 }
604}
605
606#[cfg(test)]
607mod tests {
608 use std::collections::HashSet;
609
610 use bytes::Bytes;
611 use risingwave_common::catalog::TableId;
612 use risingwave_common::hash::VirtualNode;
613 use risingwave_common::util::epoch::test_epoch;
614 use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};
615
616 use crate::hummock::compactor::shared_buffer_compact::generate_splits;
617 use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
618 use crate::mem_table::ImmutableMemtable;
619 use crate::opts::StorageOpts;
620
621 fn generate_key(key: &str) -> TableKey<Bytes> {
622 TableKey(prefix_slice_with_vnode(
623 VirtualNode::from_index(1),
624 key.as_bytes(),
625 ))
626 }
627
628 #[tokio::test]
629 async fn test_generate_splits_in_order() {
630 let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
631 test_epoch(3),
632 0,
633 vec![(
634 generate_key("dddd"),
635 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
636 )],
637 1024 * 1024,
638 TableId::new(1),
639 );
640 let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
641 test_epoch(3),
642 0,
643 vec![(
644 generate_key("abb"),
645 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
646 )],
647 (1024 + 256) * 1024,
648 TableId::new(1),
649 );
650
651 let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
652 test_epoch(2),
653 0,
654 vec![(
655 generate_key("abc"),
656 SharedBufferValue::Insert(Bytes::from_static(b"v2")),
657 )],
658 (1024 + 512) * 1024,
659 TableId::new(1),
660 );
661 let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
662 test_epoch(3),
663 0,
664 vec![(
665 generate_key("aaa"),
666 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
667 )],
668 (1024 + 512) * 1024,
669 TableId::new(1),
670 );
671
672 let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
673 test_epoch(3),
674 0,
675 vec![(
676 generate_key("aaa"),
677 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
678 )],
679 (1024 + 256) * 1024,
680 TableId::new(2),
681 );
682
683 let storage_opts = StorageOpts {
684 share_buffers_sync_parallelism: 3,
685 parallel_compact_size_mb: 1,
686 sstable_size_mb: 1,
687 ..Default::default()
688 };
689 let payload = vec![imm1, imm2, imm3, imm4, imm5];
690 let (splits, _sstable_capacity, vnodes) = generate_splits(
691 &payload,
692 &HashSet::from_iter([1.into(), 2.into()]),
693 &storage_opts,
694 );
695 assert_eq!(
696 splits.len(),
697 storage_opts.share_buffers_sync_parallelism as usize
698 );
699 assert!(vnodes.is_empty());
700
701 for i in 1..splits.len() {
703 assert_eq!(splits[i].left, splits[i - 1].right);
704 assert!(splits[i].left > splits[i - 1].left);
705 assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
706 }
707 }
708
709 #[tokio::test]
710 async fn test_generate_splits_no_duplicate_keys() {
711 let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
714 test_epoch(1),
715 0,
716 vec![(
717 generate_key("zzz"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
719 )],
720 2 * 1024 * 1024, TableId::new(1),
722 );
723
724 let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
725 test_epoch(1),
726 0,
727 vec![(
728 generate_key("aaa"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
730 )],
731 2 * 1024 * 1024, TableId::new(1),
733 );
734
735 let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
736 test_epoch(1),
737 0,
738 vec![(
739 generate_key("mmm"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
741 )],
742 2 * 1024 * 1024, TableId::new(1),
744 );
745
746 let storage_opts = StorageOpts {
747 share_buffers_sync_parallelism: 3, parallel_compact_size_mb: 2, sstable_size_mb: 1, ..Default::default()
751 };
752
753 let payload = vec![imm1, imm2, imm3];
755 let (splits, _sstable_capacity, _vnodes) =
756 generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);
757
758 assert!(
760 splits.len() > 1,
761 "Expected multiple splits, got {}",
762 splits.len()
763 );
764
765 for i in 0..splits.len() {
767 for j in (i + 1)..splits.len() {
768 let split_i = &splits[i];
769 let split_j = &splits[j];
770
771 if !split_i.right.is_empty() && !split_j.left.is_empty() {
773 assert!(
774 split_i.right <= split_j.left || split_j.right <= split_i.left,
775 "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
776 i,
777 j,
778 split_i.left,
779 split_i.right,
780 split_j.left,
781 split_j.right
782 );
783 }
784 }
785 }
786
787 for i in 1..splits.len() {
789 if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
790 assert!(
791 splits[i - 1].right <= splits[i].left,
792 "Splits are not in sorted order at index {}: {:?} > {:?}",
793 i,
794 splits[i - 1].right,
795 splits[i].left
796 );
797 }
798 }
799 }
800}