use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ops::Bound::{Included, Unbounded};
use std::ops::RangeBounds;

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_hummock_sdk::key::TableKey;
use thiserror::Error;
use thiserror_ext::AsReport;
use tracing::error;

use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::utils::sanity_check_enabled;
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
use crate::store::*;

pub type ImmutableMemtable = SharedBufferBatch;

pub type ImmId = SharedBufferBatchId;

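/// A single buffered write against one key: the new value for an insert, the old
/// value for a delete, or `(old_value, new_value)` for an update.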
#[derive(Clone, Debug, EstimateSize)]
pub enum KeyOp {
    Insert(Bytes),
    Delete(Bytes),
    Update((Bytes, Bytes)),
}

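/// An ordered in-memory buffer of uncommitted `KeyOp`s for a single table, with
/// its heap footprint tracked in `kv_size`. How overlapping writes to the same
/// key are merged depends on the configured `OpConsistencyLevel`.
///
/// Illustrative usage only (a sketch; the real call sites are the state-table and
/// local state store layers, and `table_id`, `key`, `value`, `old_value` below are
/// placeholders):
///
/// ```ignore
/// let mut mem_table = MemTable::new(table_id, OpConsistencyLevel::Inconsistent);
/// mem_table.insert(key.clone(), value)?;
/// mem_table.delete(key, old_value)?;
/// // Take the buffered ops out, leaving an empty mem table behind.
/// let ops = mem_table.drain().into_parts();
/// ```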
#[derive(Clone)]
pub struct MemTable {
    table_id: TableId,
    pub(crate) buffer: MemTableStore,
    pub(crate) op_consistency_level: OpConsistencyLevel,
    pub(crate) kv_size: KvSize,
}

#[derive(Error, Debug)]
pub enum MemTableError {
    #[error("Inconsistent operation on table {table_id} {key:?}, prev: {prev:?}, new: {new:?}")]
    InconsistentOperation {
        table_id: TableId,
        key: TableKey<Bytes>,
        prev: KeyOp,
        new: KeyOp,
    },
}

type Result<T> = std::result::Result<T, Box<MemTableError>>;

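/// The underlying buffer: a `BTreeMap` keeps the keys sorted so that ranges can
/// be scanned in both directions.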
pub type MemTableStore = BTreeMap<TableKey<Bytes>, KeyOp>;
pub struct MemTableIteratorBuilder;
pub struct MemTableRevIteratorBuilder;

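/// Projects a buffer entry into its Hummock representation: `Insert`/`Update`
/// expose the new value as a `Put`, while `Delete` becomes a tombstone.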
fn map_to_hummock_value<'a>(
    (key, op): (&'a TableKey<Bytes>, &'a KeyOp),
) -> (TableKey<&'a [u8]>, HummockValue<&'a [u8]>) {
    (
        TableKey(key.0.as_ref()),
        match op {
            KeyOp::Insert(value) | KeyOp::Update((_, value)) => HummockValue::Put(value),
            KeyOp::Delete(_) => HummockValue::Delete,
        },
    )
}

impl RustIteratorBuilder for MemTableRevIteratorBuilder {
    type Direction = Backward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Unbounded, Included(seek_key.0)))
            .rev()
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().rev().map(map_to_hummock_value)
    }
}

impl RustIteratorBuilder for MemTableIteratorBuilder {
    type Direction = Forward;
    type Iterable = MemTableStore;

    type RewindIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;
    type SeekIter<'a> =
        impl Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)> + Send + 'a;

    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a> {
        iterable
            .range::<[u8], _>((Included(seek_key.0), Unbounded))
            .map(map_to_hummock_value)
    }

    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_> {
        iterable.iter().map(map_to_hummock_value)
    }
}

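// `FromRustIterator` adapts the two builders above into full forward / backward
// Hummock iterators over the mem-table buffer.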
pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>;
pub type MemTableHummockRevIterator<'a> = FromRustIterator<'a, MemTableRevIteratorBuilder>;

impl MemTable {
    pub fn new(table_id: TableId, op_consistency_level: OpConsistencyLevel) -> Self {
        Self {
            table_id,
            buffer: BTreeMap::new(),
            op_consistency_level,
            kv_size: KvSize::new(),
        }
    }

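    /// Takes the buffered operations out of `self`, leaving behind an empty
    /// `MemTable` with the same table id and consistency level.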
    pub fn drain(&mut self) -> Self {
        self.kv_size.set(0);
        std::mem::replace(
            self,
            Self::new(self.table_id, self.op_consistency_level.clone()),
        )
    }

    pub fn is_dirty(&self) -> bool {
        !self.buffer.is_empty()
    }

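    /// Buffers an insert of `value` at `pk`. Under a consistent level, inserting
    /// over a pending `Delete` folds into an `Update`, while inserting over a
    /// pending `Insert`/`Update` is an inconsistent operation (logged and
    /// overwritten when the sanity check is disabled).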
    pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
        if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Insert(value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Insert(value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Delete(old_op_old_value) => {
                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Insert(_) | KeyOp::Update(_) => {
                        let new_op = KeyOp::Insert(value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            error!(
                                error = %err.as_report(),
                                "double insert / insert on updated, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

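    /// Buffers a delete of `pk`, recording `old_value` as the value being removed.
    /// Under `ConsistentOldValue`, `old_value` is checked against the pending op:
    /// deleting a pending `Insert` cancels the entry, deleting a pending `Update`
    /// collapses into a `Delete` of the original value, and a double delete is an
    /// inconsistent operation.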
    pub fn delete(&mut self, pk: TableKey<Bytes>, old_value: Bytes) -> Result<()> {
        let key_len = std::mem::size_of::<Bytes>() + pk.len();
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            let op = KeyOp::Delete(old_value);
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Delete(old_value);
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        self.kv_size.sub_size(key_len);
                        e.remove();
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Delete(old_value);
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            error!(
                                error = %err.as_report(),
                                "double delete, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Delete(old_value),
                            }));
                        }

                        let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                }
            }
        }
    }

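    /// Buffers an update of `pk` from `old_value` to `new_value`. Under
    /// `ConsistentOldValue`, `old_value` is checked against the pending op:
    /// updating a pending `Insert` stays an `Insert` of the new value, updating a
    /// pending `Update` keeps the original old value, and updating a pending
    /// `Delete` is an inconsistent operation.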
    pub fn update(
        &mut self,
        pk: TableKey<Bytes>,
        old_value: Bytes,
        new_value: Bytes,
    ) -> Result<()> {
        let OpConsistencyLevel::ConsistentOldValue {
            check_old_value: value_checker,
            ..
        } = &self.op_consistency_level
        else {
            let key_len = std::mem::size_of::<Bytes>() + pk.len();
            let op = KeyOp::Update((old_value, new_value));
            self.kv_size.add(&pk, &op);
            let old_op = self.buffer.insert(pk, op);
            self.sub_old_op_size(old_op, key_len);
            return Ok(());
        };
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                let op = KeyOp::Update((old_value, new_value));
                self.kv_size.add(e.key(), &op);
                e.insert(op);
                Ok(())
            }
            Entry::Occupied(mut e) => {
                let old_op = e.get_mut();
                self.kv_size.sub_val(old_op);

                match old_op {
                    KeyOp::Insert(old_op_new_value) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        let new_op = KeyOp::Insert(new_value);
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Update((old_op_old_value, old_op_new_value)) => {
                        if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
                            return Err(Box::new(MemTableError::InconsistentOperation {
                                table_id: self.table_id,
                                key: e.key().clone(),
                                prev: e.get().clone(),
                                new: KeyOp::Update((old_value, new_value)),
                            }));
                        }

                        let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
                        self.kv_size.add_val(&new_op);
                        e.insert(new_op);
                        Ok(())
                    }
                    KeyOp::Delete(_) => {
                        let new_op = KeyOp::Update((old_value, new_value));
                        let err = MemTableError::InconsistentOperation {
                            table_id: self.table_id,
                            key: e.key().clone(),
                            prev: e.get().clone(),
                            new: new_op.clone(),
                        };

                        if sanity_check_enabled() {
                            Err(err.into())
                        } else {
                            error!(
                                error = %err.as_report(),
                                "update on deleted, ignoring because sanity check is disabled"
                            );
                            self.kv_size.add_val(&new_op);
                            e.insert(new_op);
                            Ok(())
                        }
                    }
                }
            }
        }
    }

    pub fn into_parts(self) -> BTreeMap<TableKey<Bytes>, KeyOp> {
        self.buffer
    }

    pub fn iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range)
    }

    pub fn rev_iter<'a, R>(
        &'a self,
        key_range: R,
    ) -> impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)>
    where
        R: RangeBounds<TableKey<Bytes>> + 'a,
    {
        self.buffer.range(key_range).rev()
    }

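    /// If an op was replaced in the buffer, subtracts its value size and the key
    /// overhead from `kv_size`.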
    fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
        if let Some(op) = old_op {
            self.kv_size.sub_val(&op);
            self.kv_size.sub_size(key_len);
        }
    }
}

impl KeyOp {
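    /// Renders the op with its value(s) deserialized through `row_deserializer`,
    /// for debugging and logging.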
    pub fn debug_fmt(&self, row_deserializer: &impl ValueRowSerde) -> String {
        match self {
            Self::Insert(after) => {
                let after = row_deserializer.deserialize(after.as_ref());
                format!("Insert({:?})", &after)
            }
            Self::Delete(before) => {
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Delete({:?})", &before)
            }
            Self::Update((before, after)) => {
                let after = row_deserializer.deserialize(after.as_ref());
                let before = row_deserializer.deserialize(before.as_ref());
                format!("Update({:?}, {:?})", &before, &after)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng as thread_rng};
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::value::HummockValue;
    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
    use crate::store::{CHECK_BYTES_EQUAL, OpConsistencyLevel};

    #[tokio::test]
    async fn test_mem_table_memory_size() {
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .delete(TableKey("key2".into()), "value2".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );
        mem_table
            .insert(TableKey("key2".into()), "value22".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value22").len()
                + Bytes::from("value2").len()
        );

        mem_table
            .delete(TableKey("key2".into()), "value22".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key2").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value2").len()
        );

        mem_table.drain();
        assert_eq!(mem_table.kv_size.size(), 0);
        mem_table
            .insert(TableKey("key3".into()), "value3".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value3").len()
        );

        mem_table
            .update(TableKey("key3".into()), "value3".into(), "value333".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key3").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value333").len()
        );

        mem_table.drain();
        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_memory_size_not_consistent_op() {
        let mut mem_table = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
        assert_eq!(mem_table.kv_size.size(), 0);

        mem_table
            .insert(TableKey("key1".into()), "value1".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value1").len()
        );

        mem_table
            .insert(TableKey("key1".into()), "value111".into())
            .unwrap();
        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key1").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value111").len()
        );
        mem_table.drain();

        mem_table
            .update(TableKey("key4".into()), "value4".into(), "value44".into())
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value4").len()
                + Bytes::from("value44").len()
        );
        mem_table
            .update(
                TableKey("key4".into()),
                "value44".into(),
                "value4444".into(),
            )
            .unwrap();

        assert_eq!(
            mem_table.kv_size.size(),
            std::mem::size_of::<Bytes>()
                + Bytes::from("key4").len()
                + std::mem::size_of::<KeyOp>()
                + Bytes::from("value44").len()
                + Bytes::from("value4444").len()
        );
    }

    #[tokio::test]
    async fn test_mem_table_hummock_iterator() {
        let mut rng = thread_rng();

        fn get_key(i: usize) -> TableKey<Bytes> {
            let mut bytes = BytesMut::new();
            bytes.put(&VirtualNode::ZERO.to_be_bytes()[..]);
            bytes.put(format!("key_{:20}", i).as_bytes());
            TableKey(bytes.freeze())
        }

        let mut ordered_test_data = (0..10000)
            .map(|i| {
                let key_op = match rng.random_range(0..=2) {
                    0 => KeyOp::Insert(Bytes::from("insert")),
                    1 => KeyOp::Delete(Bytes::from("delete")),
                    2 => KeyOp::Update((Bytes::from("old_value"), Bytes::from("new_value"))),
                    _ => unreachable!(),
                };
                (get_key(i), key_op)
            })
            .collect_vec();

        let mut test_data = ordered_test_data.clone();

        test_data.shuffle(&mut rng);
        let mut mem_table = MemTable::new(
            233.into(),
            OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: false,
            },
        );
        for (key, op) in test_data {
            match op {
                KeyOp::Insert(value) => {
                    mem_table.insert(key, value).unwrap();
                }
                KeyOp::Delete(value) => mem_table.delete(key, value).unwrap(),
                KeyOp::Update((old_value, new_value)) => {
                    mem_table.update(key, old_value, new_value).unwrap();
                }
            }
        }

        const TEST_TABLE_ID: TableId = TableId::new(233);
        const TEST_EPOCH: u64 = test_epoch(10);

        async fn check_data<I: HummockIterator>(
            iter: &mut I,
            test_data: &[(TableKey<Bytes>, KeyOp)],
        ) {
            let mut idx = 0;
            while iter.is_valid() {
                let key = iter.key();
                let value = iter.value();

                let (expected_key, expected_value) = test_data[idx].clone();
                assert_eq!(key.epoch_with_gap, EpochWithGap::new_from_epoch(TEST_EPOCH));
                assert_eq!(key.user_key.table_id, TEST_TABLE_ID);
                assert_eq!(
                    key.user_key.table_key.0,
                    expected_key.0.as_ref(),
                    "failed at {}, {:?} != {:?}",
                    idx,
                    String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap(),
                    String::from_utf8(expected_key.key_part().to_vec()).unwrap(),
                );
                match expected_value {
                    KeyOp::Insert(expected_value) | KeyOp::Update((_, expected_value)) => {
                        assert_eq!(value, HummockValue::Put(expected_value.as_ref()));
                    }
                    KeyOp::Delete(_) => {
                        assert_eq!(value, HummockValue::Delete);
                    }
                }

                idx += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(idx, test_data.len());
        }

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

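        // Seek to an existing key with a newer epoch: the entry at the seek key is
        // still returned.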
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx..]).await;

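        // Seek to an existing key with an older epoch: the entry at the seek key is
        // skipped.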
        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data[seek_idx + 1..]).await;

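        // Seek past the largest key of the table: the iterator is exhausted.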
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

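        // Seek with a smaller table id: the iterator is positioned at the start of
        // this table, so all data is visible.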
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

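        // Seek with a larger table id: the iterator is positioned past this table,
        // so nothing is visible.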
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

        ordered_test_data.reverse();
        drop(iter);
        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );

        iter.rewind().await.unwrap();
        check_data(&mut iter, &ordered_test_data).await;

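        // For the backward iterator, a smaller table id positions before this
        // table, so nothing is visible.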
        let smaller_table_id = TableId::new(TEST_TABLE_ID.table_id() - 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: smaller_table_id,
                table_key: TableKey(&get_key(ordered_test_data.len() + 10)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &[]).await;

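        // For the backward iterator, a larger table id positions past this table,
        // so the whole table is scanned backwards.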
        let greater_table_id = TableId::new(TEST_TABLE_ID.table_id() + 1);
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: greater_table_id,
                table_key: TableKey(&get_key(0)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(TEST_EPOCH),
        })
        .await
        .unwrap();
        check_data(&mut iter, &ordered_test_data).await;

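        // Backward seek with a newer epoch: the entry at the seek key is skipped.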
        let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx + 1..]).await;

        let early_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.prev_epoch());
        let seek_idx = 500;
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(seek_idx)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        let rev_seek_idx = ordered_test_data.len() - seek_idx - 1;
        check_data(&mut iter, &ordered_test_data[rev_seek_idx..]).await;

        drop(iter);
        mem_table.insert(get_key(10001), "value1".into()).unwrap();

        let mut iter = MemTableHummockRevIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: early_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(9999).to_ref());

        let mut iter = MemTableHummockIterator::new(
            &mem_table.buffer,
            EpochWithGap::new_from_epoch(TEST_EPOCH),
            TEST_TABLE_ID,
        );
        iter.seek(FullKey {
            user_key: UserKey {
                table_id: TEST_TABLE_ID,
                table_key: TableKey(&get_key(10000)),
            },
            epoch_with_gap: later_epoch,
        })
        .await
        .unwrap();
        assert_eq!(iter.key().user_key.table_key, get_key(10001).to_ref());
    }
}