risingwave_storage/hummock/shared_buffer/
shared_buffer_batch.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use risingwave_common::catalog::TableId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::shared_buffer::TableMemoryMetrics;
35use crate::hummock::utils::range_overlap;
36use crate::hummock::value::HummockValue;
37use crate::hummock::{HummockEpoch, HummockResult};
38use crate::store::ReadOptions;
39
/// A single write recorded in the shared buffer.
///
/// `Insert` and `Update` both carry the new value and `Delete` carries none. The two value
/// carrying variants are kept apart so that old-value iteration can tell which entries have
/// a previous value (`Insert` entries do not).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// A key written for the first time; no old value exists.
    Insert(T),
    /// An overwrite of an existing key.
    Update(T),
    /// A deletion of an existing key.
    Delete,
}

impl<T> SharedBufferValue<T> {
    /// Borrows the payload, yielding a `SharedBufferValue<&T>` of the same variant.
    fn to_ref(&self) -> SharedBufferValue<&T> {
        match self {
            Self::Insert(inner) => SharedBufferValue::Insert(inner),
            Self::Update(inner) => SharedBufferValue::Update(inner),
            Self::Delete => SharedBufferValue::Delete,
        }
    }
}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59        match val {
60            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61                HummockValue::Put(val)
62            }
63            SharedBufferValue::Delete => HummockValue::Delete,
64        }
65    }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70        match self {
71            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73            SharedBufferValue::Delete => SharedBufferValue::Delete,
74        }
75    }
76}
77
/// One `(key, write)` pair handed to a shared buffer batch.
///
/// The key is `table_key`, which does not contain table id or epoch.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Identifier of a shared buffer batch. `SharedBufferBatchInner::new` draws it from
/// `SHARED_BUFFER_BATCH_ID_GENERATOR`.
pub type SharedBufferBatchId = u64;
81
/// One key/value entry stored in a shared buffer batch.
///
/// Entries within a batch are kept sorted by `key`.
#[derive(Debug, PartialEq)]
pub(crate) struct SharedBufferEntry {
    /// Table key, without table id or epoch.
    pub(crate) key: TableKey<Bytes>,
    /// The write applied to `key` at the batch's epoch.
    pub(crate) value: SharedBufferValue<Bytes>,
}
87
/// Old (pre-write) values of a batch together with their accounted memory size.
///
/// `size` is added to `global_old_value_size` when constructed through `new`, and subtracted
/// again when the value is dropped.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Store the old values. If some, the length should be the same as `entries`. It contains empty `Bytes` when the
    /// corresponding `value` is `Insert`, and contains the old values of `Update` and `Delete`.
    values: Vec<Bytes>,
    /// Memory size charged for `values`, as computed by the caller (e.g. via
    /// `SharedBufferBatch::measure_batch_size`).
    pub size: usize,
    /// Gauge receiving `size` on construction and losing it on drop; presumably shared by
    /// all batches (hence "global") — confirm at call sites.
    pub global_old_value_size: LabelGuardedIntGauge,
}
96
97impl Drop for SharedBufferBatchOldValues {
98    fn drop(&mut self) {
99        self.global_old_value_size.sub(self.size as _);
100    }
101}
102
103impl SharedBufferBatchOldValues {
104    pub(crate) fn new(
105        values: Vec<Bytes>,
106        size: usize,
107        global_old_value_size: LabelGuardedIntGauge,
108    ) -> Self {
109        global_old_value_size.add(size as _);
110        Self {
111            values,
112            size,
113            global_old_value_size,
114        }
115    }
116
117    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
118        Self::new(values, size, LabelGuardedIntGauge::test_int_gauge::<1>())
119    }
120}
121
/// Contents of a shared buffer batch, held behind an `Arc` by both `SharedBufferBatch` and
/// its iterators.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    /// Entries sorted by table key; never empty (asserted in `new`).
    entries: Vec<SharedBufferEntry>,
    /// Optional old values, index-aligned with `entries` when present.
    old_values: Option<SharedBufferBatchOldValues>,
    /// Epoch (plus spill offset) reported for every entry of this batch.
    epoch_with_gap: EpochWithGap,
    /// Total size of all key-value items (excluding the `epoch` of value versions)
    size: usize,
    /// Per-table memory metrics; `size` is added in `new` and subtracted on drop.
    per_table_tracker: Arc<TableMemoryMetrics>,
    /// For a batch created from multiple batches, this will be
    /// the largest batch id among input batches
    batch_id: SharedBufferBatchId,
}
134
135impl SharedBufferBatchInner {
136    pub(crate) fn new(
137        epoch: HummockEpoch,
138        spill_offset: u16,
139        payload: Vec<SharedBufferItem>,
140        old_values: Option<SharedBufferBatchOldValues>,
141        size: usize,
142        table_metrics: Arc<TableMemoryMetrics>,
143    ) -> Self {
144        assert!(!payload.is_empty());
145        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
146        if let Some(old_values) = &old_values {
147            assert_eq!(old_values.values.len(), payload.len());
148        }
149
150        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
151        let entries = payload
152            .into_iter()
153            .map(|(key, value)| SharedBufferEntry { key, value })
154            .collect();
155
156        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
157
158        table_metrics.inc_imm(size);
159
160        SharedBufferBatchInner {
161            entries,
162            old_values,
163            epoch_with_gap,
164            size,
165            per_table_tracker: table_metrics,
166            batch_id,
167        }
168    }
169
170    /// Return `None` if cannot find a visible version
171    /// Return `HummockValue::Delete` if the key has been deleted by some epoch <= `read_epoch`
172    fn get_value<'a>(
173        &'a self,
174        table_key: TableKey<&[u8]>,
175        read_epoch: HummockEpoch,
176    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
177        // Perform binary search on table key to find the corresponding entry
178        if let Ok(i) = self
179            .entries
180            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
181        {
182            let SharedBufferEntry { key, value } = &self.entries[i];
183            debug_assert_eq!(key.as_ref(), *table_key);
184            if read_epoch >= self.epoch_with_gap.pure_epoch() {
185                return Some((value.to_ref().into(), self.epoch_with_gap));
186            }
187        }
188
189        None
190    }
191}
192
193impl Drop for SharedBufferBatchInner {
194    fn drop(&mut self) {
195        self.per_table_tracker.dec_imm(self.size);
196    }
197}
198
/// Process-wide counter; `SharedBufferBatchInner::new` takes the next value (starting at 0)
/// as the id of each newly built batch.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
201
/// A write batch stored in the shared buffer.
///
/// Clones share the same `inner` through an `Arc`, so cloning does not copy entries.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    /// Shared batch contents.
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    /// Table that every key of this batch is reported under.
    pub table_id: TableId,
}
208
209impl SharedBufferBatch {
210    pub fn for_test(
211        sorted_items: Vec<SharedBufferItem>,
212        epoch: HummockEpoch,
213        table_id: TableId,
214    ) -> Self {
215        Self::for_test_inner(sorted_items, None, epoch, table_id)
216    }
217
218    pub fn for_test_with_old_values(
219        sorted_items: Vec<SharedBufferItem>,
220        old_values: Vec<Bytes>,
221        epoch: HummockEpoch,
222        table_id: TableId,
223    ) -> Self {
224        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
225    }
226
227    fn for_test_inner(
228        sorted_items: Vec<SharedBufferItem>,
229        old_values: Option<Vec<Bytes>>,
230        epoch: HummockEpoch,
231        table_id: TableId,
232    ) -> Self {
233        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());
234
235        let old_values = old_values
236            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));
237
238        Self {
239            inner: Arc::new(SharedBufferBatchInner::new(
240                epoch,
241                0,
242                sorted_items,
243                old_values,
244                size,
245                TableMemoryMetrics::for_test(),
246            )),
247            table_id,
248        }
249    }
250
251    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
252        batch_items
253            .iter()
254            .map(|(left, right)| {
255                // is_exclude_left_key(bool) + table_id + epoch
256                let l1 = match left {
257                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
258                    Bound::Unbounded => 13,
259                };
260                let l2 = match right {
261                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
262                    Bound::Unbounded => 13,
263                };
264                l1 + l2
265            })
266            .sum()
267    }
268
269    /// Return (total size, old value size or 0)
270    pub fn measure_batch_size(
271        batch_items: &[SharedBufferItem],
272        old_values: Option<&[Bytes]>,
273    ) -> (usize, usize) {
274        let old_value_size = old_values
275            .iter()
276            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
277            .sum::<usize>();
278        // size = Sum(length of full key + length of user value)
279        let kv_size = batch_items
280            .iter()
281            .map(|(k, v)| {
282                k.len() + {
283                    match v {
284                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
285                            val.len()
286                        }
287                        SharedBufferValue::Delete => 0,
288                    }
289                }
290            })
291            .sum::<usize>();
292        (kv_size + old_value_size, old_value_size)
293    }
294
295    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
296    where
297        R: RangeBounds<TableKey<B>>,
298        B: AsRef<[u8]>,
299    {
300        let left = table_key_range
301            .start_bound()
302            .as_ref()
303            .map(|key| TableKey(key.0.as_ref()));
304        let right = table_key_range
305            .end_bound()
306            .as_ref()
307            .map(|key| TableKey(key.0.as_ref()));
308        self.table_id == table_id
309            && range_overlap(
310                &(left, right),
311                &self.start_table_key(),
312                Included(&self.end_table_key()),
313            )
314    }
315
316    pub fn table_id(&self) -> TableId {
317        self.table_id
318    }
319
320    pub fn key_count(&self) -> usize {
321        self.inner.entries.len()
322    }
323
324    pub fn value_count(&self) -> usize {
325        self.inner.entries.len()
326    }
327
328    pub fn has_old_value(&self) -> bool {
329        self.inner.old_values.is_some()
330    }
331
332    pub fn get<'a>(
333        &'a self,
334        table_key: TableKey<&[u8]>,
335        read_epoch: HummockEpoch,
336        _read_options: &ReadOptions,
337    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
338        self.inner.get_value(table_key, read_epoch)
339    }
340
341    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
342        self.inner
343            .entries
344            .binary_search_by(|m| {
345                let key = &m.key;
346                let too_left = match &table_key_range.0 {
347                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
348                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
349                    std::ops::Bound::Unbounded => false,
350                };
351                if too_left {
352                    return Ordering::Less;
353                }
354
355                let too_right = match &table_key_range.1 {
356                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
357                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
358                    std::ops::Bound::Unbounded => false,
359                };
360                if too_right {
361                    return Ordering::Greater;
362                }
363
364                Ordering::Equal
365            })
366            .is_ok()
367    }
368
369    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
370        self,
371    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
372        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
373    }
374
375    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
376        self.into_directed_iter()
377    }
378
379    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
380        self.into_directed_iter()
381    }
382
383    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
384        self.into_directed_iter()
385    }
386
387    #[inline(always)]
388    pub fn start_table_key(&self) -> TableKey<&[u8]> {
389        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
390    }
391
392    #[inline(always)]
393    pub fn end_table_key(&self) -> TableKey<&[u8]> {
394        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
395    }
396
397    #[inline(always)]
398    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
399        &self.inner.entries.last().expect("non-empty").key
400    }
401
402    /// return inclusive left endpoint, which means that all data in this batch should be larger or
403    /// equal than this key.
404    pub fn start_user_key(&self) -> UserKey<&[u8]> {
405        UserKey::new(self.table_id, self.start_table_key())
406    }
407
408    pub fn size(&self) -> usize {
409        self.inner.size
410    }
411
412    pub fn batch_id(&self) -> SharedBufferBatchId {
413        self.inner.batch_id
414    }
415
416    pub fn epoch(&self) -> HummockEpoch {
417        self.inner.epoch_with_gap.pure_epoch()
418    }
419
420    pub(crate) fn build_shared_buffer_batch(
421        epoch: HummockEpoch,
422        spill_offset: u16,
423        sorted_items: Vec<SharedBufferItem>,
424        old_values: Option<SharedBufferBatchOldValues>,
425        size: usize,
426        table_id: TableId,
427        table_metrics: Arc<TableMemoryMetrics>,
428    ) -> Self {
429        let inner = SharedBufferBatchInner::new(
430            epoch,
431            spill_offset,
432            sorted_items,
433            old_values,
434            size,
435            table_metrics,
436        );
437        SharedBufferBatch {
438            inner: Arc::new(inner),
439            table_id,
440        }
441    }
442
443    #[cfg(any(test, feature = "test"))]
444    pub fn build_shared_buffer_batch_for_test(
445        epoch: HummockEpoch,
446        spill_offset: u16,
447        sorted_items: Vec<SharedBufferItem>,
448        size: usize,
449        table_id: TableId,
450    ) -> Self {
451        let inner = SharedBufferBatchInner::new(
452            epoch,
453            spill_offset,
454            sorted_items,
455            None,
456            size,
457            TableMemoryMetrics::for_test(),
458        );
459        SharedBufferBatch {
460            inner: Arc::new(inner),
461            table_id,
462        }
463    }
464}
465
/// Iterates over the entries of one shared buffer batch in direction `D`.
///
/// With `IS_NEW_VALUE = true` (the default) it yields each entry's new value. With
/// `IS_NEW_VALUE = false` it yields the stored old values instead and skips `Insert`
/// entries, which have no old value. Every key is reported at the batch's epoch.
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    /// Shared batch contents being iterated.
    inner: Arc<SharedBufferBatchInner>,
    /// The index of the current entry in the payload; equals `entries.len()` once the
    /// iterator is invalid (exhausted or explicitly invalidated).
    current_entry_idx: usize,
    /// Table id reported in every returned key.
    table_id: TableId,
    /// Marker for the iteration direction `D`.
    _phantom: PhantomData<D>,
}
475
476impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
477    SharedBufferBatchIterator<D, IS_NEW_VALUE>
478{
479    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
480        if !IS_NEW_VALUE {
481            assert!(
482                inner.old_values.is_some(),
483                "create old value iter with no old value: {:?}",
484                table_id
485            );
486        }
487        Self {
488            inner,
489            current_entry_idx: 0,
490            table_id,
491            _phantom: Default::default(),
492        }
493    }
494
495    fn is_valid_entry_idx(&self) -> bool {
496        self.current_entry_idx < self.inner.entries.len()
497    }
498
499    fn invalidate(&mut self) {
500        self.current_entry_idx = self.inner.entries.len();
501    }
502
503    fn advance_to_next_entry(&mut self) {
504        debug_assert!(self.is_valid_entry_idx());
505        match D::direction() {
506            DirectionEnum::Forward => {
507                self.current_entry_idx += 1;
508            }
509            DirectionEnum::Backward => {
510                if self.current_entry_idx == 0 {
511                    self.invalidate();
512                } else {
513                    self.current_entry_idx -= 1;
514                }
515            }
516        }
517    }
518
519    fn assert_valid_idx(&self) {
520        debug_assert!(self.is_valid_entry_idx());
521        if !IS_NEW_VALUE {
522            debug_assert!(!matches!(
523                self.inner.entries[self.current_entry_idx].value,
524                SharedBufferValue::Insert(_)
525            ));
526        }
527    }
528
529    fn advance_until_valid_old_value(&mut self) {
530        debug_assert!(!IS_NEW_VALUE);
531        while self.is_valid_entry_idx()
532            && matches!(
533                self.inner.entries[self.current_entry_idx].value,
534                SharedBufferValue::Insert(_)
535            )
536        {
537            self.advance_to_next_entry();
538        }
539    }
540}
541
542impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
543    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
544{
545    type Direction = D;
546
547    async fn next(&mut self) -> HummockResult<()> {
548        self.advance_to_next_entry();
549        if !IS_NEW_VALUE {
550            self.advance_until_valid_old_value();
551        }
552        Ok(())
553    }
554
555    fn key(&self) -> FullKey<&[u8]> {
556        self.assert_valid_idx();
557        let entry = &self.inner.entries[self.current_entry_idx];
558        FullKey::new_with_gap_epoch(
559            self.table_id,
560            TableKey(entry.key.as_ref()),
561            self.inner.epoch_with_gap,
562        )
563    }
564
565    fn value(&self) -> HummockValue<&[u8]> {
566        self.assert_valid_idx();
567        if IS_NEW_VALUE {
568            self.inner.entries[self.current_entry_idx]
569                .value
570                .to_ref()
571                .to_slice()
572                .into()
573        } else {
574            HummockValue::put(
575                self.inner.old_values.as_ref().unwrap().values[self.current_entry_idx].as_ref(),
576            )
577        }
578    }
579
580    fn is_valid(&self) -> bool {
581        self.is_valid_entry_idx()
582    }
583
584    async fn rewind(&mut self) -> HummockResult<()> {
585        match D::direction() {
586            DirectionEnum::Forward => {
587                self.current_entry_idx = 0;
588            }
589            DirectionEnum::Backward => {
590                self.current_entry_idx = self.inner.entries.len() - 1;
591            }
592        };
593        if !IS_NEW_VALUE {
594            self.advance_until_valid_old_value();
595        }
596        Ok(())
597    }
598
599    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
600        match key.user_key.table_id.cmp(&self.table_id) {
601            Ordering::Less => {
602                match D::direction() {
603                    DirectionEnum::Forward => {
604                        // seek key table id < batch table id, so seek to beginning
605                        self.rewind().await?;
606                        return Ok(());
607                    }
608                    DirectionEnum::Backward => {
609                        self.invalidate();
610                        return Ok(());
611                    }
612                };
613            }
614            Ordering::Greater => {
615                match D::direction() {
616                    DirectionEnum::Forward => {
617                        self.invalidate();
618                        return Ok(());
619                    }
620                    DirectionEnum::Backward => {
621                        // seek key table id > batch table id, so seek to end
622                        self.rewind().await?;
623                        return Ok(());
624                    }
625                };
626            }
627            Ordering::Equal => (),
628        }
629        // Perform binary search on table key because the items in SharedBufferBatch is ordered
630        // by table key.
631        let partition_point = self
632            .inner
633            .entries
634            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
635        match partition_point {
636            Ok(i) => {
637                self.current_entry_idx = i;
638                // Epoch order --------->
639                // Epochs:  epoch300 epoch200 epoch100
640                // Forward: ------------------------->
641                // Backward:<-------------------------
642                // Assume self.inner.epoch_with_gap is epoch200
643                let skip_on_epoch = match D::direction() {
644                    DirectionEnum::Forward => {
645                        // should advance when key.epoch_with_gap is epoch100
646                        key.epoch_with_gap < self.inner.epoch_with_gap
647                    }
648                    DirectionEnum::Backward => {
649                        // should advance when key.epoch_with_gap is epoch300
650                        key.epoch_with_gap > self.inner.epoch_with_gap
651                    }
652                };
653                if skip_on_epoch {
654                    self.advance_to_next_entry()
655                }
656            }
657            Err(i) => match D::direction() {
658                DirectionEnum::Forward => {
659                    self.current_entry_idx = i;
660                }
661                DirectionEnum::Backward => {
662                    if i == 0 {
663                        self.invalidate();
664                    } else {
665                        self.current_entry_idx = i - 1;
666                    }
667                }
668            },
669        };
670        if !IS_NEW_VALUE {
671            self.advance_until_valid_old_value();
672        }
673        Ok(())
674    }
675
676    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}
677
678    fn value_meta(&self) -> ValueMeta {
679        ValueMeta::default()
680    }
681}
682
683#[cfg(test)]
684mod tests {
685    use std::ops::Bound::Excluded;
686
687    use itertools::{Itertools, zip_eq};
688    use risingwave_common::util::epoch::{EpochExt, test_epoch};
689    use risingwave_hummock_sdk::key::map_table_key_range;
690
691    use super::*;
692    use crate::hummock::iterator::test_utils::{
693        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
694    };
695
696    fn to_hummock_value_batch(
697        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
698    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
699        items.into_iter().map(|(k, v)| (k, v.into())).collect()
700    }
701
702    #[tokio::test]
703    async fn test_shared_buffer_batch_basic() {
704        let epoch = test_epoch(1);
705        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
706            (
707                iterator_test_table_key_of(0),
708                SharedBufferValue::Insert(Bytes::from("value1")),
709            ),
710            (
711                iterator_test_table_key_of(1),
712                SharedBufferValue::Insert(Bytes::from("value1")),
713            ),
714            (
715                iterator_test_table_key_of(2),
716                SharedBufferValue::Insert(Bytes::from("value1")),
717            ),
718        ];
719        let shared_buffer_batch = SharedBufferBatch::for_test(
720            transform_shared_buffer(shared_buffer_items.clone()),
721            epoch,
722            Default::default(),
723        );
724        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);
725
726        // Sketch
727        assert_eq!(
728            *shared_buffer_batch.start_table_key(),
729            shared_buffer_items[0].0
730        );
731        assert_eq!(
732            *shared_buffer_batch.end_table_key(),
733            shared_buffer_items[2].0
734        );
735
736        // Point lookup
737        for (k, v) in &shared_buffer_items {
738            assert_eq!(
739                shared_buffer_batch
740                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
741                    .unwrap()
742                    .0
743                    .as_slice(),
744                v.as_slice()
745            );
746        }
747        assert_eq!(
748            shared_buffer_batch.get(
749                TableKey(iterator_test_table_key_of(3).as_slice()),
750                epoch,
751                &ReadOptions::default()
752            ),
753            None
754        );
755        assert_eq!(
756            shared_buffer_batch.get(
757                TableKey(iterator_test_table_key_of(4).as_slice()),
758                epoch,
759                &ReadOptions::default()
760            ),
761            None
762        );
763
764        // Forward iterator
765        let mut iter = shared_buffer_batch.clone().into_forward_iter();
766        iter.rewind().await.unwrap();
767        let mut output = vec![];
768        while iter.is_valid() {
769            output.push((
770                iter.key().user_key.table_key.to_vec(),
771                iter.value().to_bytes(),
772            ));
773            iter.next().await.unwrap();
774        }
775        assert_eq!(output, shared_buffer_items);
776
777        // Backward iterator
778        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
779        backward_iter.rewind().await.unwrap();
780        let mut output = vec![];
781        while backward_iter.is_valid() {
782            output.push((
783                backward_iter.key().user_key.table_key.to_vec(),
784                backward_iter.value().to_bytes(),
785            ));
786            backward_iter.next().await.unwrap();
787        }
788        output.reverse();
789        assert_eq!(output, shared_buffer_items);
790    }
791
792    #[tokio::test]
793    async fn test_shared_buffer_batch_seek() {
794        let epoch = test_epoch(1);
795        let shared_buffer_items = vec![
796            (
797                iterator_test_table_key_of(1),
798                SharedBufferValue::Insert(Bytes::from("value1")),
799            ),
800            (
801                iterator_test_table_key_of(2),
802                SharedBufferValue::Insert(Bytes::from("value2")),
803            ),
804            (
805                iterator_test_table_key_of(3),
806                SharedBufferValue::Insert(Bytes::from("value3")),
807            ),
808        ];
809        let shared_buffer_batch = SharedBufferBatch::for_test(
810            transform_shared_buffer(shared_buffer_items.clone()),
811            epoch,
812            Default::default(),
813        );
814        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);
815
816        // FORWARD: Seek to a key < 1st key, expect all three items to return
817        let mut iter = shared_buffer_batch.clone().into_forward_iter();
818        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
819            .await
820            .unwrap();
821        for item in &shared_buffer_items {
822            assert!(iter.is_valid());
823            assert_eq!(*iter.key().user_key.table_key, item.0);
824            assert_eq!(iter.value(), item.1.as_slice());
825            iter.next().await.unwrap();
826        }
827        assert!(!iter.is_valid());
828
829        // FORWARD: Seek to a key > the last key, expect no items to return
830        let mut iter = shared_buffer_batch.clone().into_forward_iter();
831        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
832            .await
833            .unwrap();
834        assert!(!iter.is_valid());
835
836        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
837        let mut iter = shared_buffer_batch.clone().into_forward_iter();
838        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
839            .await
840            .unwrap();
841        for item in &shared_buffer_items[1..] {
842            assert!(iter.is_valid());
843            assert_eq!(*iter.key().user_key.table_key, item.0);
844            assert_eq!(iter.value(), item.1.as_slice());
845            iter.next().await.unwrap();
846        }
847        assert!(!iter.is_valid());
848
849        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
850        let mut iter = shared_buffer_batch.clone().into_forward_iter();
851        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
852            .await
853            .unwrap();
854        for item in &shared_buffer_items[1..] {
855            assert!(iter.is_valid());
856            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
857            assert_eq!(iter.value(), item.1.as_slice());
858            iter.next().await.unwrap();
859        }
860        assert!(!iter.is_valid());
861
862        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
863        let mut iter = shared_buffer_batch.clone().into_forward_iter();
864        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
865            .await
866            .unwrap();
867        let item = shared_buffer_items.last().unwrap();
868        assert!(iter.is_valid());
869        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
870        assert_eq!(iter.value(), item.1.as_slice());
871        iter.next().await.unwrap();
872        assert!(!iter.is_valid());
873
874        // BACKWARD: Seek to a key < 1st key, expect no items to return
875        let mut iter = shared_buffer_batch.clone().into_backward_iter();
876        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
877            .await
878            .unwrap();
879        assert!(!iter.is_valid());
880
881        // BACKWARD: Seek to a key > the last key, expect all items to return
882        let mut iter = shared_buffer_batch.clone().into_backward_iter();
883        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
884            .await
885            .unwrap();
886        for item in shared_buffer_items.iter().rev() {
887            assert!(iter.is_valid());
888            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
889            assert_eq!(iter.value(), item.1.as_slice());
890            iter.next().await.unwrap();
891        }
892        assert!(!iter.is_valid());
893
894        // BACKWARD: Seek to 2nd key with current epoch, expect first two items to return
895        let mut iter = shared_buffer_batch.clone().into_backward_iter();
896        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
897            .await
898            .unwrap();
899        for item in shared_buffer_items[0..=1].iter().rev() {
900            assert!(iter.is_valid());
901            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
902            assert_eq!(iter.value(), item.1.as_slice());
903            iter.next().await.unwrap();
904        }
905        assert!(!iter.is_valid());
906
907        // BACKWARD: Seek to 2nd key with old epoch, expect first two item to return
908        let mut iter = shared_buffer_batch.clone().into_backward_iter();
909        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
910            .await
911            .unwrap();
912        for item in shared_buffer_items[0..=1].iter().rev() {
913            assert!(iter.is_valid());
914            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
915            assert_eq!(iter.value(), item.1.as_slice());
916            iter.next().await.unwrap();
917        }
918        assert!(!iter.is_valid());
919
920        // BACKWARD: Seek to 2nd key with future epoch, expect first item to return
921        let mut iter = shared_buffer_batch.clone().into_backward_iter();
922        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
923            .await
924            .unwrap();
925        assert!(iter.is_valid());
926        let item = shared_buffer_items.first().unwrap();
927        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
928        assert_eq!(iter.value(), item.1.as_slice());
929        iter.next().await.unwrap();
930        assert!(!iter.is_valid());
931    }
932
933    #[tokio::test]
934    async fn test_shared_buffer_batch_old_value_iter() {
935        let epoch = test_epoch(1);
936        let key_values = vec![
937            (
938                iterator_test_table_key_of(1),
939                SharedBufferValue::Insert(Bytes::from("value1")),
940            ),
941            (
942                iterator_test_table_key_of(2),
943                SharedBufferValue::Update(Bytes::from("value2")),
944            ),
945            (
946                iterator_test_table_key_of(3),
947                SharedBufferValue::Insert(Bytes::from("value3")),
948            ),
949            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
950        ];
951        let old_values = vec![
952            Bytes::new(),
953            Bytes::from("old_value2"),
954            Bytes::new(),
955            Bytes::from("old_value4"),
956        ];
957        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
958            transform_shared_buffer(key_values.clone()),
959            old_values.clone(),
960            epoch,
961            Default::default(),
962        );
963        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
964        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
965            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
966            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
967            .collect_vec();
968
969        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
970        iter.rewind().await.unwrap();
971        for item in &expected_old_value_iter_items {
972            assert!(iter.is_valid());
973            assert_eq!(*iter.key().user_key.table_key, item.0);
974            assert_eq!(iter.value(), item.1.as_slice());
975            iter.next().await.unwrap();
976        }
977        assert!(!iter.is_valid());
978
979        // FORWARD: Seek to a key < 1st key, expect all three items to return
980        let mut iter = shared_buffer_batch.clone().into_forward_iter();
981        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
982            .await
983            .unwrap();
984        for item in &shared_buffer_items {
985            assert!(iter.is_valid());
986            assert_eq!(*iter.key().user_key.table_key, item.0);
987            assert_eq!(iter.value(), item.1.as_slice());
988            iter.next().await.unwrap();
989        }
990        assert!(!iter.is_valid());
991
992        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
993        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
994            .await
995            .unwrap();
996        for item in &expected_old_value_iter_items {
997            assert!(iter.is_valid());
998            assert_eq!(*iter.key().user_key.table_key, item.0);
999            assert_eq!(iter.value(), item.1.as_slice());
1000            iter.next().await.unwrap();
1001        }
1002        assert!(!iter.is_valid());
1003
1004        // FORWARD: Seek to a key > the last key, expect no items to return
1005        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1006        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
1007            .await
1008            .unwrap();
1009        assert!(!iter.is_valid());
1010
1011        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1012        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
1013            .await
1014            .unwrap();
1015        assert!(!iter.is_valid());
1016
1017        // FORWARD: Seek to 2nd key with current epoch, expect last two items to return
1018        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1019        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1020            .await
1021            .unwrap();
1022        for item in &shared_buffer_items[1..] {
1023            assert!(iter.is_valid());
1024            assert_eq!(*iter.key().user_key.table_key, item.0);
1025            assert_eq!(iter.value(), item.1.as_slice());
1026            iter.next().await.unwrap();
1027        }
1028        assert!(!iter.is_valid());
1029
1030        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1031        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1032            .await
1033            .unwrap();
1034        for item in &expected_old_value_iter_items {
1035            assert!(iter.is_valid());
1036            assert_eq!(*iter.key().user_key.table_key, item.0);
1037            assert_eq!(iter.value(), item.1.as_slice());
1038            iter.next().await.unwrap();
1039        }
1040        assert!(!iter.is_valid());
1041
1042        // FORWARD: Seek to 2nd key with future epoch, expect last two items to return
1043        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1044        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1045            .await
1046            .unwrap();
1047        for item in &shared_buffer_items[1..] {
1048            assert!(iter.is_valid());
1049            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1050            assert_eq!(iter.value(), item.1.as_slice());
1051            iter.next().await.unwrap();
1052        }
1053        assert!(!iter.is_valid());
1054
1055        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1056        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1057            .await
1058            .unwrap();
1059        for item in &expected_old_value_iter_items {
1060            assert!(iter.is_valid());
1061            assert_eq!(*iter.key().user_key.table_key, item.0);
1062            assert_eq!(iter.value(), item.1.as_slice());
1063            iter.next().await.unwrap();
1064        }
1065        assert!(!iter.is_valid());
1066
1067        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1068        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
1069            .await
1070            .unwrap();
1071        for item in &expected_old_value_iter_items[1..] {
1072            assert!(iter.is_valid());
1073            assert_eq!(*iter.key().user_key.table_key, item.0);
1074            assert_eq!(iter.value(), item.1.as_slice());
1075            iter.next().await.unwrap();
1076        }
1077        assert!(!iter.is_valid());
1078
1079        // FORWARD: Seek to 2nd key with old epoch, expect last item to return
1080        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1081        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
1082            .await
1083            .unwrap();
1084        let item = shared_buffer_items.last().unwrap();
1085        assert!(iter.is_valid());
1086        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1087        assert_eq!(iter.value(), item.1.as_slice());
1088        iter.next().await.unwrap();
1089        assert!(!iter.is_valid());
1090
1091        // Seek to an insert key
1092        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1093        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
1094            .await
1095            .unwrap();
1096        for item in &expected_old_value_iter_items[1..] {
1097            assert!(iter.is_valid());
1098            assert_eq!(*iter.key().user_key.table_key, item.0);
1099            assert_eq!(iter.value(), item.1.as_slice());
1100            iter.next().await.unwrap();
1101        }
1102        assert!(!iter.is_valid());
1103    }
1104
1105    #[tokio::test]
1106    #[should_panic]
1107    async fn test_invalid_table_id() {
1108        let epoch = test_epoch(1);
1109        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
1110        // Seeking to non-current epoch should panic
1111        let mut iter = shared_buffer_batch.into_forward_iter();
1112        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
1113            .await
1114            .unwrap();
1115    }
1116
1117    #[tokio::test]
1118    async fn test_shared_buffer_batch_range_existx() {
1119        let epoch = test_epoch(1);
1120        let shared_buffer_items = vec![
1121            (
1122                Vec::from("a_1"),
1123                SharedBufferValue::Insert(Bytes::from("value1")),
1124            ),
1125            (
1126                Vec::from("a_3"),
1127                SharedBufferValue::Insert(Bytes::from("value2")),
1128            ),
1129            (
1130                Vec::from("a_5"),
1131                SharedBufferValue::Insert(Bytes::from("value3")),
1132            ),
1133            (
1134                Vec::from("b_2"),
1135                SharedBufferValue::Insert(Bytes::from("value3")),
1136            ),
1137        ];
1138        let shared_buffer_batch = SharedBufferBatch::for_test(
1139            transform_shared_buffer(shared_buffer_items),
1140            epoch,
1141            Default::default(),
1142        );
1143
1144        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1145        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1146        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1147        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1148        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1149        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1150        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1151        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1152        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1153        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1154        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1155        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1156        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1157        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1158        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1159        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1160
1161        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1162        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1163        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1164        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1165        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1166        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1167        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1168        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1169        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1170        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1171    }
1172
1173    #[tokio::test]
1174    async fn test_shared_buffer_batch_seek_bug() {
1175        // Reproduce the bug where seek falls through to binary_search when table_id mismatch
1176        let epoch = test_epoch(1);
1177        let table_id = TableId::new(100);
1178        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![(
1179            iterator_test_table_key_of(1), // "key_test_000...001"
1180            SharedBufferValue::Insert(Bytes::from("value1")),
1181        )];
1182        let shared_buffer_batch = SharedBufferBatch::for_test(
1183            transform_shared_buffer(shared_buffer_items.clone()),
1184            epoch,
1185            table_id,
1186        );
1187
1188        // Case 1: Seek with smaller TableId (99), but larger TableKey ("key_test_000...002").
1189        // "key_test...002" > "key_test...001".
1190        // Forward Iterator.
1191        // Expected: Should land on the first item (TableId 100 > TableId 99).
1192        // Bug description: rewinds (index 0), then binary_search("key_2") in batch (only "key_1").
1193        // "key_2" > "key_1", so index becomes 1 (end). Iterator invalid.
1194        let mut iter = shared_buffer_batch.clone().into_forward_iter();
1195        let seek_key = FullKey::for_test(
1196            TableId::new(99),
1197            iterator_test_table_key_of(2), // larger key
1198            epoch,
1199        );
1200        iter.seek(seek_key.to_ref()).await.unwrap();
1201
1202        assert!(
1203            iter.is_valid(),
1204            "Iterator should be valid when seeking with smaller table_id, even if the key part is larger"
1205        );
1206        assert_eq!(iter.key().user_key.table_id, table_id);
1207
1208        // Case 2: Seek with larger TableId (101), but smaller TableKey ("key_test_000...000").
1209        // "key_test...000" < "key_test...001".
1210        // Backward Iterator.
1211        // Expected: Should land on the last item (TableId 100 < TableId 101).
1212        // Bug description: rewinds (index valid), then binary_search("key_0") which returns Err(0) (insertion point at start).
1213        // Backward generic logic for Err(0) is `invalidate()`.
1214        let mut iter = shared_buffer_batch.clone().into_backward_iter();
1215        let seek_key = FullKey::for_test(
1216            TableId::new(101),
1217            iterator_test_table_key_of(0), // smaller key
1218            epoch,
1219        );
1220        iter.seek(seek_key.to_ref()).await.unwrap();
1221
1222        assert!(
1223            iter.is_valid(),
1224            "Iterator should be valid when seeking with larger table_id, even if the key part is smaller"
1225        );
1226        assert_eq!(iter.key().user_key.table_id, table_id);
1227    }
1228}