use std::cmp::Ordering;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem::size_of_val;
use std::ops::Bound::Included;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};

use crate::hummock::iterator::{
    Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
};
use crate::hummock::shared_buffer::TableMemoryMetrics;
use crate::hummock::utils::range_overlap;
use crate::hummock::value::HummockValue;
use crate::hummock::{HummockEpoch, HummockResult};
use crate::mem_table::ImmId;
use crate::store::ReadOptions;

/// A value written to the shared buffer. Unlike `HummockValue`, it
/// distinguishes `Insert` from `Update`, so old-value tracking can tell
/// whether the key had a previous version.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SharedBufferValue<T> {
    Insert(T),
    Update(T),
    Delete,
}

impl<T> SharedBufferValue<T> {
    fn to_ref(&self) -> SharedBufferValue<&T> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}

impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
    fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
        match val {
            SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                HummockValue::Put(val)
            }
            SharedBufferValue::Delete => HummockValue::Delete,
        }
    }
}

impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
    pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
        match self {
            SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
            SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
            SharedBufferValue::Delete => SharedBufferValue::Delete,
        }
    }
}
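
// The module below is a hedged sketch, not part of the original source: it
// pins down the `SharedBufferValue` -> `HummockValue` mapping shown above,
// namely that both `Insert` and `Update` become `Put` while `Delete` stays a
// tombstone.
#[cfg(test)]
mod shared_buffer_value_conversion_example {
    use bytes::Bytes;

    use super::*;

    #[test]
    fn insert_and_update_both_become_put() {
        let v = Bytes::from_static(b"v");
        assert_eq!(
            HummockValue::from(SharedBufferValue::Insert(v.clone())),
            HummockValue::Put(v.clone())
        );
        assert_eq!(
            HummockValue::from(SharedBufferValue::Update(v.clone())),
            HummockValue::Put(v)
        );
        assert_eq!(
            HummockValue::<Bytes>::from(SharedBufferValue::Delete),
            HummockValue::Delete
        );
    }
}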

pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
pub type SharedBufferBatchId = u64;

pub(crate) type VersionedSharedBufferValue = (EpochWithGap, SharedBufferValue<Bytes>);

pub(crate) struct SharedBufferVersionedEntryRef<'a> {
    pub(crate) key: &'a TableKey<Bytes>,
    pub(crate) new_values: &'a [VersionedSharedBufferValue],
    pub(crate) old_values: Option<&'a [Bytes]>,
}

#[derive(PartialEq, Debug)]
pub(crate) struct SharedBufferKeyEntry {
    pub(crate) key: TableKey<Bytes>,
    /// Start offset of this key's versioned values in the shared `new_values`
    /// (and, if present, `old_values`) vector. The exclusive end offset is the
    /// `value_offset` of the next entry, or the vector length for the last
    /// entry.
    pub(crate) value_offset: usize,
}

impl SharedBufferKeyEntry {
    /// Exclusive end offset of the values of the `i`-th entry.
    fn value_end_offset<'a, T>(
        i: usize,
        entries: &'a [SharedBufferKeyEntry],
        values: &'a [T],
    ) -> usize {
        entries
            .get(i + 1)
            .map(|entry| entry.value_offset)
            .unwrap_or(values.len())
    }

    fn values<'a, T>(i: usize, entries: &'a [SharedBufferKeyEntry], values: &'a [T]) -> &'a [T] {
        &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)]
    }
}
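
// A hedged sketch, not in the original source, of the columnar layout:
// `entries` holds one offset per key, and each key's versioned values are the
// slice between its offset and the next entry's offset (or the vector end).
#[cfg(test)]
mod key_entry_layout_example {
    use bytes::Bytes;

    use super::*;

    #[test]
    fn values_are_sliced_by_consecutive_offsets() {
        let entries = vec![
            SharedBufferKeyEntry {
                key: TableKey(Bytes::from_static(b"a")),
                value_offset: 0,
            },
            SharedBufferKeyEntry {
                key: TableKey(Bytes::from_static(b"b")),
                value_offset: 2,
            },
        ];
        // `values` is generic over the value type, so plain integers suffice
        // to demonstrate the slicing.
        let values = vec![10, 11, 20];
        assert_eq!(
            SharedBufferKeyEntry::values(0, &entries, &values),
            &[10, 11][..]
        );
        assert_eq!(SharedBufferKeyEntry::values(1, &entries, &values), &[20][..]);
    }
}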

/// Old values carried by a shared buffer batch, aligned index-by-index with
/// the batch's `new_values`. For `Insert` entries the corresponding old value
/// is unused (an empty `Bytes` by convention). The `size` is registered with
/// the global old-value gauge on construction and subtracted on drop.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
    values: Vec<Bytes>,
    pub size: usize,
    pub global_old_value_size: LabelGuardedIntGauge,
}

impl Drop for SharedBufferBatchOldValues {
    fn drop(&mut self) {
        self.global_old_value_size.sub(self.size as _);
    }
}

impl SharedBufferBatchOldValues {
    pub(crate) fn new(
        values: Vec<Bytes>,
        size: usize,
        global_old_value_size: LabelGuardedIntGauge,
    ) -> Self {
        global_old_value_size.add(size as _);
        Self {
            values,
            size,
            global_old_value_size,
        }
    }

    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        Self::new(values, size, LabelGuardedIntGauge::test_int_gauge::<1>())
    }
}

#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
    entries: Vec<SharedBufferKeyEntry>,
    new_values: Vec<VersionedSharedBufferValue>,
    old_values: Option<SharedBufferBatchOldValues>,
    /// Epochs of the data in the batch, sorted in ascending order.
    epochs: Vec<HummockEpoch>,
    size: usize,
    per_table_tracker: Arc<TableMemoryMetrics>,
    batch_id: SharedBufferBatchId,
}

impl SharedBufferBatchInner {
    pub(crate) fn new(
        epoch: HummockEpoch,
        spill_offset: u16,
        payload: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_metrics: Arc<TableMemoryMetrics>,
    ) -> Self {
        assert!(!payload.is_empty());
        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
        if let Some(old_values) = &old_values {
            assert_eq!(old_values.values.len(), payload.len());
        }

        let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
        let mut entries = Vec::with_capacity(payload.len());
        let mut new_values = Vec::with_capacity(payload.len());
        for (i, (key, value)) in payload.into_iter().enumerate() {
            entries.push(SharedBufferKeyEntry {
                key,
                value_offset: i,
            });
            new_values.push((epoch_with_gap, value));
        }

        let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);

        table_metrics.inc_imm(size);

        SharedBufferBatchInner {
            entries,
            new_values,
            old_values,
            epochs: vec![epoch],
            size,
            per_table_tracker: table_metrics,
            batch_id,
        }
    }

    pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] {
        SharedBufferKeyEntry::values(i, &self.entries, &self.new_values)
    }

    pub(crate) fn new_with_multi_epoch_batches(
        epochs: Vec<HummockEpoch>,
        entries: Vec<SharedBufferKeyEntry>,
        new_values: Vec<VersionedSharedBufferValue>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        imm_id: ImmId,
    ) -> Self {
        assert!(new_values.len() >= entries.len());
        assert!(!entries.is_empty());
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key));
        debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset));
        debug_assert!((0..entries.len()).all(|i| {
            // Per key, values must be sorted by epoch in descending order
            // (newest first), hence the `rev()` before the ascending check.
            SharedBufferKeyEntry::values(i, &entries, &new_values)
                .iter()
                .rev()
                .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)
        }));
        debug_assert!(!epochs.is_empty());
        debug_assert!(epochs.is_sorted());

        Self {
            entries,
            new_values,
            old_values,
            epochs,
            size,
            per_table_tracker: TableMemoryMetrics::for_test(),
            batch_id: imm_id,
        }
    }

    /// Return the value and its epoch for `table_key` as visible at
    /// `read_epoch`, i.e. the newest version whose pure epoch is not greater
    /// than `read_epoch`, or `None` if the key is absent or not yet visible.
    fn get_value<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        if let Ok(i) = self
            .entries
            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
        {
            let entry = &self.entries[i];
            assert_eq!(entry.key.as_ref(), *table_key);
            // Values are ordered newest-epoch-first, so the first version not
            // newer than `read_epoch` is the visible one.
            for (e, v) in self.values(i) {
                if read_epoch < e.pure_epoch() {
                    continue;
                }
                return Some((v.to_ref().into(), *e));
            }
        }

        None
    }
}
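
// A hedged sketch, not part of the original source, of `get_value`'s epoch
// visibility rule: a batch written at `test_epoch(2)` is invisible to a read
// at `test_epoch(1)` and visible from `test_epoch(2)` onwards.
#[cfg(test)]
mod get_value_visibility_example {
    use bytes::Bytes;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn read_epoch_bounds_visibility() {
        let inner = SharedBufferBatchInner::new(
            test_epoch(2),
            0,
            vec![(
                TableKey(Bytes::from_static(b"k")),
                SharedBufferValue::Insert(Bytes::from_static(b"v")),
            )],
            None,
            1,
            TableMemoryMetrics::for_test(),
        );
        // A reader at an older epoch must not see the write.
        assert!(
            inner
                .get_value(TableKey(b"k".as_slice()), test_epoch(1))
                .is_none()
        );
        // Readers at the write epoch or later do see it.
        assert!(
            inner
                .get_value(TableKey(b"k".as_slice()), test_epoch(2))
                .is_some()
        );
        assert!(
            inner
                .get_value(TableKey(b"k".as_slice()), test_epoch(3))
                .is_some()
        );
    }
}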

impl PartialEq for SharedBufferBatchInner {
    fn eq(&self, other: &Self) -> bool {
        self.entries == other.entries && self.new_values == other.new_values
    }
}

impl Drop for SharedBufferBatchInner {
    fn drop(&mut self) {
        self.per_table_tracker.dec_imm(self.size);
    }
}

pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(|| AtomicU64::new(0));

/// A write batch of a single table stored in the shared buffer.
#[derive(Clone, Debug)]
pub struct SharedBufferBatch {
    pub(crate) inner: Arc<SharedBufferBatchInner>,
    pub table_id: TableId,
}

impl SharedBufferBatch {
    pub fn for_test(
        sorted_items: Vec<SharedBufferItem>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, None, epoch, table_id)
    }

    pub fn for_test_with_old_values(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Vec<Bytes>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
    }

    fn for_test_inner(
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Self {
        let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

        let old_values = old_values
            .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

        Self {
            inner: Arc::new(SharedBufferBatchInner::new(
                epoch,
                0,
                sorted_items,
                old_values,
                size,
                TableMemoryMetrics::for_test(),
            )),
            table_id,
        }
    }

    pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
        batch_items
            .iter()
            .map(|(left, right)| {
                // Each bound is charged a fixed 13-byte overhead in addition
                // to its key bytes.
                let l1 = match left {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                let l2 = match right {
                    Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
                    Bound::Unbounded => 13,
                };
                l1 + l2
            })
            .sum()
    }

    /// Return `(total_size, old_value_size)` of the batch payload, where the
    /// total includes both the new values and any old values.
    pub fn measure_batch_size(
        batch_items: &[SharedBufferItem],
        old_values: Option<&[Bytes]>,
    ) -> (usize, usize) {
        let old_value_size = old_values
            .iter()
            .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
            .sum::<usize>();
        let kv_size = batch_items
            .iter()
            .map(|(k, v)| {
                k.len() + {
                    match v {
                        SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
                            val.len()
                        }
                        SharedBufferValue::Delete => 0,
                    }
                }
            })
            .sum::<usize>();
        (kv_size + old_value_size, old_value_size)
    }

    pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
    where
        R: RangeBounds<TableKey<B>>,
        B: AsRef<[u8]>,
    {
        let left = table_key_range
            .start_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        let right = table_key_range
            .end_bound()
            .as_ref()
            .map(|key| TableKey(key.0.as_ref()));
        self.table_id == table_id
            && range_overlap(
                &(left, right),
                &self.start_table_key(),
                Included(&self.end_table_key()),
            )
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn min_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.first().unwrap()
    }

    pub fn max_epoch(&self) -> HummockEpoch {
        *self.inner.epochs.last().unwrap()
    }

    pub fn key_count(&self) -> usize {
        self.inner.entries.len()
    }

    pub fn value_count(&self) -> usize {
        self.inner.new_values.len()
    }

    pub fn has_old_value(&self) -> bool {
        self.inner.old_values.is_some()
    }

    pub fn get<'a>(
        &'a self,
        table_key: TableKey<&[u8]>,
        read_epoch: HummockEpoch,
        _read_options: &ReadOptions,
    ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
        self.inner.get_value(table_key, read_epoch)
    }

    pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
        self.inner
            .entries
            .binary_search_by(|m| {
                let key = &m.key;
                let too_left = match &table_key_range.0 {
                    std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                    std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_left {
                    return Ordering::Less;
                }

                let too_right = match &table_key_range.1 {
                    std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
                    std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
                    std::ops::Bound::Unbounded => false,
                };
                if too_right {
                    return Ordering::Greater;
                }

                Ordering::Equal
            })
            .is_ok()
    }

    pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
        self,
    ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
        SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
    }

    pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
        self.into_directed_iter()
    }

    pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
        self.into_directed_iter()
    }

    pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
        self.into_directed_iter()
    }

    #[inline(always)]
    pub fn start_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
    }

    #[inline(always)]
    pub fn end_table_key(&self) -> TableKey<&[u8]> {
        TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
    }

    #[inline(always)]
    pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
        &self.inner.entries.last().expect("non-empty").key
    }

    pub fn start_user_key(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, self.start_table_key())
    }

    pub fn size(&self) -> usize {
        self.inner.size
    }

    pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
        self.inner.old_values.as_ref()
    }

    pub fn batch_id(&self) -> SharedBufferBatchId {
        self.inner.batch_id
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.inner.epochs
    }

    pub(crate) fn build_shared_buffer_batch(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<SharedBufferBatchOldValues>,
        size: usize,
        table_id: TableId,
        table_metrics: Arc<TableMemoryMetrics>,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            old_values,
            size,
            table_metrics,
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn build_shared_buffer_batch_for_test(
        epoch: HummockEpoch,
        spill_offset: u16,
        sorted_items: Vec<SharedBufferItem>,
        size: usize,
        table_id: TableId,
    ) -> Self {
        let inner = SharedBufferBatchInner::new(
            epoch,
            spill_offset,
            sorted_items,
            None,
            size,
            TableMemoryMetrics::for_test(),
        );
        SharedBufferBatch {
            inner: Arc::new(inner),
            table_id,
        }
    }
}
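
// A hedged sketch, not in the original source, exercising two of the public
// helpers above: `measure_batch_size` sums key and new-value bytes (deletes
// contribute no value bytes), and `filter` requires both a matching table id
// and a key-range overlap.
#[cfg(test)]
mod batch_helpers_example {
    use bytes::Bytes;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn measure_batch_size_counts_key_and_value_bytes() {
        let items = vec![
            (
                TableKey(Bytes::from_static(b"k1")),
                SharedBufferValue::Insert(Bytes::from_static(b"val")),
            ),
            (
                TableKey(Bytes::from_static(b"k2")),
                SharedBufferValue::Delete,
            ),
        ];
        // 2 + 3 bytes for ("k1", "val") plus 2 + 0 bytes for the tombstone,
        // and no old values.
        assert_eq!(SharedBufferBatch::measure_batch_size(&items, None), (7, 0));
    }

    #[test]
    fn filter_checks_table_id_and_range_overlap() {
        let batch = SharedBufferBatch::for_test(
            vec![(
                TableKey(Bytes::from_static(b"b")),
                SharedBufferValue::Insert(Bytes::from_static(b"v")),
            )],
            test_epoch(1),
            TableId::new(1),
        );
        let range = (
            Included(TableKey(Bytes::from_static(b"a"))),
            Included(TableKey(Bytes::from_static(b"c"))),
        );
        assert!(batch.filter(TableId::new(1), &range));
        // Same range, different table id: filtered out.
        assert!(!batch.filter(TableId::new(2), &range));
        // Matching table id but a disjoint key range: filtered out.
        let disjoint = (
            Included(TableKey(Bytes::from_static(b"x"))),
            Included(TableKey(Bytes::from_static(b"z"))),
        );
        assert!(!batch.filter(TableId::new(1), &disjoint));
    }
}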

pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
    inner: Arc<SharedBufferBatchInner>,
    /// Index of the current key entry in `inner.entries`.
    current_entry_idx: usize,
    /// Index of the current value in `inner.new_values`.
    current_value_idx: usize,
    /// Exclusive end offset of the current entry's values.
    value_end_offset: usize,
    table_id: TableId,
    _phantom: PhantomData<D>,
}

impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
    SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
        if !IS_NEW_VALUE {
            assert!(
                inner.old_values.is_some(),
                "create old value iter with no old value: {:?}",
                table_id
            );
        }
        Self {
            inner,
            current_entry_idx: 0,
            current_value_idx: 0,
            value_end_offset: 0,
            table_id,
            _phantom: Default::default(),
        }
    }

    fn is_valid_entry_idx(&self) -> bool {
        self.current_entry_idx < self.inner.entries.len()
    }

    fn invalidate(&mut self) {
        self.current_entry_idx = self.inner.entries.len();
    }

    fn advance_to_next_entry(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx += 1;
            }
            DirectionEnum::Backward => {
                if self.current_entry_idx == 0 {
                    self.invalidate();
                } else {
                    self.current_entry_idx -= 1;
                }
            }
        }
    }

    fn reset_value_idx(&mut self) {
        debug_assert!(self.is_valid_entry_idx());
        self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset;
        self.value_end_offset = self.get_value_end_offset();
    }

    fn get_value_end_offset(&self) -> usize {
        debug_assert!(self.is_valid_entry_idx());
        SharedBufferKeyEntry::value_end_offset(
            self.current_entry_idx,
            &self.inner.entries,
            &self.inner.new_values,
        )
    }

    fn assert_valid_idx(&self) {
        debug_assert!(self.is_valid_entry_idx());
        debug_assert!(
            self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset
        );
        debug_assert_eq!(self.value_end_offset, self.get_value_end_offset());
        debug_assert!(self.current_value_idx < self.value_end_offset);
        if !IS_NEW_VALUE {
            // An old-value iterator must never rest on an `Insert`, which has
            // no old value to yield.
            debug_assert!(!matches!(
                &self.inner.new_values[self.current_value_idx].1,
                SharedBufferValue::Insert(_)
            ));
        }
    }

    fn advance_to_next_value(&mut self) {
        self.assert_valid_idx();

        if self.current_value_idx + 1 < self.value_end_offset {
            self.current_value_idx += 1;
        } else {
            self.advance_to_next_entry();
            if self.is_valid_entry_idx() {
                self.reset_value_idx();
            }
        }
    }

    /// Skip `Insert` values (which carry no old value) until the iterator
    /// rests on an `Update` or `Delete`, or becomes invalid.
    fn advance_until_valid_old_value(&mut self) {
        debug_assert!(!IS_NEW_VALUE);
        if !self.is_valid_entry_idx() {
            return;
        }
        loop {
            while self.current_value_idx < self.value_end_offset
                && matches!(
                    &self.inner.new_values[self.current_value_idx].1,
                    SharedBufferValue::Insert(_)
                )
            {
                self.current_value_idx += 1;
            }
            if self.current_value_idx >= self.value_end_offset {
                debug_assert_eq!(self.current_value_idx, self.value_end_offset);
                self.advance_to_next_entry();
                if self.is_valid_entry_idx() {
                    self.reset_value_idx();
                    continue;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }
}
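
// A hedged sketch, not in the original source, of the iteration order: within
// a key, versions are yielded newest-epoch-first regardless of direction,
// while keys advance forward or backward according to `D`. For a batch with
// keys "a" (epochs e2, e1) and "b" (epoch e3), a forward iterator yields
// (a, e2), (a, e1), (b, e3), and a backward iterator yields (b, e3), (a, e2),
// (a, e1).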

impl SharedBufferBatchIterator<Forward> {
    pub(crate) fn advance_to_next_key(&mut self) {
        self.advance_to_next_entry();
        if self.is_valid_entry_idx() {
            self.reset_value_idx();
        }
    }

    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.assert_valid_idx();
        debug_assert_eq!(
            self.current_value_idx,
            self.inner.entries[self.current_entry_idx].value_offset
        );
        SharedBufferVersionedEntryRef {
            key: &self.inner.entries[self.current_entry_idx].key,
            new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
            old_values: self.inner.old_values.as_ref().map(|old_values| {
                &old_values.values[self.current_value_idx..self.value_end_offset]
            }),
        }
    }
}

impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
    for SharedBufferBatchIterator<D, IS_NEW_VALUE>
{
    type Direction = D;

    async fn next(&mut self) -> HummockResult<()> {
        self.advance_to_next_value();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.assert_valid_idx();
        let key = self.inner.entries[self.current_entry_idx].key.as_ref();
        let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
        FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap)
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.assert_valid_idx();
        if IS_NEW_VALUE {
            self.inner.new_values[self.current_value_idx]
                .1
                .to_ref()
                .to_slice()
                .into()
        } else {
            HummockValue::put(
                self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
            )
        }
    }

    fn is_valid(&self) -> bool {
        self.is_valid_entry_idx()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        match D::direction() {
            DirectionEnum::Forward => {
                self.current_entry_idx = 0;
            }
            DirectionEnum::Backward => {
                self.current_entry_idx = self.inner.entries.len() - 1;
            }
        };
        self.reset_value_idx();
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // A batch holds data of a single table. If the seek key belongs to a
        // different table, the whole batch lies entirely before or after it.
        match key.user_key.table_id.cmp(&self.table_id) {
            Ordering::Less => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.rewind().await?;
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.invalidate();
                        return Ok(());
                    }
                };
            }
            Ordering::Greater => {
                match D::direction() {
                    DirectionEnum::Forward => {
                        self.invalidate();
                        return Ok(());
                    }
                    DirectionEnum::Backward => {
                        self.rewind().await?;
                        return Ok(());
                    }
                };
            }
            Ordering::Equal => (),
        }
        let partition_point = self
            .inner
            .entries
            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
        let seek_key_epoch = key.epoch_with_gap;
        match partition_point {
            Ok(i) => {
                self.current_entry_idx = i;
                self.reset_value_idx();
                // Within the key, skip versions newer than the seek epoch.
                while self.current_value_idx < self.value_end_offset {
                    let epoch_with_gap = self.inner.new_values[self.current_value_idx].0;
                    if epoch_with_gap <= seek_key_epoch {
                        break;
                    }
                    self.current_value_idx += 1;
                }
                if self.current_value_idx == self.value_end_offset {
                    self.advance_to_next_entry();
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
            }
            Err(i) => match D::direction() {
                DirectionEnum::Forward => {
                    self.current_entry_idx = i;
                    if self.is_valid_entry_idx() {
                        self.reset_value_idx();
                    }
                }
                DirectionEnum::Backward => {
                    if i == 0 {
                        self.invalidate();
                    } else {
                        self.current_entry_idx = i - 1;
                        self.reset_value_idx();
                    }
                }
            },
        };
        if !IS_NEW_VALUE {
            self.advance_until_valid_old_value();
        }
        Ok(())
    }

    fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}

    fn value_meta(&self) -> ValueMeta {
        ValueMeta::default()
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Bound::Excluded;

    use itertools::{Itertools, zip_eq};
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::key::map_table_key_range;

    use super::*;
    use crate::hummock::compactor::merge_imms_in_memory;
    use crate::hummock::iterator::test_utils::{
        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
    };

    fn to_hummock_value_batch(
        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
        items.into_iter().map(|(k, v)| (k, v.into())).collect()
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // The batch boundaries are its first and last keys.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point get on existing keys hits; absent keys miss.
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator yields items in key order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iterator yields items in reverse key order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Forward: seek to a key before the batch, expect all items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek past the batch, expect an invalid iterator.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Forward: seek to the second key, expect the last two items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek to key 2 at a newer epoch; its version is visible, so
        // expect the last two items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek to key 2 at an older epoch; its version is too new, so
        // expect only key 3.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: seek to a key before the batch, expect an invalid iterator.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Backward: seek past the batch, expect all items in reverse order.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to the second key, expect keys 2 and 1.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to key 2 at an older epoch; key 2's version is too
        // new, so expect only key 1.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: seek to key 2 at a newer epoch, expect keys 2 and 1.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        // The old-value iterator should skip `Insert` entries, which have no
        // old value.
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // New-value iterator: seek before the batch, expect all items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Old-value iterator: seek before the batch, expect all non-insert items.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek past the batch: both iterators become invalid.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // New-value iterator: seek to key 2, expect the remaining items.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Old-value iterator: seek to key 2, expect both non-insert items.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek to key 2 at a newer epoch: same results for both iterators.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Old-value iterator: seek to key 2 at an older epoch; key 2's version
        // is too new, so only key 4 remains.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // New-value iterator: seek to key 3 at an older epoch, expect only key 4.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Old-value iterator: seek to key 3 (an insert), expect only key 4.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    #[should_panic]
    async fn test_invalid_table_id() {
        let epoch = test_epoch(1);
        let shared_buffer_batch = SharedBufferBatch::for_test(vec![], epoch, Default::default());
        let mut iter = shared_buffer_batch.into_forward_iter();
        iter.seek(FullKey::for_test(TableId::new(1), vec![], epoch).to_ref())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_range_exists() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                Vec::from("a_1"),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                Vec::from("a_3"),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                Vec::from("a_5"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (
                Vec::from("b_2"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items),
            epoch,
            Default::default(),
        );

        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));

        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
    }

    #[tokio::test]
    async fn test_merge_imms_basic() {
        let table_id = TableId::new(1004);
        let shared_buffer_items1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items1.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(shared_buffer_items1);
        let shared_buffer_items2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let epoch = test_epoch(2);
        let imm2 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items2.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(shared_buffer_items2);

        let shared_buffer_items3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value13")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value23")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value33")),
            ),
        ];
        let epoch = test_epoch(3);
        let imm3 = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items3.clone()),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(shared_buffer_items3);

        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        // The newer imm comes first.
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone()).await;

        // Point get should see the value as of each epoch.
        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator, filtered by snapshot epoch.
        for snapshot_epoch in 1..=3 {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if test_epoch(snapshot_epoch) == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[snapshot_epoch as usize - 1]);
        }

        // Iterate over all versions: within each key, the newest epoch comes first.
        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }
    }

    #[tokio::test]
    async fn test_merge_imms_with_old_values() {
        let table_id = TableId::new(1004);
        let key_value1: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (iterator_test_table_key_of(3), SharedBufferValue::Delete),
        ];
        let old_value1 = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::from("old_value3"),
        ];
        let epoch = test_epoch(1);
        let imm1 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value1.clone()),
            old_value1.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items1 = to_hummock_value_batch(key_value1.clone());
        let key_value2: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Update(Bytes::from("value12")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value22")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value32")),
            ),
        ];
        let old_value2 = vec![Bytes::from("value1"), Bytes::from("value2"), Bytes::new()];
        let epoch = epoch.next_epoch();
        let imm2 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value2.clone()),
            old_value2.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items2 = to_hummock_value_batch(key_value2.clone());

        let key_value3: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (iterator_test_table_key_of(1), SharedBufferValue::Delete),
            (iterator_test_table_key_of(2), SharedBufferValue::Delete),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Update(Bytes::from("value33")),
            ),
        ];
        let old_value3 = vec![
            Bytes::from("value12"),
            Bytes::from("value22"),
            Bytes::from("value32"),
        ];
        let epoch = epoch.next_epoch();
        let imm3 = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_value3.clone()),
            old_value3.clone(),
            epoch,
            table_id,
        );
        let shared_buffer_items3 = to_hummock_value_batch(key_value3.clone());

        let key_values = [
            (key_value1, old_value1),
            (key_value2, old_value2),
            (key_value3, old_value3),
        ];

        let batch_items = [
            shared_buffer_items1,
            shared_buffer_items2,
            shared_buffer_items3,
        ];
        // The newer imm comes first.
        let imms = vec![imm3, imm2, imm1];
        let merged_imm = merge_imms_in_memory(table_id, imms.clone()).await;

        // Point get should see the value as of each epoch.
        for (i, items) in batch_items.iter().enumerate() {
            for (key, value) in items {
                assert_eq!(
                    merged_imm
                        .get(
                            TableKey(key.as_slice()),
                            test_epoch(i as u64 + 1),
                            &ReadOptions::default()
                        )
                        .unwrap()
                        .0
                        .as_slice(),
                    value.as_slice(),
                    "epoch: {}, key: {:?}",
                    test_epoch(i as u64 + 1),
                    String::from_utf8(key.clone())
                );
            }
        }
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            merged_imm.get(
                TableKey(iterator_test_table_key_of(5).as_slice()),
                test_epoch(1),
                &ReadOptions::default()
            ),
            None
        );

        // Forward iterator, filtered by snapshot epoch.
        for i in 1..=3 {
            let snapshot_epoch = test_epoch(i);
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                let epoch = iter.key().epoch_with_gap.pure_epoch();
                if snapshot_epoch == epoch {
                    output.push((
                        iter.key().user_key.table_key.to_vec(),
                        iter.value().to_bytes(),
                    ));
                }
                iter.next().await.unwrap();
            }
            assert_eq!(output, batch_items[i as usize - 1]);
        }

        // Iterate over all versions, forward and backward.
        {
            let mut iter = merged_imm.clone().into_forward_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            #[expect(clippy::needless_range_loop)]
            for key_idx in 0..=2 {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);

            let mut backward_iter = merged_imm.clone().into_backward_iter();
            backward_iter.rewind().await.unwrap();
            let mut output = vec![];
            while backward_iter.is_valid() {
                output.push((
                    backward_iter.key().user_key.table_key.to_vec(),
                    backward_iter.value().to_bytes(),
                ));
                backward_iter.next().await.unwrap();
            }
            let mut expected = vec![];
            for key_idx in (0..=2).rev() {
                for epoch in (1..=3).rev() {
                    let item = batch_items[epoch - 1][key_idx].clone();
                    expected.push(item);
                }
            }
            assert_eq!(expected, output);
        }

        // Old-value iterator: skips inserts; within a key, newest epoch first.
        {
            let mut iter = merged_imm.clone().into_old_value_iter();
            iter.rewind().await.unwrap();
            let mut output = vec![];
            while iter.is_valid() {
                output.push((
                    iter.key().user_key.table_key.to_vec(),
                    iter.value().to_bytes(),
                ));
                iter.next().await.unwrap();
            }

            let mut expected = vec![];
            for key_idx in 0..=2 {
                for epoch in (0..=2).rev() {
                    let (key_values, old_values) = &key_values[epoch];
                    let (key, new_value) = &key_values[key_idx];
                    let old_value = &old_values[key_idx];
                    if matches!(new_value, SharedBufferValue::Insert(_)) {
                        continue;
                    }
                    expected.push((key.clone(), HummockValue::Put(old_value.clone())));
                }
            }
            assert_eq!(expected, output);
        }
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek_bug() {
        let epoch = test_epoch(1);
        let table_id = TableId::new(100);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![(
            iterator_test_table_key_of(1),
            SharedBufferValue::Insert(Bytes::from("value1")),
        )];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            table_id,
        );

        // Seek with a smaller table id (99 < 100) but a larger table key: the
        // whole batch sorts after the seek key, so the forward iterator must
        // rewind to the first entry instead of comparing table keys.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        let seek_key = FullKey::for_test(TableId::new(99), iterator_test_table_key_of(2), epoch);
        iter.seek(seek_key.to_ref()).await.unwrap();

        assert!(
            iter.is_valid(),
            "Iterator should be valid when seeking with smaller table_id, even if the key part is larger"
        );
        assert_eq!(iter.key().user_key.table_id, table_id);

        // Seek with a larger table id (101 > 100) but a smaller table key: the
        // whole batch sorts before the seek key, so the backward iterator must
        // rewind to the last entry.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        let seek_key = FullKey::for_test(TableId::new(101), iterator_test_table_key_of(0), epoch);
        iter.seek(seek_key.to_ref()).await.unwrap();

        assert!(
            iter.is_valid(),
            "Iterator should be valid when seeking with larger table_id, even if the key part is smaller"
        );
        assert_eq!(iter.key().user_key.table_id, table_id);
    }
}