risingwave_storage/hummock/compactor/shared_buffer_compact.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::Hint;
use futures::future::try_join;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
use risingwave_pb::hummock::compact_task;
use thiserror_ext::AsReport;
use tracing::{error, warn};

use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
    VersionedSharedBufferValue,
};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::{
    CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
    SstableBuilderOptions,
};
use crate::mem_table::ImmutableMemtable;
use crate::opts::StorageOpts;

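/// Delete tombstones are retained when flushing the shared buffer, since older versions of the
/// same keys may still exist in lower levels.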
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;

/// Flush shared buffer to level0. Resulting SSTs are grouped by compaction group.
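///
/// The payload is partitioned into three groups that are compacted concurrently: new values of
/// tables without old values, new values of tables that carry old values (for the log store), and
/// the old values themselves; the output is therefore reordered compared with the input `payload`.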
pub async fn compact(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    payload: Vec<ImmutableMemtable>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> HummockResult<UploadTaskOutput> {
    let table_ids_with_old_value: HashSet<TableId> = payload
        .iter()
        .filter(|imm| imm.has_old_value())
        .map(|imm| imm.table_id)
        .collect();
    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut old_value_payload = Vec::with_capacity(payload.len());
    for imm in payload {
        if table_ids_with_old_value.contains(&imm.table_id) {
            if imm.has_old_value() {
                old_value_payload.push(imm.clone());
            }
            log_store_new_value_payload.push(imm);
        } else {
            assert!(!imm.has_old_value());
            non_log_store_new_value_payload.push(imm);
        }
    }
    let non_log_store_new_value_future = async {
        if non_log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                non_log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_non_log_store_new_value")
            .await
        }
    };

    let log_store_new_value_future = async {
        if log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_new_value")
            .await
        }
    };

    let old_value_future = async {
        if old_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<false>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                old_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_old_value")
            .await
        }
    };

    // Note that the output is reordered compared with input `payload`.
    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
        try_join(non_log_store_new_value_future, log_store_new_value_future),
        old_value_future,
    )
    .await?;

    let mut new_value_ssts = non_log_store_new_value_ssts;
    new_value_ssts.extend(log_store_new_value_ssts);

    Ok(UploadTaskOutput {
        new_value_ssts,
        old_value_ssts,
        wait_poll_timer: None,
    })
}

/// For compaction from shared buffer to level 0, this is the only function that gets called.
///
/// The `IS_NEW_VALUE` flag indicates whether the given payload is compacted using its new values or its old values.
/// When `IS_NEW_VALUE` is false, we are compacting the old values, so every imm in the payload must have non-`None` `old_values`.
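/// Each parallel split compacts a disjoint key range; the resulting SSTs are returned in split order.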
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    if !IS_NEW_VALUE {
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    // Local memory compaction looks at all key ranges.
    let existing_table_ids: HashSet<TableId> =
        payload.iter().map(|imm| imm.table_id).dedup().collect();
    assert!(!existing_table_ids.is_empty());

    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id);
        if !ret {
            error!(
                "cannot find table {:?}, it may have been removed by the meta service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    // Shared buffer compaction always goes to L0. Use block_based_filter when kv_count is large.
    // Use None to apply the default threshold since shared buffer flush doesn't have a CompactTask.
    let use_block_based_filter =
        risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
            total_key_count as u64,
            None,
        );

    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            object_id_manager.clone(),
        );
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while executing in tokio",
                ));
            }
        }
    }

    // Sort by split/key range index.
    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    Vec::from_iter(existing_table_ids.iter().cloned()),
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed to check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}

/// Merge multiple batches into a larger one
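///
/// The input imms must all belong to `table_id` and be ordered from newest to oldest (descending
/// batch id), as asserted below. A minimal usage sketch (illustrative only; how the imms are built
/// is assumed, cf. the test helpers in this crate):
///
/// ```ignore
/// let merged = merge_imms_in_memory(TableId::new(1), vec![newer_imm, older_imm], None).await;
/// ```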
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
    memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    // TODO: make sure that the corner case on switch_op_consistency is handled.
    // If an imm of a table id contains old values, all other imms of the same table id should have old values too.
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // use merge iterator to merge input imms
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Initialize the tracker with the first key and the max epoch so that the first call to
    // `full_key_tracker.observe_multi_version` below is guaranteed to succeed.
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

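    // Drain the merge iterator in key order: each time a new user key is observed, start a new
    // `SharedBufferKeyEntry` pointing at the current offset in `values`, then append all versioned
    // new values (and old values, if present) for that key.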
    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                // Record kv entries
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // This loop has no await point but is CPU intensive, so explicitly consume the task
        // budget here to cooperate with the tokio scheduler.
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
            memory_tracker,
        )),
        table_id,
    }
}

/// Based on the incoming payload and storage options, calculate the sharding strategy and sstable size for shared buffer compaction.
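///
/// Returns `(splits, sub_compaction_sstable_size, table_vnode_partition)`: the key ranges assigned
/// to each parallel sub-compaction, the sstable capacity to use, and the tables large enough to
/// get a dedicated vnode partition.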
fn generate_splits(
    payload: &Vec<ImmutableMemtable>,
    existing_table_ids: &HashSet<TableId>,
    storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
    let mut size_and_start_user_keys = vec![];
    let mut compact_data_size = 0;
    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
    let mut table_vnode_partition = BTreeMap::default();
    for imm in payload {
        let data_size = {
            // Estimate the encoded size: each versioned value carries an extra epoch suffix of
            // `EPOCH_LEN` bytes on top of the in-memory batch size.
            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
        };
        compact_data_size += data_size;
        size_and_start_user_keys.push((
            data_size,
            FullKey {
                user_key: imm.start_user_key(),
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        ));
        let v = table_size_infos.entry(imm.table_id).or_insert(0);
        *v += data_size;
    }

    size_and_start_user_keys
        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
    let parallelism = std::cmp::min(
        storage_opts.share_buffers_sync_parallelism as u64,
        size_and_start_user_keys.len() as u64,
    );
    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
        compact_data_size / parallelism
    } else {
        compact_data_size
    };

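    // Greedily accumulate imms (sorted by encoded start key) into key ranges of roughly
    // `sub_compaction_data_size` bytes each; a new split starts at the first imm whose start key
    // differs from the previous one once the size threshold has been reached.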
    if parallelism > 1 && compact_data_size > sstable_size {
        let mut last_buffer_size = 0;
        let mut last_key: Vec<u8> = vec![];
        for (data_size, key) in size_and_start_user_keys {
            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
                splits.last_mut().unwrap().right = Bytes::from(key.clone());
                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                last_buffer_size = data_size;
            } else {
                last_buffer_size += data_size;
            }

            last_key = key;
        }
    }

    if compact_data_size > sstable_size {
        // The meta node computes the size of each state table per task in
        // `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
        // To make that calculation more accurate, split the large state tables from the small ones.
        for table_id in existing_table_ids {
            if let Some(table_size) = table_size_infos.get(table_id)
                && *table_size > min_sstable_size
            {
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }

    // Multiply by 1.2 (6 / 5) to account for extra memory usage, but keep each sstable below
    // `sstable_size` to avoid generating a huge sstable that would stress the object store.
    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
    (splits, sub_compaction_sstable_size, table_vnode_partition)
}

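/// Runs a single shared-buffer sub-compaction over one key-range split, returning the split index
/// together with the produced SSTs and their table statistics.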
pub struct SharedBufferCompactRunner {
    compactor: Compactor,
    split_index: usize,
}

impl SharedBufferCompactRunner {
    pub fn new(
        split_index: usize,
        key_range: KeyRange,
        context: CompactorContext,
        sub_compaction_sstable_size: usize,
        table_vnode_partition: BTreeMap<TableId, u32>,
        use_block_based_filter: bool,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.capacity = sub_compaction_sstable_size;
        let compactor = Compactor::new(
            context,
            options,
            super::TaskConfig {
                key_range,
                cache_policy: CachePolicy::Fill(Hint::Normal),
                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
                retain_multiple_version: true,
                stats_target_table_ids: None,
                task_type: compact_task::TaskType::SharedBuffer,
                table_vnode_partition,
                use_block_based_filter,
                table_schemas: Default::default(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );
        Self {
            compactor,
            split_index,
        }
    }

    pub async fn run(
        self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<CompactOutput> {
        let dummy_compaction_filter = DummyCompactionFilter {};
        let (ssts, table_stats_map) = self
            .compactor
            .compact_key_range(
                iter,
                dummy_compaction_filter,
                compaction_catalog_agent_ref,
                None,
                None,
                None,
            )
            .await?;
        Ok((self.split_index, ssts, table_stats_map))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );

        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        // Basic validation: splits should be continuous and monotonic
        for i in 1..splits.len() {
            assert_eq!(splits[i].left, splits[i - 1].right);
            assert!(splits[i].left > splits[i - 1].left);
            assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
        }
    }

    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Create test data with specific ordering to detect sorting issues
        // Make data sizes large enough to trigger splitting
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"), // This should be last after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB to ensure compact_data_size > sstable_size
            TableId::new(1),
        );

        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"), // This should be first after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"), // This should be middle after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3, // Enable parallelism
            parallel_compact_size_mb: 2,       // Small threshold to trigger splitting
            sstable_size_mb: 1,                // Small SSTable size to trigger condition
            ..Default::default()
        };

        // Test with payload in wrong order (zzz, aaa, mmm) instead of sorted order (aaa, mmm, zzz)
        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        // Should have multiple splits due to large data size
        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Verify no key range overlaps between splits
        for i in 0..splits.len() {
            for j in (i + 1)..splits.len() {
                let split_i = &splits[i];
                let split_j = &splits[j];

                // Check that splits don't overlap
                if !split_i.right.is_empty() && !split_j.left.is_empty() {
                    assert!(
                        split_i.right <= split_j.left || split_j.right <= split_i.left,
                        "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                        i,
                        j,
                        split_i.left,
                        split_i.right,
                        split_j.left,
                        split_j.right
                    );
                }
            }
        }

        // Additional verification: ensure splits are sorted
        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}