risingwave_storage/hummock/compactor/shared_buffer_compact.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::Hint;
use futures::future::try_join;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
use risingwave_pb::hummock::compact_task;
use thiserror_ext::AsReport;
use tracing::{error, warn};

use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
    VersionedSharedBufferValue,
};
use crate::hummock::{
    CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
    SstableBuilderOptions,
};
use crate::mem_table::ImmutableMemtable;
use crate::opts::StorageOpts;

const GC_DELETE_KEYS_FOR_FLUSH: bool = false;

/// Flush the shared buffer to level 0. The resulting SSTs are grouped by compaction group.
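///
/// A minimal calling sketch (hedged: constructing the `CompactorContext`, `ObjectIdManagerRef`
/// and `CompactionCatalogManagerRef` is the storage runtime's job and is elided here, so the
/// snippet is not compiled):
/// ```ignore
/// let UploadTaskOutput { new_value_ssts, old_value_ssts, .. } = compact(
///     compactor_context.clone(),
///     object_id_manager.clone(),
///     imms, // Vec<ImmutableMemtable> drained from the shared buffer
///     compaction_catalog_manager.clone(),
/// )
/// .await?;
/// ```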
pub async fn compact(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    payload: Vec<ImmutableMemtable>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> HummockResult<UploadTaskOutput> {
    let table_ids_with_old_value: HashSet<TableId> = payload
        .iter()
        .filter(|imm| imm.has_old_value())
        .map(|imm| imm.table_id)
        .collect();
    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
    let mut old_value_payload = Vec::with_capacity(payload.len());
    for imm in payload {
        if table_ids_with_old_value.contains(&imm.table_id) {
            if imm.has_old_value() {
                old_value_payload.push(imm.clone());
            }
            log_store_new_value_payload.push(imm);
        } else {
            assert!(!imm.has_old_value());
            non_log_store_new_value_payload.push(imm);
        }
    }
    let non_log_store_new_value_future = async {
        if non_log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                non_log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_non_log_store_new_value")
            .await
        }
    };

    let log_store_new_value_future = async {
        if log_store_new_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<true>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                log_store_new_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_new_value")
            .await
        }
    };

    let old_value_future = async {
        if old_value_payload.is_empty() {
            Ok(vec![])
        } else {
            compact_shared_buffer::<false>(
                context.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref.clone(),
                old_value_payload,
            )
            .instrument_await("shared_buffer_compact_log_store_old_value")
            .await
        }
    };

    // Note that the output is reordered compared with input `payload`.
    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
        try_join(non_log_store_new_value_future, log_store_new_value_future),
        old_value_future,
    )
    .await?;

    let mut new_value_ssts = non_log_store_new_value_ssts;
    new_value_ssts.extend(log_store_new_value_ssts);

    Ok(UploadTaskOutput {
        new_value_ssts,
        old_value_ssts,
        wait_poll_timer: None,
    })
}

/// For compaction from the shared buffer to level 0, this is the only function that gets called.
///
/// The `IS_NEW_VALUE` flag determines whether the given payload is compacted using its new values or its old values.
/// When `IS_NEW_VALUE` is false, we compact the old values, and every imm in the payload must have non-`None` `old_values`.
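///
/// A sketch of the two instantiations used by [`compact`] above (hedged and simplified from the
/// real call sites; `ctx`, `ids` and `catalog` stand in for the context, object id manager and
/// catalog manager arguments):
/// ```ignore
/// // New-value flush path.
/// let new_value_ssts =
///     compact_shared_buffer::<true>(ctx.clone(), ids.clone(), catalog.clone(), new_value_imms).await?;
/// // Old-value (log store) flush path: every imm must carry old values.
/// let old_value_ssts =
///     compact_shared_buffer::<false>(ctx, ids, catalog, old_value_imms).await?;
/// ```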
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    object_id_manager: ObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    if !IS_NEW_VALUE {
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    // Local memory compaction looks at all key ranges.
    let existing_table_ids: HashSet<TableId> =
        payload.iter().map(|imm| imm.table_id).dedup().collect();
    assert!(!existing_table_ids.is_empty());

    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id);
        if !ret {
            error!(
                "can not find table {:?}, it may be removed by meta-service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    // Shared buffer compaction always goes to L0. Use block_based_filter when kv_count is large.
    // Use None to apply the default threshold since shared buffer flush doesn't have a CompactTask.
    let use_block_based_filter =
        risingwave_hummock_sdk::filter_utils::is_kv_count_too_large_for_xor16(
            total_key_count as u64,
            None,
        );

    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            object_id_manager.clone(),
        );
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while execute in tokio",
                ));
            }
        }
    }

    // Sort by split/key range index.
    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    Vec::from_iter(existing_table_ids.iter().cloned()),
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}

/// Merge multiple batches into a larger one.
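///
/// A usage sketch (hedged; it relies on the test-only constructor used in the `tests` module at
/// the bottom of this file, so the snippet is not compiled):
/// ```ignore
/// // Assumption: batch ids increase with creation order, so the older imm is built first.
/// let older = ImmutableMemtable::build_shared_buffer_batch_for_test(
///     test_epoch(1), 0,
///     vec![(generate_key("a"), SharedBufferValue::Insert(Bytes::from_static(b"v1")))],
///     1024, TableId::new(1),
/// );
/// let newer = ImmutableMemtable::build_shared_buffer_batch_for_test(
///     test_epoch(2), 0,
///     vec![(generate_key("a"), SharedBufferValue::Insert(Bytes::from_static(b"v2")))],
///     1024, TableId::new(1),
/// );
/// // Imms are passed newest-first: batch ids are expected to be in descending order.
/// let merged = merge_imms_in_memory(TableId::new(1), vec![newer, older]).await;
/// // `merged` now holds both epoch versions of key "a" under a single key entry.
/// ```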
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    // TODO: make sure that the corner case on switch_op_consistency is handled
    // If one imm of a table id contains old values, all other imms of the same table id should contain old values as well
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // use merge iterator to merge input imms
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Initialize the tracker with the first key and the max epoch to ensure that the first call to full_key_tracker.observe_multi_version will succeed
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                // Record kv entries
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // This method has no await point but is CPU intensive, so we call `consume_budget`
        // here to yield for cooperative scheduling.
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
        )),
        table_id,
    }
}

/// Based on the incoming payload and storage opts, calculate the key-range splits and sstable size for shared buffer compaction.
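/// Returns the key-range splits, the sstable capacity to use for each sub-compaction, and the
/// `table_vnode_partition` map.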
fn generate_splits(
    payload: &Vec<ImmutableMemtable>,
    existing_table_ids: &HashSet<TableId>,
    storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>) {
    let mut size_and_start_user_keys = vec![];
    let mut compact_data_size = 0;
    let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
    let mut table_vnode_partition = BTreeMap::default();
    for imm in payload {
        let data_size = {
            // Estimate the encoded size: each versioned value carries an epoch suffix on top of the imm's in-memory size.
            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
        };
        compact_data_size += data_size;
        size_and_start_user_keys.push((
            data_size,
            FullKey {
                user_key: imm.start_user_key(),
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        ));
        let v = table_size_infos.entry(imm.table_id).or_insert(0);
        *v += data_size;
    }

    size_and_start_user_keys
        .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
    let parallelism = std::cmp::min(
        storage_opts.share_buffers_sync_parallelism as u64,
        size_and_start_user_keys.len() as u64,
    );
    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
        compact_data_size / parallelism
    } else {
        compact_data_size
    };

    if parallelism > 1 && compact_data_size > sstable_size {
        let mut last_buffer_size = 0;
        let mut last_key: Vec<u8> = vec![];
        for (data_size, key) in size_and_start_user_keys {
            if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
                splits.last_mut().unwrap().right = Bytes::from(key.clone());
                splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                last_buffer_size = data_size;
            } else {
                last_buffer_size += data_size;
            }

            last_key = key;
        }
    }

    if compact_data_size > sstable_size {
        // The meta node calculates the size of each state table in one task in `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
        // To make that calculation more accurate, we split large state tables from the other small ones.
        for table_id in existing_table_ids {
            if let Some(table_size) = table_size_infos.get(table_id)
                && *table_size > min_sstable_size
            {
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }

    // Multiply by 1.2 (i.e. * 6 / 5) to account for other extra memory usage.
    // Ensure that the size of each sstable is still less than `sstable_size` after this adjustment, to avoid generating a huge sstable that would affect the object store.
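    // Illustrative arithmetic (assumed numbers, not config defaults): with `sstable_size` = 256 MiB
    // and `sub_compaction_data_size` = 40 MiB, the capacity becomes min(256 MiB, 40 MiB * 6 / 5) = 48 MiB.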
    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
    (splits, sub_compaction_sstable_size, table_vnode_partition)
}

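/// Runs shared buffer compaction for a single key-range split and returns its output tagged with
/// the `split_index` it was assigned.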
pub struct SharedBufferCompactRunner {
    compactor: Compactor,
    split_index: usize,
}

impl SharedBufferCompactRunner {
    pub fn new(
        split_index: usize,
        key_range: KeyRange,
        context: CompactorContext,
        sub_compaction_sstable_size: usize,
        table_vnode_partition: BTreeMap<TableId, u32>,
        use_block_based_filter: bool,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.capacity = sub_compaction_sstable_size;
        let compactor = Compactor::new(
            context,
            options,
            super::TaskConfig {
                key_range,
                cache_policy: CachePolicy::Fill(Hint::Normal),
                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
                retain_multiple_version: true,
                stats_target_table_ids: None,
                task_type: compact_task::TaskType::SharedBuffer,
                table_vnode_partition,
                use_block_based_filter,
                table_schemas: Default::default(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );
        Self {
            compactor,
            split_index,
        }
    }

    pub async fn run(
        self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<CompactOutput> {
        let dummy_compaction_filter = DummyCompactionFilter {};
        let (ssts, table_stats_map) = self
            .compactor
            .compact_key_range(
                iter,
                dummy_compaction_filter,
                compaction_catalog_agent_ref,
                None,
                None,
                None,
            )
            .await?;
        Ok((self.split_index, ssts, table_stats_map))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("dddd"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            1024 * 1024,
            TableId::new(1),
        );
        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("abb"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(2),
            0,
            vec![(
                generate_key("abc"),
                SharedBufferValue::Insert(Bytes::from_static(b"v2")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );
        let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 512) * 1024,
            TableId::new(1),
        );

        let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(3),
            0,
            vec![(
                generate_key("aaa"),
                SharedBufferValue::Insert(Bytes::from_static(b"v3")),
            )],
            (1024 + 256) * 1024,
            TableId::new(2),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let payload = vec![imm1, imm2, imm3, imm4, imm5];
        let (splits, _sstable_capacity, vnodes) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );
        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        // Basic validation: splits should be continuous and monotonic
        for i in 1..splits.len() {
            assert_eq!(splits[i].left, splits[i - 1].right);
            assert!(splits[i].left > splits[i - 1].left);
            assert!(splits[i].right.is_empty() || splits[i].left < splits[i].right);
        }
    }

    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Create test data with specific ordering to detect sorting issues
        // Make data sizes large enough to trigger splitting
        let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("zzz"), // This should be last after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB to ensure compact_data_size > sstable_size
            TableId::new(1),
        );

        let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("aaa"), // This should be first after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(1),
            0,
            vec![(
                generate_key("mmm"), // This should be middle after sorting
                SharedBufferValue::Insert(Bytes::from_static(b"v1")),
            )],
            2 * 1024 * 1024, // 2MB
            TableId::new(1),
        );

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3, // Enable parallelism
            parallel_compact_size_mb: 2,       // Small threshold to trigger splitting
            sstable_size_mb: 1,                // Small SSTable size to trigger condition
            ..Default::default()
        };

        // Test with payload in wrong order (zzz, aaa, mmm) instead of sorted order (aaa, mmm, zzz)
        let payload = vec![imm1, imm2, imm3];
        let (splits, _sstable_capacity, _vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        // Should have multiple splits due to large data size
        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Verify no key range overlaps between splits
        for i in 0..splits.len() {
            for j in (i + 1)..splits.len() {
                let split_i = &splits[i];
                let split_j = &splits[j];

                // Check that splits don't overlap
                if !split_i.right.is_empty() && !split_j.left.is_empty() {
                    assert!(
                        split_i.right <= split_j.left || split_j.right <= split_i.left,
                        "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                        i,
                        j,
                        split_i.left,
                        split_i.right,
                        split_j.left,
                        split_j.right
                    );
                }
            }
        }

        // Additional verification: ensure splits are sorted
        for i in 1..splits.len() {
            if !splits[i - 1].right.is_empty() && !splits[i].left.is_empty() {
                assert!(
                    splits[i - 1].right <= splits[i].left,
                    "Splits are not in sorted order at index {}: {:?} > {:?}",
                    i,
                    splits[i - 1].right,
                    splits[i].left
                );
            }
        }
    }
}