risingwave_storage/mem_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ops::Bound::{Included, Unbounded};
use std::ops::RangeBounds;

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_hummock_sdk::key::TableKey;
use thiserror::Error;
use thiserror_ext::AsReport;

use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::utils::sanity_check_enabled;
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
use crate::store::*;

pub type ImmutableMemtable = SharedBufferBatch;

pub type ImmId = SharedBufferBatchId;
/// A single staged operation on a key, carrying the raw value bytes.
#[derive(Clone, Debug, EstimateSize)]
pub enum KeyOp {
    /// The value to insert.
    Insert(Bytes),
    /// The old value of the deleted key.
    Delete(Bytes),
    /// (`old_value`, `new_value`)
    Update((Bytes, Bytes)),
}
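
// How a new op folds into an op already staged on the same key, under the
// `ConsistentOldValue` level (a summary of the `match` arms in
// `MemTable::{insert, delete, update}` below; under `Inconsistent` the new op
// simply overwrites). "error" means `MemTableError::InconsistentOperation`
// when sanity checks are enabled; with checks disabled the new op wins and
// the inconsistency is only logged:
//
//   staged \ new   | Insert(v)      | Delete(old)   | Update((old, new))
//   ---------------+----------------+---------------+--------------------
//   (none)         | Insert(v)      | Delete(old)   | Update((old, new))
//   Insert(a)      | error          | entry removed | Insert(new)
//   Delete(a)      | Update((a, v)) | error         | error
//   Update((a, b)) | error          | Delete(a)     | Update((a, new))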

/// `MemTable` buffers modify operations (insert/delete/update) on raw,
/// unencoded key/value bytes.
#[derive(Clone)]
pub struct MemTable {
    table_id: TableId,
    pub(crate) buffer: MemTableStore,
    pub(crate) op_consistency_level: OpConsistencyLevel,
    pub(crate) kv_size: KvSize,
}
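
// A minimal usage sketch (illustrative, not from the original source; key and
// value bytes are hypothetical, `CHECK_BYTES_EQUAL` comes from `crate::store`):
//
//     let mut mem_table = MemTable::new(
//         TableId::new(1),
//         OpConsistencyLevel::ConsistentOldValue {
//             check_old_value: CHECK_BYTES_EQUAL.clone(),
//             is_log_store: false,
//         },
//     );
//     mem_table.insert(TableKey("k".into()), "v1".into())?;
//     mem_table.update(TableKey("k".into()), "v1".into(), "v2".into())?;
//     // The two ops fold into a single staged `Insert("v2")`.
//     let ops = mem_table.into_parts();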

#[derive(Error, Debug)]
pub enum MemTableError {
    #[error("Inconsistent operation on table {table_id} {key:?}, prev: {prev:?}, new: {new:?}")]
    InconsistentOperation {
        table_id: TableId,
        key: TableKey<Bytes>,
        prev: KeyOp,
        new: KeyOp,
    },
}

type Result<T> = std::result::Result<T, Box<MemTableError>>;

pub type MemTableStore = BTreeMap<TableKey<Bytes>, KeyOp>;
pub struct MemTableIteratorBuilder;
pub struct MemTableRevIteratorBuilder;

fn map_to_hummock_value<'a>(
    (key, op): (&'a TableKey<Bytes>, &'a KeyOp),
) -> (TableKey<&'a [u8]>, HummockValue<&'a [u8]>) {
    (
        TableKey(key.0.as_ref()),
        match op {
            KeyOp::Insert(value) | KeyOp::Update((_, value)) => HummockValue::Put(value),
            KeyOp::Delete(_) => HummockValue::Delete,
        },
    )
}
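
// Note that an `Update` surfaces only its new value as a `Put`: through the
// Hummock iterator interface below, updates and inserts look the same, and
// the staged old value is not exposed.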

impl RustIteratorBuilder for MemTableRevIteratorBuilder {
    type Direction = Backward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Unbounded, Included(seek_key.0)))
            .rev()
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().rev().map(map_to_hummock_value)
    }
}

impl RustIteratorBuilder for MemTableIteratorBuilder {
    type Direction = Forward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Included(seek_key.0), Unbounded))
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().map(map_to_hummock_value)
    }
}

pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>;
pub type MemTableHummockRevIterator<'a> = FromRustIterator<'a, MemTableRevIteratorBuilder>;
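
// Both aliases adapt the raw `BTreeMap` buffer into the `HummockIterator`
// interface. Seek is inclusive of the seek key in both directions: the
// forward iterator walks `[seek_key, ..)` while the reverse iterator walks
// `(.., seek_key]` backward. A single epoch and table id are stamped onto
// every key at construction time (see the `new(&buffer, epoch_with_gap,
// table_id)` calls in the tests below).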

impl MemTable {
    pub fn new(table_id: TableId, op_consistency_level: OpConsistencyLevel) -> Self {
        Self {
            table_id,
            buffer: BTreeMap::new(),
            op_consistency_level,
            kv_size: KvSize::new(),
        }
    }

    /// Takes all staged operations out of this `MemTable`, leaving an empty
    /// buffer with the same table id and consistency level in place.
    pub fn drain(&mut self) -> Self {
        self.kv_size.set(0);
        std::mem::replace(
            self,
            Self::new(self.table_id, self.op_consistency_level.clone()),
        )
    }
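
    // Typical drain pattern (an illustrative sketch, not from the original
    // source):
    //
    //     let staged = mem_table.drain();
    //     assert!(!mem_table.is_dirty());
    //     for (key, op) in staged.into_parts() {
    //         // encode `op` and hand it off to the state store
    //     }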

    pub fn is_dirty(&self) -> bool {
        !self.buffer.is_empty()
    }

    // Write methods. Each of `insert`, `delete` and `update` below folds the
    // new op into whatever is already staged for the key (see the summary
    // table above `MemTable`).
    /// Stages an insert of `value` at `pk`.
    pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
        if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
            // Fast path: without consistency checking, the new op simply
            // overwrites whatever was staged before.
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Insert(value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Insert(value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Delete(old_op_old_value) => {
                        // Delete followed by Insert folds into an Update.
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Insert(_) | KeyOp::Update(_) => {
                        let new_op = KeyOp::Insert(value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double insert / insert on updated, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    /// Stages a delete of `pk`. Under `ConsistentOldValue`, `old_value` is
    /// sanity-checked against the value already staged for the key.
    pub fn delete(&mut self, pk: TableKey<Bytes>, old_value: Bytes) -> Result<()> {
        let key_len = std::mem::size_of::<Bytes>() + pk.len();
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            let op = KeyOp::Delete(old_value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Delete(old_value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        // Insert followed by Delete cancels out: drop the entry.
                        self.kv_size.sub_size(key_len);
                        e.remove();
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Delete(old_value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double delete, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        // Update followed by Delete folds into a Delete of the
                        // original old value.
                        let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                }
            }
        }
    }

    /// Stages an update from `old_value` to `new_value` at `pk`. Under
    /// `ConsistentOldValue`, `old_value` is sanity-checked against the value
    /// already staged for the key.
    pub fn update(
        &mut self,
        pk: TableKey<Bytes>,
        old_value: Bytes,
        new_value: Bytes,
    ) -> Result<()> {
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Update((old_value, new_value));
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Update((old_value, new_value));
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        // Insert followed by Update folds into an Insert of the
                        // updated value.
                        let new_op = KeyOp::Insert(new_value);
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        // Two Updates fold together, keeping the original old value.
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Update((old_value, new_value));
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "update on deleted, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    pub fn into_parts(self) -> BTreeMap<TableKey<Bytes>, KeyOp> {
        self.buffer
    }

    pub fn iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range)
    }

    pub fn rev_iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range).rev()
    }

    fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
        if let Some(op) = old_op {
            self.kv_size.sub_val(&op);
            self.kv_size.sub_size(key_len);
        }
    }
}
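
// Size accounting note (illustrative reading of the code above): `kv_size`
// charges `size_of::<Bytes>() + key.len()` per buffered key plus the
// estimated size of its `KeyOp` (see `key_len` in the write methods), which
// is the formula the memory-size tests below assert against.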

impl KeyOp {
    /// Print as debug string with decoded data.
    ///
    /// # Panics
    ///
    /// The function will panic if it fails to decode the bytes with the
    /// provided data types.
    pub fn debug_fmt(&self, row_deserializer: &impl ValueRowSerde) -> String {
        match self {
            Self::Insert(after) => {
                let after = row_deserializer.deserialize(after.as_ref());
                format!("Insert({:?})", &after)
            }
            Self::Delete(before) => {
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Delete({:?})", &before)
            }
            Self::Update((before, after)) => {
                let after = row_deserializer.deserialize(after.as_ref());
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Update({:?}, {:?})", &before, &after)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng as thread_rng};
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::value::HummockValue;
    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
    use crate::store::{CHECK_BYTES_EQUAL, OpConsistencyLevel};

    #[tokio::test]
    async fn test_mem_table_memory_size() {
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        // delete
        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .delete(TableKey("key2".into()), "value2".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );
        mem_table
            .insert(TableKey("key2".into()), "value22".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value22").len()
                + Bytes::from("value2").len()
        );

        mem_table
            .delete(TableKey("key2".into()), "value22".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );

        // update
        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .insert(TableKey("key3".into()), "value3".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value3").len()
        );

        // update -> insert
        mem_table
            .update(TableKey("key3".into()), "value3".into(), "value333".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value333").len()
        );

        mem_table.drain();
        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_memory_size_not_consistent_op() {
        let mut mem_table = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        mem_table
            .insert(TableKey("key1".into()), "value111".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value111").len()
        );
        mem_table.drain();

        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value44").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_hummock_iterator() {
        let mut rng = thread_rng();

        fn get_key(i: usize) -> TableKey<Bytes> {
            let mut bytes = BytesMut::new();
            bytes.put(&VirtualNode::ZERO.to_be_bytes()[..]);
            bytes.put(format!("key_{:20}", i).as_bytes());
            TableKey(bytes.freeze())
        }

        let mut ordered_test_data = (0..10000)
            .map(|i| {
                let key_op = match rng.random_range(0..=2) {
                    0 => KeyOp::Insert(Bytes::from("insert")),
                    1 => KeyOp::Delete(Bytes::from("delete")),
                    2 => KeyOp::Update((Bytes::from("old_value"), Bytes::from("new_value"))),
                    _ => unreachable!(),
                };
                (get_key(i), key_op)
            })
            .collect_vec();

        let mut test_data = ordered_test_data.clone();

        test_data.shuffle(&mut rng);
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        for (key, op) in test_data {
            match op {
                KeyOp::Insert(value) => {
                    mem_table.insert(key, value).unwrap();
                }
                KeyOp::Delete(value) => mem_table.delete(key, value).unwrap(),
                KeyOp::Update((old_value, new_value)) => {
                    mem_table.update(key, old_value, new_value).unwrap();
                }
            }
        }

        const TEST_TABLE_ID: TableId = TableId::new(233);
        const TEST_EPOCH: u64 = test_epoch(10);

        async fn check_data<I: HummockIterator>(
            iter: &mut I,
            test_data: &[(TableKey<Bytes>, KeyOp)],
        ) {
            let mut idx = 0;
            while iter.is_valid() {
                let key = iter.key();
                let value = iter.value();

                let (expected_key, expected_value) = test_data[idx].clone();
                assert_eq!(key.epoch_with_gap, EpochWithGap::new_from_epoch(TEST_EPOCH));
                assert_eq!(key.user_key.table_id, TEST_TABLE_ID);
                assert_eq!(
                    key.user_key.table_key.0,
                    expected_key.0.as_ref(),
                    "failed at {}, {:?} != {:?}",
                    idx,
                    String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap(),
                    String::from_utf8(expected_key.key_part().to_vec()).unwrap(),
                );
                match expected_value {
                    KeyOp::Insert(expected_value) | KeyOp::Update((_, expected_value)) => {
                        assert_eq!(value, HummockValue::Put(expected_value.as_ref()));
                    }
                    KeyOp::Delete(_) => {
                        assert_eq!(value, HummockValue::Delete);
                    }
                }

                idx += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(idx, test_data.len());
        }

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        // Test rewind
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a later epoch; the first key is not skipped
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx..]).await;

        // Test seek with an earlier epoch; the first key is skipped
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx + 1..]).await;

        // Test seek past the end
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Test seek with a smaller table id
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a greater table id
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // check reverse iterator
        ordered_test_data.reverse();
        drop(iter);
        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        // Test rewind
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a smaller table id
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Test seek with a greater table id
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a later epoch; the first key is skipped
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx + 1..]).await;

        // Test seek with an earlier epoch; the first key is not skipped
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx..]).await;

        // Seek to the absent key 10000: the reverse iterator stops at 9999,
        // the forward iterator at 10001.
        drop(iter);
        mem_table.insert(get_key(10001), "value1".into()).unwrap();

        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(9999).to_ref());

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(10001).to_ref());
    }
}