use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ops::Bound::{Included, Unbounded};
use std::ops::RangeBounds;

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_hummock_sdk::key::TableKey;
use thiserror::Error;
use thiserror_ext::AsReport;

use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::utils::sanity_check_enabled;
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
use crate::store::*;

pub type ImmutableMemtable = SharedBufferBatch;

pub type ImmId = SharedBufferBatchId;

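/// A write operation buffered for a single key. `Update` stores the pair
/// `(old_value, new_value)`, and `Delete` stores the deleted value, so that
/// old-value consistency checks remain possible.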
37#[derive(Clone, Debug, EstimateSize)]
38pub enum KeyOp {
39 Insert(Bytes),
40 Delete(Bytes),
41 Update((Bytes, Bytes)),
43}
44
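/// An in-memory write buffer for a single table, mapping each key to the
/// latest buffered [`KeyOp`] while tracking the estimated memory usage of
/// its entries.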
45#[derive(Clone)]
47pub struct MemTable {
48 table_id: TableId,
49 pub(crate) buffer: MemTableStore,
50 pub(crate) op_consistency_level: OpConsistencyLevel,
51 pub(crate) kv_size: KvSize,
52}
53
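/// Error returned when a new operation conflicts with the operation already
/// buffered for the same key.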
54#[derive(Error, Debug)]
55pub enum MemTableError {
56 #[error("Inconsistent operation on table {table_id} {key:?}, prev: {prev:?}, new: {new:?}")]
57 InconsistentOperation {
58 table_id: TableId,
59 key: TableKey<Bytes>,
60 prev: KeyOp,
61 new: KeyOp,
62 },
63}
64
type Result<T> = std::result::Result<T, Box<MemTableError>>;

pub type MemTableStore = BTreeMap<TableKey<Bytes>, KeyOp>;
pub struct MemTableIteratorBuilder;
pub struct MemTableRevIteratorBuilder;

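/// Projects a borrowed `(key, op)` entry into the `(key, value)` pair served
/// by the Hummock iterators: inserts and updates become a `Put` of the new
/// value, deletes become a tombstone.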
fn map_to_hummock_value<'a>(
    (key, op): (&'a TableKey<Bytes>, &'a KeyOp),
) -> (TableKey<&'a [u8]>, HummockValue<&'a [u8]>) {
    (
        TableKey(key.0.as_ref()),
        match op {
            KeyOp::Insert(value) | KeyOp::Update((_, value)) => HummockValue::Put(value),
            KeyOp::Delete(_) => HummockValue::Delete,
        },
    )
}

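// Backward iteration: walk the map in reverse, starting from the last entry
// at or before the seek key.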
impl RustIteratorBuilder for MemTableRevIteratorBuilder {
    type Direction = Backward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Unbounded, Included(seek_key.0)))
            .rev()
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().rev().map(map_to_hummock_value)
    }
}

impl RustIteratorBuilder for MemTableIteratorBuilder {
    type Direction = Forward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Included(seek_key.0), Unbounded))
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().map(map_to_hummock_value)
    }
}

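// Adapters that expose a `MemTableStore` as forward/backward Hummock
// iterators at a fixed epoch and table id.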
pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>;
pub type MemTableHummockRevIterator<'a> = FromRustIterator<'a, MemTableRevIteratorBuilder>;

impl MemTable {
    pub fn new(table_id: TableId, op_consistency_level: OpConsistencyLevel) -> Self {
        Self {
            table_id,
            buffer: BTreeMap::new(),
            op_consistency_level,
            kv_size: KvSize::new(),
        }
    }

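    /// Takes all buffered writes out of this mem-table, leaving behind an
    /// empty buffer with the same table id and op consistency level.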
    pub fn drain(&mut self) -> Self {
        self.kv_size.set(0);
        std::mem::replace(
            self,
            Self::new(self.table_id, self.op_consistency_level.clone()),
        )
    }

    pub fn is_dirty(&self) -> bool {
        !self.buffer.is_empty()
    }

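    /// Buffers an insert of `value` at `pk`. Under a consistent op level,
    /// inserting over a buffered `Delete` folds the pair into an `Update`,
    /// while inserting over an `Insert` or `Update` is an inconsistency.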
    pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
        if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
            // Inconsistent mode: overwrite whatever was buffered before.
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Insert(value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Insert(value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Delete(old_op_old_value) => {
                        // Delete followed by insert is equivalent to an update.
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Insert(_) | KeyOp::Update(_) => {
                        let new_op = KeyOp::Insert(value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double insert / insert on updated, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

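    /// Buffers a delete of `pk`, recording `old_value` as the value being
    /// deleted. Under a consistent op level, deleting a buffered `Insert`
    /// removes the entry entirely, and `old_value` is checked against the
    /// buffered new value when sanity checks are enabled.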
    pub fn delete(&mut self, pk: TableKey<Bytes>, old_value: Bytes) -> Result<()> {
        let key_len = std::mem::size_of::<Bytes>() + pk.len();
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            // Without old-value consistency, overwrite the previous op directly.
            let op = KeyOp::Delete(old_value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Delete(old_value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        // Insert followed by delete cancels out: drop the entry entirely.
                        self.kv_size.sub_size(key_len);
                        e.remove();
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Delete(old_value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "double delete, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        // Update followed by delete folds into a delete of the original old value.
                        let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                }
            }
        }
    }

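    /// Buffers an update of `pk` from `old_value` to `new_value`. Under a
    /// consistent op level, consecutive updates fold together while keeping
    /// the original old value, and `old_value` is checked against the
    /// buffered new value when sanity checks are enabled.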
    pub fn update(
        &mut self,
        pk: TableKey<Bytes>,
        old_value: Bytes,
        new_value: Bytes,
    ) -> Result<()> {
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            // Without old-value consistency, overwrite the previous op directly.
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Update((old_value, new_value));
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Update((old_value, new_value));
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        // Insert followed by update collapses into an insert of the new value.
                        let new_op = KeyOp::Insert(new_value);
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        // Consecutive updates keep the original old value.
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Update((old_value, new_value));
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            tracing::error!(
                                error = %err.as_report(),
                                "update on deleted, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    pub fn into_parts(self) -> BTreeMap<TableKey<Bytes>, KeyOp> {
        self.buffer
    }

    pub fn iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range)
    }

    pub fn rev_iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range).rev()
    }

    fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
        if let Some(op) = old_op {
            self.kv_size.sub_val(&op);
            self.kv_size.sub_size(key_len);
        }
    }
}

impl KeyOp {
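    /// Formats the op for debugging, deserializing the raw value bytes into
    /// rows with the provided serde.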
    pub fn debug_fmt(&self, row_deserializer: &impl ValueRowSerde) -> String {
        match self {
            Self::Insert(after) => {
                let after = row_deserializer.deserialize(after.as_ref());
                format!("Insert({:?})", &after)
            }
            Self::Delete(before) => {
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Delete({:?})", &before)
            }
            Self::Update((before, after)) => {
                let after = row_deserializer.deserialize(after.as_ref());
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Update({:?}, {:?})", &before, &after)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng as thread_rng};
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::value::HummockValue;
    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
    use crate::store::{CHECK_BYTES_EQUAL, OpConsistencyLevel};

    #[tokio::test]
    async fn test_mem_table_memory_size() {
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        mem_table.drain();

        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .delete(TableKey("key2".into()), "value2".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );
        mem_table
            .insert(TableKey("key2".into()), "value22".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value22").len()
                + Bytes::from("value2").len()
        );

        mem_table
            .delete(TableKey("key2".into()), "value22".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );

        mem_table.drain();

        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .insert(TableKey("key3".into()), "value3".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value3").len()
        );

        mem_table
            .update(TableKey("key3".into()), "value3".into(), "value333".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value333").len()
        );

        mem_table.drain();
        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_memory_size_not_consistent_op() {
        let mut mem_table = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        mem_table
            .insert(TableKey("key1".into()), "value111".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value111").len()
        );
        mem_table.drain();

        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value44").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_hummock_iterator() {
        let mut rng = thread_rng();

        fn get_key(i: usize) -> TableKey<Bytes> {
            let mut bytes = BytesMut::new();
            bytes.put(&VirtualNode::ZERO.to_be_bytes()[..]);
            bytes.put(format!("key_{:20}", i).as_bytes());
            TableKey(bytes.freeze())
        }

        let mut ordered_test_data = (0..10000)
            .map(|i| {
                let key_op = match rng.random_range(0..=2) {
                    0 => KeyOp::Insert(Bytes::from("insert")),
                    1 => KeyOp::Delete(Bytes::from("delete")),
                    2 => KeyOp::Update((Bytes::from("old_value"), Bytes::from("new_value"))),
                    _ => unreachable!(),
                };
                (get_key(i), key_op)
            })
            .collect_vec();

        let mut test_data = ordered_test_data.clone();

        test_data.shuffle(&mut rng);
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        for (key, op) in test_data {
            match op {
                KeyOp::Insert(value) => {
                    mem_table.insert(key, value).unwrap();
                }
                KeyOp::Delete(value) => mem_table.delete(key, value).unwrap(),
                KeyOp::Update((old_value, new_value)) => {
                    mem_table.update(key, old_value, new_value).unwrap();
                }
            }
        }

        const TEST_TABLE_ID: TableId = TableId::new(233);
        const TEST_EPOCH: u64 = test_epoch(10);

        async fn check_data<I: HummockIterator>(
            iter: &mut I,
            test_data: &[(TableKey<Bytes>, KeyOp)],
        ) {
            let mut idx = 0;
            while iter.is_valid() {
                let key = iter.key();
                let value = iter.value();

                let (expected_key, expected_value) = test_data[idx].clone();
                assert_eq!(key.epoch_with_gap, EpochWithGap::new_from_epoch(TEST_EPOCH));
                assert_eq!(key.user_key.table_id, TEST_TABLE_ID);
                assert_eq!(
                    key.user_key.table_key.0,
                    expected_key.0.as_ref(),
                    "failed at {}, {:?} != {:?}",
                    idx,
                    String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap(),
                    String::from_utf8(expected_key.key_part().to_vec()).unwrap(),
                );
                match expected_value {
                    KeyOp::Insert(expected_value) | KeyOp::Update((_, expected_value)) => {
                        assert_eq!(value, HummockValue::Put(expected_value.as_ref()));
                    }
                    KeyOp::Delete(_) => {
                        assert_eq!(value, HummockValue::Delete);
                    }
                }

                idx += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(idx, test_data.len());
        }

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

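        // Rewind visits every entry in key order.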
        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

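        // Seeking with a later epoch positions on the seek key itself
        // (full keys sort by ascending user key, then descending epoch).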
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx..]).await;

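        // Seeking with an earlier epoch skips past the seek key.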
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx + 1..]).await;

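        // Seeking beyond the largest key leaves the iterator exhausted.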
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

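        // A smaller table id sorts before every key of this table, so the
        // whole buffer is visited.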
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

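        // A greater table id sorts after every key of this table.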
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

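        // Now exercise the reverse iterator against the reversed data.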
        ordered_test_data.reverse();

        drop(iter);
        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

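        // For the reverse iterator, a smaller table id has nothing at or
        // before it within this table.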
792
793 let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
795 iter.seek(FullKey {
796 user_key: UserKey {
797 table_id: smaller_table_id,
798 table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
799 },
800 epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
801 })
802 .await
803 .unwrap();
804 check_data(&mut iter, &[]).await;
805
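        // A greater table id includes the whole buffer when scanning backward.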
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

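        // When scanning backward, a later epoch excludes the seek key while an
        // earlier epoch includes it, mirroring the forward iterator.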
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx + 1..]).await;

        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx..]).await;

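        // Seeking at a non-existent key lands on the nearest neighbor in the
        // iteration direction.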
        drop(iter);
        mem_table.insert(get_key(10001), "value1".into()).unwrap();

        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(9999).to_ref());

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(10001).to_ref());
    }
}
884}