risingwave_storage/hummock/shared_buffer/
shared_buffer_batch.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use risingwave_common::catalog::TableId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::shared_buffer::TableMemoryMetrics;
35use crate::hummock::utils::range_overlap;
36use crate::hummock::value::HummockValue;
37use crate::hummock::{HummockEpoch, HummockResult};
38use crate::mem_table::ImmId;
39use crate::store::ReadOptions;
40
/// A value written to the shared buffer, tagged with the DML operation that
/// produced it. Unlike [`HummockValue`], `Insert` and `Update` are kept as
/// distinct variants so old-value tracking can tell whether a previous
/// version of the key exists (see `SharedBufferBatchOldValues`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// A new value for a key with no previous version.
    Insert(T),
    /// A new value for a key that had a previous version.
    Update(T),
    /// A deletion of the key.
    Delete,
}
47
48impl<T> SharedBufferValue<T> {
49    fn to_ref(&self) -> SharedBufferValue<&T> {
50        match self {
51            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
52            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
53            SharedBufferValue::Delete => SharedBufferValue::Delete,
54        }
55    }
56}
57
58impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
59    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
60        match val {
61            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
62                HummockValue::Put(val)
63            }
64            SharedBufferValue::Delete => HummockValue::Delete,
65        }
66    }
67}
68
69impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
70    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
71        match self {
72            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
73            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
74            SharedBufferValue::Delete => SharedBufferValue::Delete,
75        }
76    }
77}
78
/// The key is `table_key`, which does not contain table id or epoch.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
pub type SharedBufferBatchId = u64;

/// A new value paired with the epoch (plus spill gap) it was written at.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);

/// A borrowed view of a single key entry in a batch: the key, the slice of
/// all its versioned new values, and — when old-value tracking is enabled —
/// the matching slice of old values.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    pub(crate) old_values: Option<&'a [Bytes]>,
}
90
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// A shared buffer may contain data from multiple epochs for a specific key.
    /// The values of all keys are stored together in the field `new_values` of `SharedBufferBatchInner`
    /// as a single vector. `value_offset` is the starting offset of values of the current `key` in the `new_values` vector.
    /// The end offset is the `value_offset` of the next entry, or the vector end if the current entry is the last one.
    pub(crate) value_offset: usize,
}
100
101impl SharedBufferKeyEntry {
102    /// Return an exclusive offset of the values of key of index `i`
103    fn value_end_offset<'a, T>(
104        i: usize,
105        entries: &'a [SharedBufferKeyEntry],
106        values: &'a [T],
107    ) -> usize {
108        entries
109            .get(i + 1)
110            .map(|entry| entry.value_offset)
111            .unwrap_or(values.len())
112    }
113
114    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
115        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
116    }
117}
118
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
    /// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
    values: Vec<Bytes>,
    /// Total in-memory size of the old values, used for metrics accounting.
    pub size: usize,
    /// Global gauge tracking old-value memory; incremented in `new` and
    /// decremented on drop.
    pub global_old_value_size: LabelGuardedIntGauge,
}
127
impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        // Undo the `add` performed in `new` so the global gauge stays balanced.
        self.global_old_value_size.sub(self.size as _);
    }
}
133
impl SharedBufferBatchOldValues {
    /// Wrap `values` with its accounted `size`, registering the size on the
    /// global gauge (released again in `Drop`).
    pub(crate) fn new(
        values: Vec<Bytes>,
        size: usize,
        global_old_value_size: LabelGuardedIntGauge,
    ) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    /// Test-only constructor backed by a throwaway gauge.
    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, LabelGuardedIntGauge::test_int_gauge::<1>())
    }
}
152
/// Shared, immutable payload of a [`SharedBufferBatch`]: sorted key entries,
/// their flattened versioned values, optional old values, and accounting data.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    /// Key entries sorted by table key; each points into `new_values` via its
    /// `value_offset`.
    entries: Vec<SharedBufferKeyEntry>,
    /// Flattened versioned values of all keys, grouped per key.
    new_values: Vec<VersionedSharedBufferValue>,
    /// Old values parallel to `new_values`, if old-value tracking is enabled.
    old_values: Option<SharedBufferBatchOldValues>,
    /// The epochs of the data in batch, sorted in ascending order (old to new)
    epochs: Vec<HummockEpoch>,
    /// Total size of all key-value items (excluding the `epoch` of value versions)
    size: usize,
    /// Per-table memory metrics; `size` is registered on creation and
    /// released in `Drop`.
    per_table_tracker: Arc<TableMemoryMetrics>,
    /// For a batch created from multiple batches, this will be
    /// the largest batch id among input batches
    batch_id: SharedBufferBatchId,
}
167
impl SharedBufferBatchInner {
    /// Build an inner batch from a sorted, non-empty single-epoch payload.
    ///
    /// If `old_values` is provided, it must contain exactly one entry per
    /// payload item. Registers `size` on `table_metrics` (released in `Drop`).
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_metrics: Arc<TableMemoryMetrics>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        // Single epoch: every key has exactly one value, so value_offset == i.
        for (i, (key, value)) in payload.into_iter().enumerate() {
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        // Allocate a globally unique id for this batch.
        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);

        table_metrics.inc_imm(size);

        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            per_table_tracker: table_metrics,
            batch_id,
        }
    }

    /// All versioned values of the key entry at index `i`, ordered from new
    /// epoch to old epoch.
    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

    /// Build an inner batch holding data merged from multiple epochs.
    ///
    /// Invariants checked below: entries sorted by key and by value offset,
    /// and each key's values sorted by epoch descending (new to old).
    // NOTE(review): this constructor installs `TableMemoryMetrics::for_test()`
    // as the tracker, so merged batches are not accounted per-table here —
    // confirm this is intended at the call sites.
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        // Per key, values must be ordered new-to-old (reversed slice ascending).
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            per_table_tracker: TableMemoryMetrics::for_test(),
            batch_id: imm_id,
        }
    }

    /// Return `None` if cannot find a visible version
    /// Return `HummockValue::Delete` if the key has been deleted by some epoch <= `read_epoch`
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        // Perform binary search on table key to find the corresponding entry
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            // Scan to find the first version <= epoch
            // (values are ordered new-to-old, so the first visible one wins).
            for (e, v) in self.values(i) {
                // skip invisible versions
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
            // cannot find a visible version
        }

        None
    }
}
273
impl PartialEq for SharedBufferBatchInner {
    // Equality deliberately compares only the key/value payload; `old_values`,
    // `epochs`, `size`, metrics and `batch_id` are ignored.
    fn eq(&self, other: &Self) -> bool {
        self.entries == other.entries && self.new_values == other.new_values
    }
}
279
impl Drop for SharedBufferBatchInner {
    fn drop(&mut self) {
        // Release the memory registered via `inc_imm` in `new`.
        self.per_table_tracker.dec_imm(self.size);
    }
}
285
/// Process-wide monotonically increasing generator of [`SharedBufferBatchId`]s.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
288
/// A write batch stored in the shared buffer.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    /// Shared immutable payload; cloning the batch only bumps the refcount.
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    /// The table this batch belongs to.
    pub table_id: TableId,
}
295
impl SharedBufferBatch {
    /// Test-only constructor without old values.
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    /// Test-only constructor with one old value per item.
    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    /// Shared implementation of the test constructors: measures sizes and
    /// wraps the payload in a [`SharedBufferBatchInner`] with test metrics.
    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                TableMemoryMetrics::for_test(),
            )),
            table_id,
        }
    }

    /// Estimate the in-memory size of a list of delete-range bounds.
    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                // is_exclude_left_key(bool) + table_id + epoch
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

    /// Return (total size, old value size or 0)
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        // Old values account for both the `Bytes` handle and the payload bytes.
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        // size = Sum(length of full key + length of user value)
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }

    /// Whether this batch belongs to `table_id` and its key range overlaps
    /// `table_key_range`.
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    /// The table this batch belongs to.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// The smallest epoch contained in this batch (epochs are sorted ascending).
    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    /// The largest epoch contained in this batch.
    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    /// Number of distinct keys in the batch.
    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    /// Total number of versioned values across all keys.
    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    /// Whether old-value tracking is enabled for this batch.
    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    /// Point lookup of `table_key` at `read_epoch`; see
    /// `SharedBufferBatchInner::get_value` for visibility semantics.
    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

    /// Whether any key of this batch falls inside `table_key_range`.
    /// Implemented as a binary search whose comparator maps "key below range"
    /// to `Less` and "key above range" to `Greater`, so any `Ok` hit means
    /// some key is inside the range.
    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }

    /// Consume the batch and build an iterator of the requested direction.
    /// `IS_NEW_VALUE == false` yields old values instead (requires
    /// [`Self::has_old_value`]).
    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    /// Forward iterator over old values (`Update`/`Delete` entries only).
    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    /// Forward iterator over new values.
    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    /// Backward iterator over new values.
    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    /// Smallest table key in the batch (entries are sorted, never empty).
    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    /// Largest table key in the batch.
    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    /// Largest table key as the owned-`Bytes` reference stored in the batch.
    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    /// return inclusive left endpoint, which means that all data in this batch should be larger or
    /// equal than this key.
    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    /// Accounted in-memory size of the batch (see [`Self::measure_batch_size`]).
    pub fn size(&self) -> usize {
        self.inner.size
    }

    /// The old values of this batch, if old-value tracking is enabled.
    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    /// The globally unique id of this batch.
    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    /// All epochs contained in this batch, sorted ascending.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    /// Build a batch from pre-sorted items with real per-table metrics.
    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        table_metrics: Arc<TableMemoryMetrics>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            table_metrics,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    /// Test-only variant of [`Self::build_shared_buffer_batch`] without old
    /// values, using throwaway metrics.
    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            None,
            size,
            TableMemoryMetrics::for_test(),
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}
564
/// Iterate all the items in the shared buffer batch
/// If there are multiple versions of a key, the iterator will return all versions
///
/// When `IS_NEW_VALUE` is `false`, the iterator yields old values and skips
/// `Insert` entries (which have no old value).
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// The index of the current entry in the payload
    current_entry_idx: usize,
    /// The index of current value
    current_value_idx: usize,
    /// The exclusive end offset of the value index of current key.
    value_end_offset: usize,
    table_id: TableId,
    _phantom: PhantomData<D>,
}
578
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    /// Create an iterator over `inner`. The indices start at zero and are only
    /// meaningful after `rewind`/`seek`. An old-value iterator requires the
    /// batch to carry old values.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    /// Whether `current_entry_idx` points at a real entry.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Mark the iterator exhausted by moving the entry index past the end
    /// (an out-of-range index is the invalid state for both directions).
    fn invalidate(&mut self) {
        self.current_entry_idx = self.inner.entries.len();
    }

    /// Move to the next entry in iteration order; going backward past index 0
    /// invalidates the iterator.
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.invalidate();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Re-derive `current_value_idx`/`value_end_offset` from the current
    /// entry. Must be called after every entry move.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    /// Exclusive end offset (into `new_values`) of the current entry's values.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    /// Debug-check all index invariants; for old-value iteration the current
    /// value must never be an `Insert` (inserts have no old value).
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    /// Advance one value; when the current entry's values are exhausted, move
    /// to the next entry and reset the value indices.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    /// Old-value iteration only: skip forward until the current value is a
    /// non-`Insert` (i.e. has an old value), crossing entry boundaries as
    /// needed, or until the iterator is exhausted.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip `Insert` values inside the current entry.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                // Entry exhausted: move on and retry, or stop at the end.
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}
696
impl SharedBufferBatchIterator<Forward> {
    /// Skip the remaining versions of the current key and move to the next key.
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Borrow the current key together with all its versions. Must be called
    /// at the first version of the key (checked by the debug assertion).
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
720
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    /// Advance to the next value; old-value iteration additionally skips
    /// `Insert` entries.
    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Full key (table id + table key + epoch) of the current value.
    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    /// Current value: the new value as-is, or (for old-value iteration) the
    /// parallel old value wrapped as a `Put`.
    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    /// Reset to the first entry in iteration order.
    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                // `entries` is asserted non-empty at construction, so this
                // subtraction cannot underflow.
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Position the iterator at the first item `>= key` (forward) or
    /// `<= key` (backward), comparing by table id, table key, then epoch.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        match key.user_key.table_id.cmp(&self.table_id) {
            Ordering::Less => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        // seek key table id < batch table id, so seek to beginning
                        self.rewind().await?;
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.invalidate();
                        return Ok(());
                    }
                };
            }
            Ordering::Greater => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.invalidate();
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        // seek key table id > batch table id, so seek to end
                        self.rewind().await?;
                        return Ok(());
                    }
                };
            }
            Ordering::Equal => (),
        }
        // Perform binary search on table key because the items in SharedBufferBatch is ordered
        // by table key.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                // Exact key match: skip versions newer than the seek epoch
                // (values are ordered new-to-old within the entry).
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                // All versions are newer than the seek epoch: move past this key.
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                // No exact match: `i` is the first entry greater than the key.
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        self.invalidate();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
860
861#[cfg(test)]
862mod tests {
863    use std::ops::Bound::Excluded;
864
865    use itertools::{Itertools, zip_eq};
866    use risingwave_common::util::epoch::{EpochExt, test_epoch};
867    use risingwave_hummock_sdk::key::map_table_key_range;
868
869    use super::*;
870    use crate::hummock::compactor::merge_imms_in_memory;
871    use crate::hummock::iterator::test_utils::{
872        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
873    };
874
875    fn to_hummock_value_batch(
876        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
877    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
878        items.into_iter().map(|(k, v)| (k, v.into())).collect()
879    }
880
    /// Basic sanity checks for a single shared buffer batch: boundary keys,
    /// point lookups (hits and misses), and full forward/backward iteration.
    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Boundary keys: the batch's start/end table keys must match the
        // first/last inserted keys.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point lookup: every inserted key is found with its value.
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        // Point lookup misses: keys beyond the batch return None.
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator: yields all items in key order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iterator: yields all items in reverse key order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }
970
    /// Exercises `seek` on forward and backward iterators over a shared
    /// buffer batch, covering keys before/inside/after the batch range and
    /// seek epochs older than, equal to, and newer than the batch epoch.
    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // FORWARD: Seek to a key < 1st key, expect all three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
        // (a newer seek epoch still sees the entry written at the batch epoch).
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
        // (the 2nd key's entry is invisible to an older epoch, so iteration
        // starts at the next key).
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key < 1st key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to a key > the last key, expect all items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with current epoch, expect first two items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with old epoch, expect first item to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // BACKWARD: Seek to 2nd key with future epoch, expect first two items to return
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1111
    /// Tests the old-value iterator of a shared buffer batch built with old
    /// values: it should yield old values only for non-`Insert` entries
    /// (`Update`/`Delete`), while the regular forward iterator still yields
    /// all new values. Also exercises `seek` on both iterators.
    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        // Old values are positionally aligned with `key_values`; `Insert`
        // entries carry an empty placeholder.
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        // Only non-Insert entries (keys 2 and 4) appear in the old-value iterator.
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        // Rewind the old-value iterator: expect exactly the Update/Delete entries.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key < 1st key, expect all four items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // OLD VALUE: Seek to a key < 1st key, expect all old-value entries to return
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // OLD VALUE: Seek to a key > the last key, expect no items to return
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with current epoch, expect last three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // OLD VALUE: Seek to 2nd key with current epoch, expect both old-value
        // entries (keys 2 and 4) to return
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 2nd key with future epoch, expect last three items to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // OLD VALUE: Seek to 2nd key with future epoch, expect both old-value
        // entries to return
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // OLD VALUE: Seek to 2nd key with old epoch, expect only the later
        // old-value entry (key 4) to return
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // FORWARD: Seek to 3rd key with old epoch, expect last item to return
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Seek to an insert key: the old-value iterator skips it and resumes
        // at the next non-Insert entry (key 4)
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1283
    /// Verifies that seeking with a key belonging to a different table id
    /// than the batch's panics (debug assertion on table-id mismatch).
    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        // The batch is created with the default table id (0)...
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        // ...so seeking with a key for table id 1 should panic.
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }
1295
1296    #[tokio::test]
1297    async fn test_shared_buffer_batch_range_existx() {
1298        let epoch = test_epoch(1);
1299        let shared_buffer_items = vec![
1300            (
1301                Vec::from("a_1"),
1302                SharedBufferValue::Insert(Bytes::from("value1")),
1303            ),
1304            (
1305                Vec::from("a_3"),
1306                SharedBufferValue::Insert(Bytes::from("value2")),
1307            ),
1308            (
1309                Vec::from("a_5"),
1310                SharedBufferValue::Insert(Bytes::from("value3")),
1311            ),
1312            (
1313                Vec::from("b_2"),
1314                SharedBufferValue::Insert(Bytes::from("value3")),
1315            ),
1316        ];
1317        let shared_buffer_batch = SharedBufferBatch::for_test(
1318            transform_shared_buffer(shared_buffer_items),
1319            epoch,
1320            Default::default(),
1321        );
1322
1323        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1324        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1325        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1326        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1327        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1328        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1329        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1330        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1331        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1332        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1333        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1334        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1335        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1336        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1337        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1338        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1339
1340        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1341        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1342        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1343        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1344        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1345        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1346        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1347        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1348        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1349        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1350    }
1351
    /// Merges three immutable memtables (same three keys, written at epochs
    /// 1..=3) via `merge_imms_in_memory` and verifies point lookups per
    /// epoch, per-epoch snapshot iteration, and full forward/backward
    /// iteration order (key-major, newest epoch first within each key).
    #[tokio::test]
    async fn test_merge_imms_basic() {
        let table_id = TableId::new(1004);
        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items1.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let epoch = test_epoch(2);
        let imm2 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items2.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);

        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value13")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value23")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value33")),
            ),
        ];
        let epoch = test_epoch(3);
        let imm3 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items3.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);

        // batch_items[i] holds the items written at epoch i + 1.
        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        // newer data comes first
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone()).await;

        // Point lookup: reading at epoch i + 1 must see the values written at
        // that epoch.
        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        // Keys never written return None.
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator: filtering to a single snapshot epoch yields
        // exactly the items written at that epoch.
        for snapshot_epoch in 1..=3 {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if test_epoch(snapshot_epoch) == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
        }

        // Forward and Backward iterator: full iteration is key-major with
        // newer epochs before older ones for each key.
        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            // Backward order: keys descending, but epochs still newest-first
            // within each key.
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }
    }
1527
1528    #[tokio::test]
1529    async fn test_merge_imms_with_old_values() {
1530        let table_id = TableId::new(1004);
1531        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1532            (
1533                iterator_test_table_key_of(1),
1534                SharedBufferValue::Insert(Bytes::from("value1")),
1535            ),
1536            (
1537                iterator_test_table_key_of(2),
1538                SharedBufferValue::Update(Bytes::from("value2")),
1539            ),
1540            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1541        ];
1542        let old_value1 = vec![
1543            Bytes::new(),
1544            Bytes::from("old_value2"),
1545            Bytes::from("old_value3"),
1546        ];
1547        let epoch = test_epoch(1);
1548        let imm1 = SharedBufferBatch::for_test_with_old_values(
1549            transform_shared_buffer(key_value1.clone()),
1550            old_value1.clone(),
1551            epoch,
1552            table_id,
1553        );
1554        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1555        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1556            (
1557                iterator_test_table_key_of(1),
1558                SharedBufferValue::Update(Bytes::from("value12")),
1559            ),
1560            (
1561                iterator_test_table_key_of(2),
1562                SharedBufferValue::Update(Bytes::from("value22")),
1563            ),
1564            (
1565                iterator_test_table_key_of(3),
1566                SharedBufferValue::Insert(Bytes::from("value32")),
1567            ),
1568        ];
1569        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1570        let epoch = epoch.next_epoch();
1571        let imm2 = SharedBufferBatch::for_test_with_old_values(
1572            transform_shared_buffer(key_value2.clone()),
1573            old_value2.clone(),
1574            epoch,
1575            table_id,
1576        );
1577        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1578
1579        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1580            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1581            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1582            (
1583                iterator_test_table_key_of(3),
1584                SharedBufferValue::Update(Bytes::from("value33")),
1585            ),
1586        ];
1587        let old_value3 = vec![
1588            Bytes::from("value12"),
1589            Bytes::from("value22"),
1590            Bytes::from("value32"),
1591        ];
1592        let epoch = epoch.next_epoch();
1593        let imm3 = SharedBufferBatch::for_test_with_old_values(
1594            transform_shared_buffer(key_value3.clone()),
1595            old_value3.clone(),
1596            epoch,
1597            table_id,
1598        );
1599        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());
1600
1601        let key_values = [
1602            (key_value1, old_value1),
1603            (key_value2, old_value2),
1604            (key_value3, old_value3),
1605        ];
1606
1607        let batch_items = [
1608            shared_buffer_items1,
1609            shared_buffer_items2,
1610            shared_buffer_items3,
1611        ];
1612        // newer data comes first
1613        let imms = vec![imm3, imm2, imm1];
1614        let merged_imm = merge_imms_in_memory(table_id, imms.clone()).await;
1615
1616        // Point lookup
1617        for (i, items) in batch_items.iter().enumerate() {
1618            for (key, value) in items {
1619                assert_eq!(
1620                    merged_imm
1621                        .get(
1622                            TableKey(key.as_slice()),
1623                            test_epoch(i as u64 + 1),
1624                            &ReadOptions::default()
1625                        )
1626                        .unwrap()
1627                        .0
1628                        .as_slice(),
1629                    value.as_slice(),
1630                    "epoch: {}, key: {:?}",
1631                    test_epoch(i as u64 + 1),
1632                    String::from_utf8(key.clone())
1633                );
1634            }
1635        }
1636        assert_eq!(
1637            merged_imm.get(
1638                TableKey(iterator_test_table_key_of(4).as_slice()),
1639                test_epoch(1),
1640                &ReadOptions::default()
1641            ),
1642            None
1643        );
1644        assert_eq!(
1645            merged_imm.get(
1646                TableKey(iterator_test_table_key_of(5).as_slice()),
1647                test_epoch(1),
1648                &ReadOptions::default()
1649            ),
1650            None
1651        );
1652
        // Forward iterator, filtering entries whose epoch matches each snapshot epoch
1654        for i in 1..=3 {
1655            let snapshot_epoch = test_epoch(i);
1656            let mut iter = merged_imm.clone().into_forward_iter();
1657            iter.rewind().await.unwrap();
1658            let mut output = vec![];
1659            while iter.is_valid() {
1660                let epoch = iter.key().epoch_with_gap.pure_epoch();
1661                if snapshot_epoch == epoch {
1662                    output.push((
1663                        iter.key().user_key.table_key.to_vec(),
1664                        iter.value().to_bytes(),
1665                    ));
1666                }
1667                iter.next().await.unwrap();
1668            }
1669            assert_eq!(output, batch_items[i as usize - 1]);
1670        }
1671
1672        // Forward and Backward iterator
1673        {
1674            let mut iter = merged_imm.clone().into_forward_iter();
1675            iter.rewind().await.unwrap();
1676            let mut output = vec![];
1677            while iter.is_valid() {
1678                output.push((
1679                    iter.key().user_key.table_key.to_vec(),
1680                    iter.value().to_bytes(),
1681                ));
1682                iter.next().await.unwrap();
1683            }
1684
1685            let mut expected = vec![];
1686            #[expect(clippy::needless_range_loop)]
1687            for key_idx in 0..=2 {
1688                for epoch in (1..=3).rev() {
1689                    let item = batch_items[epoch - 1][key_idx].clone();
1690                    expected.push(item);
1691                }
1692            }
1693            assert_eq!(expected, output);
1694
1695            let mut backward_iter = merged_imm.clone().into_backward_iter();
1696            backward_iter.rewind().await.unwrap();
1697            let mut output = vec![];
1698            while backward_iter.is_valid() {
1699                output.push((
1700                    backward_iter.key().user_key.table_key.to_vec(),
1701                    backward_iter.value().to_bytes(),
1702                ));
1703                backward_iter.next().await.unwrap();
1704            }
1705            let mut expected = vec![];
1706            for key_idx in (0..=2).rev() {
1707                for epoch in (1..=3).rev() {
1708                    let item = batch_items[epoch - 1][key_idx].clone();
1709                    expected.push(item);
1710                }
1711            }
1712            assert_eq!(expected, output);
1713        }
1714
1715        // old value iter
1716        {
1717            let mut iter = merged_imm.clone().into_old_value_iter();
1718            iter.rewind().await.unwrap();
1719            let mut output = vec![];
1720            while iter.is_valid() {
1721                output.push((
1722                    iter.key().user_key.table_key.to_vec(),
1723                    iter.value().to_bytes(),
1724                ));
1725                iter.next().await.unwrap();
1726            }
1727
1728            let mut expected = vec![];
1729            for key_idx in 0..=2 {
1730                for epoch in (0..=2).rev() {
1731                    let (key_values, old_values) = &key_values[epoch];
1732                    let (key, new_value) = &key_values[key_idx];
1733                    let old_value = &old_values[key_idx];
1734                    if matches!(new_value, SharedBufferValue::Insert(_)) {
1735                        continue;
1736                    }
1737                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
1738                }
1739            }
1740            assert_eq!(expected, output);
1741        }
1742    }
1743    #[tokio::test]
1744    async fn test_shared_buffer_batch_seek_bug() {
1745        // Reproduce the bug where seek falls through to binary_search when table_id mismatch
1746        let epoch = test_epoch(1);
1747        let table_id = TableId::new(100);
1748        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![(
1749            iterator_test_table_key_of(1), // "key_test_000...001"
1750            SharedBufferValue::Insert(Bytes::from("value1")),
1751        )];
1752        let shared_buffer_batch = SharedBufferBatch::for_test(
1753            transform_shared_buffer(shared_buffer_items.clone()),
1754            epoch,
1755            table_id,
1756        );
1757
1758        // Case 1: Seek with smaller TableId (99), but larger TableKey ("key_test_000...002").
1759        // "key_test...002" > "key_test...001".
1760        // Forward Iterator.
1761        // Expected: Should land on the first item (TableId 100 > TableId 99).
1762        // Bug description: rewinds (index 0), then binary_search("key_2") in batch (only "key_1").
1763        // "key_2" > "key_1", so index becomes 1 (end). Iterator invalid.
1764        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1765        let seek_key = FullKey::for_test(
1766            TableId::new(99),
1767            iterator_test_table_key_of(2), // larger key
1768            epoch,
1769        );
1770        iter.seek(seek_key.to_ref()).await.unwrap();
1771
1772        assert!(
1773            iter.is_valid(),
1774            "Iterator should be valid when seeking with smaller table_id, even if the key part is larger"
1775        );
1776        assert_eq!(iter.key().user_key.table_id, table_id);
1777
1778        // Case 2: Seek with larger TableId (101), but smaller TableKey ("key_test_000...000").
1779        // "key_test...000" < "key_test...001".
1780        // Backward Iterator.
1781        // Expected: Should land on the last item (TableId 100 < TableId 101).
1782        // Bug description: rewinds (index valid), then binary_search("key_0") which returns Err(0) (insertion point at start).
1783        // Backward generic logic for Err(0) is `invalidate()`.
1784        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1785        let seek_key = FullKey::for_test(
1786            TableId::new(101),
1787            iterator_test_table_key_of(0), // smaller key
1788            epoch,
1789        );
1790        iter.seek(seek_key.to_ref()).await.unwrap();
1791
1792        assert!(
1793            iter.is_valid(),
1794            "Iterator should be valid when seeking with larger table_id, even if the key part is smaller"
1795        );
1796        assert_eq!(iter.key().user_key.table_id, table_id);
1797    }
1798}