risingwave_storage/hummock/compactor/shared_buffer_compact.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::Hint;
use futures::future::try_join;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
use thiserror_ext::AsReport;
use tracing::{error, warn};

use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
    VersionedSharedBufferValue,
};
use crate::hummock::{
    CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
    SstableBuilderOptions,
};
use crate::mem_table::ImmutableMemtable;
use crate::opts::StorageOpts;

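/// Whether delete tombstones are GC'ed when flushing the shared buffer. Flushes keep them
/// (see `gc_delete_keys` in the `TaskConfig` built by `SharedBufferCompactRunner::new`).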
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;

/// Flush the shared buffer to level 0. The resulting SSTs are grouped by compaction group.
pub async fn compact(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    payload: Vec<ImmutableMemtable>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> HummockResult<UploadTaskOutput> {
    let table_ids_with_old_value: HashSet<TableId> = payload
        .iter()
        .filter(|imm| imm.has_old_value())
        .map(|imm| imm.table_id)
        .collect();
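    // Split the payload into three groups: imms of tables without old values, new-value imms
    // of tables that carry old values, and the old-value imms themselves. Each group is
    // flushed independently below.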
    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut old_value_payload = Vec::with_capacity(payload.len());
    for imm in payload {
        if table_ids_with_old_value.contains(&imm.table_id) {
            if imm.has_old_value() {
                old_value_payload.push(imm.clone());
            }
            log_store_new_value_payload.push(imm);
        } else {
            assert!(!imm.has_old_value());
            non_log_store_new_value_payload.push(imm);
        }
    }
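    // Each non-empty group is flushed by its own `compact_shared_buffer` run: new values with
    // `IS_NEW_VALUE = true`, old values with `IS_NEW_VALUE = false`.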
    let non_log_store_new_value_future = async {
        if non_log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                non_log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_non_log_store_new_value")
            .await
        }
    };

    let log_store_new_value_future = async {
        if log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_new_value")
            .await
        }
    };

    let old_value_future = async {
        if old_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<false>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                old_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_old_value")
            .await
        }
    };

    // Note that the output is reordered compared with input `payload`.
    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
        try_join(non_log_store_new_value_future, log_store_new_value_future),
        old_value_future,
    )
    .await?;

    let mut new_value_ssts = non_log_store_new_value_ssts;
    new_value_ssts.extend(log_store_new_value_ssts);

    Ok(UploadTaskOutput {
        new_value_ssts,
        old_value_ssts,
        wait_poll_timer: None,
    })
}

/// For compaction from the shared buffer to level 0, this is the only function that gets called.
///
/// The `IS_NEW_VALUE` flag indicates whether the given payload is compacted with its new values or its old values.
/// When `IS_NEW_VALUE` is false, we are compacting the old values, and every imm in the payload must have a non-`None` `old_values`.
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    if !IS_NEW_VALUE {
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    // Local memory compaction looks at all key ranges.
    let existing_table_ids: HashSet<TableId> =
        payload.iter().map(|imm| imm.table_id).dedup().collect();
    assert!(!existing_table_ids.is_empty());

    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id);
        if !ret {
            error!(
                "can not find table {:?}, it may be removed by meta-service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    // Shared buffer compaction always goes to L0. Use block_based_filter when kv_count is large.
    // Use None to apply the default threshold since shared buffer flush doesn't have a CompactTask.
    let use_block_based_filter =
        risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
            total_key_count as u64,
            None,
        );

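    // Spawn one compaction task per split. Each task merges forward iterators over all imms
    // and compacts only the keys that fall into its split's key range.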
    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            object_id_manager.clone(),
        );
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

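    // Drive all split tasks concurrently and collect their results. Any failed task marks the
    // whole flush as failed, and the recorded error is returned to the caller.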
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while execute in tokio",
                ));
            }
        }
    }

    // Sort by split/key range index.
    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
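        // Optionally verify the flush in the background: replay the retained imms through a
        // merge iterator and check them against the SSTs that were just built.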
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}

/// Merge multiple batches into a larger one
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
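    // The assert above guarantees imms are ordered by descending batch id, so the first imm
    // carries the largest (i.e. newest) batch id.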
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    // TODO: make sure that the corner case on switch_op_consistency is handled
    // If one imm of a table id contains old values, all other imms of the same table id should contain old values as well.
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // use merge iterator to merge input imms
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Initialize the tracker with the first key and the max epoch to ensure that the first call
    // to `full_key_tracker.observe_multi_version` below succeeds.
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

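    // Merge loop: `observe_multi_version` returns true exactly when a new user key is reached.
    // In that case a new key entry is started (or the previous, valueless entry is reused), and
    // the versioned values of the current key are appended to the flat `values` vec.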
    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                // Record kv entries
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // This method has no await point but is CPU intensive, so call `consume_budget` here
        // for cooperative scheduling.
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
        )),
        table_id,
    }
}

/// Based on the incoming payload and opts, calculate the key-range splits, the target sstable size,
/// and the table vnode partition for shared buffer compaction.
fn generate_splits(
    payload: &Vec<ImmutableMemtable>,
    existing_table_ids: &HashSet<TableId>,
    storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
    let mut size_and_start_user_keys = vec![];
    let mut compact_data_size = 0;
    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
    let mut table_vnode_partition = BTreeMap::default();
    for imm in payload {
        let data_size = {
            // Estimate the encoded size: each value encodes an epoch of `EPOCH_LEN` bytes
            // on top of the imm's in-memory size.
            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
        };
        compact_data_size += data_size;
        size_and_start_user_keys.push((
            data_size,
            FullKey {
                user_key: imm.start_user_key(),
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        ));
        let v = table_size_infos.entry(imm.table_id).or_insert(0);
        *v += data_size;
    }

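    // Sort imms by their encoded start user key so that the split boundaries chosen below form
    // monotonically increasing key ranges.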
    size_and_start_user_keys
        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
    let parallelism = std::cmp::min(
        storage_opts.share_buffers_sync_parallelism as u64,
        size_and_start_user_keys.len() as u64,
    );
    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
        compact_data_size / parallelism
    } else {
        compact_data_size
    };

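    // When there is enough data and parallelism, cut a new split whenever the accumulated size
    // reaches the per-task target, using the current imm's start key as the split boundary.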
    if parallelism > 1 && compact_data_size > sstable_size {
        let mut last_buffer_size = 0;
        let mut last_key: Vec<u8> = vec![];
        for (data_size, key) in size_and_start_user_keys {
            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
                splits.last_mut().unwrap().right = Bytes::from(key.clone());
                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                last_buffer_size = data_size;
            } else {
                last_buffer_size += data_size;
            }

            last_key = key;
        }
    }

    if compact_data_size > sstable_size {
        // The meta node calculates the size of each state table in one task in `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
        // To make that calculation more accurate, split the large state tables apart from the small ones.
        for table_id in existing_table_ids {
            if let Some(table_size) = table_size_infos.get(table_id)
                && *table_size > min_sstable_size
            {
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }

    // Multiply by 1.2 (6 / 5) to account for extra memory usage.
    // Still cap each sstable at `sstable_size` after this adjustment, to avoid generating a huge sstable that would stress the object store.
    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
    (splits, sub_compaction_sstable_size, table_vnode_partition)
}

pub struct SharedBufferCompactRunner {
    compactor: Compactor,
    split_index: usize,
}

impl SharedBufferCompactRunner {
    pub fn new(
        split_index: usize,
        key_range: KeyRange,
        context: CompactorContext,
        sub_compaction_sstable_size: usize,
        table_vnode_partition: BTreeMap<TableId, u32>,
        use_block_based_filter: bool,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.capacity = sub_compaction_sstable_size;
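        // Flush tasks retain every key version and keep delete tombstones
        // (`retain_multiple_version: true`, `gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH == false`).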
        let compactor = Compactor::new(
            context,
            options,
            super::TaskConfig {
                key_range,
                cache_policy: CachePolicy::Fill(Hint::Normal),
                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
                retain_multiple_version: true,
                table_vnode_partition,
                use_block_based_filter,
                table_schemas: Default::default(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );
        Self {
            compactor,
            split_index,
        }
    }

    pub async fn run(
        self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<CompactOutput> {
        let dummy_compaction_filter = DummyCompactionFilter {};
        let (ssts, table_stats_map) = self
            .compactor
            .compact_key_range(
                iter,
                dummy_compaction_filter,
                compaction_catalog_agent_ref,
                None,
                None,
                None,
            )
            .await?;
        Ok((self.split_index, ssts, table_stats_map))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );

        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        // Basic validation: splits should be continuous and monotonic
        for i in 1..splits.len() {
            assert_eq!(splits[i].left, splits[i - 1].right);
            assert!(splits[i].left > splits[i - 1].left);
            assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
        }
    }

    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Create test data with specific ordering to detect sorting issues
        // Make data sizes large enough to trigger splitting
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"), // This should be last after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB to ensure compact_data_size > sstable_size
            TableId::new(1),
        );

        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"), // This should be first after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"), // This should be middle after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3, // Enable parallelism
            parallel_compact_size_mb: 2,       // Small threshold to trigger splitting
            sstable_size_mb: 1,                // Small SSTable size to trigger condition
            ..Default::default()
        };

        // Test with payload in wrong order (zzz, aaa, mmm) instead of sorted order (aaa, mmm, zzz)
        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        // Should have multiple splits due to large data size
        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Verify no key range overlaps between splits
        for i in 0..splits.len() {
            for j in (i + 1)..splits.len() {
                let split_i = &splits[i];
                let split_j = &splits[j];

                // Check that splits don't overlap
                if !split_i.right.is_empty() && !split_j.left.is_empty() {
                    assert!(
                        split_i.right <= split_j.left || split_j.right <= split_i.left,
                        "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                        i,
                        j,
                        split_i.left,
                        split_i.right,
                        split_j.left,
                        split_j.right
                    );
                }
            }
        }

        // Additional verification: ensure splits are sorted
        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}