use std::cmp::Ordering;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem::size_of_val;
use std::ops::Bound::Included;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use prometheus::IntGauge;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};

use crate::hummock::iterator::{
    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
};
use crate::hummock::utils::{MemoryTracker, range_overlap};
use crate::hummock::value::HummockValue;
use crate::hummock::{HummockEpoch, HummockResult};
use crate::mem_table::ImmId;
use crate::store::ReadOptions;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    Insert(T),
    Update(T),
    Delete,
}

impl<T> SharedBufferValue<T> {
    fn to_ref(&self) -> SharedBufferValue<&T> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}

impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
        match val {
            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                HummockValue::Put(val)
            }
            SharedBufferValue::Delete => HummockValue::Delete,
        }
    }
}

impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}
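
// Conversion sketch (illustrative, not part of the API surface): both `Insert`
// and `Update` carry a new value and collapse to `HummockValue::Put` when
// lowered, while `Delete` maps to `HummockValue::Delete`:
//
//     let v: HummockValue<Bytes> = SharedBufferValue::Update(Bytes::from("x")).into();
//     assert_eq!(v, HummockValue::Put(Bytes::from("x")));
//
// The Insert/Update distinction only matters before lowering, e.g. when
// deciding whether a write position carries an old value.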

pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
pub type SharedBufferBatchId = u64;

pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);

pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    pub(crate) old_values: Option<&'a [Bytes]>,
}

#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// Offset of the first value of `key` in `SharedBufferBatchInner::new_values`.
    /// The values of `key` span from this offset up to the `value_offset` of the
    /// next entry, or to the end of `new_values` for the last entry.
    pub(crate) value_offset: usize,
}

impl SharedBufferKeyEntry {
    /// Exclusive end offset of the values of the `i`-th entry.
    fn value_end_offset<'a, T>(
        i: usize,
        entries: &'a [SharedBufferKeyEntry],
        values: &'a [T],
    ) -> usize {
        entries
            .get(i + 1)
            .map(|entry| entry.value_offset)
            .unwrap_or(values.len())
    }

    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
    }
}
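
// Layout sketch (illustrative values): a batch holding key "a" with two
// versions and key "b" with one version is stored column-wise as
//
//     entries    = [ { key: "a", value_offset: 0 }, { key: "b", value_offset: 2 } ]
//     new_values = [ (e2, v_a2), (e1, v_a1), (e2, v_b) ]
//
// so `values(0, ..)` yields `new_values[0..2]` and `values(1, ..)` yields
// `new_values[2..3]`. Within one key, versions are ordered by descending epoch.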

#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    /// Old values aligned one-to-one with the `new_values` of the owning batch.
    /// Positions whose new value is an `Insert` hold an empty `Bytes`.
    values: Vec<Bytes>,
    pub size: usize,
    pub global_old_value_size: IntGauge,
}

impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        self.global_old_value_size.sub(self.size as _);
    }
}

impl SharedBufferBatchOldValues {
    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, IntGauge::new("test", "test").unwrap())
    }
}

#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    entries: Vec<SharedBufferKeyEntry>,
    new_values: Vec<VersionedSharedBufferValue>,
    old_values: Option<SharedBufferBatchOldValues>,
    /// Epochs of the data in the batch, sorted in ascending order.
    epochs: Vec<HummockEpoch>,
    /// Total size of the batch in bytes, including old values if present.
    size: usize,
    _tracker: Option<MemoryTracker>,
    batch_id: SharedBufferBatchId,
}

impl SharedBufferBatchInner {
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        _tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        for (i, (key, value)) in payload.into_iter().enumerate() {
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            _tracker,
            batch_id,
        }
    }

    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }

    /// Return the newest value of `table_key` visible at `read_epoch`, together
    /// with the epoch that wrote it, or `None` if the key is absent or only has
    /// newer versions.
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            // Versions are stored newest-first: return the first one whose
            // epoch is not newer than `read_epoch`.
            for (e, v) in self.values(i) {
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
        }

        None
    }
}
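
// Visibility sketch (illustrative): with versions of a key stored newest-first
// as [(e3, v3), (e2, v2), (e1, v1)], `get_value(key, e2)` skips `(e3, v3)`
// because `e2 < e3`, then returns `(v2, e2)`; a `read_epoch` older than `e1`
// sees nothing and yields `None`.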

impl PartialEq for SharedBufferBatchInner {
    fn eq(&self, other: &Self) -> bool {
        self.entries == other.entries && self.new_values == other.new_values
    }
}

pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));

#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    pub table_id: TableId,
}

impl SharedBufferBatch {
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                None,
            )),
            table_id,
        }
    }

    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

    /// Return `(total_size, old_value_size)`, where `total_size` includes both
    /// the key-value payload and the old values.
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }
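
    // Sizing sketch (illustrative numbers): for one item with a 3-byte key and
    // an `Insert` of a 6-byte value, with no old values, `measure_batch_size`
    // returns `(3 + 6, 0) = (9, 0)`; a `Delete` contributes only its key length.
    // Each old value adds `size_of_val(&bytes) + bytes.len()`, i.e. the `Bytes`
    // handle itself plus its payload.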

    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }
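
    // `range_exists` sketch: the comparator treats a stored key strictly below
    // the range as `Less` and strictly above it as `Greater`, so the binary
    // search succeeds iff at least one stored key falls inside
    // `table_key_range`; no value is materialized.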

    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    pub fn size(&self) -> usize {
        self.inner.size
    }

    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}
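
// Usage sketch (mirrors the tests at the bottom of this file): build a batch
// and drain it through a forward iterator.
//
//     let batch = SharedBufferBatch::for_test(sorted_items, epoch, table_id);
//     let mut iter = batch.into_forward_iter();
//     iter.rewind().await?;
//     while iter.is_valid() {
//         let (key, value) = (iter.key(), iter.value());
//         iter.next().await?;
//     }
//
// `into_old_value_iter` walks the same entries but stops only at `Update` and
// `Delete` positions, exposing the recorded old values instead.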

pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// Index of the current entry in `inner.entries`.
    current_entry_idx: usize,
    /// Index of the current value in `inner.new_values`.
    current_value_idx: usize,
    /// Exclusive end offset of the values of the current entry.
    value_end_offset: usize,
    table_id: TableId,
    _phantom: PhantomData<D>,
}

impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.current_entry_idx = self.inner.entries.len();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip `Insert` positions: they carry no old value.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}
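
// Iteration-order sketch: for entries ["a", "b"] where "a" has versions at
// epochs e2 > e1, the forward iterator yields ("a", e2), ("a", e1), ("b", ..).
// Within one key, versions always come newest-first regardless of direction,
// since only the entry index moves backward in the backward iterator.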

impl SharedBufferBatchIterator<Forward> {
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}

impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        debug_assert_eq!(key.user_key.table_id, self.table_id);
        // Entries are ordered by table key, so binary search on the table key first.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                self.current_entry_idx = i;
                self.reset_value_idx();
                // Within the key, skip versions newer than the seek epoch.
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        self.current_entry_idx = self.inner.entries.len();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
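
// Seek sketch (illustrative): seeking FullKey("b", e2) over entries
// ["a", "b", "c"], where "b" has versions [e3, e2, e1], lands on ("b", e2)
// because versions newer than the seek epoch are skipped. Seeking a key that
// falls between "b" and "c" lands on "c" for forward iteration and on "b"
// (its newest version) for backward iteration.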

#[cfg(test)]
mod tests {
    use std::ops::Bound::Excluded;

    use itertools::{Itertools, zip_eq};
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::key::map_table_key_range;

    use super::*;
    use crate::hummock::compactor::merge_imms_in_memory;
    use crate::hummock::iterator::test_utils::{
        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
    };

    fn to_hummock_value_batch(
        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
        items.into_iter().map(|(k, v)| (k, v.into())).collect()
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }
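
    // A minimal sketch (not from the original suite) exercising `filter`: it
    // must check both the table id and the key-range overlap against the
    // batch's own key span.
    #[tokio::test]
    async fn test_shared_buffer_batch_filter() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                Vec::from("a_1"),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                Vec::from("a_3"),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items),
            epoch,
            Default::default(),
        );

        // Overlapping range and matching (default) table id: accepted.
        let range =
            map_table_key_range((Included(Bytes::from("a_0")), Excluded(Bytes::from("a_2"))));
        assert!(shared_buffer_batch.filter(Default::default(), &range));
        // Disjoint range: rejected even though the table id matches.
        let range =
            map_table_key_range((Included(Bytes::from("b_0")), Excluded(Bytes::from("b_9"))));
        assert!(!shared_buffer_batch.filter(Default::default(), &range));
        // Mismatched table id: rejected regardless of key overlap.
        let range =
            map_table_key_range((Included(Bytes::from("a_0")), Excluded(Bytes::from("a_9"))));
        assert!(!shared_buffer_batch.filter(TableId::new(1), &range));
    }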

    #[tokio::test]
    async fn test_shared_buffer_batch_range_exists() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                Vec::from("a_1"),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                Vec::from("a_3"),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                Vec::from("a_5"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (
                Vec::from("b_2"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items),
            epoch,
            Default::default(),
        );

        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));

        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
    }

    #[tokio::test]
    async fn test_merge_imms_basic() {
        let table_id = TableId { table_id: 1004 };
        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items1.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let epoch = test_epoch(2);
        let imm2 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items2.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);

        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value13")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value23")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value33")),
            ),
        ];
        let epoch = test_epoch(3);
        let imm3 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items3.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);

        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        for snapshot_epoch in 1..=3 {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if test_epoch(snapshot_epoch) == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
        }

        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }
    }

    #[tokio::test]
    async fn test_merge_imms_with_old_values() {
        let table_id = TableId { table_id: 1004 };
        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
        ];
        let old_value1 = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::from("old_value3"),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value1.clone()),
            old_value1.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Update(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
        let epoch = epoch.next_epoch();
        let imm2 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value2.clone()),
            old_value2.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());

        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Update(Bytes::from("value33")),
            ),
        ];
        let old_value3 = vec![
            Bytes::from("value12"),
            Bytes::from("value22"),
            Bytes::from("value32"),
        ];
        let epoch = epoch.next_epoch();
        let imm3 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value3.clone()),
            old_value3.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());

        let key_values = [
            (key_value1, old_value1),
            (key_value2, old_value2),
            (key_value3, old_value3),
        ];

        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        for i in 1..=3 {
            let snapshot_epoch = test_epoch(i);
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if snapshot_epoch == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[i as usize - 1]);
        }

        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }

        {
            let mut iter = merged_imm.clone().into_old_value_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (0..=2).rev() {
                    let (key_values, old_values) = &key_values[epoch];
                    let (key, new_value) = &key_values[key_idx];
                    let old_value = &old_values[key_idx];
                    if matches!(new_value, SharedBufferValue::Insert(_)) {
                        continue;
                    }
                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
                }
            }
            assert_eq!(expected, output);
        }
    }
}