use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::Hint;
use futures::future::try_join;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
use risingwave_pb::hummock::compact_task;
use thiserror_ext::AsReport;
use tracing::{error, warn};

use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
    VersionedSharedBufferValue,
};
use crate::hummock::{
    CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
    SstableBuilderOptions,
};
use crate::mem_table::ImmutableMemtable;
use crate::opts::StorageOpts;

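/// Delete keys are always retained when flushing the shared buffer; this constant feeds the
/// `gc_delete_keys` field of the flush task config below.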
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;

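/// Flushes a batch of immutable memtables into SSTs for upload. The payload is split into three
/// groups, mirroring the variable names below: new values of tables that do not track old values,
/// new values of tables that do (the "log store" tables), and the old values themselves. The
/// three groups are compacted concurrently and combined into a single [`UploadTaskOutput`].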
pub async fn compact(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    payload: Vec<ImmutableMemtable>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> HummockResult<UploadTaskOutput> {
    let table_ids_with_old_value: HashSet<TableId> = payload
        .iter()
        .filter(|imm| imm.has_old_value())
        .map(|imm| imm.table_id)
        .collect();
    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut old_value_payload = Vec::with_capacity(payload.len());
    for imm in payload {
        if table_ids_with_old_value.contains(&imm.table_id) {
            if imm.has_old_value() {
                old_value_payload.push(imm.clone());
            }
            log_store_new_value_payload.push(imm);
        } else {
            assert!(!imm.has_old_value());
            non_log_store_new_value_payload.push(imm);
        }
    }
    let non_log_store_new_value_future = async {
        if non_log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                non_log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_non_log_store_new_value")
            .await
        }
    };

    let log_store_new_value_future = async {
        if log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_new_value")
            .await
        }
    };

    let old_value_future = async {
        if old_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<false>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                old_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_old_value")
            .await
        }
    };

    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
        try_join(non_log_store_new_value_future, log_store_new_value_future),
        old_value_future,
    )
    .await?;

    let mut new_value_ssts = non_log_store_new_value_ssts;
    new_value_ssts.extend(log_store_new_value_ssts);

    Ok(UploadTaskOutput {
        new_value_ssts,
        old_value_ssts,
        wait_poll_timer: None,
    })
}

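/// Compacts a shared-buffer payload into level-0 SSTs. The payload is split into key ranges that
/// are compacted in parallel on the compaction executor; `IS_NEW_VALUE` selects whether the new
/// values or the old values of the imms are written out. Returns one [`LocalSstableInfo`] per
/// produced SST.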
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    if !IS_NEW_VALUE {
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    let existing_table_ids: HashSet<TableId> =
        payload.iter().map(|imm| imm.table_id).dedup().collect();
    assert!(!existing_table_ids.is_empty());

    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id);
        if !ret {
            error!(
                "cannot find table {:?}, it may have been removed by the meta service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
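    // Fall back to a block-based bloom filter when the total key count is too large for an Xor16
    // filter.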
    let use_block_based_filter =
        risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
            total_key_count as u64,
            None,
        );

    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            object_id_manager.clone(),
        );
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while executing in tokio",
                ));
            }
        }
    }

    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    Vec::from_iter(existing_table_ids.iter().cloned()),
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed to check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}

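/// Merges multiple immutable memtables of the same table into a single, larger
/// [`ImmutableMemtable`] in memory, combining the versioned values (and old values, if present)
/// of each user key across all inputs. The input imms are expected to be ordered from newest to
/// oldest.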
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

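    // Walk the merge iterator key by key: `observe_multi_version` returns true when a new user
    // key is observed, at which point a new key entry is started; the epoch-versioned values (and
    // old values, if any) of the current key are then appended to the flat value vectors. The
    // loop is CPU-bound with no await points, so it calls `tokio::task::consume_budget` to yield
    // cooperatively between keys.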
    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
        )),
        table_id,
    }
}

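/// Plans the parallelism of a shared-buffer compaction: returns the key ranges to compact in
/// parallel, the target SSTable size for each sub-compaction, and a per-table vnode partition
/// hint for sufficiently large tables.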
fn generate_splits(
    payload: &Vec<ImmutableMemtable>,
    existing_table_ids: &HashSet<TableId>,
    storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
    let mut size_and_start_user_keys = vec![];
    let mut compact_data_size = 0;
    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
    let mut table_vnode_partition = BTreeMap::default();
    for imm in payload {
        let data_size = {
            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
        };
        compact_data_size += data_size;
        size_and_start_user_keys.push((
            data_size,
            FullKey {
                user_key: imm.start_user_key(),
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        ));
        let v = table_size_infos.entry(imm.table_id).or_insert(0);
        *v += data_size;
    }

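    // Sort the imms by encoded start key, then cut a split boundary whenever the accumulated data
    // size reaches the per-sub-compaction target, so that each split covers a contiguous,
    // non-overlapping key range.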
    size_and_start_user_keys
        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
    let parallelism = std::cmp::min(
        storage_opts.share_buffers_sync_parallelism as u64,
        size_and_start_user_keys.len() as u64,
    );
    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
        compact_data_size / parallelism
    } else {
        compact_data_size
    };

    if parallelism > 1 && compact_data_size > sstable_size {
        let mut last_buffer_size = 0;
        let mut last_key: Vec<u8> = vec![];
        for (data_size, key) in size_and_start_user_keys {
            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
                splits.last_mut().unwrap().right = Bytes::from(key.clone());
                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                last_buffer_size = data_size;
            } else {
                last_buffer_size += data_size;
            }

            last_key = key;
        }
    }

    if compact_data_size > sstable_size {
        for table_id in existing_table_ids {
            if let Some(table_size) = table_size_infos.get(table_id)
                && *table_size > min_sstable_size
            {
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }

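    // Cap each sub-compaction's SSTable capacity at the configured SSTable size; the `* 6 / 5`
    // factor presumably leaves ~20% slack over the even per-split share so a split does not end
    // with a tiny trailing SST.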
    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
    (splits, sub_compaction_sstable_size, table_vnode_partition)
}

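/// Compacts a single key-range split of the shared buffer using a [`Compactor`] configured for
/// flush: delete keys are retained and multiple versions of a key are kept.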
pub struct SharedBufferCompactRunner {
    compactor: Compactor,
    split_index: usize,
}

impl SharedBufferCompactRunner {
    pub fn new(
        split_index: usize,
        key_range: KeyRange,
        context: CompactorContext,
        sub_compaction_sstable_size: usize,
        table_vnode_partition: BTreeMap<TableId, u32>,
        use_block_based_filter: bool,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.capacity = sub_compaction_sstable_size;
        let compactor = Compactor::new(
            context,
            options,
            super::TaskConfig {
                key_range,
                cache_policy: CachePolicy::Fill(Hint::Normal),
                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
                retain_multiple_version: true,
                stats_target_table_ids: None,
                task_type: compact_task::TaskType::SharedBuffer,
                table_vnode_partition,
                use_block_based_filter,
                table_schemas: Default::default(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );
        Self {
            compactor,
            split_index,
        }
    }

    pub async fn run(
        self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<CompactOutput> {
        let dummy_compaction_filter = DummyCompactionFilter {};
        let (ssts, table_stats_map) = self
            .compactor
            .compact_key_range(
                iter,
                dummy_compaction_filter,
                compaction_catalog_agent_ref,
                None,
                None,
                None,
            )
            .await?;
        Ok((self.split_index, ssts, table_stats_map))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );

        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        for i in 1..splits.len() {
            assert_eq!(splits[i].left, splits[i - 1].right);
            assert!(splits[i].left > splits[i - 1].left);
            assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
        }
    }

    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"),
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024,
            TableId::new(1),
        );

        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"),
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024,
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 2,
            sstable_size_mb: 1,
            ..Default::default()
        };

        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

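        // No two splits may cover overlapping key ranges.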
        for i in 0..splits.len() {
            for j in (i + 1)..splits.len() {
                let split_i = &splits[i];
                let split_j = &splits[j];

                if !split_i.right.is_empty() && !split_j.left.is_empty() {
                    assert!(
                        split_i.right <= split_j.left || split_j.right <= split_i.left,
                        "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                        i,
                        j,
                        split_i.left,
                        split_i.right,
                        split_j.left,
                        split_j.right
                    );
                }
            }
        }

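        // Split boundaries must be in ascending key order.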
        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}