risingwave_storage/hummock/compactor/
shared_buffer_compact.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use risingwave_pb::hummock::{PbSstableFilterLayout, PbSstableFilterType};
32use thiserror_ext::AsReport;
33use tracing::error;
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::compaction_utils::{
38    blocked_xor_filter_key_count_threshold, estimate_output_key_count_by_size,
39};
40use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
41use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
42use crate::hummock::event_handler::uploader::UploadTaskOutput;
43use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
44use crate::hummock::{
45    CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
46    SstableBuilderOptions,
47};
48use crate::mem_table::ImmutableMemtable;
49use crate::opts::StorageOpts;
50
/// Value passed as `gc_delete_keys` in the `TaskConfig` built by
/// `SharedBufferCompactRunner::new`, i.e. delete-key GC is switched off for shared buffer flushes.
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
52
53/// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group.
54pub async fn compact(
55    context: CompactorContext,
56    object_id_manager: ObjectIdManagerRef,
57    payload: Vec<ImmutableMemtable>,
58    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
59) -> HummockResult<UploadTaskOutput> {
60    let table_ids_with_old_value: HashSet<TableId> = payload
61        .iter()
62        .filter(|imm| imm.has_old_value())
63        .map(|imm| imm.table_id)
64        .collect();
65    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
66    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
67    let mut old_value_payload = Vec::with_capacity(payload.len());
68    for imm in payload {
69        if table_ids_with_old_value.contains(&imm.table_id) {
70            if imm.has_old_value() {
71                old_value_payload.push(imm.clone());
72            }
73            log_store_new_value_payload.push(imm);
74        } else {
75            assert!(!imm.has_old_value());
76            non_log_store_new_value_payload.push(imm);
77        }
78    }
79    let non_log_store_new_value_future = async {
80        if non_log_store_new_value_payload.is_empty() {
81            Ok(vec![])
82        } else {
83            compact_shared_buffer::<true>(
84                context.clone(),
85                object_id_manager.clone(),
86                compaction_catalog_manager_ref.clone(),
87                non_log_store_new_value_payload,
88            )
89            .instrument_await("shared_buffer_compact_non_log_store_new_value")
90            .await
91        }
92    };
93
94    let log_store_new_value_future = async {
95        if log_store_new_value_payload.is_empty() {
96            Ok(vec![])
97        } else {
98            compact_shared_buffer::<true>(
99                context.clone(),
100                object_id_manager.clone(),
101                compaction_catalog_manager_ref.clone(),
102                log_store_new_value_payload,
103            )
104            .instrument_await("shared_buffer_compact_log_store_new_value")
105            .await
106        }
107    };
108
109    let old_value_future = async {
110        if old_value_payload.is_empty() {
111            Ok(vec![])
112        } else {
113            compact_shared_buffer::<false>(
114                context.clone(),
115                object_id_manager.clone(),
116                compaction_catalog_manager_ref.clone(),
117                old_value_payload,
118            )
119            .instrument_await("shared_buffer_compact_log_store_old_value")
120            .await
121        }
122    };
123
124    // Note that the output is reordered compared with input `payload`.
125    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
126        try_join(non_log_store_new_value_future, log_store_new_value_future),
127        old_value_future,
128    )
129    .await?;
130
131    let mut new_value_ssts = non_log_store_new_value_ssts;
132    new_value_ssts.extend(log_store_new_value_ssts);
133
134    Ok(UploadTaskOutput {
135        new_value_ssts,
136        old_value_ssts,
137        wait_poll_timer: None,
138    })
139}
140
141/// For compaction from shared buffer to level 0, this is the only function gets called.
142///
143/// The `IS_NEW_VALUE` flag means for the given payload, we are doing compaction using its new value or old value.
144/// When `IS_NEW_VALUE` is false, we are compacting with old value, and the payload imms should have `old_values` not `None`
145async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
146    context: CompactorContext,
147    object_id_manager: ObjectIdManagerRef,
148    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
149    mut payload: Vec<ImmutableMemtable>,
150) -> HummockResult<Vec<LocalSstableInfo>> {
151    if !IS_NEW_VALUE {
152        assert!(payload.iter().all(|imm| imm.has_old_value()));
153    }
154    // Local memory compaction looks at all key ranges.
155    let existing_table_ids: HashSet<TableId> =
156        payload.iter().map(|imm| imm.table_id).dedup().collect();
157    assert!(!existing_table_ids.is_empty());
158
159    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
160        .acquire(existing_table_ids.iter().copied().collect())
161        .await?;
162    let existing_table_ids = compaction_catalog_agent_ref
163        .table_ids()
164        .collect::<HashSet<_>>();
165    payload.retain(|imm| {
166        let ret = existing_table_ids.contains(&imm.table_id);
167        if !ret {
168            error!(
169                "can not find table {:?}, it may be removed by meta-service",
170                imm.table_id
171            );
172        }
173        ret
174    });
175
176    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
177    let (splits, sub_compaction_sstable_size, table_vnode_partition, compact_data_size) =
178        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
179    let parallelism = splits.len();
180    let mut compact_success = true;
181    let mut output_ssts = Vec::with_capacity(parallelism);
182    let mut compaction_futures = vec![];
183    // Shared buffer compaction always goes to L0. Use a blocked filter when kv_count is large.
184    // Use None to apply the default threshold since shared buffer flush doesn't have a CompactTask.
185    let estimated_output_key_count = estimate_output_key_count_by_size(
186        total_key_count as u64,
187        compact_data_size,
188        sub_compaction_sstable_size as usize,
189    );
190    let sstable_filter_layout =
191        if risingwave_hummock_sdk::filter_utils::should_use_blocked_xor_filter_by_kv_count(
192            estimated_output_key_count as u64,
193            None,
194        ) {
195            PbSstableFilterLayout::Blocked
196        } else {
197            PbSstableFilterLayout::Plain
198        };
199
200    for (split_index, key_range) in splits.into_iter().enumerate() {
201        let compactor = SharedBufferCompactRunner::new(
202            split_index,
203            key_range,
204            context.clone(),
205            sub_compaction_sstable_size as usize,
206            estimated_output_key_count,
207            table_vnode_partition.clone(),
208            sstable_filter_layout,
209            object_id_manager.clone(),
210        );
211        let mut forward_iters = Vec::with_capacity(payload.len());
212        for imm in &payload {
213            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
214        }
215        let compaction_executor = context.compaction_executor.clone();
216        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
217        let handle = compaction_executor.spawn({
218            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
219                LazyLock::new(|| AtomicUsize::new(0));
220            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
221                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
222                reg.register(
223                    await_tree_key::CompactSharedBuffer { id },
224                    format!(
225                        "Compact Shared Buffer: {:?}",
226                        payload
227                            .iter()
228                            .map(|imm| imm.epoch())
229                            .collect::<BTreeSet<_>>()
230                    ),
231                )
232            });
233            let future = compactor.run(
234                MergeIterator::new(forward_iters),
235                compaction_catalog_agent_ref,
236            );
237            if let Some(root) = tree_root {
238                root.instrument(future).left_future()
239            } else {
240                future.right_future()
241            }
242        });
243        compaction_futures.push(handle);
244    }
245
246    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
247    let mut err = None;
248    while let Some(future_result) = buffered.next().await {
249        match future_result {
250            Ok(Ok((split_index, ssts, table_stats_map))) => {
251                output_ssts.push((split_index, ssts, table_stats_map));
252            }
253            Ok(Err(e)) => {
254                compact_success = false;
255                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
256                err = Some(e);
257            }
258            Err(e) => {
259                compact_success = false;
260                tracing::warn!(
261                    error = %e.as_report(),
262                    "Shared Buffer Compaction failed with future error",
263                );
264                err = Some(HummockError::compaction_executor(
265                    "failed while execute in tokio",
266                ));
267            }
268        }
269    }
270
271    // Sort by split/key range index.
272    output_ssts.sort_by_key(|(split_index, ..)| *split_index);
273
274    if compact_success {
275        let mut level0 = Vec::with_capacity(parallelism);
276        let mut sst_infos = vec![];
277        for (_, ssts, _) in output_ssts {
278            for sst_info in &ssts {
279                context
280                    .compactor_metrics
281                    .write_build_l0_bytes
282                    .inc_by(sst_info.file_size());
283
284                sst_infos.push(sst_info.sst_info.clone());
285            }
286            level0.extend(ssts);
287        }
288        if context.storage_opts.check_compaction_result {
289            let compaction_executor = context.compaction_executor.clone();
290            let mut forward_iters = Vec::with_capacity(payload.len());
291            for imm in &payload {
292                if !existing_table_ids.contains(&imm.table_id) {
293                    continue;
294                }
295                forward_iters.push(imm.clone().into_forward_iter());
296            }
297            let iter = MergeIterator::new(forward_iters);
298            let left_iter = UserIterator::new(
299                iter,
300                (Bound::Unbounded, Bound::Unbounded),
301                u64::MAX,
302                0,
303                None,
304            );
305            compaction_executor.spawn(async move {
306                match check_flush_result(
307                    left_iter,
308                    sst_infos,
309                    context,
310                )
311                .await
312                {
313                    Err(e) => {
314                        tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
315                    }
316                    Ok(true) => (),
317                    Ok(false) => {
318                        panic!(
319                            "failed to check flush result consistency of state-table {:?}",
320                            existing_table_ids
321                        );
322                    }
323                }
324            });
325        }
326        Ok(level0)
327    } else {
328        Err(err.unwrap())
329    }
330}
331
332///  Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
333fn generate_splits(
334    payload: &Vec<ImmutableMemtable>,
335    existing_table_ids: &HashSet<TableId>,
336    storage_opts: &StorageOpts,
337) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>, u64) {
338    let mut size_and_start_user_keys = vec![];
339    let mut compact_data_size = 0;
340    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
341    let mut table_vnode_partition = BTreeMap::default();
342    for imm in payload {
343        let data_size = {
344            // calculate encoded bytes of key var length
345            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
346        };
347        compact_data_size += data_size;
348        size_and_start_user_keys.push((
349            data_size,
350            FullKey {
351                user_key: imm.start_user_key(),
352                epoch_with_gap: EpochWithGap::new_max_epoch(),
353            }
354            .encode(),
355        ));
356        let v = table_size_infos.entry(imm.table_id).or_insert(0);
357        *v += data_size;
358    }
359
360    size_and_start_user_keys
361        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
362    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
363    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
364    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
365    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
366    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
367    let parallelism = std::cmp::min(
368        storage_opts.share_buffers_sync_parallelism as u64,
369        size_and_start_user_keys.len() as u64,
370    );
371    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
372        compact_data_size / parallelism
373    } else {
374        compact_data_size
375    };
376
377    if parallelism > 1 && compact_data_size > sstable_size {
378        let mut last_buffer_size = 0;
379        let mut last_key: Vec<u8> = vec![];
380        for (data_size, key) in size_and_start_user_keys {
381            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
382                splits.last_mut().unwrap().right = Bytes::from(key.clone());
383                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
384                last_buffer_size = data_size;
385            } else {
386                last_buffer_size += data_size;
387            }
388
389            last_key = key;
390        }
391    }
392
393    if compact_data_size > sstable_size {
394        // Meta node will calculate size of each state-table in one task in `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
395        // To make the calculate result more accurately we shall split the large state-table from other small ones.
396        for table_id in existing_table_ids {
397            if let Some(table_size) = table_size_infos.get(table_id)
398                && *table_size > min_sstable_size
399            {
400                table_vnode_partition.insert(*table_id, 1);
401            }
402        }
403    }
404
405    // mul 1.2 for other extra memory usage.
406    // Ensure that the size of each sstable is still less than `sstable_size` after optimization to avoid generating a huge size sstable which will affect the object store
407    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
408    (
409        splits,
410        sub_compaction_sstable_size,
411        table_vnode_partition,
412        compact_data_size,
413    )
414}
415
416pub struct SharedBufferCompactRunner {
417    compactor: Compactor,
418    split_index: usize,
419}
420
421impl SharedBufferCompactRunner {
422    pub fn new(
423        split_index: usize,
424        key_range: KeyRange,
425        context: CompactorContext,
426        sub_compaction_sstable_size: usize,
427        estimated_output_key_count: usize,
428        table_vnode_partition: BTreeMap<TableId, u32>,
429        sstable_filter_layout: PbSstableFilterLayout,
430        object_id_getter: Arc<dyn GetObjectId>,
431    ) -> Self {
432        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
433        options.capacity = sub_compaction_sstable_size;
434        options.estimated_output_key_count = Some(estimated_output_key_count);
435        options.filter_hash_prealloc_key_count_cap = blocked_xor_filter_key_count_threshold(None);
436        let compactor = Compactor::new(
437            context,
438            options,
439            super::TaskConfig {
440                key_range,
441                cache_policy: CachePolicy::Fill(Hint::Normal),
442                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
443                retain_multiple_version: true,
444                table_vnode_partition,
445                sstable_filter_layout,
446                // L0 flush writes overlapping SSTs, so keep a conservative filter type here.
447                sstable_filter_type: PbSstableFilterType::SstableFilterXor16,
448                table_schemas: Default::default(),
449                disable_drop_column_optimization: false,
450            },
451            object_id_getter,
452        );
453        Self {
454            compactor,
455            split_index,
456        }
457    }
458
459    pub async fn run(
460        self,
461        iter: impl HummockIterator<Direction = Forward>,
462        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
463    ) -> HummockResult<CompactOutput> {
464        let dummy_compaction_filter = DummyCompactionFilter {};
465        let (ssts, table_stats_map) = self
466            .compactor
467            .compact_key_range(
468                iter,
469                dummy_compaction_filter,
470                compaction_catalog_agent_ref,
471                None,
472                None,
473                None,
474            )
475            .await?;
476        Ok((self.split_index, ssts, table_stats_map))
477    }
478}
479
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    /// Builds a table key for `key`, prefixed with vnode 1.
    fn generate_key(key: &str) -> TableKey<Bytes> {
        let vnode = VirtualNode::from_index(1);
        TableKey(prefix_slice_with_vnode(vnode, key.as_bytes()))
    }

    /// Builds an imm holding a single insert of `value` at `key`, with the given reported size.
    fn single_insert_imm(
        epoch: u64,
        key: &str,
        value: &'static [u8],
        size: usize,
        table_id: TableId,
    ) -> ImmutableMemtable {
        ImmutableMemtable::build_shared_buffer_batch_for_test(
            epoch,
            0,
            vec![(
                generate_key(key),
                SharedBufferValue::Insert(Bytes::from_static(value)),
            )],
            size,
            table_id,
        )
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let table_1 = TableId::new(1);
        let table_2 = TableId::new(2);
        let payload = vec![
            single_insert_imm(test_epoch(3), "dddd", b"v3", 1024 * 1024, table_1),
            single_insert_imm(test_epoch(3), "abb", b"v3", (1024 + 256) * 1024, table_1),
            single_insert_imm(test_epoch(2), "abc", b"v2", (1024 + 512) * 1024, table_1),
            single_insert_imm(test_epoch(3), "aaa", b"v3", (1024 + 512) * 1024, table_1),
            single_insert_imm(test_epoch(3), "aaa", b"v3", (1024 + 256) * 1024, table_2),
        ];

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let table_ids = HashSet::from_iter([table_1, table_2]);
        let (splits, _sstable_capacity, vnodes, _) =
            generate_splits(&payload, &table_ids, &storage_opts);
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        // Basic validation: splits should be continuous and monotonic
        for pair in splits.windows(2) {
            let (previous, current) = (&pair[0], &pair[1]);
            assert_eq!(current.left, previous.right);
            assert!(current.left > previous.left);
            assert!(current.right.is_empty() || current.left < current.right);
        }
    }

    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Create test data with specific ordering to detect sorting issues.
        // Every imm reports 2MB so that compact_data_size > sstable_size and splitting triggers.
        let table_1 = TableId::new(1);
        let two_mb = 2 * 1024 * 1024;

        // Payload is deliberately in the wrong order (zzz, aaa, mmm) instead of the sorted order
        // (aaa, mmm, zzz).
        let payload = vec![
            single_insert_imm(test_epoch(1), "zzz", b"v1", two_mb, table_1),
            single_insert_imm(test_epoch(1), "aaa", b"v1", two_mb, table_1),
            single_insert_imm(test_epoch(1), "mmm", b"v1", two_mb, table_1),
        ];

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3, // Enable parallelism
            parallel_compact_size_mb: 2,       // Small threshold to trigger splitting
            sstable_size_mb: 1,                // Small SSTable size to trigger condition
            ..Default::default()
        };

        let table_ids = HashSet::from_iter([table_1]);
        let (splits, _sstable_capacity, _vnodes, _) =
            generate_splits(&payload, &table_ids, &storage_opts);

        // Should have multiple splits due to large data size
        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Verify no key range overlaps between any pair of splits
        for (i, split_i) in splits.iter().enumerate() {
            for (j, split_j) in splits.iter().enumerate().skip(i + 1) {
                // Unbounded ends are skipped: only bounded sides can be compared
                if split_i.right.is_empty() || split_j.left.is_empty() {
                    continue;
                }
                assert!(
                    split_i.right <= split_j.left || split_j.right <= split_i.left,
                    "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                    i,
                    j,
                    split_i.left,
                    split_i.right,
                    split_j.left,
                    split_j.right
                );
            }
        }

        // Additional verification: ensure splits are sorted
        for (offset, pair) in splits.windows(2).enumerate() {
            let (previous, current) = (&pair[0], &pair[1]);
            if previous.right.is_empty() || current.left.is_empty() {
                continue;
            }
            assert!(
                previous.right <= current.left,
                "Splits are not in sorted order at index {}: {:?} > {:?}",
                offset + 1,
                previous.right,
                current.left
            );
        }
    }
}