risingwave_storage/hummock/shared_buffer/
shared_buffer_batch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use prometheus::IntGauge;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::utils::{MemoryTracker, range_overlap};
35use crate::hummock::value::HummockValue;
36use crate::hummock::{HummockEpoch, HummockResult};
37use crate::mem_table::ImmId;
38use crate::store::ReadOptions;
39
/// A value staged in the shared buffer, tagged with the write operation that
/// produced it. Unlike [`HummockValue`], it distinguishes `Insert` from
/// `Update` so that old-value handling can tell whether a prior version of
/// the key exists (see `SharedBufferBatchOldValues`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// A newly written value with no tracked prior version.
    Insert(T),
    /// An overwrite of an existing value.
    Update(T),
    /// A deletion of an existing key.
    Delete,
}
46
47impl<T> SharedBufferValue<T> {
48    fn to_ref(&self) -> SharedBufferValue<&T> {
49        match self {
50            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
51            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
52            SharedBufferValue::Delete => SharedBufferValue::Delete,
53        }
54    }
55}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59        match val {
60            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61                HummockValue::Put(val)
62            }
63            SharedBufferValue::Delete => HummockValue::Delete,
64        }
65    }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70        match self {
71            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73            SharedBufferValue::Delete => SharedBufferValue::Delete,
74        }
75    }
76}
77
/// The key is `table_key`, which does not contain table id or epoch.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Unique id of a shared buffer batch, allocated from
/// `SHARED_BUFFER_BATCH_ID_GENERATOR`.
pub type SharedBufferBatchId = u64;

/// One version of a value: the epoch (with spill gap) it was written at,
/// paired with the written value.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);
83
/// A borrowed view of a single key entry: the key, all of its new-value
/// versions, and (when old-value tracking is enabled) the parallel slice of
/// old values.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    /// Versions of the key, ordered newest epoch first.
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    /// Old values index-aligned with `new_values`; `None` when the batch does
    /// not track old values.
    pub(crate) old_values: Option<&'a [Bytes]>,
}
89
/// A single key of a shared buffer batch, pointing into the flattened
/// `new_values` vector of `SharedBufferBatchInner`.
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// A shared buffer may contain data from multiple epochs for a specific key.
    /// The values of all keys are stored together in the field `new_values` of `SharedBufferBatchInner`
    /// as a single vector. `value_offset` is the starting offset of values of the current `key` in the `new_values` vector.
    /// The end offset is the `value_offset` of the next entry, or the end of the vector if the current entry is the last one.
    pub(crate) value_offset: usize,
}
99
100impl SharedBufferKeyEntry {
101    /// Return an exclusive offset of the values of key of index `i`
102    fn value_end_offset<'a, T>(
103        i: usize,
104        entries: &'a [SharedBufferKeyEntry],
105        values: &'a [T],
106    ) -> usize {
107        entries
108            .get(i + 1)
109            .map(|entry| entry.value_offset)
110            .unwrap_or(values.len())
111    }
112
113    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
114        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
115    }
116}
117
/// Old values carried alongside a batch's new values, with their memory
/// accounted in a global gauge.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
    /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
    values: Vec<Bytes>,
    /// Byte size attributed to the old values; added to the gauge in `new`.
    pub size: usize,
    /// Global gauge of old-value memory; decremented again on drop.
    pub global_old_value_size: IntGauge,
}
126
impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        // Release this batch's contribution to the global old-value gauge,
        // mirroring the `add` performed in `Self::new`.
        self.global_old_value_size.sub(self.size as _);
    }
}
132
133impl SharedBufferBatchOldValues {
134    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
135        global_old_value_size.add(size as _);
136        Self {
137            values,
138            size,
139            global_old_value_size,
140        }
141    }
142
143    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
144        Self::new(values, size, IntGauge::new("test", "test").unwrap())
145    }
146}
147
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    /// Key entries in ascending key order; each points into `new_values`
    /// via its `value_offset`.
    entries: Vec<SharedBufferKeyEntry>,
    /// All value versions of all keys, flattened; within a key, versions are
    /// ordered newest epoch first.
    new_values: Vec<VersionedSharedBufferValue>,
    /// Optional old values, index-aligned with `new_values`.
    old_values: Option<SharedBufferBatchOldValues>,
    /// The epochs of the data in batch, sorted in ascending order (old to new)
    epochs: Vec<HummockEpoch>,
    /// Total size of all key-value items (excluding the `epoch` of value versions)
    size: usize,
    // Held only so the memory reservation lives as long as the batch
    // (field is never read).
    _tracker: Option<MemoryTracker>,
    /// For a batch created from multiple batches, this will be
    /// the largest batch id among input batches
    batch_id: SharedBufferBatchId,
}
162
impl SharedBufferBatchInner {
    /// Build a single-epoch batch from a sorted, non-empty `payload`.
    ///
    /// Every payload item becomes one key entry with exactly one value
    /// version tagged `(epoch, spill_offset)`. If `old_values` is given, its
    /// length must match the payload length.
    ///
    /// Panics when `payload` is empty or the old-value length mismatches.
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        _tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        for (i, (key, value)) in payload.into_iter().enumerate() {
            // Single-epoch batch: each key has exactly one version, so its
            // value offset is simply the item index.
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        // Allocate a fresh globally-unique batch id.
        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            _tracker,
            batch_id,
        }
    }

    /// All value versions of the key at entry index `i` (newest epoch first).
    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

    /// Build a batch carrying data from multiple epochs (e.g. produced by
    /// merging several single-epoch batches); `imm_id` becomes the batch id.
    ///
    /// Debug-checked invariants: `entries` sorted by key and by
    /// `value_offset`, every key's versions sorted newest-to-oldest by
    /// epoch, and `epochs` non-empty and ascending.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        debug_assert!((0..entries.len()).all(|i| {
            // Versions of each key must be newest-first, i.e. the reversed
            // slice is ascending by epoch.
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }

    /// Return `None` if cannot find a visible version
    /// Return `HummockValue::Delete` if the key has been deleted by some epoch <= `read_epoch`
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        // Perform binary search on table key to find the corresponding entry
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            // Scan to find the first version <= epoch
            // (versions are newest-first, so the first non-skipped one is the
            // latest version visible at `read_epoch`)
            for (e, v) in self.values(i) {
                // skip invisible versions
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
            // cannot find a visible version
        }

        None
    }
}
267
impl PartialEq for SharedBufferBatchInner {
    fn eq(&self, other: &Self) -> bool {
        // Equality compares only the key entries and new values; metadata
        // (epochs, size, tracker, batch id, old values) is not considered.
        self.entries == other.entries && self.new_values == other.new_values
    }
}
273
/// Monotonic source of unique [`SharedBufferBatchId`]s, starting from 0.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
276
/// A write batch stored in the shared buffer.
///
/// Cloning is cheap: the payload is shared behind an `Arc`.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    /// Table all keys of this batch belong to.
    pub table_id: TableId,
}
283
impl SharedBufferBatch {
    /// Test-only constructor: single-epoch batch without old values.
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    /// Test-only constructor: single-epoch batch with old values.
    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    /// Shared body of the test constructors: measure sizes, wrap old values,
    /// and build the inner batch with spill offset 0 and no memory tracker.
    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0, // spill_offset
                sorted_items,
                old_values,
                size,
                None,
            )),
            table_id,
        }
    }

    /// Estimate the encoded size of a list of delete ranges.
    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                // is_exclude_left_key(bool) + table_id + epoch
                // NOTE(review): presumably 13 = 1 (bool) + 4 (table id) + 8 (epoch) — confirm
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

    /// Return (total size, old value size or 0)
    ///
    /// The total counts each item's key and new-value byte lengths plus the
    /// old-value size; the old-value size also counts the `Bytes` handle
    /// itself (`size_of_val`) in addition to the payload length.
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        // size = Sum(length of full key + length of user value)
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        // Deletes carry no new value.
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }

    /// Whether this batch may contain data for `table_id` within
    /// `table_key_range`: the table ids must match and the batch's inclusive
    /// key span must overlap the range.
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    /// Table this batch belongs to.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Smallest epoch in the batch (`epochs` is ascending).
    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    /// Largest epoch in the batch.
    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    /// Number of distinct keys.
    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    /// Number of value versions across all keys.
    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    /// Whether old-value tracking is enabled for this batch.
    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    /// Point lookup: latest version of `table_key` visible at `read_epoch`.
    /// See `SharedBufferBatchInner::get_value` for the visibility rules.
    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

    /// Whether any key of the batch falls inside `table_key_range`.
    ///
    /// Uses a binary search with a range-aware comparator: keys left of the
    /// range compare `Less`, keys right of it compare `Greater`, and any key
    /// inside it compares `Equal` — so `is_ok()` means at least one key is
    /// within the range.
    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }

    /// Turn the batch into an iterator of direction `D`; with
    /// `IS_NEW_VALUE = false` it iterates old values instead of new ones.
    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    /// Forward iterator over old values (skips `Insert` versions, which have
    /// no old value).
    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    /// Forward iterator over new values.
    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    /// Backward iterator over new values.
    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    /// Smallest table key in the batch (inclusive).
    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    /// Largest table key in the batch (inclusive).
    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    /// Largest table key as a reference to the owned `Bytes`-backed key.
    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    /// return inclusive left endpoint, which means that all data in this batch should be larger or
    /// equal than this key.
    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    /// Total measured byte size of the batch (see `measure_batch_size`).
    pub fn size(&self) -> usize {
        self.inner.size
    }

    /// Old values, if tracking is enabled.
    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    /// Unique id of this batch.
    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    /// All epochs contained in the batch, ascending.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    /// Build a single-epoch batch from sorted items, with optional old
    /// values and memory tracker.
    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    /// Test-only variant of `build_shared_buffer_batch` without old values
    /// or a memory tracker.
    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}
546
/// Iterate all the items in the shared buffer batch
/// If there are multiple versions of a key, the iterator will return all versions
///
/// `IS_NEW_VALUE = false` makes this an old-value iterator, which yields the
/// stored old values and skips `Insert` versions.
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// The index of the current entry in the payload
    current_entry_idx: usize,
    /// The index of current value
    current_value_idx: usize,
    /// The exclusive end offset of the value index of current key.
    value_end_offset: usize,
    /// Table id used when composing full keys.
    table_id: TableId,
    /// Carries the iteration direction at the type level only.
    _phantom: PhantomData<D>,
}
560
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    /// Create an iterator over `inner`. The cursors start at 0 and are only
    /// meaningful after `rewind`/`seek`. An old-value iterator panics here
    /// when the batch carries no old values.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    /// Whether `current_entry_idx` points at an existing entry.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Mark the iterator exhausted by moving the entry index past the end.
    fn invalidate(&mut self) {
        self.current_entry_idx = self.inner.entries.len();
    }

    /// Step to the next entry in iteration direction; stepping backward past
    /// index 0 invalidates the iterator.
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.invalidate();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Reset the value cursor to the first version of the current entry and
    /// cache the exclusive end offset of its versions.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    /// Exclusive end offset of the current entry's versions in `new_values`.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    /// Debug-check that the entry and value cursors are mutually consistent;
    /// an old-value iterator must never rest on an `Insert` version.
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    /// Move to the next value version, falling through to the next entry
    /// (in iteration direction) once the current entry's versions are
    /// exhausted.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    /// Old-value iteration only: skip `Insert` versions (which carry no old
    /// value), crossing entry boundaries as needed, until a usable version
    /// is found or the iterator is exhausted.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip Insert versions within the current entry.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                // Entry exhausted: continue with the next entry, if any.
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}
678
impl SharedBufferBatchIterator<Forward> {
    /// Skip any remaining versions of the current key and move to the next
    /// key (resetting the value cursor to its first version).
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Return the current key together with all of its versions (and the
    /// aligned old values, if tracked). The iterator must be positioned at
    /// the first version of the key.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
702
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    /// Advance one value version; old-value iteration additionally skips
    /// `Insert` versions.
    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Full key at the current position: table id + table key + the epoch of
    /// the current value version.
    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    /// Value at the current position: the new value as-is, or — for an
    /// old-value iterator — the old value at the same index, always wrapped
    /// as a `Put`.
    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    /// Position at the first entry in iteration direction.
    /// `entries` is non-empty (asserted at construction), so `len() - 1` is
    /// safe for the backward case.
    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Position the iterator at the given full key, honoring direction `D`.
    /// Keys of other tables degenerate into a rewind or an invalidation
    /// depending on which side of this batch's table id they fall.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        match key.user_key.table_id.cmp(&self.table_id) {
            Ordering::Less => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        // seek key table id < batch table id, so seek to beginning
                        self.rewind().await?;
                    }
                    DirectionEnum::Backward => {
                        self.invalidate();
                        return Ok(());
                    }
                };
            }
            Ordering::Greater => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.invalidate();
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        // seek key table id > batch table id, so seek to end
                        self.rewind().await?;
                    }
                };
            }
            Ordering::Equal => (),
        }
        // Perform binary search on table key because the items in SharedBufferBatch is ordered
        // by table key.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                // Exact key match: skip versions newer than the seek epoch
                // (versions are ordered newest-first).
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                // All versions are newer than the seek epoch: step to the
                // next entry in iteration direction.
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            // No exact match: `i` is the insertion point, i.e. the first key
            // greater than the seek key.
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        // All keys are greater than the seek key.
                        self.invalidate();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
840
841#[cfg(test)]
842mod tests {
843    use std::ops::Bound::Excluded;
844
845    use itertools::{Itertools, zip_eq};
846    use risingwave_common::util::epoch::{EpochExt, test_epoch};
847    use risingwave_hummock_sdk::key::map_table_key_range;
848
849    use super::*;
850    use crate::hummock::compactor::merge_imms_in_memory;
851    use crate::hummock::iterator::test_utils::{
852        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
853    };
854
855    fn to_hummock_value_batch(
856        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
857    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
858        items.into_iter().map(|(k, v)| (k, v.into())).collect()
859    }
860
    /// Basic operations on a single `SharedBufferBatch`: key-range boundaries,
    /// point lookup (hits and misses), and forward/backward iteration order.
    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Key range: start/end table keys must match the first/last inserted keys.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point lookup: every inserted key resolves to its value.
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        // Keys absent from the batch return `None`.
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator: yields all items in key order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iterator: yields the same items in reverse key order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }
950
    /// Seek positioning for forward and backward iterators: seeking before/after
    /// the batch's key range, and seeking an existing key at the current, an
    /// older, and a newer epoch.
    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // FORWARD: Seek to a key < 1st key, expect all three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key < 1st key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key > the last key, expect all items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with current epoch, expect first two items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with old epoch, expect first item to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with future epoch, expect first two items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1091
    /// Old-value iterator behavior: the iterator yields only `Update`/`Delete`
    /// entries (paired with their recorded old values) and skips `Insert`
    /// entries, under both `rewind` and various `seek` positions.
    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        // Only non-`Insert` entries carry a meaningful old value, so the
        // expected old-value output filters the `Insert` rows out.
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key < 1st key, expect all four items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 3rd key with old epoch, expect last item to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Seek to an insert key: the old-value iterator skips it and resumes
        // at the next non-insert entry.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1263
    /// Seeking with a table id that does not match the batch's table id must panic.
    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        // The batch was built with the default table id; seeking with a
        // different table id (1) should panic.
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }
1275
1276    #[tokio::test]
1277    async fn test_shared_buffer_batch_range_existx() {
1278        let epoch = test_epoch(1);
1279        let shared_buffer_items = vec![
1280            (
1281                Vec::from("a_1"),
1282                SharedBufferValue::Insert(Bytes::from("value1")),
1283            ),
1284            (
1285                Vec::from("a_3"),
1286                SharedBufferValue::Insert(Bytes::from("value2")),
1287            ),
1288            (
1289                Vec::from("a_5"),
1290                SharedBufferValue::Insert(Bytes::from("value3")),
1291            ),
1292            (
1293                Vec::from("b_2"),
1294                SharedBufferValue::Insert(Bytes::from("value3")),
1295            ),
1296        ];
1297        let shared_buffer_batch = SharedBufferBatch::for_test(
1298            transform_shared_buffer(shared_buffer_items),
1299            epoch,
1300            Default::default(),
1301        );
1302
1303        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1304        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1305        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1306        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1307        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1308        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1309        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1310        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1311        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1312        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1313        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1314        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1315        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1316        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1317        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1318        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1319
1320        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1321        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1322        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1323        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1324        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1325        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1326        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1327        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1328        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1329        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1330    }
1331
1332    #[tokio::test]
1333    async fn test_merge_imms_basic() {
1334        let table_id = TableId::new(1004);
1335        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1336            (
1337                iterator_test_table_key_of(1),
1338                SharedBufferValue::Insert(Bytes::from("value1")),
1339            ),
1340            (
1341                iterator_test_table_key_of(2),
1342                SharedBufferValue::Insert(Bytes::from("value2")),
1343            ),
1344            (
1345                iterator_test_table_key_of(3),
1346                SharedBufferValue::Insert(Bytes::from("value3")),
1347            ),
1348        ];
1349        let epoch = test_epoch(1);
1350        let imm1 = SharedBufferBatch::for_test(
1351            transform_shared_buffer(shared_buffer_items1.clone()),
1352            epoch,
1353            table_id,
1354        );
1355        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
1356        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1357            (
1358                iterator_test_table_key_of(1),
1359                SharedBufferValue::Insert(Bytes::from("value12")),
1360            ),
1361            (
1362                iterator_test_table_key_of(2),
1363                SharedBufferValue::Insert(Bytes::from("value22")),
1364            ),
1365            (
1366                iterator_test_table_key_of(3),
1367                SharedBufferValue::Insert(Bytes::from("value32")),
1368            ),
1369        ];
1370        let epoch = test_epoch(2);
1371        let imm2 = SharedBufferBatch::for_test(
1372            transform_shared_buffer(shared_buffer_items2.clone()),
1373            epoch,
1374            table_id,
1375        );
1376        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);
1377
1378        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1379            (
1380                iterator_test_table_key_of(1),
1381                SharedBufferValue::Insert(Bytes::from("value13")),
1382            ),
1383            (
1384                iterator_test_table_key_of(2),
1385                SharedBufferValue::Insert(Bytes::from("value23")),
1386            ),
1387            (
1388                iterator_test_table_key_of(3),
1389                SharedBufferValue::Insert(Bytes::from("value33")),
1390            ),
1391        ];
1392        let epoch = test_epoch(3);
1393        let imm3 = SharedBufferBatch::for_test(
1394            transform_shared_buffer(shared_buffer_items3.clone()),
1395            epoch,
1396            table_id,
1397        );
1398        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);
1399
1400        let batch_items = [
1401            shared_buffer_items1,
1402            shared_buffer_items2,
1403            shared_buffer_items3,
1404        ];
1405        // newer data comes first
1406        let imms = vec![imm3, imm2, imm1];
1407        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1408
1409        // Point lookup
1410        for (i, items) in batch_items.iter().enumerate() {
1411            for (key, value) in items {
1412                assert_eq!(
1413                    merged_imm
1414                        .get(
1415                            TableKey(key.as_slice()),
1416                            test_epoch(i as u64 + 1),
1417                            &ReadOptions::default()
1418                        )
1419                        .unwrap()
1420                        .0
1421                        .as_slice(),
1422                    value.as_slice(),
1423                    "epoch: {}, key: {:?}",
1424                    test_epoch(i as u64 + 1),
1425                    String::from_utf8(key.clone())
1426                );
1427            }
1428        }
1429        assert_eq!(
1430            merged_imm.get(
1431                TableKey(iterator_test_table_key_of(4).as_slice()),
1432                test_epoch(1),
1433                &ReadOptions::default()
1434            ),
1435            None
1436        );
1437        assert_eq!(
1438            merged_imm.get(
1439                TableKey(iterator_test_table_key_of(5).as_slice()),
1440                test_epoch(1),
1441                &ReadOptions::default()
1442            ),
1443            None
1444        );
1445
1446        // Forward iterator
1447        for snapshot_epoch in 1..=3 {
1448            let mut iter = merged_imm.clone().into_forward_iter();
1449            iter.rewind().await.unwrap();
1450            let mut output = vec![];
1451            while iter.is_valid() {
1452                let epoch = iter.key().epoch_with_gap.pure_epoch();
1453                if test_epoch(snapshot_epoch) == epoch {
1454                    output.push((
1455                        iter.key().user_key.table_key.to_vec(),
1456                        iter.value().to_bytes(),
1457                    ));
1458                }
1459                iter.next().await.unwrap();
1460            }
1461            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
1462        }
1463
1464        // Forward and Backward iterator
1465        {
1466            let mut iter = merged_imm.clone().into_forward_iter();
1467            iter.rewind().await.unwrap();
1468            let mut output = vec![];
1469            while iter.is_valid() {
1470                output.push((
1471                    iter.key().user_key.table_key.to_vec(),
1472                    iter.value().to_bytes(),
1473                ));
1474                iter.next().await.unwrap();
1475            }
1476
1477            let mut expected = vec![];
1478            #[expect(clippy::needless_range_loop)]
1479            for key_idx in 0..=2 {
1480                for epoch in (1..=3).rev() {
1481                    let item = batch_items[epoch - 1][key_idx].clone();
1482                    expected.push(item);
1483                }
1484            }
1485            assert_eq!(expected, output);
1486
1487            let mut backward_iter = merged_imm.clone().into_backward_iter();
1488            backward_iter.rewind().await.unwrap();
1489            let mut output = vec![];
1490            while backward_iter.is_valid() {
1491                output.push((
1492                    backward_iter.key().user_key.table_key.to_vec(),
1493                    backward_iter.value().to_bytes(),
1494                ));
1495                backward_iter.next().await.unwrap();
1496            }
1497            let mut expected = vec![];
1498            for key_idx in (0..=2).rev() {
1499                for epoch in (1..=3).rev() {
1500                    let item = batch_items[epoch - 1][key_idx].clone();
1501                    expected.push(item);
1502                }
1503            }
1504            assert_eq!(expected, output);
1505        }
1506    }
1507
1508    #[tokio::test]
1509    async fn test_merge_imms_with_old_values() {
1510        let table_id = TableId::new(1004);
1511        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1512            (
1513                iterator_test_table_key_of(1),
1514                SharedBufferValue::Insert(Bytes::from("value1")),
1515            ),
1516            (
1517                iterator_test_table_key_of(2),
1518                SharedBufferValue::Update(Bytes::from("value2")),
1519            ),
1520            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1521        ];
1522        let old_value1 = vec![
1523            Bytes::new(),
1524            Bytes::from("old_value2"),
1525            Bytes::from("old_value3"),
1526        ];
1527        let epoch = test_epoch(1);
1528        let imm1 = SharedBufferBatch::for_test_with_old_values(
1529            transform_shared_buffer(key_value1.clone()),
1530            old_value1.clone(),
1531            epoch,
1532            table_id,
1533        );
1534        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1535        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1536            (
1537                iterator_test_table_key_of(1),
1538                SharedBufferValue::Update(Bytes::from("value12")),
1539            ),
1540            (
1541                iterator_test_table_key_of(2),
1542                SharedBufferValue::Update(Bytes::from("value22")),
1543            ),
1544            (
1545                iterator_test_table_key_of(3),
1546                SharedBufferValue::Insert(Bytes::from("value32")),
1547            ),
1548        ];
1549        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1550        let epoch = epoch.next_epoch();
1551        let imm2 = SharedBufferBatch::for_test_with_old_values(
1552            transform_shared_buffer(key_value2.clone()),
1553            old_value2.clone(),
1554            epoch,
1555            table_id,
1556        );
1557        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1558
1559        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1560            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1561            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1562            (
1563                iterator_test_table_key_of(3),
1564                SharedBufferValue::Update(Bytes::from("value33")),
1565            ),
1566        ];
1567        let old_value3 = vec![
1568            Bytes::from("value12"),
1569            Bytes::from("value22"),
1570            Bytes::from("value32"),
1571        ];
1572        let epoch = epoch.next_epoch();
1573        let imm3 = SharedBufferBatch::for_test_with_old_values(
1574            transform_shared_buffer(key_value3.clone()),
1575            old_value3.clone(),
1576            epoch,
1577            table_id,
1578        );
        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());

        // Keep the raw (new_value, old_value) pairs per epoch; used later to
        // derive the expected output of the old-value iterator.
        let key_values = [
            (key_value1, old_value1),
            (key_value2, old_value2),
            (key_value3, old_value3),
        ];

        // batch_items[i] holds the expected (key, value) items written at
        // epoch test_epoch(i + 1).
        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        // newer data comes first
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        // Point lookup
        // Each batch must be visible when read at exactly the epoch it was
        // written at (epoch index is 1-based via test_epoch(i + 1)).
        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        // Keys 4 and 5 were never written, so lookups must return None.
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator, filtered per epoch: scanning the merged imm and
        // keeping only entries whose pure epoch equals test_epoch(i) must
        // reproduce exactly the batch written at that epoch.
        for i in 1..=3 {
            let snapshot_epoch = test_epoch(i);
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if snapshot_epoch == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[i as usize - 1]);
        }

        // Forward and Backward iterator
        {
            // Full forward scan: keys in ascending order, and within each key
            // the versions ordered from newest epoch to oldest.
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            // Expected order: key_idx ascending, epoch descending (3, 2, 1).
            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            // Backward scan reverses the key order but keeps the per-key
            // epoch order (still newest epoch first).
            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            // Expected order: key_idx descending, epoch descending.
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }

        // old value iter
        {
            let mut iter = merged_imm.clone().into_old_value_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            // Build the expected old values: for each key (ascending) and each
            // epoch (newest first, 0-based index here), an Insert carries no
            // old value and is skipped; Update/Delete expose the previous
            // value as HummockValue::Put(old_value).
            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (0..=2).rev() {
                    let (key_values, old_values) = &key_values[epoch];
                    let (key, new_value) = &key_values[key_idx];
                    let old_value = &old_values[key_idx];
                    if matches!(new_value, SharedBufferValue::Insert(_)) {
                        continue;
                    }
                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
                }
            }
            assert_eq!(expected, output);
        }
    }
1723}