1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use thiserror_ext::AsReport;
32use tracing::{error, warn};
33
34use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
35use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
36use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
37use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
38use crate::hummock::event_handler::uploader::UploadTaskOutput;
39use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
40use crate::hummock::shared_buffer::shared_buffer_batch::{
41 SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
42 VersionedSharedBufferValue,
43};
44use crate::hummock::{
45 CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
46 SstableBuilderOptions,
47};
48use crate::mem_table::ImmutableMemtable;
49use crate::opts::StorageOpts;
50
51const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
52
53pub async fn compact(
55 context: CompactorContext,
56 object_id_manager: ObjectIdManagerRef,
57 payload: Vec<ImmutableMemtable>,
58 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
59) -> HummockResult<UploadTaskOutput> {
60 let table_ids_with_old_value: HashSet<TableId> = payload
61 .iter()
62 .filter(|imm| imm.has_old_value())
63 .map(|imm| imm.table_id)
64 .collect();
65 let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
66 let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
67 let mut old_value_payload = Vec::with_capacity(payload.len());
68 for imm in payload {
69 if table_ids_with_old_value.contains(&imm.table_id) {
70 if imm.has_old_value() {
71 old_value_payload.push(imm.clone());
72 }
73 log_store_new_value_payload.push(imm);
74 } else {
75 assert!(!imm.has_old_value());
76 non_log_store_new_value_payload.push(imm);
77 }
78 }
79 let non_log_store_new_value_future = async {
80 if non_log_store_new_value_payload.is_empty() {
81 Ok(vec![])
82 } else {
83 compact_shared_buffer::<true>(
84 context.clone(),
85 object_id_manager.clone(),
86 compaction_catalog_manager_ref.clone(),
87 non_log_store_new_value_payload,
88 )
89 .instrument_await("shared_buffer_compact_non_log_store_new_value")
90 .await
91 }
92 };
93
94 let log_store_new_value_future = async {
95 if log_store_new_value_payload.is_empty() {
96 Ok(vec![])
97 } else {
98 compact_shared_buffer::<true>(
99 context.clone(),
100 object_id_manager.clone(),
101 compaction_catalog_manager_ref.clone(),
102 log_store_new_value_payload,
103 )
104 .instrument_await("shared_buffer_compact_log_store_new_value")
105 .await
106 }
107 };
108
109 let old_value_future = async {
110 if old_value_payload.is_empty() {
111 Ok(vec![])
112 } else {
113 compact_shared_buffer::<false>(
114 context.clone(),
115 object_id_manager.clone(),
116 compaction_catalog_manager_ref.clone(),
117 old_value_payload,
118 )
119 .instrument_await("shared_buffer_compact_log_store_old_value")
120 .await
121 }
122 };
123
124 let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
126 try_join(non_log_store_new_value_future, log_store_new_value_future),
127 old_value_future,
128 )
129 .await?;
130
131 let mut new_value_ssts = non_log_store_new_value_ssts;
132 new_value_ssts.extend(log_store_new_value_ssts);
133
134 Ok(UploadTaskOutput {
135 new_value_ssts,
136 old_value_ssts,
137 wait_poll_timer: None,
138 })
139}
140
141async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
146 context: CompactorContext,
147 object_id_manager: ObjectIdManagerRef,
148 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
149 mut payload: Vec<ImmutableMemtable>,
150) -> HummockResult<Vec<LocalSstableInfo>> {
151 if !IS_NEW_VALUE {
152 assert!(payload.iter().all(|imm| imm.has_old_value()));
153 }
154 let existing_table_ids: HashSet<TableId> =
156 payload.iter().map(|imm| imm.table_id).dedup().collect();
157 assert!(!existing_table_ids.is_empty());
158
159 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
160 .acquire(existing_table_ids.iter().copied().collect())
161 .await?;
162 let existing_table_ids = compaction_catalog_agent_ref
163 .table_ids()
164 .collect::<HashSet<_>>();
165 payload.retain(|imm| {
166 let ret = existing_table_ids.contains(&imm.table_id);
167 if !ret {
168 error!(
169 "can not find table {:?}, it may be removed by meta-service",
170 imm.table_id
171 );
172 }
173 ret
174 });
175
176 let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
177 let (splits, sub_compaction_sstable_size, table_vnode_partition) =
178 generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
179 let parallelism = splits.len();
180 let mut compact_success = true;
181 let mut output_ssts = Vec::with_capacity(parallelism);
182 let mut compaction_futures = vec![];
183 let use_block_based_filter =
186 risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
187 total_key_count as u64,
188 None,
189 );
190
191 for (split_index, key_range) in splits.into_iter().enumerate() {
192 let compactor = SharedBufferCompactRunner::new(
193 split_index,
194 key_range,
195 context.clone(),
196 sub_compaction_sstable_size as usize,
197 table_vnode_partition.clone(),
198 use_block_based_filter,
199 object_id_manager.clone(),
200 );
201 let mut forward_iters = Vec::with_capacity(payload.len());
202 for imm in &payload {
203 forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
204 }
205 let compaction_executor = context.compaction_executor.clone();
206 let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
207 let handle = compaction_executor.spawn({
208 static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
209 LazyLock::new(|| AtomicUsize::new(0));
210 let tree_root = context.await_tree_reg.as_ref().map(|reg| {
211 let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
212 reg.register(
213 await_tree_key::CompactSharedBuffer { id },
214 format!(
215 "Compact Shared Buffer: {:?}",
216 payload
217 .iter()
218 .flat_map(|imm| imm.epochs().iter())
219 .copied()
220 .collect::<BTreeSet<_>>()
221 ),
222 )
223 });
224 let future = compactor.run(
225 MergeIterator::new(forward_iters),
226 compaction_catalog_agent_ref,
227 );
228 if let Some(root) = tree_root {
229 root.instrument(future).left_future()
230 } else {
231 future.right_future()
232 }
233 });
234 compaction_futures.push(handle);
235 }
236
237 let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
238 let mut err = None;
239 while let Some(future_result) = buffered.next().await {
240 match future_result {
241 Ok(Ok((split_index, ssts, table_stats_map))) => {
242 output_ssts.push((split_index, ssts, table_stats_map));
243 }
244 Ok(Err(e)) => {
245 compact_success = false;
246 tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
247 err = Some(e);
248 }
249 Err(e) => {
250 compact_success = false;
251 tracing::warn!(
252 error = %e.as_report(),
253 "Shared Buffer Compaction failed with future error",
254 );
255 err = Some(HummockError::compaction_executor(
256 "failed while execute in tokio",
257 ));
258 }
259 }
260 }
261
262 output_ssts.sort_by_key(|(split_index, ..)| *split_index);
264
265 if compact_success {
266 let mut level0 = Vec::with_capacity(parallelism);
267 let mut sst_infos = vec![];
268 for (_, ssts, _) in output_ssts {
269 for sst_info in &ssts {
270 context
271 .compactor_metrics
272 .write_build_l0_bytes
273 .inc_by(sst_info.file_size());
274
275 sst_infos.push(sst_info.sst_info.clone());
276 }
277 level0.extend(ssts);
278 }
279 if context.storage_opts.check_compaction_result {
280 let compaction_executor = context.compaction_executor.clone();
281 let mut forward_iters = Vec::with_capacity(payload.len());
282 for imm in &payload {
283 if !existing_table_ids.contains(&imm.table_id) {
284 continue;
285 }
286 forward_iters.push(imm.clone().into_forward_iter());
287 }
288 let iter = MergeIterator::new(forward_iters);
289 let left_iter = UserIterator::new(
290 iter,
291 (Bound::Unbounded, Bound::Unbounded),
292 u64::MAX,
293 0,
294 None,
295 );
296 compaction_executor.spawn(async move {
297 match check_flush_result(
298 left_iter,
299 Vec::from_iter(existing_table_ids.iter().cloned()),
300 sst_infos,
301 context,
302 )
303 .await
304 {
305 Err(e) => {
306 tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
307 }
308 Ok(true) => (),
309 Ok(false) => {
310 panic!(
311 "failed to check flush result consistency of state-table {:?}",
312 existing_table_ids
313 );
314 }
315 }
316 });
317 }
318 Ok(level0)
319 } else {
320 Err(err.unwrap())
321 }
322}
323
324pub async fn merge_imms_in_memory(
326 table_id: TableId,
327 imms: Vec<ImmutableMemtable>,
328) -> ImmutableMemtable {
329 let mut epochs = vec![];
330 let mut merged_size = 0;
331 assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
332 let max_imm_id = imms[0].batch_id();
333
334 let has_old_value = imms[0].has_old_value();
335 assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));
338
339 let (old_value_size, global_old_value_size) = if has_old_value {
340 (
341 imms.iter()
342 .map(|imm| imm.old_values().expect("has old value").size)
343 .sum(),
344 Some(
345 imms[0]
346 .old_values()
347 .expect("has old value")
348 .global_old_value_size
349 .clone(),
350 ),
351 )
352 } else {
353 (0, None)
354 };
355
356 let mut imm_iters = Vec::with_capacity(imms.len());
357 let key_count = imms.iter().map(|imm| imm.key_count()).sum();
358 let value_count = imms.iter().map(|imm| imm.value_count()).sum();
359 for imm in imms {
360 assert!(imm.key_count() > 0, "imm should not be empty");
361 assert_eq!(
362 table_id,
363 imm.table_id(),
364 "should only merge data belonging to the same table"
365 );
366
367 epochs.push(imm.min_epoch());
368 merged_size += imm.size();
369
370 imm_iters.push(imm.into_forward_iter());
371 }
372 epochs.sort();
373
374 let mut mi = MergeIterator::new(imm_iters);
376 mi.rewind_no_await();
377 assert!(mi.is_valid());
378
379 let first_item_key = mi.current_key_entry().key.clone();
380
381 let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
382 let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
383 let mut old_values: Option<Vec<Bytes>> = if has_old_value {
384 Some(Vec::with_capacity(value_count))
385 } else {
386 None
387 };
388
389 merged_entries.push(SharedBufferKeyEntry {
390 key: first_item_key.clone(),
391 value_offset: 0,
392 });
393
394 let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
396 table_id,
397 first_item_key,
398 EpochWithGap::new_max_epoch(),
399 ));
400
401 while mi.is_valid() {
402 let key_entry = mi.current_key_entry();
403 let user_key = UserKey {
404 table_id,
405 table_key: key_entry.key.clone(),
406 };
407 if full_key_tracker.observe_multi_version(
408 user_key,
409 key_entry
410 .new_values
411 .iter()
412 .map(|(epoch_with_gap, _)| *epoch_with_gap),
413 ) {
414 let last_entry = merged_entries.last_mut().expect("non-empty");
415 if last_entry.value_offset == values.len() {
416 warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
417 last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
418 } else {
419 merged_entries.push(SharedBufferKeyEntry {
421 key: full_key_tracker.latest_user_key().table_key.clone(),
422 value_offset: values.len(),
423 });
424 }
425 }
426 values.extend(
427 key_entry
428 .new_values
429 .iter()
430 .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
431 );
432 if let Some(old_values) = &mut old_values {
433 old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
434 }
435 mi.advance_peek_to_next_key();
436 tokio::task::consume_budget().await;
439 }
440
441 let old_values = old_values.map(|old_values| {
442 SharedBufferBatchOldValues::new(
443 old_values,
444 old_value_size,
445 global_old_value_size.expect("should exist when has old value"),
446 )
447 });
448
449 SharedBufferBatch {
450 inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
451 epochs,
452 merged_entries,
453 values,
454 old_values,
455 merged_size,
456 max_imm_id,
457 )),
458 table_id,
459 }
460}
461
462fn generate_splits(
464 payload: &Vec<ImmutableMemtable>,
465 existing_table_ids: &HashSet<TableId>,
466 storage_opts: &StorageOpts,
467) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
468 let mut size_and_start_user_keys = vec![];
469 let mut compact_data_size = 0;
470 let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
471 let mut table_vnode_partition = BTreeMap::default();
472 for imm in payload {
473 let data_size = {
474 (imm.value_count() * EPOCH_LEN + imm.size()) as u64
476 };
477 compact_data_size += data_size;
478 size_and_start_user_keys.push((
479 data_size,
480 FullKey {
481 user_key: imm.start_user_key(),
482 epoch_with_gap: EpochWithGap::new_max_epoch(),
483 }
484 .encode(),
485 ));
486 let v = table_size_infos.entry(imm.table_id).or_insert(0);
487 *v += data_size;
488 }
489
490 size_and_start_user_keys
491 .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
492 let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
493 splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
494 let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
495 let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
496 let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
497 let parallelism = std::cmp::min(
498 storage_opts.share_buffers_sync_parallelism as u64,
499 size_and_start_user_keys.len() as u64,
500 );
501 let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
502 compact_data_size / parallelism
503 } else {
504 compact_data_size
505 };
506
507 if parallelism > 1 && compact_data_size > sstable_size {
508 let mut last_buffer_size = 0;
509 let mut last_key: Vec<u8> = vec![];
510 for (data_size, key) in size_and_start_user_keys {
511 if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
512 splits.last_mut().unwrap().right = Bytes::from(key.clone());
513 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
514 last_buffer_size = data_size;
515 } else {
516 last_buffer_size += data_size;
517 }
518
519 last_key = key;
520 }
521 }
522
523 if compact_data_size > sstable_size {
524 for table_id in existing_table_ids {
527 if let Some(table_size) = table_size_infos.get(table_id)
528 && *table_size > min_sstable_size
529 {
530 table_vnode_partition.insert(*table_id, 1);
531 }
532 }
533 }
534
535 let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
538 (splits, sub_compaction_sstable_size, table_vnode_partition)
539}
540
541pub struct SharedBufferCompactRunner {
542 compactor: Compactor,
543 split_index: usize,
544}
545
546impl SharedBufferCompactRunner {
547 pub fn new(
548 split_index: usize,
549 key_range: KeyRange,
550 context: CompactorContext,
551 sub_compaction_sstable_size: usize,
552 table_vnode_partition: BTreeMap<TableId, u32>,
553 use_block_based_filter: bool,
554 object_id_getter: Arc<dyn GetObjectId>,
555 ) -> Self {
556 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
557 options.capacity = sub_compaction_sstable_size;
558 let compactor = Compactor::new(
559 context,
560 options,
561 super::TaskConfig {
562 key_range,
563 cache_policy: CachePolicy::Fill(Hint::Normal),
564 gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
565 retain_multiple_version: true,
566 stats_target_table_ids: None,
567 table_vnode_partition,
568 use_block_based_filter,
569 table_schemas: Default::default(),
570 disable_drop_column_optimization: false,
571 },
572 object_id_getter,
573 );
574 Self {
575 compactor,
576 split_index,
577 }
578 }
579
580 pub async fn run(
581 self,
582 iter: impl HummockIterator<Direction = Forward>,
583 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
584 ) -> HummockResult<CompactOutput> {
585 let dummy_compaction_filter = DummyCompactionFilter {};
586 let (ssts, table_stats_map) = self
587 .compactor
588 .compact_key_range(
589 iter,
590 dummy_compaction_filter,
591 compaction_catalog_agent_ref,
592 None,
593 None,
594 None,
595 )
596 .await?;
597 Ok((self.split_index, ssts, table_stats_map))
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use std::collections::HashSet;
604
605 use bytes::Bytes;
606 use risingwave_common::catalog::TableId;
607 use risingwave_common::hash::VirtualNode;
608 use risingwave_common::util::epoch::test_epoch;
609 use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};
610
611 use crate::hummock::compactor::shared_buffer_compact::generate_splits;
612 use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
613 use crate::mem_table::ImmutableMemtable;
614 use crate::opts::StorageOpts;
615
616 fn generate_key(key: &str) -> TableKey<Bytes> {
617 TableKey(prefix_slice_with_vnode(
618 VirtualNode::from_index(1),
619 key.as_bytes(),
620 ))
621 }
622
623 #[tokio::test]
624 async fn test_generate_splits_in_order() {
625 let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
626 test_epoch(3),
627 0,
628 vec![(
629 generate_key("dddd"),
630 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
631 )],
632 1024 * 1024,
633 TableId::new(1),
634 );
635 let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
636 test_epoch(3),
637 0,
638 vec![(
639 generate_key("abb"),
640 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
641 )],
642 (1024 + 256) * 1024,
643 TableId::new(1),
644 );
645
646 let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
647 test_epoch(2),
648 0,
649 vec![(
650 generate_key("abc"),
651 SharedBufferValue::Insert(Bytes::from_static(b"v2")),
652 )],
653 (1024 + 512) * 1024,
654 TableId::new(1),
655 );
656 let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
657 test_epoch(3),
658 0,
659 vec![(
660 generate_key("aaa"),
661 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
662 )],
663 (1024 + 512) * 1024,
664 TableId::new(1),
665 );
666
667 let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
668 test_epoch(3),
669 0,
670 vec![(
671 generate_key("aaa"),
672 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
673 )],
674 (1024 + 256) * 1024,
675 TableId::new(2),
676 );
677
678 let storage_opts = StorageOpts {
679 share_buffers_sync_parallelism: 3,
680 parallel_compact_size_mb: 1,
681 sstable_size_mb: 1,
682 ..Default::default()
683 };
684 let payload = vec![imm1, imm2, imm3, imm4, imm5];
685 let (splits, _sstable_capacity, vnodes) = generate_splits(
686 &payload,
687 &HashSet::from_iter([1.into(), 2.into()]),
688 &storage_opts,
689 );
690 assert_eq!(
691 splits.len(),
692 storage_opts.share_buffers_sync_parallelism as usize
693 );
694 assert!(vnodes.is_empty());
695
696 for i in 1..splits.len() {
698 assert_eq!(splits[i].left, splits[i - 1].right);
699 assert!(splits[i].left > splits[i - 1].left);
700 assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
701 }
702 }
703
704 #[tokio::test]
705 async fn test_generate_splits_no_duplicate_keys() {
706 let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
709 test_epoch(1),
710 0,
711 vec![(
712 generate_key("zzz"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
714 )],
715 2 * 1024 * 1024, TableId::new(1),
717 );
718
719 let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
720 test_epoch(1),
721 0,
722 vec![(
723 generate_key("aaa"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
725 )],
726 2 * 1024 * 1024, TableId::new(1),
728 );
729
730 let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
731 test_epoch(1),
732 0,
733 vec![(
734 generate_key("mmm"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
736 )],
737 2 * 1024 * 1024, TableId::new(1),
739 );
740
741 let storage_opts = StorageOpts {
742 share_buffers_sync_parallelism: 3, parallel_compact_size_mb: 2, sstable_size_mb: 1, ..Default::default()
746 };
747
748 let payload = vec![imm1, imm2, imm3];
750 let (splits, _sstable_capacity, _vnodes) =
751 generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);
752
753 assert!(
755 splits.len() > 1,
756 "Expected multiple splits, got {}",
757 splits.len()
758 );
759
760 for i in 0..splits.len() {
762 for j in (i + 1)..splits.len() {
763 let split_i = &splits[i];
764 let split_j = &splits[j];
765
766 if !split_i.right.is_empty() && !split_j.left.is_empty() {
768 assert!(
769 split_i.right <= split_j.left || split_j.right <= split_i.left,
770 "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
771 i,
772 j,
773 split_i.left,
774 split_i.right,
775 split_j.left,
776 split_j.right
777 );
778 }
779 }
780 }
781
782 for i in 1..splits.len() {
784 if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
785 assert!(
786 splits[i - 1].right <= splits[i].left,
787 "Splits are not in sorted order at index {}: {:?} > {:?}",
788 i,
789 splits[i - 1].right,
790 splits[i].left
791 );
792 }
793 }
794 }
795}