// risingwave_storage/hummock/shared_buffer/shared_buffer_batch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use prometheus::IntGauge;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::utils::{MemoryTracker, range_overlap};
35use crate::hummock::value::HummockValue;
36use crate::hummock::{HummockEpoch, HummockResult};
37use crate::mem_table::ImmId;
38use crate::store::ReadOptions;
39
/// A value written into the shared buffer, tagged with the kind of write that
/// produced it. Unlike `HummockValue`, this keeps `Insert` distinct from
/// `Update` so old-value tracking can tell whether a prior version existed
/// (see `SharedBufferBatchOldValues`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// A write for a key with no prior value (no old value is stored for it).
    Insert(T),
    /// A write overwriting an existing value.
    Update(T),
    /// A deletion of the key.
    Delete,
}
46
impl<T> SharedBufferValue<T> {
    /// Converts `&SharedBufferValue<T>` into `SharedBufferValue<&T>`,
    /// borrowing the payload without cloning (mirrors `Option::as_ref`).
    fn to_ref(&self) -> SharedBufferValue<&T> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}
56
/// Conversion into the storage-level representation: `Insert` and `Update`
/// both collapse into `HummockValue::Put`, dropping the insert/update
/// distinction that only matters for old-value tracking.
impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
        match val {
            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                HummockValue::Put(val)
            }
            SharedBufferValue::Delete => HummockValue::Delete,
        }
    }
}
67
impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
    /// Re-borrows the payload as a raw byte slice, preserving the variant.
    /// Typically chained after `to_ref` to obtain `SharedBufferValue<&[u8]>`.
    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}
77
/// The key is `table_key`, which does not contain table id or epoch.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Unique id of a batch, drawn from `SHARED_BUFFER_BATCH_ID_GENERATOR`.
pub type SharedBufferBatchId = u64;

/// One value version together with the `(epoch, spill offset)` it was written at.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);

/// Borrowed view of a single key entry: the key plus the slices holding its
/// value versions; yielded by the forward iterator's `current_key_entry`.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    /// All versions of the new value for `key`, ordered by epoch descending.
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    /// Old values aligned index-for-index with `new_values`, when tracked.
    pub(crate) old_values: Option<&'a [Bytes]>,
}
89
/// One key's entry in the sorted key array of `SharedBufferBatchInner`.
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// A shared buffer may contain data from multiple epochs for a specific key.
    /// The values of all keys are stored together in the field `new_values` of `SharedBufferBatchInner`
    /// as a single vector. `value_offset` is the starting offset of values of the current `key` in the `new_values` vector.
    /// The end offset is the `value_offset` of the next entry or the vector end if the current entry is not the last one.
    pub(crate) value_offset: usize,
}
99
100impl SharedBufferKeyEntry {
101    /// Return an exclusive offset of the values of key of index `i`
102    fn value_end_offset<'a, T>(
103        i: usize,
104        entries: &'a [SharedBufferKeyEntry],
105        values: &'a [T],
106    ) -> usize {
107        entries
108            .get(i + 1)
109            .map(|entry| entry.value_offset)
110            .unwrap_or(values.len())
111    }
112
113    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
114        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
115    }
116}
117
/// Old values attached to a batch, plus bookkeeping against the global
/// old-value size gauge (charged in `new`, released in `Drop`).
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
    /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
    values: Vec<Bytes>,
    /// Byte size accounted for these old values.
    pub size: usize,
    /// Process-wide gauge tracking total old-value memory.
    pub global_old_value_size: IntGauge,
}
126
impl Drop for SharedBufferBatchOldValues {
    /// Releases this batch's contribution to the global old-value size gauge
    /// (the counterpart of the `add` in `Self::new`).
    fn drop(&mut self) {
        self.global_old_value_size.sub(self.size as _);
    }
}
132
impl SharedBufferBatchOldValues {
    /// Wraps `values` with their accounted `size`, charging the size to the
    /// global gauge up front; `Drop` gives it back.
    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    /// Test-only constructor backed by a throwaway gauge.
    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, IntGauge::new("test", "test").unwrap())
    }
}
147
/// Shared, immutable payload of a `SharedBufferBatch`; held behind an `Arc`
/// so batches and their iterators can share it cheaply.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    /// Key entries sorted ascending by key; their `value_offset`s are
    /// ascending as well.
    entries: Vec<SharedBufferKeyEntry>,
    /// Flattened value versions of all entries; within a key, versions are
    /// ordered by epoch descending (newest first).
    new_values: Vec<VersionedSharedBufferValue>,
    /// Old values aligned with `new_values`, when old-value tracking is on.
    old_values: Option<SharedBufferBatchOldValues>,
    /// The epochs of the data in batch, sorted in ascending order (old to new)
    epochs: Vec<HummockEpoch>,
    /// Total size of all key-value items (excluding the `epoch` of value versions)
    size: usize,
    /// Memory tracker held for the batch's lifetime.
    /// NOTE(review): assumed to release its quota on drop — confirm in `MemoryTracker`.
    _tracker: Option<MemoryTracker>,
    /// For a batch created from multiple batches, this will be
    /// the largest batch id among input batches
    batch_id: SharedBufferBatchId,
}
162
163impl SharedBufferBatchInner {
164    pub(crate) fn new(
165        epoch: HummockEpoch,
166        spill_offset: u16,
167        payload: Vec<SharedBufferItem>,
168        old_values: Option<SharedBufferBatchOldValues>,
169        size: usize,
170        _tracker: Option<MemoryTracker>,
171    ) -> Self {
172        assert!(!payload.is_empty());
173        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
174        if let Some(old_values) = &old_values {
175            assert_eq!(old_values.values.len(), payload.len());
176        }
177
178        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
179        let mut entries = Vec::with_capacity(payload.len());
180        let mut new_values = Vec::with_capacity(payload.len());
181        for (i, (key, value)) in payload.into_iter().enumerate() {
182            entries.push(SharedBufferKeyEntry {
183                key,
184                value_offset: i,
185            });
186            new_values.push((epoch_with_gap, value));
187        }
188
189        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
190        SharedBufferBatchInner {
191            entries,
192            new_values,
193            old_values,
194            epochs: vec![epoch],
195            size,
196            _tracker,
197            batch_id,
198        }
199    }
200
    /// All value versions of the key at entry index `i` (epoch descending).
    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }
204
    /// Builds an inner payload spanning multiple epochs, e.g. when several
    /// batches are merged into one. `imm_id` becomes the batch id.
    ///
    /// Invariants (checked in debug builds): entries sorted by key and by
    /// `value_offset`; each key's versions sorted by epoch descending;
    /// `epochs` non-empty and ascending.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        // Each key's versions, reversed, must be ascending by epoch — i.e.
        // stored newest-first.
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }
238
239    /// Return `None` if cannot find a visible version
240    /// Return `HummockValue::Delete` if the key has been deleted by some epoch <= `read_epoch`
241    fn get_value(
242        &self,
243        table_key: TableKey<&[u8]>,
244        read_epoch: HummockEpoch,
245    ) -> Option<(HummockValue<Bytes>, EpochWithGap)> {
246        // Perform binary search on table key to find the corresponding entry
247        if let Ok(i) = self
248            .entries
249            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
250        {
251            let entry = &self.entries[i];
252            assert_eq!(entry.key.as_ref(), *table_key);
253            // Scan to find the first version <= epoch
254            for (e, v) in self.values(i) {
255                // skip invisible versions
256                if read_epoch < e.pure_epoch() {
257                    continue;
258                }
259                return Some((v.clone().into(), *e));
260            }
261            // cannot find a visible version
262        }
263
264        None
265    }
266}
267
/// Equality compares only keys and new values.
/// NOTE(review): `old_values`, `epochs`, `size` and `batch_id` are not
/// compared — assumed intentional; confirm before relying on `==` for
/// anything beyond content equality.
impl PartialEq for SharedBufferBatchInner {
    fn eq(&self, other: &Self) -> bool {
        self.entries == other.entries && self.new_values == other.new_values
    }
}
273
/// Process-wide monotonic source of `SharedBufferBatchId`s.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
276
/// A write batch stored in the shared buffer.
///
/// Cloning is cheap: the payload lives in a shared `Arc<SharedBufferBatchInner>`.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    /// Table this batch belongs to; all keys are table keys of this table.
    pub table_id: TableId,
}
283
284impl SharedBufferBatch {
    /// Test-only constructor without old values; `sorted_items` must already
    /// be sorted ascending by key.
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    /// Test-only constructor with `old_values` aligned one-to-one with
    /// `sorted_items`.
    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }
301
    /// Shared implementation of the test constructors: measures the batch
    /// size and wraps everything in a fresh inner payload with spill offset 0
    /// and no memory tracker.
    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                None,
            )),
            table_id,
        }
    }
325
326    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
327        batch_items
328            .iter()
329            .map(|(left, right)| {
330                // is_exclude_left_key(bool) + table_id + epoch
331                let l1 = match left {
332                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
333                    Bound::Unbounded => 13,
334                };
335                let l2 = match right {
336                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
337                    Bound::Unbounded => 13,
338                };
339                l1 + l2
340            })
341            .sum()
342    }
343
344    /// Return (total size, old value size or 0)
345    pub fn measure_batch_size(
346        batch_items: &[SharedBufferItem],
347        old_values: Option<&[Bytes]>,
348    ) -> (usize, usize) {
349        let old_value_size = old_values
350            .iter()
351            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
352            .sum::<usize>();
353        // size = Sum(length of full key + length of user value)
354        let kv_size = batch_items
355            .iter()
356            .map(|(k, v)| {
357                k.len() + {
358                    match v {
359                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
360                            val.len()
361                        }
362                        SharedBufferValue::Delete => 0,
363                    }
364                }
365            })
366            .sum::<usize>();
367        (kv_size + old_value_size, old_value_size)
368    }
369
    /// Whether this batch may contain data relevant to a read of `table_id`
    /// over `table_key_range`: the table id must match and the query range
    /// must overlap the batch's inclusive `[start_table_key, end_table_key]`
    /// span.
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        // Re-borrow the range bounds as `TableKey<&[u8]>` so they can be
        // compared against the batch's borrowed endpoint keys.
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }
390
    /// Table this batch belongs to.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Smallest epoch in the batch (`epochs` is sorted ascending and non-empty).
    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    /// Largest epoch in the batch.
    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    /// Number of distinct keys.
    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    /// Number of value versions across all keys (>= `key_count`).
    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    /// Whether old-value tracking is enabled for this batch.
    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    /// Point lookup of `table_key` at `read_epoch`; see
    /// `SharedBufferBatchInner::get_value`. `_read_options` is currently
    /// unused here.
    pub fn get(
        &self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }
423
424    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
425        self.inner
426            .entries
427            .binary_search_by(|m| {
428                let key = &m.key;
429                let too_left = match &table_key_range.0 {
430                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
431                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
432                    std::ops::Bound::Unbounded => false,
433                };
434                if too_left {
435                    return Ordering::Less;
436                }
437
438                let too_right = match &table_key_range.1 {
439                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
440                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
441                    std::ops::Bound::Unbounded => false,
442                };
443                if too_right {
444                    return Ordering::Greater;
445                }
446
447                Ordering::Equal
448            })
449            .is_ok()
450    }
451
    /// Generic consuming-iterator constructor; `IS_NEW_VALUE` selects whether
    /// the iterator yields new values (default) or old values.
    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    /// Forward iterator over old values; the iterator constructor panics if
    /// the batch has no old values.
    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    /// Forward iterator over new values.
    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    /// Backward iterator over new values.
    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    /// Smallest table key in the batch (entries are sorted ascending).
    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    /// Largest table key in the batch.
    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    /// Largest table key, borrowed in its owned `TableKey<Bytes>` form.
    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    /// return inclusive left endpoint, which means that all data in this batch should be larger or
    /// equal than this key.
    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    /// Accounted size of the batch in bytes (see `measure_batch_size`).
    pub fn size(&self) -> usize {
        self.inner.size
    }

    /// Old values attached to this batch, if tracking is enabled.
    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    /// Unique id of this batch (largest input id for merged batches).
    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    /// All epochs contained in this batch, ascending.
    /// NOTE(review): returns `&Vec<_>`; `&[HummockEpoch]` would be more
    /// idiomatic but changing the signature may break callers.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }
506
    /// Builds a batch from pre-sorted `sorted_items` for `table_id`, charging
    /// `size` against the optional memory `tracker`; see
    /// `SharedBufferBatchInner::new` for the invariants on the payload.
    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
529
    /// Test-only variant of `build_shared_buffer_batch` without old values or
    /// a memory tracker.
    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
545}
546
/// Iterate all the items in the shared buffer batch
/// If there are multiple versions of a key, the iterator will return all versions
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// The index of the current entry in the payload
    /// (`inner.entries.len()` is the exhausted sentinel for both directions).
    current_entry_idx: usize,
    /// The index of current value
    current_value_idx: usize,
    /// The exclusive end offset of the value index of current key.
    value_end_offset: usize,
    table_id: TableId,
    /// Marker binding the iterator to its direction type.
    _phantom: PhantomData<D>,
}
560
561impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
562    SharedBufferBatchIterator<D, IS_NEW_VALUE>
563{
    /// Creates an iterator over `inner`. The iterator starts un-positioned
    /// (`value_end_offset == 0`); call `rewind` or `seek` before reading.
    ///
    /// # Panics
    /// Panics when constructing an old-value iterator (`IS_NEW_VALUE` false)
    /// over a batch that has no old values.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }
581
    /// Whether `current_entry_idx` points at a real entry; `entries.len()`
    /// marks exhaustion for both directions.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Steps to the next entry in iteration order. Backward iteration moving
    /// past index 0 parks the cursor at `entries.len()` (invalid) instead of
    /// underflowing.
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.current_entry_idx = self.inner.entries.len();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Re-derives `current_value_idx` and `value_end_offset` for the (valid)
    /// current entry; must be called after every entry move.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }
607
    /// Exclusive end offset in `new_values` of the current entry's versions.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    /// Debug-checks that the iterator sits on a consistent (entry, value)
    /// position; old-value iterators must additionally not be parked on an
    /// `Insert` version (those carry no old value).
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }
631
    /// Moves to the next value version: first within the current key's value
    /// range, then on to the next entry (in iteration direction) once the
    /// range is exhausted.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }
644
    /// For old-value iterators only: skips past versions whose new value is
    /// `Insert`, since inserts carry no old value to yield. May cross entry
    /// boundaries; leaves the iterator either on a version with an old value
    /// or exhausted.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip `Insert` versions within the current key's value range.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                // Current key exhausted: move to the next entry and rescan.
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                // Found a version that carries an old value.
                break;
            }
        }
    }
673}
674
impl SharedBufferBatchIterator<Forward> {
    /// Skips the remaining versions of the current key and moves to the next
    /// key (forward only).
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Borrowed view of the current key with all of its versions; requires
    /// the iterator to be positioned at the first version of the key.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
698
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    /// Advances one value version; old-value iterators then skip any
    /// `Insert` versions (which have no old value).
    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Full key of the current position: table id + table key + the current
    /// version's `EpochWithGap`.
    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    /// Current value: new-value iterators return the stored new value
    /// (`Insert`/`Update` become `Put`, `Delete` stays `Delete`); old-value
    /// iterators always return a `Put` of the stored old value, since
    /// `Insert` positions were skipped.
    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    /// Repositions at the first entry in iteration direction. Relies on
    /// `entries` being non-empty (asserted at construction), so the backward
    /// `entries.len() - 1` cannot underflow.
    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Positions the iterator at the first item, in iteration direction,
    /// whose full key is not before (forward) / not after (backward) `key`.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        debug_assert_eq!(key.user_key.table_id, self.table_id);
        // Perform binary search on table key because the items in SharedBufferBatch is ordered
        // by table key.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                // Exact key match: skip versions newer than the seek epoch
                // (versions are stored newest-first).
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                // Every version is newer than the seek epoch: move on to the
                // next entry in iteration direction.
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    // `i` is the first entry with key greater than the seek key.
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        // All keys are greater than the seek key: exhausted.
                        self.current_entry_idx = self.inner.entries.len();
                    } else {
                        // Last entry whose key is smaller than the seek key.
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// In-memory reads are not tracked in local store statistics.
    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    /// No value metadata for in-memory batches.
    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
810
811#[cfg(test)]
812mod tests {
813    use std::ops::Bound::Excluded;
814
815    use itertools::{Itertools, zip_eq};
816    use risingwave_common::util::epoch::{EpochExt, test_epoch};
817    use risingwave_hummock_sdk::key::map_table_key_range;
818
819    use super::*;
820    use crate::hummock::compactor::merge_imms_in_memory;
821    use crate::hummock::iterator::test_utils::{
822        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
823    };
824
825    fn to_hummock_value_batch(
826        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
827    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
828        items.into_iter().map(|(k, v)| (k, v.into())).collect()
829    }
830
831    #[tokio::test]
832    async fn test_shared_buffer_batch_basic() {
833        let epoch = test_epoch(1);
834        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
835            (
836                iterator_test_table_key_of(0),
837                SharedBufferValue::Insert(Bytes::from("value1")),
838            ),
839            (
840                iterator_test_table_key_of(1),
841                SharedBufferValue::Insert(Bytes::from("value1")),
842            ),
843            (
844                iterator_test_table_key_of(2),
845                SharedBufferValue::Insert(Bytes::from("value1")),
846            ),
847        ];
848        let shared_buffer_batch = SharedBufferBatch::for_test(
849            transform_shared_buffer(shared_buffer_items.clone()),
850            epoch,
851            Default::default(),
852        );
853        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);
854
855        // Sketch
856        assert_eq!(
857            *shared_buffer_batch.start_table_key(),
858            shared_buffer_items[0].0
859        );
860        assert_eq!(
861            *shared_buffer_batch.end_table_key(),
862            shared_buffer_items[2].0
863        );
864
865        // Point lookup
866        for (k, v) in &shared_buffer_items {
867            assert_eq!(
868                shared_buffer_batch
869                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
870                    .unwrap()
871                    .0,
872                v.clone()
873            );
874        }
875        assert_eq!(
876            shared_buffer_batch.get(
877                TableKey(iterator_test_table_key_of(3).as_slice()),
878                epoch,
879                &ReadOptions::default()
880            ),
881            None
882        );
883        assert_eq!(
884            shared_buffer_batch.get(
885                TableKey(iterator_test_table_key_of(4).as_slice()),
886                epoch,
887                &ReadOptions::default()
888            ),
889            None
890        );
891
892        // Forward iterator
893        let mut iter = shared_buffer_batch.clone().into_forward_iter();
894        iter.rewind().await.unwrap();
895        let mut output = vec![];
896        while iter.is_valid() {
897            output.push((
898                iter.key().user_key.table_key.to_vec(),
899                iter.value().to_bytes(),
900            ));
901            iter.next().await.unwrap();
902        }
903        assert_eq!(output, shared_buffer_items);
904
905        // Backward iterator
906        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
907        backward_iter.rewind().await.unwrap();
908        let mut output = vec![];
909        while backward_iter.is_valid() {
910            output.push((
911                backward_iter.key().user_key.table_key.to_vec(),
912                backward_iter.value().to_bytes(),
913            ));
914            backward_iter.next().await.unwrap();
915        }
916        output.reverse();
917        assert_eq!(output, shared_buffer_items);
918    }
919
920    #[tokio::test]
921    async fn test_shared_buffer_batch_seek() {
922        let epoch = test_epoch(1);
923        let shared_buffer_items = vec![
924            (
925                iterator_test_table_key_of(1),
926                SharedBufferValue::Insert(Bytes::from("value1")),
927            ),
928            (
929                iterator_test_table_key_of(2),
930                SharedBufferValue::Insert(Bytes::from("value2")),
931            ),
932            (
933                iterator_test_table_key_of(3),
934                SharedBufferValue::Insert(Bytes::from("value3")),
935            ),
936        ];
937        let shared_buffer_batch = SharedBufferBatch::for_test(
938            transform_shared_buffer(shared_buffer_items.clone()),
939            epoch,
940            Default::default(),
941        );
942        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);
943
944        // FORWARD: Seek to a key < 1st key, expect all three items to return
945        let mut iter = shared_buffer_batch.clone().into_forward_iter();
946        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
947            .await
948            .unwrap();
949        for item in &shared_buffer_items {
950            assert!(iter.is_valid());
951            assert_eq!(*iter.key().user_key.table_key, item.0);
952            assert_eq!(iter.value(), item.1.as_slice());
953            iter.next().await.unwrap();
954        }
955        assert!(!iter.is_valid());
956
957        // FORWARD: Seek to a key > the last key, expect no items to return
958        let mut iter = shared_buffer_batch.clone().into_forward_iter();
959        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
960            .await
961            .unwrap();
962        assert!(!iter.is_valid());
963
964        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
965        let mut iter = shared_buffer_batch.clone().into_forward_iter();
966        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
967            .await
968            .unwrap();
969        for item in &shared_buffer_items[1..] {
970            assert!(iter.is_valid());
971            assert_eq!(*iter.key().user_key.table_key, item.0);
972            assert_eq!(iter.value(), item.1.as_slice());
973            iter.next().await.unwrap();
974        }
975        assert!(!iter.is_valid());
976
977        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
978        let mut iter = shared_buffer_batch.clone().into_forward_iter();
979        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
980            .await
981            .unwrap();
982        for item in &shared_buffer_items[1..] {
983            assert!(iter.is_valid());
984            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
985            assert_eq!(iter.value(), item.1.as_slice());
986            iter.next().await.unwrap();
987        }
988        assert!(!iter.is_valid());
989
990        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
991        let mut iter = shared_buffer_batch.clone().into_forward_iter();
992        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
993            .await
994            .unwrap();
995        let item = shared_buffer_items.last().unwrap();
996        assert!(iter.is_valid());
997        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
998        assert_eq!(iter.value(), item.1.as_slice());
999        iter.next().await.unwrap();
1000        assert!(!iter.is_valid());
1001
1002        // BACKWARD: Seek to a key < 1st key, expect no items to return
1003        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1004        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
1005            .await
1006            .unwrap();
1007        assert!(!iter.is_valid());
1008
1009        // BACKWARD: Seek to a key > the last key, expect all items to return
1010        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1011        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
1012            .await
1013            .unwrap();
1014        for item in shared_buffer_items.iter().rev() {
1015            assert!(iter.is_valid());
1016            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1017            assert_eq!(iter.value(), item.1.as_slice());
1018            iter.next().await.unwrap();
1019        }
1020        assert!(!iter.is_valid());
1021
1022        // BACKWARD: Seek to 2nd key with current epoch, expect first two items to return
1023        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1024        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1025            .await
1026            .unwrap();
1027        for item in shared_buffer_items[0..=1].iter().rev() {
1028            assert!(iter.is_valid());
1029            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1030            assert_eq!(iter.value(), item.1.as_slice());
1031            iter.next().await.unwrap();
1032        }
1033        assert!(!iter.is_valid());
1034
1035        // BACKWARD: Seek to 2nd key with old epoch, expect first item to return
1036        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1037        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
1038            .await
1039            .unwrap();
1040        assert!(iter.is_valid());
1041        let item = shared_buffer_items.first().unwrap();
1042        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1043        assert_eq!(iter.value(), item.1.as_slice());
1044        iter.next().await.unwrap();
1045        assert!(!iter.is_valid());
1046
1047        // BACKWARD: Seek to 2nd key with future epoch, expect first two item to return
1048        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1049        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1050            .await
1051            .unwrap();
1052        for item in shared_buffer_items[0..=1].iter().rev() {
1053            assert!(iter.is_valid());
1054            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1055            assert_eq!(iter.value(), item.1.as_slice());
1056            iter.next().await.unwrap();
1057        }
1058        assert!(!iter.is_valid());
1059    }
1060
1061    #[tokio::test]
1062    async fn test_shared_buffer_batch_old_value_iter() {
1063        let epoch = test_epoch(1);
1064        let key_values = vec![
1065            (
1066                iterator_test_table_key_of(1),
1067                SharedBufferValue::Insert(Bytes::from("value1")),
1068            ),
1069            (
1070                iterator_test_table_key_of(2),
1071                SharedBufferValue::Update(Bytes::from("value2")),
1072            ),
1073            (
1074                iterator_test_table_key_of(3),
1075                SharedBufferValue::Insert(Bytes::from("value3")),
1076            ),
1077            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
1078        ];
1079        let old_values = vec![
1080            Bytes::new(),
1081            Bytes::from("old_value2"),
1082            Bytes::new(),
1083            Bytes::from("old_value4"),
1084        ];
1085        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
1086            transform_shared_buffer(key_values.clone()),
1087            old_values.clone(),
1088            epoch,
1089            Default::default(),
1090        );
1091        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
1092        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
1093            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
1094            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
1095            .collect_vec();
1096
1097        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1098        iter.rewind().await.unwrap();
1099        for item in &expected_old_value_iter_items {
1100            assert!(iter.is_valid());
1101            assert_eq!(*iter.key().user_key.table_key, item.0);
1102            assert_eq!(iter.value(), item.1.as_slice());
1103            iter.next().await.unwrap();
1104        }
1105        assert!(!iter.is_valid());
1106
1107        // FORWARD: Seek to a key < 1st key, expect all three items to return
1108        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1109        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
1110            .await
1111            .unwrap();
1112        for item in &shared_buffer_items {
1113            assert!(iter.is_valid());
1114            assert_eq!(*iter.key().user_key.table_key, item.0);
1115            assert_eq!(iter.value(), item.1.as_slice());
1116            iter.next().await.unwrap();
1117        }
1118        assert!(!iter.is_valid());
1119
1120        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1121        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
1122            .await
1123            .unwrap();
1124        for item in &expected_old_value_iter_items {
1125            assert!(iter.is_valid());
1126            assert_eq!(*iter.key().user_key.table_key, item.0);
1127            assert_eq!(iter.value(), item.1.as_slice());
1128            iter.next().await.unwrap();
1129        }
1130        assert!(!iter.is_valid());
1131
1132        // FORWARD: Seek to a key > the last key, expect no items to return
1133        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1134        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
1135            .await
1136            .unwrap();
1137        assert!(!iter.is_valid());
1138
1139        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1140        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
1141            .await
1142            .unwrap();
1143        assert!(!iter.is_valid());
1144
1145        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
1146        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1147        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1148            .await
1149            .unwrap();
1150        for item in &shared_buffer_items[1..] {
1151            assert!(iter.is_valid());
1152            assert_eq!(*iter.key().user_key.table_key, item.0);
1153            assert_eq!(iter.value(), item.1.as_slice());
1154            iter.next().await.unwrap();
1155        }
1156        assert!(!iter.is_valid());
1157
1158        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1159        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1160            .await
1161            .unwrap();
1162        for item in &expected_old_value_iter_items {
1163            assert!(iter.is_valid());
1164            assert_eq!(*iter.key().user_key.table_key, item.0);
1165            assert_eq!(iter.value(), item.1.as_slice());
1166            iter.next().await.unwrap();
1167        }
1168        assert!(!iter.is_valid());
1169
1170        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
1171        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1172        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1173            .await
1174            .unwrap();
1175        for item in &shared_buffer_items[1..] {
1176            assert!(iter.is_valid());
1177            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1178            assert_eq!(iter.value(), item.1.as_slice());
1179            iter.next().await.unwrap();
1180        }
1181        assert!(!iter.is_valid());
1182
1183        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1184        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1185            .await
1186            .unwrap();
1187        for item in &expected_old_value_iter_items {
1188            assert!(iter.is_valid());
1189            assert_eq!(*iter.key().user_key.table_key, item.0);
1190            assert_eq!(iter.value(), item.1.as_slice());
1191            iter.next().await.unwrap();
1192        }
1193        assert!(!iter.is_valid());
1194
1195        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1196        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
1197            .await
1198            .unwrap();
1199        for item in &expected_old_value_iter_items[1..] {
1200            assert!(iter.is_valid());
1201            assert_eq!(*iter.key().user_key.table_key, item.0);
1202            assert_eq!(iter.value(), item.1.as_slice());
1203            iter.next().await.unwrap();
1204        }
1205        assert!(!iter.is_valid());
1206
1207        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
1208        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1209        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
1210            .await
1211            .unwrap();
1212        let item = shared_buffer_items.last().unwrap();
1213        assert!(iter.is_valid());
1214        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1215        assert_eq!(iter.value(), item.1.as_slice());
1216        iter.next().await.unwrap();
1217        assert!(!iter.is_valid());
1218
1219        // Seek to an insert key
1220        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1221        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
1222            .await
1223            .unwrap();
1224        for item in &expected_old_value_iter_items[1..] {
1225            assert!(iter.is_valid());
1226            assert_eq!(*iter.key().user_key.table_key, item.0);
1227            assert_eq!(iter.value(), item.1.as_slice());
1228            iter.next().await.unwrap();
1229        }
1230        assert!(!iter.is_valid());
1231    }
1232
    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        // The batch is created with the default table id (0).
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        // Seeking with a key belonging to a different table id (1) should panic.
        // (The previous comment wrongly blamed a "non-current epoch".)
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }
1244
1245    #[tokio::test]
1246    async fn test_shared_buffer_batch_range_existx() {
1247        let epoch = test_epoch(1);
1248        let shared_buffer_items = vec![
1249            (
1250                Vec::from("a_1"),
1251                SharedBufferValue::Insert(Bytes::from("value1")),
1252            ),
1253            (
1254                Vec::from("a_3"),
1255                SharedBufferValue::Insert(Bytes::from("value2")),
1256            ),
1257            (
1258                Vec::from("a_5"),
1259                SharedBufferValue::Insert(Bytes::from("value3")),
1260            ),
1261            (
1262                Vec::from("b_2"),
1263                SharedBufferValue::Insert(Bytes::from("value3")),
1264            ),
1265        ];
1266        let shared_buffer_batch = SharedBufferBatch::for_test(
1267            transform_shared_buffer(shared_buffer_items),
1268            epoch,
1269            Default::default(),
1270        );
1271
1272        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1273        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1274        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1275        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1276        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1277        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1278        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1279        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1280        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1281        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1282        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1283        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1284        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1285        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1286        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1287        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1288
1289        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1290        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1291        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1292        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1293        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1294        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1295        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1296        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1297        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1298        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1299    }
1300
1301    #[tokio::test]
1302    async fn test_merge_imms_basic() {
1303        let table_id = TableId { table_id: 1004 };
1304        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1305            (
1306                iterator_test_table_key_of(1),
1307                SharedBufferValue::Insert(Bytes::from("value1")),
1308            ),
1309            (
1310                iterator_test_table_key_of(2),
1311                SharedBufferValue::Insert(Bytes::from("value2")),
1312            ),
1313            (
1314                iterator_test_table_key_of(3),
1315                SharedBufferValue::Insert(Bytes::from("value3")),
1316            ),
1317        ];
1318        let epoch = test_epoch(1);
1319        let imm1 = SharedBufferBatch::for_test(
1320            transform_shared_buffer(shared_buffer_items1.clone()),
1321            epoch,
1322            table_id,
1323        );
1324        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
1325        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1326            (
1327                iterator_test_table_key_of(1),
1328                SharedBufferValue::Insert(Bytes::from("value12")),
1329            ),
1330            (
1331                iterator_test_table_key_of(2),
1332                SharedBufferValue::Insert(Bytes::from("value22")),
1333            ),
1334            (
1335                iterator_test_table_key_of(3),
1336                SharedBufferValue::Insert(Bytes::from("value32")),
1337            ),
1338        ];
1339        let epoch = test_epoch(2);
1340        let imm2 = SharedBufferBatch::for_test(
1341            transform_shared_buffer(shared_buffer_items2.clone()),
1342            epoch,
1343            table_id,
1344        );
1345        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);
1346
1347        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1348            (
1349                iterator_test_table_key_of(1),
1350                SharedBufferValue::Insert(Bytes::from("value13")),
1351            ),
1352            (
1353                iterator_test_table_key_of(2),
1354                SharedBufferValue::Insert(Bytes::from("value23")),
1355            ),
1356            (
1357                iterator_test_table_key_of(3),
1358                SharedBufferValue::Insert(Bytes::from("value33")),
1359            ),
1360        ];
1361        let epoch = test_epoch(3);
1362        let imm3 = SharedBufferBatch::for_test(
1363            transform_shared_buffer(shared_buffer_items3.clone()),
1364            epoch,
1365            table_id,
1366        );
1367        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);
1368
1369        let batch_items = [
1370            shared_buffer_items1,
1371            shared_buffer_items2,
1372            shared_buffer_items3,
1373        ];
1374        // newer data comes first
1375        let imms = vec![imm3, imm2, imm1];
1376        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1377
1378        // Point lookup
1379        for (i, items) in batch_items.iter().enumerate() {
1380            for (key, value) in items {
1381                assert_eq!(
1382                    merged_imm
1383                        .get(
1384                            TableKey(key.as_slice()),
1385                            test_epoch(i as u64 + 1),
1386                            &ReadOptions::default()
1387                        )
1388                        .unwrap()
1389                        .0,
1390                    value.clone(),
1391                    "epoch: {}, key: {:?}",
1392                    test_epoch(i as u64 + 1),
1393                    String::from_utf8(key.clone())
1394                );
1395            }
1396        }
1397        assert_eq!(
1398            merged_imm.get(
1399                TableKey(iterator_test_table_key_of(4).as_slice()),
1400                test_epoch(1),
1401                &ReadOptions::default()
1402            ),
1403            None
1404        );
1405        assert_eq!(
1406            merged_imm.get(
1407                TableKey(iterator_test_table_key_of(5).as_slice()),
1408                test_epoch(1),
1409                &ReadOptions::default()
1410            ),
1411            None
1412        );
1413
1414        // Forward iterator
1415        for snapshot_epoch in 1..=3 {
1416            let mut iter = merged_imm.clone().into_forward_iter();
1417            iter.rewind().await.unwrap();
1418            let mut output = vec![];
1419            while iter.is_valid() {
1420                let epoch = iter.key().epoch_with_gap.pure_epoch();
1421                if test_epoch(snapshot_epoch) == epoch {
1422                    output.push((
1423                        iter.key().user_key.table_key.to_vec(),
1424                        iter.value().to_bytes(),
1425                    ));
1426                }
1427                iter.next().await.unwrap();
1428            }
1429            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
1430        }
1431
1432        // Forward and Backward iterator
1433        {
1434            let mut iter = merged_imm.clone().into_forward_iter();
1435            iter.rewind().await.unwrap();
1436            let mut output = vec![];
1437            while iter.is_valid() {
1438                output.push((
1439                    iter.key().user_key.table_key.to_vec(),
1440                    iter.value().to_bytes(),
1441                ));
1442                iter.next().await.unwrap();
1443            }
1444
1445            let mut expected = vec![];
1446            for key_idx in 0..=2 {
1447                for epoch in (1..=3).rev() {
1448                    let item = batch_items[epoch - 1][key_idx].clone();
1449                    expected.push(item);
1450                }
1451            }
1452            assert_eq!(expected, output);
1453
1454            let mut backward_iter = merged_imm.clone().into_backward_iter();
1455            backward_iter.rewind().await.unwrap();
1456            let mut output = vec![];
1457            while backward_iter.is_valid() {
1458                output.push((
1459                    backward_iter.key().user_key.table_key.to_vec(),
1460                    backward_iter.value().to_bytes(),
1461                ));
1462                backward_iter.next().await.unwrap();
1463            }
1464            let mut expected = vec![];
1465            for key_idx in (0..=2).rev() {
1466                for epoch in (1..=3).rev() {
1467                    let item = batch_items[epoch - 1][key_idx].clone();
1468                    expected.push(item);
1469                }
1470            }
1471            assert_eq!(expected, output);
1472        }
1473    }
1474
1475    #[tokio::test]
1476    async fn test_merge_imms_with_old_values() {
1477        let table_id = TableId { table_id: 1004 };
1478        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1479            (
1480                iterator_test_table_key_of(1),
1481                SharedBufferValue::Insert(Bytes::from("value1")),
1482            ),
1483            (
1484                iterator_test_table_key_of(2),
1485                SharedBufferValue::Update(Bytes::from("value2")),
1486            ),
1487            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1488        ];
1489        let old_value1 = vec![
1490            Bytes::new(),
1491            Bytes::from("old_value2"),
1492            Bytes::from("old_value3"),
1493        ];
1494        let epoch = test_epoch(1);
1495        let imm1 = SharedBufferBatch::for_test_with_old_values(
1496            transform_shared_buffer(key_value1.clone()),
1497            old_value1.clone(),
1498            epoch,
1499            table_id,
1500        );
1501        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1502        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1503            (
1504                iterator_test_table_key_of(1),
1505                SharedBufferValue::Update(Bytes::from("value12")),
1506            ),
1507            (
1508                iterator_test_table_key_of(2),
1509                SharedBufferValue::Update(Bytes::from("value22")),
1510            ),
1511            (
1512                iterator_test_table_key_of(3),
1513                SharedBufferValue::Insert(Bytes::from("value32")),
1514            ),
1515        ];
1516        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1517        let epoch = epoch.next_epoch();
1518        let imm2 = SharedBufferBatch::for_test_with_old_values(
1519            transform_shared_buffer(key_value2.clone()),
1520            old_value2.clone(),
1521            epoch,
1522            table_id,
1523        );
1524        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1525
1526        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1527            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1528            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1529            (
1530                iterator_test_table_key_of(3),
1531                SharedBufferValue::Update(Bytes::from("value33")),
1532            ),
1533        ];
1534        let old_value3 = vec![
1535            Bytes::from("value12"),
1536            Bytes::from("value22"),
1537            Bytes::from("value32"),
1538        ];
1539        let epoch = epoch.next_epoch();
1540        let imm3 = SharedBufferBatch::for_test_with_old_values(
1541            transform_shared_buffer(key_value3.clone()),
1542            old_value3.clone(),
1543            epoch,
1544            table_id,
1545        );
1546        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());
1547
1548        let key_values = [
1549            (key_value1, old_value1),
1550            (key_value2, old_value2),
1551            (key_value3, old_value3),
1552        ];
1553
1554        let batch_items = [
1555            shared_buffer_items1,
1556            shared_buffer_items2,
1557            shared_buffer_items3,
1558        ];
1559        // newer data comes first
1560        let imms = vec![imm3, imm2, imm1];
1561        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1562
1563        // Point lookup
1564        for (i, items) in batch_items.iter().enumerate() {
1565            for (key, value) in items {
1566                assert_eq!(
1567                    merged_imm
1568                        .get(
1569                            TableKey(key.as_slice()),
1570                            test_epoch(i as u64 + 1),
1571                            &ReadOptions::default()
1572                        )
1573                        .unwrap()
1574                        .0,
1575                    value.clone(),
1576                    "epoch: {}, key: {:?}",
1577                    test_epoch(i as u64 + 1),
1578                    String::from_utf8(key.clone())
1579                );
1580            }
1581        }
1582        assert_eq!(
1583            merged_imm.get(
1584                TableKey(iterator_test_table_key_of(4).as_slice()),
1585                test_epoch(1),
1586                &ReadOptions::default()
1587            ),
1588            None
1589        );
1590        assert_eq!(
1591            merged_imm.get(
1592                TableKey(iterator_test_table_key_of(5).as_slice()),
1593                test_epoch(1),
1594                &ReadOptions::default()
1595            ),
1596            None
1597        );
1598
1599        // Forward i
1600        for i in 1..=3 {
1601            let snapshot_epoch = test_epoch(i);
1602            let mut iter = merged_imm.clone().into_forward_iter();
1603            iter.rewind().await.unwrap();
1604            let mut output = vec![];
1605            while iter.is_valid() {
1606                let epoch = iter.key().epoch_with_gap.pure_epoch();
1607                if snapshot_epoch == epoch {
1608                    output.push((
1609                        iter.key().user_key.table_key.to_vec(),
1610                        iter.value().to_bytes(),
1611                    ));
1612                }
1613                iter.next().await.unwrap();
1614            }
1615            assert_eq!(output, batch_items[i as usize - 1]);
1616        }
1617
1618        // Forward and Backward iterator
1619        {
1620            let mut iter = merged_imm.clone().into_forward_iter();
1621            iter.rewind().await.unwrap();
1622            let mut output = vec![];
1623            while iter.is_valid() {
1624                output.push((
1625                    iter.key().user_key.table_key.to_vec(),
1626                    iter.value().to_bytes(),
1627                ));
1628                iter.next().await.unwrap();
1629            }
1630
1631            let mut expected = vec![];
1632            for key_idx in 0..=2 {
1633                for epoch in (1..=3).rev() {
1634                    let item = batch_items[epoch - 1][key_idx].clone();
1635                    expected.push(item);
1636                }
1637            }
1638            assert_eq!(expected, output);
1639
1640            let mut backward_iter = merged_imm.clone().into_backward_iter();
1641            backward_iter.rewind().await.unwrap();
1642            let mut output = vec![];
1643            while backward_iter.is_valid() {
1644                output.push((
1645                    backward_iter.key().user_key.table_key.to_vec(),
1646                    backward_iter.value().to_bytes(),
1647                ));
1648                backward_iter.next().await.unwrap();
1649            }
1650            let mut expected = vec![];
1651            for key_idx in (0..=2).rev() {
1652                for epoch in (1..=3).rev() {
1653                    let item = batch_items[epoch - 1][key_idx].clone();
1654                    expected.push(item);
1655                }
1656            }
1657            assert_eq!(expected, output);
1658        }
1659
1660        // old value iter
1661        {
1662            let mut iter = merged_imm.clone().into_old_value_iter();
1663            iter.rewind().await.unwrap();
1664            let mut output = vec![];
1665            while iter.is_valid() {
1666                output.push((
1667                    iter.key().user_key.table_key.to_vec(),
1668                    iter.value().to_bytes(),
1669                ));
1670                iter.next().await.unwrap();
1671            }
1672
1673            let mut expected = vec![];
1674            for key_idx in 0..=2 {
1675                for epoch in (0..=2).rev() {
1676                    let (key_values, old_values) = &key_values[epoch];
1677                    let (key, new_value) = &key_values[key_idx];
1678                    let old_value = &old_values[key_idx];
1679                    if matches!(new_value, SharedBufferValue::Insert(_)) {
1680                        continue;
1681                    }
1682                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
1683                }
1684            }
1685            assert_eq!(expected, output);
1686        }
1687    }
1688}