risingwave_storage/
mem_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::collections::btree_map::Entry;
17use std::ops::Bound::{Included, Unbounded};
18use std::ops::RangeBounds;
19
20use bytes::Bytes;
21use risingwave_common_estimate_size::{EstimateSize, KvSize};
22use risingwave_hummock_sdk::key::TableKey;
23use thiserror::Error;
24use thiserror_ext::AsReport;
25use tracing::error;
26
27use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
28use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
29use crate::hummock::utils::sanity_check_enabled;
30use crate::hummock::value::HummockValue;
31use crate::row_serde::value_serde::ValueRowSerde;
32use crate::store::*;
/// An immutable in-memory batch of writes; alias of [`SharedBufferBatch`].
pub type ImmutableMemtable = SharedBufferBatch;

/// Identifier of an [`ImmutableMemtable`]; alias of [`SharedBufferBatchId`].
pub type ImmId = SharedBufferBatchId;
36
/// A single buffered write operation on one key. Payloads are raw encoded
/// row bytes (see [`KeyOp::debug_fmt`] for decoding them for display).
#[derive(Clone, Debug, EstimateSize)]
pub enum KeyOp {
    /// Insert a new value.
    Insert(Bytes),
    /// Delete the key; the payload is the old value being deleted.
    Delete(Bytes),
    /// (`old_value`, `new_value`)
    Update((Bytes, Bytes)),
}
44
/// `MemTable` is a buffer for modify operations without encoding
#[derive(Clone)]
pub struct MemTable {
    /// Pending operations, ordered by table key.
    pub(crate) buffer: MemTableStore,
    /// How strictly new ops are checked against buffered ops on the same key.
    pub(crate) op_consistency_level: OpConsistencyLevel,
    /// Estimated memory usage of `buffer`, maintained on every mutation.
    pub(crate) kv_size: KvSize,
}
52
/// Error raised when a new write conflicts with the op already buffered for
/// the same key (e.g. double insert, double delete, update-on-deleted).
#[derive(Error, Debug)]
pub enum MemTableError {
    #[error("Inconsistent operation {key:?}, prev: {prev:?}, new: {new:?}")]
    InconsistentOperation {
        key: TableKey<Bytes>,
        prev: KeyOp,
        new: KeyOp,
    },
}
62
// Boxed error keeps `Result` small on the happy path.
type Result<T> = std::result::Result<T, Box<MemTableError>>;

/// Ordered store of buffered ops, keyed by table key.
pub type MemTableStore = BTreeMap<TableKey<Bytes>, KeyOp>;
/// Marker type building forward iterators over a [`MemTableStore`].
pub struct MemTableIteratorBuilder;
/// Marker type building backward iterators over a [`MemTableStore`].
pub struct MemTableRevIteratorBuilder;
68
69fn map_to_hummock_value<'a>(
70    (key, op): (&'a TableKey<Bytes>, &'a KeyOp),
71) -> (TableKey<&'a [u8]>, HummockValue<&'a [u8]>) {
72    (
73        TableKey(key.0.as_ref()),
74        match op {
75            KeyOp::Insert(value) | KeyOp::Update((_, value)) => HummockValue::Put(value),
76            KeyOp::Delete(_) => HummockValue::Delete,
77        },
78    )
79}
80
impl RustIteratorBuilder for MemTableRevIteratorBuilder {
    type Direction = Backward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    /// Iterates backward starting from `seek_key` (inclusive) down to the
    /// smallest key in the store.
    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Unbounded, Included(seek_key.0)))
            .rev()
            .map(map_to_hummock_value)
    }

    /// Iterates backward over the whole store, starting at the largest key.
    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().rev().map(map_to_hummock_value)
    }
}
101
impl RustIteratorBuilder for MemTableIteratorBuilder {
    type Direction = Forward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    /// Iterates forward starting from `seek_key` (inclusive) up to the
    /// largest key in the store.
    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Included(seek_key.0), Unbounded))
            .map(map_to_hummock_value)
    }

    /// Iterates forward over the whole store, starting at the smallest key.
    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().map(map_to_hummock_value)
    }
}
121
/// Forward Hummock iterator over a [`MemTableStore`].
pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>;
/// Backward Hummock iterator over a [`MemTableStore`].
pub type MemTableHummockRevIterator<'a> = FromRustIterator<'a, MemTableRevIteratorBuilder>;
124
impl MemTable {
    /// Creates an empty memtable with the given consistency-check level.
    pub fn new(op_consistency_level: OpConsistencyLevel) -> Self {
        Self {
            buffer: BTreeMap::new(),
            op_consistency_level,
            kv_size: KvSize::new(),
        }
    }

    /// Takes the buffered operations out of `self`, leaving behind an empty
    /// memtable with the same consistency level.
    ///
    /// NOTE(review): `kv_size` is zeroed *before* the swap, so the returned
    /// (drained) memtable reports a size of 0 even though it still holds the
    /// buffered ops — callers appear to consume only its `buffer`; confirm
    /// before relying on the drained size.
    pub fn drain(&mut self) -> Self {
        self.kv_size.set(0);
        std::mem::replace(self, Self::new(self.op_consistency_level.clone()))
    }

    /// Returns `true` if there are unflushed operations in the buffer.
    pub fn is_dirty(&self) -> bool {
        !self.buffer.is_empty()
    }

    /// Buffers an insert of `value` at key `pk`.
    ///
    /// Under `OpConsistencyLevel::Inconsistent`, the new op blindly overwrites
    /// any buffered op. Otherwise: insert-after-delete merges into an
    /// `Update`, while insert over an existing `Insert`/`Update` is an
    /// inconsistent operation — an error when sanity checks are enabled,
    /// or a logged overwrite when they are disabled.
    pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
        if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
            // Blind overwrite: account for the new entry first, then subtract
            // the replaced op (value + key size) if one existed.
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Insert(value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Insert(value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                // The old op's value size is subtracted up-front; each branch
                // below re-adds the size of whatever op it ends up storing.
                // NOTE(review): on the sanity-check `Err` path the entry is
                // left in place with its size already subtracted — the same
                // pattern is used by `delete`/`update` below.
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Delete(old_op_old_value) => {
                        // delete + insert on the same key collapses to update.
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Insert(_) | KeyOp::Update(_) => {
                        let new_op = KeyOp::Insert(value);
                        let err = MemTableError::InconsistentOperation {
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double insert / insert on updated, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    /// Buffers a deletion of key `pk`, where `old_value` is the value the
    /// caller expects to be deleting.
    ///
    /// Without `ConsistentOldValue`, the delete blindly overwrites any
    /// buffered op. Otherwise: delete-after-insert cancels the entry
    /// entirely, delete-after-update keeps a `Delete` of the original old
    /// value, and double delete is an inconsistent operation. When sanity
    /// checks are enabled, `old_value` is also verified against the buffered
    /// new value via the configured checker.
    pub fn delete(&mut self, pk: TableKey<Bytes>, old_value: Bytes) -> Result<()> {
        let key_len = std::mem::size_of::<Bytes>() + pk.len();
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            // No old-value consistency: blind overwrite, as in `insert`.
            let op = KeyOp::Delete(old_value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Delete(old_value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        // insert + delete cancel out: drop the entry and its
                        // key size (the value size was subtracted above).
                        self.kv_size.sub_size(key_len);
                        e.remove();
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Delete(old_value);
                        let err = MemTableError::InconsistentOperation {
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double delete, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        // update + delete keeps a delete of the *original*
                        // old value, so the pre-update state is what gets
                        // recorded as deleted.
                        let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                }
            }
        }
    }

    /// Buffers an update of key `pk` from `old_value` to `new_value`.
    ///
    /// Without `ConsistentOldValue`, the update blindly overwrites any
    /// buffered op. Otherwise: update-after-insert stays an `Insert` (of the
    /// new value), update-after-update keeps the original old value, and
    /// update-after-delete is an inconsistent operation. When sanity checks
    /// are enabled, `old_value` is verified against the buffered new value.
    pub fn update(
        &mut self,
        pk: TableKey<Bytes>,
        old_value: Bytes,
        new_value: Bytes,
    ) -> Result<()> {
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            // No old-value consistency: blind overwrite, as in `insert`.
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Update((old_value, new_value));
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Update((old_value, new_value));
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        // insert + update collapses to an insert of the
                        // latest value.
                        let new_op = KeyOp::Insert(new_value);
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        // update + update keeps the *original* old value and
                        // the latest new value.
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Update((old_value, new_value));
                        let err = MemTableError::InconsistentOperation {
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "update on deleted, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    /// Consumes the memtable, returning the buffered operations.
    pub fn into_parts(self) -> BTreeMap<TableKey<Bytes>, KeyOp> {
        self.buffer
    }

    /// Iterates over buffered ops within `key_range` in key order.
    pub fn iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range)
    }

    /// Iterates over buffered ops within `key_range` in reverse key order.
    pub fn rev_iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range).rev()
    }

    /// Subtracts a replaced op's value size and its key size from the
    /// counter. No-op when nothing was replaced.
    fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
        if let Some(op) = old_op {
            self.kv_size.sub_val(&op);
            self.kv_size.sub_size(key_len);
        }
    }
}
390
391impl KeyOp {
392    /// Print as debug string with decoded data.
393    ///
394    /// # Panics
395    ///
396    /// The function will panic if it failed to decode the bytes with provided data types.
397    pub fn debug_fmt(&self, row_deserializer: &impl ValueRowSerde) -> String {
398        match self {
399            Self::Insert(after) => {
400                let after = row_deserializer.deserialize(after.as_ref());
401                format!("Insert({:?})", &after)
402            }
403            Self::Delete(before) => {
404                let before = row_deserializer.deserialize(before.as_ref());
405                format!("Delete({:?})", &before)
406            }
407            Self::Update((before, after)) => {
408                let after = row_deserializer.deserialize(after.as_ref());
409                let before = row_deserializer.deserialize(before.as_ref());
410                format!("Update({:?}, {:?})", &before, &after)
411            }
412        }
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use bytes::{BufMut, Bytes, BytesMut};
419    use itertools::Itertools;
420    use rand::seq::SliceRandom;
421    use rand::{Rng, rng as thread_rng};
422    use risingwave_common::catalog::TableId;
423    use risingwave_common::hash::VirtualNode;
424    use risingwave_common::util::epoch::{EpochExt, test_epoch};
425    use risingwave_hummock_sdk::EpochWithGap;
426    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
427
428    use crate::hummock::iterator::HummockIterator;
429    use crate::hummock::value::HummockValue;
430    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
431    use crate::store::{CHECK_BYTES_EQUAL, OpConsistencyLevel};
432
    /// Verifies `kv_size` accounting through insert / delete / update merges
    /// under `ConsistentOldValue` consistency.
    #[tokio::test]
    async fn test_mem_table_memory_size() {
        let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue {
            check_old_value: CHECK_BYTES_EQUAL.clone(),
            is_log_store: false,
        });
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        // delete
        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .delete(TableKey("key2".into()), "value2".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );
        // insert after delete merges into Update((value2, value22)),
        // so both values are accounted.
        mem_table
            .insert(TableKey("key2".into()), "value22".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value22").len()
                + Bytes::from("value2").len()
        );

        // delete after update keeps Delete(value2): the original old value.
        mem_table
            .delete(TableKey("key2".into()), "value22".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );

        // update
        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .insert(TableKey("key3".into()), "value3".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value3").len()
        );

        // update-> insert
        mem_table
            .update(TableKey("key3".into()), "value3".into(), "value333".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value333").len()
        );

        mem_table.drain();
        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        // update after update keeps the original old value (value4).
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value4444").len()
        );
    }
545
    /// Verifies `kv_size` accounting under `Inconsistent` consistency, where
    /// every write blindly overwrites the buffered op for the key.
    #[tokio::test]
    async fn test_mem_table_memory_size_not_consistent_op() {
        let mut mem_table = MemTable::new(OpConsistencyLevel::Inconsistent);
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        // Double insert is allowed and simply replaces the op.
        mem_table
            .insert(TableKey("key1".into()), "value111".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value111").len()
        );
        mem_table.drain();

        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        // Unlike the consistent case, the second update replaces the whole
        // op, so the latest (old, new) pair is what gets accounted.
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value44").len()
                + Bytes::from("value4444").len()
        );
    }
604
    /// Exercises forward and reverse Hummock iterators over a randomly
    /// populated memtable: rewind, seek with earlier/later epochs, seek past
    /// the end, and seek with smaller/greater table ids.
    #[tokio::test]
    async fn test_mem_table_hummock_iterator() {
        let mut rng = thread_rng();

        // Builds a vnode-prefixed, zero-padded key so lexicographic order
        // matches the numeric order of `i`.
        fn get_key(i: usize) -> TableKey<Bytes> {
            let mut bytes = BytesMut::new();
            bytes.put(&VirtualNode::ZERO.to_be_bytes()[..]);
            bytes.put(format!("key_{:20}", i).as_bytes());
            TableKey(bytes.freeze())
        }

        let mut ordered_test_data = (0..10000)
            .map(|i| {
                let key_op = match rng.random_range(0..=2) {
                    0 => KeyOp::Insert(Bytes::from("insert")),
                    1 => KeyOp::Delete(Bytes::from("delete")),
                    2 => KeyOp::Update((Bytes::from("old_value"), Bytes::from("new_value"))),
                    _ => unreachable!(),
                };
                (get_key(i), key_op)
            })
            .collect_vec();

        let mut test_data = ordered_test_data.clone();

        // Apply the ops in shuffled order; iteration should still be sorted.
        test_data.shuffle(&mut rng);
        let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue {
            check_old_value: CHECK_BYTES_EQUAL.clone(),
            is_log_store: false,
        });
        for (key, op) in test_data {
            match op {
                KeyOp::Insert(value) => {
                    mem_table.insert(key, value).unwrap();
                }
                KeyOp::Delete(value) => mem_table.delete(key, value).unwrap(),
                KeyOp::Update((old_value, new_value)) => {
                    mem_table.update(key, old_value, new_value).unwrap();
                }
            }
        }

        const TEST_TABLE_ID: TableId = TableId::new(233);
        const TEST_EPOCH: u64 = test_epoch(10);

        // Drains `iter` and asserts it yields exactly `test_data`, in order,
        // with the expected epoch, table id, and Put/Delete values.
        async fn check_data<I: HummockIterator>(
            iter: &mut I,
            test_data: &[(TableKey<Bytes>, KeyOp)],
        ) {
            let mut idx = 0;
            while iter.is_valid() {
                let key = iter.key();
                let value = iter.value();

                let (expected_key, expected_value) = test_data[idx].clone();
                assert_eq!(key.epoch_with_gap, EpochWithGap::new_from_epoch(TEST_EPOCH));
                assert_eq!(key.user_key.table_id, TEST_TABLE_ID);
                assert_eq!(
                    key.user_key.table_key.0,
                    expected_key.0.as_ref(),
                    "failed at {}, {:?} != {:?}",
                    idx,
                    String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap(),
                    String::from_utf8(expected_key.key_part().to_vec()).unwrap(),
                );
                match expected_value {
                    KeyOp::Insert(expected_value) | KeyOp::Update((_, expected_value)) => {
                        assert_eq!(value, HummockValue::Put(expected_value.as_ref()));
                    }
                    KeyOp::Delete(_) => {
                        assert_eq!(value, HummockValue::Delete);
                    }
                }

                idx += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(idx, test_data.len());
        }

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        // Test rewind
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a later epoch, the first key is not skipped
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx..]).await;

        // Test seek with a earlier epoch, the first key is skipped
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx + 1..]).await;

        // Test seek to over the end
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Test seek with a smaller table id
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a greater table id
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // check reverse iterator
        ordered_test_data.reverse();
        drop(iter);
        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        // Test rewind
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a smaller table id
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        // Test seek with a greater table id
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

        // Test seek with a later epoch, the first key is skipped
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        // `ordered_test_data` is reversed, so translate the forward index.
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx + 1..]).await;

        // Test seek with a earlier epoch, the first key is not skipped
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx..]).await;

        drop(iter);
        mem_table.insert(get_key(10001), "value1".into()).unwrap();

        // Seeking past the last present key should land on its neighbor.
        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(9999).to_ref());

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(10001).to_ref());
    }
865}