// risingwave_storage/hummock/compactor/shared_buffer_compact.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use risingwave_pb::hummock::compact_task;
32use thiserror_ext::AsReport;
33use tracing::{error, warn};
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
38use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
39use crate::hummock::event_handler::uploader::UploadTaskOutput;
40use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
41use crate::hummock::shared_buffer::shared_buffer_batch::{
42    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
43    VersionedSharedBufferValue,
44};
45use crate::hummock::utils::MemoryTracker;
46use crate::hummock::{
47    BlockedXor16FilterBuilder, CachePolicy, GetObjectId, HummockError, HummockResult,
48    ObjectIdManagerRef, SstableBuilderOptions,
49};
50use crate::mem_table::ImmutableMemtable;
51use crate::opts::StorageOpts;
52
/// Whether delete tombstones may be dropped while flushing the shared buffer.
/// Kept `false`: flush output lands in L0, so tombstones are presumably still
/// needed to shadow older versions in lower levels — dropping them is only
/// safe in bottom-level compaction.
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
54
55/// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group.
56pub async fn compact(
57    context: CompactorContext,
58    object_id_manager: ObjectIdManagerRef,
59    payload: Vec<ImmutableMemtable>,
60    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
61) -> HummockResult<UploadTaskOutput> {
62    let table_ids_with_old_value: HashSet<TableId> = payload
63        .iter()
64        .filter(|imm| imm.has_old_value())
65        .map(|imm| imm.table_id)
66        .collect();
67    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
68    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
69    let mut old_value_payload = Vec::with_capacity(payload.len());
70    for imm in payload {
71        if table_ids_with_old_value.contains(&imm.table_id) {
72            if imm.has_old_value() {
73                old_value_payload.push(imm.clone());
74            }
75            log_store_new_value_payload.push(imm);
76        } else {
77            assert!(!imm.has_old_value());
78            non_log_store_new_value_payload.push(imm);
79        }
80    }
81    let non_log_store_new_value_future = async {
82        if non_log_store_new_value_payload.is_empty() {
83            Ok(vec![])
84        } else {
85            compact_shared_buffer::<true>(
86                context.clone(),
87                object_id_manager.clone(),
88                compaction_catalog_manager_ref.clone(),
89                non_log_store_new_value_payload,
90            )
91            .instrument_await("shared_buffer_compact_non_log_store_new_value")
92            .await
93        }
94    };
95
96    let log_store_new_value_future = async {
97        if log_store_new_value_payload.is_empty() {
98            Ok(vec![])
99        } else {
100            compact_shared_buffer::<true>(
101                context.clone(),
102                object_id_manager.clone(),
103                compaction_catalog_manager_ref.clone(),
104                log_store_new_value_payload,
105            )
106            .instrument_await("shared_buffer_compact_log_store_new_value")
107            .await
108        }
109    };
110
111    let old_value_future = async {
112        if old_value_payload.is_empty() {
113            Ok(vec![])
114        } else {
115            compact_shared_buffer::<false>(
116                context.clone(),
117                object_id_manager.clone(),
118                compaction_catalog_manager_ref.clone(),
119                old_value_payload,
120            )
121            .instrument_await("shared_buffer_compact_log_store_old_value")
122            .await
123        }
124    };
125
126    // Note that the output is reordered compared with input `payload`.
127    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
128        try_join(non_log_store_new_value_future, log_store_new_value_future),
129        old_value_future,
130    )
131    .await?;
132
133    let mut new_value_ssts = non_log_store_new_value_ssts;
134    new_value_ssts.extend(log_store_new_value_ssts);
135
136    Ok(UploadTaskOutput {
137        new_value_ssts,
138        old_value_ssts,
139        wait_poll_timer: None,
140    })
141}
142
/// For compaction from shared buffer to level 0, this is the only function that gets called.
///
/// The `IS_NEW_VALUE` flag means for the given payload, we are doing compaction using its new value or old value.
/// When `IS_NEW_VALUE` is false, we are compacting with old value, and the payload imms should have `old_values` not `None`.
///
/// Splits the payload into up to `share_buffers_sync_parallelism` key ranges,
/// compacts each range on the compaction executor, and returns the produced
/// SSTs ordered by split index. Fails if any split fails.
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    if !IS_NEW_VALUE {
        // Old-value compaction only makes sense when every imm carries old values.
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    // Local memory compaction looks at all key ranges.
    let existing_table_ids: HashSet<TableId> =
        payload.iter().map(|imm| imm.table_id).dedup().collect();
    assert!(!existing_table_ids.is_empty());

    // Resolve the catalog for the involved tables. The agent's table set is
    // authoritative: tables dropped by meta in the meantime are filtered out
    // of the payload below.
    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id);
        if !ret {
            error!(
                "can not find table {:?}, it may be removed by meta-service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    // One split per sub-compaction; each split gets its own sstable size cap
    // and the vnode partitioning hints for large tables.
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    // Large key counts switch to a block-based bloom filter builder.
    let use_block_based_filter = BlockedXor16FilterBuilder::is_kv_count_too_large(total_key_count);

    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            object_id_manager.clone(),
        );
        // Every split merges over iterators of ALL imms; the runner's key
        // range restricts which keys it actually outputs.
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            // Process-wide monotonic id for await-tree registration keys.
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            // Register the spawned task in the await-tree (when enabled),
            // labeled with the set of epochs being flushed.
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

    // Collect split results in completion order; any failure marks the whole
    // flush as failed, keeping the last error seen.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                // The spawned task itself failed (e.g. panicked or was cancelled).
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while execute in tokio",
                ));
            }
        }
    }

    // Sort by split/key range index.
    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
        // Optional debug feature: verify in the background that reading the
        // produced SSTs yields the same data as the flushed imms. Panics on a
        // mismatch; a read error is only logged.
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    Vec::from_iter(existing_table_ids.iter().cloned()),
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}
319
/// Merge multiple batches into a larger one.
///
/// All input imms must belong to `table_id` and be non-empty. They are merged
/// via a `MergeIterator` into a single `SharedBufferBatch` that keeps every
/// epoch version of every key (no version GC happens here).
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
    memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    // Callers pass imms newest-first: batch ids ascend when read in reverse.
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    // TODO: make sure that the corner case on switch_op_consistency is handled
    // If the imm of a table id contains old value, all other imm of the same table id should have old value
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    // Aggregate old-value bookkeeping up-front; the per-imm old-value buffers
    // themselves are copied in the merge loop below.
    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // use merge iterator to merge input imms
    // Rewinding without await: imm iterators are purely in-memory, so this
    // presumably never blocks — consistent with the consume_budget note below.
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    // Output layout: `merged_entries[i]` names a key and the offset of its
    // first version in the flat `values` (and `old_values`) vector(s).
    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Use first key, max epoch to initialize the tracker to ensure that the check first call to full_key_tracker.observe will succeed
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        // `observe_multi_version` returns true when the user key advances
        // (and asserts key/epoch ordering as a side effect).
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                // The previous key ended up with no values at all; reuse its
                // slot for the new key instead of leaving an empty entry.
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                // Record kv entries
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        // Append every epoch version of the current key; old values (if any)
        // stay index-aligned with `values`.
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // Since there is no blocking point in this method, but it is cpu intensive, we call this method
        // to do cooperative scheduling
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
            memory_tracker,
        )),
        table_id,
    }
}
459
460///  Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
461fn generate_splits(
462    payload: &Vec<ImmutableMemtable>,
463    existing_table_ids: &HashSet<TableId>,
464    storage_opts: &StorageOpts,
465) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
466    let mut size_and_start_user_keys = vec![];
467    let mut compact_data_size = 0;
468    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
469    let mut table_vnode_partition = BTreeMap::default();
470    for imm in payload {
471        let data_size = {
472            // calculate encoded bytes of key var length
473            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
474        };
475        compact_data_size += data_size;
476        size_and_start_user_keys.push((
477            data_size,
478            FullKey {
479                user_key: imm.start_user_key(),
480                epoch_with_gap: EpochWithGap::new_max_epoch(),
481            }
482            .encode(),
483        ));
484        let v = table_size_infos.entry(imm.table_id).or_insert(0);
485        *v += data_size;
486    }
487
488    size_and_start_user_keys
489        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
490    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
491    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
492    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
493    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
494    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
495    let parallelism = std::cmp::min(
496        storage_opts.share_buffers_sync_parallelism as u64,
497        size_and_start_user_keys.len() as u64,
498    );
499    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
500        compact_data_size / parallelism
501    } else {
502        compact_data_size
503    };
504
505    if parallelism > 1 && compact_data_size > sstable_size {
506        let mut last_buffer_size = 0;
507        let mut last_key: Vec<u8> = vec![];
508        for (data_size, key) in size_and_start_user_keys {
509            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
510                splits.last_mut().unwrap().right = Bytes::from(key.clone());
511                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
512                last_buffer_size = data_size;
513            } else {
514                last_buffer_size += data_size;
515            }
516
517            last_key = key;
518        }
519    }
520
521    if compact_data_size > sstable_size {
522        // Meta node will calculate size of each state-table in one task in `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
523        // To make the calculate result more accurately we shall split the large state-table from other small ones.
524        for table_id in existing_table_ids {
525            if let Some(table_size) = table_size_infos.get(table_id)
526                && *table_size > min_sstable_size
527            {
528                table_vnode_partition.insert(*table_id, 1);
529            }
530        }
531    }
532
533    // mul 1.2 for other extra memory usage.
534    // Ensure that the size of each sstable is still less than `sstable_size` after optimization to avoid generating a huge size sstable which will affect the object store
535    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
536    (splits, sub_compaction_sstable_size, table_vnode_partition)
537}
538
/// Executes a single split (sub-compaction) of a shared-buffer flush task.
pub struct SharedBufferCompactRunner {
    // The compactor configured for this split's key range and sstable size.
    compactor: Compactor,
    // Index of this split; used to restore output order after parallel runs.
    split_index: usize,
}
543
544impl SharedBufferCompactRunner {
545    pub fn new(
546        split_index: usize,
547        key_range: KeyRange,
548        context: CompactorContext,
549        sub_compaction_sstable_size: usize,
550        table_vnode_partition: BTreeMap<TableId, u32>,
551        use_block_based_filter: bool,
552        object_id_getter: Arc<dyn GetObjectId>,
553    ) -> Self {
554        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
555        options.capacity = sub_compaction_sstable_size;
556        let compactor = Compactor::new(
557            context,
558            options,
559            super::TaskConfig {
560                key_range,
561                cache_policy: CachePolicy::Fill(Hint::Normal),
562                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
563                retain_multiple_version: true,
564                stats_target_table_ids: None,
565                task_type: compact_task::TaskType::SharedBuffer,
566                table_vnode_partition,
567                use_block_based_filter,
568                table_schemas: Default::default(),
569                disable_drop_column_optimization: false,
570            },
571            object_id_getter,
572        );
573        Self {
574            compactor,
575            split_index,
576        }
577    }
578
579    pub async fn run(
580        self,
581        iter: impl HummockIterator<Direction = Forward>,
582        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
583    ) -> HummockResult<CompactOutput> {
584        let dummy_compaction_filter = DummyCompactionFilter {};
585        let (ssts, table_stats_map) = self
586            .compactor
587            .compact_key_range(
588                iter,
589                dummy_compaction_filter,
590                compaction_catalog_agent_ref,
591                None,
592                None,
593                None,
594            )
595            .await?;
596        Ok((self.split_index, ssts, table_stats_map))
597    }
598}
599
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    /// Build a table key under vnode 1 so all test keys share a vnode prefix
    /// and compare by their user-key suffix.
    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    // Five imms (two tables) whose combined size exceeds both
    // `parallel_compact_size_mb` and `sstable_size_mb` (1 MB each), so
    // `generate_splits` should shard into exactly
    // `share_buffers_sync_parallelism` contiguous, ordered key ranges.
    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );

        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        // Payload deliberately NOT in key order; generate_splits sorts internally.
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        // No table crosses min_sstable_size, so no vnode partition hints.
        assert!(vnodes.is_empty());

        // Basic validation: splits should be continuous and monotonic
        for i in 1..splits.len() {
            assert_eq!(splits[i].left, splits[i - 1].right);
            assert!(splits[i].left > splits[i - 1].left);
            assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
        }
    }

    // Regression-style check: even when imms arrive out of key order, the
    // generated splits must neither overlap nor come out unsorted.
    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Create test data with specific ordering to detect sorting issues
        // Make data sizes large enough to trigger splitting
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"), // This should be last after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB to ensure compact_data_size > sstable_size
            TableId::new(1),
        );

        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"), // This should be first after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"), // This should be middle after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3, // Enable parallelism
            parallel_compact_size_mb: 2,       // Small threshold to trigger splitting
            sstable_size_mb: 1,                // Small SSTable size to trigger condition
            ..Default::default()
        };

        // Test with payload in wrong order (zzz, aaa, mmm) instead of sorted order (aaa, mmm, zzz)
        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        // Should have multiple splits due to large data size
        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Verify no key range overlaps between splits
        for i in 0..splits.len() {
            for j in (i + 1)..splits.len() {
                let split_i = &splits[i];
                let split_j = &splits[j];

                // Check that splits don't overlap
                if !split_i.right.is_empty() && !split_j.left.is_empty() {
                    assert!(
                        split_i.right <= split_j.left || split_j.right <= split_i.left,
                        "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                        i,
                        j,
                        split_i.left,
                        split_i.right,
                        split_j.left,
                        split_j.right
                    );
                }
            }
        }

        // Additional verification: ensure splits are sorted
        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}