risingwave_storage/hummock/shared_buffer/shared_buffer_batch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use prometheus::IntGauge;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::utils::{MemoryTracker, range_overlap};
35use crate::hummock::value::HummockValue;
36use crate::hummock::{HummockEpoch, HummockResult};
37use crate::mem_table::ImmId;
38use crate::store::ReadOptions;
39
/// A write recorded in the shared buffer. Unlike `HummockValue`, it keeps
/// `Insert` distinct from `Update` so that old-value tracking can tell whether
/// a previous value exists (see `SharedBufferBatchOldValues`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// A write of a key that did not exist before.
    Insert(T),
    /// A write of a key that already had a value.
    Update(T),
    /// A deletion of the key.
    Delete,
}
46
47impl<T> SharedBufferValue<T> {
48    fn to_ref(&self) -> SharedBufferValue<&T> {
49        match self {
50            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
51            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
52            SharedBufferValue::Delete => SharedBufferValue::Delete,
53        }
54    }
55}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59        match val {
60            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61                HummockValue::Put(val)
62            }
63            SharedBufferValue::Delete => HummockValue::Delete,
64        }
65    }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70        match self {
71            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73            SharedBufferValue::Delete => SharedBufferValue::Delete,
74        }
75    }
76}
77
/// The key is `table_key`, which does not contain table id or epoch.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Unique id of a batch, drawn from `SHARED_BUFFER_BATCH_ID_GENERATOR`.
pub type SharedBufferBatchId = u64;

/// A value version paired with the (epoch, spill-offset) it was written at.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);
83
/// Borrowed view of one key entry: the key plus all of its value versions and,
/// when old values are tracked, the parallel old-value slice.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    /// All versions of `key`, ordered from newest to oldest epoch.
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    /// Old values parallel to `new_values`; `None` when not tracked.
    pub(crate) old_values: Option<&'a [Bytes]>,
}
89
/// A key stored in the shared buffer, together with the offset of its first
/// value version in the owning `SharedBufferBatchInner::new_values` vector.
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// A shared buffer may contain data from multiple epochs for a specific key.
    /// The values of all keys are stored together in the field `new_values` of `SharedBufferBatchInner`
    /// as a single vector. `value_offset` is the starting offset of values of the current `key` in the `new_values` vector.
    /// The end offset is the `value_offset` of the next entry or the vector end if the current entry is not the last one.
    pub(crate) value_offset: usize,
}
99
100impl SharedBufferKeyEntry {
101    /// Return an exclusive offset of the values of key of index `i`
102    fn value_end_offset<'a, T>(
103        i: usize,
104        entries: &'a [SharedBufferKeyEntry],
105        values: &'a [T],
106    ) -> usize {
107        entries
108            .get(i + 1)
109            .map(|entry| entry.value_offset)
110            .unwrap_or(values.len())
111    }
112
113    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
114        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
115    }
116}
117
/// Old values carried alongside a batch's new values, with their total byte
/// size mirrored into a global metric gauge (incremented in `new`,
/// decremented on `Drop`).
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
    /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
    values: Vec<Bytes>,
    /// Total bytes of `values`, as accounted in `global_old_value_size`.
    pub size: usize,
    /// Process-wide gauge of old-value bytes currently held in memory.
    pub global_old_value_size: IntGauge,
}
126
impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        // Undo the `add` performed in `Self::new` so the global gauge only
        // counts old-value bytes that are still alive.
        self.global_old_value_size.sub(self.size as _);
    }
}
132
impl SharedBufferBatchOldValues {
    /// Wrap `values` (total byte size `size`) and immediately account the
    /// size in the global gauge; the matching subtraction happens in `Drop`.
    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    /// Test-only constructor backed by a throwaway, unregistered gauge.
    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, IntGauge::new("test", "test").unwrap())
    }
}
147
/// Immutable payload of a `SharedBufferBatch`.
///
/// Keys live in `entries` (sorted ascending); the value versions of all keys
/// are stored contiguously in `new_values`, with each entry recording where
/// its versions start (see `SharedBufferKeyEntry::value_offset`).
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    /// Sorted key entries, each pointing into `new_values`.
    entries: Vec<SharedBufferKeyEntry>,
    /// Value versions for all keys, grouped by key; within a key, versions are
    /// ordered from newest to oldest epoch.
    new_values: Vec<VersionedSharedBufferValue>,
    /// Old values parallel to `new_values`, if old-value tracking is enabled.
    old_values: Option<SharedBufferBatchOldValues>,
    /// The epochs of the data in batch, sorted in ascending order (old to new)
    epochs: Vec<HummockEpoch>,
    /// Total size of all key-value items (excluding the `epoch` of value versions)
    size: usize,
    /// Held only to keep the memory quota reserved for the batch's lifetime.
    _tracker: Option<MemoryTracker>,
    /// For a batch created from multiple batches, this will be
    /// the largest batch id among input batches
    batch_id: SharedBufferBatchId,
}
162
163impl SharedBufferBatchInner {
164    pub(crate) fn new(
165        epoch: HummockEpoch,
166        spill_offset: u16,
167        payload: Vec<SharedBufferItem>,
168        old_values: Option<SharedBufferBatchOldValues>,
169        size: usize,
170        _tracker: Option<MemoryTracker>,
171    ) -> Self {
172        assert!(!payload.is_empty());
173        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
174        if let Some(old_values) = &old_values {
175            assert_eq!(old_values.values.len(), payload.len());
176        }
177
178        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
179        let mut entries = Vec::with_capacity(payload.len());
180        let mut new_values = Vec::with_capacity(payload.len());
181        for (i, (key, value)) in payload.into_iter().enumerate() {
182            entries.push(SharedBufferKeyEntry {
183                key,
184                value_offset: i,
185            });
186            new_values.push((epoch_with_gap, value));
187        }
188
189        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
190        SharedBufferBatchInner {
191            entries,
192            new_values,
193            old_values,
194            epochs: vec![epoch],
195            size,
196            _tracker,
197            batch_id,
198        }
199    }
200
201    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
202        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
203    }
204
205    #[allow(clippy::too_many_arguments)]
206    pub(crate) fn new_with_multi_epoch_batches(
207        epochs: Vec<HummockEpoch>,
208        entries: Vec<SharedBufferKeyEntry>,
209        new_values: Vec<VersionedSharedBufferValue>,
210        old_values: Option<SharedBufferBatchOldValues>,
211        size: usize,
212        imm_id: ImmId,
213        tracker: Option<MemoryTracker>,
214    ) -> Self {
215        assert!(new_values.len() >= entries.len());
216        assert!(!entries.is_empty());
217        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
218        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
219        debug_assert!((0..entries.len()).all(|i| {
220            SharedBufferKeyEntry::values(i, &entries, &new_values)
221                .iter()
222                .rev()
223                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
224        }));
225        debug_assert!(!epochs.is_empty());
226        debug_assert!(epochs.is_sorted());
227
228        Self {
229            entries,
230            new_values,
231            old_values,
232            epochs,
233            size,
234            _tracker: tracker,
235            batch_id: imm_id,
236        }
237    }
238
239    /// Return `None` if cannot find a visible version
240    /// Return `HummockValue::Delete` if the key has been deleted by some epoch <= `read_epoch`
241    fn get_value<'a>(
242        &'a self,
243        table_key: TableKey<&[u8]>,
244        read_epoch: HummockEpoch,
245    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
246        // Perform binary search on table key to find the corresponding entry
247        if let Ok(i) = self
248            .entries
249            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
250        {
251            let entry = &self.entries[i];
252            assert_eq!(entry.key.as_ref(), *table_key);
253            // Scan to find the first version <= epoch
254            for (e, v) in self.values(i) {
255                // skip invisible versions
256                if read_epoch < e.pure_epoch() {
257                    continue;
258                }
259                return Some((v.to_ref().into(), *e));
260            }
261            // cannot find a visible version
262        }
263
264        None
265    }
266}
267
268impl PartialEq for SharedBufferBatchInner {
269    fn eq(&self, other: &Self) -> bool {
270        self.entries == other.entries && self.new_values == other.new_values
271    }
272}
273
/// Global monotone counter that hands out a unique `SharedBufferBatchId` to
/// each batch created via `SharedBufferBatchInner::new`.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
276
/// A write batch stored in the shared buffer.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    /// Shared immutable payload; cloning a batch only bumps the `Arc`.
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    /// The table this batch belongs to.
    pub table_id: TableId,
}
283
284impl SharedBufferBatch {
285    pub fn for_test(
286        sorted_items: Vec<SharedBufferItem>,
287        epoch: HummockEpoch,
288        table_id: TableId,
289    ) -> Self {
290        Self::for_test_inner(sorted_items, None, epoch, table_id)
291    }
292
293    pub fn for_test_with_old_values(
294        sorted_items: Vec<SharedBufferItem>,
295        old_values: Vec<Bytes>,
296        epoch: HummockEpoch,
297        table_id: TableId,
298    ) -> Self {
299        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
300    }
301
302    fn for_test_inner(
303        sorted_items: Vec<SharedBufferItem>,
304        old_values: Option<Vec<Bytes>>,
305        epoch: HummockEpoch,
306        table_id: TableId,
307    ) -> Self {
308        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());
309
310        let old_values = old_values
311            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));
312
313        Self {
314            inner: Arc::new(SharedBufferBatchInner::new(
315                epoch,
316                0,
317                sorted_items,
318                old_values,
319                size,
320                None,
321            )),
322            table_id,
323        }
324    }
325
326    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
327        batch_items
328            .iter()
329            .map(|(left, right)| {
330                // is_exclude_left_key(bool) + table_id + epoch
331                let l1 = match left {
332                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
333                    Bound::Unbounded => 13,
334                };
335                let l2 = match right {
336                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
337                    Bound::Unbounded => 13,
338                };
339                l1 + l2
340            })
341            .sum()
342    }
343
344    /// Return (total size, old value size or 0)
345    pub fn measure_batch_size(
346        batch_items: &[SharedBufferItem],
347        old_values: Option<&[Bytes]>,
348    ) -> (usize, usize) {
349        let old_value_size = old_values
350            .iter()
351            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
352            .sum::<usize>();
353        // size = Sum(length of full key + length of user value)
354        let kv_size = batch_items
355            .iter()
356            .map(|(k, v)| {
357                k.len() + {
358                    match v {
359                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
360                            val.len()
361                        }
362                        SharedBufferValue::Delete => 0,
363                    }
364                }
365            })
366            .sum::<usize>();
367        (kv_size + old_value_size, old_value_size)
368    }
369
370    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
371    where
372        R: RangeBounds<TableKey<B>>,
373        B: AsRef<[u8]>,
374    {
375        let left = table_key_range
376            .start_bound()
377            .as_ref()
378            .map(|key| TableKey(key.0.as_ref()));
379        let right = table_key_range
380            .end_bound()
381            .as_ref()
382            .map(|key| TableKey(key.0.as_ref()));
383        self.table_id == table_id
384            && range_overlap(
385                &(left, right),
386                &self.start_table_key(),
387                Included(&self.end_table_key()),
388            )
389    }
390
391    pub fn table_id(&self) -> TableId {
392        self.table_id
393    }
394
395    pub fn min_epoch(&self) -> HummockEpoch {
396        *self.inner.epochs.first().unwrap()
397    }
398
399    pub fn max_epoch(&self) -> HummockEpoch {
400        *self.inner.epochs.last().unwrap()
401    }
402
403    pub fn key_count(&self) -> usize {
404        self.inner.entries.len()
405    }
406
407    pub fn value_count(&self) -> usize {
408        self.inner.new_values.len()
409    }
410
411    pub fn has_old_value(&self) -> bool {
412        self.inner.old_values.is_some()
413    }
414
415    pub fn get<'a>(
416        &'a self,
417        table_key: TableKey<&[u8]>,
418        read_epoch: HummockEpoch,
419        _read_options: &ReadOptions,
420    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
421        self.inner.get_value(table_key, read_epoch)
422    }
423
424    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
425        self.inner
426            .entries
427            .binary_search_by(|m| {
428                let key = &m.key;
429                let too_left = match &table_key_range.0 {
430                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
431                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
432                    std::ops::Bound::Unbounded => false,
433                };
434                if too_left {
435                    return Ordering::Less;
436                }
437
438                let too_right = match &table_key_range.1 {
439                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
440                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
441                    std::ops::Bound::Unbounded => false,
442                };
443                if too_right {
444                    return Ordering::Greater;
445                }
446
447                Ordering::Equal
448            })
449            .is_ok()
450    }
451
452    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
453        self,
454    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
455        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
456    }
457
458    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
459        self.into_directed_iter()
460    }
461
462    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
463        self.into_directed_iter()
464    }
465
466    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
467        self.into_directed_iter()
468    }
469
470    #[inline(always)]
471    pub fn start_table_key(&self) -> TableKey<&[u8]> {
472        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
473    }
474
475    #[inline(always)]
476    pub fn end_table_key(&self) -> TableKey<&[u8]> {
477        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
478    }
479
480    #[inline(always)]
481    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
482        &self.inner.entries.last().expect("non-empty").key
483    }
484
485    /// return inclusive left endpoint, which means that all data in this batch should be larger or
486    /// equal than this key.
487    pub fn start_user_key(&self) -> UserKey<&[u8]> {
488        UserKey::new(self.table_id, self.start_table_key())
489    }
490
491    pub fn size(&self) -> usize {
492        self.inner.size
493    }
494
495    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
496        self.inner.old_values.as_ref()
497    }
498
499    pub fn batch_id(&self) -> SharedBufferBatchId {
500        self.inner.batch_id
501    }
502
503    pub fn epochs(&self) -> &Vec<HummockEpoch> {
504        &self.inner.epochs
505    }
506
507    pub(crate) fn build_shared_buffer_batch(
508        epoch: HummockEpoch,
509        spill_offset: u16,
510        sorted_items: Vec<SharedBufferItem>,
511        old_values: Option<SharedBufferBatchOldValues>,
512        size: usize,
513        table_id: TableId,
514        tracker: Option<MemoryTracker>,
515    ) -> Self {
516        let inner = SharedBufferBatchInner::new(
517            epoch,
518            spill_offset,
519            sorted_items,
520            old_values,
521            size,
522            tracker,
523        );
524        SharedBufferBatch {
525            inner: Arc::new(inner),
526            table_id,
527        }
528    }
529
530    #[cfg(any(test, feature = "test"))]
531    pub fn build_shared_buffer_batch_for_test(
532        epoch: HummockEpoch,
533        spill_offset: u16,
534        sorted_items: Vec<SharedBufferItem>,
535        size: usize,
536        table_id: TableId,
537    ) -> Self {
538        let inner =
539            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
540        SharedBufferBatch {
541            inner: Arc::new(inner),
542            table_id,
543        }
544    }
545}
546
/// Iterate all the items in the shared buffer batch
/// If there are multiple versions of a key, the iterator will return all versions
///
/// With `IS_NEW_VALUE = false` the iterator yields old values instead,
/// skipping `Insert` versions (whose old-value slot is an empty placeholder).
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// The index of the current entry in the payload
    current_entry_idx: usize,
    /// The index of current value
    current_value_idx: usize,
    /// The exclusive end offset of the value index of current key.
    value_end_offset: usize,
    table_id: TableId,
    /// Carries the direction type parameter without storing a `D` value.
    _phantom: PhantomData<D>,
}
560
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    /// Create an iterator over `inner`. The cursor fields start at zero and
    /// only become meaningful after `rewind`/`seek`.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            // An old-value iterator is meaningless on a batch without old values.
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    /// Whether `current_entry_idx` points to an existing key entry.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Step the entry cursor one key in direction `D`. For backward
    /// iteration, stepping past index 0 parks the cursor at `entries.len()`,
    /// which `is_valid_entry_idx` treats as exhausted.
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.current_entry_idx = self.inner.entries.len();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Point the value cursor at the first version of the current key and
    /// cache the exclusive end offset of that key's versions.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    /// Exclusive end offset of the current key's versions in `new_values`.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    /// Debug-only sanity checks: the cursors are in range and, in old-value
    /// mode, never rest on an `Insert` version.
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    /// Move to the next value version of the current key, or to the first
    /// version of the next key when the current key is exhausted.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    /// In old-value mode, skip forward until the cursor rests on a version
    /// that carries an old value (`Update`/`Delete`) or the iterator is
    /// exhausted; `Insert` versions only hold empty placeholder old values.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip `Insert` versions within the current key.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                // Current key exhausted: move to the next key and retry.
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}
674
impl SharedBufferBatchIterator<Forward> {
    /// Skip any remaining versions of the current key and move to the next key.
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Return the current key together with all of its value versions (and
    /// the parallel old-value slice, when tracked). The debug assertion
    /// requires the cursor to sit at the first version of the key, as after
    /// `rewind`/`advance_to_next_key`.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
698
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    /// Advance to the next value version; in old-value mode, additionally
    /// skip versions without a meaningful old value.
    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Full key (table id + table key + epoch) at the current cursor.
    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    /// Current value: the new value in new-value mode, otherwise the old
    /// value (always surfaced as `Put`, since old values are plain bytes).
    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Entries are guaranteed non-empty by the constructors, so both
        // starting positions below are valid.
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        debug_assert_eq!(key.user_key.table_id, self.table_id);
        // Perform binary search on table key because the items in SharedBufferBatch is ordered
        // by table key.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                // Exact key match: position at the first version whose epoch
                // is <= the seek epoch (versions are ordered new-to-old).
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                // No qualifying version for this key: step to the next entry
                // in the iteration direction.
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    // `i` is the first entry with a key greater than the seek key.
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    // `i - 1` is the last entry with a key smaller than the
                    // seek key; `i == 0` means no such entry exists, so the
                    // iterator becomes invalid.
                    if i == 0 {
                        self.current_entry_idx = self.inner.entries.len();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// In-memory batches incur no storage reads, so there is nothing to record.
    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
810
811#[cfg(test)]
812mod tests {
813    use std::ops::Bound::Excluded;
814
815    use itertools::{Itertools, zip_eq};
816    use risingwave_common::util::epoch::{EpochExt, test_epoch};
817    use risingwave_hummock_sdk::key::map_table_key_range;
818
819    use super::*;
820    use crate::hummock::compactor::merge_imms_in_memory;
821    use crate::hummock::iterator::test_utils::{
822        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
823    };
824
825    fn to_hummock_value_batch(
826        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
827    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
828        items.into_iter().map(|(k, v)| (k, v.into())).collect()
829    }
830
    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        // Three ascending keys, each inserted once at a single epoch.
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Sketch: key range endpoints match the first/last inserted keys
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point lookup
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        // Keys outside the inserted set must miss
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator yields items in insertion (ascending key) order
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iterator yields the same items in reverse key order
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }
920
    /// Exercises `seek` on the forward and backward iterators of a single
    /// shared-buffer batch holding keys 1..=3 at one epoch.
    ///
    /// Full keys order by (table key ascending, epoch descending), so seeking a
    /// stored key with a *newer* epoch still lands on that key's entry, while an
    /// *older* epoch skips past it (forward) or stops before it (backward).
    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // FORWARD: Seek to a key < 1st key, expect all three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
        // (a newer epoch sorts before the stored entry for the same key).
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
        // (the old epoch sorts after the stored entry, so key 2 is skipped).
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key < 1st key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key > the last key, expect all items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with current epoch, expect first two items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with old epoch, expect first item to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with future epoch, expect first two item to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1061
    /// Exercises the old-value iterator of a shared-buffer batch: it yields the
    /// *previous* value only for non-`Insert` entries (here the `Update` on
    /// key 2 and the `Delete` on key 4), and supports `seek` with the same
    /// full-key ordering as the forward iterator.
    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        // Positionally aligned with `key_values`; `Insert`s carry an empty
        // placeholder since they have no old value.
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        // The old-value iterator emits entries only for `Update` and `Delete`.
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key < 1st key, expect all four items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Same seek on the old-value iterator: expect both old-value entries.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Old-value iterator from key 2: both old-value entries remain.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Old-value iterator seeking key 2 with an *older* epoch skips key 2's
        // entry, leaving only the key-4 (Delete) old value.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 3rd key with old epoch, expect only the last item to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Seek to an insert key: the old-value iterator skips it and lands on
        // the next non-Insert entry (key 4).
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1233
    /// Seeking an iterator with a key whose `TableId` differs from the
    /// batch's table id (the batch uses the default id, the seek key uses
    /// `TableId::new(1)`) must panic.
    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        // Seeking with a mismatched table id should panic.
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }
1245
1246    #[tokio::test]
1247    async fn test_shared_buffer_batch_range_existx() {
1248        let epoch = test_epoch(1);
1249        let shared_buffer_items = vec![
1250            (
1251                Vec::from("a_1"),
1252                SharedBufferValue::Insert(Bytes::from("value1")),
1253            ),
1254            (
1255                Vec::from("a_3"),
1256                SharedBufferValue::Insert(Bytes::from("value2")),
1257            ),
1258            (
1259                Vec::from("a_5"),
1260                SharedBufferValue::Insert(Bytes::from("value3")),
1261            ),
1262            (
1263                Vec::from("b_2"),
1264                SharedBufferValue::Insert(Bytes::from("value3")),
1265            ),
1266        ];
1267        let shared_buffer_batch = SharedBufferBatch::for_test(
1268            transform_shared_buffer(shared_buffer_items),
1269            epoch,
1270            Default::default(),
1271        );
1272
1273        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1274        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1275        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1276        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1277        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1278        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1279        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1280        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1281        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1282        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1283        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1284        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1285        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1286        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1287        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1288        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1289
1290        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1291        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1292        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1293        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1294        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1295        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1296        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1297        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1298        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1299        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1300    }
1301
    /// Merges three overlapping immutable memtables (the same three keys
    /// written at epochs 1..=3) and verifies the merged imm:
    /// - per-epoch point lookups return the value written at that epoch;
    /// - forward iteration yields (table key ascending, epoch descending);
    /// - backward iteration reverses only the key order, not the per-key
    ///   epoch order.
    #[tokio::test]
    async fn test_merge_imms_basic() {
        let table_id = TableId { table_id: 1004 };
        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items1.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let epoch = test_epoch(2);
        let imm2 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items2.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);

        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value13")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value23")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value33")),
            ),
        ];
        let epoch = test_epoch(3);
        let imm3 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items3.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);

        // batch_items[i] holds the items written at epoch i + 1.
        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        // newer data comes first
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        // Point lookup: reading at epoch i + 1 must return the value written
        // at that epoch, not a newer or older one.
        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator, filtered to a single epoch: each snapshot epoch
        // must see exactly the batch written at that epoch.
        for snapshot_epoch in 1..=3 {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if test_epoch(snapshot_epoch) == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
        }

        // Forward and Backward iterator
        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            // Expected forward order: keys ascending, epochs descending per key.
            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            // Expected backward order: keys descending, but epochs still
            // descending within each key.
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }
    }
1476
1477    #[tokio::test]
1478    async fn test_merge_imms_with_old_values() {
1479        let table_id = TableId { table_id: 1004 };
1480        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1481            (
1482                iterator_test_table_key_of(1),
1483                SharedBufferValue::Insert(Bytes::from("value1")),
1484            ),
1485            (
1486                iterator_test_table_key_of(2),
1487                SharedBufferValue::Update(Bytes::from("value2")),
1488            ),
1489            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1490        ];
1491        let old_value1 = vec![
1492            Bytes::new(),
1493            Bytes::from("old_value2"),
1494            Bytes::from("old_value3"),
1495        ];
1496        let epoch = test_epoch(1);
1497        let imm1 = SharedBufferBatch::for_test_with_old_values(
1498            transform_shared_buffer(key_value1.clone()),
1499            old_value1.clone(),
1500            epoch,
1501            table_id,
1502        );
1503        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1504        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1505            (
1506                iterator_test_table_key_of(1),
1507                SharedBufferValue::Update(Bytes::from("value12")),
1508            ),
1509            (
1510                iterator_test_table_key_of(2),
1511                SharedBufferValue::Update(Bytes::from("value22")),
1512            ),
1513            (
1514                iterator_test_table_key_of(3),
1515                SharedBufferValue::Insert(Bytes::from("value32")),
1516            ),
1517        ];
1518        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1519        let epoch = epoch.next_epoch();
1520        let imm2 = SharedBufferBatch::for_test_with_old_values(
1521            transform_shared_buffer(key_value2.clone()),
1522            old_value2.clone(),
1523            epoch,
1524            table_id,
1525        );
1526        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1527
1528        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1529            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1530            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1531            (
1532                iterator_test_table_key_of(3),
1533                SharedBufferValue::Update(Bytes::from("value33")),
1534            ),
1535        ];
1536        let old_value3 = vec![
1537            Bytes::from("value12"),
1538            Bytes::from("value22"),
1539            Bytes::from("value32"),
1540        ];
1541        let epoch = epoch.next_epoch();
1542        let imm3 = SharedBufferBatch::for_test_with_old_values(
1543            transform_shared_buffer(key_value3.clone()),
1544            old_value3.clone(),
1545            epoch,
1546            table_id,
1547        );
1548        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());
1549
1550        let key_values = [
1551            (key_value1, old_value1),
1552            (key_value2, old_value2),
1553            (key_value3, old_value3),
1554        ];
1555
1556        let batch_items = [
1557            shared_buffer_items1,
1558            shared_buffer_items2,
1559            shared_buffer_items3,
1560        ];
1561        // newer data comes first
1562        let imms = vec![imm3, imm2, imm1];
1563        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1564
1565        // Point lookup
1566        for (i, items) in batch_items.iter().enumerate() {
1567            for (key, value) in items {
1568                assert_eq!(
1569                    merged_imm
1570                        .get(
1571                            TableKey(key.as_slice()),
1572                            test_epoch(i as u64 + 1),
1573                            &ReadOptions::default()
1574                        )
1575                        .unwrap()
1576                        .0
1577                        .as_slice(),
1578                    value.as_slice(),
1579                    "epoch: {}, key: {:?}",
1580                    test_epoch(i as u64 + 1),
1581                    String::from_utf8(key.clone())
1582                );
1583            }
1584        }
1585        assert_eq!(
1586            merged_imm.get(
1587                TableKey(iterator_test_table_key_of(4).as_slice()),
1588                test_epoch(1),
1589                &ReadOptions::default()
1590            ),
1591            None
1592        );
1593        assert_eq!(
1594            merged_imm.get(
1595                TableKey(iterator_test_table_key_of(5).as_slice()),
1596                test_epoch(1),
1597                &ReadOptions::default()
1598            ),
1599            None
1600        );
1601
        // Forward iterator, filtered per epoch: each snapshot epoch should see exactly its own batch
1603        for i in 1..=3 {
1604            let snapshot_epoch = test_epoch(i);
1605            let mut iter = merged_imm.clone().into_forward_iter();
1606            iter.rewind().await.unwrap();
1607            let mut output = vec![];
1608            while iter.is_valid() {
1609                let epoch = iter.key().epoch_with_gap.pure_epoch();
1610                if snapshot_epoch == epoch {
1611                    output.push((
1612                        iter.key().user_key.table_key.to_vec(),
1613                        iter.value().to_bytes(),
1614                    ));
1615                }
1616                iter.next().await.unwrap();
1617            }
1618            assert_eq!(output, batch_items[i as usize - 1]);
1619        }
1620
1621        // Forward and Backward iterator
1622        {
1623            let mut iter = merged_imm.clone().into_forward_iter();
1624            iter.rewind().await.unwrap();
1625            let mut output = vec![];
1626            while iter.is_valid() {
1627                output.push((
1628                    iter.key().user_key.table_key.to_vec(),
1629                    iter.value().to_bytes(),
1630                ));
1631                iter.next().await.unwrap();
1632            }
1633
1634            let mut expected = vec![];
1635            for key_idx in 0..=2 {
1636                for epoch in (1..=3).rev() {
1637                    let item = batch_items[epoch - 1][key_idx].clone();
1638                    expected.push(item);
1639                }
1640            }
1641            assert_eq!(expected, output);
1642
1643            let mut backward_iter = merged_imm.clone().into_backward_iter();
1644            backward_iter.rewind().await.unwrap();
1645            let mut output = vec![];
1646            while backward_iter.is_valid() {
1647                output.push((
1648                    backward_iter.key().user_key.table_key.to_vec(),
1649                    backward_iter.value().to_bytes(),
1650                ));
1651                backward_iter.next().await.unwrap();
1652            }
1653            let mut expected = vec![];
1654            for key_idx in (0..=2).rev() {
1655                for epoch in (1..=3).rev() {
1656                    let item = batch_items[epoch - 1][key_idx].clone();
1657                    expected.push(item);
1658                }
1659            }
1660            assert_eq!(expected, output);
1661        }
1662
1663        // Old-value iterator: must yield the previous value (as HummockValue::Put) for every
1663        // Update/Delete entry, and skip Insert entries, which carry no old value.
1664        {
1665            let mut iter = merged_imm.clone().into_old_value_iter();
1666            iter.rewind().await.unwrap();
1667            let mut output = vec![];
1668            while iter.is_valid() {
1669                output.push((
1670                    iter.key().user_key.table_key.to_vec(),
1671                    iter.value().to_bytes(),
1672                ));
1673                iter.next().await.unwrap();
1674            }
1675
1676            let mut expected = vec![];
1677            for key_idx in 0..=2 {
1678                for epoch in (0..=2).rev() {
1679                    let (key_values, old_values) = &key_values[epoch];
1680                    let (key, new_value) = &key_values[key_idx];
1681                    let old_value = &old_values[key_idx];
1682                    if matches!(new_value, SharedBufferValue::Insert(_)) {
1683                        continue;
1684                    }
1685                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
1686                }
1687            }
1688            assert_eq!(expected, output);
1689        }
1690    }
1691}