1use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use prometheus::IntGauge;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32 Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::utils::{MemoryTracker, range_overlap};
35use crate::hummock::value::HummockValue;
36use crate::hummock::{HummockEpoch, HummockResult};
37use crate::mem_table::ImmId;
38use crate::store::ReadOptions;
39
/// A value written into the shared buffer.
///
/// Unlike `HummockValue`, `Insert` is kept distinct from `Update` (both
/// convert to `HummockValue::Put`) so that old-value handling can skip
/// entries that have no previous version (see the old-value iterator below).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    /// Write of a key with no previous version.
    Insert(T),
    /// Overwrite of an existing key.
    Update(T),
    /// Deletion of an existing key.
    Delete,
}
46
47impl<T> SharedBufferValue<T> {
48 fn to_ref(&self) -> SharedBufferValue<&T> {
49 match self {
50 SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
51 SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
52 SharedBufferValue::Delete => SharedBufferValue::Delete,
53 }
54 }
55}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58 fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59 match val {
60 SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61 HummockValue::Put(val)
62 }
63 SharedBufferValue::Delete => HummockValue::Delete,
64 }
65 }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69 pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70 match self {
71 SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72 SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73 SharedBufferValue::Delete => SharedBufferValue::Delete,
74 }
75 }
76}
77
/// A single key/value pair to be written into the shared buffer.
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
/// Identifier of a shared buffer batch.
pub type SharedBufferBatchId = u64;

/// A new value together with the (epoch, spill-gap) it was written at.
pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);
83
/// Borrowed view of one key entry: the key plus all of its versioned values.
pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    /// Versioned new values of this key (epoch descending, newest first).
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    /// Old values aligned index-by-index with `new_values`; `None` when the
    /// batch does not track old values.
    pub(crate) old_values: Option<&'a [Bytes]>,
}
89
/// One distinct user key in a batch, pointing at the start of its versioned
/// values inside the batch's flattened value vector.
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    // Index of this key's first value in the value vector. The values of
    // entry `i` span `[value_offset, entries[i + 1].value_offset)`, or up to
    // the end of the vector for the last entry.
    pub(crate) value_offset: usize,
}
99
100impl SharedBufferKeyEntry {
101 fn value_end_offset<'a, T>(
103 i: usize,
104 entries: &'a [SharedBufferKeyEntry],
105 values: &'a [T],
106 ) -> usize {
107 entries
108 .get(i + 1)
109 .map(|entry| entry.value_offset)
110 .unwrap_or(values.len())
111 }
112
113 fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
114 &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
115 }
116}
117
/// Old values recorded for a shared buffer batch when old-value tracking is
/// enabled.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    // Old values aligned one-to-one with the batch's `new_values`
    // (`SharedBufferBatchInner::new` asserts equal lengths).
    values: Vec<Bytes>,
    // Tracked size in bytes; added to `global_old_value_size` on creation
    // and subtracted on drop.
    pub size: usize,
    // Global gauge of old-value memory across all batches.
    pub global_old_value_size: IntGauge,
}
126
127impl Drop for SharedBufferBatchOldValues {
128 fn drop(&mut self) {
129 self.global_old_value_size.sub(self.size as _);
130 }
131}
132
133impl SharedBufferBatchOldValues {
134 pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
135 global_old_value_size.add(size as _);
136 Self {
137 values,
138 size,
139 global_old_value_size,
140 }
141 }
142
143 pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
144 Self::new(values, size, IntGauge::new("test", "test").unwrap())
145 }
146}
147
/// The immutable, shared payload of a [`SharedBufferBatch`].
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    // Distinct keys in ascending order; each points into `new_values`.
    entries: Vec<SharedBufferKeyEntry>,
    // All versioned values, flattened. The values of one key are contiguous
    // and ordered by epoch descending (newest first).
    new_values: Vec<VersionedSharedBufferValue>,
    // Old values aligned with `new_values`, if old-value tracking is enabled.
    old_values: Option<SharedBufferBatchOldValues>,
    // Epochs covered by this batch, ascending.
    epochs: Vec<HummockEpoch>,
    // Estimated in-memory size of the batch in bytes.
    size: usize,
    // Keeps the associated memory tracker alive for the batch's lifetime.
    _tracker: Option<MemoryTracker>,
    // Unique id of this batch (the imm id for merged multi-epoch batches).
    batch_id: SharedBufferBatchId,
}
162
impl SharedBufferBatchInner {
    /// Builds a single-epoch batch from a sorted, non-empty payload.
    ///
    /// Every payload item gets one entry and one value at the given
    /// `(epoch, spill_offset)`. If `old_values` is provided it must be
    /// aligned one-to-one with the payload.
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        _tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        for (i, (key, value)) in payload.into_iter().enumerate() {
            // Keys are distinct here, so each entry owns exactly one value
            // and the offsets are simply 0..len.
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            _tracker,
            batch_id,
        }
    }

    /// Returns the versioned values of the `i`-th key entry.
    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

    /// Builds a batch spanning multiple epochs from pre-assembled parts
    /// (used when merging batches). Debug assertions check the structural
    /// invariants: sorted keys and offsets, per-key values in descending
    /// epoch order, and sorted epochs.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        // Each key's values, read in reverse, must be ascending in epoch,
        // i.e. forward order is newest-first.
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }

    /// Point lookup: returns the newest value of `table_key` visible at
    /// `read_epoch` (i.e. the first value, scanning newest to oldest, whose
    /// pure epoch is `<= read_epoch`), or `None` if the key is absent or has
    /// no visible version.
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            // Values are ordered newest-first; skip versions newer than the
            // read epoch.
            for (e, v) in self.values(i) {
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
        }

        None
    }
}
267
impl PartialEq for SharedBufferBatchInner {
    // Equality considers only the key entries and new values; `old_values`,
    // `epochs`, `size`, the tracker and `batch_id` do not participate.
    fn eq(&self, other: &Self) -> bool {
        self.entries == other.entries && self.new_values == other.new_values
    }
}
273
/// Process-wide monotonically increasing id generator for shared buffer
/// batches.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));
276
/// An immutable, sorted batch of writes for a single table, buffered in
/// memory. Cloning is cheap: the payload is shared behind an `Arc`.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    pub table_id: TableId,
}
283
impl SharedBufferBatch {
    /// Test helper: builds a batch without old values.
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    /// Test helper: builds a batch with old values aligned to `sorted_items`.
    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    /// Shared test-construction path: measures sizes, wraps old values and
    /// builds the inner batch at spill offset 0 with no memory tracker.
    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                None,
            )),
            table_id,
        }
    }

    /// Estimates the size in bytes of a list of delete ranges.
    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                // 13 is a fixed per-bound overhead estimate; its exact
                // derivation is not visible here — TODO(review): confirm.
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

    /// Estimates the batch size in bytes.
    ///
    /// Returns `(total_size, old_value_size)` where `total_size` already
    /// includes `old_value_size`. Old values count both the `Bytes` handle
    /// (`size_of_val`) and the payload length.
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        // Deletes carry no value payload.
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }

    /// Returns whether this batch belongs to `table_id` and its key span
    /// overlaps `table_key_range`.
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    /// The table this batch belongs to.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Smallest epoch covered by this batch (epochs are kept sorted).
    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    /// Largest epoch covered by this batch.
    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    /// Number of distinct keys in the batch.
    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    /// Total number of versioned values across all keys.
    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    /// Whether old-value tracking is enabled for this batch.
    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    /// Point lookup of `table_key` at `read_epoch`; see
    /// `SharedBufferBatchInner::get_value`.
    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

    /// Returns whether any key of this batch falls inside `table_key_range`.
    ///
    /// Uses `binary_search_by` with a comparator that maps "inside the
    /// range" to `Equal`, so a hit means at least one key is in range.
    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                // Key is strictly before the range start?
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                // Key is strictly after the range end?
                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }

    /// Consumes the batch into an iterator of the given direction; with
    /// `IS_NEW_VALUE = false` it iterates old values instead of new ones.
    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    /// Forward iterator over old values (requires old-value tracking).
    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    /// Forward iterator over new values.
    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    /// Backward iterator over new values.
    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    /// Smallest table key in the batch (entries are sorted and non-empty).
    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    /// Largest table key in the batch.
    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    /// Largest table key, borrowed as the owned `TableKey<Bytes>`.
    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    /// Smallest user key (table id + smallest table key).
    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    /// Estimated in-memory size of the batch in bytes.
    pub fn size(&self) -> usize {
        self.inner.size
    }

    /// Old values of this batch, if old-value tracking is enabled.
    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    /// Unique id of this batch.
    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    /// Epochs covered by this batch, ascending.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    /// Builds a single-epoch batch from sorted items; the optional `tracker`
    /// is held for the batch's lifetime.
    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    /// Test-only variant of [`Self::build_shared_buffer_batch`] without old
    /// values or a memory tracker.
    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}
546
/// Iterator over a shared buffer batch.
///
/// With `IS_NEW_VALUE = true` (the default) it yields new values; with
/// `false` it yields old values, skipping `Insert` entries (which have no
/// previous version).
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    // Index of the current key entry in `inner.entries`; equals
    // `entries.len()` when the iterator is invalid.
    current_entry_idx: usize,
    // Index of the current value in `inner.new_values`.
    current_value_idx: usize,
    // Exclusive end of the current entry's values in `inner.new_values`.
    value_end_offset: usize,
    table_id: TableId,
    _phantom: PhantomData<D>,
}
560
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    /// Creates an unpositioned iterator; call `rewind`/`seek` before use.
    /// Old-value iterators require the batch to carry old values.
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    /// Whether the current entry index points at a real entry.
    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    /// Marks the iterator invalid by moving the entry index past the end.
    fn invalidate(&mut self) {
        self.current_entry_idx = self.inner.entries.len();
    }

    /// Steps the entry index one key in the iteration direction; going
    /// backward past index 0 invalidates the iterator.
    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.invalidate();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    /// Repositions the value cursor at the start of the current entry's
    /// values and caches its end offset.
    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    /// Exclusive end offset of the current entry's values.
    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    /// Debug-only consistency check of the entry/value cursors; old-value
    /// iterators must never rest on an `Insert`.
    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    /// Steps to the next value of the current key, or to the first value of
    /// the next key when the current key's values are exhausted.
    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    /// Old-value iteration only: skips forward over `Insert` values (which
    /// have no old value), crossing entry boundaries as needed, until a
    /// non-insert value is found or the iterator becomes invalid.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}
678
impl SharedBufferBatchIterator<Forward> {
    /// Skips the remaining values of the current key and moves to the next
    /// key (if any).
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    /// Returns a borrowed view of the current key with all of its values.
    /// Must be called with the value cursor at the key's first value.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}
702
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    /// Advances one value; old-value iterators additionally skip `Insert`
    /// values, which have no old value.
    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Full key (table id + table key + epoch) at the current position.
    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    /// Value at the current position: the new value for new-value iterators,
    /// otherwise the aligned old value (always emitted as a `Put`).
    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    /// Positions at the first entry in iteration order (first key for
    /// forward, last key for backward).
    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    /// Positions at the first entry whose full key is at or past `key` in
    /// iteration order.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Resolve by table id first: a different table id means the whole
        // batch is either entirely before or entirely after the seek key.
        match key.user_key.table_id.cmp(&self.table_id) {
            Ordering::Less => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.rewind().await?;
                    }
                    DirectionEnum::Backward => {
                        self.invalidate();
                        return Ok(());
                    }
                };
            }
            Ordering::Greater => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.invalidate();
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.rewind().await?;
                    }
                };
            }
            Ordering::Equal => (),
        }
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            // Exact key match: skip versions newer than the seek epoch
            // (values are ordered newest-first); if all versions are newer,
            // fall through to the next entry in iteration order.
            Ok(i) => {
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            // No exact match: `i` is the insertion point, i.e. the first key
            // greater than the seek key.
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        self.invalidate();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}
840
841#[cfg(test)]
842mod tests {
843 use std::ops::Bound::Excluded;
844
845 use itertools::{Itertools, zip_eq};
846 use risingwave_common::util::epoch::{EpochExt, test_epoch};
847 use risingwave_hummock_sdk::key::map_table_key_range;
848
849 use super::*;
850 use crate::hummock::compactor::merge_imms_in_memory;
851 use crate::hummock::iterator::test_utils::{
852 iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
853 };
854
    /// Converts shared-buffer items into plain `HummockValue` pairs for
    /// comparison against iterator output.
    fn to_hummock_value_batch(
        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
        items.into_iter().map(|(k, v)| (k, v.into())).collect()
    }
860
    /// Checks batch key bounds, point gets (present and absent keys), and
    /// that forward/backward iteration both reproduce the sorted items.
    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Key span of the batch.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point gets hit for every stored key ...
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        // ... and miss for keys that were never written.
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iteration yields the items in order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iteration yields the items in reverse order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }
950
    /// Exercises `seek` on forward and backward iterators: before, inside,
    /// and past the key span, and with seek epochs newer/older than the
    /// stored epoch.
    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Forward: seek before the first key -> all items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek past the last key -> invalid.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Forward: seek to a middle key at the stored epoch -> from that key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: newer seek epoch still includes the matched key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: older seek epoch skips the matched key's version.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: seek before the first key -> invalid.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Backward: seek past the last key -> all items in reverse.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to a middle key at the stored epoch.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: older seek epoch excludes the matched key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: newer seek epoch keeps the matched key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
1091
1092 #[tokio::test]
1093 async fn test_shared_buffer_batch_old_value_iter() {
1094 let epoch = test_epoch(1);
1095 let key_values = vec![
1096 (
1097 iterator_test_table_key_of(1),
1098 SharedBufferValue::Insert(Bytes::from("value1")),
1099 ),
1100 (
1101 iterator_test_table_key_of(2),
1102 SharedBufferValue::Update(Bytes::from("value2")),
1103 ),
1104 (
1105 iterator_test_table_key_of(3),
1106 SharedBufferValue::Insert(Bytes::from("value3")),
1107 ),
1108 (iterator_test_table_key_of(4), SharedBufferValue::Delete),
1109 ];
1110 let old_values = vec![
1111 Bytes::new(),
1112 Bytes::from("old_value2"),
1113 Bytes::new(),
1114 Bytes::from("old_value4"),
1115 ];
1116 let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
1117 transform_shared_buffer(key_values.clone()),
1118 old_values.clone(),
1119 epoch,
1120 Default::default(),
1121 );
1122 let shared_buffer_items = to_hummock_value_batch(key_values.clone());
1123 let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
1124 .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
1125 .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
1126 .collect_vec();
1127
1128 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1129 iter.rewind().await.unwrap();
1130 for item in &expected_old_value_iter_items {
1131 assert!(iter.is_valid());
1132 assert_eq!(*iter.key().user_key.table_key, item.0);
1133 assert_eq!(iter.value(), item.1.as_slice());
1134 iter.next().await.unwrap();
1135 }
1136 assert!(!iter.is_valid());
1137
1138 let mut iter = shared_buffer_batch.clone().into_forward_iter();
1140 iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
1141 .await
1142 .unwrap();
1143 for item in &shared_buffer_items {
1144 assert!(iter.is_valid());
1145 assert_eq!(*iter.key().user_key.table_key, item.0);
1146 assert_eq!(iter.value(), item.1.as_slice());
1147 iter.next().await.unwrap();
1148 }
1149 assert!(!iter.is_valid());
1150
1151 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1152 iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
1153 .await
1154 .unwrap();
1155 for item in &expected_old_value_iter_items {
1156 assert!(iter.is_valid());
1157 assert_eq!(*iter.key().user_key.table_key, item.0);
1158 assert_eq!(iter.value(), item.1.as_slice());
1159 iter.next().await.unwrap();
1160 }
1161 assert!(!iter.is_valid());
1162
1163 let mut iter = shared_buffer_batch.clone().into_forward_iter();
1165 iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
1166 .await
1167 .unwrap();
1168 assert!(!iter.is_valid());
1169
1170 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1171 iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
1172 .await
1173 .unwrap();
1174 assert!(!iter.is_valid());
1175
1176 let mut iter = shared_buffer_batch.clone().into_forward_iter();
1178 iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1179 .await
1180 .unwrap();
1181 for item in &shared_buffer_items[1..] {
1182 assert!(iter.is_valid());
1183 assert_eq!(*iter.key().user_key.table_key, item.0);
1184 assert_eq!(iter.value(), item.1.as_slice());
1185 iter.next().await.unwrap();
1186 }
1187 assert!(!iter.is_valid());
1188
1189 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1190 iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
1191 .await
1192 .unwrap();
1193 for item in &expected_old_value_iter_items {
1194 assert!(iter.is_valid());
1195 assert_eq!(*iter.key().user_key.table_key, item.0);
1196 assert_eq!(iter.value(), item.1.as_slice());
1197 iter.next().await.unwrap();
1198 }
1199 assert!(!iter.is_valid());
1200
1201 let mut iter = shared_buffer_batch.clone().into_forward_iter();
1203 iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1204 .await
1205 .unwrap();
1206 for item in &shared_buffer_items[1..] {
1207 assert!(iter.is_valid());
1208 assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1209 assert_eq!(iter.value(), item.1.as_slice());
1210 iter.next().await.unwrap();
1211 }
1212 assert!(!iter.is_valid());
1213
1214 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1215 iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
1216 .await
1217 .unwrap();
1218 for item in &expected_old_value_iter_items {
1219 assert!(iter.is_valid());
1220 assert_eq!(*iter.key().user_key.table_key, item.0);
1221 assert_eq!(iter.value(), item.1.as_slice());
1222 iter.next().await.unwrap();
1223 }
1224 assert!(!iter.is_valid());
1225
1226 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1227 iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
1228 .await
1229 .unwrap();
1230 for item in &expected_old_value_iter_items[1..] {
1231 assert!(iter.is_valid());
1232 assert_eq!(*iter.key().user_key.table_key, item.0);
1233 assert_eq!(iter.value(), item.1.as_slice());
1234 iter.next().await.unwrap();
1235 }
1236 assert!(!iter.is_valid());
1237
1238 let mut iter = shared_buffer_batch.clone().into_forward_iter();
1240 iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
1241 .await
1242 .unwrap();
1243 let item = shared_buffer_items.last().unwrap();
1244 assert!(iter.is_valid());
1245 assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
1246 assert_eq!(iter.value(), item.1.as_slice());
1247 iter.next().await.unwrap();
1248 assert!(!iter.is_valid());
1249
1250 let mut iter = shared_buffer_batch.clone().into_old_value_iter();
1252 iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
1253 .await
1254 .unwrap();
1255 for item in &expected_old_value_iter_items[1..] {
1256 assert!(iter.is_valid());
1257 assert_eq!(*iter.key().user_key.table_key, item.0);
1258 assert_eq!(iter.value(), item.1.as_slice());
1259 iter.next().await.unwrap();
1260 }
1261 assert!(!iter.is_valid());
1262 }
1263
1264 #[tokio::test]
1265 #[should_panic]
1266 async fn test_invalid_table_id() {
1267 let epoch = test_epoch(1);
1268 let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
1269 let mut iter = shared_buffer_batch.into_forward_iter();
1271 iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
1272 .await
1273 .unwrap();
1274 }
1275
1276 #[tokio::test]
1277 async fn test_shared_buffer_batch_range_existx() {
1278 let epoch = test_epoch(1);
1279 let shared_buffer_items = vec![
1280 (
1281 Vec::from("a_1"),
1282 SharedBufferValue::Insert(Bytes::from("value1")),
1283 ),
1284 (
1285 Vec::from("a_3"),
1286 SharedBufferValue::Insert(Bytes::from("value2")),
1287 ),
1288 (
1289 Vec::from("a_5"),
1290 SharedBufferValue::Insert(Bytes::from("value3")),
1291 ),
1292 (
1293 Vec::from("b_2"),
1294 SharedBufferValue::Insert(Bytes::from("value3")),
1295 ),
1296 ];
1297 let shared_buffer_batch = SharedBufferBatch::for_test(
1298 transform_shared_buffer(shared_buffer_items),
1299 epoch,
1300 Default::default(),
1301 );
1302
1303 let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
1304 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1305 let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
1306 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1307 let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
1308 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1309 let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
1310 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1311 let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
1312 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1313 let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
1314 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1315 let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
1316 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1317 let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
1318 assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
1319
1320 let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
1321 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1322 let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
1323 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1324 let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
1325 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1326 let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
1327 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1328 let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
1329 assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
1330 }
1331
1332 #[tokio::test]
1333 async fn test_merge_imms_basic() {
1334 let table_id = TableId::new(1004);
1335 let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1336 (
1337 iterator_test_table_key_of(1),
1338 SharedBufferValue::Insert(Bytes::from("value1")),
1339 ),
1340 (
1341 iterator_test_table_key_of(2),
1342 SharedBufferValue::Insert(Bytes::from("value2")),
1343 ),
1344 (
1345 iterator_test_table_key_of(3),
1346 SharedBufferValue::Insert(Bytes::from("value3")),
1347 ),
1348 ];
1349 let epoch = test_epoch(1);
1350 let imm1 = SharedBufferBatch::for_test(
1351 transform_shared_buffer(shared_buffer_items1.clone()),
1352 epoch,
1353 table_id,
1354 );
1355 let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
1356 let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1357 (
1358 iterator_test_table_key_of(1),
1359 SharedBufferValue::Insert(Bytes::from("value12")),
1360 ),
1361 (
1362 iterator_test_table_key_of(2),
1363 SharedBufferValue::Insert(Bytes::from("value22")),
1364 ),
1365 (
1366 iterator_test_table_key_of(3),
1367 SharedBufferValue::Insert(Bytes::from("value32")),
1368 ),
1369 ];
1370 let epoch = test_epoch(2);
1371 let imm2 = SharedBufferBatch::for_test(
1372 transform_shared_buffer(shared_buffer_items2.clone()),
1373 epoch,
1374 table_id,
1375 );
1376 let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);
1377
1378 let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1379 (
1380 iterator_test_table_key_of(1),
1381 SharedBufferValue::Insert(Bytes::from("value13")),
1382 ),
1383 (
1384 iterator_test_table_key_of(2),
1385 SharedBufferValue::Insert(Bytes::from("value23")),
1386 ),
1387 (
1388 iterator_test_table_key_of(3),
1389 SharedBufferValue::Insert(Bytes::from("value33")),
1390 ),
1391 ];
1392 let epoch = test_epoch(3);
1393 let imm3 = SharedBufferBatch::for_test(
1394 transform_shared_buffer(shared_buffer_items3.clone()),
1395 epoch,
1396 table_id,
1397 );
1398 let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);
1399
1400 let batch_items = [
1401 shared_buffer_items1,
1402 shared_buffer_items2,
1403 shared_buffer_items3,
1404 ];
1405 let imms = vec![imm3, imm2, imm1];
1407 let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1408
1409 for (i, items) in batch_items.iter().enumerate() {
1411 for (key, value) in items {
1412 assert_eq!(
1413 merged_imm
1414 .get(
1415 TableKey(key.as_slice()),
1416 test_epoch(i as u64 + 1),
1417 &ReadOptions::default()
1418 )
1419 .unwrap()
1420 .0
1421 .as_slice(),
1422 value.as_slice(),
1423 "epoch: {}, key: {:?}",
1424 test_epoch(i as u64 + 1),
1425 String::from_utf8(key.clone())
1426 );
1427 }
1428 }
1429 assert_eq!(
1430 merged_imm.get(
1431 TableKey(iterator_test_table_key_of(4).as_slice()),
1432 test_epoch(1),
1433 &ReadOptions::default()
1434 ),
1435 None
1436 );
1437 assert_eq!(
1438 merged_imm.get(
1439 TableKey(iterator_test_table_key_of(5).as_slice()),
1440 test_epoch(1),
1441 &ReadOptions::default()
1442 ),
1443 None
1444 );
1445
1446 for snapshot_epoch in 1..=3 {
1448 let mut iter = merged_imm.clone().into_forward_iter();
1449 iter.rewind().await.unwrap();
1450 let mut output = vec![];
1451 while iter.is_valid() {
1452 let epoch = iter.key().epoch_with_gap.pure_epoch();
1453 if test_epoch(snapshot_epoch) == epoch {
1454 output.push((
1455 iter.key().user_key.table_key.to_vec(),
1456 iter.value().to_bytes(),
1457 ));
1458 }
1459 iter.next().await.unwrap();
1460 }
1461 assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
1462 }
1463
1464 {
1466 let mut iter = merged_imm.clone().into_forward_iter();
1467 iter.rewind().await.unwrap();
1468 let mut output = vec![];
1469 while iter.is_valid() {
1470 output.push((
1471 iter.key().user_key.table_key.to_vec(),
1472 iter.value().to_bytes(),
1473 ));
1474 iter.next().await.unwrap();
1475 }
1476
1477 let mut expected = vec![];
1478 #[expect(clippy::needless_range_loop)]
1479 for key_idx in 0..=2 {
1480 for epoch in (1..=3).rev() {
1481 let item = batch_items[epoch - 1][key_idx].clone();
1482 expected.push(item);
1483 }
1484 }
1485 assert_eq!(expected, output);
1486
1487 let mut backward_iter = merged_imm.clone().into_backward_iter();
1488 backward_iter.rewind().await.unwrap();
1489 let mut output = vec![];
1490 while backward_iter.is_valid() {
1491 output.push((
1492 backward_iter.key().user_key.table_key.to_vec(),
1493 backward_iter.value().to_bytes(),
1494 ));
1495 backward_iter.next().await.unwrap();
1496 }
1497 let mut expected = vec![];
1498 for key_idx in (0..=2).rev() {
1499 for epoch in (1..=3).rev() {
1500 let item = batch_items[epoch - 1][key_idx].clone();
1501 expected.push(item);
1502 }
1503 }
1504 assert_eq!(expected, output);
1505 }
1506 }
1507
1508 #[tokio::test]
1509 async fn test_merge_imms_with_old_values() {
1510 let table_id = TableId::new(1004);
1511 let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1512 (
1513 iterator_test_table_key_of(1),
1514 SharedBufferValue::Insert(Bytes::from("value1")),
1515 ),
1516 (
1517 iterator_test_table_key_of(2),
1518 SharedBufferValue::Update(Bytes::from("value2")),
1519 ),
1520 (iterator_test_table_key_of(3), SharedBufferValue::Delete),
1521 ];
1522 let old_value1 = vec![
1523 Bytes::new(),
1524 Bytes::from("old_value2"),
1525 Bytes::from("old_value3"),
1526 ];
1527 let epoch = test_epoch(1);
1528 let imm1 = SharedBufferBatch::for_test_with_old_values(
1529 transform_shared_buffer(key_value1.clone()),
1530 old_value1.clone(),
1531 epoch,
1532 table_id,
1533 );
1534 let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
1535 let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1536 (
1537 iterator_test_table_key_of(1),
1538 SharedBufferValue::Update(Bytes::from("value12")),
1539 ),
1540 (
1541 iterator_test_table_key_of(2),
1542 SharedBufferValue::Update(Bytes::from("value22")),
1543 ),
1544 (
1545 iterator_test_table_key_of(3),
1546 SharedBufferValue::Insert(Bytes::from("value32")),
1547 ),
1548 ];
1549 let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
1550 let epoch = epoch.next_epoch();
1551 let imm2 = SharedBufferBatch::for_test_with_old_values(
1552 transform_shared_buffer(key_value2.clone()),
1553 old_value2.clone(),
1554 epoch,
1555 table_id,
1556 );
1557 let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());
1558
1559 let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
1560 (iterator_test_table_key_of(1), SharedBufferValue::Delete),
1561 (iterator_test_table_key_of(2), SharedBufferValue::Delete),
1562 (
1563 iterator_test_table_key_of(3),
1564 SharedBufferValue::Update(Bytes::from("value33")),
1565 ),
1566 ];
1567 let old_value3 = vec![
1568 Bytes::from("value12"),
1569 Bytes::from("value22"),
1570 Bytes::from("value32"),
1571 ];
1572 let epoch = epoch.next_epoch();
1573 let imm3 = SharedBufferBatch::for_test_with_old_values(
1574 transform_shared_buffer(key_value3.clone()),
1575 old_value3.clone(),
1576 epoch,
1577 table_id,
1578 );
1579 let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());
1580
1581 let key_values = [
1582 (key_value1, old_value1),
1583 (key_value2, old_value2),
1584 (key_value3, old_value3),
1585 ];
1586
1587 let batch_items = [
1588 shared_buffer_items1,
1589 shared_buffer_items2,
1590 shared_buffer_items3,
1591 ];
1592 let imms = vec![imm3, imm2, imm1];
1594 let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;
1595
1596 for (i, items) in batch_items.iter().enumerate() {
1598 for (key, value) in items {
1599 assert_eq!(
1600 merged_imm
1601 .get(
1602 TableKey(key.as_slice()),
1603 test_epoch(i as u64 + 1),
1604 &ReadOptions::default()
1605 )
1606 .unwrap()
1607 .0
1608 .as_slice(),
1609 value.as_slice(),
1610 "epoch: {}, key: {:?}",
1611 test_epoch(i as u64 + 1),
1612 String::from_utf8(key.clone())
1613 );
1614 }
1615 }
1616 assert_eq!(
1617 merged_imm.get(
1618 TableKey(iterator_test_table_key_of(4).as_slice()),
1619 test_epoch(1),
1620 &ReadOptions::default()
1621 ),
1622 None
1623 );
1624 assert_eq!(
1625 merged_imm.get(
1626 TableKey(iterator_test_table_key_of(5).as_slice()),
1627 test_epoch(1),
1628 &ReadOptions::default()
1629 ),
1630 None
1631 );
1632
1633 for i in 1..=3 {
1635 let snapshot_epoch = test_epoch(i);
1636 let mut iter = merged_imm.clone().into_forward_iter();
1637 iter.rewind().await.unwrap();
1638 let mut output = vec![];
1639 while iter.is_valid() {
1640 let epoch = iter.key().epoch_with_gap.pure_epoch();
1641 if snapshot_epoch == epoch {
1642 output.push((
1643 iter.key().user_key.table_key.to_vec(),
1644 iter.value().to_bytes(),
1645 ));
1646 }
1647 iter.next().await.unwrap();
1648 }
1649 assert_eq!(output, batch_items[i as usize - 1]);
1650 }
1651
1652 {
1654 let mut iter = merged_imm.clone().into_forward_iter();
1655 iter.rewind().await.unwrap();
1656 let mut output = vec![];
1657 while iter.is_valid() {
1658 output.push((
1659 iter.key().user_key.table_key.to_vec(),
1660 iter.value().to_bytes(),
1661 ));
1662 iter.next().await.unwrap();
1663 }
1664
1665 let mut expected = vec![];
1666 #[expect(clippy::needless_range_loop)]
1667 for key_idx in 0..=2 {
1668 for epoch in (1..=3).rev() {
1669 let item = batch_items[epoch - 1][key_idx].clone();
1670 expected.push(item);
1671 }
1672 }
1673 assert_eq!(expected, output);
1674
1675 let mut backward_iter = merged_imm.clone().into_backward_iter();
1676 backward_iter.rewind().await.unwrap();
1677 let mut output = vec![];
1678 while backward_iter.is_valid() {
1679 output.push((
1680 backward_iter.key().user_key.table_key.to_vec(),
1681 backward_iter.value().to_bytes(),
1682 ));
1683 backward_iter.next().await.unwrap();
1684 }
1685 let mut expected = vec![];
1686 for key_idx in (0..=2).rev() {
1687 for epoch in (1..=3).rev() {
1688 let item = batch_items[epoch - 1][key_idx].clone();
1689 expected.push(item);
1690 }
1691 }
1692 assert_eq!(expected, output);
1693 }
1694
1695 {
1697 let mut iter = merged_imm.clone().into_old_value_iter();
1698 iter.rewind().await.unwrap();
1699 let mut output = vec![];
1700 while iter.is_valid() {
1701 output.push((
1702 iter.key().user_key.table_key.to_vec(),
1703 iter.value().to_bytes(),
1704 ));
1705 iter.next().await.unwrap();
1706 }
1707
1708 let mut expected = vec![];
1709 for key_idx in 0..=2 {
1710 for epoch in (0..=2).rev() {
1711 let (key_values, old_values) = &key_values[epoch];
1712 let (key, new_value) = &key_values[key_idx];
1713 let old_value = &old_values[key_idx];
1714 if matches!(new_value, SharedBufferValue::Insert(_)) {
1715 continue;
1716 }
1717 expected.push((key.clone(), HummockValue::Put(old_value.clone())));
1718 }
1719 }
1720 assert_eq!(expected, output);
1721 }
1722 }
1723}