use std::cmp::Ordering;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem::size_of_val;
use std::ops::Bound::Included;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use prometheus::IntGauge;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};

use crate::hummock::iterator::{
    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
};
use crate::hummock::utils::{MemoryTracker, range_overlap};
use crate::hummock::value::HummockValue;
use crate::hummock::{HummockEpoch, HummockResult};
use crate::mem_table::ImmId;
use crate::store::ReadOptions;

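/// A value staged in the shared buffer. Unlike [`HummockValue`], it distinguishes
/// `Insert` from `Update`, so that old-value tracking can tell whether a key had a
/// previous version.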
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    Insert(T),
    Update(T),
    Delete,
}

impl<T> SharedBufferValue<T> {
    fn to_ref(&self) -> SharedBufferValue<&T> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}

impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
        match val {
            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                HummockValue::Put(val)
            }
            SharedBufferValue::Delete => HummockValue::Delete,
        }
    }
}

impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}

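/// A table key paired with its new value, as written into the shared buffer.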
pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
pub type SharedBufferBatchId = u64;

pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);

pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    pub(crate) old_values: Option<&'a [Bytes]>,
}

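/// A key in the batch, together with the offset of its first version in the
/// flattened value arrays. The versions of entry `i` occupy
/// `values[entries[i].value_offset..entries[i + 1].value_offset]` (or up to
/// `values.len()` for the last entry), ordered from newest to oldest epoch.
///
/// Example layout for two keys with three versions in total:
///   entries:    [ {key: "a", value_offset: 0}, {key: "b", value_offset: 2} ]
///   new_values: [ (e2, v2), (e1, v1), (e1, v1') ]
/// where key "a" owns `new_values[0..2]` and key "b" owns `new_values[2..3]`.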
#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    pub(crate) value_offset: usize,
}

impl SharedBufferKeyEntry {
    fn value_end_offset<'a, T>(
        i: usize,
        entries: &'a [SharedBufferKeyEntry],
        values: &'a [T],
    ) -> usize {
        entries
            .get(i + 1)
            .map(|entry| entry.value_offset)
            .unwrap_or(values.len())
    }

    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
    }
}

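/// Old values carried by a batch, with their total size accounted against a global
/// gauge: added on construction and subtracted on drop.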
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    values: Vec<Bytes>,
    pub size: usize,
    pub global_old_value_size: IntGauge,
}

impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        self.global_old_value_size.sub(self.size as _);
    }
}

impl SharedBufferBatchOldValues {
    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, IntGauge::new("test", "test").unwrap())
    }
}

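/// The shared storage of a batch: keys live in `entries`, while all versions of all
/// keys are flattened into `new_values` (and, optionally, `old_values`), indexed via
/// each entry's `value_offset`. `epochs` holds the sorted epochs covered by the
/// batch.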
#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    entries: Vec<SharedBufferKeyEntry>,
    new_values: Vec<VersionedSharedBufferValue>,
    old_values: Option<SharedBufferBatchOldValues>,
    epochs: Vec<HummockEpoch>,
    size: usize,
    _tracker: Option<MemoryTracker>,
    batch_id: SharedBufferBatchId,
}

impl SharedBufferBatchInner {
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        _tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        for (i, (key, value)) in payload.into_iter().enumerate() {
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            _tracker,
            batch_id,
        }
    }

    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

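    /// Builds an inner batch spanning multiple epochs, e.g. from merging several
    /// single-epoch batches. Callers must uphold the invariants checked below:
    /// entries sorted by key and by `value_offset`, each entry's versions sorted by
    /// descending epoch, and `epochs` non-empty and sorted.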
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        debug_assert!((0..entries.len()).all(|i| {
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            _tracker: tracker,
            batch_id: imm_id,
        }
    }

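    /// Point lookup: binary-search `table_key` among the entries, then scan its
    /// versions (newest first) and return the first one whose pure epoch is visible
    /// at `read_epoch`.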
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            for (e, v) in self.values(i) {
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
        }

        None
    }
}

impl PartialEq for SharedBufferBatchInner {
    fn eq(&self, other: &Self) -> bool {
        self.entries == other.entries && self.new_values == other.new_values
    }
}

pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));

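/// An immutable, sorted batch of writes for a single table, held in the shared
/// buffer. Cloning is cheap: the payload is shared through an `Arc`.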
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    pub table_id: TableId,
}

impl SharedBufferBatch {
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                None,
            )),
            table_id,
        }
    }

    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

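    /// Estimates the in-memory size of a batch payload. Returns
    /// `(total_size, old_value_size)`, where the total covers both the key/value
    /// bytes and the old-value bytes.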
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }

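    /// Returns true iff this batch belongs to `table_id` and its key range overlaps
    /// `table_key_range`.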
    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

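    /// Returns true iff at least one key of this batch falls inside
    /// `table_key_range`. Implemented as a binary search whose probe reports
    /// `Equal` once a key lies within both bounds.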
    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }

    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    pub fn size(&self) -> usize {
        self.inner.size
    }

    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        tracker: Option<MemoryTracker>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            tracker,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner =
            SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}

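/// An iterator over a batch in direction `D`. The cursor is a pair of indices:
/// `current_entry_idx` into `entries`, and `current_value_idx` into the flattened
/// value array, bounded by `value_end_offset` (the end of the current entry's
/// versions). With `IS_NEW_VALUE = false` it yields old values instead, skipping
/// versions whose new value is an `Insert` (which has no old value).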
pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    current_entry_idx: usize,
    current_value_idx: usize,
    value_end_offset: usize,
    table_id: TableId,
    _phantom: PhantomData<D>,
}

impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    fn invalidate(&mut self) {
        self.current_entry_idx = self.inner.entries.len();
    }

    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.invalidate();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

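    /// For old-value iteration only: skip forward past versions whose new value is
    /// an `Insert`, crossing entry boundaries as needed, until a version with an
    /// old value is found or the iterator is exhausted.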
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}

impl SharedBufferBatchIterator<Forward> {
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}

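/// Iteration order is by `(table key, epoch)`: keys ascending for `Forward` and
/// descending for `Backward`; within a key, versions are always visited from
/// newest to oldest epoch.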
impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

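    /// Seek to the first position at or past `key` in iteration order. A mismatched
    /// table id resolves immediately: a key from an earlier (smaller) table sorts
    /// before the whole batch and one from a later table sorts after it, so the
    /// iterator either rewinds or becomes invalid depending on the direction.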
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        match key.user_key.table_id.cmp(&self.table_id) {
            Ordering::Less => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.rewind().await?;
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.invalidate();
                        return Ok(());
                    }
                };
            }
            Ordering::Greater => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.invalidate();
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.rewind().await?;
                        return Ok(());
                    }
                };
            }
            Ordering::Equal => (),
        }
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                self.current_entry_idx = i;
                self.reset_value_idx();
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        self.invalidate();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Bound::Excluded;

    use itertools::{Itertools, zip_eq};
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::key::map_table_key_range;

    use super::*;
    use crate::hummock::compactor::merge_imms_in_memory;
    use crate::hummock::iterator::test_utils::{
        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
    };

    fn to_hummock_value_batch(
        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
        items.into_iter().map(|(k, v)| (k, v.into())).collect()
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

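        // Seek to an existing key with a newer epoch than the batch's: the epoch-1
        // version is still visible (epoch_with_gap <= seek epoch), so iteration
        // starts at key 2.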
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

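        // Seek to an existing key with an older epoch: key 2's only version
        // (epoch 1) is newer than the seek epoch, so iteration skips ahead to key 3.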
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_range_exists() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                Vec::from("a_1"),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                Vec::from("a_3"),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                Vec::from("a_5"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (
                Vec::from("b_2"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items),
            epoch,
            Default::default(),
        );

        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));

        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
    }

    #[tokio::test]
    async fn test_merge_imms_basic() {
        let table_id = TableId::new(1004);
        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items1.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let epoch = test_epoch(2);
        let imm2 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items2.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);

        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value13")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value23")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value33")),
            ),
        ];
        let epoch = test_epoch(3);
        let imm3 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items3.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);

        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        for snapshot_epoch in 1..=3 {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if test_epoch(snapshot_epoch) == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
        }

        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }
    }

    #[tokio::test]
    async fn test_merge_imms_with_old_values() {
        let table_id = TableId::new(1004);
        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
        ];
        let old_value1 = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::from("old_value3"),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value1.clone()),
            old_value1.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Update(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
        let epoch = epoch.next_epoch();
        let imm2 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value2.clone()),
            old_value2.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());

        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Update(Bytes::from("value33")),
            ),
        ];
        let old_value3 = vec![
            Bytes::from("value12"),
            Bytes::from("value22"),
            Bytes::from("value32"),
        ];
        let epoch = epoch.next_epoch();
        let imm3 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value3.clone()),
            old_value3.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());

        let key_values = [
            (key_value1, old_value1),
            (key_value2, old_value2),
            (key_value3, old_value3),
        ];

        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        for i in 1..=3 {
            let snapshot_epoch = test_epoch(i);
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if snapshot_epoch == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[i as usize - 1]);
        }

        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }

        {
            let mut iter = merged_imm.clone().into_old_value_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (0..=2).rev() {
                    let (key_values, old_values) = &key_values[epoch];
                    let (key, new_value) = &key_values[key_idx];
                    let old_value = &old_values[key_idx];
                    if matches!(new_value, SharedBufferValue::Insert(_)) {
                        continue;
                    }
                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
                }
            }
            assert_eq!(expected, output);
        }
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek_bug() {
        let epoch = test_epoch(1);
        let table_id = TableId::new(100);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![(
            iterator_test_table_key_of(1),
            SharedBufferValue::Insert(Bytes::from("value1")),
        )];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            table_id,
        );

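        // Forward iterator: the seek key's table_id (99) is smaller than the
        // batch's (100), so the seek falls into the `Ordering::Less` arm and
        // rewinds, even though the table-key part (2) is larger than any key
        // in the batch.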
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        let seek_key = FullKey::for_test(TableId::new(99), iterator_test_table_key_of(2), epoch);
        iter.seek(seek_key.to_ref()).await.unwrap();

        assert!(
            iter.is_valid(),
            "Iterator should be valid when seeking with smaller table_id, even if the key part is larger"
        );
        assert_eq!(iter.key().user_key.table_id, table_id);

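        // Backward iterator: the seek key's table_id (101) is larger than the
        // batch's, so the seek rewinds to the last entry, even though the
        // table-key part (0) is smaller than any key in the batch.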
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        let seek_key = FullKey::for_test(TableId::new(101), iterator_test_table_key_of(0), epoch);
        iter.seek(seek_key.to_ref()).await.unwrap();

        assert!(
            iter.is_valid(),
            "Iterator should be valid when seeking with larger table_id, even if the key part is smaller"
        );
        assert_eq!(iter.key().user_key.table_id, table_id);
    }
}