risingwave_storage/hummock/compactor/
shared_buffer_compact.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::CacheHint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
31use risingwave_pb::hummock::compact_task;
32use thiserror_ext::AsReport;
33use tracing::{error, warn};
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
38use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
39use crate::hummock::event_handler::uploader::UploadTaskOutput;
40use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
41use crate::hummock::shared_buffer::shared_buffer_batch::{
42    SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
43    VersionedSharedBufferValue,
44};
45use crate::hummock::utils::MemoryTracker;
46use crate::hummock::{
47    BlockedXor16FilterBuilder, CachePolicy, GetObjectId, HummockError, HummockResult,
48    SstableBuilderOptions, SstableObjectIdManagerRef,
49};
50use crate::mem_table::ImmutableMemtable;
51use crate::opts::StorageOpts;
52
53const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
54
55/// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group.
56pub async fn compact(
57    context: CompactorContext,
58    sstable_object_id_manager: SstableObjectIdManagerRef,
59    payload: Vec<ImmutableMemtable>,
60    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
61) -> HummockResult<UploadTaskOutput> {
62    let table_ids_with_old_value: HashSet<TableId> = payload
63        .iter()
64        .filter(|imm| imm.has_old_value())
65        .map(|imm| imm.table_id)
66        .collect();
67    let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
68    let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
69    let mut old_value_payload = Vec::with_capacity(payload.len());
70    for imm in payload {
71        if table_ids_with_old_value.contains(&imm.table_id) {
72            if imm.has_old_value() {
73                old_value_payload.push(imm.clone());
74            }
75            log_store_new_value_payload.push(imm);
76        } else {
77            assert!(!imm.has_old_value());
78            non_log_store_new_value_payload.push(imm);
79        }
80    }
81    let non_log_store_new_value_future = async {
82        if non_log_store_new_value_payload.is_empty() {
83            Ok(vec![])
84        } else {
85            compact_shared_buffer::<true>(
86                context.clone(),
87                sstable_object_id_manager.clone(),
88                compaction_catalog_manager_ref.clone(),
89                non_log_store_new_value_payload,
90            )
91            .instrument_await("shared_buffer_compact_non_log_store_new_value")
92            .await
93        }
94    };
95
96    let log_store_new_value_future = async {
97        if log_store_new_value_payload.is_empty() {
98            Ok(vec![])
99        } else {
100            compact_shared_buffer::<true>(
101                context.clone(),
102                sstable_object_id_manager.clone(),
103                compaction_catalog_manager_ref.clone(),
104                log_store_new_value_payload,
105            )
106            .instrument_await("shared_buffer_compact_log_store_new_value")
107            .await
108        }
109    };
110
111    let old_value_future = async {
112        if old_value_payload.is_empty() {
113            Ok(vec![])
114        } else {
115            compact_shared_buffer::<false>(
116                context.clone(),
117                sstable_object_id_manager.clone(),
118                compaction_catalog_manager_ref.clone(),
119                old_value_payload,
120            )
121            .instrument_await("shared_buffer_compact_log_store_old_value")
122            .await
123        }
124    };
125
126    // Note that the output is reordered compared with input `payload`.
127    let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
128        try_join(non_log_store_new_value_future, log_store_new_value_future),
129        old_value_future,
130    )
131    .await?;
132
133    let mut new_value_ssts = non_log_store_new_value_ssts;
134    new_value_ssts.extend(log_store_new_value_ssts);
135
136    Ok(UploadTaskOutput {
137        new_value_ssts,
138        old_value_ssts,
139        wait_poll_timer: None,
140    })
141}
142
/// For compaction from shared buffer to level 0, this is the only function gets called.
///
/// The `IS_NEW_VALUE` flag means for the given payload, we are doing compaction using its new value or old value.
/// When `IS_NEW_VALUE` is false, we are compacting with old value, and the payload imms should have `old_values` not `None`
///
/// The payload is sharded into key-range splits (see `generate_splits`); one
/// sub-compaction task per split is spawned on the compaction executor, and the
/// resulting SSTs are returned in split order.
async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
    context: CompactorContext,
    sstable_object_id_manager: SstableObjectIdManagerRef,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
    // Old-value compaction requires every imm in the payload to carry old values.
    if !IS_NEW_VALUE {
        assert!(payload.iter().all(|imm| imm.has_old_value()));
    }
    // Local memory compaction looks at all key ranges.
    let existing_table_ids: HashSet<u32> = payload
        .iter()
        .map(|imm| imm.table_id.table_id)
        .dedup()
        .collect();
    assert!(!existing_table_ids.is_empty());

    // Acquire catalog info for the involved tables; the agent reports back which
    // of them still exist.
    let compaction_catalog_agent_ref = compaction_catalog_manager_ref
        .acquire(existing_table_ids.iter().copied().collect())
        .await?;
    let existing_table_ids = compaction_catalog_agent_ref
        .table_ids()
        .collect::<HashSet<_>>();
    // Drop imms whose table is no longer known to the catalog (e.g. dropped by
    // the meta service); flushing them would produce data for dead tables.
    payload.retain(|imm| {
        let ret = existing_table_ids.contains(&imm.table_id.table_id);
        if !ret {
            error!(
                "can not find table {:?}, it may be removed by meta-service",
                imm.table_id
            );
        }
        ret
    });

    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
    // Shard the payload into key ranges so sub-compactions can run in parallel.
    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
        generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
    let parallelism = splits.len();
    let mut compact_success = true;
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    // Switch to a block-based filter builder when the key count is large.
    let use_block_based_filter = BlockedXor16FilterBuilder::is_kv_count_too_large(total_key_count);

    // Spawn one sub-compaction task per split. Every task merge-iterates over all
    // imms; the runner's key range restricts what it actually outputs.
    for (split_index, key_range) in splits.into_iter().enumerate() {
        let compactor = SharedBufferCompactRunner::new(
            split_index,
            key_range,
            context.clone(),
            sub_compaction_sstable_size as usize,
            table_vnode_partition.clone(),
            use_block_based_filter,
            Box::new(sstable_object_id_manager.clone()),
        );
        let mut forward_iters = Vec::with_capacity(payload.len());
        for imm in &payload {
            forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
        }
        let compaction_executor = context.compaction_executor.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let handle = compaction_executor.spawn({
            // Monotonic id used only to register this task in the await-tree for
            // debugging/observability.
            static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
                LazyLock::new(|| AtomicUsize::new(0));
            let tree_root = context.await_tree_reg.as_ref().map(|reg| {
                let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
                reg.register(
                    await_tree_key::CompactSharedBuffer { id },
                    format!(
                        "Compact Shared Buffer: {:?}",
                        payload
                            .iter()
                            .flat_map(|imm| imm.epochs().iter())
                            .copied()
                            .collect::<BTreeSet<_>>()
                    ),
                )
            });
            let future = compactor.run(
                MergeIterator::new(forward_iters),
                compaction_catalog_agent_ref,
            );
            if let Some(root) = tree_root {
                root.instrument(future).left_future()
            } else {
                future.right_future()
            }
        });
        compaction_futures.push(handle);
    }

    // Drain all task results even after a failure, so no spawned task is left
    // unobserved; remember the last error seen for the final return.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    let mut err = None;
    while let Some(future_result) = buffered.next().await {
        match future_result {
            Ok(Ok((split_index, ssts, table_stats_map))) => {
                output_ssts.push((split_index, ssts, table_stats_map));
            }
            Ok(Err(e)) => {
                compact_success = false;
                tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
                err = Some(e);
            }
            Err(e) => {
                // The spawned task itself failed (join error), not the compaction.
                compact_success = false;
                tracing::warn!(
                    error = %e.as_report(),
                    "Shared Buffer Compaction failed with future error",
                );
                err = Some(HummockError::compaction_executor(
                    "failed while execute in tokio",
                ));
            }
        }
    }

    // Sort by split/key range index.
    output_ssts.sort_by_key(|(split_index, ..)| *split_index);

    if compact_success {
        let mut level0 = Vec::with_capacity(parallelism);
        let mut sst_infos = vec![];
        for (_, ssts, _) in output_ssts {
            for sst_info in &ssts {
                // Account flushed bytes in the L0 build metric.
                context
                    .compactor_metrics
                    .write_build_l0_bytes
                    .inc_by(sst_info.file_size());

                sst_infos.push(sst_info.sst_info.clone());
            }
            level0.extend(ssts);
        }
        // Optional background consistency check: re-read the imms and compare
        // against the produced SSTs. A detected mismatch panics; a check error is
        // only logged.
        if context.storage_opts.check_compaction_result {
            let compaction_executor = context.compaction_executor.clone();
            let mut forward_iters = Vec::with_capacity(payload.len());
            for imm in &payload {
                if !existing_table_ids.contains(&imm.table_id.table_id) {
                    continue;
                }
                forward_iters.push(imm.clone().into_forward_iter());
            }
            let iter = MergeIterator::new(forward_iters);
            let left_iter = UserIterator::new(
                iter,
                (Bound::Unbounded, Bound::Unbounded),
                u64::MAX,
                0,
                None,
            );
            compaction_executor.spawn(async move {
                match check_flush_result(
                    left_iter,
                    Vec::from_iter(existing_table_ids.iter().cloned()),
                    sst_infos,
                    context,
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
                    }
                    Ok(true) => (),
                    Ok(false) => {
                        panic!(
                            "failed to check flush result consistency of state-table {:?}",
                            existing_table_ids
                        );
                    }
                }
            });
        }
        Ok(level0)
    } else {
        Err(err.unwrap())
    }
}
322
/// Merge multiple batches into a larger one
///
/// All `imms` must belong to `table_id`, be non-empty, be ordered from newest to
/// oldest batch id, and agree on whether they carry old values.
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
    memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    // Imms arrive newest-first, so the reversed batch ids must be ascending and
    // the first imm carries the maximum batch id.
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    let has_old_value = imms[0].has_old_value();
    // TODO: make sure that the corner case on switch_op_consistency is handled
    // If the imm of a table id contains old value, all other imm of the same table id should have old value
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    // Pre-compute the combined old-value byte size and carry over the shared
    // global old-value size counter from the first imm.
    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // use merge iterator to merge input imms
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    // Output layout: `merged_entries` maps each distinct key to an offset into the
    // flat `values` (and, when present, `old_values`) vector.
    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Use first key, max epoch to initialize the tracker to ensure that the check first call to full_key_tracker.observe will succeed
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        // `observe_multi_version` returns true when the user key advances past the
        // tracked one, i.e. a new key entry should be started.
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                // The previous entry ended up with no values appended; reuse its
                // slot for the current key instead of leaving an empty entry.
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                // Record kv entries
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        // Append all versioned values of the current key entry.
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // Since there is no blocking point in this method, but it is cpu intensive, we call this method
        // to do cooperative scheduling
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
            memory_tracker,
        )),
        table_id,
    }
}
462
463///  Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
464fn generate_splits(
465    payload: &Vec<ImmutableMemtable>,
466    existing_table_ids: &HashSet<u32>,
467    storage_opts: &StorageOpts,
468) -> (Vec<KeyRange>, u64, BTreeMap<u32, u32>) {
469    let mut size_and_start_user_keys = vec![];
470    let mut compact_data_size = 0;
471    let mut table_size_infos: HashMap<u32, u64> = HashMap::default();
472    let mut table_vnode_partition = BTreeMap::default();
473    for imm in payload {
474        let data_size = {
475            // calculate encoded bytes of key var length
476            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
477        };
478        compact_data_size += data_size;
479        size_and_start_user_keys.push((data_size, imm.start_user_key()));
480        let v = table_size_infos.entry(imm.table_id.table_id).or_insert(0);
481        *v += data_size;
482    }
483    size_and_start_user_keys.sort_by(|a, b| a.1.cmp(&b.1));
484    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
485    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
486    let mut key_split_append = |key_before_last: &Bytes| {
487        splits.last_mut().unwrap().right = key_before_last.clone();
488        splits.push(KeyRange::new(key_before_last.clone(), Bytes::new()));
489    };
490    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
491    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
492    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
493    let parallelism = std::cmp::min(
494        storage_opts.share_buffers_sync_parallelism as u64,
495        size_and_start_user_keys.len() as u64,
496    );
497    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
498        compact_data_size / parallelism
499    } else {
500        compact_data_size
501    };
502
503    if existing_table_ids.len() > 1 {
504        if parallelism > 1 && compact_data_size > sstable_size {
505            let mut last_buffer_size = 0;
506            let mut last_user_key: UserKey<Vec<u8>> = UserKey::default();
507            for (data_size, user_key) in size_and_start_user_keys {
508                if last_buffer_size >= sub_compaction_data_size
509                    && last_user_key.as_ref() != user_key
510                {
511                    last_user_key.set(user_key);
512                    key_split_append(
513                        &FullKey {
514                            user_key,
515                            epoch_with_gap: EpochWithGap::new_max_epoch(),
516                        }
517                        .encode()
518                        .into(),
519                    );
520                    last_buffer_size = data_size;
521                } else {
522                    last_user_key.set(user_key);
523                    last_buffer_size += data_size;
524                }
525            }
526        }
527
528        // Meta node will calculate size of each state-table in one task in `risingwave_meta::hummock::manager::compaction::calculate_vnode_partition`.
529        // To make the calculate result more accurately we shall split the large state-table from other small ones.
530        for table_id in existing_table_ids {
531            if let Some(table_size) = table_size_infos.get(table_id)
532                && *table_size > min_sstable_size
533            {
534                table_vnode_partition.insert(*table_id, 1);
535            }
536        }
537    }
538
539    // mul 1.2 for other extra memory usage.
540    // Ensure that the size of each sstable is still less than `sstable_size` after optimization to avoid generating a huge size sstable which will affect the object store
541    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
542    (splits, sub_compaction_sstable_size, table_vnode_partition)
543}
544
/// Executes one shared-buffer sub-compaction over a single key-range split.
pub struct SharedBufferCompactRunner {
    /// The compactor configured for this split's key range and task options.
    compactor: Compactor,
    /// Index of the split this runner handles; returned with the output so the
    /// caller can restore split order.
    split_index: usize,
}
549
550impl SharedBufferCompactRunner {
551    pub fn new(
552        split_index: usize,
553        key_range: KeyRange,
554        context: CompactorContext,
555        sub_compaction_sstable_size: usize,
556        table_vnode_partition: BTreeMap<u32, u32>,
557        use_block_based_filter: bool,
558        object_id_getter: Box<dyn GetObjectId>,
559    ) -> Self {
560        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
561        options.capacity = sub_compaction_sstable_size;
562        let compactor = Compactor::new(
563            context,
564            options,
565            super::TaskConfig {
566                key_range,
567                cache_policy: CachePolicy::Fill(CacheHint::Normal),
568                gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
569                retain_multiple_version: true,
570                stats_target_table_ids: None,
571                task_type: compact_task::TaskType::SharedBuffer,
572                table_vnode_partition,
573                use_block_based_filter,
574                table_schemas: Default::default(),
575                disable_drop_column_optimization: false,
576            },
577            object_id_getter,
578        );
579        Self {
580            compactor,
581            split_index,
582        }
583    }
584
585    pub async fn run(
586        self,
587        iter: impl HummockIterator<Direction = Forward>,
588        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
589    ) -> HummockResult<CompactOutput> {
590        let dummy_compaction_filter = DummyCompactionFilter {};
591        let (ssts, table_stats_map) = self
592            .compactor
593            .compact_key_range(
594                iter,
595                dummy_compaction_filter,
596                compaction_catalog_agent_ref,
597                None,
598                None,
599                None,
600            )
601            .await?;
602        Ok((self.split_index, ssts, table_stats_map))
603    }
604}
605
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    /// Prefixes `key` with vnode 1 so all generated keys live in the same vnode.
    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    /// Builds a single-entry test imm of the given payload `size` that inserts
    /// `value` under `key` for `table_id` at `epoch`.
    fn build_imm(
        epoch: u64,
        key: &str,
        value: &'static [u8],
        size: usize,
        table_id: u32,
    ) -> ImmutableMemtable {
        ImmutableMemtable::build_shared_buffer_batch_for_test(
            test_epoch(epoch),
            0,
            vec![(
                generate_key(key),
                SharedBufferValue::Insert(Bytes::from_static(value)),
            )],
            size,
            TableId::new(table_id),
        )
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        let payload = vec![
            build_imm(3, "dddd", b"v3", 1024 * 1024, 1),
            build_imm(3, "abb", b"v3", (1024 + 256) * 1024, 1),
            build_imm(2, "abc", b"v2", (1024 + 512) * 1024, 1),
            build_imm(3, "aaa", b"v3", (1024 + 512) * 1024, 1),
            build_imm(3, "aaa", b"v3", (1024 + 256) * 1024, 2),
        ];

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let (splits, _sstable_capacity, vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1, 2]), &storage_opts);

        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());
        // Splits must tile the key space: adjacent ranges share their boundary,
        // left bounds strictly increase, and only the final right bound may be
        // unbounded (empty).
        for window in splits.windows(2) {
            let (prev, cur) = (&window[0], &window[1]);
            assert_eq!(cur.left, prev.right);
            assert!(cur.left > prev.left);
            assert!(cur.right.is_empty() || cur.left < cur.right);
        }
    }
}
703}