1use std::collections::BTreeMap;
16use std::collections::btree_map::Entry;
17use std::ops::Bound::{Included, Unbounded};
18use std::ops::RangeBounds;
19
20use bytes::Bytes;
21use risingwave_common_estimate_size::{EstimateSize, KvSize};
22use risingwave_hummock_sdk::key::TableKey;
23use thiserror::Error;
24use thiserror_ext::AsReport;
25use tracing::error;
26
27use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
28use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
29use crate::hummock::utils::sanity_check_enabled;
30use crate::hummock::value::HummockValue;
31use crate::row_serde::value_serde::ValueRowSerde;
32use crate::store::*;
33pub type ImmutableMemtable = SharedBufferBatch;
34
35pub type ImmId = SharedBufferBatchId;
36
37#[derive(Clone, Debug, EstimateSize)]
38pub enum KeyOp {
39 Insert(Bytes),
40 Delete(Bytes),
41 Update((Bytes, Bytes)),
43}
44
45#[derive(Clone)]
47pub struct MemTable {
48 pub(crate) buffer: MemTableStore,
49 pub(crate) op_consistency_level: OpConsistencyLevel,
50 pub(crate) kv_size: KvSize,
51}
52
53#[derive(Error, Debug)]
54pub enum MemTableError {
55 #[error("Inconsistent operation {key:?}, prev: {prev:?}, new: {new:?}")]
56 InconsistentOperation {
57 key: TableKey<Bytes>,
58 prev: KeyOp,
59 new: KeyOp,
60 },
61}
62
63type Result<T> = std::result::Result<T, Box<MemTableError>>;
64
65pub type MemTableStore = BTreeMap<TableKey<Bytes>, KeyOp>;
66pub struct MemTableIteratorBuilder;
67pub struct MemTableRevIteratorBuilder;
68
69fn map_to_hummock_value<'a>(
70 (key, op): (&'a TableKey<Bytes>, &'a KeyOp),
71) -> (TableKey<&'a [u8]>, HummockValue<&'a [u8]>) {
72 (
73 TableKey(key.0.as_ref()),
74 match op {
75 KeyOp::Insert(value) | KeyOp::Update((_, value)) => HummockValue::Put(value),
76 KeyOp::Delete(_) => HummockValue::Delete,
77 },
78 )
79}
80
81impl RustIteratorBuilder for MemTableRevIteratorBuilder {
82 type Direction = Backward;
83 type Iterable = MemTableStore;
84
85 type RewindIter<'a> =
86 impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
87 type SeekIter<'a> =
88 impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
89
90 fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
91 iterable
92 .range::<[u8], _>((Unbounded, Included(seek_key.0)))
93 .rev()
94 .map(map_to_hummock_value)
95 }
96
97 fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
98 iterable.iter().rev().map(map_to_hummock_value)
99 }
100}
101
102impl RustIteratorBuilder for MemTableIteratorBuilder {
103 type Direction = Forward;
104 type Iterable = MemTableStore;
105
106 type RewindIter<'a> =
107 impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
108 type SeekIter<'a> =
109 impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
110
111 fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
112 iterable
113 .range::<[u8], _>((Included(seek_key.0), Unbounded))
114 .map(map_to_hummock_value)
115 }
116
117 fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
118 iterable.iter().map(map_to_hummock_value)
119 }
120}
121
122pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>;
123pub type MemTableHummockRevIterator<'a> = FromRustIterator<'a, MemTableRevIteratorBuilder>;
124
125impl MemTable {
126 pub fn new(op_consistency_level: OpConsistencyLevel) -> Self {
127 Self {
128 buffer: BTreeMap::new(),
129 op_consistency_level,
130 kv_size: KvSize::new(),
131 }
132 }
133
134 pub fn drain(&mut self) -> Self {
135 self.kv_size.set(0);
136 std::mem::replace(self, Self::new(self.op_consistency_level.clone()))
137 }
138
139 pub fn is_dirty(&self) -> bool {
140 !self.buffer.is_empty()
141 }
142
143 pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
145 if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
146 let key_len = std::mem::size_of::<Bytes>() + pk.len();
147 let op = KeyOp::Insert(value);
148 self.kv_size.add(&pk, &op);
149 let old_op = self.buffer.insert(pk, op);
150 self.sub_old_op_size(old_op, key_len);
151 return Ok(());
152 };
153 let entry = self.buffer.entry(pk);
154 match entry {
155 Entry::Vacant(e) => {
156 let op = KeyOp::Insert(value);
157 self.kv_size.add(e.key(), &op);
158 e.insert(op);
159 Ok(())
160 }
161 Entry::Occupied(mut e) => {
162 let old_op = e.get_mut();
163 self.kv_size.sub_val(old_op);
164
165 match old_op {
166 KeyOp::Delete(old_op_old_value) => {
167 let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
168 self.kv_size.add_val(&new_op);
169 e.insert(new_op);
170 Ok(())
171 }
172 KeyOp::Insert(_) | KeyOp::Update(_) => {
173 let new_op = KeyOp::Insert(value);
174 let err = MemTableError::InconsistentOperation {
175 key: e.key().clone(),
176 prev: e.get().clone(),
177 new: new_op.clone(),
178 };
179
180 if sanity_check_enabled() {
181 Err(err.into())
182 } else {
183 tracing::error!(
184 error = %err.as_report(),
185 "double insert / insert on updated, ignoring because sanity check is disabled"
186 );
187 self.kv_size.add_val(&new_op);
188 e.insert(new_op);
189 Ok(())
190 }
191 }
192 }
193 }
194 }
195 }
196
197 pub fn delete(&mut self, pk: TableKey<Bytes>, old_value: Bytes) -> Result<()> {
198 let key_len = std::mem::size_of::<Bytes>() + pk.len();
199 let OpConsistencyLevel::ConsistentOldValue {
200 check_old_value: value_checker,
201 ..
202 } = &self.op_consistency_level
203 else {
204 let op = KeyOp::Delete(old_value);
205 self.kv_size.add(&pk, &op);
206 let old_op = self.buffer.insert(pk, op);
207 self.sub_old_op_size(old_op, key_len);
208 return Ok(());
209 };
210 let entry = self.buffer.entry(pk);
211 match entry {
212 Entry::Vacant(e) => {
213 let op = KeyOp::Delete(old_value);
214 self.kv_size.add(e.key(), &op);
215 e.insert(op);
216 Ok(())
217 }
218 Entry::Occupied(mut e) => {
219 let old_op = e.get_mut();
220 self.kv_size.sub_val(old_op);
221
222 match old_op {
223 KeyOp::Insert(old_op_new_value) => {
224 if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
225 return Err(Box::new(MemTableError::InconsistentOperation {
226 key: e.key().clone(),
227 prev: e.get().clone(),
228 new: KeyOp::Delete(old_value),
229 }));
230 }
231
232 self.kv_size.sub_size(key_len);
233 e.remove();
234 Ok(())
235 }
236 KeyOp::Delete(_) => {
237 let new_op = KeyOp::Delete(old_value);
238 let err = MemTableError::InconsistentOperation {
239 key: e.key().clone(),
240 prev: e.get().clone(),
241 new: new_op.clone(),
242 };
243
244 if sanity_check_enabled() {
245 Err(err.into())
246 } else {
247 tracing::error!(
248 error = %err.as_report(),
249 "double delete, ignoring because sanity check is disabled"
250 );
251 self.kv_size.add_val(&new_op);
252 e.insert(new_op);
253 Ok(())
254 }
255 }
256 KeyOp::Update((old_op_old_value, old_op_new_value)) => {
257 if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
258 return Err(Box::new(MemTableError::InconsistentOperation {
259 key: e.key().clone(),
260 prev: e.get().clone(),
261 new: KeyOp::Delete(old_value),
262 }));
263 }
264
265 let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
266 self.kv_size.add_val(&new_op);
267 e.insert(new_op);
268 Ok(())
269 }
270 }
271 }
272 }
273 }
274
275 pub fn update(
276 &mut self,
277 pk: TableKey<Bytes>,
278 old_value: Bytes,
279 new_value: Bytes,
280 ) -> Result<()> {
281 let OpConsistencyLevel::ConsistentOldValue {
282 check_old_value: value_checker,
283 ..
284 } = &self.op_consistency_level
285 else {
286 let key_len = std::mem::size_of::<Bytes>() + pk.len();
287 let op = KeyOp::Update((old_value, new_value));
288 self.kv_size.add(&pk, &op);
289 let old_op = self.buffer.insert(pk, op);
290 self.sub_old_op_size(old_op, key_len);
291 return Ok(());
292 };
293 let entry = self.buffer.entry(pk);
294 match entry {
295 Entry::Vacant(e) => {
296 let op = KeyOp::Update((old_value, new_value));
297 self.kv_size.add(e.key(), &op);
298 e.insert(op);
299 Ok(())
300 }
301 Entry::Occupied(mut e) => {
302 let old_op = e.get_mut();
303 self.kv_size.sub_val(old_op);
304
305 match old_op {
306 KeyOp::Insert(old_op_new_value) => {
307 if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
308 return Err(Box::new(MemTableError::InconsistentOperation {
309 key: e.key().clone(),
310 prev: e.get().clone(),
311 new: KeyOp::Update((old_value, new_value)),
312 }));
313 }
314
315 let new_op = KeyOp::Insert(new_value);
316 self.kv_size.add_val(&new_op);
317 e.insert(new_op);
318 Ok(())
319 }
320 KeyOp::Update((old_op_old_value, old_op_new_value)) => {
321 if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
322 return Err(Box::new(MemTableError::InconsistentOperation {
323 key: e.key().clone(),
324 prev: e.get().clone(),
325 new: KeyOp::Update((old_value, new_value)),
326 }));
327 }
328
329 let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
330 self.kv_size.add_val(&new_op);
331 e.insert(new_op);
332 Ok(())
333 }
334 KeyOp::Delete(_) => {
335 let new_op = KeyOp::Update((old_value, new_value));
336 let err = MemTableError::InconsistentOperation {
337 key: e.key().clone(),
338 prev: e.get().clone(),
339 new: new_op.clone(),
340 };
341
342 if sanity_check_enabled() {
343 Err(err.into())
344 } else {
345 tracing::error!(
346 error = %err.as_report(),
347 "update on deleted, ignoring because sanity check is disabled"
348 );
349 self.kv_size.add_val(&new_op);
350 e.insert(new_op);
351 Ok(())
352 }
353 }
354 }
355 }
356 }
357 }
358
359 pub fn into_parts(self) -> BTreeMap<TableKey<Bytes>, KeyOp> {
360 self.buffer
361 }
362
363 pub fn iter<'a, R>(
364 &'a self,
365 key_range: R,
366 ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
367 where
368 R: RangeBounds<TableKey<Bytes>> + 'a,
369 {
370 self.buffer.range(key_range)
371 }
372
373 pub fn rev_iter<'a, R>(
374 &'a self,
375 key_range: R,
376 ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
377 where
378 R: RangeBounds<TableKey<Bytes>> + 'a,
379 {
380 self.buffer.range(key_range).rev()
381 }
382
383 fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
384 if let Some(op) = old_op {
385 self.kv_size.sub_val(&op);
386 self.kv_size.sub_size(key_len);
387 }
388 }
389}
390
391impl KeyOp {
392 pub fn debug_fmt(&self, row_deserializer: &impl ValueRowSerde) -> String {
398 match self {
399 Self::Insert(after) => {
400 let after = row_deserializer.deserialize(after.as_ref());
401 format!("Insert({:?})", &after)
402 }
403 Self::Delete(before) => {
404 let before = row_deserializer.deserialize(before.as_ref());
405 format!("Delete({:?})", &before)
406 }
407 Self::Update((before, after)) => {
408 let after = row_deserializer.deserialize(after.as_ref());
409 let before = row_deserializer.deserialize(before.as_ref());
410 format!("Update({:?}, {:?})", &before, &after)
411 }
412 }
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use bytes::{BufMut, Bytes, BytesMut};
419 use itertools::Itertools;
420 use rand::seq::SliceRandom;
421 use rand::{Rng, rng as thread_rng};
422 use risingwave_common::catalog::TableId;
423 use risingwave_common::hash::VirtualNode;
424 use risingwave_common::util::epoch::{EpochExt, test_epoch};
425 use risingwave_hummock_sdk::EpochWithGap;
426 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
427
428 use crate::hummock::iterator::HummockIterator;
429 use crate::hummock::value::HummockValue;
430 use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
431 use crate::store::{CHECK_BYTES_EQUAL, OpConsistencyLevel};
432
433 #[tokio::test]
434 async fn test_mem_table_memory_size() {
435 let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue {
436 check_old_value: CHECK_BYTES_EQUAL.clone(),
437 is_log_store: false,
438 });
439 assert_eq!(mem_table.kv_size.size(), 0);
440
441 mem_table
442 .insert(TableKey("key1".into()), "value1".into())
443 .unwrap();
444 assert_eq!(
445 mem_table.kv_size.size(),
446 std::mem::size_of::<Bytes>()
447 + Bytes::from("key1").len()
448 + std::mem::size_of::<KeyOp>()
449 + Bytes::from("value1").len()
450 );
451
452 mem_table.drain();
454 assert_eq!(mem_table.kv_size.size(), 0);
455 mem_table
456 .delete(TableKey("key2".into()), "value2".into())
457 .unwrap();
458 assert_eq!(
459 mem_table.kv_size.size(),
460 std::mem::size_of::<Bytes>()
461 + Bytes::from("key2").len()
462 + std::mem::size_of::<KeyOp>()
463 + Bytes::from("value2").len()
464 );
465 mem_table
466 .insert(TableKey("key2".into()), "value22".into())
467 .unwrap();
468 assert_eq!(
469 mem_table.kv_size.size(),
470 std::mem::size_of::<Bytes>()
471 + Bytes::from("key2").len()
472 + std::mem::size_of::<KeyOp>()
473 + Bytes::from("value22").len()
474 + Bytes::from("value2").len()
475 );
476
477 mem_table
478 .delete(TableKey("key2".into()), "value22".into())
479 .unwrap();
480
481 assert_eq!(
482 mem_table.kv_size.size(),
483 std::mem::size_of::<Bytes>()
484 + Bytes::from("key2").len()
485 + std::mem::size_of::<KeyOp>()
486 + Bytes::from("value2").len()
487 );
488
489 mem_table.drain();
491 assert_eq!(mem_table.kv_size.size(), 0);
492 mem_table
493 .insert(TableKey("key3".into()), "value3".into())
494 .unwrap();
495 assert_eq!(
496 mem_table.kv_size.size(),
497 std::mem::size_of::<Bytes>()
498 + Bytes::from("key3").len()
499 + std::mem::size_of::<KeyOp>()
500 + Bytes::from("value3").len()
501 );
502
503 mem_table
505 .update(TableKey("key3".into()), "value3".into(), "value333".into())
506 .unwrap();
507 assert_eq!(
508 mem_table.kv_size.size(),
509 std::mem::size_of::<Bytes>()
510 + Bytes::from("key3").len()
511 + std::mem::size_of::<KeyOp>()
512 + Bytes::from("value333").len()
513 );
514
515 mem_table.drain();
516 mem_table
517 .update(TableKey("key4".into()), "value4".into(), "value44".into())
518 .unwrap();
519
520 assert_eq!(
521 mem_table.kv_size.size(),
522 std::mem::size_of::<Bytes>()
523 + Bytes::from("key4").len()
524 + std::mem::size_of::<KeyOp>()
525 + Bytes::from("value4").len()
526 + Bytes::from("value44").len()
527 );
528 mem_table
529 .update(
530 TableKey("key4".into()),
531 "value44".into(),
532 "value4444".into(),
533 )
534 .unwrap();
535
536 assert_eq!(
537 mem_table.kv_size.size(),
538 std::mem::size_of::<Bytes>()
539 + Bytes::from("key4").len()
540 + std::mem::size_of::<KeyOp>()
541 + Bytes::from("value4").len()
542 + Bytes::from("value4444").len()
543 );
544 }
545
546 #[tokio::test]
547 async fn test_mem_table_memory_size_not_consistent_op() {
548 let mut mem_table = MemTable::new(OpConsistencyLevel::Inconsistent);
549 assert_eq!(mem_table.kv_size.size(), 0);
550
551 mem_table
552 .insert(TableKey("key1".into()), "value1".into())
553 .unwrap();
554
555 assert_eq!(
556 mem_table.kv_size.size(),
557 std::mem::size_of::<Bytes>()
558 + Bytes::from("key1").len()
559 + std::mem::size_of::<KeyOp>()
560 + Bytes::from("value1").len()
561 );
562
563 mem_table
564 .insert(TableKey("key1".into()), "value111".into())
565 .unwrap();
566 assert_eq!(
567 mem_table.kv_size.size(),
568 std::mem::size_of::<Bytes>()
569 + Bytes::from("key1").len()
570 + std::mem::size_of::<KeyOp>()
571 + Bytes::from("value111").len()
572 );
573 mem_table.drain();
574
575 mem_table
576 .update(TableKey("key4".into()), "value4".into(), "value44".into())
577 .unwrap();
578
579 assert_eq!(
580 mem_table.kv_size.size(),
581 std::mem::size_of::<Bytes>()
582 + Bytes::from("key4").len()
583 + std::mem::size_of::<KeyOp>()
584 + Bytes::from("value4").len()
585 + Bytes::from("value44").len()
586 );
587 mem_table
588 .update(
589 TableKey("key4".into()),
590 "value44".into(),
591 "value4444".into(),
592 )
593 .unwrap();
594
595 assert_eq!(
596 mem_table.kv_size.size(),
597 std::mem::size_of::<Bytes>()
598 + Bytes::from("key4").len()
599 + std::mem::size_of::<KeyOp>()
600 + Bytes::from("value44").len()
601 + Bytes::from("value4444").len()
602 );
603 }
604
605 #[tokio::test]
606 async fn test_mem_table_hummock_iterator() {
607 let mut rng = thread_rng();
608
609 fn get_key(i: usize) -> TableKey<Bytes> {
610 let mut bytes = BytesMut::new();
611 bytes.put(&VirtualNode::ZERO.to_be_bytes()[..]);
612 bytes.put(format!("key_{:20}", i).as_bytes());
613 TableKey(bytes.freeze())
614 }
615
616 let mut ordered_test_data = (0..10000)
617 .map(|i| {
618 let key_op = match rng.random_range(0..=2) {
619 0 => KeyOp::Insert(Bytes::from("insert")),
620 1 => KeyOp::Delete(Bytes::from("delete")),
621 2 => KeyOp::Update((Bytes::from("old_value"), Bytes::from("new_value"))),
622 _ => unreachable!(),
623 };
624 (get_key(i), key_op)
625 })
626 .collect_vec();
627
628 let mut test_data = ordered_test_data.clone();
629
630 test_data.shuffle(&mut rng);
631 let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue {
632 check_old_value: CHECK_BYTES_EQUAL.clone(),
633 is_log_store: false,
634 });
635 for (key, op) in test_data {
636 match op {
637 KeyOp::Insert(value) => {
638 mem_table.insert(key, value).unwrap();
639 }
640 KeyOp::Delete(value) => mem_table.delete(key, value).unwrap(),
641 KeyOp::Update((old_value, new_value)) => {
642 mem_table.update(key, old_value, new_value).unwrap();
643 }
644 }
645 }
646
647 const TEST_TABLE_ID: TableId = TableId::new(233);
648 const TEST_EPOCH: u64 = test_epoch(10);
649
650 async fn check_data<I: HummockIterator>(
651 iter: &mut I,
652 test_data: &[(TableKey<Bytes>, KeyOp)],
653 ) {
654 let mut idx = 0;
655 while iter.is_valid() {
656 let key = iter.key();
657 let value = iter.value();
658
659 let (expected_key, expected_value) = test_data[idx].clone();
660 assert_eq!(key.epoch_with_gap, EpochWithGap::new_from_epoch(TEST_EPOCH));
661 assert_eq!(key.user_key.table_id, TEST_TABLE_ID);
662 assert_eq!(
663 key.user_key.table_key.0,
664 expected_key.0.as_ref(),
665 "failed at {}, {:?} != {:?}",
666 idx,
667 String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap(),
668 String::from_utf8(expected_key.key_part().to_vec()).unwrap(),
669 );
670 match expected_value {
671 KeyOp::Insert(expected_value) | KeyOp::Update((_, expected_value)) => {
672 assert_eq!(value, HummockValue::Put(expected_value.as_ref()));
673 }
674 KeyOp::Delete(_) => {
675 assert_eq!(value, HummockValue::Delete);
676 }
677 }
678
679 idx += 1;
680 iter.next().await.unwrap();
681 }
682 assert_eq!(idx, test_data.len());
683 }
684
685 let mut iter = MemTableHummockIterator::new(
686 &mem_table.buffer,
687 EpochWithGap::new_from_epoch(TEST_EPOCH),
688 TEST_TABLE_ID,
689 );
690
691 iter.rewind().await.unwrap();
693 check_data(&mut iter, &ordered_test_data).await;
694
695 let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
697 let seek_idx = 500;
698 iter.seek(FullKey {
699 user_key: UserKey {
700 table_id: TEST_TABLE_ID,
701 table_key: TableKey(&get_key(seek_idx)),
702 },
703 epoch_with_gap: later_epoch,
704 })
705 .await
706 .unwrap();
707 check_data(&mut iter, &ordered_test_data[seek_idx..]).await;
708
709 let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
711 let seek_idx = 500;
712 iter.seek(FullKey {
713 user_key: UserKey {
714 table_id: TEST_TABLE_ID,
715 table_key: TableKey(&get_key(seek_idx)),
716 },
717 epoch_with_gap: early_epoch,
718 })
719 .await
720 .unwrap();
721 check_data(&mut iter, &ordered_test_data[seek_idx + 1..]).await;
722
723 iter.seek(FullKey {
725 user_key: UserKey {
726 table_id: TEST_TABLE_ID,
727 table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
728 },
729 epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
730 })
731 .await
732 .unwrap();
733 check_data(&mut iter, &[]).await;
734
735 let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
737 iter.seek(FullKey {
738 user_key: UserKey {
739 table_id: smaller_table_id,
740 table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
741 },
742 epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
743 })
744 .await
745 .unwrap();
746 check_data(&mut iter, &ordered_test_data).await;
747
748 let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
750 iter.seek(FullKey {
751 user_key: UserKey {
752 table_id: greater_table_id,
753 table_key: TableKey(&get_key(0)),
754 },
755 epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
756 })
757 .await
758 .unwrap();
759 check_data(&mut iter, &[]).await;
760
761 ordered_test_data.reverse();
763 drop(iter);
764 let mut iter = MemTableHummockRevIterator::new(
765 &mem_table.buffer,
766 EpochWithGap::new_from_epoch(TEST_EPOCH),
767 TEST_TABLE_ID,
768 );
769
770 iter.rewind().await.unwrap();
772 check_data(&mut iter, &ordered_test_data).await;
773
774 let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
776 iter.seek(FullKey {
777 user_key: UserKey {
778 table_id: smaller_table_id,
779 table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
780 },
781 epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
782 })
783 .await
784 .unwrap();
785 check_data(&mut iter, &[]).await;
786
787 let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
789 iter.seek(FullKey {
790 user_key: UserKey {
791 table_id: greater_table_id,
792 table_key: TableKey(&get_key(0)),
793 },
794 epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
795 })
796 .await
797 .unwrap();
798 check_data(&mut iter, &ordered_test_data).await;
799
800 let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
802 let seek_idx = 500;
803 iter.seek(FullKey {
804 user_key: UserKey {
805 table_id: TEST_TABLE_ID,
806 table_key: TableKey(&get_key(seek_idx)),
807 },
808 epoch_with_gap: later_epoch,
809 })
810 .await
811 .unwrap();
812 let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
813 check_data(&mut iter, &ordered_test_data[rev_seek_idx + 1..]).await;
814
815 let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
817 let seek_idx = 500;
818 iter.seek(FullKey {
819 user_key: UserKey {
820 table_id: TEST_TABLE_ID,
821 table_key: TableKey(&get_key(seek_idx)),
822 },
823 epoch_with_gap: early_epoch,
824 })
825 .await
826 .unwrap();
827 let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
828 check_data(&mut iter, &ordered_test_data[rev_seek_idx..]).await;
829
830 drop(iter);
831 mem_table.insert(get_key(10001), "value1".into()).unwrap();
832
833 let mut iter = MemTableHummockRevIterator::new(
834 &mem_table.buffer,
835 EpochWithGap::new_from_epoch(TEST_EPOCH),
836 TEST_TABLE_ID,
837 );
838 iter.seek(FullKey {
839 user_key: UserKey {
840 table_id: TEST_TABLE_ID,
841 table_key: TableKey(&get_key(10000)),
842 },
843 epoch_with_gap: early_epoch,
844 })
845 .await
846 .unwrap();
847 assert_eq!(iter.key().user_key.table_key, get_key(9999).to_ref());
848
849 let mut iter = MemTableHummockIterator::new(
850 &mem_table.buffer,
851 EpochWithGap::new_from_epoch(TEST_EPOCH),
852 TEST_TABLE_ID,
853 );
854 iter.seek(FullKey {
855 user_key: UserKey {
856 table_id: TEST_TABLE_ID,
857 table_key: TableKey(&get_key(10000)),
858 },
859 epoch_with_gap: later_epoch,
860 })
861 .await
862 .unwrap();
863 assert_eq!(iter.key().user_key.table_key, get_key(10001).to_ref());
864 }
865}