1use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use prometheus::IntGauge;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32 Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::utils::{MemoryTracker, range_overlap};
35use crate::hummock::value::HummockValue;
36use crate::hummock::{HummockEpoch, HummockResult};
37use crate::mem_table::ImmId;
38use crate::store::ReadOptions;
39
/// A new value staged in the shared buffer, tagged with the kind of write
/// that produced it. The insert/update distinction matters for old-value
/// iteration: `Insert` entries are skipped there (see
/// `advance_until_valid_old_value`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// A write for a key treated as having no prior value.
    Insert(T),
    /// A write overwriting a previous value.
    Update(T),
    /// A deletion of the key.
    Delete,
}
46
47impl<T> SharedBufferValue<T> {
48 fn to_ref(&self) -> SharedBufferValue<&T> {
49 match self {
50 SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
51 SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
52 SharedBufferValue::Delete => SharedBufferValue::Delete,
53 }
54 }
55}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58 fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59 match val {
60 SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61 HummockValue::Put(val)
62 }
63 SharedBufferValue::Delete => HummockValue::Delete,
64 }
65 }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69 pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70 match self {
71 SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72 SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73 SharedBufferValue::Delete => SharedBufferValue::Delete,
74 }
75 }
76}
77
/// One staged write: a table key with its shared-buffer value.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Identifier of a shared-buffer batch, drawn from `SHARED_BUFFER_BATCH_ID_GENERATOR`.
pub type SharedBufferBatchId = u64;

/// A shared-buffer value tagged with the epoch (plus spill gap) it was written at.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);
83
/// Borrowed view of one key entry in a batch: the key, all of its versioned
/// new values, and (if the batch records them) the parallel old values.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    /// New values of `key`, ordered newest-epoch-first as stored in the batch.
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    /// Old values aligned index-for-index with `new_values`; `None` when the
    /// batch does not record old values.
    pub(crate) old_values: Option<&'a [Bytes]>,
}
89
/// One distinct user key of a batch, pointing at where its values begin in
/// the batch's flattened value array.
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// Index of this key's first value in the flattened value array; the
    /// values run until the next entry's `value_offset` (or the array end).
    pub(crate) value_offset: usize,
}
99
100impl SharedBufferKeyEntry {
101 fn value_end_offset<'a, T>(
103 i: usize,
104 entries: &'a [SharedBufferKeyEntry],
105 values: &'a [T],
106 ) -> usize {
107 entries
108 .get(i + 1)
109 .map(|entry| entry.value_offset)
110 .unwrap_or(values.len())
111 }
112
113 fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
114 &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
115 }
116}
117
/// Old values recorded alongside a batch's new values, with bookkeeping for a
/// global gauge tracking their total size.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    // Aligned index-for-index with the batch's flattened new values; slots for
    // `Insert` entries exist but are skipped by old-value iterators.
    values: Vec<Bytes>,
    // Accounted size in bytes; added to the gauge in `new` and subtracted in `Drop`.
    pub size: usize,
    pub global_old_value_size: IntGauge,
}
126
impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        // Release the size that `new` accounted into the global gauge.
        self.global_old_value_size.sub(self.size as _);
    }
}
132
impl SharedBufferBatchOldValues {
    /// Creates old-value storage and accounts `size` into the global gauge;
    /// the gauge is decremented again when `self` is dropped.
    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    /// Test-only constructor backed by a throwaway gauge.
    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, IntGauge::new("test", "test").unwrap())
    }
}
147
/// Immutable contents of a shared-buffer batch: sorted distinct keys, their
/// flattened versioned values, optional old values, and metadata.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    // Distinct keys in ascending order, each pointing into `new_values`.
    entries: Vec<SharedBufferKeyEntry>,
    // Flattened values of all entries; within a key, ordered newest-epoch-first.
    new_values: Vec<VersionedSharedBufferValue>,
    old_values: Option<SharedBufferBatchOldValues>,
    // Epochs contained in this batch, ascending.
    epochs: Vec<HummockEpoch>,
    // Accounted batch size in bytes.
    size: usize,
    // Held only to keep the memory reservation alive for the batch lifetime.
    _tracker: Option<MemoryTracker>,
    batch_id: SharedBufferBatchId,
}
162
163impl SharedBufferBatchInner {
164 pub(crate) fn new(
165 epoch: HummockEpoch,
166 spill_offset: u16,
167 payload: Vec<SharedBufferItem>,
168 old_values: Option<SharedBufferBatchOldValues>,
169 size: usize,
170 _tracker: Option<MemoryTracker>,
171 ) -> Self {
172 assert!(!payload.is_empty());
173 debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
174 if let Some(old_values) = &old_values {
175 assert_eq!(old_values.values.len(), payload.len());
176 }
177
178 let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
179 let mut entries = Vec::with_capacity(payload.len());
180 let mut new_values = Vec::with_capacity(payload.len());
181 for (i, (key, value)) in payload.into_iter().enumerate() {
182 entries.push(SharedBufferKeyEntry {
183 key,
184 value_offset: i,
185 });
186 new_values.push((epoch_with_gap, value));
187 }
188
189 let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
190 SharedBufferBatchInner {
191 entries,
192 new_values,
193 old_values,
194 epochs: vec![epoch],
195 size,
196 _tracker,
197 batch_id,
198 }
199 }
200
    /// All versioned new values belonging to the `i`-th key entry.
    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }
204
    /// Builds a batch spanning multiple epochs from pre-flattened parts
    /// (e.g. produced by merging several single-epoch batches).
    ///
    /// Debug-checked invariants: keys and value offsets ascending, per-key
    /// values newest-epoch-first, `epochs` non-empty and ascending.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        // Each key's values must be in descending epoch order (i.e. the
        // reversed slice is ascending).
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }
238
239 fn get_value(
242 &self,
243 table_key: TableKey<&[u8]>,
244 read_epoch: HummockEpoch,
245 ) -> Option<(HummockValue<Bytes>, EpochWithGap)> {
246 if let Ok(i) = self
248 .entries
249 .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
250 {
251 let entry = &self.entries[i];
252 assert_eq!(entry.key.as_ref(), *table_key);
253 for (e, v) in self.values(i) {
255 if read_epoch < e.pure_epoch() {
257 continue;
258 }
259 return Some((v.clone().into(), *e));
260 }
261 }
263
264 None
265 }
266}
267
268impl PartialEq for SharedBufferBatchInner {
269 fn eq(&self, other: &Self) -> bool {
270 self.entries == other.entries && self.new_values == other.new_values
271 }
272}
273
/// Global monotonically-increasing source of `SharedBufferBatchId`s.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
276
/// A write batch of one table staged in the shared buffer: a cheaply
/// clonable handle (via `Arc`) around immutable, sorted batch contents.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    pub table_id: TableId,
}
283
284impl SharedBufferBatch {
    /// Test helper: builds a batch without old values; sizes are measured
    /// from the items.
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }
292
    /// Test helper: like [`Self::for_test`], but also records `old_values`
    /// (must align one-to-one with `sorted_items`).
    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }
301
    /// Shared implementation of the test constructors: measures sizes, wraps
    /// optional old values with a test gauge, and builds the inner batch.
    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0, // spill_offset
                sorted_items,
                old_values,
                size,
                None, // no memory tracker in tests
            )),
            table_id,
        }
    }
325
326 pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
327 batch_items
328 .iter()
329 .map(|(left, right)| {
330 let l1 = match left {
332 Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
333 Bound::Unbounded => 13,
334 };
335 let l2 = match right {
336 Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
337 Bound::Unbounded => 13,
338 };
339 l1 + l2
340 })
341 .sum()
342 }
343
344 pub fn measure_batch_size(
346 batch_items: &[SharedBufferItem],
347 old_values: Option<&[Bytes]>,
348 ) -> (usize, usize) {
349 let old_value_size = old_values
350 .iter()
351 .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
352 .sum::<usize>();
353 let kv_size = batch_items
355 .iter()
356 .map(|(k, v)| {
357 k.len() + {
358 match v {
359 SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
360 val.len()
361 }
362 SharedBufferValue::Delete => 0,
363 }
364 }
365 })
366 .sum::<usize>();
367 (kv_size + old_value_size, old_value_size)
368 }
369
    /// Returns whether this batch may contain data for `table_id` within
    /// `table_key_range`: the table must match and the query range must
    /// overlap the batch's [start_table_key, end_table_key] span.
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        // Re-borrow the query bounds as byte-slice table keys.
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }
390
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Smallest epoch in the batch (`epochs` is kept ascending).
    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    /// Largest epoch in the batch.
    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    /// Number of distinct keys.
    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    /// Number of versioned values across all keys.
    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    /// Whether this batch records old values.
    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }
414
    /// Point lookup at `read_epoch`; see `SharedBufferBatchInner::get_value`.
    /// `_read_options` is currently unused here.
    pub fn get(
        &self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }
423
    /// Returns whether any key of this batch falls inside `table_key_range`.
    ///
    /// Implemented as a binary search whose comparator maps "key left of the
    /// range" to `Less` and "key right of the range" to `Greater`, so any key
    /// inside the range compares `Equal` and makes the search succeed.
    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                // Is this key strictly before the range start?
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                // Is this key strictly after the range end?
                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }
451
    /// Consumes the batch handle and creates an iterator in direction `D`;
    /// `IS_NEW_VALUE` selects iterating new values vs. old values.
    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    /// Forward iterator over old values (the batch must record them).
    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    /// Forward iterator over new values.
    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    /// Backward iterator over new values.
    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }
469
    /// Smallest table key in the batch (entries are sorted ascending;
    /// batches are never empty).
    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    /// Largest table key in the batch.
    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    /// Largest table key, borrowed in its owned `Bytes` form.
    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    /// Smallest key qualified with this batch's table id.
    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }
490
    /// Accounted size of the batch in bytes.
    pub fn size(&self) -> usize {
        self.inner.size
    }

    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    /// Epochs contained in the batch, ascending.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }
506
    /// Builds a single-epoch batch from sorted items, wiring in optional old
    /// values and an optional memory-budget tracker.
    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
529
    /// Test-only variant of [`Self::build_shared_buffer_batch`] without old
    /// values or a memory tracker.
    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
545}
546
/// Iterator over a shared-buffer batch in direction `D`. With
/// `IS_NEW_VALUE = true` (default) it yields new values; with `false` it
/// yields recorded old values, skipping versions whose new value is an
/// `Insert`.
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    // Index of the current key entry; `entries.len()` marks exhaustion.
    current_entry_idx: usize,
    // Index of the current value in the flattened new-value array.
    current_value_idx: usize,
    // Exclusive end of the current entry's value range.
    value_end_offset: usize,
    table_id: TableId,
    _phantom: PhantomData<D>,
}
560
561impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
562 SharedBufferBatchIterator<D, IS_NEW_VALUE>
563{
    /// Creates an iterator over `inner`. The indices start at zero and are
    /// only meaningful after `rewind`/`seek`.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            // An old-value iterator only makes sense when the batch actually
            // recorded old values.
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }
581
    /// Whether `current_entry_idx` points at an existing key entry.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Steps to the next key entry in iteration direction. Going backward
    /// past index 0 parks the index at `entries.len()` (the invalid marker).
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.current_entry_idx = self.inner.entries.len();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Re-derives the value range of the current entry and positions at its
    /// first (newest-epoch) value.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    /// Exclusive end of the current entry's values in the flattened array.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }
616
    /// Debug-checks that entry and value indices are mutually consistent, and
    /// that an old-value iterator is never resting on an `Insert` version.
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }
631
    /// Moves to the next value: the next (older-epoch) version of the current
    /// key, or else the first value of the next key entry.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }
644
    /// Old-value iteration only: advances past `Insert` versions (which carry
    /// no meaningful old value) until positioned on a valid version, or until
    /// the iterator is exhausted.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            // Skip inserts inside the current entry's value range.
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                // Entry exhausted: move to the next one or finish.
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
673}
674
impl SharedBufferBatchIterator<Forward> {
    /// Skips all remaining versions of the current key and moves to the next
    /// key's first value.
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Returns the current key with all of its versioned new values (and, if
    /// recorded, the aligned old values). The iterator must be positioned at
    /// the entry's first value.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
698
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            // Old-value iteration must never rest on an `Insert` version.
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            // Old values are plain payloads; surface them as puts.
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Position at the first entry in iteration direction (batches are
        // never empty, so `len() - 1` is safe).
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        // For old-value iteration, skip any leading `Insert` versions.
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        debug_assert_eq!(key.user_key.table_id, self.table_id);
        // Binary-search the table key; the epoch refines the position below.
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                self.current_entry_idx = i;
                self.reset_value_idx();
                // Values are newest-first: skip versions newer than the seek
                // epoch.
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                // Every version was newer: move on to the next entry.
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        // Seek key precedes all entries: iterator exhausted.
                        self.current_entry_idx = self.inner.entries.len();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
810
811#[cfg(test)]
812mod tests {
813 use std::ops::Bound::Excluded;
814
815 use itertools::{Itertools, zip_eq};
816 use risingwave_common::util::epoch::{EpochExt, test_epoch};
817 use risingwave_hummock_sdk::key::map_table_key_range;
818
819 use super::*;
820 use crate::hummock::compactor::merge_imms_in_memory;
821 use crate::hummock::iterator::test_utils::{
822 iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
823 };
824
    /// Converts test items into their `HummockValue` form for comparison
    /// against iterator output.
    fn to_hummock_value_batch(
        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
        items.into_iter().map(|(k, v)| (k, v.into())).collect()
    }
830
    /// End-to-end smoke test: point gets, forward iteration and backward
    /// iteration over a single-epoch batch.
    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Sanity check the key span of the batch.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point gets: contained keys hit, keys beyond the batch miss.
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0,
                v.clone()
            );
        }
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iteration yields every item in key order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iteration yields the same items in reverse order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }
919
    /// Exercises `seek` on forward and backward iterators: seeking to keys
    /// before/inside/after the batch and with newer/older epochs than the
    /// batch's epoch.
    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Forward: seek before the first key -> all items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek past the last key -> nothing.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Forward: seek to an existing key at the batch epoch -> that key onward.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek with a newer epoch still includes the key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek with an older epoch skips past that key's versions.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: seek before the first key -> nothing.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Backward: seek past the last key -> all items in reverse.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to an existing key at the batch epoch.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek with an older epoch falls back to the previous key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: seek with a newer epoch keeps the sought key included.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1060
    /// Exercises the old-value iterator: it must skip `Insert` entries and
    /// yield the recorded old values of updates and deletes, under both
    /// `rewind` and various `seek` positions (compared side-by-side with the
    /// new-value iterator).
    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        // Only non-insert entries surface in the old-value iterator.
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        // Rewind yields exactly the expected old values.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek before all keys: new-value iterator sees every entry...
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // ...while the old-value iterator sees only non-inserts.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek past all keys: both iterators exhausted.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Seek to key 2 at the batch epoch.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek to key 2 with a newer epoch: key 2 still included.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek to key 2 with an older epoch: key 2's version is skipped.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek to key 3 with an older epoch: only the delete on key 4 remains.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1232
1233 #[tokio::test]
1234 #[should_panic]
1235 async fn test_invalid_table_id() {
1236 let epoch = test_epoch(1);
1237 let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
1238 let mut iter = shared_buffer_batch.into_forward_iter();
1240 iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
1241 .await
1242 .unwrap();
1243 }
1244
1245 #[tokio::test]
1246 async fn test_shared_buffer_batch_range_existx() {
1247 let epoch = test_epoch(1);
1248 let shared_buffer_items = vec![
1249 (
1250 Vec::from("a_1"),
1251 SharedBufferValue::Insert(Bytes::from("value1")),
1252 ),
1253 (
1254 Vec::from("a_3"),
1255 SharedBufferValue::Insert(Bytes::from("value2")),
1256 ),
1257 (
1258 Vec::from("a_5"),
1259 SharedBufferValue::Insert(Bytes::from("value3")),
1260 ),
1261 (
1262 Vec::from("b_2"),
1263 SharedBufferValue::Insert(Bytes::from("value3")),
1264 ),
1265 ];
1266 let shared_buffer_batch = SharedBufferBatch::for_test(
1267 transform_shared_buffer(shared_buffer_items),
1268 epoch,
1269 Default::default(),
1270 );
1271
1272 let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1273 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1274 let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1275 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1276 let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1277 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1278 let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1279 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1280 let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1281 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1282 let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1283 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1284 let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1285 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1286 let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1287 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1288
1289 let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1290 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1291 let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1292 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1293 let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1294 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1295 let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1296 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1297 let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1298 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1299 }
1300
1301 #[tokio::test]
1302 async fn test_merge_imms_basic() {
1303 let table_id = TableId { table_id: 1004 };
1304 let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1305 (
1306 iterator_test_table_key_of(1),
1307 SharedBufferValue::Insert(Bytes::from("value1")),
1308 ),
1309 (
1310 iterator_test_table_key_of(2),
1311 SharedBufferValue::Insert(Bytes::from("value2")),
1312 ),
1313 (
1314 iterator_test_table_key_of(3),
1315 SharedBufferValue::Insert(Bytes::from("value3")),
1316 ),
1317 ];
1318 let epoch = test_epoch(1);
1319 let imm1 = SharedBufferBatch::for_test(
1320 transform_shared_buffer(shared_buffer_items1.clone()),
1321 epoch,
1322 table_id,
1323 );
1324 let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
1325 let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1326 (
1327 iterator_test_table_key_of(1),
1328 SharedBufferValue::Insert(Bytes::from("value12")),
1329 ),
1330 (
1331 iterator_test_table_key_of(2),
1332 SharedBufferValue::Insert(Bytes::from("value22")),
1333 ),
1334 (
1335 iterator_test_table_key_of(3),
1336 SharedBufferValue::Insert(Bytes::from("value32")),
1337 ),
1338 ];
1339 let epoch = test_epoch(2);
1340 let imm2 = SharedBufferBatch::for_test(
1341 transform_shared_buffer(shared_buffer_items2.clone()),
1342 epoch,
1343 table_id,
1344 );
1345 let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);
1346
1347 let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1348 (
1349 iterator_test_table_key_of(1),
1350 SharedBufferValue::Insert(Bytes::from("value13")),
1351 ),
1352 (
1353 iterator_test_table_key_of(2),
1354 SharedBufferValue::Insert(Bytes::from("value23")),
1355 ),
1356 (
1357 iterator_test_table_key_of(3),
1358 SharedBufferValue::Insert(Bytes::from("value33")),
1359 ),
1360 ];
1361 let epoch = test_epoch(3);
1362 let imm3 = SharedBufferBatch::for_test(
1363 transform_shared_buffer(shared_buffer_items3.clone()),
1364 epoch,
1365 table_id,
1366 );
1367 let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);
1368
1369 let batch_items = [
1370 shared_buffer_items1,
1371 shared_buffer_items2,
1372 shared_buffer_items3,
1373 ];
1374 let imms = vec![imm3, imm2, imm1];
1376 let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1377
1378 for (i, items) in batch_items.iter().enumerate() {
1380 for (key, value) in items {
1381 assert_eq!(
1382 merged_imm
1383 .get(
1384 TableKey(key.as_slice()),
1385 test_epoch(i as u64 + 1),
1386 &ReadOptions::default()
1387 )
1388 .unwrap()
1389 .0,
1390 value.clone(),
1391 "epoch: {}, key: {:?}",
1392 test_epoch(i as u64 + 1),
1393 String::from_utf8(key.clone())
1394 );
1395 }
1396 }
1397 assert_eq!(
1398 merged_imm.get(
1399 TableKey(iterator_test_table_key_of(4).as_slice()),
1400 test_epoch(1),
1401 &ReadOptions::default()
1402 ),
1403 None
1404 );
1405 assert_eq!(
1406 merged_imm.get(
1407 TableKey(iterator_test_table_key_of(5).as_slice()),
1408 test_epoch(1),
1409 &ReadOptions::default()
1410 ),
1411 None
1412 );
1413
1414 for snapshot_epoch in 1..=3 {
1416 let mut iter = merged_imm.clone().into_forward_iter();
1417 iter.rewind().await.unwrap();
1418 let mut output = vec![];
1419 while iter.is_valid() {
1420 let epoch = iter.key().epoch_with_gap.pure_epoch();
1421 if test_epoch(snapshot_epoch) == epoch {
1422 output.push((
1423 iter.key().user_key.table_key.to_vec(),
1424 iter.value().to_bytes(),
1425 ));
1426 }
1427 iter.next().await.unwrap();
1428 }
1429 assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
1430 }
1431
1432 {
1434 let mut iter = merged_imm.clone().into_forward_iter();
1435 iter.rewind().await.unwrap();
1436 let mut output = vec![];
1437 while iter.is_valid() {
1438 output.push((
1439 iter.key().user_key.table_key.to_vec(),
1440 iter.value().to_bytes(),
1441 ));
1442 iter.next().await.unwrap();
1443 }
1444
1445 let mut expected = vec![];
1446 for key_idx in 0..=2 {
1447 for epoch in (1..=3).rev() {
1448 let item = batch_items[epoch - 1][key_idx].clone();
1449 expected.push(item);
1450 }
1451 }
1452 assert_eq!(expected, output);
1453
1454 let mut backward_iter = merged_imm.clone().into_backward_iter();
1455 backward_iter.rewind().await.unwrap();
1456 let mut output = vec![];
1457 while backward_iter.is_valid() {
1458 output.push((
1459 backward_iter.key().user_key.table_key.to_vec(),
1460 backward_iter.value().to_bytes(),
1461 ));
1462 backward_iter.next().await.unwrap();
1463 }
1464 let mut expected = vec![];
1465 for key_idx in (0..=2).rev() {
1466 for epoch in (1..=3).rev() {
1467 let item = batch_items[epoch - 1][key_idx].clone();
1468 expected.push(item);
1469 }
1470 }
1471 assert_eq!(expected, output);
1472 }
1473 }
1474
1475 #[tokio::test]
1476 async fn test_merge_imms_with_old_values() {
1477 let table_id = TableId { table_id: 1004 };
1478 let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1479 (
1480 iterator_test_table_key_of(1),
1481 SharedBufferValue::Insert(Bytes::from("value1")),
1482 ),
1483 (
1484 iterator_test_table_key_of(2),
1485 SharedBufferValue::Update(Bytes::from("value2")),
1486 ),
1487 (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1488 ];
1489 let old_value1 = vec![
1490 Bytes::new(),
1491 Bytes::from("old_value2"),
1492 Bytes::from("old_value3"),
1493 ];
1494 let epoch = test_epoch(1);
1495 let imm1 = SharedBufferBatch::for_test_with_old_values(
1496 transform_shared_buffer(key_value1.clone()),
1497 old_value1.clone(),
1498 epoch,
1499 table_id,
1500 );
1501 let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1502 let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1503 (
1504 iterator_test_table_key_of(1),
1505 SharedBufferValue::Update(Bytes::from("value12")),
1506 ),
1507 (
1508 iterator_test_table_key_of(2),
1509 SharedBufferValue::Update(Bytes::from("value22")),
1510 ),
1511 (
1512 iterator_test_table_key_of(3),
1513 SharedBufferValue::Insert(Bytes::from("value32")),
1514 ),
1515 ];
1516 let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1517 let epoch = epoch.next_epoch();
1518 let imm2 = SharedBufferBatch::for_test_with_old_values(
1519 transform_shared_buffer(key_value2.clone()),
1520 old_value2.clone(),
1521 epoch,
1522 table_id,
1523 );
1524 let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1525
1526 let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1527 (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1528 (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1529 (
1530 iterator_test_table_key_of(3),
1531 SharedBufferValue::Update(Bytes::from("value33")),
1532 ),
1533 ];
1534 let old_value3 = vec![
1535 Bytes::from("value12"),
1536 Bytes::from("value22"),
1537 Bytes::from("value32"),
1538 ];
1539 let epoch = epoch.next_epoch();
1540 let imm3 = SharedBufferBatch::for_test_with_old_values(
1541 transform_shared_buffer(key_value3.clone()),
1542 old_value3.clone(),
1543 epoch,
1544 table_id,
1545 );
1546 let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());
1547
1548 let key_values = [
1549 (key_value1, old_value1),
1550 (key_value2, old_value2),
1551 (key_value3, old_value3),
1552 ];
1553
1554 let batch_items = [
1555 shared_buffer_items1,
1556 shared_buffer_items2,
1557 shared_buffer_items3,
1558 ];
1559 let imms = vec![imm3, imm2, imm1];
1561 let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1562
1563 for (i, items) in batch_items.iter().enumerate() {
1565 for (key, value) in items {
1566 assert_eq!(
1567 merged_imm
1568 .get(
1569 TableKey(key.as_slice()),
1570 test_epoch(i as u64 + 1),
1571 &ReadOptions::default()
1572 )
1573 .unwrap()
1574 .0,
1575 value.clone(),
1576 "epoch: {}, key: {:?}",
1577 test_epoch(i as u64 + 1),
1578 String::from_utf8(key.clone())
1579 );
1580 }
1581 }
1582 assert_eq!(
1583 merged_imm.get(
1584 TableKey(iterator_test_table_key_of(4).as_slice()),
1585 test_epoch(1),
1586 &ReadOptions::default()
1587 ),
1588 None
1589 );
1590 assert_eq!(
1591 merged_imm.get(
1592 TableKey(iterator_test_table_key_of(5).as_slice()),
1593 test_epoch(1),
1594 &ReadOptions::default()
1595 ),
1596 None
1597 );
1598
1599 for i in 1..=3 {
1601 let snapshot_epoch = test_epoch(i);
1602 let mut iter = merged_imm.clone().into_forward_iter();
1603 iter.rewind().await.unwrap();
1604 let mut output = vec![];
1605 while iter.is_valid() {
1606 let epoch = iter.key().epoch_with_gap.pure_epoch();
1607 if snapshot_epoch == epoch {
1608 output.push((
1609 iter.key().user_key.table_key.to_vec(),
1610 iter.value().to_bytes(),
1611 ));
1612 }
1613 iter.next().await.unwrap();
1614 }
1615 assert_eq!(output, batch_items[i as usize - 1]);
1616 }
1617
1618 {
1620 let mut iter = merged_imm.clone().into_forward_iter();
1621 iter.rewind().await.unwrap();
1622 let mut output = vec![];
1623 while iter.is_valid() {
1624 output.push((
1625 iter.key().user_key.table_key.to_vec(),
1626 iter.value().to_bytes(),
1627 ));
1628 iter.next().await.unwrap();
1629 }
1630
1631 let mut expected = vec![];
1632 for key_idx in 0..=2 {
1633 for epoch in (1..=3).rev() {
1634 let item = batch_items[epoch - 1][key_idx].clone();
1635 expected.push(item);
1636 }
1637 }
1638 assert_eq!(expected, output);
1639
1640 let mut backward_iter = merged_imm.clone().into_backward_iter();
1641 backward_iter.rewind().await.unwrap();
1642 let mut output = vec![];
1643 while backward_iter.is_valid() {
1644 output.push((
1645 backward_iter.key().user_key.table_key.to_vec(),
1646 backward_iter.value().to_bytes(),
1647 ));
1648 backward_iter.next().await.unwrap();
1649 }
1650 let mut expected = vec![];
1651 for key_idx in (0..=2).rev() {
1652 for epoch in (1..=3).rev() {
1653 let item = batch_items[epoch - 1][key_idx].clone();
1654 expected.push(item);
1655 }
1656 }
1657 assert_eq!(expected, output);
1658 }
1659
1660 {
1662 let mut iter = merged_imm.clone().into_old_value_iter();
1663 iter.rewind().await.unwrap();
1664 let mut output = vec![];
1665 while iter.is_valid() {
1666 output.push((
1667 iter.key().user_key.table_key.to_vec(),
1668 iter.value().to_bytes(),
1669 ));
1670 iter.next().await.unwrap();
1671 }
1672
1673 let mut expected = vec![];
1674 for key_idx in 0..=2 {
1675 for epoch in (0..=2).rev() {
1676 let (key_values, old_values) = &key_values[epoch];
1677 let (key, new_value) = &key_values[key_idx];
1678 let old_value = &old_values[key_idx];
1679 if matches!(new_value, SharedBufferValue::Insert(_)) {
1680 continue;
1681 }
1682 expected.push((key.clone(), HummockValue::Put(old_value.clone())));
1683 }
1684 }
1685 assert_eq!(expected, output);
1686 }
1687 }
1688}