risingwave_storage/hummock/compactor/shared_buffer_compact.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::Hint;
use futures::future::try_join;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
use thiserror_ext::AsReport;
use tracing::{error, warn};

use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
    VersionedSharedBufferValue,
};
use crate::hummock::{
    CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
    SstableBuilderOptions,
};
use crate::mem_table::ImmutableMemtable;
use crate::opts::StorageOpts;

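// Delete tombstones cannot be garbage-collected when flushing the shared buffer: older versions of
// the deleted keys may still live in lower LSM levels, so the tombstones must be written out to L0.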
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;

/// Flush the shared buffer to level 0. The resulting SSTs are grouped by compaction group.
pub async fn compact(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    payload: Vec<ImmutableMemtable>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> HummockResult<UploadTaskOutput> {
    let table_ids_with_old_value: HashSet<TableId> = payload
        .iter()
        .filter(|imm| imm.has_old_value())
        .map(|imm| imm.table_id)
        .collect();
    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut old_value_payload = Vec::with_capacity(payload.len());
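    // Partition the imms into three payloads: imms of tables without old values only contribute
    // new values, while imms of tables that track old values (log-store tables) contribute a
    // new-value payload and a separate old-value payload, so they can be flushed as separate SSTs.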
    for imm in payload {
        if table_ids_with_old_value.contains(&imm.table_id) {
            if imm.has_old_value() {
                old_value_payload.push(imm.clone());
            }
            log_store_new_value_payload.push(imm);
        } else {
            assert!(!imm.has_old_value());
            non_log_store_new_value_payload.push(imm);
        }
    }
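    // The three payloads are compacted independently; the futures below are driven concurrently
    // by the nested `try_join` calls further down.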
    let non_log_store_new_value_future = async {
        if non_log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                non_log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_non_log_store_new_value")
            .await
        }
    };

    let log_store_new_value_future = async {
        if log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_new_value")
            .await
        }
    };

    let old_value_future = async {
        if old_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<false>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                old_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_old_value")
            .await
        }
    };

    // Note that the output is reordered compared with input `payload`.
    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
        try_join(non_log_store_new_value_future, log_store_new_value_future),
        old_value_future,
    )
    .await?;

    let mut new_value_ssts = non_log_store_new_value_ssts;
    new_value_ssts.extend(log_store_new_value_ssts);

    Ok(UploadTaskOutput {
        new_value_ssts,
        old_value_ssts,
        wait_poll_timer: None,
    })
}

/// For compaction from the shared buffer to level 0, this is the only function that gets called.
///
/// The `IS_NEW_VALUE` flag indicates whether the given payload is compacted using its new values or its old values.
/// When `IS_NEW_VALUE` is false, we compact the old values, and every imm in the payload must have `old_values` set (not `None`).
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    if !IS_NEW_VALUE {
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    // Local memory compaction looks at all key ranges.
    let existing_table_ids: HashSet<TableId> =
        payload.iter().map(|imm| imm.table_id).dedup().collect();
    assert!(!existing_table_ids.is_empty());

    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id);
        if !ret {
            error!(
                "cannot find table {:?}, it may have been removed by the meta service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    // Shared buffer compaction always goes to L0. Use block_based_filter when kv_count is large.
    // Use None to apply the default threshold since shared buffer flush doesn't have a CompactTask.
    let use_block_based_filter =
        risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
            total_key_count as u64,
            None,
        );

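    // Spawn one sub-compaction per key-range split. Each split reads the whole payload through a
    // merge iterator, while the key range in its task config restricts the keys that end up in its
    // output SSTs.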
    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            object_id_manager.clone(),
        );
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

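    // Collect sub-compaction results as they complete; ordering by split index is restored below.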
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while executing in tokio",
                ));
            }
        }
    }

    // Sort by split/key range index.
    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
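        // Optional sanity check: replay the flushed imms through a user iterator and compare them
        // against the SSTs just produced. The check runs asynchronously on the compaction executor
        // and panics only on a genuine mismatch.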
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    Vec::from_iter(existing_table_ids.iter().cloned()),
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed to check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}

/// Merge multiple batches into a larger one.
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
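    // imms are ordered from newest to oldest, so reversing them yields ascending batch ids and the
    // first imm carries the largest batch id.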
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    // TODO: make sure that the corner case on switch_op_consistency is handled.
    // If an imm of a table id contains old values, all other imms of the same table id should also contain old values.
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // use merge iterator to merge input imms
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Use the first key and max epoch to initialize the tracker, ensuring that the first call to full_key_tracker.observe will succeed.
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

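    // Walk the merged stream key by key. `observe_multi_version` returns true only when the user
    // key advances (the first key's entry was already pushed above); all versioned new values of
    // the current key are then appended in one batch.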
    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                // Record kv entries
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // This loop is CPU intensive and has no natural yield point, so call consume_budget
        // to cooperate with the tokio scheduler.
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
        )),
        table_id,
    }
}

/// Based on the incoming payload and storage opts, calculate the key range splits, the sub-compaction sstable size, and the per-table vnode partitioning for shared buffer compaction.
fn generate_splits(
    payload: &Vec<ImmutableMemtable>,
    existing_table_ids: &HashSet<TableId>,
    storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
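    // First pass: estimate the encoded size of each imm and accumulate per-table sizes. The imms'
    // encoded start keys (paired with MAX epoch) are the candidate split boundaries used below.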
    let mut size_and_start_user_keys = vec![];
    let mut compact_data_size = 0;
    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
    let mut table_vnode_partition = BTreeMap::default();
    for imm in payload {
        let data_size = {
            // Estimated encoded size: the raw imm size plus an epoch suffix for each versioned value.
            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
        };
        compact_data_size += data_size;
        size_and_start_user_keys.push((
            data_size,
            FullKey {
                user_key: imm.start_user_key(),
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        ));
        let v = table_size_infos.entry(imm.table_id).or_insert(0);
        *v += data_size;
    }

    size_and_start_user_keys
        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
    let parallelism = std::cmp::min(
        storage_opts.share_buffers_sync_parallelism as u64,
        size_and_start_user_keys.len() as u64,
    );
    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
        compact_data_size / parallelism
    } else {
        compact_data_size
    };

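    // Cut a new split whenever the accumulated size reaches the per-split target, but never
    // between identical start keys, so the resulting key ranges stay disjoint.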
    if parallelism > 1 && compact_data_size > sstable_size {
        let mut last_buffer_size = 0;
        let mut last_key: Vec<u8> = vec![];
        for (data_size, key) in size_and_start_user_keys {
            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
                splits.last_mut().unwrap().right = Bytes::from(key.clone());
                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                last_buffer_size = data_size;
            } else {
                last_buffer_size += data_size;
            }

            last_key = key;
        }
    }

    if compact_data_size > sstable_size {
        // Meta node will calculate the size of each state table in one task in `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
        // To make that calculation more accurate, we split the large state tables from the other small ones.
        for table_id in existing_table_ids {
            if let Some(table_size) = table_size_infos.get(table_id)
                && *table_size > min_sstable_size
            {
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }

    // Multiply by 1.2 to leave room for extra memory usage.
    // Ensure that each sstable still stays below `sstable_size` after this adjustment, to avoid generating a huge sstable that would affect the object store.
    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
    (splits, sub_compaction_sstable_size, table_vnode_partition)
}

pub struct SharedBufferCompactRunner {
    compactor: Compactor,
    split_index: usize,
}

impl SharedBufferCompactRunner {
    pub fn new(
        split_index: usize,
        key_range: KeyRange,
        context: CompactorContext,
        sub_compaction_sstable_size: usize,
        table_vnode_partition: BTreeMap<TableId, u32>,
        use_block_based_filter: bool,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.capacity = sub_compaction_sstable_size;
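        // Flush tasks keep every epoch of a key (`retain_multiple_version: true`) and never drop
        // delete tombstones (`gc_delete_keys` is false for flush), since older versions may still
        // exist in lower levels.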
        let compactor = Compactor::new(
            context,
            options,
            super::TaskConfig {
                key_range,
                cache_policy: CachePolicy::Fill(Hint::Normal),
                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
                retain_multiple_version: true,
                stats_target_table_ids: None,
                table_vnode_partition,
                use_block_based_filter,
                table_schemas: Default::default(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );
        Self {
            compactor,
            split_index,
        }
    }

    pub async fn run(
        self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<CompactOutput> {
        let dummy_compaction_filter = DummyCompactionFilter {};
        let (ssts, table_stats_map) = self
            .compactor
            .compact_key_range(
                iter,
                dummy_compaction_filter,
                compaction_catalog_agent_ref,
                None,
                None,
                None,
            )
            .await?;
        Ok((self.split_index, ssts, table_stats_map))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );

        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        // Basic validation: splits should be continuous and monotonic
        for i in 1..splits.len() {
            assert_eq!(splits[i].left, splits[i - 1].right);
            assert!(splits[i].left > splits[i - 1].left);
            assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
        }
    }

    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Create test data with specific ordering to detect sorting issues
        // Make data sizes large enough to trigger splitting
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"), // This should be last after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB to ensure compact_data_size > sstable_size
            TableId::new(1),
        );

        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"), // This should be first after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"), // This should be middle after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3, // Enable parallelism
            parallel_compact_size_mb: 2,       // Small threshold to trigger splitting
            sstable_size_mb: 1,                // Small SSTable size to trigger condition
            ..Default::default()
        };

        // Test with payload in wrong order (zzz, aaa, mmm) instead of sorted order (aaa, mmm, zzz)
        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        // Should have multiple splits due to large data size
        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Verify no key range overlaps between splits
        for i in 0..splits.len() {
            for j in (i + 1)..splits.len() {
                let split_i = &splits[i];
                let split_j = &splits[j];

                // Check that splits don't overlap
                if !split_i.right.is_empty() && !split_j.left.is_empty() {
                    assert!(
                        split_i.right <= split_j.left || split_j.right <= split_i.left,
                        "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                        i,
                        j,
                        split_i.left,
                        split_i.right,
                        split_j.left,
                        split_j.right
                    );
                }
            }
        }

        // Additional verification: ensure splits are sorted
        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}