risingwave_storage/hummock/shared_buffer/
shared_buffer_batch.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use prometheus::IntGauge;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::utils::{MemoryTracker, range_overlap};
35use crate::hummock::value::HummockValue;
36use crate::hummock::{HummockEpoch, HummockResult};
37use crate::mem_table::ImmId;
38use crate::store::ReadOptions;
39
/// A value written into the shared buffer, keeping the kind of write.
///
/// Unlike `HummockValue`, `Insert` and `Update` stay distinct so old-value
/// tracking can tell whether a previous version of the key existed
/// (see `SharedBufferBatchOldValues`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// Write a key that had no previous version; carries the new value.
    Insert(T),
    /// Overwrite a key that had a previous version; carries the new value.
    Update(T),
    /// Remove the key.
    Delete,
}
46
47impl<T> SharedBufferValue<T> {
48    fn to_ref(&self) -> SharedBufferValue<&T> {
49        match self {
50            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
51            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
52            SharedBufferValue::Delete => SharedBufferValue::Delete,
53        }
54    }
55}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59        match val {
60            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61                HummockValue::Put(val)
62            }
63            SharedBufferValue::Delete => HummockValue::Delete,
64        }
65    }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70        match self {
71            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73            SharedBufferValue::Delete => SharedBufferValue::Delete,
74        }
75    }
76}
77
/// The key is `table_key`, which does not contain table id or epoch.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Identifier of a batch; allocated from `SHARED_BUFFER_BATCH_ID_GENERATOR`.
pub type SharedBufferBatchId = u64;

/// One version of a value: the epoch (plus spill gap) it was written at, and the value itself.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);
83
/// Borrowed view of a single key entry: the key, all its versioned new values,
/// and (when old-value tracking is enabled) the matching slice of old values.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    /// All versioned new values of `key`.
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    /// Old values aligned index-wise with `new_values`, when tracked.
    pub(crate) old_values: Option<&'a [Bytes]>,
}
89
/// One key of a shared buffer batch, pointing into the batch's shared `new_values` vector.
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    /// The table key (no table id or epoch prefix).
    pub(crate) key: TableKey<Bytes>,
    /// A shared buffer may contain data from multiple epochs for a specific key.
    /// The values of all keys are stored together in the field `new_values` of `SharedBufferBatchInner`
    /// as a single vector. `value_offset` is the starting offset of values of the current `key` in the `new_values` vector.
    /// The end offset is the `value_offset` of the next entry or the vector end if the current entry is not the last one.
    pub(crate) value_offset: usize,
}
99
100impl SharedBufferKeyEntry {
101    /// Return an exclusive offset of the values of key of index `i`
102    fn value_end_offset<'a, T>(
103        i: usize,
104        entries: &'a [SharedBufferKeyEntry],
105        values: &'a [T],
106    ) -> usize {
107        entries
108            .get(i + 1)
109            .map(|entry| entry.value_offset)
110            .unwrap_or(values.len())
111    }
112
113    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
114        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
115    }
116}
117
/// Old values carried alongside a batch's new values, together with their accounted size.
///
/// `size` is added to `global_old_value_size` on construction and subtracted
/// again on drop, so the gauge tracks only live old-value bytes.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
    /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
    values: Vec<Bytes>,
    /// Accounted size in bytes of these old values.
    pub size: usize,
    /// Process-wide gauge of the total size of live old values.
    pub global_old_value_size: IntGauge,
}
126
impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        // Undo the `add` performed in `Self::new` so the global gauge only
        // reflects old values that are still alive.
        self.global_old_value_size.sub(self.size as _);
    }
}
132
133impl SharedBufferBatchOldValues {
134    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
135        global_old_value_size.add(size as _);
136        Self {
137            values,
138            size,
139            global_old_value_size,
140        }
141    }
142
143    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
144        Self::new(values, size, IntGauge::new("test", "test").unwrap())
145    }
146}
147
/// Immutable payload shared (via `Arc`) by all clones of a `SharedBufferBatch`.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    /// Key entries sorted by table key; each points into `new_values` via its `value_offset`.
    entries: Vec<SharedBufferKeyEntry>,
    /// All versioned values of all keys, stored contiguously per key.
    new_values: Vec<VersionedSharedBufferValue>,
    /// Old values aligned index-wise with `new_values`, if old-value tracking is enabled.
    old_values: Option<SharedBufferBatchOldValues>,
    /// The epochs of the data in batch, sorted in ascending order (old to new)
    epochs: Vec<HummockEpoch>,
    /// Total size of all key-value items (excluding the `epoch` of value versions)
    size: usize,
    // Never read; held so that its `Drop` (releasing the reserved memory quota)
    // is tied to the lifetime of this batch.
    _tracker: Option<MemoryTracker>,
    /// For a batch created from multiple batches, this will be
    /// the largest batch id among input batches
    batch_id: SharedBufferBatchId,
}
162
impl SharedBufferBatchInner {
    /// Build an inner batch holding data from a single epoch.
    ///
    /// `payload` must be non-empty and sorted by table key. Each item becomes one
    /// `SharedBufferKeyEntry` with exactly one version, so `value_offset == i`.
    /// `spill_offset` is folded together with `epoch` into the `EpochWithGap`
    /// attached to every value. A fresh `batch_id` is drawn from the global generator.
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        _tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        // When old values are tracked, there must be exactly one per payload item.
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        for (i, (key, value)) in payload.into_iter().enumerate() {
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            _tracker,
            batch_id,
        }
    }

    /// All versioned values of the key at entry index `i`.
    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

    /// Build an inner batch that merges data from multiple epochs.
    ///
    /// Debug assertions spell out the invariants readers rely on: entries sorted
    /// by key and by `value_offset`, each key's versions sorted by descending
    /// epoch (newest first), and `epochs` sorted ascending. The caller supplies
    /// the `imm_id` used as `batch_id`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }

    /// Return `None` if cannot find a visible version
    /// Return `HummockValue::Delete` if the key has been deleted by some epoch <= `read_epoch`
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        // Perform binary search on table key to find the corresponding entry
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            // Scan to find the first version <= epoch.
            // Versions are ordered newest-first, so the first visible one is the
            // latest version readable at `read_epoch`.
            for (e, v) in self.values(i) {
                // skip invisible versions
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
            // cannot find a visible version
        }

        None
    }
}
267
impl PartialEq for SharedBufferBatchInner {
    fn eq(&self, other: &Self) -> bool {
        // Equality is defined by keys and new values only; `old_values`,
        // `epochs`, `size`, the tracker and `batch_id` are not compared.
        self.entries == other.entries && self.new_values == other.new_values
    }
}
273
/// Process-wide monotonic generator for `SharedBufferBatchId`s
/// (incremented with `fetch_add` in `SharedBufferBatchInner::new`).
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
276
277/// A write batch stored in the shared buffer.
/// A write batch stored in the shared buffer.
///
/// Cloning is cheap: the payload lives behind a shared `Arc<SharedBufferBatchInner>`.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    /// The table this batch's data belongs to.
    pub table_id: TableId,
}
283
impl SharedBufferBatch {
    /// Test helper: build a single-epoch batch without old values.
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    /// Test helper: build a single-epoch batch with old-value tracking enabled.
    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    /// Shared test constructor: measure sizes, wrap old values (if any) in a
    /// test gauge, and assemble the inner batch with spill offset 0.
    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                None,
            )),
            table_id,
        }
    }

    /// Estimate the in-memory size of a list of delete-range bounds.
    /// Each bound is charged its key length plus 13 bytes of fixed overhead.
    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                // is_exclude_left_key(bool) + table_id + epoch
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

    /// Return (total size, old value size or 0)
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        // Old values are charged their container size plus payload length.
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        // size = Sum(length of full key + length of user value)
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }

    /// Whether this batch belongs to `table_id` and its key range overlaps
    /// `table_key_range`. Used to prune batches during reads.
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    /// The table this batch belongs to.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Oldest epoch contained in the batch (`epochs` is sorted ascending).
    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    /// Newest epoch contained in the batch.
    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    /// Number of distinct keys in the batch.
    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    /// Total number of versioned values across all keys.
    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    /// Whether old-value tracking is enabled for this batch.
    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    /// Point lookup: the latest version of `table_key` visible at `read_epoch`.
    /// `_read_options` is currently unused.
    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

    /// Whether any key of this batch falls inside `table_key_range`.
    ///
    /// The binary-search comparator returns `Equal` for any key within the
    /// range, so `is_ok` means at least one key overlaps.
    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                // `key` is strictly before the range start.
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                // `key` is strictly after the range end.
                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }

    /// Consume the batch and build an iterator with the given direction and
    /// value kind (`IS_NEW_VALUE`).
    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    /// Forward iterator over old values (skips `Insert` versions).
    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    /// Forward iterator over new values.
    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    /// Backward iterator over new values.
    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    /// Smallest table key in the batch (entries are sorted by key).
    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    /// Largest table key in the batch.
    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    /// Largest table key as the owned `TableKey<Bytes>` stored in the batch.
    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    /// return inclusive left endpoint, which means that all data in this batch should be larger or
    /// equal than this key.
    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    /// Measured size of the batch (see `measure_batch_size`).
    pub fn size(&self) -> usize {
        self.inner.size
    }

    /// Old values of this batch, if tracking is enabled.
    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    /// Unique id of this batch (see `SharedBufferBatchInner::batch_id`).
    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    /// Epochs covered by this batch, sorted ascending.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    /// Build a batch from sorted items, optionally with old values and a
    /// memory tracker keeping the quota reserved while the batch lives.
    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    /// Test-only variant of `build_shared_buffer_batch` without old values or tracker.
    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}
546
/// Iterate all the items in the shared buffer batch
/// If there are multiple versions of a key, the iterator will return all versions
///
/// When `IS_NEW_VALUE` is `false` the iterator yields old values instead, and
/// skips versions whose new value is `Insert` (those carry no old value).
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// The index of the current entry in the payload
    current_entry_idx: usize,
    /// The index of current value
    current_value_idx: usize,
    /// The exclusive end offset of the value index of current key.
    value_end_offset: usize,
    /// Table id of the batch; used when assembling full keys.
    table_id: TableId,
    _phantom: PhantomData<D>,
}
560
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    /// Create an iterator over `inner`. The iterator is not positioned yet;
    /// callers must `rewind` or `seek` before use.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        // An old-value iterator requires old values to be present.
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    /// Whether `current_entry_idx` points at a real entry.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Mark the iterator exhausted by moving the entry index past the end.
    fn invalidate(&mut self) {
        self.current_entry_idx = self.inner.entries.len();
    }

    /// Step the entry index one key in the iteration direction; stepping a
    /// backward iterator past index 0 invalidates it.
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.invalidate();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Re-derive `current_value_idx` and `value_end_offset` for the current entry,
    /// positioning at the entry's first (newest) version.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    /// Exclusive end offset (into `new_values`) of the current entry's versions.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    /// Debug-check the positional invariants. An old-value iterator must never
    /// be positioned on an `Insert` version.
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    /// Move to the next version of the current key, or to the next key when the
    /// current key's versions are exhausted.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    /// For old-value iterators only: skip over `Insert` versions (which have no
    /// old value), crossing entry boundaries, until a usable version is reached
    /// or the iterator becomes invalid.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip `Insert` versions within the current entry.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                // Current entry exhausted; try the next entry in direction order.
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                // Found a non-`Insert` version.
                break;
            }
        }
    }
}
678
impl SharedBufferBatchIterator<Forward> {
    /// Skip the remaining versions of the current key and move to the next key.
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Borrow the current key together with all of its versions (and the
    /// matching old values, if tracked). Requires the iterator to be positioned
    /// at the first version of the key.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
702
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    /// Advance one version; old-value iterators additionally skip `Insert`s.
    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Full key of the current position: table id + table key + version epoch.
    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    /// Current value: the versioned new value, or — for old-value iterators —
    /// the stored old value wrapped as a `Put`.
    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    /// Position at the first entry in the iteration direction.
    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                // Entries are non-empty (asserted at construction), so no underflow.
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Seek to `key`: a forward iterator lands on the first position not before
    /// `key`, a backward iterator on the last position not after it.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // A differing table id puts `key` entirely before or after this batch.
        match key.user_key.table_id.cmp(&self.table_id) {
            Ordering::Less => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        // seek key table id < batch table id, so seek to beginning
                        self.rewind().await?;
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.invalidate();
                        return Ok(());
                    }
                };
            }
            Ordering::Greater => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.invalidate();
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        // seek key table id > batch table id, so seek to end
                        self.rewind().await?;
                        return Ok(());
                    }
                };
            }
            Ordering::Equal => (),
        }
        // Perform binary search on table key because the items in SharedBufferBatch is ordered
        // by table key.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                // Exact key match: skip versions newer than the seek epoch
                // (versions are ordered newest-first).
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                // All versions are newer than the seek epoch: step to the next
                // entry in the iteration direction.
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                // `i` is the insertion point: the first entry greater than the seek key.
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    // Backward wants the last entry smaller than the seek key,
                    // i.e. `i - 1`; `i == 0` means no such entry exists.
                    if i == 0 {
                        self.invalidate();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    // In-memory batches contribute nothing to storage read statistics.
    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
842
843#[cfg(test)]
844mod tests {
845    use std::ops::Bound::Excluded;
846
847    use itertools::{Itertools, zip_eq};
848    use risingwave_common::util::epoch::{EpochExt, test_epoch};
849    use risingwave_hummock_sdk::key::map_table_key_range;
850
851    use super::*;
852    use crate::hummock::compactor::merge_imms_in_memory;
853    use crate::hummock::iterator::test_utils::{
854        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
855    };
856
857    fn to_hummock_value_batch(
858        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
859    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
860        items.into_iter().map(|(k, v)| (k, v.into())).collect()
861    }
862
    /// Smoke test for a single-epoch `SharedBufferBatch`: reported key range,
    /// point lookups (hit and miss), and forward/backward iteration.
    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Sanity check: the batch's key range spans the first and last inserted keys.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point lookup: every stored key resolves to its value; absent keys return None.
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator: yields every item in insertion (key-ascending) order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iterator: yields the same items in reverse, so reversing
        // the collected output must reproduce the forward order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }
952
    /// Verifies `seek` semantics for both iterator directions.
    ///
    /// As the cases below demonstrate, entries are positioned by full key
    /// ordered (table key asc, epoch desc): seeking a key with the current
    /// or a newer epoch lands on that key, while seeking with an older epoch
    /// moves past it (forward) or before it (backward).
    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // FORWARD: Seek to a key < 1st key, expect all three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
        // (a newer epoch sorts before the stored entry, so the entry is still reached)
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
        // (an older epoch sorts after the stored entry, so key 2 is skipped)
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key < 1st key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key > the last key, expect all items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with current epoch, expect first two items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with old epoch, expect first item to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with future epoch, expect first two item to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1093
    /// Tests the old-value iterator of a batch built with old values: it
    /// yields an entry only for non-`Insert` (i.e. `Update`/`Delete`) keys —
    /// see the filter building `expected_old_value_iter_items` — while the
    /// regular forward iterator still sees every entry.
    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        // Old values are positionally aligned with `key_values`; `Insert`s get
        // an empty placeholder since they have no previous value.
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key < 1st key, expect all four items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Old-value iter: seeking with an older epoch skips key 2's entry.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 3rd key with old epoch, expect last item to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Seek to an insert key: key 3 is an `Insert`, so the old-value iter
        // resumes at the following non-insert entry (key 4's delete).
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1265
    /// The batch is built with the default table id, so seeking with a key
    /// belonging to a different table (`TableId::new(1)`) must panic.
    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        // Seeking with a mismatched table id should panic
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }
1277
1278    #[tokio::test]
1279    async fn test_shared_buffer_batch_range_existx() {
1280        let epoch = test_epoch(1);
1281        let shared_buffer_items = vec![
1282            (
1283                Vec::from("a_1"),
1284                SharedBufferValue::Insert(Bytes::from("value1")),
1285            ),
1286            (
1287                Vec::from("a_3"),
1288                SharedBufferValue::Insert(Bytes::from("value2")),
1289            ),
1290            (
1291                Vec::from("a_5"),
1292                SharedBufferValue::Insert(Bytes::from("value3")),
1293            ),
1294            (
1295                Vec::from("b_2"),
1296                SharedBufferValue::Insert(Bytes::from("value3")),
1297            ),
1298        ];
1299        let shared_buffer_batch = SharedBufferBatch::for_test(
1300            transform_shared_buffer(shared_buffer_items),
1301            epoch,
1302            Default::default(),
1303        );
1304
1305        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1306        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1307        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1308        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1309        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1310        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1311        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1312        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1313        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1314        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1315        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1316        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1317        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1318        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1319        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1320        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1321
1322        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1323        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1324        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1325        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1326        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1327        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1328        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1329        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1330        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1331        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1332    }
1333
    /// Merges three single-epoch immutable memtables (same three keys written
    /// at epochs 1..=3) and verifies per-epoch point lookups, plus full
    /// iteration order of the merged imm: forward yields (key asc, epoch desc)
    /// and backward yields (key desc, epoch desc), as constructed in the
    /// `expected` vectors below.
    #[tokio::test]
    async fn test_merge_imms_basic() {
        let table_id = TableId::new(1004);
        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items1.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let epoch = test_epoch(2);
        let imm2 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items2.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);

        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value13")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value23")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value33")),
            ),
        ];
        let epoch = test_epoch(3);
        let imm3 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items3.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);

        // batch_items[i] holds the items written at epoch i + 1.
        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        // newer data comes first
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        // Point lookup: reading at epoch i + 1 must see the value written at that epoch.
        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator: filter to entries written at exactly `snapshot_epoch`
        // (an exact-epoch match, not a snapshot read) and compare with that batch.
        for snapshot_epoch in 1..=3 {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if test_epoch(snapshot_epoch) == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
        }

        // Forward and Backward iterator
        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            // Forward order: keys ascending, epochs descending within each key.
            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            // Backward order: keys descending, epochs still descending within each key.
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }
    }
1509
1510    #[tokio::test]
1511    async fn test_merge_imms_with_old_values() {
1512        let table_id = TableId::new(1004);
1513        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1514            (
1515                iterator_test_table_key_of(1),
1516                SharedBufferValue::Insert(Bytes::from("value1")),
1517            ),
1518            (
1519                iterator_test_table_key_of(2),
1520                SharedBufferValue::Update(Bytes::from("value2")),
1521            ),
1522            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1523        ];
1524        let old_value1 = vec![
1525            Bytes::new(),
1526            Bytes::from("old_value2"),
1527            Bytes::from("old_value3"),
1528        ];
1529        let epoch = test_epoch(1);
1530        let imm1 = SharedBufferBatch::for_test_with_old_values(
1531            transform_shared_buffer(key_value1.clone()),
1532            old_value1.clone(),
1533            epoch,
1534            table_id,
1535        );
1536        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1537        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1538            (
1539                iterator_test_table_key_of(1),
1540                SharedBufferValue::Update(Bytes::from("value12")),
1541            ),
1542            (
1543                iterator_test_table_key_of(2),
1544                SharedBufferValue::Update(Bytes::from("value22")),
1545            ),
1546            (
1547                iterator_test_table_key_of(3),
1548                SharedBufferValue::Insert(Bytes::from("value32")),
1549            ),
1550        ];
1551        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1552        let epoch = epoch.next_epoch();
1553        let imm2 = SharedBufferBatch::for_test_with_old_values(
1554            transform_shared_buffer(key_value2.clone()),
1555            old_value2.clone(),
1556            epoch,
1557            table_id,
1558        );
1559        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1560
1561        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1562            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1563            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1564            (
1565                iterator_test_table_key_of(3),
1566                SharedBufferValue::Update(Bytes::from("value33")),
1567            ),
1568        ];
1569        let old_value3 = vec![
1570            Bytes::from("value12"),
1571            Bytes::from("value22"),
1572            Bytes::from("value32"),
1573        ];
1574        let epoch = epoch.next_epoch();
1575        let imm3 = SharedBufferBatch::for_test_with_old_values(
1576            transform_shared_buffer(key_value3.clone()),
1577            old_value3.clone(),
1578            epoch,
1579            table_id,
1580        );
1581        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());
1582
1583        let key_values = [
1584            (key_value1, old_value1),
1585            (key_value2, old_value2),
1586            (key_value3, old_value3),
1587        ];
1588
1589        let batch_items = [
1590            shared_buffer_items1,
1591            shared_buffer_items2,
1592            shared_buffer_items3,
1593        ];
1594        // newer data comes first
1595        let imms = vec![imm3, imm2, imm1];
1596        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1597
1598        // Point lookup
1599        for (i, items) in batch_items.iter().enumerate() {
1600            for (key, value) in items {
1601                assert_eq!(
1602                    merged_imm
1603                        .get(
1604                            TableKey(key.as_slice()),
1605                            test_epoch(i as u64 + 1),
1606                            &ReadOptions::default()
1607                        )
1608                        .unwrap()
1609                        .0
1610                        .as_slice(),
1611                    value.as_slice(),
1612                    "epoch: {}, key: {:?}",
1613                    test_epoch(i as u64 + 1),
1614                    String::from_utf8(key.clone())
1615                );
1616            }
1617        }
1618        assert_eq!(
1619            merged_imm.get(
1620                TableKey(iterator_test_table_key_of(4).as_slice()),
1621                test_epoch(1),
1622                &ReadOptions::default()
1623            ),
1624            None
1625        );
1626        assert_eq!(
1627            merged_imm.get(
1628                TableKey(iterator_test_table_key_of(5).as_slice()),
1629                test_epoch(1),
1630                &ReadOptions::default()
1631            ),
1632            None
1633        );
1634
1635        // Forward i
1636        for i in 1..=3 {
1637            let snapshot_epoch = test_epoch(i);
1638            let mut iter = merged_imm.clone().into_forward_iter();
1639            iter.rewind().await.unwrap();
1640            let mut output = vec![];
1641            while iter.is_valid() {
1642                let epoch = iter.key().epoch_with_gap.pure_epoch();
1643                if snapshot_epoch == epoch {
1644                    output.push((
1645                        iter.key().user_key.table_key.to_vec(),
1646                        iter.value().to_bytes(),
1647                    ));
1648                }
1649                iter.next().await.unwrap();
1650            }
1651            assert_eq!(output, batch_items[i as usize - 1]);
1652        }
1653
1654        // Forward and Backward iterator
1655        {
1656            let mut iter = merged_imm.clone().into_forward_iter();
1657            iter.rewind().await.unwrap();
1658            let mut output = vec![];
1659            while iter.is_valid() {
1660                output.push((
1661                    iter.key().user_key.table_key.to_vec(),
1662                    iter.value().to_bytes(),
1663                ));
1664                iter.next().await.unwrap();
1665            }
1666
1667            let mut expected = vec![];
1668            #[expect(clippy::needless_range_loop)]
1669            for key_idx in 0..=2 {
1670                for epoch in (1..=3).rev() {
1671                    let item = batch_items[epoch - 1][key_idx].clone();
1672                    expected.push(item);
1673                }
1674            }
1675            assert_eq!(expected, output);
1676
1677            let mut backward_iter = merged_imm.clone().into_backward_iter();
1678            backward_iter.rewind().await.unwrap();
1679            let mut output = vec![];
1680            while backward_iter.is_valid() {
1681                output.push((
1682                    backward_iter.key().user_key.table_key.to_vec(),
1683                    backward_iter.value().to_bytes(),
1684                ));
1685                backward_iter.next().await.unwrap();
1686            }
1687            let mut expected = vec![];
1688            for key_idx in (0..=2).rev() {
1689                for epoch in (1..=3).rev() {
1690                    let item = batch_items[epoch - 1][key_idx].clone();
1691                    expected.push(item);
1692                }
1693            }
1694            assert_eq!(expected, output);
1695        }
1696
1697        // old value iter
1698        {
1699            let mut iter = merged_imm.clone().into_old_value_iter();
1700            iter.rewind().await.unwrap();
1701            let mut output = vec![];
1702            while iter.is_valid() {
1703                output.push((
1704                    iter.key().user_key.table_key.to_vec(),
1705                    iter.value().to_bytes(),
1706                ));
1707                iter.next().await.unwrap();
1708            }
1709
1710            let mut expected = vec![];
1711            for key_idx in 0..=2 {
1712                for epoch in (0..=2).rev() {
1713                    let (key_values, old_values) = &key_values[epoch];
1714                    let (key, new_value) = &key_values[key_idx];
1715                    let old_value = &old_values[key_idx];
1716                    if matches!(new_value, SharedBufferValue::Insert(_)) {
1717                        continue;
1718                    }
1719                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
1720                }
1721            }
1722            assert_eq!(expected, output);
1723        }
1724    }
1725    #[tokio::test]
1726    async fn test_shared_buffer_batch_seek_bug() {
1727        // Reproduce the bug where seek falls through to binary_search when table_id mismatch
1728        let epoch = test_epoch(1);
1729        let table_id = TableId::new(100);
1730        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![(
1731            iterator_test_table_key_of(1), // "key_test_000...001"
1732            SharedBufferValue::Insert(Bytes::from("value1")),
1733        )];
1734        let shared_buffer_batch = SharedBufferBatch::for_test(
1735            transform_shared_buffer(shared_buffer_items.clone()),
1736            epoch,
1737            table_id,
1738        );
1739
1740        // Case 1: Seek with smaller TableId (99), but larger TableKey ("key_test_000...002").
1741        // "key_test...002" > "key_test...001".
1742        // Forward Iterator.
1743        // Expected: Should land on the first item (TableId 100 > TableId 99).
1744        // Bug description: rewinds (index 0), then binary_search("key_2") in batch (only "key_1").
1745        // "key_2" > "key_1", so index becomes 1 (end). Iterator invalid.
1746        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1747        let seek_key = FullKey::for_test(
1748            TableId::new(99),
1749            iterator_test_table_key_of(2), // larger key
1750            epoch,
1751        );
1752        iter.seek(seek_key.to_ref()).await.unwrap();
1753
1754        assert!(
1755            iter.is_valid(),
1756            "Iterator should be valid when seeking with smaller table_id, even if the key part is larger"
1757        );
1758        assert_eq!(iter.key().user_key.table_id, table_id);
1759
1760        // Case 2: Seek with larger TableId (101), but smaller TableKey ("key_test_000...000").
1761        // "key_test...000" < "key_test...001".
1762        // Backward Iterator.
1763        // Expected: Should land on the last item (TableId 100 < TableId 101).
1764        // Bug description: rewinds (index valid), then binary_search("key_0") which returns Err(0) (insertion point at start).
1765        // Backward generic logic for Err(0) is `invalidate()`.
1766        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1767        let seek_key = FullKey::for_test(
1768            TableId::new(101),
1769            iterator_test_table_key_of(0), // smaller key
1770            epoch,
1771        );
1772        iter.seek(seek_key.to_ref()).await.unwrap();
1773
1774        assert!(
1775            iter.is_valid(),
1776            "Iterator should be valid when seeking with larger table_id, even if the key part is smaller"
1777        );
1778        assert_eq!(iter.key().user_key.table_id, table_id);
1779    }
1780}