risingwave_storage/mem_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ops::Bound::{Included, Unbounded};
use std::ops::RangeBounds;

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_hummock_sdk::key::TableKey;
use thiserror::Error;
use thiserror_ext::AsReport;
use tracing::error;

use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::utils::sanity_check_enabled;
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
use crate::store::*;

pub type ImmutableMemtable = SharedBufferBatch;

pub type ImmId = SharedBufferBatchId;

#[derive(Clone, Debug, EstimateSize)]
pub enum KeyOp {
    Insert(Bytes),
    Delete(Bytes),
    /// (`old_value`, `new_value`)
    Update((Bytes, Bytes)),
}
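
// How `MemTable` merges a new op into an already-buffered op for the same
// key (a summary of `insert`/`delete`/`update` below, which remain the
// authoritative logic). "inconsistent" means an error when the sanity check
// is enabled, or a logged overwrite when it is disabled; under
// `OpConsistencyLevel::Inconsistent`, merging is skipped entirely and the
// new op simply overwrites the buffered one.
//
//   buffered \ new | Insert           | Delete        | Update
//   ---------------+------------------+---------------+--------------------
//   (vacant)       | Insert(new)      | Delete(old)   | Update((old, new))
//   Insert(v)      | inconsistent     | entry removed | Insert(new)
//   Delete(v)      | Update((v, new)) | inconsistent  | inconsistent
//   Update((o, n)) | inconsistent     | Delete(o)     | Update((o, new))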

/// `MemTable` is an in-memory buffer of modify operations, kept in their
/// un-encoded (raw key/value bytes) form.
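///
/// A minimal usage sketch (marked `ignore` since constructing the arguments
/// requires crate-internal types; the calls shown are the ones defined in
/// this file):
///
/// ```ignore
/// let mut mem_table = MemTable::new(TableId::new(1), OpConsistencyLevel::Inconsistent);
/// mem_table.insert(TableKey(Bytes::from("k")), Bytes::from("v"))?;
/// assert!(mem_table.is_dirty());
/// let drained = mem_table.drain(); // take the buffered ops, leaving `mem_table` empty
/// let ops = drained.into_parts(); // BTreeMap<TableKey<Bytes>, KeyOp>
/// ```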
#[derive(Clone)]
pub struct MemTable {
    table_id: TableId,
    pub(crate) buffer: MemTableStore,
    pub(crate) op_consistency_level: OpConsistencyLevel,
    pub(crate) kv_size: KvSize,
}

#[derive(Error, Debug)]
pub enum MemTableError {
    #[error("Inconsistent operation on table {table_id} {key:?}, prev: {prev:?}, new: {new:?}")]
    InconsistentOperation {
        table_id: TableId,
        key: TableKey<Bytes>,
        prev: KeyOp,
        new: KeyOp,
    },
}

type Result<T> = std::result::Result<T, Box<MemTableError>>;

pub type MemTableStore = BTreeMap<TableKey<Bytes>, KeyOp>;
pub struct MemTableIteratorBuilder;
pub struct MemTableRevIteratorBuilder;

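// Adapt a buffered `(key, op)` pair to the `HummockValue` view used by the
// iterators below: `Insert` and `Update` surface only the new value as a
// `Put`, while `Delete` becomes a tombstone (the buffered old value is not
// exposed through iteration).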
fn map_to_hummock_value<'a>(
    (key, op): (&'a TableKey<Bytes>, &'a KeyOp),
) -> (TableKey<&'a [u8]>, HummockValue<&'a [u8]>) {
    (
        TableKey(key.0.as_ref()),
        match op {
            KeyOp::Insert(value) | KeyOp::Update((_, value)) => HummockValue::Put(value),
            KeyOp::Delete(_) => HummockValue::Delete,
        },
    )
}

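// The reverse builder seeks to the last entry whose key is <= `seek_key` by
// taking the range (Unbounded, Included(seek_key)] and walking it backwards;
// the forward builder below starts from the first entry >= `seek_key` with
// [Included(seek_key), Unbounded).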
impl RustIteratorBuilder for MemTableRevIteratorBuilder {
    type Direction = Backward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Unbounded, Included(seek_key.0)))
            .rev()
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().rev().map(map_to_hummock_value)
    }
}

impl RustIteratorBuilder for MemTableIteratorBuilder {
    type Direction = Forward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Included(seek_key.0), Unbounded))
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().map(map_to_hummock_value)
    }
}

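// Forward/backward `HummockIterator`s over the in-memory buffer. Each is
// constructed with a single `EpochWithGap` and `TableId`, which end up on
// every full key the iterator yields (see `check_data` in the tests below).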
pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>;
pub type MemTableHummockRevIterator<'a> = FromRustIterator<'a, MemTableRevIteratorBuilder>;

impl MemTable {
    pub fn new(table_id: TableId, op_consistency_level: OpConsistencyLevel) -> Self {
        Self {
            table_id,
            buffer: BTreeMap::new(),
            op_consistency_level,
            kv_size: KvSize::new(),
        }
    }

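    /// Take the buffered ops out of this `MemTable` by swapping in a fresh,
    /// empty one. Note that the size counter is zeroed before the swap, so
    /// the returned table reports a `kv_size` of 0 even though it still owns
    /// the buffered ops.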
    pub fn drain(&mut self) -> Self {
        self.kv_size.set(0);
        std::mem::replace(
            self,
            Self::new(self.table_id, self.op_consistency_level.clone()),
        )
    }

    pub fn is_dirty(&self) -> bool {
        !self.buffer.is_empty()
    }

    /// write methods
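    ///
    /// Buffer an insert of `value` under `pk`. Under
    /// `OpConsistencyLevel::Inconsistent` the new op blindly replaces any
    /// buffered one. Otherwise, inserting over a buffered `Delete(old)`
    /// folds into `Update((old, value))`, while inserting over an `Insert`
    /// or `Update` is inconsistent: an error when the sanity check is
    /// enabled, a logged overwrite when it is not.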
    pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
        if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Insert(value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Insert(value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Delete(old_op_old_value) => {
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Insert(_) | KeyOp::Update(_) => {
                        let new_op = KeyOp::Insert(value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double insert / insert on updated, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

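    /// Buffer a delete of `pk`, where `old_value` is the value the caller
    /// believes is being deleted. Under `ConsistentOldValue`, `old_value` is
    /// checked against the buffered new value; deleting a buffered `Insert`
    /// cancels the entry entirely, deleting an `Update((o, n))` collapses to
    /// `Delete(o)`, and a double delete is inconsistent.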
    pub fn delete(&mut self, pk: TableKey<Bytes>, old_value: Bytes) -> Result<()> {
        let key_len = std::mem::size_of::<Bytes>() + pk.len();
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            let op = KeyOp::Delete(old_value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Delete(old_value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        self.kv_size.sub_size(key_len);
                        e.remove();
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Delete(old_value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double delete, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                }
            }
        }
    }

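    /// Buffer an update of `pk` from `old_value` to `new_value`. Under
    /// `ConsistentOldValue`, `old_value` must match the buffered new value;
    /// updating a buffered `Insert` stays an `Insert(new_value)`, updating
    /// an `Update((o, n))` becomes `Update((o, new_value))`, and updating a
    /// buffered `Delete` is inconsistent.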
    pub fn update(
        &mut self,
        pk: TableKey<Bytes>,
        old_value: Bytes,
        new_value: Bytes,
    ) -> Result<()> {
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Update((old_value, new_value));
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Update((old_value, new_value));
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        let new_op = KeyOp::Insert(new_value);
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Update((old_value, new_value));
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "update on deleted, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    pub fn into_parts(self) -> BTreeMap<TableKey<Bytes>, KeyOp> {
        self.buffer
    }

    pub fn iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range)
    }

    pub fn rev_iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range).rev()
    }

    fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
        if let Some(op) = old_op {
            self.kv_size.sub_val(&op);
            self.kv_size.sub_size(key_len);
        }
    }
}

impl KeyOp {
    /// Print as a debug string with decoded data.
    ///
    /// # Panics
    ///
    /// The function will panic if it fails to decode the bytes with the
    /// provided data types.
    pub fn debug_fmt(&self, row_deserializer: &impl ValueRowSerde) -> String {
        match self {
            Self::Insert(after) => {
                let after = row_deserializer.deserialize(after.as_ref());
                format!("Insert({:?})", &after)
            }
            Self::Delete(before) => {
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Delete({:?})", &before)
            }
            Self::Update((before, after)) => {
                let after = row_deserializer.deserialize(after.as_ref());
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Update({:?}, {:?})", &before, &after)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng as thread_rng};
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::value::HummockValue;
    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
    use crate::store::{CHECK_BYTES_EQUAL, OpConsistencyLevel};

    #[tokio::test]
    async fn test_mem_table_memory_size() {
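        // Throughout this test, a buffered entry is expected to be accounted
        // as: size_of::<Bytes>() (the key container) + the key payload
        // + size_of::<KeyOp>() + the value payload(s). An `Update` carries
        // both the old and the new value, so it counts both payloads.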
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        // delete
        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .delete(TableKey("key2".into()), "value2".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );
        mem_table
            .insert(TableKey("key2".into()), "value22".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value22").len()
                + Bytes::from("value2").len()
        );

        mem_table
            .delete(TableKey("key2".into()), "value22".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );

        // update
        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .insert(TableKey("key3".into()), "value3".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value3").len()
        );

        // update -> insert
        mem_table
            .update(TableKey("key3".into()), "value3".into(), "value333".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value333").len()
        );

        mem_table.drain();
        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_memory_size_not_consistent_op() {
        let mut mem_table = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        mem_table
            .insert(TableKey("key1".into()), "value111".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value111").len()
        );
        mem_table.drain();

        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value44").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_hummock_iterator() {
        let mut rng = thread_rng();

        fn get_key(i: usize) -> TableKey<Bytes> {
            let mut bytes = BytesMut::new();
            bytes.put(&VirtualNode::ZERO.to_be_bytes()[..]);
            bytes.put(format!("key_{:20}", i).as_bytes());
            TableKey(bytes.freeze())
        }
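
        // Note: `{:20}` pads the index to a fixed width (space-padded,
        // right-aligned), so the lexicographic order of the generated keys
        // matches the numeric order of `i` for the range used here.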

        let mut ordered_test_data = (0..10000)
            .map(|i| {
                let key_op = match rng.random_range(0..=2) {
                    0 => KeyOp::Insert(Bytes::from("insert")),
                    1 => KeyOp::Delete(Bytes::from("delete")),
                    2 => KeyOp::Update((Bytes::from("old_value"), Bytes::from("new_value"))),
                    _ => unreachable!(),
                };
                (get_key(i), key_op)
            })
            .collect_vec();

        let mut test_data = ordered_test_data.clone();

        test_data.shuffle(&mut rng);
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        for (key, op) in test_data {
            match op {
                KeyOp::Insert(value) => {
                    mem_table.insert(key, value).unwrap();
                }
                KeyOp::Delete(value) => mem_table.delete(key, value).unwrap(),
                KeyOp::Update((old_value, new_value)) => {
                    mem_table.update(key, old_value, new_value).unwrap();
                }
            }
        }

        const TEST_TABLE_ID: TableId = TableId::new(233);
        const TEST_EPOCH: u64 = test_epoch(10);

        async fn check_data<I: HummockIterator>(
            iter: &mut I,
            test_data: &[(TableKey<Bytes>, KeyOp)],
        ) {
            let mut idx = 0;
            while iter.is_valid() {
                let key = iter.key();
                let value = iter.value();

                let (expected_key, expected_value) = test_data[idx].clone();
                assert_eq!(key.epoch_with_gap, EpochWithGap::new_from_epoch(TEST_EPOCH));
                assert_eq!(key.user_key.table_id, TEST_TABLE_ID);
                assert_eq!(
                    key.user_key.table_key.0,
                    expected_key.0.as_ref(),
                    "failed at {}, {:?} != {:?}",
                    idx,
                    String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap(),
                    String::from_utf8(expected_key.key_part().to_vec()).unwrap(),
                );
                match expected_value {
                    KeyOp::Insert(expected_value) | KeyOp::Update((_, expected_value)) => {
                        assert_eq!(value, HummockValue::Put(expected_value.as_ref()));
                    }
                    KeyOp::Delete(_) => {
                        assert_eq!(value, HummockValue::Delete);
                    }
                }

                idx += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(idx, test_data.len());
        }

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        // Test rewind
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a later epoch; the first key is not skipped
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx..]).await;

        // Test seek with an earlier epoch; the first key is skipped
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx + 1..]).await;

        // Test seek past the end
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Test seek with a smaller table id
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a greater table id
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Check the reverse iterator
        ordered_test_data.reverse();
        drop(iter);
        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        // Test rewind
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a smaller table id
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Test seek with a greater table id
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a later epoch; the first key is skipped
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx + 1..]).await;

        // Test seek with an earlier epoch; the first key is not skipped
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx..]).await;

        drop(iter);
        mem_table.insert(get_key(10001), "value1".into()).unwrap();

        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(9999).to_ref());

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(10001).to_ref());
    }
}