1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use thiserror_ext::AsReport;
32use tracing::{error, warn};
33
34use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
35use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
36use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
37use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
38use crate::hummock::event_handler::uploader::UploadTaskOutput;
39use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
40use crate::hummock::shared_buffer::shared_buffer_batch::{
41 SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
42 VersionedSharedBufferValue,
43};
44use crate::hummock::{
45 CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
46 SstableBuilderOptions,
47};
48use crate::mem_table::ImmutableMemtable;
49use crate::opts::StorageOpts;
50
51const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
52
53pub async fn compact(
55 context: CompactorContext,
56 object_id_manager: ObjectIdManagerRef,
57 payload: Vec<ImmutableMemtable>,
58 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
59) -> HummockResult<UploadTaskOutput> {
60 let table_ids_with_old_value: HashSet<TableId> = payload
61 .iter()
62 .filter(|imm| imm.has_old_value())
63 .map(|imm| imm.table_id)
64 .collect();
65 let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
66 let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
67 let mut old_value_payload = Vec::with_capacity(payload.len());
68 for imm in payload {
69 if table_ids_with_old_value.contains(&imm.table_id) {
70 if imm.has_old_value() {
71 old_value_payload.push(imm.clone());
72 }
73 log_store_new_value_payload.push(imm);
74 } else {
75 assert!(!imm.has_old_value());
76 non_log_store_new_value_payload.push(imm);
77 }
78 }
79 let non_log_store_new_value_future = async {
80 if non_log_store_new_value_payload.is_empty() {
81 Ok(vec![])
82 } else {
83 compact_shared_buffer::<true>(
84 context.clone(),
85 object_id_manager.clone(),
86 compaction_catalog_manager_ref.clone(),
87 non_log_store_new_value_payload,
88 )
89 .instrument_await("shared_buffer_compact_non_log_store_new_value")
90 .await
91 }
92 };
93
94 let log_store_new_value_future = async {
95 if log_store_new_value_payload.is_empty() {
96 Ok(vec![])
97 } else {
98 compact_shared_buffer::<true>(
99 context.clone(),
100 object_id_manager.clone(),
101 compaction_catalog_manager_ref.clone(),
102 log_store_new_value_payload,
103 )
104 .instrument_await("shared_buffer_compact_log_store_new_value")
105 .await
106 }
107 };
108
109 let old_value_future = async {
110 if old_value_payload.is_empty() {
111 Ok(vec![])
112 } else {
113 compact_shared_buffer::<false>(
114 context.clone(),
115 object_id_manager.clone(),
116 compaction_catalog_manager_ref.clone(),
117 old_value_payload,
118 )
119 .instrument_await("shared_buffer_compact_log_store_old_value")
120 .await
121 }
122 };
123
124 let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
126 try_join(non_log_store_new_value_future, log_store_new_value_future),
127 old_value_future,
128 )
129 .await?;
130
131 let mut new_value_ssts = non_log_store_new_value_ssts;
132 new_value_ssts.extend(log_store_new_value_ssts);
133
134 Ok(UploadTaskOutput {
135 new_value_ssts,
136 old_value_ssts,
137 wait_poll_timer: None,
138 })
139}
140
141async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
146 context: CompactorContext,
147 object_id_manager: ObjectIdManagerRef,
148 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
149 mut payload: Vec<ImmutableMemtable>,
150) -> HummockResult<Vec<LocalSstableInfo>> {
151 if !IS_NEW_VALUE {
152 assert!(payload.iter().all(|imm| imm.has_old_value()));
153 }
154 let existing_table_ids: HashSet<TableId> =
156 payload.iter().map(|imm| imm.table_id).dedup().collect();
157 assert!(!existing_table_ids.is_empty());
158
159 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
160 .acquire(existing_table_ids.iter().copied().collect())
161 .await?;
162 let existing_table_ids = compaction_catalog_agent_ref
163 .table_ids()
164 .collect::<HashSet<_>>();
165 payload.retain(|imm| {
166 let ret = existing_table_ids.contains(&imm.table_id);
167 if !ret {
168 error!(
169 "can not find table {:?}, it may be removed by meta-service",
170 imm.table_id
171 );
172 }
173 ret
174 });
175
176 let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
177 let (splits, sub_compaction_sstable_size, table_vnode_partition) =
178 generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
179 let parallelism = splits.len();
180 let mut compact_success = true;
181 let mut output_ssts = Vec::with_capacity(parallelism);
182 let mut compaction_futures = vec![];
183 let use_block_based_filter =
186 risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
187 total_key_count as u64,
188 None,
189 );
190
191 for (split_index, key_range) in splits.into_iter().enumerate() {
192 let compactor = SharedBufferCompactRunner::new(
193 split_index,
194 key_range,
195 context.clone(),
196 sub_compaction_sstable_size as usize,
197 table_vnode_partition.clone(),
198 use_block_based_filter,
199 object_id_manager.clone(),
200 );
201 let mut forward_iters = Vec::with_capacity(payload.len());
202 for imm in &payload {
203 forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
204 }
205 let compaction_executor = context.compaction_executor.clone();
206 let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
207 let handle = compaction_executor.spawn({
208 static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
209 LazyLock::new(|| AtomicUsize::new(0));
210 let tree_root = context.await_tree_reg.as_ref().map(|reg| {
211 let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
212 reg.register(
213 await_tree_key::CompactSharedBuffer { id },
214 format!(
215 "Compact Shared Buffer: {:?}",
216 payload
217 .iter()
218 .flat_map(|imm| imm.epochs().iter())
219 .copied()
220 .collect::<BTreeSet<_>>()
221 ),
222 )
223 });
224 let future = compactor.run(
225 MergeIterator::new(forward_iters),
226 compaction_catalog_agent_ref,
227 );
228 if let Some(root) = tree_root {
229 root.instrument(future).left_future()
230 } else {
231 future.right_future()
232 }
233 });
234 compaction_futures.push(handle);
235 }
236
237 let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
238 let mut err = None;
239 while let Some(future_result) = buffered.next().await {
240 match future_result {
241 Ok(Ok((split_index, ssts, table_stats_map))) => {
242 output_ssts.push((split_index, ssts, table_stats_map));
243 }
244 Ok(Err(e)) => {
245 compact_success = false;
246 tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
247 err = Some(e);
248 }
249 Err(e) => {
250 compact_success = false;
251 tracing::warn!(
252 error = %e.as_report(),
253 "Shared Buffer Compaction failed with future error",
254 );
255 err = Some(HummockError::compaction_executor(
256 "failed while execute in tokio",
257 ));
258 }
259 }
260 }
261
262 output_ssts.sort_by_key(|(split_index, ..)| *split_index);
264
265 if compact_success {
266 let mut level0 = Vec::with_capacity(parallelism);
267 let mut sst_infos = vec![];
268 for (_, ssts, _) in output_ssts {
269 for sst_info in &ssts {
270 context
271 .compactor_metrics
272 .write_build_l0_bytes
273 .inc_by(sst_info.file_size());
274
275 sst_infos.push(sst_info.sst_info.clone());
276 }
277 level0.extend(ssts);
278 }
279 if context.storage_opts.check_compaction_result {
280 let compaction_executor = context.compaction_executor.clone();
281 let mut forward_iters = Vec::with_capacity(payload.len());
282 for imm in &payload {
283 if !existing_table_ids.contains(&imm.table_id) {
284 continue;
285 }
286 forward_iters.push(imm.clone().into_forward_iter());
287 }
288 let iter = MergeIterator::new(forward_iters);
289 let left_iter = UserIterator::new(
290 iter,
291 (Bound::Unbounded, Bound::Unbounded),
292 u64::MAX,
293 0,
294 None,
295 );
296 compaction_executor.spawn(async move {
297 match check_flush_result(
298 left_iter,
299 sst_infos,
300 context,
301 )
302 .await
303 {
304 Err(e) => {
305 tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
306 }
307 Ok(true) => (),
308 Ok(false) => {
309 panic!(
310 "failed to check flush result consistency of state-table {:?}",
311 existing_table_ids
312 );
313 }
314 }
315 });
316 }
317 Ok(level0)
318 } else {
319 Err(err.unwrap())
320 }
321}
322
323pub async fn merge_imms_in_memory(
325 table_id: TableId,
326 imms: Vec<ImmutableMemtable>,
327) -> ImmutableMemtable {
328 let mut epochs = vec![];
329 let mut merged_size = 0;
330 assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
331 let max_imm_id = imms[0].batch_id();
332
333 let has_old_value = imms[0].has_old_value();
334 assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));
337
338 let (old_value_size, global_old_value_size) = if has_old_value {
339 (
340 imms.iter()
341 .map(|imm| imm.old_values().expect("has old value").size)
342 .sum(),
343 Some(
344 imms[0]
345 .old_values()
346 .expect("has old value")
347 .global_old_value_size
348 .clone(),
349 ),
350 )
351 } else {
352 (0, None)
353 };
354
355 let mut imm_iters = Vec::with_capacity(imms.len());
356 let key_count = imms.iter().map(|imm| imm.key_count()).sum();
357 let value_count = imms.iter().map(|imm| imm.value_count()).sum();
358 for imm in imms {
359 assert!(imm.key_count() > 0, "imm should not be empty");
360 assert_eq!(
361 table_id,
362 imm.table_id(),
363 "should only merge data belonging to the same table"
364 );
365
366 epochs.push(imm.min_epoch());
367 merged_size += imm.size();
368
369 imm_iters.push(imm.into_forward_iter());
370 }
371 epochs.sort();
372
373 let mut mi = MergeIterator::new(imm_iters);
375 mi.rewind_no_await();
376 assert!(mi.is_valid());
377
378 let first_item_key = mi.current_key_entry().key.clone();
379
380 let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
381 let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
382 let mut old_values: Option<Vec<Bytes>> = if has_old_value {
383 Some(Vec::with_capacity(value_count))
384 } else {
385 None
386 };
387
388 merged_entries.push(SharedBufferKeyEntry {
389 key: first_item_key.clone(),
390 value_offset: 0,
391 });
392
393 let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
395 table_id,
396 first_item_key,
397 EpochWithGap::new_max_epoch(),
398 ));
399
400 while mi.is_valid() {
401 let key_entry = mi.current_key_entry();
402 let user_key = UserKey {
403 table_id,
404 table_key: key_entry.key.clone(),
405 };
406 if full_key_tracker.observe_multi_version(
407 user_key,
408 key_entry
409 .new_values
410 .iter()
411 .map(|(epoch_with_gap, _)| *epoch_with_gap),
412 ) {
413 let last_entry = merged_entries.last_mut().expect("non-empty");
414 if last_entry.value_offset == values.len() {
415 warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
416 last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
417 } else {
418 merged_entries.push(SharedBufferKeyEntry {
420 key: full_key_tracker.latest_user_key().table_key.clone(),
421 value_offset: values.len(),
422 });
423 }
424 }
425 values.extend(
426 key_entry
427 .new_values
428 .iter()
429 .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
430 );
431 if let Some(old_values) = &mut old_values {
432 old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
433 }
434 mi.advance_peek_to_next_key();
435 tokio::task::consume_budget().await;
438 }
439
440 let old_values = old_values.map(|old_values| {
441 SharedBufferBatchOldValues::new(
442 old_values,
443 old_value_size,
444 global_old_value_size.expect("should exist when has old value"),
445 )
446 });
447
448 SharedBufferBatch {
449 inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
450 epochs,
451 merged_entries,
452 values,
453 old_values,
454 merged_size,
455 max_imm_id,
456 )),
457 table_id,
458 }
459}
460
461fn generate_splits(
463 payload: &Vec<ImmutableMemtable>,
464 existing_table_ids: &HashSet<TableId>,
465 storage_opts: &StorageOpts,
466) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
467 let mut size_and_start_user_keys = vec![];
468 let mut compact_data_size = 0;
469 let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
470 let mut table_vnode_partition = BTreeMap::default();
471 for imm in payload {
472 let data_size = {
473 (imm.value_count() * EPOCH_LEN + imm.size()) as u64
475 };
476 compact_data_size += data_size;
477 size_and_start_user_keys.push((
478 data_size,
479 FullKey {
480 user_key: imm.start_user_key(),
481 epoch_with_gap: EpochWithGap::new_max_epoch(),
482 }
483 .encode(),
484 ));
485 let v = table_size_infos.entry(imm.table_id).or_insert(0);
486 *v += data_size;
487 }
488
489 size_and_start_user_keys
490 .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
491 let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
492 splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
493 let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
494 let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
495 let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
496 let parallelism = std::cmp::min(
497 storage_opts.share_buffers_sync_parallelism as u64,
498 size_and_start_user_keys.len() as u64,
499 );
500 let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
501 compact_data_size / parallelism
502 } else {
503 compact_data_size
504 };
505
506 if parallelism > 1 && compact_data_size > sstable_size {
507 let mut last_buffer_size = 0;
508 let mut last_key: Vec<u8> = vec![];
509 for (data_size, key) in size_and_start_user_keys {
510 if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
511 splits.last_mut().unwrap().right = Bytes::from(key.clone());
512 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
513 last_buffer_size = data_size;
514 } else {
515 last_buffer_size += data_size;
516 }
517
518 last_key = key;
519 }
520 }
521
522 if compact_data_size > sstable_size {
523 for table_id in existing_table_ids {
526 if let Some(table_size) = table_size_infos.get(table_id)
527 && *table_size > min_sstable_size
528 {
529 table_vnode_partition.insert(*table_id, 1);
530 }
531 }
532 }
533
534 let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
537 (splits, sub_compaction_sstable_size, table_vnode_partition)
538}
539
540pub struct SharedBufferCompactRunner {
541 compactor: Compactor,
542 split_index: usize,
543}
544
545impl SharedBufferCompactRunner {
546 pub fn new(
547 split_index: usize,
548 key_range: KeyRange,
549 context: CompactorContext,
550 sub_compaction_sstable_size: usize,
551 table_vnode_partition: BTreeMap<TableId, u32>,
552 use_block_based_filter: bool,
553 object_id_getter: Arc<dyn GetObjectId>,
554 ) -> Self {
555 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
556 options.capacity = sub_compaction_sstable_size;
557 let compactor = Compactor::new(
558 context,
559 options,
560 super::TaskConfig {
561 key_range,
562 cache_policy: CachePolicy::Fill(Hint::Normal),
563 gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
564 retain_multiple_version: true,
565 table_vnode_partition,
566 use_block_based_filter,
567 table_schemas: Default::default(),
568 disable_drop_column_optimization: false,
569 },
570 object_id_getter,
571 );
572 Self {
573 compactor,
574 split_index,
575 }
576 }
577
578 pub async fn run(
579 self,
580 iter: impl HummockIterator<Direction = Forward>,
581 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
582 ) -> HummockResult<CompactOutput> {
583 let dummy_compaction_filter = DummyCompactionFilter {};
584 let (ssts, table_stats_map) = self
585 .compactor
586 .compact_key_range(
587 iter,
588 dummy_compaction_filter,
589 compaction_catalog_agent_ref,
590 None,
591 None,
592 None,
593 )
594 .await?;
595 Ok((self.split_index, ssts, table_stats_map))
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use std::collections::HashSet;
602
603 use bytes::Bytes;
604 use risingwave_common::catalog::TableId;
605 use risingwave_common::hash::VirtualNode;
606 use risingwave_common::util::epoch::test_epoch;
607 use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};
608
609 use crate::hummock::compactor::shared_buffer_compact::generate_splits;
610 use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
611 use crate::mem_table::ImmutableMemtable;
612 use crate::opts::StorageOpts;
613
614 fn generate_key(key: &str) -> TableKey<Bytes> {
615 TableKey(prefix_slice_with_vnode(
616 VirtualNode::from_index(1),
617 key.as_bytes(),
618 ))
619 }
620
621 #[tokio::test]
622 async fn test_generate_splits_in_order() {
623 let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
624 test_epoch(3),
625 0,
626 vec![(
627 generate_key("dddd"),
628 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
629 )],
630 1024 * 1024,
631 TableId::new(1),
632 );
633 let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
634 test_epoch(3),
635 0,
636 vec![(
637 generate_key("abb"),
638 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
639 )],
640 (1024 + 256) * 1024,
641 TableId::new(1),
642 );
643
644 let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
645 test_epoch(2),
646 0,
647 vec![(
648 generate_key("abc"),
649 SharedBufferValue::Insert(Bytes::from_static(b"v2")),
650 )],
651 (1024 + 512) * 1024,
652 TableId::new(1),
653 );
654 let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
655 test_epoch(3),
656 0,
657 vec![(
658 generate_key("aaa"),
659 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
660 )],
661 (1024 + 512) * 1024,
662 TableId::new(1),
663 );
664
665 let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
666 test_epoch(3),
667 0,
668 vec![(
669 generate_key("aaa"),
670 SharedBufferValue::Insert(Bytes::from_static(b"v3")),
671 )],
672 (1024 + 256) * 1024,
673 TableId::new(2),
674 );
675
676 let storage_opts = StorageOpts {
677 share_buffers_sync_parallelism: 3,
678 parallel_compact_size_mb: 1,
679 sstable_size_mb: 1,
680 ..Default::default()
681 };
682 let payload = vec![imm1, imm2, imm3, imm4, imm5];
683 let (splits, _sstable_capacity, vnodes) = generate_splits(
684 &payload,
685 &HashSet::from_iter([1.into(), 2.into()]),
686 &storage_opts,
687 );
688 assert_eq!(
689 splits.len(),
690 storage_opts.share_buffers_sync_parallelism as usize
691 );
692 assert!(vnodes.is_empty());
693
694 for i in 1..splits.len() {
696 assert_eq!(splits[i].left, splits[i - 1].right);
697 assert!(splits[i].left > splits[i - 1].left);
698 assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
699 }
700 }
701
702 #[tokio::test]
703 async fn test_generate_splits_no_duplicate_keys() {
704 let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
707 test_epoch(1),
708 0,
709 vec![(
710 generate_key("zzz"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
712 )],
713 2 * 1024 * 1024, TableId::new(1),
715 );
716
717 let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
718 test_epoch(1),
719 0,
720 vec![(
721 generate_key("aaa"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
723 )],
724 2 * 1024 * 1024, TableId::new(1),
726 );
727
728 let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
729 test_epoch(1),
730 0,
731 vec![(
732 generate_key("mmm"), SharedBufferValue::Insert(Bytes::from_static(b"v1")),
734 )],
735 2 * 1024 * 1024, TableId::new(1),
737 );
738
739 let storage_opts = StorageOpts {
740 share_buffers_sync_parallelism: 3, parallel_compact_size_mb: 2, sstable_size_mb: 1, ..Default::default()
744 };
745
746 let payload = vec![imm1, imm2, imm3];
748 let (splits, _sstable_capacity, _vnodes) =
749 generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);
750
751 assert!(
753 splits.len() > 1,
754 "Expected multiple splits, got {}",
755 splits.len()
756 );
757
758 for i in 0..splits.len() {
760 for j in (i + 1)..splits.len() {
761 let split_i = &splits[i];
762 let split_j = &splits[j];
763
764 if !split_i.right.is_empty() && !split_j.left.is_empty() {
766 assert!(
767 split_i.right <= split_j.left || split_j.right <= split_i.left,
768 "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
769 i,
770 j,
771 split_i.left,
772 split_i.right,
773 split_j.left,
774 split_j.right
775 );
776 }
777 }
778 }
779
780 for i in 1..splits.len() {
782 if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
783 assert!(
784 splits[i - 1].right <= splits[i].left,
785 "Splits are not in sorted order at index {}: {:?} > {:?}",
786 i,
787 splits[i - 1].right,
788 splits[i].left
789 );
790 }
791 }
792 }
793}